Compare commits

...

5 Commits

Author SHA1 Message Date
celia-oai
6ae688dd9f fix tests 2026-02-09 20:52:27 -08:00
Celia Chen
3453efaa75 Merge branch 'main' into dev/cc/agent 2026-02-09 20:08:18 -08:00
celia-oai
5eba8b2088 changes 2026-02-09 19:47:47 -08:00
celia-oai
eaea9d6725 changes 2026-02-09 19:23:51 -08:00
celia-oai
d99aecc485 changes 2026-02-09 17:31:28 -08:00
13 changed files with 393 additions and 169 deletions

View File

@@ -7,19 +7,21 @@ use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
use uuid::Uuid;
/// Convert persisted [`EventMsg`] entries into a sequence of [`Turn`] values.
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
///
/// The purpose of this is to convert the EventMsgs persisted in a rollout file
/// into a sequence of Turns and ThreadItems, which allows the client to render
/// the historical messages when resuming a thread.
pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec<Turn> {
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
/// resumed/rebuilt thread history preserves the original turn identifiers.
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for event in events {
builder.handle_event(event);
for item in items {
builder.handle_rollout_item(item);
}
builder.finish()
}
@@ -27,7 +29,12 @@ pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec<Turn> {
struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_turn_index: i64,
/// Carries the most recently seen `TurnContext.turn_id` that should apply
/// to the next reconstructed turn. This is needed because rollout entries
/// may place `TurnContext` before the next `UserMessage` (or occasionally
/// while finalizing the previous turn), so we buffer the id until the next
/// turn is created.
pending_turn_id: Option<String>,
next_item_index: i64,
}
@@ -36,7 +43,7 @@ impl ThreadHistoryBuilder {
Self {
turns: Vec::new(),
current_turn: None,
next_turn_index: 1,
pending_turn_id: None,
next_item_index: 1,
}
}
@@ -67,6 +74,33 @@ impl ThreadHistoryBuilder {
}
}
fn handle_rollout_item(&mut self, item: &RolloutItem) {
match item {
RolloutItem::TurnContext(payload) => self.handle_turn_context(payload),
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::SessionMeta(_)
| RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_) => {}
}
}
fn handle_turn_context(&mut self, payload: &TurnContextItem) {
let Some(turn_id) = payload.turn_id.clone() else {
return;
};
if let Some(turn) = self.current_turn.as_mut() {
if turn.id.is_none() {
turn.id = Some(turn_id);
} else if turn.id.as_deref() != Some(turn_id.as_str()) {
self.pending_turn_id = Some(turn_id);
}
return;
}
self.pending_turn_id = Some(turn_id);
}
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
self.finish_current_turn();
let mut turn = self.new_turn();
@@ -157,9 +191,7 @@ impl ThreadHistoryBuilder {
self.turns.truncate(self.turns.len().saturating_sub(n));
}
// Re-number subsequent synthetic ids so the pruned history is consistent.
self.next_turn_index =
i64::try_from(self.turns.len().saturating_add(1)).unwrap_or(i64::MAX);
// Re-number synthetic item ids so the pruned history is consistent.
let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
}
@@ -169,13 +201,23 @@ impl ThreadHistoryBuilder {
if turn.items.is_empty() {
return;
}
self.turns.push(turn.into());
let turn_id = match turn.id {
Some(id) => id,
None => Uuid::now_v7().to_string(),
};
self.turns.push(Turn {
id: turn_id,
items: turn.items,
error: turn.error,
status: turn.status,
});
}
}
fn new_turn(&mut self) -> PendingTurn {
let turn_id = self.pending_turn_id.take();
PendingTurn {
id: self.next_turn_id(),
id: turn_id,
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
@@ -195,12 +237,6 @@ impl ThreadHistoryBuilder {
unreachable!("current turn must exist after initialization");
}
fn next_turn_id(&mut self) -> String {
let id = format!("turn-{}", self.next_turn_index);
self.next_turn_index += 1;
id
}
fn next_item_id(&mut self) -> String {
let id = format!("item-{}", self.next_item_index);
self.next_item_index += 1;
@@ -233,69 +269,86 @@ impl ThreadHistoryBuilder {
}
struct PendingTurn {
id: String,
id: Option<String>,
items: Vec<ThreadItem>,
error: Option<TurnError>,
status: TurnStatus,
}
impl From<PendingTurn> for Turn {
fn from(value: PendingTurn) -> Self {
Self {
id: value.id,
items: value.items,
error: value.error,
status: value.status,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
fn turn_context(turn_id: Option<&str>) -> TurnContextItem {
TurnContextItem {
turn_id: turn_id.map(ToString::to_string),
cwd: PathBuf::from("/tmp"),
approval_policy: AskForApproval::OnRequest,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: "gpt-5".to_string(),
personality: None,
collaboration_mode: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,
developer_instructions: None,
final_output_json_schema: None,
truncation_policy: None,
}
}
#[test]
fn builds_multiple_turns_with_reasoning_items() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First turn".into(),
images: Some(vec!["https://example.com/one.png".into()]),
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "Hi there".into(),
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentReasoning(AgentReasoningEvent {
text: "thinking".into(),
}),
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text: "full reasoning".into(),
}),
EventMsg::UserMessage(UserMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentReasoningRawContent(
AgentReasoningRawContentEvent {
text: "full reasoning".into(),
},
)),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Second turn".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "Reply two".into(),
}),
})),
];
let turns = build_turns_from_event_msgs(&events);
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
let first = &turns[0];
assert_eq!(first.id, "turn-1");
assert_eq!(first.id, "uuid-turn-1");
assert_eq!(first.status, TurnStatus::Completed);
assert_eq!(first.items.len(), 3);
assert_eq!(
@@ -330,7 +383,7 @@ mod tests {
);
let second = &turns[1];
assert_eq!(second.id, "turn-2");
assert_eq!(second.id, "uuid-turn-2");
assert_eq!(second.items.len(), 2);
assert_eq!(
second.items[0],
@@ -353,28 +406,31 @@ mod tests {
#[test]
fn splits_reasoning_when_interleaved() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Turn start".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentReasoning(AgentReasoningEvent {
text: "first summary".into(),
}),
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text: "first content".into(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentReasoningRawContent(
AgentReasoningRawContentEvent {
text: "first content".into(),
},
)),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "interlude".into(),
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentReasoning(AgentReasoningEvent {
text: "second summary".into(),
}),
})),
];
let turns = build_turns_from_event_msgs(&events);
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
let turn = &turns[0];
assert_eq!(turn.items.len(), 4);
@@ -399,34 +455,37 @@ mod tests {
#[test]
fn marks_turn_as_interrupted_when_aborted() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Please do the thing".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "Working...".into(),
}),
EventMsg::TurnAborted(TurnAbortedEvent {
})),
RolloutItem::EventMsg(EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Replaced,
}),
EventMsg::UserMessage(UserMessageEvent {
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Let's try again".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "Second attempt complete.".into(),
}),
})),
];
let turns = build_turns_from_event_msgs(&events);
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
let first_turn = &turns[0];
assert_eq!(first_turn.id, "uuid-turn-1");
assert_eq!(first_turn.status, TurnStatus::Interrupted);
assert_eq!(first_turn.items.len(), 2);
assert_eq!(
@@ -448,6 +507,7 @@ mod tests {
);
let second_turn = &turns[1];
assert_eq!(second_turn.id, "uuid-turn-2");
assert_eq!(second_turn.status, TurnStatus::Completed);
assert_eq!(second_turn.items.len(), 2);
assert_eq!(
@@ -471,41 +531,46 @@ mod tests {
#[test]
fn drops_last_turns_on_thread_rollback() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
}),
EventMsg::UserMessage(UserMessageEvent {
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
}),
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 1 }),
EventMsg::UserMessage(UserMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 1,
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-3"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Third".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A3".into(),
}),
})),
];
let turns = build_turns_from_event_msgs(&events);
let turns = build_turns_from_rollout_items(&items);
let expected = vec![
Turn {
id: "turn-1".into(),
id: "uuid-turn-1".into(),
status: TurnStatus::Completed,
error: None,
items: vec![
@@ -523,7 +588,7 @@ mod tests {
],
},
Turn {
id: "turn-2".into(),
id: "uuid-turn-3".into(),
status: TurnStatus::Completed,
error: None,
items: vec![
@@ -546,29 +611,147 @@ mod tests {
#[test]
fn thread_rollback_clears_all_turns_when_num_turns_exceeds_history() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "One".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
}),
EventMsg::UserMessage(UserMessageEvent {
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Two".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
}),
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }),
})),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(ThreadRolledBackEvent {
num_turns: 99,
})),
];
let turns = build_turns_from_event_msgs(&events);
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns, Vec::<Turn>::new());
}
#[test]
fn rebuild_uses_turn_context_turn_ids() {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].id, "uuid-turn-1");
assert_eq!(turns[1].id, "uuid-turn-2");
}
#[test]
fn rebuild_updates_synthetic_id_when_turn_context_arrives_late() {
let items = vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].id, "uuid-turn-1");
}
#[test]
fn repeated_turn_context_for_open_turn_does_not_seed_next_turn_with_same_id() {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].id, "uuid-turn-1");
assert_ne!(turns[1].id, "uuid-turn-1");
}
#[test]
fn different_turn_context_for_open_turn_is_buffered_for_next_turn() {
let items = vec![
RolloutItem::TurnContext(turn_context(Some("uuid-turn-1"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "First".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A1".into(),
})),
RolloutItem::TurnContext(turn_context(Some("uuid-turn-2"))),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "Second".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "A2".into(),
})),
];
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].id, "uuid-turn-1");
assert_eq!(turns[1].id, "uuid-turn-2");
}
}

