[codex-analytics] add protocol-native turn timestamps (#16638)

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/16638).
* #16870
* #16706
* #16659
* #16641
* #16640
* __->__ #16638
This commit is contained in:
rhan-oai
2026-04-06 16:22:59 -07:00
committed by GitHub
parent e88c2cf4d7
commit 756c45ec61
58 changed files with 1134 additions and 36 deletions

View File

@@ -864,22 +864,29 @@ impl ThreadHistoryBuilder {
}
fn handle_turn_aborted(&mut self, payload: &TurnAbortedEvent) {
let apply_abort = |turn: &mut PendingTurn| {
turn.status = TurnStatus::Interrupted;
turn.completed_at = payload.completed_at;
turn.duration_ms = payload.duration_ms;
};
if let Some(turn_id) = payload.turn_id.as_deref() {
// Prefer an exact ID match so we interrupt the turn explicitly targeted by the event.
if let Some(turn) = self.current_turn.as_mut().filter(|turn| turn.id == turn_id) {
turn.status = TurnStatus::Interrupted;
apply_abort(turn);
return;
}
if let Some(turn) = self.turns.iter_mut().find(|turn| turn.id == turn_id) {
turn.status = TurnStatus::Interrupted;
turn.completed_at = payload.completed_at;
turn.duration_ms = payload.duration_ms;
return;
}
}
// If the event has no ID (or refers to an unknown turn), fall back to the active turn.
if let Some(turn) = self.current_turn.as_mut() {
turn.status = TurnStatus::Interrupted;
apply_abort(turn);
}
}
@@ -888,15 +895,18 @@ impl ThreadHistoryBuilder {
self.current_turn = Some(
self.new_turn(Some(payload.turn_id.clone()))
.with_status(TurnStatus::InProgress)
.with_started_at(payload.started_at)
.opened_explicitly(),
);
}
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
let mark_completed = |status: &mut TurnStatus| {
if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) {
*status = TurnStatus::Completed;
let mark_completed = |turn: &mut PendingTurn| {
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
turn.status = TurnStatus::Completed;
}
turn.completed_at = payload.completed_at;
turn.duration_ms = payload.duration_ms;
};
// Prefer an exact ID match from the active turn and then close it.
@@ -905,7 +915,7 @@ impl ThreadHistoryBuilder {
.as_mut()
.filter(|turn| turn.id == payload.turn_id)
{
mark_completed(&mut current_turn.status);
mark_completed(current_turn);
self.finish_current_turn();
return;
}
@@ -915,13 +925,17 @@ impl ThreadHistoryBuilder {
.iter_mut()
.find(|turn| turn.id == payload.turn_id)
{
mark_completed(&mut turn.status);
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
turn.status = TurnStatus::Completed;
}
turn.completed_at = payload.completed_at;
turn.duration_ms = payload.duration_ms;
return;
}
// If the completion event cannot be matched, apply it to the active turn.
if let Some(current_turn) = self.current_turn.as_mut() {
mark_completed(&mut current_turn.status);
mark_completed(current_turn);
self.finish_current_turn();
}
}
@@ -954,7 +968,7 @@ impl ThreadHistoryBuilder {
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
return;
}
self.turns.push(turn.into());
self.turns.push(Turn::from(turn));
}
}
@@ -964,6 +978,9 @@ impl ThreadHistoryBuilder {
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
started_at: None,
completed_at: None,
duration_ms: None,
opened_explicitly: false,
saw_compaction: false,
rollout_start_index: self.current_rollout_index,
@@ -1082,6 +1099,9 @@ struct PendingTurn {
items: Vec<ThreadItem>,
error: Option<TurnError>,
status: TurnStatus,
started_at: Option<i64>,
completed_at: Option<i64>,
duration_ms: Option<i64>,
/// True when this turn originated from an explicit `turn_started`/`turn_complete`
/// boundary, so we preserve it even if it has no renderable items.
opened_explicitly: bool,
@@ -1102,6 +1122,11 @@ impl PendingTurn {
self.status = status;
self
}
fn with_started_at(mut self, started_at: Option<i64>) -> Self {
self.started_at = started_at;
self
}
}
impl From<PendingTurn> for Turn {
@@ -1111,6 +1136,9 @@ impl From<PendingTurn> for Turn {
items: value.items,
error: value.error,
status: value.status,
started_at: value.started_at,
completed_at: value.completed_at,
duration_ms: value.duration_ms,
}
}
}
@@ -1122,6 +1150,9 @@ impl From<&PendingTurn> for Turn {
items: value.items.clone(),
error: value.error.clone(),
status: value.status.clone(),
started_at: value.started_at,
completed_at: value.completed_at,
duration_ms: value.duration_ms,
}
}
}
@@ -1273,6 +1304,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -1293,6 +1325,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -1345,6 +1379,7 @@ mod tests {
let items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-image".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
@@ -1364,6 +1399,8 @@ mod tests {
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-image".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
})),
];
@@ -1375,6 +1412,9 @@ mod tests {
id: "turn-image".into(),
status: TurnStatus::Completed,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
items: vec![
ThreadItem::UserMessage {
id: "item-1".into(),
@@ -1464,6 +1504,8 @@ mod tests {
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some("turn-1".into()),
reason: TurnAbortReason::Replaced,
completed_at: None,
duration_ms: None,
}),
EventMsg::UserMessage(UserMessageEvent {
message: "Let's try again".into(),
@@ -1661,6 +1703,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -1679,6 +1722,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -1715,6 +1760,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -1820,6 +1866,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -1879,6 +1926,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -1966,6 +2014,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2038,6 +2087,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2096,6 +2146,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2108,9 +2159,12 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2142,6 +2196,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -2179,6 +2235,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2191,9 +2248,12 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2225,6 +2285,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -2257,6 +2319,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2320,6 +2383,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2382,6 +2446,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2394,9 +2459,12 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2409,6 +2477,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "still in b".into(),
@@ -2418,6 +2488,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -2437,6 +2509,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2449,9 +2522,12 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2464,6 +2540,8 @@ mod tests {
EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some("turn-a".into()),
reason: TurnAbortReason::Replaced,
completed_at: None,
duration_ms: None,
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "still in b".into(),
@@ -2489,6 +2567,7 @@ mod tests {
let items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-compact".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
@@ -2499,6 +2578,8 @@ mod tests {
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-compact".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
})),
];
@@ -2509,6 +2590,9 @@ mod tests {
id: "turn-compact".into(),
status: TurnStatus::Completed,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
items: Vec::new(),
}]
);
@@ -2726,6 +2810,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2738,6 +2823,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
EventMsg::Error(ErrorEvent {
message: "request-level failure".into(),
@@ -2757,6 +2844,9 @@ mod tests {
id: "turn-a".into(),
status: TurnStatus::Completed,
error: None,
started_at: None,
completed_at: None,
duration_ms: None,
items: vec![ThreadItem::UserMessage {
id: "item-1".into(),
content: vec![UserInput::Text {
@@ -2773,6 +2863,7 @@ mod tests {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
@@ -2791,6 +2882,8 @@ mod tests {
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
}),
];
@@ -2826,6 +2919,7 @@ mod tests {
let items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
@@ -2839,6 +2933,8 @@ mod tests {
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
})),
];
@@ -2869,6 +2965,7 @@ mod tests {
let items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-a".into(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
@@ -2884,6 +2981,8 @@ mod tests {
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
completed_at: None,
duration_ms: None,
})),
];