Branch realtime flow on session kind

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-16 12:57:48 -07:00
parent bb87c8ecb1
commit 3c15083067

View File

@@ -59,12 +59,18 @@ pub(crate) struct RealtimeConversationManager {
state: Mutex<Option<ConversationState>>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RealtimeSessionKind {
V1,
V2,
}
#[derive(Clone, Debug)]
struct RealtimeHandoffState {
output_tx: Sender<HandoffOutput>,
active_handoff: Arc<Mutex<Option<String>>>,
last_output_text: Arc<Mutex<Option<String>>>,
use_final_tool_output: bool,
session_kind: RealtimeSessionKind,
}
#[derive(Debug, PartialEq, Eq)]
@@ -99,16 +105,16 @@ struct RealtimeInputTask {
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
use_response_create_flow: bool,
session_kind: RealtimeSessionKind,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, use_final_tool_output: bool) -> Self {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
Self {
output_tx,
active_handoff: Arc::new(Mutex::new(None)),
last_output_text: Arc::new(Mutex::new(None)),
use_final_tool_output,
session_kind,
}
}
@@ -118,21 +124,27 @@ impl RealtimeHandoffState {
};
*self.last_output_text.lock().await = Some(output_text.clone());
if !self.use_final_tool_output {
self.output_tx
.send(HandoffOutput::ImmediateAppend {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
match self.session_kind {
RealtimeSessionKind::V1 => {
self.output_tx
.send(HandoffOutput::ImmediateAppend {
handoff_id,
output_text,
})
.await
.map_err(|_| {
CodexErr::InvalidRequest("conversation is not running".to_string())
})?;
}
RealtimeSessionKind::V2 => {}
}
Ok(())
}
async fn send_final_output(&self) -> CodexResult<()> {
if !self.use_final_tool_output {
return Ok(());
match self.session_kind {
RealtimeSessionKind::V1 => return Ok(()),
RealtimeSessionKind::V2 => {}
}
let Some(handoff_id) = self.active_handoff.lock().await.clone() else {
@@ -193,8 +205,10 @@ impl RealtimeConversationManager {
state.task.abort();
let _ = state.task.await;
}
let use_response_create_flow =
session_config.event_parser == RealtimeEventParser::RealtimeV2;
let session_kind = match session_config.event_parser {
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let client = RealtimeWebsocketClient::new(api_provider);
let connection = client
@@ -218,7 +232,7 @@ impl RealtimeConversationManager {
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, use_response_create_flow);
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: writer.clone(),
events,
@@ -227,7 +241,7 @@ impl RealtimeConversationManager {
audio_rx,
events_tx,
handoff_state: handoff.clone(),
use_response_create_flow,
session_kind,
});
let mut guard = self.state.lock().await;
@@ -580,7 +594,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
audio_rx,
events_tx,
handoff_state,
use_response_create_flow,
session_kind,
} = input;
tokio::spawn(async move {
@@ -598,7 +612,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
warn!("failed to send input text: {mapped_error}");
break;
}
if use_response_create_flow {
if matches!(session_kind, RealtimeSessionKind::V2) {
if response_in_progress {
pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
@@ -643,7 +657,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
warn!("failed to send handoff output: {mapped_error}");
break;
}
if use_response_create_flow {
if matches!(session_kind, RealtimeSessionKind::V2) {
if response_in_progress {
pending_response_create = true;
} else if let Err(err) = writer.send_response_create().await {
@@ -672,10 +686,14 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
match &event {
RealtimeEvent::ConversationItemAdded(item) => {
match item.get("type").and_then(Value::as_str) {
Some("response.created") if use_response_create_flow => {
Some("response.created")
if matches!(session_kind, RealtimeSessionKind::V2) =>
{
response_in_progress = true;
}
Some("response.done") if use_response_create_flow => {
Some("response.done")
if matches!(session_kind, RealtimeSessionKind::V2) =>
{
response_in_progress = false;
output_audio_state = None;
if pending_response_create {
@@ -694,12 +712,12 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
RealtimeEvent::AudioOut(frame) => {
if use_response_create_flow {
if matches!(session_kind, RealtimeSessionKind::V2) {
update_output_audio_state(&mut output_audio_state, frame);
}
}
RealtimeEvent::InputAudioSpeechStarted(event) => {
if use_response_create_flow
if matches!(session_kind, RealtimeSessionKind::V2)
&& let Some(truncate) = output_audio_truncate_params(
&mut output_audio_state,
event.item_id.as_deref(),
@@ -719,7 +737,9 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
RealtimeEvent::ResponseCancelled(_) => {
response_in_progress = false;
output_audio_state = None;
if use_response_create_flow && pending_response_create {
if matches!(session_kind, RealtimeSessionKind::V2)
&& pending_response_create
{
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!(
@@ -739,7 +759,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
output_audio_state = None;
}
RealtimeEvent::Error(message)
if use_response_create_flow
if matches!(session_kind, RealtimeSessionKind::V2)
&& message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) =>
{
warn!(