diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 6f437fa410..bcbd356c67 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -20,7 +20,6 @@ use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; use http::HeaderValue; -use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -230,10 +229,6 @@ impl RealtimeWebsocketConnection { .await } - pub async fn send_response_create(&self) -> Result<(), ApiError> { - self.writer.send_response_create().await - } - pub async fn close(&self) -> Result<(), ApiError> { self.writer.close().await } @@ -302,10 +297,6 @@ impl RealtimeWebsocketWriter { .await } - pub async fn send_json_value(&self, message: Value) -> Result<(), ApiError> { - self.send_payload(message.to_string()).await - } - pub async fn send_session_update( &self, instructions: String, @@ -337,7 +328,7 @@ impl RealtimeWebsocketWriter { self.send_payload(payload).await } - async fn send_payload(&self, payload: String) -> Result<(), ApiError> { + pub async fn send_payload(&self, payload: String) -> Result<(), ApiError> { if self.is_closed.load(Ordering::SeqCst) { return Err(ApiError::Stream( "realtime websocket connection is closed".to_string(), diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 829778e418..4584c7ff66 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -111,52 +111,6 @@ impl RealtimeHandoffState { session_kind, } } - - async fn send_output(&self, output_text: String) -> CodexResult<()> { - let Some(handoff_id) = self.active_handoff.lock().await.clone() else { - return Ok(()); - }; - - *self.last_output_text.lock().await = Some(output_text.clone()); - match self.session_kind { - RealtimeSessionKind::V1 => { - self.output_tx - .send(HandoffOutput::ImmediateAppend { - handoff_id, - output_text, - }) - .await - .map_err(|_| { - CodexErr::InvalidRequest("conversation is not running".to_string()) - })?; - } - RealtimeSessionKind::V2 => {} - } - Ok(()) - } - - async fn send_final_output(&self) -> CodexResult<()> { - match self.session_kind { - RealtimeSessionKind::V1 => return Ok(()), - RealtimeSessionKind::V2 => {} - } - - let Some(handoff_id) = self.active_handoff.lock().await.clone() else { - return Ok(()); - }; - let Some(output_text) = self.last_output_text.lock().await.clone() else { - return Ok(()); - }; - - self.output_tx - .send(HandoffOutput::FinalToolCall { - handoff_id, - output_text, - }) - .await - .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; - Ok(()) - } } #[allow(dead_code)] @@ -304,7 +258,22 @@ impl RealtimeConversationManager { state.handoff.clone() }; - handoff.send_output(output_text).await + let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else { + return Ok(()); + }; + + *handoff.last_output_text.lock().await = Some(output_text.clone()); + if matches!(handoff.session_kind, RealtimeSessionKind::V1) { + handoff + .output_tx + .send(HandoffOutput::ImmediateAppend { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + } + Ok(()) } pub(crate) async fn handoff_complete(&self) -> CodexResult<()> { @@ -315,7 +284,25 @@ impl RealtimeConversationManager { let Some(handoff) = handoff else { return Ok(()); }; - handoff.send_final_output().await + if matches!(handoff.session_kind, RealtimeSessionKind::V1) { + return Ok(()); + } + + let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else { + return Ok(()); + }; + let Some(output_text) = handoff.last_output_text.lock().await.clone() else { + return Ok(()); + }; + + handoff + .output_tx + .send(HandoffOutput::FinalToolCall { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string())) } pub(crate) async fn active_handoff_id(&self) -> Option { @@ -716,12 +703,13 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { .as_deref() .is_none_or(|item_id| item_id == output_audio_state.item_id) && let Err(err) = writer - .send_json_value(json!({ + .send_payload(json!({ "type": "conversation.item.truncate", "item_id": output_audio_state.item_id, "content_index": 0, "audio_end_ms": output_audio_state.audio_end_ms, - })) + }) + .to_string()) .await { let mapped_error = map_api_error(err);