Compare commits

...

29 Commits

Author SHA1 Message Date
Ahmed Ibrahim
082b775a88 revert 2025-09-08 16:50:54 -07:00
Ahmed Ibrahim
e2318f38b2 patch-tools 2025-09-08 15:52:40 -07:00
Ahmed Ibrahim
87a4de05db token count 2025-09-08 15:52:39 -07:00
Ahmed Ibrahim
91036ea7df timestamp 2025-09-08 15:52:39 -07:00
Ahmed Ibrahim
543e22b215 timestamp 2025-09-08 15:51:40 -07:00
Ahmed Ibrahim
2354594eeb rebase 2025-09-08 15:51:40 -07:00
Ahmed Ibrahim
b7c95b57fd rebase 2025-09-08 15:47:56 -07:00
Ahmed Ibrahim
5f18406e8d recorder: document Flush as a channel barrier (OS flush), not fsync; clarify writer ack semantics 2025-09-08 15:46:07 -07:00
Ahmed Ibrahim
e9b78c9296 rebase 2025-09-08 15:46:07 -07:00
Ahmed Ibrahim
2a1499a626 Cleanup: remove accidentally re-added gitlinks under codex-rs/.codex from index again 2025-09-08 15:45:16 -07:00
Ahmed Ibrahim
52a5ea5f4e core: revert fork_conversation to (path, conversation_id); protocol: ConversationPathResponseEvent carries path again; core: emit path in ConversationHistory; recorder: expose path(); tui: backtrack uses path+id; fmt 2025-09-08 15:45:16 -07:00
Ahmed Ibrahim
aa94d9c4b3 Cleanup: remove inadvertently added embedded git directories under codex-rs/.codex from index 2025-09-08 15:43:32 -07:00
Ahmed Ibrahim
c2c7778c38 Rebase: resolve conflicts; use ConversationHistory entries; align fork_conversation API; update protocol and recorder; fix TUI app_backtrack; rustfmt 2025-09-08 15:43:31 -07:00
Ahmed Ibrahim
5f37b45158 tests 2025-09-08 15:42:32 -07:00
Ahmed Ibrahim
d5e2032490 tests 2025-09-08 15:42:32 -07:00
Ahmed Ibrahim
268be2ac52 tests 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
a045ef05f2 tests 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
660327aa9c fix 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
2d976aa667 progress 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
adf9dae907 progress 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
4a7b842b53 progress 2025-09-08 15:41:54 -07:00
Ahmed Ibrahim
e7a20f5109 progress 2025-09-08 15:41:02 -07:00
Ahmed Ibrahim
f7671481c1 fix 2025-09-08 15:39:24 -07:00
Ahmed Ibrahim
50e001888b back comp 2025-09-08 15:38:16 -07:00
Ahmed Ibrahim
b48d47de21 back comp 2025-09-08 15:38:16 -07:00
Ahmed Ibrahim
4d5335f797 back comp 2025-09-08 15:38:16 -07:00
Ahmed Ibrahim
0c732e4a53 back comp 2025-09-08 15:36:50 -07:00
Ahmed Ibrahim
777fbba58c progress 2025-09-08 15:33:48 -07:00
Ahmed Ibrahim
40da893c46 progress 2025-09-08 15:32:56 -07:00
13 changed files with 714 additions and 348 deletions

View File

