[App-server] add new v2 events:item/reasoning/delta, item/agentMessage/delta & item/reasoning/summaryPartAdded (#6559)

core event to app server event mapping:
1. `codex/event/reasoning_content_delta` ->
`item/reasoning/summaryTextDelta`.
2. `codex/event/reasoning_raw_content_delta` ->
`item/reasoning/textDelta`
3. `codex/event/agent_message_content_delta` →
`item/agentMessage/delta`.
4. `codex/event/agent_reasoning_section_break` ->
`item/reasoning/summaryPartAdded`.

Also added a change in core to pass down content index, summary index
and item id from events.

Tested with the `git checkout owen/app_server_test_client && cargo run
-p codex-app-server-test-client -- send-message-v2 "hello"` and verified
that new events are emitted correctly.
This commit is contained in:
Celia Chen
2025-11-13 16:25:01 -08:00
committed by GitHub
parent 2c1b693da4
commit b8ec97c0ef
10 changed files with 203 additions and 43 deletions

View File

@@ -508,6 +508,9 @@ server_notification_definitions! {
McpToolCallProgress => "item/mcpToolCall/progress" (v2::McpToolCallProgressNotification),
AccountUpdated => "account/updated" (v2::AccountUpdatedNotification),
AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification),
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),
#[serde(rename = "account/login/completed")]
#[ts(rename = "account/login/completed")]

View File

@@ -516,7 +516,10 @@ pub enum ThreadItem {
},
Reasoning {
id: String,
text: String,
#[serde(default)]
summary: Vec<String>,
#[serde(default)]
content: Vec<String>,
},
CommandExecution {
id: String,
@@ -575,17 +578,11 @@ impl From<CoreTurnItem> for ThreadItem {
.collect::<String>();
ThreadItem::AgentMessage { id: agent.id, text }
}
CoreTurnItem::Reasoning(reasoning) => {
let text = if !reasoning.summary_text.is_empty() {
reasoning.summary_text.join("\n")
} else {
reasoning.raw_content.join("\n")
};
ThreadItem::Reasoning {
id: reasoning.id,
text,
}
}
CoreTurnItem::Reasoning(reasoning) => ThreadItem::Reasoning {
id: reasoning.id,
summary: reasoning.summary_text,
content: reasoning.raw_content,
},
CoreTurnItem::WebSearch(search) => ThreadItem::WebSearch {
id: search.id,
query: search.query,
@@ -719,6 +716,32 @@ pub struct AgentMessageDeltaNotification {
pub delta: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ReasoningSummaryTextDeltaNotification {
pub item_id: String,
pub delta: String,
pub summary_index: i64,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ReasoningSummaryPartAddedNotification {
pub item_id: String,
pub summary_index: i64,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ReasoningTextDeltaNotification {
pub item_id: String,
pub delta: String,
pub content_index: i64,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -867,7 +890,8 @@ mod tests {
ThreadItem::from(reasoning_item),
ThreadItem::Reasoning {
id: "reasoning-1".to_string(),
text: "line one\nline two".to_string(),
summary: vec!["line one".to_string(), "line two".to_string()],
content: vec![],
}
);

View File

@@ -12,6 +12,7 @@ use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::AgentMessageDeltaNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApplyPatchApprovalResponse;
use codex_app_server_protocol::ArchiveConversationParams;
@@ -62,6 +63,9 @@ use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ReasoningTextDeltaNotification;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
@@ -2681,6 +2685,48 @@ async fn apply_bespoke_event_handling(
on_patch_approval_response(event_id, rx, conversation).await;
});
}
EventMsg::AgentMessageContentDelta(event) => {
let notification = AgentMessageDeltaNotification {
item_id: event.item_id,
delta: event.delta,
};
outgoing
.send_server_notification(ServerNotification::AgentMessageDelta(notification))
.await;
}
EventMsg::ReasoningContentDelta(event) => {
let notification = ReasoningSummaryTextDeltaNotification {
item_id: event.item_id,
delta: event.delta,
summary_index: event.summary_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningSummaryTextDelta(
notification,
))
.await;
}
EventMsg::ReasoningRawContentDelta(event) => {
let notification = ReasoningTextDeltaNotification {
item_id: event.item_id,
delta: event.delta,
content_index: event.content_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningTextDelta(notification))
.await;
}
EventMsg::AgentReasoningSectionBreak(event) => {
let notification = ReasoningSummaryPartAddedNotification {
item_id: event.item_id,
summary_index: event.summary_index,
};
outgoing
.send_server_notification(ServerNotification::ReasoningSummaryPartAdded(
notification,
))
.await;
}
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
command,

View File

@@ -477,10 +477,14 @@ async fn append_reasoning_text(
..
}) = reasoning_item
{
let content_index = content.len() as i64;
content.push(ReasoningItemContent::ReasoningText { text: text.clone() });
let _ = tx_event
.send(Ok(ResponseEvent::ReasoningContentDelta(text.clone())))
.send(Ok(ResponseEvent::ReasoningContentDelta {
delta: text.clone(),
content_index,
}))
.await;
}
}
@@ -898,20 +902,26 @@ where
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => {
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
}))) => {
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
this.cumulative_reasoning.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta))));
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
})));
} else {
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded))) => {
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {

View File

@@ -560,6 +560,8 @@ struct SseEvent {
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
summary_index: Option<i64>,
content_index: Option<i64>,
}
#[derive(Debug, Deserialize)]
@@ -819,16 +821,22 @@ async fn process_sse<S>(
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if let (Some(delta), Some(summary_index)) = (event.delta, event.summary_index) {
let event = ResponseEvent::ReasoningSummaryDelta {
delta,
summary_index,
};
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningContentDelta(delta);
if let (Some(delta), Some(content_index)) = (event.delta, event.content_index) {
let event = ResponseEvent::ReasoningContentDelta {
delta,
content_index,
};
if tx_event.send(Ok(event)).await.is_err() {
return;
}
@@ -905,10 +913,12 @@ async fn process_sse<S>(
}
}
"response.reasoning_summary_part.added" => {
// Boundary between reasoning summary sections (e.g., titles).
let event = ResponseEvent::ReasoningSummaryPartAdded;
if tx_event.send(Ok(event)).await.is_err() {
return;
if let Some(summary_index) = event.summary_index {
// Boundary between reasoning summary sections (e.g., titles).
let event = ResponseEvent::ReasoningSummaryPartAdded { summary_index };
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.done" => {}

View File

@@ -203,9 +203,17 @@ pub enum ResponseEvent {
token_usage: Option<TokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta(String),
ReasoningContentDelta(String),
ReasoningSummaryPartAdded,
ReasoningSummaryDelta {
delta: String,
summary_index: i64,
},
ReasoningContentDelta {
delta: String,
content_index: i64,
},
ReasoningSummaryPartAdded {
summary_index: i64,
},
RateLimits(RateLimitSnapshot),
}

View File

@@ -2232,13 +2232,17 @@ async fn try_run_turn(
error_or_panic("ReasoningSummaryDelta without active item".to_string());
}
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
ResponseEvent::ReasoningSummaryDelta {
delta,
summary_index,
} => {
if let Some(active) = active_item.as_ref() {
let event = ReasoningContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
delta: delta.clone(),
delta,
summary_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningContentDelta(event))
.await;
@@ -2246,18 +2250,29 @@ async fn try_run_turn(
error_or_panic("ReasoningSummaryDelta without active item".to_string());
}
}
ResponseEvent::ReasoningSummaryPartAdded => {
let event =
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {});
sess.send_event(&turn_context, event).await;
ResponseEvent::ReasoningSummaryPartAdded { summary_index } => {
if let Some(active) = active_item.as_ref() {
let event =
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {
item_id: active.id(),
summary_index,
});
sess.send_event(&turn_context, event).await;
} else {
error_or_panic("ReasoningSummaryPartAdded without active item".to_string());
}
}
ResponseEvent::ReasoningContentDelta(delta) => {
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
if let Some(active) = active_item.as_ref() {
let event = ReasoningRawContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: active.id(),
delta: delta.clone(),
delta,
content_index,
};
sess.send_event(&turn_context, EventMsg::ReasoningRawContentDelta(event))
.await;

View File

@@ -215,7 +215,13 @@ async fn streams_reasoning_from_string_delta() {
}
match &events[1] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "think1"),
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
assert_eq!(delta, "think1");
assert_eq!(content_index, &0);
}
other => panic!("expected reasoning delta, got {other:?}"),
}
@@ -267,12 +273,24 @@ async fn streams_reasoning_from_object_delta() {
}
match &events[1] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partA"),
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
assert_eq!(delta, "partA");
assert_eq!(content_index, &0);
}
other => panic!("expected reasoning delta, got {other:?}"),
}
match &events[2] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "partB"),
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
assert_eq!(delta, "partB");
assert_eq!(content_index, &1);
}
other => panic!("expected reasoning delta, got {other:?}"),
}
@@ -319,7 +337,13 @@ async fn streams_reasoning_from_final_message() {
}
match &events[1] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "final-cot"),
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
assert_eq!(delta, "final-cot");
assert_eq!(content_index, &0);
}
other => panic!("expected reasoning delta, got {other:?}"),
}
@@ -354,7 +378,13 @@ async fn streams_reasoning_before_tool_call() {
}
match &events[1] {
ResponseEvent::ReasoningContentDelta(text) => assert_eq!(text, "pre-tool"),
ResponseEvent::ReasoningContentDelta {
delta,
content_index,
} => {
assert_eq!(delta, "pre-tool");
assert_eq!(content_index, &0);
}
other => panic!("expected reasoning delta, got {other:?}"),
}

