mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Inline thin realtime runtime helpers
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -20,7 +20,6 @@ use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
@@ -230,10 +229,6 @@ impl RealtimeWebsocketConnection {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_response_create(&self) -> Result<(), ApiError> {
|
||||
self.writer.send_response_create().await
|
||||
}
|
||||
|
||||
pub async fn close(&self) -> Result<(), ApiError> {
|
||||
self.writer.close().await
|
||||
}
|
||||
@@ -302,10 +297,6 @@ impl RealtimeWebsocketWriter {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_json_value(&self, message: Value) -> Result<(), ApiError> {
|
||||
self.send_payload(message.to_string()).await
|
||||
}
|
||||
|
||||
pub async fn send_session_update(
|
||||
&self,
|
||||
instructions: String,
|
||||
@@ -337,7 +328,7 @@ impl RealtimeWebsocketWriter {
|
||||
self.send_payload(payload).await
|
||||
}
|
||||
|
||||
async fn send_payload(&self, payload: String) -> Result<(), ApiError> {
|
||||
pub async fn send_payload(&self, payload: String) -> Result<(), ApiError> {
|
||||
if self.is_closed.load(Ordering::SeqCst) {
|
||||
return Err(ApiError::Stream(
|
||||
"realtime websocket connection is closed".to_string(),
|
||||
|
||||
@@ -111,52 +111,6 @@ impl RealtimeHandoffState {
|
||||
session_kind,
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_output(&self, output_text: String) -> CodexResult<()> {
|
||||
let Some(handoff_id) = self.active_handoff.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
*self.last_output_text.lock().await = Some(output_text.clone());
|
||||
match self.session_kind {
|
||||
RealtimeSessionKind::V1 => {
|
||||
self.output_tx
|
||||
.send(HandoffOutput::ImmediateAppend {
|
||||
handoff_id,
|
||||
output_text,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
CodexErr::InvalidRequest("conversation is not running".to_string())
|
||||
})?;
|
||||
}
|
||||
RealtimeSessionKind::V2 => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_final_output(&self) -> CodexResult<()> {
|
||||
match self.session_kind {
|
||||
RealtimeSessionKind::V1 => return Ok(()),
|
||||
RealtimeSessionKind::V2 => {}
|
||||
}
|
||||
|
||||
let Some(handoff_id) = self.active_handoff.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(output_text) = self.last_output_text.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.output_tx
|
||||
.send(HandoffOutput::FinalToolCall {
|
||||
handoff_id,
|
||||
output_text,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -304,7 +258,22 @@ impl RealtimeConversationManager {
|
||||
state.handoff.clone()
|
||||
};
|
||||
|
||||
handoff.send_output(output_text).await
|
||||
let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
*handoff.last_output_text.lock().await = Some(output_text.clone());
|
||||
if matches!(handoff.session_kind, RealtimeSessionKind::V1) {
|
||||
handoff
|
||||
.output_tx
|
||||
.send(HandoffOutput::ImmediateAppend {
|
||||
handoff_id,
|
||||
output_text,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn handoff_complete(&self) -> CodexResult<()> {
|
||||
@@ -315,7 +284,25 @@ impl RealtimeConversationManager {
|
||||
let Some(handoff) = handoff else {
|
||||
return Ok(());
|
||||
};
|
||||
handoff.send_final_output().await
|
||||
if matches!(handoff.session_kind, RealtimeSessionKind::V1) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(output_text) = handoff.last_output_text.lock().await.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
handoff
|
||||
.output_tx
|
||||
.send(HandoffOutput::FinalToolCall {
|
||||
handoff_id,
|
||||
output_text,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))
|
||||
}
|
||||
|
||||
pub(crate) async fn active_handoff_id(&self) -> Option<String> {
|
||||
@@ -716,12 +703,13 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
|
||||
.as_deref()
|
||||
.is_none_or(|item_id| item_id == output_audio_state.item_id)
|
||||
&& let Err(err) = writer
|
||||
.send_json_value(json!({
|
||||
.send_payload(json!({
|
||||
"type": "conversation.item.truncate",
|
||||
"item_id": output_audio_state.item_id,
|
||||
"content_index": 0,
|
||||
"audio_end_ms": output_audio_state.audio_end_ms,
|
||||
}))
|
||||
})
|
||||
.to_string())
|
||||
.await
|
||||
{
|
||||
let mapped_error = map_api_error(err);
|
||||
|
||||
Reference in New Issue
Block a user