codex: restore exec json streaming compatibility (#15106)

This commit is contained in:
Eric Traut
2026-03-21 13:42:48 -06:00
parent efa0f04e6f
commit be5dba6189
4 changed files with 115 additions and 17 deletions

View File

@@ -5,7 +5,7 @@ use codex_core::config::Config;
use codex_protocol::protocol::SessionConfiguredEvent;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CodexStatus {
pub enum CodexStatus {
Running,
InitiateShutdown,
}

View File

@@ -87,6 +87,10 @@ impl EventProcessorWithJsonOutput {
}
}
pub fn final_message(&self) -> Option<&str> {
self.final_message.as_deref()
}
fn next_item_id(&self) -> String {
format!("item_{}", self.next_item_id.fetch_add(1, Ordering::SeqCst))
}
@@ -128,8 +132,7 @@ impl EventProcessorWithJsonOutput {
.collect()
}
fn map_item(item: ThreadItem) -> Option<ExecThreadItem> {
let id = item.id().to_string();
fn map_item_with_id(id: String, item: ThreadItem) -> Option<ExecThreadItem> {
let details = match item {
ThreadItem::AgentMessage { text, .. } => {
ThreadItemDetails::AgentMessage(AgentMessageItem { text })
@@ -279,6 +282,24 @@ impl EventProcessorWithJsonOutput {
Some(ExecThreadItem { id, details })
}
fn map_started_item(item: ThreadItem) -> Option<ExecThreadItem> {
match item {
ThreadItem::AgentMessage { .. }
| ThreadItem::Reasoning { .. }
| ThreadItem::FileChange { .. } => None,
other => Self::map_item_with_id(other.id().to_string(), other),
}
}
fn map_completed_item(&self, item: ThreadItem) -> Option<ExecThreadItem> {
match item {
ThreadItem::AgentMessage { .. } | ThreadItem::Reasoning { .. } => {
Self::map_item_with_id(self.next_item_id(), item)
}
other => Self::map_item_with_id(other.id().to_string(), other),
}
}
fn final_message_from_turn_items(items: &[ThreadItem]) -> Option<String> {
items
.iter()
@@ -365,13 +386,13 @@ impl EventProcessorWithJsonOutput {
CodexStatus::Running
}
ServerNotification::ItemStarted(notification) => {
if let Some(item) = Self::map_item(notification.item) {
if let Some(item) = Self::map_started_item(notification.item) {
events.push(ThreadEvent::ItemStarted(ItemStartedEvent { item }));
}
CodexStatus::Running
}
ServerNotification::ItemCompleted(notification) => {
if let Some(item) = Self::map_item(notification.item) {
if let Some(item) = self.map_completed_item(notification.item) {
if let ThreadItemDetails::AgentMessage(AgentMessageItem { text }) =
&item.details
{

View File

@@ -71,6 +71,7 @@ use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;

View File

@@ -254,7 +254,7 @@ fn reasoning_items_emit_summary_not_raw_content() {
CollectedThreadEvents {
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ExecThreadItem {
id: "reasoning-1".to_string(),
id: "item_0".to_string(),
details: ThreadItemDetails::Reasoning(ReasoningItem {
text: "safe summary".to_string(),
}),
@@ -802,7 +802,7 @@ fn agent_message_item_updates_final_message() {
CollectedThreadEvents {
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ExecThreadItem {
id: "msg-1".to_string(),
id: "item_0".to_string(),
details: ThreadItemDetails::AgentMessage(AgentMessageItem {
text: "hello".to_string(),
}),
@@ -811,7 +811,64 @@ fn agent_message_item_updates_final_message() {
status: CodexStatus::Running,
}
);
assert_eq!(processor.final_message.as_deref(), Some("hello"));
assert_eq!(processor.final_message(), Some("hello"));
}
#[test]
fn agent_message_item_started_is_ignored() {
let mut processor = EventProcessorWithJsonOutput::new(None);
let collected =
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
item: ThreadItem::AgentMessage {
id: "msg-1".to_string(),
text: "hello".to_string(),
phase: None,
memory_citation: None,
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
}));
assert_eq!(
collected,
CollectedThreadEvents {
events: Vec::new(),
status: CodexStatus::Running,
}
);
}
#[test]
fn reasoning_item_completed_uses_synthetic_id() {
let mut processor = EventProcessorWithJsonOutput::new(None);
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
item: ThreadItem::Reasoning {
id: "rs-1".to_string(),
summary: vec!["thinking...".to_string()],
content: vec!["raw".to_string()],
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
},
));
assert_eq!(
collected,
CollectedThreadEvents {
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ExecThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::Reasoning(ReasoningItem {
text: "thinking...".to_string(),
}),
},
})],
status: CodexStatus::Running,
}
);
}
#[test]
@@ -1115,13 +1172,24 @@ fn turn_completion_recovers_final_message_from_turn_items() {
status: CodexStatus::InitiateShutdown,
}
);
assert_eq!(processor.final_message.as_deref(), Some("final answer"));
assert_eq!(processor.final_message(), Some("final answer"));
}
#[test]
fn turn_completion_overwrites_stale_final_message_from_turn_items() {
let mut processor = EventProcessorWithJsonOutput::new(None);
processor.final_message = Some("stale answer".to_string());
let _ = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
item: ThreadItem::AgentMessage {
id: "msg-stale".to_string(),
text: "stale answer".to_string(),
phase: None,
memory_citation: None,
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
},
));
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
TurnCompletedNotification {
@@ -1149,13 +1217,24 @@ fn turn_completion_overwrites_stale_final_message_from_turn_items() {
status: CodexStatus::InitiateShutdown,
}
);
assert_eq!(processor.final_message.as_deref(), Some("final answer"));
assert_eq!(processor.final_message(), Some("final answer"));
}
#[test]
fn turn_completion_preserves_streamed_final_message_when_turn_items_are_empty() {
let mut processor = EventProcessorWithJsonOutput::new(None);
processor.final_message = Some("streamed answer".to_string());
let _ = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
item: ThreadItem::AgentMessage {
id: "msg-streamed".to_string(),
text: "streamed answer".to_string(),
phase: None,
memory_citation: None,
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
},
));
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
TurnCompletedNotification {
@@ -1178,7 +1257,7 @@ fn turn_completion_preserves_streamed_final_message_when_turn_items_are_empty()
status: CodexStatus::InitiateShutdown,
}
);
assert_eq!(processor.final_message.as_deref(), Some("streamed answer"));
assert_eq!(processor.final_message(), Some("streamed answer"));
}
#[test]
@@ -1209,10 +1288,7 @@ fn turn_completion_falls_back_to_final_plan_text() {
status: CodexStatus::InitiateShutdown,
}
);
assert_eq!(
processor.final_message.as_deref(),
Some("ship the typed adapter")
);
assert_eq!(processor.final_message(), Some("ship the typed adapter"));
}
#[test]