mirror of
https://github.com/openai/codex.git
synced 2026-04-26 07:35:29 +00:00
chore: persist turn_id in rollout session and make turn_id uuid based (#11246)
Problem:
1. turn id is constructed in-memory;
2. on resuming threads, turn_id might not be unique;
3. client cannot no the boundary of a turn from rollout files easily.
This PR does three things:
1. persist `task_started` and `task_complete` events;
1. persist `turn_id` in rollout turn events;
5. generate turn_id as unique uuids instead of incrementing it in
memory.
This helps us resolve the issue of clients wanting to have unique turn
ids for resuming a thread, and knowing the boundry of each turn in
rollout files.
example debug logs
```
2026-02-11T00:32:10.746876Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=8 turn=Turn { id: "019c4a07-d809-74c3-bc4b-fd9618487b4b", items: [UserMessage { id: "item-24", content: [Text { text: "hi", text_elements: [] }] }, AgentMessage { id: "item-25", text: "Hi. I’m in the workspace with your current changes loaded and ready. Send the next task and I’ll execute it end-to-end." }], status: Completed, error: None }
2026-02-11T00:32:10.746888Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=9 turn=Turn { id: "019c4a18-1004-76c0-a0fb-a77610f6a9b8", items: [UserMessage { id: "item-26", content: [Text { text: "hello", text_elements: [] }] }, AgentMessage { id: "item-27", text: "Hello. Ready for the next change in `codex-rs`; I can continue from the current in-progress diff or start a new task." }], status: Completed, error: None }
2026-02-11T00:32:10.746899Z DEBUG codex_app_server_protocol::protocol::thread_history: built turn from rollout items turn_index=10 turn=Turn { id: "019c4a19-41f0-7db0-ad78-74f1503baeb8", items: [UserMessage { id: "item-28", content: [Text { text: "hello", text_elements: [] }] }, AgentMessage { id: "item-29", text: "Hello. Send the specific change you want in `codex-rs`, and I’ll implement it and run the required checks." }], status: Completed, error: None }
```
backward compatibility:
if you try to resume an old session without task_started and
task_complete event populated, the following happens:
- If you resume and do nothing: those reconstructed historical IDs can
differ next time you resume.
- If you resume and send a new turn: the new turn gets a fresh UUID from
live submission flow and is persisted, so that new turn’s ID is stable
on later resumes.
I think this behavior is fine, because we only care about deterministic
turn id once a turn is triggered.
This commit is contained in:
@@ -5,21 +5,25 @@ use crate::protocol::v2::TurnStatus;
|
||||
use crate::protocol::v2::UserInput;
|
||||
use codex_protocol::protocol::AgentReasoningEvent;
|
||||
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Convert persisted [`EventMsg`] entries into a sequence of [`Turn`] values.
|
||||
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
|
||||
///
|
||||
/// The purpose of this is to convert the EventMsgs persisted in a rollout file
|
||||
/// into a sequence of Turns and ThreadItems, which allows the client to render
|
||||
/// the historical messages when resuming a thread.
|
||||
pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec<Turn> {
|
||||
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
||||
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for event in events {
|
||||
builder.handle_event(event);
|
||||
for item in items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
builder.finish()
|
||||
}
|
||||
@@ -27,7 +31,6 @@ pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec<Turn> {
|
||||
struct ThreadHistoryBuilder {
|
||||
turns: Vec<Turn>,
|
||||
current_turn: Option<PendingTurn>,
|
||||
next_turn_index: i64,
|
||||
next_item_index: i64,
|
||||
}
|
||||
|
||||
@@ -36,7 +39,6 @@ impl ThreadHistoryBuilder {
|
||||
Self {
|
||||
turns: Vec::new(),
|
||||
current_turn: None,
|
||||
next_turn_index: 1,
|
||||
next_item_index: 1,
|
||||
}
|
||||
}
|
||||
@@ -63,13 +65,36 @@ impl ThreadHistoryBuilder {
|
||||
EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload),
|
||||
EventMsg::UndoCompleted(_) => {}
|
||||
EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload),
|
||||
EventMsg::TurnStarted(payload) => self.handle_turn_started(payload),
|
||||
EventMsg::TurnComplete(payload) => self.handle_turn_complete(payload),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_rollout_item(&mut self, item: &RolloutItem) {
|
||||
match item {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
|
||||
RolloutItem::TurnContext(_)
|
||||
| RolloutItem::SessionMeta(_)
|
||||
| RolloutItem::ResponseItem(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
|
||||
self.finish_current_turn();
|
||||
let mut turn = self.new_turn();
|
||||
// User messages should stay in explicitly opened turns. For backward
|
||||
// compatibility with older streams that did not open turns explicitly,
|
||||
// close any implicit/inactive turn and start a fresh one for this input.
|
||||
if let Some(turn) = self.current_turn.as_ref()
|
||||
&& !turn.opened_explicitly
|
||||
&& !(turn.saw_compaction && turn.items.is_empty())
|
||||
{
|
||||
self.finish_current_turn();
|
||||
}
|
||||
let mut turn = self
|
||||
.current_turn
|
||||
.take()
|
||||
.unwrap_or_else(|| self.new_turn(None));
|
||||
let id = self.next_item_id();
|
||||
let content = self.build_user_inputs(payload);
|
||||
turn.items.push(ThreadItem::UserMessage { id, content });
|
||||
@@ -147,6 +172,30 @@ impl ThreadHistoryBuilder {
|
||||
turn.status = TurnStatus::Interrupted;
|
||||
}
|
||||
|
||||
fn handle_turn_started(&mut self, payload: &TurnStartedEvent) {
|
||||
self.finish_current_turn();
|
||||
self.current_turn = Some(
|
||||
self.new_turn(Some(payload.turn_id.clone()))
|
||||
.opened_explicitly(),
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_turn_complete(&mut self, _payload: &TurnCompleteEvent) {
|
||||
if let Some(current_turn) = self.current_turn.as_mut() {
|
||||
current_turn.status = TurnStatus::Completed;
|
||||
self.finish_current_turn();
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks the current turn as containing a persisted compaction marker.
|
||||
///
|
||||
/// This keeps compaction-only legacy turns from being dropped by
|
||||
/// `finish_current_turn` when they have no renderable items and were not
|
||||
/// explicitly opened.
|
||||
fn handle_compacted(&mut self, _payload: &CompactedItem) {
|
||||
self.ensure_turn().saw_compaction = true;
|
||||
}
|
||||
|
||||
fn handle_thread_rollback(&mut self, payload: &ThreadRolledBackEvent) {
|
||||
self.finish_current_turn();
|
||||
|
||||
@@ -157,34 +206,33 @@ impl ThreadHistoryBuilder {
|
||||
self.turns.truncate(self.turns.len().saturating_sub(n));
|
||||
}
|
||||
|
||||
// Re-number subsequent synthetic ids so the pruned history is consistent.
|
||||
self.next_turn_index =
|
||||
i64::try_from(self.turns.len().saturating_add(1)).unwrap_or(i64::MAX);
|
||||
let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
|
||||
self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
|
||||
}
|
||||
|
||||
fn finish_current_turn(&mut self) {
|
||||
if let Some(turn) = self.current_turn.take() {
|
||||
if turn.items.is_empty() {
|
||||
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
|
||||
return;
|
||||
}
|
||||
self.turns.push(turn.into());
|
||||
}
|
||||
}
|
||||
|
||||
fn new_turn(&mut self) -> PendingTurn {
|
||||
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
||||
PendingTurn {
|
||||
id: self.next_turn_id(),
|
||||
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
opened_explicitly: false,
|
||||
saw_compaction: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_turn(&mut self) -> &mut PendingTurn {
|
||||
if self.current_turn.is_none() {
|
||||
let turn = self.new_turn();
|
||||
let turn = self.new_turn(None);
|
||||
return self.current_turn.insert(turn);
|
||||
}
|
||||
|
||||
@@ -195,12 +243,6 @@ impl ThreadHistoryBuilder {
|
||||
unreachable!("current turn must exist after initialization");
|
||||
}
|
||||
|
||||
fn next_turn_id(&mut self) -> String {
|
||||
let id = format!("turn-{}", self.next_turn_index);
|
||||
self.next_turn_index += 1;
|
||||
id
|
||||
}
|
||||
|
||||
fn next_item_id(&mut self) -> String {
|
||||
let id = format!("item-{}", self.next_item_index);
|
||||
self.next_item_index += 1;
|
||||
@@ -237,6 +279,19 @@ struct PendingTurn {
|
||||
items: Vec<ThreadItem>,
|
||||
error: Option<TurnError>,
|
||||
status: TurnStatus,
|
||||
/// True when this turn originated from an explicit `turn_started`/`turn_complete`
|
||||
/// boundary, so we preserve it even if it has no renderable items.
|
||||
opened_explicitly: bool,
|
||||
/// True when this turn includes a persisted `RolloutItem::Compacted`, which
|
||||
/// should keep the turn from being dropped even without normal items.
|
||||
saw_compaction: bool,
|
||||
}
|
||||
|
||||
impl PendingTurn {
|
||||
fn opened_explicitly(mut self) -> Self {
|
||||
self.opened_explicitly = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PendingTurn> for Turn {
|
||||
@@ -256,11 +311,15 @@ mod tests {
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AgentReasoningEvent;
|
||||
use codex_protocol::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_protocol::protocol::CompactedItem;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[test]
|
||||
fn builds_multiple_turns_with_reasoning_items() {
|
||||
@@ -291,11 +350,15 @@ mod tests {
|
||||
}),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_event_msgs(&events);
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 2);
|
||||
|
||||
let first = &turns[0];
|
||||
assert_eq!(first.id, "turn-1");
|
||||
assert!(Uuid::parse_str(&first.id).is_ok());
|
||||
assert_eq!(first.status, TurnStatus::Completed);
|
||||
assert_eq!(first.items.len(), 3);
|
||||
assert_eq!(
|
||||
@@ -330,7 +393,8 @@ mod tests {
|
||||
);
|
||||
|
||||
let second = &turns[1];
|
||||
assert_eq!(second.id, "turn-2");
|
||||
assert!(Uuid::parse_str(&second.id).is_ok());
|
||||
assert_ne!(first.id, second.id);
|
||||
assert_eq!(second.items.len(), 2);
|
||||
assert_eq!(
|
||||
second.items[0],
|
||||
@@ -374,7 +438,11 @@ mod tests {
|
||||
}),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_event_msgs(&events);
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
let turn = &turns[0];
|
||||
assert_eq!(turn.items.len(), 4);
|
||||
@@ -410,6 +478,7 @@ mod tests {
|
||||
message: "Working...".into(),
|
||||
}),
|
||||
EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
turn_id: Some("turn-1".into()),
|
||||
reason: TurnAbortReason::Replaced,
|
||||
}),
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
@@ -423,7 +492,11 @@ mod tests {
|
||||
}),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_event_msgs(&events);
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 2);
|
||||
|
||||
let first_turn = &turns[0];
|
||||
@@ -502,46 +575,49 @@ mod tests {
|
||||
}),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_event_msgs(&events);
|
||||
let expected = vec![
|
||||
Turn {
|
||||
id: "turn-1".into(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-1".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "First".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-2".into(),
|
||||
text: "A1".into(),
|
||||
},
|
||||
],
|
||||
},
|
||||
Turn {
|
||||
id: "turn-2".into(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
items: vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-3".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "Third".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-4".into(),
|
||||
text: "A3".into(),
|
||||
},
|
||||
],
|
||||
},
|
||||
];
|
||||
assert_eq!(turns, expected);
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 2);
|
||||
assert!(Uuid::parse_str(&turns[0].id).is_ok());
|
||||
assert!(Uuid::parse_str(&turns[1].id).is_ok());
|
||||
assert_ne!(turns[0].id, turns[1].id);
|
||||
assert_eq!(turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(turns[1].status, TurnStatus::Completed);
|
||||
assert_eq!(
|
||||
turns[0].items,
|
||||
vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-1".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "First".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-2".into(),
|
||||
text: "A1".into(),
|
||||
},
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
turns[1].items,
|
||||
vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-3".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "Third".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::AgentMessage {
|
||||
id: "item-4".into(),
|
||||
text: "A3".into(),
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -568,7 +644,95 @@ mod tests {
|
||||
EventMsg::ThreadRolledBack(ThreadRolledBackEvent { num_turns: 99 }),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_event_msgs(&events);
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns, Vec::<Turn>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn uses_explicit_turn_boundaries_for_mid_turn_steering() {
|
||||
let events = vec![
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}),
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "Start".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "Steer".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].id, "turn-a");
|
||||
assert_eq!(
|
||||
turns[0].items,
|
||||
vec![
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-1".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "Start".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
ThreadItem::UserMessage {
|
||||
id: "item-2".into(),
|
||||
content: vec![UserInput::Text {
|
||||
text: "Steer".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preserves_compaction_only_turn() {
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-compact".into(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::Compacted(CompactedItem {
|
||||
message: String::new(),
|
||||
replacement_history: None,
|
||||
}),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-compact".into(),
|
||||
last_agent_message: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(
|
||||
turns,
|
||||
vec![Turn {
|
||||
id: "turn-compact".into(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
items: Vec::new(),
|
||||
}]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user