@@ -10,13 +10,15 @@ use std::time::Duration;
use crate::AuthManager;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::rollout::RolloutItem;
use crate::rollout::recorder::RolloutItemSliceExt;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
@@ -103,7 +105,6 @@ use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
@@ -203,9 +204,6 @@ impl Codex {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
session
.record_initial_history(&turn_context, conversation_history)
.await;
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
@@ -379,18 +377,9 @@ impl Session {
return Err(anyhow::anyhow!("cwd is not absolute: {cwd:?}"));
}
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => {
let conversation_id = ConversationId::default();
(
conversation_id,
RolloutRecorderParams::new(conversation_id, user_instructions.clone()),
)
}
InitialHistory::Resumed(resumed_history) => (
resumed_history.conversation_id,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
),
let conversation_id = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => ConversationId::default(),
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
// Error messages to dispatch after SessionConfigured is sent.
@@ -402,7 +391,7 @@ impl Session {
// - spin up MCP connection manager
// - perform default shell discovery
// - load history metadata
let rollout_fut = RolloutRecorder::new(&config, rollout_params);
let rollout_fut = RolloutRecorder::new(&config, conversation_id, user_instructions.clone());
let mcp_fut = McpConnectionManager::new(config.mcp_servers.clone());
let default_shell_fut = shell::default_user_shell();
@@ -492,13 +481,10 @@ impl Session {
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = match &initial_history {
InitialHistory::New => None,
InitialHistory::Forked(items) => Some(sess.build_initial_messages(items)),
InitialHistory::Resumed(resumed_history) => {
Some(sess.build_initial_messages(&resumed_history.history))
}
};
let initial_messages = Some(
sess.apply_initial_history(&turn_context, initial_history.clone())
.await,
);
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
@@ -512,9 +498,7 @@ impl Session {
})
.chain(post_session_configured_error_events.into_iter());
for event in events {
if let Err(e) = tx_event.send(event).await {
error!("failed to send event: {e:?}");
}
sess.send_event(event).await;
}
Ok((sess, turn_context))
@@ -537,26 +521,31 @@ impl Session {
}
}
async fn record_initial_history(
async fn apply_initial_history(
&self,
turn_context: &TurnContext,
conversation_history: InitialHistory,
) {
) -> Vec<EventMsg> {
match conversation_history {
InitialHistory::New => {
self.record_initial_history_new(turn_context).await;
}
InitialHistory::New => self.record_initial_history_new(turn_context).await,
InitialHistory::Forked(items) => {
self.record_initial_history_from_items(items).await;
self.record_conversation_items_internal(&items, true).await;
items
.into_iter()
.flat_map(|ri| {
map_response_item_to_event_messages(&ri, self.show_raw_agent_reasoning)
})
.filter(|m| matches!(m, EventMsg::UserMessage(_)))
.collect()
}
InitialHistory::Resumed(resumed_history) => {
self.record_initial_history_from_items(resumed_history.history)
.await;
self.record_initial_history_resumed(resumed_history.history)
.await
}
}
}
async fn record_initial_history_new(&self, turn_context: &TurnContext) {
async fn record_initial_history_new(&self, turn_context: &TurnContext) -> Vec<EventMsg> {
// record the initial user instructions and environment context,
// regardless of whether we restored items.
// TODO: Those items shouldn't be "user messages" IMO. Maybe developer messages.
@@ -570,29 +559,44 @@ impl Session {
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
self.record_conversation_items(&conversation_items).await;
for item in conversation_items {
self.record_conversation_item(item).await;
}
vec![]
}
async fn record_initial_history_from_items(&self, items: Vec<ResponseItem>) {
self.record_conversation_items_internal(&items, false).await;
}
/// build the initial messages vector for SessionConfigured by converting
/// ResponseItems into EventMsg.
fn build_initial_messages(&self, items: &[ResponseItem]) -> Vec<EventMsg> {
items
.iter()
.flat_map(|item| {
map_response_item_to_event_messages(item, self.show_raw_agent_reasoning)
})
.collect()
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
// Record transcript (without persisting again)
let responses: Vec<ResponseItem> = items.as_slice().get_response_items();
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, true)
.await;
}
items.as_slice().get_events()
}
/// Sends the given event to the client and swallows the send event, if
/// any, logging it as an error.
/// Sends the given event to the client and records it to the rollout (if enabled).
/// Any send/record errors are logged and swallowed.
pub(crate) async fn send_event(&self, event: Event) {
let event_to_record = event.clone();
if let Err(e) = self.tx_event.send(event).await {
error!("failed to send tool call event: {e}");
error!("failed to send event: {e}");
}
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder
&& let Err(e) = rec
.record_items(crate::rollout::RolloutItem::Event(event_to_record))
.await
{
error!("failed to record rollout event: {e:#}");
}
}
@@ -624,7 +628,7 @@ impl Session {
reason,
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
rx_approve
}
@@ -656,7 +660,7 @@ impl Session {
grant_root,
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
rx_approve
}
@@ -686,30 +690,55 @@ impl Session {
self.record_conversation_items_internal(items, true).await;
}
async fn record_conversation_item(&self, item: ResponseItem) {
let items = [item];
self.record_conversation_items_internal(&items, true).await;
}
async fn record_conversation_items_internal(&self, items: &[ResponseItem], persist: bool) {
debug!("Recording items for conversation: {items:?}");
if persist {
self.record_state_snapshot(items).await;
// Record snapshot of these items into rollout
for item in items {
self.record_state_snapshot(RolloutItem::ResponseItem(item.clone()))
.await;
}
}
self.state.lock_unchecked().history.record_items(items);
}
async fn record_state_snapshot(&self, items: &[ResponseItem]) {
let snapshot = { crate::rollout::SessionStateSnapshot {} };
async fn record_state_snapshot(&self, item: RolloutItem) {
let recorder = {
let guard = self.rollout.lock_unchecked();
guard.as_ref().cloned()
};
if let Some(rec) = recorder {
if let Err(e) = rec.record_state(snapshot).await {
error!("failed to record rollout state: {e:#}");
}
if let Err(e) = rec.record_items(items).await {
error!("failed to record rollout items: {e:#}");
}
if let Some(rec) = recorder
&& let Err(e) = rec.record_items(item).await
{
error!("failed to record rollout items: {e:#}");
}
}
/// Records a user input into conversation history AND a corresponding UserMessage event in rollout.
/// Does not send events to the UI.
async fn record_user_input(&self, sub_id: &str, response_item: ResponseItem) {
// Record the message/tool input in conversation history/rollout state
self.record_conversation_item(response_item.clone()).await;
// Derive and record a UserMessage event alongside it in the rollout
let user_events =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning)
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_)));
for msg in user_events {
let event = Event {
id: sub_id.to_string(),
msg,
};
self.record_state_snapshot(RolloutItem::Event(event)).await;
}
}
@@ -752,7 +781,7 @@ impl Session {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
async fn on_exec_command_end(
@@ -799,7 +828,7 @@ impl Session {
id: sub_id.to_string(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
// If this is an apply_patch, after we emit the end patch, emit a second event
// with the full turn diff if there is one.
@@ -811,7 +840,7 @@ impl Session {
id: sub_id.into(),
msg,
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
}
}
@@ -877,7 +906,7 @@ impl Session {
message: message.into(),
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
@@ -887,7 +916,7 @@ impl Session {
message: message.into(),
}),
};
let _ = self.tx_event.send(event).await;
self.send_event(event).await;
}
/// Build the full turn input by concatenating the current conversation
@@ -1050,9 +1079,9 @@ impl AgentTask {
id: self.sub_id,
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let tx_event = self.sess.tx_event.clone();
let sess = self.sess.clone();
tokio::spawn(async move {
tx_event.send(event).await.ok();
sess.send_event(event).await;
});
}
}
@@ -1148,13 +1177,13 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
sess.record_conversation_item(ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))])
)))
.await;
}
}
@@ -1257,7 +1286,7 @@ async fn submission_loop(
Op::GetHistoryEntryRequest { offset, log_id } => {
let config = config.clone();
let tx_event = sess.tx_event.clone();
let sess_for_spawn = sess.clone();
let sub_id = sub.id.clone();
tokio::spawn(async move {
@@ -1285,13 +1314,10 @@ async fn submission_loop(
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send GetHistoryEntryResponse event: {e}");
}
sess_for_spawn.send_event(event).await;
});
}
Op::ListMcpTools => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
@@ -1302,12 +1328,9 @@ async fn submission_loop(
crate::protocol::McpListToolsResponseEvent { tools },
),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send McpListToolsResponse event: {e}");
}
sess.send_event(event).await;
}
Op::ListCustomPrompts => {
let tx_event = sess.tx_event.clone();
let sub_id = sub.id.clone();
let custom_prompts: Vec<CustomPrompt> =
@@ -1323,9 +1346,7 @@ async fn submission_loop(
custom_prompts,
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ListCustomPromptsResponse event: {e}");
}
sess.send_event(event).await;
}
Op::Compact => {
// Create a summarization request as user input
@@ -1361,34 +1382,36 @@ async fn submission_loop(
message: "Failed to shutdown rollout recorder".to_string(),
}),
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");
}
sess.send_event(event).await;
}
let event = Event {
id: sub.id.clone(),
msg: EventMsg::ShutdownComplete,
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send Shutdown event: {e}");
}
sess.send_event(event).await;
break;
}
Op::GetHistory => {
let tx_event = sess.tx_event.clone();
Op::GetConversationPath => {
let sub_id = sub.id.clone();
// Ensure rollout file is flushed so consumers can read it immediately.
let rec_opt = { sess.rollout.lock_unchecked().as_ref().cloned() };
if let Some(rec) = rec_opt {
let _ = rec.flush().await;
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
entries: sess.state.lock_unchecked().history.contents(),
path: sess
.rollout
.lock_unchecked()
.as_ref()
.map(|r| r.path().to_path_buf())
.unwrap_or_default(),
}),
};
if let Err(e) = tx_event.send(event).await {
warn!("failed to send ConversationHistory event: {e}");
}
sess.send_event(event).await;
}
_ => {
// Ignore unknown ops; enum is non_exhaustive to allow extensions.
@@ -1420,19 +1443,17 @@ async fn run_task(
if input.is_empty() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// Record the user's input and corresponding event into the rollout
let user_input_response: ResponseItem = ResponseItem::from(initial_input_for_turn.clone());
sess.record_user_input(&sub_id, user_input_response).await;
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
}),
};
if sess.tx_event.send(event).await.is_err() {
return;
}
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[initial_input_for_turn.clone().into()])
.await;
sess.send_event(event).await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
@@ -1448,7 +1469,9 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
sess.record_conversation_items(&pending_input).await;
for item in pending_input.iter() {
sess.record_user_input(&sub_id, item.clone()).await;
}
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1575,8 +1598,9 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
for item in items_to_record_in_conversation_history.iter().cloned() {
sess.record_conversation_item(item).await;
}
}
if responses.is_empty() {
@@ -1600,7 +1624,7 @@ async fn run_task(
message: e.to_string(),
}),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
// let the user continue the conversation
break;
}
@@ -1611,7 +1635,7 @@ async fn run_task(
id: sub_id,
msg: EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
async fn run_turn(
@@ -1787,13 +1811,11 @@ async fn try_run_turn(
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::WebSearchCallBegin { call_id } => {
let _ = sess
.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }),
})
.await;
}
ResponseEvent::Completed {
response_id: _,
@@ -1809,13 +1831,11 @@ async fn try_run_turn(
st.token_info = info.clone();
info
};
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
let unified_diff = turn_diff_tracker.get_unified_diff();
if let Ok(Some(unified_diff)) = unified_diff {
@@ -1824,7 +1844,7 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg,
};
let _ = sess.tx_event.send(event).await;
sess.send_event(event).await;
}
return Ok(output);
@@ -1834,21 +1854,21 @@ async fn try_run_turn(
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningSummaryPartAdded => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
@@ -1858,7 +1878,7 @@ async fn try_run_turn(
AgentReasoningRawContentDeltaEvent { delta },
),
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
}
}
@@ -1879,9 +1899,7 @@ async fn run_compact_task(
model_context_window,
}),
};
if sess.tx_event.send(start_event).await.is_err() {
return;
}
sess.send_event(start_event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let turn_input: Vec<ResponseItem> =
@@ -2059,7 +2077,7 @@ async fn handle_response_item(
id: sub_id.to_string(),
msg,
};
sess.tx_event.send(event).await.ok();
sess.send_event(event).await;
}
None
}
@@ -2892,13 +2910,11 @@ async fn drain_to_completed(
info
};
sess.tx_event
.send(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await
.ok();
sess.send_event(Event {
id: sub_id.to_string(),
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
})
.await;
return Ok(());
}