View File

@@ -296,6 +296,7 @@ pub fn ev_reasoning_summary_text_delta(delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_summary_text.delta",
"delta": delta,
"summary_index": 0,
})
}
@@ -303,6 +304,7 @@ pub fn ev_reasoning_text_delta(delta: &str) -> Value {
serde_json::json!({
"type": "response.reasoning_text.delta",
"delta": delta,
"content_index": 0,
})
}

View File

@@ -617,6 +617,9 @@ pub struct ReasoningContentDeltaEvent {
pub turn_id: String,
pub item_id: String,
pub delta: String,
// load with default value so it's backward compatible with the old format.
#[serde(default)]
pub summary_index: i64,
}
impl HasLegacyEvent for ReasoningContentDeltaEvent {
@@ -633,6 +636,9 @@ pub struct ReasoningRawContentDeltaEvent {
pub turn_id: String,
pub item_id: String,
pub delta: String,
// load with default value so it's backward compatible with the old format.
#[serde(default)]
pub content_index: i64,
}
impl HasLegacyEvent for ReasoningRawContentDeltaEvent {
@@ -923,7 +929,13 @@ pub struct AgentReasoningRawContentDeltaEvent {
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct AgentReasoningSectionBreakEvent {}
pub struct AgentReasoningSectionBreakEvent {
// load with default value so it's backward compatible with the old format.
#[serde(default)]
pub item_id: String,
#[serde(default)]
pub summary_index: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct AgentReasoningDeltaEvent {