This commit is contained in:
Ahmed Ibrahim
2025-09-05 12:50:06 -07:00
parent f7671481c1
commit e7a20f5109
9 changed files with 115 additions and 82 deletions

Submodule codex-rs/.codex/pro/1757202005/agent-a added at 6e743e8496

Submodule codex-rs/.codex/pro/1757202005/agent-b added at 6e743e8496

View File

@@ -17,7 +17,7 @@ use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
@@ -571,7 +571,9 @@ impl Session {
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
)));
self.record_conversation_items(&conversation_items).await;
for item in conversation_items {
self.record_conversation_items(item).await;
}
vec![]
}
@@ -590,6 +592,7 @@ impl Session {
let mut msgs = Vec::new();
for item in items.clone() {
match item {
<<<<<<< HEAD
RolloutItem::ResponseItem(ref v) => {
responses.extend(v.clone());
let new_msgs: Vec<EventMsg> = v
@@ -598,6 +601,13 @@ impl Session {
map_response_item_to_event_messages(ri, self.show_raw_agent_reasoning)
})
.collect();
=======
RolloutItem::ResponseItem(response) => {
let new_msgs: Vec<EventMsg> = map_response_item_to_event_messages(
&response,
self.show_raw_agent_reasoning,
);
>>>>>>> 4245659e (progress)
if before_resume_session {
msgs.extend(new_msgs);
} else {
@@ -608,8 +618,15 @@ impl Session {
);
}
}
<<<<<<< HEAD
RolloutItem::Event(events) => msgs.extend(events.iter().map(|e| e.msg.clone())),
RolloutItem::SessionMeta(..) => {}
=======
RolloutItem::Event(event) => msgs.push(event.msg.clone()),
RolloutItem::SessionMeta(..) => {
// Session meta does not emit events
}
>>>>>>> 4245659e (progress)
}
}
@@ -632,7 +649,7 @@ impl Session {
};
if let Some(rec) = recorder
&& let Err(e) = rec
.record_items(crate::rollout::RolloutItem::Event(vec![event_to_record]))
.record_items(crate::rollout::RolloutItem::Event(event_to_record))
.await
{
error!("failed to record rollout event: {e:#}");
@@ -725,6 +742,7 @@ impl Session {
/// Records items to both the rollout and the chat completions/ZDR
/// transcript, if enabled.
<<<<<<< HEAD
async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_conversation_items_internal(items, true).await;
}
@@ -734,6 +752,20 @@ impl Session {
if persist {
self.record_state_snapshot(RolloutItem::ResponseItem(items.to_vec()))
.await;
=======
async fn record_conversation_items<T>(&self, item: T)
where
T: Into<RolloutItem>,
{
let item: RolloutItem = item.into();
debug!("Recording items for conversation: {item:?}");
self.record_state_snapshot(item.clone()).await;
if let RolloutItem::ResponseItem(response_item) = &item {
self.state
.lock_unchecked()
.history
.record_items(std::slice::from_ref(response_item));
>>>>>>> 4245659e (progress)
}
self.state.lock_unchecked().history.record_items(items);
@@ -1187,13 +1219,13 @@ async fn submission_loop(
// Install the new persistent context for subsequent tasks/turns.
turn_context = Arc::new(new_turn_context);
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
sess.record_conversation_items(ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))])
)))
.await;
}
}
@@ -1402,12 +1434,12 @@ async fn submission_loop(
sess.send_event(event).await;
break;
}
Op::GetHistory => {
Op::GetConversationPath => {
let sub_id = sub.id.clone();
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
entries: sess.state.lock_unchecked().history.contents(),
}),
@@ -1453,7 +1485,7 @@ async fn run_task(
sess.send_event(event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
sess.record_conversation_items(&[ResponseItem::from(initial_input_for_turn.clone())])
sess.record_conversation_items(ResponseItem::from(initial_input_for_turn.clone()))
.await;
let mut last_agent_message: Option<String> = None;
@@ -1470,7 +1502,9 @@ async fn run_task(
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
sess.record_conversation_items(&pending_input).await;
for item in pending_input.iter().cloned() {
sess.record_conversation_items(item).await;
}
// Construct the input that we will send to the model. When using the
// Chat completions API (or ZDR clients), the model needs the full
@@ -1597,8 +1631,9 @@ async fn run_task(
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
for item in items_to_record_in_conversation_history.iter().cloned() {
sess.record_conversation_items(item).await;
}
}
if responses.is_empty() {

View File

@@ -156,11 +156,17 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
conversation_history: Vec<ResponseItem>,
conversation_path: PathBuf,
conversation_id: Uuid,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let initial_history = RolloutRecorder::get_rollout_history(&conversation_path).await?;
let conversation_history = match initial_history {
InitialHistory::Resumed(items) => items,
InitialHistory::New => return Err(CodexErr::ConversationNotFound(conversation_id)),
};
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
@@ -177,21 +183,17 @@ impl ConversationManager {
/// Return a prefix of `items` obtained by dropping the last `n` user messages
/// and all items that follow them.
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
fn truncate_after_dropping_last_messages(items: Vec<RolloutItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Resumed(
items
.into_iter()
.map(|ri| RolloutItem::ResponseItem(vec![ri]))
.collect(),
);
return InitialHistory::Resumed(items);
}
// Walk backwards counting only `user` Message items, find cut index.
let mut count = 0usize;
let mut cut_index = 0usize;
for (idx, item) in items.iter().enumerate().rev() {
if let ResponseItem::Message { role, .. } = item
if let RolloutItem::ResponseItem(response_item) = item
&& let ResponseItem::Message { role, .. } = response_item
&& role == "user"
{
count += 1;
@@ -206,13 +208,7 @@ fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) ->
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
InitialHistory::Resumed(
items
.into_iter()
.take(cut_index)
.map(|ri| RolloutItem::ResponseItem(vec![ri]))
.collect(),
)
InitialHistory::Resumed(items.into_iter().take(cut_index).collect())
}
}

View File

@@ -77,6 +77,7 @@ pub struct SavedSession {
#[derive(Clone)]
pub struct RolloutRecorder {
tx: Sender<RolloutCmd>,
path: PathBuf,
}
#[derive(Clone)]
@@ -99,23 +100,26 @@ struct SessionMetaLine<'a> {
#[derive(Debug, Clone)]
pub enum RolloutItem {
ResponseItem(Vec<ResponseItem>),
Event(Vec<Event>),
ResponseItem(ResponseItem),
Event(Event),
SessionMeta(SessionMetaWithGit),
}
impl<T> From<T> for RolloutItem
where
T: AsRef<[ResponseItem]>,
{
fn from(items: T) -> Self {
RolloutItem::ResponseItem(items.as_ref().to_vec())
impl From<ResponseItem> for RolloutItem {
fn from(item: ResponseItem) -> Self {
RolloutItem::ResponseItem(item)
}
}
impl From<Event> for RolloutItem {
fn from(event: Event) -> Self {
RolloutItem::Event(event)
}
}
enum RolloutCmd {
AddResponseItems(Vec<ResponseItem>),
AddEvent(Vec<Event>),
AddEvents(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Shutdown { ack: oneshot::Sender<()> },
}
@@ -201,48 +205,36 @@ impl RolloutRecorder {
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
Ok(Self { tx })
Ok(Self { tx, path })
}
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
match item {
RolloutItem::ResponseItem(items) => self.record_response_items(&items).await,
RolloutItem::Event(events) => self.record_event(&events).await,
RolloutItem::ResponseItem(item) => self.record_response_item(&item).await,
RolloutItem::Event(event) => self.record_event(&event).await,
RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
}
}
async fn record_response_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if is_persisted_response_item(item) {
filtered.push(item.clone());
}
}
if filtered.is_empty() {
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
// reformatting them in that case.
if !is_persisted_response_item(item) {
return Ok(());
}
self.tx
.send(RolloutCmd::AddResponseItems(filtered))
.send(RolloutCmd::AddResponseItems(vec![item.clone()]))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
async fn record_event(&self, events: &[Event]) -> std::io::Result<()> {
let mut filtered = Vec::new();
for event in events {
if is_persisted_event(event) {
filtered.push(event.clone());
}
}
if filtered.is_empty() {
async fn record_event(&self, event: &Event) -> std::io::Result<()> {
if !is_persisted_event(event) {
return Ok(());
}
self.tx
.send(RolloutCmd::AddEvent(filtered))
.send(RolloutCmd::AddEvents(vec![event.clone()]))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
}
@@ -294,7 +286,7 @@ impl RolloutRecorder {
obj.remove("record_type");
}
match serde_json::from_value::<Event>(ev_val) {
Ok(ev) => items.push(RolloutItem::Event(vec![ev])),
Ok(ev) => items.push(RolloutItem::Event(ev)),
Err(e) => warn!("failed to parse event: {v:?}, error: {e}"),
}
}
@@ -312,7 +304,7 @@ impl RolloutRecorder {
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(vec![item]));
items.push(RolloutItem::ResponseItem(item));
}
}
Err(e) => {
@@ -346,6 +338,10 @@ impl RolloutRecorder {
}))
}
pub fn path(&self) -> &Path {
&self.path
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
@@ -371,6 +367,9 @@ struct LogFileInfo {
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
/// Full filesystem path to the rollout file.
path: PathBuf,
}
fn create_log_file(
@@ -407,6 +406,7 @@ fn create_log_file(
file,
conversation_id,
timestamp,
path,
})
}
@@ -444,7 +444,7 @@ async fn rollout_writer(
}
}
}
RolloutCmd::AddEvent(events) => {
RolloutCmd::AddEvents(events) => {
for event in events {
#[derive(Serialize)]
struct EventLine<'a> {

View File

@@ -3,7 +3,7 @@ use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
@@ -72,13 +72,13 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation.
codex.submit(Op::GetHistory).await.unwrap();
codex.submit(Op::GetConversationPath).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three = match &base_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => {
entries.clone()
}
_ => panic!("expected ConversationHistory event"),
@@ -117,16 +117,16 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 1");
codex_fork1.submit(Op::GetHistory).await.unwrap();
codex_fork1.submit(Op::GetConversationPath).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let entries_after_first_fork = match &fork1_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => {
assert!(matches!(
fork1_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
EventMsg::ConversationHistory(ConversationPathResponseEvent { ref entries, .. }) if *entries == expected_after_first
));
entries.clone()
}
@@ -142,13 +142,13 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 2");
codex_fork2.submit(Op::GetHistory).await.unwrap();
codex_fork2.submit(Op::GetConversationPath).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
assert!(matches!(
fork2_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
EventMsg::ConversationHistory(ConversationPathResponseEvent { ref entries, .. }) if *entries == expected_after_second
));
}

View File

@@ -149,7 +149,7 @@ pub enum Op {
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetHistory,
GetConversationPath,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
@@ -499,7 +499,7 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationHistory(ConversationHistoryResponseEvent),
ConversationHistory(ConversationPathResponseEvent),
}
// Individual event payload types matching each `EventMsg` variant.
@@ -799,7 +799,7 @@ pub struct WebSearchEndEvent {
/// Response payload for `Op::GetHistory` containing the current session's
/// in-memory transcript.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationHistoryResponseEvent {
pub struct ConversationPathResponseEvent {
pub conversation_id: ConversationId,
pub entries: Vec<ResponseItem>,
}

View File

@@ -3,7 +3,7 @@ use crate::backtrack_helpers;
use crate::pager_overlay::Overlay;
use crate::tui;
use crate::tui::TuiEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_protocol::mcp_protocol::ConversationId;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
@@ -98,7 +98,7 @@ impl App {
) {
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetHistory,
codex_core::protocol::Op::GetConversationPath,
));
}
@@ -265,7 +265,7 @@ impl App {
pub(crate) async fn on_conversation_history_for_backtrack(
&mut self,
tui: &mut tui::Tui,
ev: ConversationHistoryResponseEvent,
ev: ConversationPathResponseEvent,
) -> Result<()> {
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
&& ev.conversation_id == *base_id
@@ -281,7 +281,7 @@ impl App {
async fn fork_and_switch_to_new_conversation(
&mut self,
tui: &mut tui::Tui,
ev: ConversationHistoryResponseEvent,
ev: ConversationPathResponseEvent,
drop_count: usize,
prefill: String,
) {
@@ -301,12 +301,12 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
&self,
entries: Vec<codex_protocol::models::ResponseItem>,
conversation_history: Vec<codex_core::protocol::ResponseItem>,
drop_count: usize,
cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server
.fork_conversation(entries, drop_count, cfg)
.fork_conversation(conversation_history, drop_count, cfg)
.await
}

View File

@@ -1,4 +1,4 @@
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::Event;
use codex_file_search::FileMatch;
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
UpdateSandboxPolicy(SandboxPolicy),
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationHistoryResponseEvent),
ConversationHistory(ConversationPathResponseEvent),
}