View File

@@ -10,7 +10,9 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutItem;
use crate::rollout::RolloutRecorder;
use crate::rollout::recorder::RolloutItemSliceExt;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use std::collections::HashMap;
@@ -18,20 +20,55 @@ use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<ResponseItem>,
pub history: Vec<RolloutItem>,
pub rollout_path: PathBuf,
}
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<ResponseItem>),
}
impl PartialEq for InitialHistory {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(InitialHistory::New, InitialHistory::New) => true,
(InitialHistory::Forked(a), InitialHistory::Forked(b)) => a == b,
(InitialHistory::Resumed(_), InitialHistory::Resumed(_)) => true,
_ => false,
}
}
}
impl InitialHistory {
/// Return all response items contained in this initial history.
pub fn get_response_items(&self) -> Vec<ResponseItem> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_response_items(items.history.as_slice())
}
}
}
/// Return all events contained in this initial history.
pub fn get_events(&self) -> Vec<crate::protocol::EventMsg> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(_) => Vec::new(),
InitialHistory::Resumed(items) => {
<[_] as RolloutItemSliceExt>::get_events(items.history.as_slice())
}
}
}
}
/// Represents a newly created Codex conversation, including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewConversation {
@@ -155,21 +192,43 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
conversation_history: Vec<ResponseItem>,
base_rollout_path: PathBuf,
_base_conversation_id: ConversationId,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Read prior responses from the rollout file (tolerate both tagged and legacy formats).
let text = tokio::fs::read_to_string(&base_rollout_path)
.await
.map_err(|e| CodexErr::Io(std::io::Error::other(format!("read rollout: {e}"))))?;
let mut responses: Vec<ResponseItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
// Only consider response items (legacy lines have no record_type)
match v.get("record_type").and_then(|s| s.as_str()) {
Some("response") | None => {
if let Ok(item) = serde_json::from_value::<ResponseItem>(v) {
responses.push(item);
}
}
_ => {}
}
}
let kept = truncate_after_dropping_last_messages(responses, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, history).await?;
} = Codex::spawn(config, auth_manager, kept).await?;
self.finalize_spawn(codex, conversation_id).await
}
}
@@ -262,6 +321,6 @@ mod tests {
);
let truncated2 = truncate_after_dropping_last_messages(items, 2);
assert_eq!(truncated2, InitialHistory::New);
assert!(matches!(truncated2, InitialHistory::New));
}
}

