Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
6b0427219b Replay rollout history for v2 thread reconstruction 2026-02-05 19:13:44 -08:00
8 changed files with 1011 additions and 62 deletions

View File

@@ -3,8 +3,8 @@ use crate::codex_message_processor::PendingInterrupts;
use crate::codex_message_processor::PendingRollbacks;
use crate::codex_message_processor::TurnSummary;
use crate::codex_message_processor::TurnSummaryStore;
use crate::codex_message_processor::read_event_msgs_from_rollout;
use crate::codex_message_processor::read_summary_from_rollout;
use crate::codex_message_processor::read_turns_from_rollout;
use crate::codex_message_processor::summary_to_thread;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
@@ -69,7 +69,6 @@ use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnPlanStep;
use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_core::CodexThread;
use codex_core::parse_command::shlex_join;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
@@ -1077,9 +1076,9 @@ pub(crate) async fn apply_bespoke_event_handling(
{
Ok(summary) => {
let mut thread = summary_to_thread(summary);
match read_event_msgs_from_rollout(rollout_path.as_path()).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
match read_turns_from_rollout(rollout_path.as_path()).await {
Ok(turns) => {
thread.turns = turns;
ThreadRollbackResponse { thread }
}
Err(err) => {

View File

@@ -135,7 +135,6 @@ use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_backend_client::Client as BackendClient;
use codex_chatgpt::connectors;
use codex_core::AuthManager;
@@ -178,6 +177,7 @@ use codex_core::protocol::ReviewTarget as CoreReviewTarget;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::replay_rollout_response_items;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::skills::remote::download_remote_skill;
@@ -204,6 +204,7 @@ use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_utils_json_to_toml::json_to_toml;
@@ -2259,9 +2260,9 @@ impl CodexMessageProcessor {
};
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_event_msgs_from_rollout(rollout_path).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
match read_turns_from_rollout(rollout_path).await {
Ok(turns) => {
thread.turns = turns;
}
Err(err) => {
self.send_internal_error(
@@ -2442,11 +2443,7 @@ impl CodexMessageProcessor {
session_configured,
..
}) => {
let SessionConfiguredEvent {
rollout_path,
initial_messages,
..
} = session_configured;
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
@@ -2486,9 +2483,20 @@ impl CodexMessageProcessor {
return;
}
};
thread.turns = initial_messages
.as_deref()
.map_or_else(Vec::new, build_turns_from_event_msgs);
thread.turns = match read_turns_from_rollout(rollout_path.as_path()).await {
Ok(turns) => turns,
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
let response = ThreadResumeResponse {
thread,
@@ -2654,11 +2662,7 @@ impl CodexMessageProcessor {
}
};
let SessionConfiguredEvent {
rollout_path,
initial_messages,
..
} = session_configured;
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
@@ -2698,9 +2702,20 @@ impl CodexMessageProcessor {
return;
}
};
thread.turns = initial_messages
.as_deref()
.map_or_else(Vec::new, build_turns_from_event_msgs);
thread.turns = match read_turns_from_rollout(rollout_path.as_path()).await {
Ok(turns) => turns,
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
let response = ThreadForkResponse {
thread: thread.clone(),
@@ -5043,22 +5058,227 @@ pub(crate) async fn read_summary_from_rollout(
})
}
pub(crate) async fn read_event_msgs_from_rollout(
path: &Path,
) -> std::io::Result<Vec<codex_protocol::protocol::EventMsg>> {
fn build_turns_from_rollout_items(rollout_items: &[RolloutItem]) -> Vec<Turn> {
let response_items = replay_rollout_response_items(rollout_items);
let replayed_user_messages = replay_rollout_user_messages(rollout_items);
build_turns_from_response_items(&response_items, &replayed_user_messages)
}
fn build_turns_from_response_items(
response_items: &[ResponseItem],
replayed_user_messages: &[Option<UserMessageEvent>],
) -> Vec<Turn> {
let mut turns = Vec::new();
let mut current_turn: Option<Turn> = None;
let mut next_turn_index: i64 = 1;
let mut next_item_index: i64 = 1;
let mut next_user_message_index: usize = 0;
for response_item in response_items {
let Some(mut turn_item) = codex_core::parse_turn_item(response_item) else {
continue;
};
let is_user_message = matches!(&turn_item, TurnItem::UserMessage(_));
if is_user_message {
if let Some(user_message_event) = replayed_user_messages
.get(next_user_message_index)
.and_then(Option::as_ref)
{
overwrite_user_message_content(&mut turn_item, user_message_event);
}
next_user_message_index += 1;
finish_current_turn(&mut turns, &mut current_turn);
current_turn = Some(Turn {
id: next_turn_id(&mut next_turn_index),
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
});
} else if current_turn.is_none() {
current_turn = Some(Turn {
id: next_turn_id(&mut next_turn_index),
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
});
}
overwrite_turn_item_id(&mut turn_item, next_item_id(&mut next_item_index));
if let Some(turn) = current_turn.as_mut() {
turn.items.push(ThreadItem::from(turn_item));
}
}
finish_current_turn(&mut turns, &mut current_turn);
turns
}
fn overwrite_turn_item_id(turn_item: &mut TurnItem, id: String) {
let item_id = match turn_item {
TurnItem::UserMessage(item) => &mut item.id,
TurnItem::AgentMessage(item) => &mut item.id,
TurnItem::Plan(item) => &mut item.id,
TurnItem::Reasoning(item) => &mut item.id,
TurnItem::WebSearch(item) => &mut item.id,
TurnItem::ContextCompaction(item) => &mut item.id,
};
*item_id = id;
}
fn overwrite_user_message_content(turn_item: &mut TurnItem, user_message_event: &UserMessageEvent) {
if let TurnItem::UserMessage(user_message) = turn_item {
user_message.content = user_message_content_from_event(user_message_event);
}
}
fn user_message_content_from_event(user_message_event: &UserMessageEvent) -> Vec<CoreInputItem> {
let mut content = Vec::new();
if !user_message_event.message.trim().is_empty() {
content.push(CoreInputItem::Text {
text: user_message_event.message.clone(),
text_elements: user_message_event.text_elements.clone(),
});
}
if let Some(images) = &user_message_event.images {
for image in images {
content.push(CoreInputItem::Image {
image_url: image.clone(),
});
}
}
for path in &user_message_event.local_images {
content.push(CoreInputItem::LocalImage { path: path.clone() });
}
content
}
fn replay_rollout_user_messages(rollout_items: &[RolloutItem]) -> Vec<Option<UserMessageEvent>> {
let mut replayed_entries = Vec::new();
let mut replayed_prefix = Vec::new();
for rollout_item in rollout_items {
replayed_prefix.push(rollout_item.clone());
match rollout_item {
RolloutItem::ResponseItem(response_item) => {
replayed_entries.push(ReplayEntry {
response_item: response_item.clone(),
user_message_event: None,
});
}
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event)) => {
if let Some(entry) = replayed_entries.iter_mut().rev().find(|entry| {
is_user_response_item(&entry.response_item)
&& entry.user_message_event.is_none()
}) {
entry.user_message_event = Some(user_message_event.clone());
}
}
RolloutItem::Compacted(compacted) => {
let replayed_response_items = replay_rollout_response_items(&replayed_prefix);
replayed_entries = if compacted.replacement_history.is_some() {
replayed_response_items
.into_iter()
.map(|response_item| ReplayEntry {
response_item,
user_message_event: None,
})
.collect()
} else {
carry_user_message_events(&replayed_entries, &replayed_response_items)
};
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(_)) => {
let replayed_response_items = replay_rollout_response_items(&replayed_prefix);
replayed_entries =
carry_user_message_events(&replayed_entries, &replayed_response_items);
}
_ => {}
}
}
replayed_entries
.into_iter()
.filter_map(|entry| {
if is_user_response_item(&entry.response_item) {
Some(entry.user_message_event)
} else {
None
}
})
.collect()
}
fn carry_user_message_events(
replayed_entries: &[ReplayEntry],
replayed_response_items: &[ResponseItem],
) -> Vec<ReplayEntry> {
let mut replayed_entry_cursor = 0usize;
replayed_response_items
.iter()
.map(|response_item| {
let user_message_event = replayed_entries
.iter()
.enumerate()
.skip(replayed_entry_cursor)
.find(|(_, entry)| entry.response_item == *response_item)
.and_then(|(idx, entry)| {
replayed_entry_cursor = idx + 1;
entry.user_message_event.clone()
});
ReplayEntry {
response_item: response_item.clone(),
user_message_event: if is_user_response_item(response_item) {
user_message_event
} else {
None
},
}
})
.collect()
}
fn is_user_response_item(response_item: &ResponseItem) -> bool {
matches!(
response_item,
ResponseItem::Message { role, .. } if role == "user"
)
}
#[derive(Clone)]
struct ReplayEntry {
response_item: ResponseItem,
user_message_event: Option<UserMessageEvent>,
}
fn finish_current_turn(turns: &mut Vec<Turn>, current_turn: &mut Option<Turn>) {
if let Some(turn) = current_turn.take()
&& !turn.items.is_empty()
{
turns.push(turn);
}
}
fn next_turn_id(next_turn_index: &mut i64) -> String {
let id = format!("turn-{next_turn_index}");
*next_turn_index += 1;
id
}
fn next_item_id(next_item_index: &mut i64) -> String {
let id = format!("item-{next_item_index}");
*next_item_index += 1;
id
}
pub(crate) async fn read_turns_from_rollout(path: &Path) -> std::io::Result<Vec<Turn>> {
let items = match RolloutRecorder::get_rollout_history(path).await? {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(items) => items,
InitialHistory::Resumed(resumed) => resumed.history,
};
Ok(items
.into_iter()
.filter_map(|item| match item {
RolloutItem::EventMsg(event) => Some(event),
_ => None,
})
.collect())
Ok(build_turns_from_rollout_items(&items))
}
fn extract_conversation_summary(
@@ -5195,11 +5415,230 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
mod tests {
use super::*;
use anyhow::Result;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn user_message_event(message: &str, placeholder: &str) -> UserMessageEvent {
UserMessageEvent {
message: message.to_string(),
images: None,
local_images: Vec::new(),
text_elements: vec![TextElement::new(
ByteRange {
start: 0,
end: message.len(),
},
Some(placeholder.to_string()),
)],
}
}
#[test]
fn build_turns_from_rollout_items_applies_compaction_replacement_history() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_msg("old user")),
RolloutItem::ResponseItem(assistant_msg("old assistant")),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("summary user")]),
}),
RolloutItem::ResponseItem(user_msg("latest user")),
RolloutItem::ResponseItem(assistant_msg("latest assistant")),
];
let turns = build_turns_from_rollout_items(&rollout_items);
let expected = vec![
Turn {
id: "turn-1".to_string(),
items: vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![V2UserInput::Text {
text: "summary user".to_string(),
text_elements: Vec::new(),
}],
}],
error: None,
status: TurnStatus::Completed,
},
Turn {
id: "turn-2".to_string(),
items: vec![
ThreadItem::UserMessage {
id: "item-2".to_string(),
content: vec![V2UserInput::Text {
text: "latest user".to_string(),
text_elements: Vec::new(),
}],
},
ThreadItem::AgentMessage {
id: "item-3".to_string(),
text: "latest assistant".to_string(),
},
],
error: None,
status: TurnStatus::Completed,
},
];
assert_eq!(turns, expected);
}
#[test]
fn build_turns_from_rollout_items_applies_compaction_and_rollback() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_msg("old user")),
RolloutItem::ResponseItem(assistant_msg("old assistant")),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("summary user")]),
}),
RolloutItem::ResponseItem(user_msg("latest user")),
RolloutItem::ResponseItem(assistant_msg("latest assistant")),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
];
let turns = build_turns_from_rollout_items(&rollout_items);
let expected = vec![Turn {
id: "turn-1".to_string(),
items: vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![V2UserInput::Text {
text: "summary user".to_string(),
text_elements: Vec::new(),
}],
}],
error: None,
status: TurnStatus::Completed,
}];
assert_eq!(turns, expected);
}
#[test]
fn build_turns_from_rollout_items_preserves_text_elements_from_user_message_events() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_msg("hello")),
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event("hello", "<note>"))),
];
let turns = build_turns_from_rollout_items(&rollout_items);
let expected = vec![Turn {
id: "turn-1".to_string(),
items: vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![V2UserInput::Text {
text: "hello".to_string(),
text_elements: vec![
TextElement::new(
ByteRange { start: 0, end: 5 },
Some("<note>".to_string()),
)
.into(),
],
}],
}],
error: None,
status: TurnStatus::Completed,
}];
assert_eq!(turns, expected);
}
#[test]
fn build_turns_from_rollout_items_does_not_reuse_pre_compaction_user_message_events() {
let rollout_items = vec![
RolloutItem::ResponseItem(user_msg("old user")),
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event(
"old user", "<old>",
))),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("summary user")]),
}),
RolloutItem::ResponseItem(user_msg("latest user")),
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event(
"latest user",
"<latest>",
))),
];
let turns = build_turns_from_rollout_items(&rollout_items);
let expected = vec![
Turn {
id: "turn-1".to_string(),
items: vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![V2UserInput::Text {
text: "summary user".to_string(),
text_elements: Vec::new(),
}],
}],
error: None,
status: TurnStatus::Completed,
},
Turn {
id: "turn-2".to_string(),
items: vec![ThreadItem::UserMessage {
id: "item-2".to_string(),
content: vec![V2UserInput::Text {
text: "latest user".to_string(),
text_elements: vec![
TextElement::new(
ByteRange {
start: 0,
end: "latest user".len(),
},
Some("<latest>".to_string()),
)
.into(),
],
}],
}],
error: None,
status: TurnStatus::Completed,
},
];
assert_eq!(turns, expected);
}
#[test]
fn validate_dynamic_tools_rejects_unsupported_input_schema() {
let tools = vec![ApiDynamicToolSpec {

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -11,6 +12,15 @@ use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use pretty_assertions::assert_eq;
@@ -134,6 +144,156 @@ async fn thread_read_can_include_turns() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_read_include_turns_replays_compaction_replacement_history() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_rollout_with_compaction_replacement(
codex_home.path(),
"2025-01-06T12-00-00",
"2025-01-06T12:00:00Z",
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: conversation_id,
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 2);
assert_eq!(
thread.turns[0].items,
vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![UserInput::Text {
text: "compacted summary".to_string(),
text_elements: Vec::new(),
}],
}]
);
assert_eq!(
thread.turns[1].items,
vec![
ThreadItem::UserMessage {
id: "item-2".to_string(),
content: vec![UserInput::Text {
text: "latest user".to_string(),
text_elements: Vec::new(),
}],
},
ThreadItem::AgentMessage {
id: "item-3".to_string(),
text: "latest assistant".to_string(),
},
]
);
Ok(())
}
fn create_rollout_with_compaction_replacement(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
) -> Result<String> {
let uuid = uuid::Uuid::new_v4();
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let path = rollout_path(codex_home, filename_ts, &uuid.to_string());
let parent = path
.parent()
.ok_or_else(|| anyhow::anyhow!("rollout path missing parent: {}", path.display()))?;
std::fs::create_dir_all(parent)?;
let session_meta = SessionMeta {
id: thread_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: CoreSessionSource::Cli,
model_provider: Some("mock_provider".to_string()),
base_instructions: None,
dynamic_tools: None,
};
let lines = [
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta,
git: None,
}),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(user_msg("old user")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(assistant_msg("old assistant")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("compacted summary")]),
}),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(user_msg("latest user")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(assistant_msg("latest assistant")),
},
];
let content = lines
.iter()
.map(serde_json::to_string)
.collect::<Result<Vec<_>, _>>()?
.join("\n");
std::fs::write(path, format!("{content}\n"))?;
Ok(uuid.to_string())
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -14,7 +15,19 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::protocol::ThreadRolledBackEvent;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -157,6 +170,146 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
Ok(())
}
#[tokio::test]
async fn thread_resume_replays_compaction_and_rollback_markers() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let thread_id = create_rollout_with_compaction_and_rollback(
codex_home.path(),
"2025-01-07T12-00-00",
"2025-01-07T12:00:00Z",
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.turns.len(), 1);
assert_eq!(
thread.turns[0].items,
vec![ThreadItem::UserMessage {
id: "item-1".to_string(),
content: vec![V2UserInput::Text {
text: "compacted summary".to_string(),
text_elements: Vec::new(),
}],
}]
);
Ok(())
}
fn create_rollout_with_compaction_and_rollback(
codex_home: &std::path::Path,
filename_ts: &str,
meta_rfc3339: &str,
) -> Result<String> {
let uuid = uuid::Uuid::new_v4();
let thread_id = ThreadId::from_string(&uuid.to_string())?;
let path = rollout_path(codex_home, filename_ts, &uuid.to_string());
let parent = path
.parent()
.ok_or_else(|| anyhow::anyhow!("rollout path missing parent: {}", path.display()))?;
std::fs::create_dir_all(parent)?;
let session_meta = SessionMeta {
id: thread_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: CoreSessionSource::Cli,
model_provider: Some("mock_provider".to_string()),
base_instructions: None,
dynamic_tools: None,
};
let lines = [
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta,
git: None,
}),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(user_msg("old user")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(assistant_msg("old assistant")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("compacted summary")]),
}),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(user_msg("latest user")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::ResponseItem(assistant_msg("latest assistant")),
},
RolloutLine {
timestamp: meta_rfc3339.to_string(),
item: RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
},
];
let content = lines
.iter()
.map(serde_json::to_string)
.collect::<Result<Vec<_>, _>>()?
.join("\n");
std::fs::write(path, format!("{content}\n"))?;
Ok(uuid.to_string())
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(

View File

@@ -167,6 +167,7 @@ use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::rollout::map_session_init_error;
use crate::rollout::metadata;
use crate::rollout::replay_rollout_response_items_with_initial_context;
use crate::shell;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillError;
@@ -1757,34 +1758,12 @@ impl Session {
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> Vec<ResponseItem> {
let initial_context = self.build_initial_context(turn_context).await;
let replayed = replay_rollout_response_items_with_initial_context(rollout_items, |_| {
initial_context.clone()
});
let mut history = ContextManager::new();
for item in rollout_items {
match item {
RolloutItem::ResponseItem(response_item) => {
history.record_items(
std::iter::once(response_item),
turn_context.truncation_policy,
);
}
RolloutItem::Compacted(compacted) => {
if let Some(replacement) = &compacted.replacement_history {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(history.raw_items());
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context).await,
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
history.drop_last_n_user_turns(rollback.num_turns);
}
_ => {}
}
}
history.record_items(replayed.iter(), turn_context.truncation_policy);
history.raw_items().to_vec()
}

View File

@@ -118,6 +118,8 @@ pub use rollout::list::ThreadsPage;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::replay_rollout_response_items;
pub use rollout::replay_rollout_response_items_with_initial_context;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
pub use transport_manager::TransportManager;

View File

@@ -12,6 +12,7 @@ pub mod list;
pub(crate) mod metadata;
pub(crate) mod policy;
pub mod recorder;
pub mod replay;
pub(crate) mod session_index;
pub(crate) mod truncation;
@@ -24,6 +25,8 @@ pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::rollout_date_parts;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use replay::replay_rollout_response_items;
pub use replay::replay_rollout_response_items_with_initial_context;
pub use session_index::find_thread_path_by_name_str;
#[cfg(test)]

View File

@@ -0,0 +1,214 @@
use crate::compact;
use crate::compact::collect_user_messages;
use crate::context_manager::is_user_turn_boundary;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
/// Replays rollout items into effective response history.
///
/// This applies compaction and rollback markers so callers can reconstruct
/// the post-replay thread state instead of the raw persisted stream.
pub fn replay_rollout_response_items(rollout_items: &[RolloutItem]) -> Vec<ResponseItem> {
replay_rollout_response_items_with_initial_context(rollout_items, infer_initial_context)
}
/// Replays rollout items into effective response history, with a custom initial-context provider
/// used when rebuilding compaction entries that do not have replacement history.
pub fn replay_rollout_response_items_with_initial_context<F>(
rollout_items: &[RolloutItem],
mut initial_context: F,
) -> Vec<ResponseItem>
where
F: FnMut(&[ResponseItem]) -> Vec<ResponseItem>,
{
let mut history = Vec::new();
for item in rollout_items {
match item {
RolloutItem::ResponseItem(response_item) => {
history.push(response_item.clone());
}
RolloutItem::Compacted(compacted) => {
if let Some(replacement) = &compacted.replacement_history {
history = replacement.clone();
} else {
let initial_context = initial_context(&history);
let user_messages = collect_user_messages(&history);
history = compact::build_compacted_history(
initial_context,
&user_messages,
&compacted.message,
);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
drop_last_n_user_turns(&mut history, rollback.num_turns);
}
_ => {}
}
}
history
}
fn infer_initial_context(history: &[ResponseItem]) -> Vec<ResponseItem> {
match history.iter().position(is_user_turn_boundary) {
Some(first_user_idx) => history[..first_user_idx].to_vec(),
None => history.to_vec(),
}
}
fn drop_last_n_user_turns(history: &mut Vec<ResponseItem>, num_turns: u32) {
if num_turns == 0 {
return;
}
let user_positions = user_message_positions(history);
let Some(&first_user_idx) = user_positions.first() else {
return;
};
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
let cut_idx = if n_from_end >= user_positions.len() {
first_user_idx
} else {
user_positions[user_positions.len() - n_from_end]
};
history.truncate(cut_idx);
}
fn user_message_positions(items: &[ResponseItem]) -> Vec<usize> {
let mut positions = Vec::new();
for (idx, item) in items.iter().enumerate() {
if is_user_turn_boundary(item) {
positions.push(idx);
}
}
positions
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::models::ContentItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ThreadRolledBackEvent;
use pretty_assertions::assert_eq;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn system_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "system".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
#[test]
fn compaction_replay_uses_replacement_history_when_present() {
let replacement = vec![user_msg("replacement summary")];
let rollout = vec![
RolloutItem::ResponseItem(user_msg("before")),
RolloutItem::ResponseItem(assistant_msg("assistant before")),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(replacement.clone()),
}),
RolloutItem::ResponseItem(assistant_msg("after")),
];
let replayed = replay_rollout_response_items(&rollout);
let expected = vec![replacement[0].clone(), assistant_msg("after")];
assert_eq!(replayed, expected);
}
#[test]
fn compaction_replay_rebuilds_when_replacement_history_absent() {
let rollout = vec![
RolloutItem::ResponseItem(system_msg("prefix")),
RolloutItem::ResponseItem(user_msg("first user")),
RolloutItem::ResponseItem(assistant_msg("assistant reply")),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: None,
}),
];
let replayed = replay_rollout_response_items(&rollout);
let expected = vec![
system_msg("prefix"),
user_msg("first user"),
user_msg("summary"),
];
assert_eq!(replayed, expected);
}
#[test]
fn rollback_marker_applies_after_compaction_replay() {
let rollout = vec![
RolloutItem::ResponseItem(user_msg("before")),
RolloutItem::ResponseItem(assistant_msg("assistant before")),
RolloutItem::Compacted(CompactedItem {
message: "summary".to_string(),
replacement_history: Some(vec![user_msg("summary")]),
}),
RolloutItem::ResponseItem(user_msg("latest")),
RolloutItem::ResponseItem(assistant_msg("assistant latest")),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
];
let replayed = replay_rollout_response_items(&rollout);
assert_eq!(replayed, vec![user_msg("summary")]);
}
#[test]
fn multiple_rollback_markers_apply_in_sequence() {
let rollout = vec![
RolloutItem::ResponseItem(system_msg("prefix")),
RolloutItem::ResponseItem(user_msg("u1")),
RolloutItem::ResponseItem(assistant_msg("a1")),
RolloutItem::ResponseItem(user_msg("u2")),
RolloutItem::ResponseItem(assistant_msg("a2")),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
];
let replayed = replay_rollout_response_items(&rollout);
assert_eq!(replayed, vec![system_msg("prefix")]);
}
}