This commit is contained in:
Ahmed Ibrahim
2025-09-08 11:35:10 -07:00
parent 5f18406e8d
commit b7c95b57fd
3 changed files with 50 additions and 236 deletions

View File

@@ -11,6 +11,7 @@ use std::time::Duration;
use crate::AuthManager;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::rollout::RolloutItem;
use crate::rollout::recorder::RolloutItemSliceExt;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
@@ -204,9 +205,6 @@ impl Codex {
error!("Failed to create session: {e:#}");
CodexErr::InternalAgentDied
})?;
let _ = session
.apply_initial_history(&turn_context, conversation_history)
.await;
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
@@ -582,39 +580,35 @@ impl Session {
}
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
let mut responses: Vec<ResponseItem> = Vec::new();
// Include everything before we see a session meta marker; after that, include only user messages
// Record transcript (without persisting again)
let responses: Vec<ResponseItem> = items.as_slice().get_response_items();
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, false).await;
}
// Build initial UI messages: include everything before session resume marker,
// and only user messages afterwards
let before_resume_session = items
.get(0)
.map(|it| !matches!(it, RolloutItem::SessionMeta(..)))
.unwrap_or(true);
let mut msgs = Vec::new();
for item in items.clone() {
match item {
RolloutItem::ResponseItem(ref response) => {
responses.push(response.clone());
let new_msgs: Vec<EventMsg> = map_response_item_to_event_messages(
response,
self.show_raw_agent_reasoning,
);
if before_resume_session {
msgs.extend(new_msgs);
} else {
msgs.extend(
new_msgs
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_))),
);
}
}
RolloutItem::Event(event) => msgs.push(event.msg.clone()),
RolloutItem::SessionMeta(..) => {}
for response in items.as_slice().get_response_items() {
let new_msgs: Vec<EventMsg> =
map_response_item_to_event_messages(&response, self.show_raw_agent_reasoning);
if before_resume_session {
msgs.extend(new_msgs);
} else {
msgs.extend(
new_msgs
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_))),
);
}
}
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, false).await;
for event in items.as_slice().get_events() {
msgs.push(event.msg);
}
msgs
}
@@ -1404,7 +1398,11 @@ async fn submission_loop(
}
Op::GetConversationPath => {
let sub_id = sub.id.clone();
// Ensure rollout file is flushed so consumers can read it immediately.
let rec_opt = { sess.rollout.lock_unchecked().as_ref().cloned() };
if let Some(rec) = rec_opt {
let _ = rec.flush().await;
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {

View File

@@ -1,5 +1,9 @@
use crate::AuthManager;
use crate::CodexAuth;
use crate::rollout::RolloutItem;
use crate::rollout::recorder::RolloutItemSliceExt;
use codex_protocol::mcp_protocol::ConversationId;
use tokio::sync::RwLock;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
@@ -33,6 +37,24 @@ pub enum InitialHistory {
Forked(Vec<ResponseItem>),
}
impl InitialHistory {
/// Return all response items contained in this initial history.
pub fn get_response_items(&self) -> Vec<ResponseItem> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Resumed(items) => items.as_slice().get_response_items(),
}
}
/// Return all events contained in this initial history.
pub fn get_events(&self) -> Vec<crate::protocol::Event> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Resumed(items) => items.as_slice().get_events(),
}
}
}
/// Represents a newly created Codex conversation, including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewConversation {
@@ -94,206 +116,4 @@ impl ConversationManager {
codex: Codex,
conversation_id: ConversationId,
) -> CodexResult<NewConversation> {
// The first event must be `SessionInitialized`. Validate and forward it
// to the caller so that they can display it in the conversation
// history.
let event = codex.next_event().await?;
let session_configured = match event {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
} if id == INITIAL_SUBMIT_ID => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
}
};
let conversation = Arc::new(CodexConversation::new(codex));
self.conversations
.write()
.await
.insert(conversation_id, conversation.clone());
Ok(NewConversation {
conversation_id,
conversation,
session_configured,
})
}
pub async fn get_conversation(
&self,
conversation_id: ConversationId,
) -> CodexResult<Arc<CodexConversation>> {
let conversations = self.conversations.read().await;
conversations
.get(&conversation_id)
.cloned()
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
pub async fn resume_conversation_from_rollout(
&self,
config: Config,
rollout_path: PathBuf,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history).await?;
self.finalize_spawn(codex, conversation_id).await
}
pub async fn remove_conversation(&self, conversation_id: ConversationId) {
self.conversations.write().await.remove(&conversation_id);
}
/// Fork an existing conversation by dropping the last `drop_last_messages`
/// user/assistant messages from its transcript and starting a new
/// conversation with identical configuration (unless overridden by the
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
conversation_path: PathBuf,
conversation_id: ConversationId,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let initial_history = RolloutRecorder::get_rollout_history(&conversation_path).await?;
let conversation_history = match initial_history {
InitialHistory::Resumed(items) => items,
InitialHistory::New => return Err(CodexErr::ConversationNotFound(conversation_id)),
};
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, history).await?;
self.finalize_spawn(codex, conversation_id).await
}
}
/// Return a prefix of `items` obtained by dropping the last `n` user messages
/// and all items that follow them.
fn truncate_after_dropping_last_messages(items: Vec<RolloutItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Resumed(items);
}
// Walk backwards counting only `user` Message items, find cut index.
let mut count = 0usize;
let mut cut_index = 0usize;
for (idx, item) in items.iter().enumerate().rev() {
if let RolloutItem::ResponseItem(response_item) = item
&& let ResponseItem::Message { role, .. } = response_item
&& role == "user"
{
count += 1;
if count == n {
// Cut everything from this user message to the end.
cut_index = idx;
break;
}
}
}
if cut_index == 0 {
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
InitialHistory::Resumed(items.into_iter().take(cut_index).collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
}
}
#[test]
fn drops_from_last_user_only() {
let items = vec![
user_msg("u1"),
assistant_msg("a1"),
assistant_msg("a2"),
user_msg("u2"),
assistant_msg("a3"),
ResponseItem::Reasoning {
id: "r1".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "s".to_string(),
}],
content: None,
encrypted_content: None,
},
ResponseItem::FunctionCall {
id: None,
name: "tool".to_string(),
arguments: "{}".to_string(),
call_id: "c1".to_string(),
},
assistant_msg("a4"),
];
let items: Vec<RolloutItem> = items.into_iter().map(RolloutItem::from).collect();
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
if let InitialHistory::Resumed(resumed) = truncated {
let get_text = |ri: &ResponseItem| -> Option<String> {
if let ResponseItem::Message { content, .. } = ri {
for c in content {
if let ContentItem::OutputText { text } = c {
return Some(text.clone());
}
}
}
None
};
let texts: Vec<String> = resumed
.iter()
.filter_map(|it| match it {
RolloutItem::ResponseItem(ri) => get_text(ri),
_ => None,
})
.collect();
assert_eq!(
texts,
vec!["u1".to_string(), "a1".to_string(), "a2".to_string()]
);
} else {
panic!("expected Resumed history");
}
let truncated2 = truncate_after_dropping_last_messages(items, 2);
assert!(matches!(truncated2, InitialHistory::New));
}
}
// The first event must be `

View File

@@ -229,10 +229,6 @@ impl RolloutRecorder {
/// processes the channel serially; when it dequeues `Flush`, all prior
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
/// via `write_line`, which calls `file.flush()` (OSbuffer flush).
///
/// Note: this does NOT perform an fsync (`sync_data`/`sync_all`). If durable
/// persistence is required (powerloss safety), we should add that here or
/// provide a separate method.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
self.tx