View File

@@ -10,6 +10,7 @@ use time::macros::format_description;
use uuid::Uuid;
use super::SESSIONS_SUBDIR;
use super::recorder::SessionMetaWithGit;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
@@ -170,7 +171,9 @@ async fn traverse_directories_for_paths(
let head = read_first_jsonl_records(&path, HEAD_RECORD_LIMIT)
.await
.unwrap_or_default();
items.push(ConversationItem { path, head });
if should_include_session(&head) {
items.push(ConversationItem { path, head });
}
}
}
}
@@ -296,3 +299,37 @@ async fn read_first_jsonl_records(
}
Ok(head)
}
/// Return true if this conversation should be included in the listing.
///
/// Current rule: include only when the first JSON object is a session meta record
/// (i.e., has `{"record_type": "session_meta", ...}`), which is how rollout
/// files are written. Empty or malformed heads are excluded.
fn should_include_session(head: &[serde_json::Value]) -> bool {
let Some(first) = head.first() else {
return false;
};
passes_session_meta_filter(first)
}
/// Validate that the first record is a fullyformed session meta line.
///
/// Requirements:
/// - `record_type == "session_meta"`
/// - Remaining fields (after removing `record_type`) deserialize into
/// `SessionMetaWithGit`.
fn passes_session_meta_filter(first: &serde_json::Value) -> bool {
let Some(obj) = first.as_object() else {
return false;
};
let record_type = obj.get("record_type").and_then(|v| v.as_str());
if record_type != Some("session_meta") {
return false;
}
// Remove the marker field and validate the remainder matches SessionMetaWithGit
let mut cleaned = obj.clone();
cleaned.remove("record_type");
let val = serde_json::Value::Object(cleaned);
serde_json::from_value::<SessionMetaWithGit>(val).is_ok()
}

View File

@@ -6,8 +6,8 @@ pub mod list;
pub(crate) mod policy;
pub mod recorder;
pub use recorder::RolloutItem;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use recorder::SessionMeta;
pub use recorder::SessionStateSnapshot;

View File

