Compare commits

...

1 Commits

Author SHA1 Message Date
Dylan Hurd
80fafc212f [client] Add item ids for deltas and messages 2025-07-23 12:13:25 -07:00
10 changed files with 61 additions and 27 deletions

View File

@@ -41,7 +41,12 @@ pub(crate) async fn stream_chat_completions(
for item in &prompt.input {
match item {
ResponseItem::Message { role, content } => {
ResponseItem::Message {
// id will always be None for input items
id: _,
role,
content,
} => {
let mut text = String::new();
for c in content {
match c {
@@ -255,6 +260,7 @@ async fn process_chat_sse<S>(
.and_then(|c| c.as_str())
{
let item = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: content.to_string(),
@@ -402,6 +408,7 @@ where
}))) => {
if !this.cumulative.is_empty() {
let aggregated_item = crate::models::ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![crate::models::ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
@@ -430,8 +437,8 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta { .. })))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;

View File

@@ -230,6 +230,7 @@ struct SseEvent {
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
item_id: Option<String>,
}
#[derive(Debug, Deserialize)]
@@ -360,21 +361,22 @@ async fn process_sse<S>(
};
let event = ResponseEvent::OutputItemDone(item);
trace!(?event, "output_item.done");
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
"response.output_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
let event = ResponseEvent::OutputTextDelta { delta, item_id };
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
let event = ResponseEvent::ReasoningSummaryDelta { delta, item_id };
if tx_event.send(Ok(event)).await.is_err() {
return;
}

View File

@@ -56,6 +56,8 @@ impl Prompt {
}
}
/// Events emitted by the response stream.
/// https://platform.openai.com/docs/api-reference/responses-streaming/response
#[derive(Debug)]
pub enum ResponseEvent {
Created,
@@ -64,8 +66,14 @@ pub enum ResponseEvent {
response_id: String,
token_usage: Option<TokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta(String),
OutputTextDelta {
delta: String,
item_id: String,
},
ReasoningSummaryDelta {
delta: String,
item_id: String,
},
}
#[derive(Debug, Serialize)]

View File

@@ -1248,17 +1248,17 @@ async fn try_run_turn(
state.previous_response_id = Some(response_id);
return Ok(output);
}
ResponseEvent::OutputTextDelta(delta) => {
ResponseEvent::OutputTextDelta { delta, item_id } => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
ResponseEvent::ReasoningSummaryDelta { delta, item_id } => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id }),
};
sess.tx_event.send(event).await.ok();
}
@@ -1273,26 +1273,32 @@ async fn handle_response_item(
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
let output = match item {
ResponseItem::Message { content, .. } => {
ResponseItem::Message { content, id, .. } => {
for item in content {
if let ContentItem::OutputText { text } = item {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
msg: EventMsg::AgentMessage(AgentMessageEvent {
id: id.clone(),
message: text,
}),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::Reasoning { id: _, summary } => {
ResponseItem::Reasoning { id, summary } => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
id: id.clone(),
text,
}),
};
sess.tx_event.send(event).await.ok();
}
@@ -2092,7 +2098,7 @@ fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> Strin
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content } = item {
if let ResponseItem::Message { role, content, .. } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {

View File

@@ -37,6 +37,7 @@ pub enum ContentItem {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseItem {
Message {
id: Option<String>,
role: String,
content: Vec<ContentItem>,
},
@@ -78,7 +79,11 @@ pub enum ResponseItem {
impl From<ResponseInputItem> for ResponseItem {
fn from(item: ResponseInputItem) -> Self {
match item {
ResponseInputItem::Message { role, content } => Self::Message { role, content },
ResponseInputItem::Message { role, content } => Self::Message {
id: None,
role,
content,
},
ResponseInputItem::FunctionCallOutput { call_id, output } => {
Self::FunctionCallOutput { call_id, output }
}

View File

@@ -351,22 +351,26 @@ pub struct TokenUsage {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageEvent {
pub id: Option<String>,
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageDeltaEvent {
pub delta: String,
pub item_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningEvent {
pub id: String,
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningDeltaEvent {
pub delta: String,
pub item_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -120,7 +120,7 @@ async fn live_streaming_and_prev_id_reset() {
.expect("agent closed");
match &ev.msg {
EventMsg::AgentMessage(AgentMessageEvent { message })
EventMsg::AgentMessage(AgentMessageEvent { id: _, message })
if message.contains("second turn succeeded") =>
{
got_expected = true;

View File

@@ -174,7 +174,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id: _ }) => {
if !self.answer_started {
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
self.answer_started = true;
@@ -183,7 +183,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id: _ }) => {
if !self.show_agent_reasoning {
return;
}
@@ -199,7 +199,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
// if answer_started is false, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if !self.answer_started {

View File

@@ -16,6 +16,7 @@ use serde::Serialize;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::trace;
use tracing::warn;
pub(crate) struct OutgoingMessageSender {
@@ -79,6 +80,7 @@ impl OutgoingMessageSender {
}
pub(crate) async fn send_event_as_notification(&self, event: &Event) {
trace!(?event, "sending event as notification");
#[expect(clippy::expect_used)]
let params = Some(serde_json::to_value(event).expect("Event must serialize"));
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {

View File

@@ -246,7 +246,7 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
// if the answer buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if self.answer_buffer.is_empty() {
@@ -259,7 +259,7 @@ impl ChatWidget<'_> {
self.answer_buffer.clear();
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, .. }) => {
if self.answer_buffer.is_empty() {
self.conversation_history
.add_agent_message(&self.config, "".to_string());
@@ -269,7 +269,7 @@ impl ChatWidget<'_> {
.replace_prev_agent_message(&self.config, self.answer_buffer.clone());
self.request_redraw();
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, .. }) => {
if self.reasoning_buffer.is_empty() {
self.conversation_history
.add_agent_reasoning(&self.config, "".to_string());
@@ -279,7 +279,7 @@ impl ChatWidget<'_> {
.replace_prev_agent_reasoning(&self.config, self.reasoning_buffer.clone());
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
EventMsg::AgentReasoning(AgentReasoningEvent { id: _, text }) => {
// if the reasoning buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new reasoning.
if self.reasoning_buffer.is_empty() {