mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Compare commits
1 Commits
ccunningha
...
codex/repl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b0427219b |
@@ -3,8 +3,8 @@ use crate::codex_message_processor::PendingInterrupts;
|
||||
use crate::codex_message_processor::PendingRollbacks;
|
||||
use crate::codex_message_processor::TurnSummary;
|
||||
use crate::codex_message_processor::TurnSummaryStore;
|
||||
use crate::codex_message_processor::read_event_msgs_from_rollout;
|
||||
use crate::codex_message_processor::read_summary_from_rollout;
|
||||
use crate::codex_message_processor::read_turns_from_rollout;
|
||||
use crate::codex_message_processor::summary_to_thread;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
@@ -69,7 +69,6 @@ use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnPlanStep;
|
||||
use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::build_turns_from_event_msgs;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::parse_command::shlex_join;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
@@ -1077,9 +1076,9 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
{
|
||||
Ok(summary) => {
|
||||
let mut thread = summary_to_thread(summary);
|
||||
match read_event_msgs_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(events) => {
|
||||
thread.turns = build_turns_from_event_msgs(&events);
|
||||
match read_turns_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(turns) => {
|
||||
thread.turns = turns;
|
||||
ThreadRollbackResponse { thread }
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -135,7 +135,6 @@ use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInfoResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_app_server_protocol::UserSavedConfig;
|
||||
use codex_app_server_protocol::build_turns_from_event_msgs;
|
||||
use codex_backend_client::Client as BackendClient;
|
||||
use codex_chatgpt::connectors;
|
||||
use codex_core::AuthManager;
|
||||
@@ -178,6 +177,7 @@ use codex_core::protocol::ReviewTarget as CoreReviewTarget;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::replay_rollout_response_items;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::skills::remote::download_remote_skill;
|
||||
@@ -204,6 +204,7 @@ use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::user_input::UserInput as CoreInputItem;
|
||||
use codex_rmcp_client::perform_oauth_login_return_url;
|
||||
use codex_utils_json_to_toml::json_to_toml;
|
||||
@@ -2259,9 +2260,9 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
|
||||
match read_event_msgs_from_rollout(rollout_path).await {
|
||||
Ok(events) => {
|
||||
thread.turns = build_turns_from_event_msgs(&events);
|
||||
match read_turns_from_rollout(rollout_path).await {
|
||||
Ok(turns) => {
|
||||
thread.turns = turns;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
@@ -2442,11 +2443,7 @@ impl CodexMessageProcessor {
|
||||
session_configured,
|
||||
..
|
||||
}) => {
|
||||
let SessionConfiguredEvent {
|
||||
rollout_path,
|
||||
initial_messages,
|
||||
..
|
||||
} = session_configured;
|
||||
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
@@ -2486,9 +2483,20 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
};
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
thread.turns = match read_turns_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(turns) => turns,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let response = ThreadResumeResponse {
|
||||
thread,
|
||||
@@ -2654,11 +2662,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let SessionConfiguredEvent {
|
||||
rollout_path,
|
||||
initial_messages,
|
||||
..
|
||||
} = session_configured;
|
||||
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
@@ -2698,9 +2702,20 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
};
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
thread.turns = match read_turns_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(turns) => turns,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let response = ThreadForkResponse {
|
||||
thread: thread.clone(),
|
||||
@@ -5043,22 +5058,227 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn read_event_msgs_from_rollout(
|
||||
path: &Path,
|
||||
) -> std::io::Result<Vec<codex_protocol::protocol::EventMsg>> {
|
||||
fn build_turns_from_rollout_items(rollout_items: &[RolloutItem]) -> Vec<Turn> {
|
||||
let response_items = replay_rollout_response_items(rollout_items);
|
||||
let replayed_user_messages = replay_rollout_user_messages(rollout_items);
|
||||
build_turns_from_response_items(&response_items, &replayed_user_messages)
|
||||
}
|
||||
|
||||
fn build_turns_from_response_items(
|
||||
response_items: &[ResponseItem],
|
||||
replayed_user_messages: &[Option<UserMessageEvent>],
|
||||
) -> Vec<Turn> {
|
||||
let mut turns = Vec::new();
|
||||
let mut current_turn: Option<Turn> = None;
|
||||
let mut next_turn_index: i64 = 1;
|
||||
let mut next_item_index: i64 = 1;
|
||||
let mut next_user_message_index: usize = 0;
|
||||
|
||||
for response_item in response_items {
|
||||
let Some(mut turn_item) = codex_core::parse_turn_item(response_item) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let is_user_message = matches!(&turn_item, TurnItem::UserMessage(_));
|
||||
if is_user_message {
|
||||
if let Some(user_message_event) = replayed_user_messages
|
||||
.get(next_user_message_index)
|
||||
.and_then(Option::as_ref)
|
||||
{
|
||||
overwrite_user_message_content(&mut turn_item, user_message_event);
|
||||
}
|
||||
next_user_message_index += 1;
|
||||
finish_current_turn(&mut turns, &mut current_turn);
|
||||
current_turn = Some(Turn {
|
||||
id: next_turn_id(&mut next_turn_index),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
});
|
||||
} else if current_turn.is_none() {
|
||||
current_turn = Some(Turn {
|
||||
id: next_turn_id(&mut next_turn_index),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
});
|
||||
}
|
||||
|
||||
overwrite_turn_item_id(&mut turn_item, next_item_id(&mut next_item_index));
|
||||
if let Some(turn) = current_turn.as_mut() {
|
||||
turn.items.push(ThreadItem::from(turn_item));
|
||||
}
|
||||
}
|
||||
|
||||
finish_current_turn(&mut turns, &mut current_turn);
|
||||
turns
|
||||
}
|
||||
|
||||
fn overwrite_turn_item_id(turn_item: &mut TurnItem, id: String) {
|
||||
let item_id = match turn_item {
|
||||
TurnItem::UserMessage(item) => &mut item.id,
|
||||
TurnItem::AgentMessage(item) => &mut item.id,
|
||||
TurnItem::Plan(item) => &mut item.id,
|
||||
TurnItem::Reasoning(item) => &mut item.id,
|
||||
TurnItem::WebSearch(item) => &mut item.id,
|
||||
TurnItem::ContextCompaction(item) => &mut item.id,
|
||||
};
|
||||
*item_id = id;
|
||||
}
|
||||
|
||||
fn overwrite_user_message_content(turn_item: &mut TurnItem, user_message_event: &UserMessageEvent) {
|
||||
if let TurnItem::UserMessage(user_message) = turn_item {
|
||||
user_message.content = user_message_content_from_event(user_message_event);
|
||||
}
|
||||
}
|
||||
|
||||
fn user_message_content_from_event(user_message_event: &UserMessageEvent) -> Vec<CoreInputItem> {
|
||||
let mut content = Vec::new();
|
||||
if !user_message_event.message.trim().is_empty() {
|
||||
content.push(CoreInputItem::Text {
|
||||
text: user_message_event.message.clone(),
|
||||
text_elements: user_message_event.text_elements.clone(),
|
||||
});
|
||||
}
|
||||
if let Some(images) = &user_message_event.images {
|
||||
for image in images {
|
||||
content.push(CoreInputItem::Image {
|
||||
image_url: image.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
for path in &user_message_event.local_images {
|
||||
content.push(CoreInputItem::LocalImage { path: path.clone() });
|
||||
}
|
||||
content
|
||||
}
|
||||
|
||||
fn replay_rollout_user_messages(rollout_items: &[RolloutItem]) -> Vec<Option<UserMessageEvent>> {
|
||||
let mut replayed_entries = Vec::new();
|
||||
let mut replayed_prefix = Vec::new();
|
||||
|
||||
for rollout_item in rollout_items {
|
||||
replayed_prefix.push(rollout_item.clone());
|
||||
match rollout_item {
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
replayed_entries.push(ReplayEntry {
|
||||
response_item: response_item.clone(),
|
||||
user_message_event: None,
|
||||
});
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event)) => {
|
||||
if let Some(entry) = replayed_entries.iter_mut().rev().find(|entry| {
|
||||
is_user_response_item(&entry.response_item)
|
||||
&& entry.user_message_event.is_none()
|
||||
}) {
|
||||
entry.user_message_event = Some(user_message_event.clone());
|
||||
}
|
||||
}
|
||||
RolloutItem::Compacted(compacted) => {
|
||||
let replayed_response_items = replay_rollout_response_items(&replayed_prefix);
|
||||
replayed_entries = if compacted.replacement_history.is_some() {
|
||||
replayed_response_items
|
||||
.into_iter()
|
||||
.map(|response_item| ReplayEntry {
|
||||
response_item,
|
||||
user_message_event: None,
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
carry_user_message_events(&replayed_entries, &replayed_response_items)
|
||||
};
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(_)) => {
|
||||
let replayed_response_items = replay_rollout_response_items(&replayed_prefix);
|
||||
replayed_entries =
|
||||
carry_user_message_events(&replayed_entries, &replayed_response_items);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
replayed_entries
|
||||
.into_iter()
|
||||
.filter_map(|entry| {
|
||||
if is_user_response_item(&entry.response_item) {
|
||||
Some(entry.user_message_event)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn carry_user_message_events(
|
||||
replayed_entries: &[ReplayEntry],
|
||||
replayed_response_items: &[ResponseItem],
|
||||
) -> Vec<ReplayEntry> {
|
||||
let mut replayed_entry_cursor = 0usize;
|
||||
replayed_response_items
|
||||
.iter()
|
||||
.map(|response_item| {
|
||||
let user_message_event = replayed_entries
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip(replayed_entry_cursor)
|
||||
.find(|(_, entry)| entry.response_item == *response_item)
|
||||
.and_then(|(idx, entry)| {
|
||||
replayed_entry_cursor = idx + 1;
|
||||
entry.user_message_event.clone()
|
||||
});
|
||||
ReplayEntry {
|
||||
response_item: response_item.clone(),
|
||||
user_message_event: if is_user_response_item(response_item) {
|
||||
user_message_event
|
||||
} else {
|
||||
None
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn is_user_response_item(response_item: &ResponseItem) -> bool {
|
||||
matches!(
|
||||
response_item,
|
||||
ResponseItem::Message { role, .. } if role == "user"
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ReplayEntry {
|
||||
response_item: ResponseItem,
|
||||
user_message_event: Option<UserMessageEvent>,
|
||||
}
|
||||
|
||||
fn finish_current_turn(turns: &mut Vec<Turn>, current_turn: &mut Option<Turn>) {
|
||||
if let Some(turn) = current_turn.take()
|
||||
&& !turn.items.is_empty()
|
||||
{
|
||||
turns.push(turn);
|
||||
}
|
||||
}
|
||||
|
||||
fn next_turn_id(next_turn_index: &mut i64) -> String {
|
||||
let id = format!("turn-{next_turn_index}");
|
||||
*next_turn_index += 1;
|
||||
id
|
||||
}
|
||||
|
||||
fn next_item_id(next_item_index: &mut i64) -> String {
|
||||
let id = format!("item-{next_item_index}");
|
||||
*next_item_index += 1;
|
||||
id
|
||||
}
|
||||
|
||||
pub(crate) async fn read_turns_from_rollout(path: &Path) -> std::io::Result<Vec<Turn>> {
|
||||
let items = match RolloutRecorder::get_rollout_history(path).await? {
|
||||
InitialHistory::New => Vec::new(),
|
||||
InitialHistory::Forked(items) => items,
|
||||
InitialHistory::Resumed(resumed) => resumed.history,
|
||||
};
|
||||
|
||||
Ok(items
|
||||
.into_iter()
|
||||
.filter_map(|item| match item {
|
||||
RolloutItem::EventMsg(event) => Some(event),
|
||||
_ => None,
|
||||
})
|
||||
.collect())
|
||||
Ok(build_turns_from_rollout_items(&items))
|
||||
}
|
||||
|
||||
fn extract_conversation_summary(
|
||||
@@ -5195,11 +5415,230 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn user_message_event(message: &str, placeholder: &str) -> UserMessageEvent {
|
||||
UserMessageEvent {
|
||||
message: message.to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: vec![TextElement::new(
|
||||
ByteRange {
|
||||
start: 0,
|
||||
end: message.len(),
|
||||
},
|
||||
Some(placeholder.to_string()),
|
||||
)],
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_turns_from_rollout_items_applies_compaction_replacement_history() {
|
||||
let rollout_items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("old user")),
|
||||
RolloutItem::ResponseItem(assistant_msg("old assistant")),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("summary user")]),
|
||||
}),
|
||||
RolloutItem::ResponseItem(user_msg("latest user")),
|
||||
RolloutItem::ResponseItem(assistant_msg("latest assistant")),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&rollout_items);
|
||||
let expected = vec![
|
||||
Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "summary user".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
},
|
||||
Turn {
|
||||
id: "turn-2".to_string(),
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-2".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "latest user".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-3".to_string(),
|
||||
text: "latest assistant".to_string(),
|
||||
},
|
||||
],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
},
|
||||
];
|
||||
|
||||
assert_eq!(turns, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_turns_from_rollout_items_applies_compaction_and_rollback() {
|
||||
let rollout_items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("old user")),
|
||||
RolloutItem::ResponseItem(assistant_msg("old assistant")),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("summary user")]),
|
||||
}),
|
||||
RolloutItem::ResponseItem(user_msg("latest user")),
|
||||
RolloutItem::ResponseItem(assistant_msg("latest assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&rollout_items);
|
||||
let expected = vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "summary user".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
}];
|
||||
|
||||
assert_eq!(turns, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_turns_from_rollout_items_preserves_text_elements_from_user_message_events() {
|
||||
let rollout_items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("hello")),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event("hello", "<note>"))),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&rollout_items);
|
||||
let expected = vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: vec![
|
||||
TextElement::new(
|
||||
ByteRange { start: 0, end: 5 },
|
||||
Some("<note>".to_string()),
|
||||
)
|
||||
.into(),
|
||||
],
|
||||
}],
|
||||
}],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
}];
|
||||
|
||||
assert_eq!(turns, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_turns_from_rollout_items_does_not_reuse_pre_compaction_user_message_events() {
|
||||
let rollout_items = vec![
|
||||
RolloutItem::ResponseItem(user_msg("old user")),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event(
|
||||
"old user", "<old>",
|
||||
))),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("summary user")]),
|
||||
}),
|
||||
RolloutItem::ResponseItem(user_msg("latest user")),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(user_message_event(
|
||||
"latest user",
|
||||
"<latest>",
|
||||
))),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&rollout_items);
|
||||
let expected = vec![
|
||||
Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "summary user".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
},
|
||||
Turn {
|
||||
id: "turn-2".to_string(),
|
||||
items: vec![ThreadItem::UserMessage {
|
||||
id: "item-2".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "latest user".to_string(),
|
||||
text_elements: vec![
|
||||
TextElement::new(
|
||||
ByteRange {
|
||||
start: 0,
|
||||
end: "latest user".len(),
|
||||
},
|
||||
Some("<latest>".to_string()),
|
||||
)
|
||||
.into(),
|
||||
],
|
||||
}],
|
||||
}],
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
},
|
||||
];
|
||||
|
||||
assert_eq!(turns, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_dynamic_tools_rejects_unsupported_input_schema() {
|
||||
let tools = vec![ApiDynamicToolSpec {
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -11,6 +12,15 @@ use codex_app_server_protocol::ThreadReadParams;
|
||||
use codex_app_server_protocol::ThreadReadResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as CoreSessionSource;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -134,6 +144,156 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_include_turns_replays_compaction_replacement_history() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
let conversation_id = create_rollout_with_compaction_replacement(
|
||||
codex_home.path(),
|
||||
"2025-01-06T12-00-00",
|
||||
"2025-01-06T12:00:00Z",
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: conversation_id,
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(thread.turns.len(), 2);
|
||||
assert_eq!(
|
||||
thread.turns[0].items,
|
||||
vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "compacted summary".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}]
|
||||
);
|
||||
assert_eq!(
|
||||
thread.turns[1].items,
|
||||
vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-2".to_string(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "latest user".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-3".to_string(),
|
||||
text: "latest assistant".to_string(),
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_rollout_with_compaction_replacement(
|
||||
codex_home: &Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
) -> Result<String> {
|
||||
let uuid = uuid::Uuid::new_v4();
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let path = rollout_path(codex_home, filename_ts, &uuid.to_string());
|
||||
let parent = path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("rollout path missing parent: {}", path.display()))?;
|
||||
std::fs::create_dir_all(parent)?;
|
||||
|
||||
let session_meta = SessionMeta {
|
||||
id: thread_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: CoreSessionSource::Cli,
|
||||
model_provider: Some("mock_provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
};
|
||||
|
||||
let lines = [
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: session_meta,
|
||||
git: None,
|
||||
}),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(user_msg("old user")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(assistant_msg("old assistant")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("compacted summary")]),
|
||||
}),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(user_msg("latest user")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(assistant_msg("latest assistant")),
|
||||
},
|
||||
];
|
||||
|
||||
let content = lines
|
||||
.iter()
|
||||
.map(serde_json::to_string)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.join("\n");
|
||||
std::fs::write(path, format!("{content}\n"))?;
|
||||
Ok(uuid.to_string())
|
||||
}
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -14,7 +15,19 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as CoreSessionSource;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -157,6 +170,146 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_replays_compaction_and_rollback_markers() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
let thread_id = create_rollout_with_compaction_and_rollback(
|
||||
codex_home.path(),
|
||||
"2025-01-07T12-00-00",
|
||||
"2025-01-07T12:00:00Z",
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(thread.turns.len(), 1);
|
||||
assert_eq!(
|
||||
thread.turns[0].items,
|
||||
vec![ThreadItem::UserMessage {
|
||||
id: "item-1".to_string(),
|
||||
content: vec![V2UserInput::Text {
|
||||
text: "compacted summary".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
}]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_rollout_with_compaction_and_rollback(
|
||||
codex_home: &std::path::Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
) -> Result<String> {
|
||||
let uuid = uuid::Uuid::new_v4();
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string())?;
|
||||
let path = rollout_path(codex_home, filename_ts, &uuid.to_string());
|
||||
let parent = path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("rollout path missing parent: {}", path.display()))?;
|
||||
std::fs::create_dir_all(parent)?;
|
||||
|
||||
let session_meta = SessionMeta {
|
||||
id: thread_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: CoreSessionSource::Cli,
|
||||
model_provider: Some("mock_provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
};
|
||||
let lines = [
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: session_meta,
|
||||
git: None,
|
||||
}),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(user_msg("old user")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(assistant_msg("old assistant")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("compacted summary")]),
|
||||
}),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(user_msg("latest user")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::ResponseItem(assistant_msg("latest assistant")),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
item: RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
},
|
||||
];
|
||||
|
||||
let content = lines
|
||||
.iter()
|
||||
.map(serde_json::to_string)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.join("\n");
|
||||
std::fs::write(path, format!("{content}\n"))?;
|
||||
Ok(uuid.to_string())
|
||||
}
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
|
||||
@@ -167,6 +167,7 @@ use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::rollout::map_session_init_error;
|
||||
use crate::rollout::metadata;
|
||||
use crate::rollout::replay_rollout_response_items_with_initial_context;
|
||||
use crate::shell;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills::SkillError;
|
||||
@@ -1757,34 +1758,12 @@ impl Session {
|
||||
turn_context: &TurnContext,
|
||||
rollout_items: &[RolloutItem],
|
||||
) -> Vec<ResponseItem> {
|
||||
let initial_context = self.build_initial_context(turn_context).await;
|
||||
let replayed = replay_rollout_response_items_with_initial_context(rollout_items, |_| {
|
||||
initial_context.clone()
|
||||
});
|
||||
let mut history = ContextManager::new();
|
||||
for item in rollout_items {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
history.record_items(
|
||||
std::iter::once(response_item),
|
||||
turn_context.truncation_policy,
|
||||
);
|
||||
}
|
||||
RolloutItem::Compacted(compacted) => {
|
||||
if let Some(replacement) = &compacted.replacement_history {
|
||||
history.replace(replacement.clone());
|
||||
} else {
|
||||
let user_messages = collect_user_messages(history.raw_items());
|
||||
let rebuilt = compact::build_compacted_history(
|
||||
self.build_initial_context(turn_context).await,
|
||||
&user_messages,
|
||||
&compacted.message,
|
||||
);
|
||||
history.replace(rebuilt);
|
||||
}
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
history.record_items(replayed.iter(), turn_context.truncation_policy);
|
||||
history.raw_items().to_vec()
|
||||
}
|
||||
|
||||
|
||||
@@ -118,6 +118,8 @@ pub use rollout::list::ThreadsPage;
|
||||
pub use rollout::list::parse_cursor;
|
||||
pub use rollout::list::read_head_for_summary;
|
||||
pub use rollout::list::read_session_meta_line;
|
||||
pub use rollout::replay_rollout_response_items;
|
||||
pub use rollout::replay_rollout_response_items_with_initial_context;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use rollout::session_index::find_thread_names_by_ids;
|
||||
pub use transport_manager::TransportManager;
|
||||
|
||||
@@ -12,6 +12,7 @@ pub mod list;
|
||||
pub(crate) mod metadata;
|
||||
pub(crate) mod policy;
|
||||
pub mod recorder;
|
||||
pub mod replay;
|
||||
pub(crate) mod session_index;
|
||||
pub(crate) mod truncation;
|
||||
|
||||
@@ -24,6 +25,8 @@ pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
|
||||
pub use list::rollout_date_parts;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use replay::replay_rollout_response_items;
|
||||
pub use replay::replay_rollout_response_items_with_initial_context;
|
||||
pub use session_index::find_thread_path_by_name_str;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
214
codex-rs/core/src/rollout/replay.rs
Normal file
214
codex-rs/core/src/rollout/replay.rs
Normal file
@@ -0,0 +1,214 @@
|
||||
use crate::compact;
|
||||
use crate::compact::collect_user_messages;
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
|
||||
/// Replays rollout items into effective response history.
|
||||
///
|
||||
/// This applies compaction and rollback markers so callers can reconstruct
|
||||
/// the post-replay thread state instead of the raw persisted stream.
|
||||
pub fn replay_rollout_response_items(rollout_items: &[RolloutItem]) -> Vec<ResponseItem> {
|
||||
replay_rollout_response_items_with_initial_context(rollout_items, infer_initial_context)
|
||||
}
|
||||
|
||||
/// Replays rollout items into effective response history, with a custom initial-context provider
|
||||
/// used when rebuilding compaction entries that do not have replacement history.
|
||||
pub fn replay_rollout_response_items_with_initial_context<F>(
|
||||
rollout_items: &[RolloutItem],
|
||||
mut initial_context: F,
|
||||
) -> Vec<ResponseItem>
|
||||
where
|
||||
F: FnMut(&[ResponseItem]) -> Vec<ResponseItem>,
|
||||
{
|
||||
let mut history = Vec::new();
|
||||
for item in rollout_items {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
history.push(response_item.clone());
|
||||
}
|
||||
RolloutItem::Compacted(compacted) => {
|
||||
if let Some(replacement) = &compacted.replacement_history {
|
||||
history = replacement.clone();
|
||||
} else {
|
||||
let initial_context = initial_context(&history);
|
||||
let user_messages = collect_user_messages(&history);
|
||||
history = compact::build_compacted_history(
|
||||
initial_context,
|
||||
&user_messages,
|
||||
&compacted.message,
|
||||
);
|
||||
}
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
drop_last_n_user_turns(&mut history, rollback.num_turns);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
history
|
||||
}
|
||||
|
||||
fn infer_initial_context(history: &[ResponseItem]) -> Vec<ResponseItem> {
|
||||
match history.iter().position(is_user_turn_boundary) {
|
||||
Some(first_user_idx) => history[..first_user_idx].to_vec(),
|
||||
None => history.to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
fn drop_last_n_user_turns(history: &mut Vec<ResponseItem>, num_turns: u32) {
|
||||
if num_turns == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let user_positions = user_message_positions(history);
|
||||
let Some(&first_user_idx) = user_positions.first() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
|
||||
let cut_idx = if n_from_end >= user_positions.len() {
|
||||
first_user_idx
|
||||
} else {
|
||||
user_positions[user_positions.len() - n_from_end]
|
||||
};
|
||||
history.truncate(cut_idx);
|
||||
}
|
||||
|
||||
fn user_message_positions(items: &[ResponseItem]) -> Vec<usize> {
|
||||
let mut positions = Vec::new();
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
if is_user_turn_boundary(item) {
|
||||
positions.push(idx);
|
||||
}
|
||||
}
|
||||
positions
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn system_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "system".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compaction_replay_uses_replacement_history_when_present() {
|
||||
let replacement = vec![user_msg("replacement summary")];
|
||||
let rollout = vec![
|
||||
RolloutItem::ResponseItem(user_msg("before")),
|
||||
RolloutItem::ResponseItem(assistant_msg("assistant before")),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(replacement.clone()),
|
||||
}),
|
||||
RolloutItem::ResponseItem(assistant_msg("after")),
|
||||
];
|
||||
|
||||
let replayed = replay_rollout_response_items(&rollout);
|
||||
let expected = vec![replacement[0].clone(), assistant_msg("after")];
|
||||
assert_eq!(replayed, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compaction_replay_rebuilds_when_replacement_history_absent() {
|
||||
let rollout = vec![
|
||||
RolloutItem::ResponseItem(system_msg("prefix")),
|
||||
RolloutItem::ResponseItem(user_msg("first user")),
|
||||
RolloutItem::ResponseItem(assistant_msg("assistant reply")),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: None,
|
||||
}),
|
||||
];
|
||||
|
||||
let replayed = replay_rollout_response_items(&rollout);
|
||||
let expected = vec![
|
||||
system_msg("prefix"),
|
||||
user_msg("first user"),
|
||||
user_msg("summary"),
|
||||
];
|
||||
assert_eq!(replayed, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollback_marker_applies_after_compaction_replay() {
|
||||
let rollout = vec![
|
||||
RolloutItem::ResponseItem(user_msg("before")),
|
||||
RolloutItem::ResponseItem(assistant_msg("assistant before")),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: "summary".to_string(),
|
||||
replacement_history: Some(vec![user_msg("summary")]),
|
||||
}),
|
||||
RolloutItem::ResponseItem(user_msg("latest")),
|
||||
RolloutItem::ResponseItem(assistant_msg("assistant latest")),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
];
|
||||
|
||||
let replayed = replay_rollout_response_items(&rollout);
|
||||
assert_eq!(replayed, vec![user_msg("summary")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_rollback_markers_apply_in_sequence() {
|
||||
let rollout = vec![
|
||||
RolloutItem::ResponseItem(system_msg("prefix")),
|
||||
RolloutItem::ResponseItem(user_msg("u1")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a1")),
|
||||
RolloutItem::ResponseItem(user_msg("u2")),
|
||||
RolloutItem::ResponseItem(assistant_msg("a2")),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
|
||||
num_turns: 1,
|
||||
})),
|
||||
];
|
||||
|
||||
let replayed = replay_rollout_response_items(&rollout);
|
||||
assert_eq!(replayed, vec![system_msg("prefix")]);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user