@@ -1,4 +1,6 @@
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
/// Whether a `ResponseItem` should be persisted in rollout files.
#[inline]
@@ -14,3 +16,42 @@ pub(crate) fn is_persisted_response_item(item: &ResponseItem) -> bool {
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => false,
}
}
pub(crate) fn is_persisted_event(event: &Event) -> bool {
match event.msg {
EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::TaskStarted(_)
| EventMsg::TaskComplete(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::WebSearchBegin(_)
| EventMsg::WebSearchEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ShutdownComplete
| EventMsg::ConversationHistory(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::StreamError(_)
| EventMsg::Error(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::SessionConfigured(_) => false,
EventMsg::UserMessage(_)
| EventMsg::AgentMessage(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_) => true,
}
}

View File

@@ -7,9 +7,10 @@ use std::path::Path;
use std::path::PathBuf;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
@@ -30,24 +31,29 @@ use crate::conversation_manager::InitialHistory;
use crate::conversation_manager::ResumedHistory;
use crate::git_info::GitInfo;
use crate::git_info::collect_git_info;
use crate::rollout::policy::is_persisted_event;
use codex_protocol::models::ResponseItem;
#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Serialize, Deserialize, Clone, Default, Debug)]
pub struct SessionMeta {
pub id: ConversationId,
pub timestamp: String,
pub cwd: String,
pub originator: String,
pub cli_version: String,
pub instructions: Option<String>,
}
#[derive(Serialize)]
struct SessionMetaWithGit {
// SessionMetaWithGit is used in writes and reads; ensure it implements Debug.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone)]
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Default, Clone)]
@@ -72,39 +78,98 @@ pub struct SavedSession {
#[derive(Clone)]
pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
path: PathBuf,
}
#[derive(Clone)]
pub enum RolloutRecorderParams {
Create {
conversation_id: ConversationId,
instructions: Option<String>,
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "record_type", rename_all = "snake_case")]
enum TaggedLine {
Response {
#[serde(flatten)]
item: ResponseItem,
},
Resume {
path: PathBuf,
Event {
#[serde(flatten)]
event: Event,
},
SessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
PrevSessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
State {
#[serde(flatten)]
state: SessionStateSnapshot,
},
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TimestampedLine {
timestamp: String,
#[serde(flatten)]
record: TaggedLine,
}
#[derive(Debug, Clone)]
pub enum RolloutItem {
ResponseItem(ResponseItem),
Event(Event),
SessionMeta(SessionMetaWithGit),
}
impl From<ResponseItem> for RolloutItem {
fn from(item: ResponseItem) -> Self {
RolloutItem::ResponseItem(item)
}
}
impl From<Event> for RolloutItem {
fn from(event: Event) -> Self {
RolloutItem::Event(event)
}
}
/// Convenience helpers to extract typed items from a list of rollout items.
pub trait RolloutItemSliceExt {
fn get_response_items(&self) -> Vec<ResponseItem>;
fn get_events(&self) -> Vec<EventMsg>;
}
impl RolloutItemSliceExt for [RolloutItem] {
fn get_response_items(&self) -> Vec<ResponseItem> {
self.iter()
.filter_map(|it| match it {
RolloutItem::ResponseItem(ri) => Some(ri.clone()),
_ => None,
})
.collect()
}
fn get_events(&self) -> Vec<EventMsg> {
self.iter()
.filter_map(|it| match it {
RolloutItem::Event(ev) => Some(ev.msg.clone()),
_ => None,
})
.collect()
}
}
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
AddResponseItems(Vec<ResponseItem>),
AddEvents(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Flush { ack: oneshot::Sender<()> },
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorderParams {
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
Self::Create {
conversation_id,
instructions,
}
}
pub fn resume(path: PathBuf) -> Self {
Self::Resume { path }
}
}
impl RolloutRecorder {
pub fn path(&self) -> &Path {
&self.path
}
#[allow(dead_code)]
/// List conversations (rollout files) under the provided Codex home directory.
pub async fn list_conversations(
@@ -118,84 +183,100 @@ impl RolloutRecorder {
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
/// cannot be created or the rollout file cannot be opened we return the
/// error so the caller can decide whether to disable persistence.
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
let (file, meta) = match params {
RolloutRecorderParams::Create {
conversation_id,
pub async fn new(
config: &Config,
conversation_id: ConversationId,
instructions: Option<String>,
) -> std::io::Result<Self> {
let LogFileInfo {
file,
conversation_id: session_id,
timestamp,
path,
} = create_log_file(config, conversation_id)?;
let timestamp_format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]Z");
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let cwd = config.cwd.to_path_buf();
let (tx, rx) = mpsc::channel(100);
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.to_string_lossy().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
} => {
let LogFileInfo {
file,
conversation_id: session_id,
timestamp,
} = create_log_file(config, conversation_id)?;
}),
cwd,
));
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
(
tokio::fs::File::from_std(file),
Some(SessionMeta {
timestamp,
id: session_id,
instructions,
}),
)
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?,
None,
),
};
// Clone the cwd for the spawned task to collect git info asynchronously
let cwd = config.cwd.clone();
// A reasonably-sized bounded channel. If the buffer fills up the send
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
Ok(Self { tx })
Ok(Self { tx, path })
}
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if is_persisted_response_item(item) {
filtered.push(item.clone());
}
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
match item {
RolloutItem::ResponseItem(item) => self.record_response_item(&item).await,
RolloutItem::Event(event) => self.record_event(&event).await,
RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
}
if filtered.is_empty() {
}
/// Ensure all writes up to this point have been processed by the writer task.
///
/// This is a sequencing barrier for readers that plan to open and read the
/// rollout file immediately after calling this method. The background writer
/// processes the channel serially; when it dequeues `Flush`, all prior
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
/// via `write_line`, which calls `file.flush()` (OSbuffer flush).
pub async fn flush(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
self.tx
.send(RolloutCmd::Flush { ack: tx_done })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
}
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if !is_persisted_response_item(item) {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.send(RolloutCmd::AddResponseItems(vec![item.clone()]))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
pub(crate) async fn record_state(&self, state: SessionStateSnapshot) -> std::io::Result<()> {
async fn record_event(&self, event: &Event) -> std::io::Result<()> {
if !is_persisted_event(event) {
return Ok(());
}
self.tx
.send(RolloutCmd::UpdateState(state))
.send(RolloutCmd::AddEvents(vec![event.clone()]))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout state: {e}")))
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
}
async fn record_session_meta(&self, meta: &SessionMetaWithGit) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::AddSessionMeta(meta.clone()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout session meta: {e}")))
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
@@ -206,46 +287,53 @@ impl RolloutRecorder {
let first_line = lines
.next()
.ok_or_else(|| IoError::other("empty session file"))?;
let conversation_id = match serde_json::from_str::<SessionMeta>(first_line) {
Ok(rollout_session_meta) => {
tracing::error!(
"Parsed conversation ID from rollout file: {:?}",
rollout_session_meta.id
);
Some(rollout_session_meta.id)
}
Err(e) => {
return Err(IoError::other(format!(
"failed to parse first line of rollout file as SessionMeta: {e}"
)));
}
let conversation_id = if let Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
}) = serde_json::from_str::<TimestampedLine>(first_line)
{
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMetaWithGit>(first_line) {
Some(meta.meta.id)
} else if let Ok(meta) = serde_json::from_str::<SessionMeta>(first_line) {
Some(meta.id)
} else {
return Err(IoError::other(
"failed to parse first line of rollout file as SessionMeta",
));
};
let mut items = Vec::new();
let mut items: Vec<RolloutItem> = Vec::new();
for line in lines {
if line.trim().is_empty() {
continue;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
if v.get("record_type")
.and_then(|rt| rt.as_str())
.map(|s| s == "state")
.unwrap_or(false)
{
continue;
}
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
match serde_json::from_str::<TimestampedLine>(line) {
Ok(TimestampedLine {
record: TaggedLine::State { .. },
..
}) => {}
Ok(TimestampedLine {
record: TaggedLine::Event { event },
..
}) => items.push(RolloutItem::Event(event)),
Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
})
| Ok(TimestampedLine {
record: TaggedLine::PrevSessionMeta { meta },
..
}) => items.push(RolloutItem::SessionMeta(meta)),
Ok(TimestampedLine {
record: TaggedLine::Response { item },
..
}) => {
if is_persisted_response_item(&item) {
items.push(item);
items.push(RolloutItem::ResponseItem(item));
}
}
Err(e) => {
warn!("failed to parse item: {v:?}, error: {e}");
}
Err(_) => warn!("failed to parse rollout line: {line}"),
}
}
@@ -294,6 +382,9 @@ struct LogFileInfo {
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
/// Full filesystem path to the rollout file.
path: PathBuf,
}
fn create_log_file(
@@ -301,8 +392,7 @@ fn create_log_file(
conversation_id: ConversationId,
) -> std::io::Result<LogFileInfo> {
// Resolve ~/.codex/sessions/YYYY/MM/DD and create it if missing.
let timestamp = OffsetDateTime::now_local()
.map_err(|e| IoError::other(format!("failed to get local time: {e}")))?;
let timestamp = OffsetDateTime::now_utc();
let mut dir = config.codex_home.clone();
dir.push(SESSIONS_SUBDIR);
dir.push(timestamp.year().to_string());
@@ -330,6 +420,7 @@ fn create_log_file(
file,
conversation_id,
timestamp,
path,
})
}
@@ -348,33 +439,37 @@ async fn rollout_writer(
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
writer.write_line(&session_meta_with_git).await?;
writer
.write_tagged(TaggedLine::SessionMeta {
meta: session_meta_with_git,
})
.await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
RolloutCmd::AddResponseItems(items) => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_line(&item).await?;
writer.write_tagged(TaggedLine::Response { item }).await?;
}
}
}
RolloutCmd::UpdateState(state) => {
#[derive(Serialize)]
struct StateLine<'a> {
record_type: &'static str,
#[serde(flatten)]
state: &'a SessionStateSnapshot,
RolloutCmd::AddEvents(events) => {
for event in events {
writer.write_tagged(TaggedLine::Event { event }).await?;
}
}
// Sequencing barrier: by the time we handle `Flush`, all previously
// queued writes have been applied and flushed to OS buffers.
RolloutCmd::Flush { ack } => {
let _ = ack.send(());
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_line(&StateLine {
record_type: "state",
state: &state,
})
.write_tagged(TaggedLine::PrevSessionMeta { meta })
.await?;
}
RolloutCmd::Shutdown { ack } => {
@@ -398,4 +493,12 @@ impl JsonlWriter {
self.file.flush().await?;
Ok(())
}
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
let timestamp = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine { timestamp, record };
self.write_line(&line).await
}
}

View File

@@ -41,8 +41,13 @@ fn write_session_file(
let mut file = File::create(file_path)?;
let meta = serde_json::json!({
"record_type": "session_meta",
"timestamp": ts_str,
"id": uuid.to_string()
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
});
writeln!(file, "{meta}")?;
@@ -56,6 +61,18 @@ fn write_session_file(
Ok((dt, uuid))
}
fn expected_session_meta(ts: &str, uuid: Uuid) -> serde_json::Value {
serde_json::json!({
"record_type": "session_meta",
"timestamp": ts,
"id": uuid.to_string(),
"cwd": "/",
"originator": "test",
"cli_version": "0.0.0",
"instructions": null
})
}
#[tokio::test]
async fn test_list_conversations_latest_first() {
let temp = TempDir::new().unwrap();
@@ -94,19 +111,19 @@ async fn test_list_conversations_latest_first() {
.join(format!("rollout-2025-01-01T12-00-00-{u1}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-01-03T12-00-00", "id": u3.to_string()}),
expected_session_meta("2025-01-03T12-00-00", u3),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-01-02T12-00-00", "id": u2.to_string()}),
expected_session_meta("2025-01-02T12-00-00", u2),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
];
let head_1 = vec![
serde_json::json!({"timestamp": "2025-01-01T12-00-00", "id": u1.to_string()}),
expected_session_meta("2025-01-01T12-00-00", u1),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
serde_json::json!({"record_type": "response", "index": 2}),
@@ -171,11 +188,11 @@ async fn test_pagination_cursor() {
.join("04")
.join(format!("rollout-2025-03-04T09-00-00-{u4}.jsonl"));
let head_5 = vec![
serde_json::json!({"timestamp": "2025-03-05T09-00-00", "id": u5.to_string()}),
expected_session_meta("2025-03-05T09-00-00", u5),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_4 = vec![
serde_json::json!({"timestamp": "2025-03-04T09-00-00", "id": u4.to_string()}),
expected_session_meta("2025-03-04T09-00-00", u4),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor1: Cursor =
@@ -213,11 +230,11 @@ async fn test_pagination_cursor() {
.join("02")
.join(format!("rollout-2025-03-02T09-00-00-{u2}.jsonl"));
let head_3 = vec![
serde_json::json!({"timestamp": "2025-03-03T09-00-00", "id": u3.to_string()}),
expected_session_meta("2025-03-03T09-00-00", u3),
serde_json::json!({"record_type": "response", "index": 0}),
];
let head_2 = vec![
serde_json::json!({"timestamp": "2025-03-02T09-00-00", "id": u2.to_string()}),
expected_session_meta("2025-03-02T09-00-00", u2),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor2: Cursor =
@@ -249,7 +266,7 @@ async fn test_pagination_cursor() {
.join("01")
.join(format!("rollout-2025-03-01T09-00-00-{u1}.jsonl"));
let head_1 = vec![
serde_json::json!({"timestamp": "2025-03-01T09-00-00", "id": u1.to_string()}),
expected_session_meta("2025-03-01T09-00-00", u1),
serde_json::json!({"record_type": "response", "index": 0}),
];
let expected_cursor3: Cursor =
@@ -288,7 +305,7 @@ async fn test_get_conversation_contents() {
.join("01")
.join(format!("rollout-2025-04-01T10-30-00-{uuid}.jsonl"));
let expected_head = vec![
serde_json::json!({"timestamp": ts, "id": uuid.to_string()}),
expected_session_meta(ts, uuid),
serde_json::json!({"record_type": "response", "index": 0}),
serde_json::json!({"record_type": "response", "index": 1}),
];
@@ -305,7 +322,7 @@ async fn test_get_conversation_contents() {
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = serde_json::json!({"timestamp": ts, "id": uuid.to_string()});
let meta = expected_session_meta(ts, uuid);
let rec0 = serde_json::json!({"record_type": "response", "index": 0});
let rec1 = serde_json::json!({"record_type": "response", "index": 1});
let expected_content = format!("{meta}\n{rec0}\n{rec1}\n");
@@ -340,9 +357,7 @@ async fn test_stable_ordering_same_second_pagination() {
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u2}.jsonl"));
let head = |u: Uuid| -> Vec<serde_json::Value> {
vec![serde_json::json!({"timestamp": ts, "id": u.to_string()})]
};
let head = |u: Uuid| -> Vec<serde_json::Value> { vec![expected_session_meta(ts, u)] };
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
let expected_page1 = ConversationsPage {
items: vec![

View File

@@ -4,9 +4,12 @@ use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::WireApi;
use codex_core::built_in_model_providers;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::InputMessageKind;
use codex_core::protocol::Op;
use codex_core::protocol::UserMessageEvent;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::mcp_protocol::AuthMode;
use core_test_support::load_default_config_for_test;
@@ -126,7 +129,14 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
writeln!(
f,
"{}",
json!({"meta":"test","instructions":"be nice", "id": Uuid::new_v4(), "timestamp": "2024-01-01T00:00:00Z"})
json!({
"record_type": "session_meta",
"id": Uuid::new_v4(),
"timestamp": "2024-01-01T00:00:00Z",
"cwd": tmpdir.path().to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
)
.unwrap();
@@ -138,7 +148,30 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed user message".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap();
let mut prior_user_obj = serde_json::to_value(&prior_user)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_user_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_user_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_user_obj)).unwrap();
// Also include a matching user message event to preserve ordering at resume
let prior_user_event = EventMsg::UserMessage(UserMessageEvent {
message: "resumed user message".to_string(),
kind: Some(InputMessageKind::Plain),
});
let prior_user_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-0",
"msg": prior_user_event,
});
writeln!(f, "{prior_user_event_line}").unwrap();
// Prior item: system message (excluded from API history)
let prior_system = codex_protocol::models::ResponseItem::Message {
@@ -148,7 +181,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed system instruction".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap();
let mut prior_system_obj = serde_json::to_value(&prior_system)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_system_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_system_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_system_obj)).unwrap();
// Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message {
@@ -158,7 +201,27 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed assistant message".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
let mut prior_item_obj = serde_json::to_value(&prior_item)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_item_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_item_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00Z"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_item_obj)).unwrap();
let prior_item_event = EventMsg::AgentMessage(AgentMessageEvent {
message: "resumed assistant message".to_string(),
});
let prior_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00Z",
"record_type": "event",
"id": "resume-1",
"msg": prior_item_event,
});
writeln!(f, "{prior_event_line}").unwrap();
drop(f);
// Mock server that will receive the resumed request

View File

@@ -3,10 +3,11 @@ use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_protocol::models::ResponseItem;
use core_test_support::load_default_config_for_test;
use core_test_support::wait_for_event;
use tempfile::TempDir;
@@ -72,17 +73,34 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation.
codex.submit(Op::GetHistory).await.unwrap();
codex.submit(Op::GetConversationPath).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three = match &base_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
entries.clone()
}
// Capture path/id from the base history and compute expected prefixes after each fork.
let (base_conv_id, base_path) = match &base_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event"),
};
// Read entries from rollout file.
async fn read_response_entries(path: &std::path::Path) -> Vec<ResponseItem> {
let text = tokio::fs::read_to_string(path).await.unwrap_or_default();
let mut out = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
if let Ok(item) = serde_json::from_str::<ResponseItem>(line) {
out.push(item);
}
}
out
}
let entries_after_three: Vec<ResponseItem> = read_response_entries(&base_path).await;
// History layout for this test:
// [0] user instructions,
// [1] environment context,
@@ -113,42 +131,46 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
.fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone())
.await
.expect("fork 1");
codex_fork1.submit(Op::GetHistory).await.unwrap();
codex_fork1.submit(Op::GetConversationPath).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let entries_after_first_fork = match &fork1_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
assert!(matches!(
fork1_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
));
entries.clone()
}
let (fork1_id, fork1_path) = match &fork1_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after first fork"),
};
let entries_after_first_fork: Vec<ResponseItem> = read_response_entries(&fork1_path).await;
assert_eq!(entries_after_first_fork, expected_after_first);
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
let NewConversation {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
.fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone())
.await
.expect("fork 2");
codex_fork2.submit(Op::GetHistory).await.unwrap();
codex_fork2.submit(Op::GetConversationPath).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
assert!(matches!(
fork2_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
));
let (_fork2_id, fork2_path) = match &fork2_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after second fork"),
};
let entries_after_second_fork: Vec<ResponseItem> = read_response_entries(&fork2_path).await;
assert_eq!(entries_after_second_fork, expected_after_second);
}

View File

@@ -156,8 +156,17 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str,
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
// Meta line with timestamp
lines.push(json!({"timestamp": meta_rfc3339, "id": uuid}).to_string());
lines.push(
json!({
"record_type": "session_meta",
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": codex_home.to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
.to_string(),
);
// Minimal user message entry as a persisted response item
lines.push(
json!({

View File

@@ -15,7 +15,6 @@ use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::custom_prompts::CustomPrompt;
use crate::mcp_protocol::ConversationId;
use crate::message_history::HistoryEntry;
use crate::models::ResponseItem;
use crate::num_format::format_with_separators;
use crate::parse_command::ParsedCommand;
use crate::plan_tool::UpdatePlanArgs;
@@ -149,7 +148,7 @@ pub enum Op {
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetHistory,
GetConversationPath,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
@@ -425,7 +424,7 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),
/// User/system input message (what was sent to the model)
/// User/system input message (what was sent to the model).
UserMessage(UserMessageEvent),
/// Agent text output delta message
@@ -499,7 +498,7 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationHistory(ConversationHistoryResponseEvent),
ConversationHistory(ConversationPathResponseEvent),
}
// Individual event payload types matching each `EventMsg` variant.
@@ -799,9 +798,9 @@ pub struct WebSearchEndEvent {
/// Response payload for `Op::GetHistory` containing the current session's
/// in-memory transcript.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationHistoryResponseEvent {
pub struct ConversationPathResponseEvent {
pub conversation_id: ConversationId,
pub entries: Vec<ResponseItem>,
pub path: PathBuf,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]

View File

@@ -3,7 +3,7 @@ use crate::backtrack_helpers;
use crate::pager_overlay::Overlay;
use crate::tui;
use crate::tui::TuiEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_protocol::mcp_protocol::ConversationId;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
@@ -98,7 +98,7 @@ impl App {
) {
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetHistory,
codex_core::protocol::Op::GetConversationPath,
));
}
@@ -265,7 +265,7 @@ impl App {
pub(crate) async fn on_conversation_history_for_backtrack(
&mut self,
tui: &mut tui::Tui,
ev: ConversationHistoryResponseEvent,
ev: ConversationPathResponseEvent,
) -> Result<()> {
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
&& ev.conversation_id == *base_id
@@ -281,15 +281,16 @@ impl App {
async fn fork_and_switch_to_new_conversation(
&mut self,
tui: &mut tui::Tui,
ev: ConversationHistoryResponseEvent,
ev: ConversationPathResponseEvent,
drop_count: usize,
prefill: String,
) {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
.perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone())
.await;
// We aren't using the initial history UI replay in session configured because we have more accurate version of the history.
match result {
Ok(new_conv) => {
self.install_forked_conversation(tui, cfg, new_conv, drop_count, &prefill)
@@ -301,12 +302,13 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
&self,
entries: Vec<codex_protocol::models::ResponseItem>,
conversation_path: std::path::PathBuf,
conversation_id: codex_protocol::mcp_protocol::ConversationId,
drop_count: usize,
cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server
.fork_conversation(entries, drop_count, cfg)
.fork_conversation(conversation_path, conversation_id, drop_count, cfg)
.await
}

View File

@@ -1,4 +1,4 @@
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::Event;
use codex_file_search::FileMatch;
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
UpdateSandboxPolicy(SandboxPolicy),
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationHistoryResponseEvent),
ConversationHistory(ConversationPathResponseEvent),
}