mirror of
https://github.com/openai/codex.git
synced 2026-02-09 10:23:42 +00:00
Compare commits
1 Commits
patch-squa
...
dh--delta-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
80fafc212f |
@@ -41,7 +41,12 @@ pub(crate) async fn stream_chat_completions(
|
||||
|
||||
for item in &prompt.input {
|
||||
match item {
|
||||
ResponseItem::Message { role, content } => {
|
||||
ResponseItem::Message {
|
||||
// id will always be None for input items
|
||||
id: _,
|
||||
role,
|
||||
content,
|
||||
} => {
|
||||
let mut text = String::new();
|
||||
for c in content {
|
||||
match c {
|
||||
@@ -255,6 +260,7 @@ async fn process_chat_sse<S>(
|
||||
.and_then(|c| c.as_str())
|
||||
{
|
||||
let item = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: content.to_string(),
|
||||
@@ -402,6 +408,7 @@ where
|
||||
}))) => {
|
||||
if !this.cumulative.is_empty() {
|
||||
let aggregated_item = crate::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![crate::models::ContentItem::OutputText {
|
||||
text: std::mem::take(&mut this.cumulative),
|
||||
@@ -430,8 +437,8 @@ where
|
||||
// will never appear in a Chat Completions stream.
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
|
||||
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
|
||||
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta { .. })))
|
||||
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
|
||||
// Deltas are ignored here since aggregation waits for the
|
||||
// final OutputItemDone.
|
||||
continue;
|
||||
|
||||
@@ -230,6 +230,7 @@ struct SseEvent {
|
||||
response: Option<Value>,
|
||||
item: Option<Value>,
|
||||
delta: Option<String>,
|
||||
item_id: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -360,21 +361,22 @@ async fn process_sse<S>(
|
||||
};
|
||||
|
||||
let event = ResponseEvent::OutputItemDone(item);
|
||||
trace!(?event, "output_item.done");
|
||||
if tx_event.send(Ok(event)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
"response.output_text.delta" => {
|
||||
if let Some(delta) = event.delta {
|
||||
let event = ResponseEvent::OutputTextDelta(delta);
|
||||
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
|
||||
let event = ResponseEvent::OutputTextDelta { delta, item_id };
|
||||
if tx_event.send(Ok(event)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
"response.reasoning_summary_text.delta" => {
|
||||
if let Some(delta) = event.delta {
|
||||
let event = ResponseEvent::ReasoningSummaryDelta(delta);
|
||||
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
|
||||
let event = ResponseEvent::ReasoningSummaryDelta { delta, item_id };
|
||||
if tx_event.send(Ok(event)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,8 @@ impl Prompt {
|
||||
}
|
||||
}
|
||||
|
||||
/// Events emitted by the response stream.
|
||||
/// https://platform.openai.com/docs/api-reference/responses-streaming/response
|
||||
#[derive(Debug)]
|
||||
pub enum ResponseEvent {
|
||||
Created,
|
||||
@@ -64,8 +66,14 @@ pub enum ResponseEvent {
|
||||
response_id: String,
|
||||
token_usage: Option<TokenUsage>,
|
||||
},
|
||||
OutputTextDelta(String),
|
||||
ReasoningSummaryDelta(String),
|
||||
OutputTextDelta {
|
||||
delta: String,
|
||||
item_id: String,
|
||||
},
|
||||
ReasoningSummaryDelta {
|
||||
delta: String,
|
||||
item_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
||||
@@ -1248,17 +1248,17 @@ async fn try_run_turn(
|
||||
state.previous_response_id = Some(response_id);
|
||||
return Ok(output);
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
ResponseEvent::OutputTextDelta { delta, item_id } => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
|
||||
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
ResponseEvent::ReasoningSummaryDelta { delta, item_id } => {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
|
||||
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id }),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
@@ -1273,26 +1273,32 @@ async fn handle_response_item(
|
||||
) -> CodexResult<Option<ResponseInputItem>> {
|
||||
debug!(?item, "Output item");
|
||||
let output = match item {
|
||||
ResponseItem::Message { content, .. } => {
|
||||
ResponseItem::Message { content, id, .. } => {
|
||||
for item in content {
|
||||
if let ContentItem::OutputText { text } = item {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
|
||||
msg: EventMsg::AgentMessage(AgentMessageEvent {
|
||||
id: id.clone(),
|
||||
message: text,
|
||||
}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
ResponseItem::Reasoning { id: _, summary } => {
|
||||
ResponseItem::Reasoning { id, summary } => {
|
||||
for item in summary {
|
||||
let text = match item {
|
||||
ReasoningItemReasoningSummary::SummaryText { text } => text,
|
||||
};
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
|
||||
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
|
||||
id: id.clone(),
|
||||
text,
|
||||
}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
@@ -2092,7 +2098,7 @@ fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> Strin
|
||||
|
||||
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
|
||||
responses.iter().rev().find_map(|item| {
|
||||
if let ResponseItem::Message { role, content } = item {
|
||||
if let ResponseItem::Message { role, content, .. } = item {
|
||||
if role == "assistant" {
|
||||
content.iter().rev().find_map(|ci| {
|
||||
if let ContentItem::OutputText { text } = ci {
|
||||
|
||||
@@ -37,6 +37,7 @@ pub enum ContentItem {
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum ResponseItem {
|
||||
Message {
|
||||
id: Option<String>,
|
||||
role: String,
|
||||
content: Vec<ContentItem>,
|
||||
},
|
||||
@@ -78,7 +79,11 @@ pub enum ResponseItem {
|
||||
impl From<ResponseInputItem> for ResponseItem {
|
||||
fn from(item: ResponseInputItem) -> Self {
|
||||
match item {
|
||||
ResponseInputItem::Message { role, content } => Self::Message { role, content },
|
||||
ResponseInputItem::Message { role, content } => Self::Message {
|
||||
id: None,
|
||||
role,
|
||||
content,
|
||||
},
|
||||
ResponseInputItem::FunctionCallOutput { call_id, output } => {
|
||||
Self::FunctionCallOutput { call_id, output }
|
||||
}
|
||||
|
||||
@@ -351,22 +351,26 @@ pub struct TokenUsage {
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentMessageEvent {
|
||||
pub id: Option<String>,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentMessageDeltaEvent {
|
||||
pub delta: String,
|
||||
pub item_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentReasoningEvent {
|
||||
pub id: String,
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentReasoningDeltaEvent {
|
||||
pub delta: String,
|
||||
pub item_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
|
||||
@@ -120,7 +120,7 @@ async fn live_streaming_and_prev_id_reset() {
|
||||
.expect("agent closed");
|
||||
|
||||
match &ev.msg {
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message })
|
||||
EventMsg::AgentMessage(AgentMessageEvent { id: _, message })
|
||||
if message.contains("second turn succeeded") =>
|
||||
{
|
||||
got_expected = true;
|
||||
|
||||
@@ -174,7 +174,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
|
||||
ts_println!(self, "tokens used: {total_tokens}");
|
||||
}
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id: _ }) => {
|
||||
if !self.answer_started {
|
||||
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
|
||||
self.answer_started = true;
|
||||
@@ -183,7 +183,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
#[allow(clippy::expect_used)]
|
||||
std::io::stdout().flush().expect("could not flush stdout");
|
||||
}
|
||||
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
|
||||
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id: _ }) => {
|
||||
if !self.show_agent_reasoning {
|
||||
return;
|
||||
}
|
||||
@@ -199,7 +199,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
#[allow(clippy::expect_used)]
|
||||
std::io::stdout().flush().expect("could not flush stdout");
|
||||
}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
|
||||
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
|
||||
// if answer_started is false, this means we haven't received any
|
||||
// delta. Thus, we need to print the message as a new answer.
|
||||
if !self.answer_started {
|
||||
|
||||
@@ -16,6 +16,7 @@ use serde::Serialize;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
pub(crate) struct OutgoingMessageSender {
|
||||
@@ -79,6 +80,7 @@ impl OutgoingMessageSender {
|
||||
}
|
||||
|
||||
pub(crate) async fn send_event_as_notification(&self, event: &Event) {
|
||||
trace!(?event, "sending event as notification");
|
||||
#[expect(clippy::expect_used)]
|
||||
let params = Some(serde_json::to_value(event).expect("Event must serialize"));
|
||||
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
|
||||
|
||||
@@ -246,7 +246,7 @@ impl ChatWidget<'_> {
|
||||
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
|
||||
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
|
||||
// if the answer buffer is empty, this means we haven't received any
|
||||
// delta. Thus, we need to print the message as a new answer.
|
||||
if self.answer_buffer.is_empty() {
|
||||
@@ -259,7 +259,7 @@ impl ChatWidget<'_> {
|
||||
self.answer_buffer.clear();
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, .. }) => {
|
||||
if self.answer_buffer.is_empty() {
|
||||
self.conversation_history
|
||||
.add_agent_message(&self.config, "".to_string());
|
||||
@@ -269,7 +269,7 @@ impl ChatWidget<'_> {
|
||||
.replace_prev_agent_message(&self.config, self.answer_buffer.clone());
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
|
||||
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, .. }) => {
|
||||
if self.reasoning_buffer.is_empty() {
|
||||
self.conversation_history
|
||||
.add_agent_reasoning(&self.config, "".to_string());
|
||||
@@ -279,7 +279,7 @@ impl ChatWidget<'_> {
|
||||
.replace_prev_agent_reasoning(&self.config, self.reasoning_buffer.clone());
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
|
||||
EventMsg::AgentReasoning(AgentReasoningEvent { id: _, text }) => {
|
||||
// if the reasoning buffer is empty, this means we haven't received any
|
||||
// delta. Thus, we need to print the message as a new reasoning.
|
||||
if self.reasoning_buffer.is_empty() {
|
||||
|
||||
Reference in New Issue
Block a user