View File

@@ -3,7 +3,7 @@ use crate::codex_message_processor::PendingInterrupts;
use crate::codex_message_processor::PendingRollbacks;
use crate::codex_message_processor::TurnSummary;
use crate::codex_message_processor::TurnSummaryStore;
use crate::codex_message_processor::read_event_msgs_from_rollout;
use crate::codex_message_processor::read_rollout_items_from_rollout;
use crate::codex_message_processor::read_summary_from_rollout;
use crate::codex_message_processor::summary_to_thread;
use crate::error_code::INTERNAL_ERROR_CODE;
@@ -69,7 +69,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnPlanStep;
use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_core::CodexThread;
use codex_core::parse_command::shlex_join;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
@@ -1101,9 +1101,9 @@ pub(crate) async fn apply_bespoke_event_handling(
{
Ok(summary) => {
let mut thread = summary_to_thread(summary);
match read_event_msgs_from_rollout(rollout_path.as_path()).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
ThreadRollbackResponse { thread }
}
Err(err) => {

View File

@@ -148,7 +148,7 @@ use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_backend_client::Client as BackendClient;
use codex_chatgpt::connectors;
use codex_cloud_requirements::cloud_requirements_loader;
@@ -2543,9 +2543,9 @@ impl CodexMessageProcessor {
};
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_event_msgs_from_rollout(rollout_path).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
@@ -2737,11 +2737,7 @@ impl CodexMessageProcessor {
session_configured,
..
}) => {
let SessionConfiguredEvent {
rollout_path,
initial_messages,
..
} = session_configured;
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
@@ -2781,9 +2777,22 @@ impl CodexMessageProcessor {
return;
}
};
thread.turns = initial_messages
.as_deref()
.map_or_else(Vec::new, build_turns_from_event_msgs);
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
let response = ThreadResumeResponse {
thread,
@@ -2945,11 +2954,7 @@ impl CodexMessageProcessor {
}
};
let SessionConfiguredEvent {
rollout_path,
initial_messages,
..
} = session_configured;
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
@@ -2989,9 +2994,22 @@ impl CodexMessageProcessor {
return;
}
};
thread.turns = initial_messages
.as_deref()
.map_or_else(Vec::new, build_turns_from_event_msgs);
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
let response = ThreadForkResponse {
thread: thread.clone(),
@@ -5899,22 +5917,16 @@ pub(crate) async fn read_summary_from_rollout(
})
}
pub(crate) async fn read_event_msgs_from_rollout(
pub(crate) async fn read_rollout_items_from_rollout(
path: &Path,
) -> std::io::Result<Vec<codex_protocol::protocol::EventMsg>> {
) -> std::io::Result<Vec<RolloutItem>> {
let items = match RolloutRecorder::get_rollout_history(path).await? {
InitialHistory::New => Vec::new(),
InitialHistory::Forked(items) => items,
InitialHistory::Resumed(resumed) => resumed.history,
};
Ok(items
.into_iter()
.filter_map(|item| match item {
RolloutItem::EventMsg(event) => Some(event),
_ => None,
})
.collect())
Ok(items)
}
fn extract_conversation_summary(

View File

@@ -560,6 +560,7 @@ fn append_rollout_turn_context(path: &Path, timestamp: &str, model: &str) -> std
let line = RolloutLine {
timestamp: timestamp.to_string(),
item: RolloutItem::TurnContext(TurnContextItem {
turn_id: None,
cwd: PathBuf::from("/"),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,

View File

@@ -101,6 +101,7 @@ use tracing::instrument;
use tracing::trace;
use tracing::trace_span;
use tracing::warn;
use uuid::Uuid;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
@@ -240,7 +241,6 @@ use codex_utils_readiness::ReadinessFlag;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
pub(crate) next_id: AtomicU64,
pub(crate) tx_sub: Sender<Submission>,
pub(crate) rx_event: Receiver<Event>,
// Last known status of the agent.
@@ -423,7 +423,6 @@ impl Codex {
submission_loop(Arc::clone(&session), config, rx_sub).instrument(session_loop_span),
);
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
agent_status: agent_status_rx,
@@ -440,10 +439,7 @@ impl Codex {
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
let id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
.to_string();
let id = Uuid::now_v7().to_string();
let sub = Submission { id: id.clone(), op };
self.submit_with_id(sub).await?;
Ok(id)
@@ -4748,6 +4744,7 @@ async fn try_run_sampling_request(
) -> CodexResult<SamplingRequestResult> {
let collaboration_mode = sess.current_collaboration_mode().await;
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
turn_id: Some(turn_context.sub_id.clone()),
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use async_channel::Receiver;
use async_channel::Sender;
@@ -90,7 +89,6 @@ pub(crate) async fn run_codex_thread_interactive(
});
Ok(Codex {
next_id: AtomicU64::new(0),
tx_sub: tx_ops,
rx_event: rx_sub,
agent_status: codex.agent_status.clone(),
@@ -166,7 +164,6 @@ pub(crate) async fn run_codex_thread_one_shot(
drop(rx_closed);
Ok(Codex {
next_id: AtomicU64::new(0),
rx_event: rx_bridge,
tx_sub: tx_closed,
agent_status,
@@ -462,7 +459,6 @@ mod tests {
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
let codex = Arc::new(Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event: rx_events,
agent_status,

View File

@@ -95,6 +95,7 @@ async fn run_compact_task_inner(
// session config before this write occurs.
let collaboration_mode = sess.current_collaboration_mode().await;
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
turn_id: Some(turn_context.sub_id.clone()),
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),

View File

@@ -1528,7 +1528,7 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
}
test.codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval.turn_id,
decision: decision.clone(),
})
.await?;
@@ -1549,7 +1549,7 @@ async fn run_scenario(scenario: &ScenarioSpec) -> Result<()> {
}
test.codex
.submit(Op::PatchApproval {
id: "0".into(),
id: approval.turn_id,
decision: decision.clone(),
})
.await?;
@@ -1624,10 +1624,10 @@ async fn approving_apply_patch_for_session_skips_future_prompts_for_same_file()
sandbox_policy.clone(),
)
.await?;
let _ = expect_patch_approval(&test, call_id_1).await;
let patch_approval = expect_patch_approval(&test, call_id_1).await;
test.codex
.submit(Op::PatchApproval {
id: "0".into(),
id: patch_approval.turn_id,
decision: ReviewDecision::ApprovedForSession,
})
.await?;
@@ -1746,7 +1746,7 @@ async fn approving_execpolicy_amendment_persists_policy_and_skips_future_prompts
test.codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval.turn_id,
decision: ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment: expected_execpolicy_amendment.clone(),
},

View File

@@ -87,15 +87,18 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
.await;
// Expect parent-side approval request (forwarded by delegate).
wait_for_event(&test.codex, |ev| {
let approval_event = wait_for_event(&test.codex, |ev| {
matches!(ev, EventMsg::ExecApprovalRequest(_))
})
.await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
// Approve via parent; id "0" is the active sub_id in tests.
// Approve via parent turn id from approval request.
test.codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Approved,
})
.await
@@ -161,15 +164,18 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
matches!(ev, EventMsg::EnteredReviewMode(_))
})
.await;
wait_for_event(&test.codex, |ev| {
let approval_event = wait_for_event(&test.codex, |ev| {
matches!(ev, EventMsg::ApplyPatchApprovalRequest(_))
})
.await;
let EventMsg::ApplyPatchApprovalRequest(approval_request) = approval_event else {
panic!("expected patch approval request");
};
// Deny via parent so delegate can continue; id "0" is the active sub_id in tests.
// Deny via parent so delegate can continue.
test.codex
.submit(Op::PatchApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Denied,
})
.await

View File

@@ -1032,11 +1032,15 @@ async fn handle_container_exec_user_approved_records_tool_decision() {
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Approved,
})
.await
@@ -1092,11 +1096,15 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::ApprovedForSession,
})
.await
@@ -1152,11 +1160,15 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() {
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Approved,
})
.await
@@ -1212,11 +1224,15 @@ async fn handle_container_exec_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Denied,
})
.await
@@ -1272,11 +1288,15 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision()
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::ApprovedForSession,
})
.await
@@ -1333,11 +1353,15 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() {
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let approval_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecApprovalRequest(_))).await;
let EventMsg::ExecApprovalRequest(approval_request) = approval_event else {
panic!("expected exec approval request");
};
codex
.submit(Op::ExecApproval {
id: "0".into(),
id: approval_request.turn_id,
decision: ReviewDecision::Denied,
})
.await

View File

@@ -22,6 +22,7 @@ fn resume_history(
rollout_path: &std::path::Path,
) -> InitialHistory {
let turn_ctx = TurnContextItem {
turn_id: None,
cwd: config.cwd.clone(),
approval_policy: config.approval_policy.value(),
sandbox_policy: config.sandbox_policy.get().clone(),

View File

@@ -1737,6 +1737,8 @@ impl From<CompactedItem> for ResponseItem {
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)]
pub struct TurnContextItem {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_id: Option<String>,
pub cwd: PathBuf,
pub approval_policy: AskForApproval,
pub sandbox_policy: SandboxPolicy,

View File

@@ -1018,6 +1018,7 @@ mod tests {
.clone()
.unwrap_or_else(|| "gpt-5.1".to_string());
TurnContextItem {
turn_id: None,
cwd,
approval_policy: config.approval_policy.value(),
sandbox_policy: config.sandbox_policy.get().clone(),