Stream Realtime V2 Codex progress

Send Codex progress to Realtime V2 as user update messages and keep new tool calls tied to the active task while it is still running.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-09 15:42:11 -07:00
parent d75083181c
commit 3ebd01a092
3 changed files with 116 additions and 24 deletions

View File

@@ -1220,6 +1220,7 @@ async fn webrtc_v2_codex_tool_call_delegates_and_returns_function_output() -> Re
v2_codex_tool_call("call_v2", "delegate from v2"),
],
vec![],
vec![],
])]),
)
.await?;
@@ -1247,8 +1248,15 @@ async fn webrtc_v2_codex_tool_call_delegates_and_returns_function_output() -> Re
requests[0]
);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "delegated from v2");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2");
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
1
);
harness.shutdown().await;
Ok(())
@@ -1276,6 +1284,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
v2_codex_tool_call("call_shell", "run shell through delegated turn"),
],
vec![],
vec![],
])]);
let mut harness = RealtimeE2eHarness::new_with_sandbox(
@@ -1327,7 +1336,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
requests[1]
);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "shell tool finished");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_shell", "shell tool finished");
assert_eq!(
function_call_output_sideband_requests(&harness.realtime_server).len(),
@@ -1377,6 +1389,7 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
}),
],
vec![],
vec![],
])]),
)
.await?;
@@ -1403,7 +1416,10 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> {
.await?;
assert_eq!(turn_completed.thread_id, harness.thread_id);
let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await;
let progress = harness.sideband_outbound_request(/*request_index*/ 1).await;
assert_v2_progress_update(&progress, "late delegated result");
let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await;
assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result");
harness.shutdown().await;
@@ -1641,6 +1657,23 @@ fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_outpu
);
}
/// Asserts that `request` is a `conversation.item.create` user message whose
/// single `input_text` part is `expected_text` followed by the Codex
/// progress-update marker.
fn assert_v2_progress_update(request: &Value, expected_text: &str) {
    // The marker is spelled out literally so the test pins the exact wire text.
    let progress_text =
        format!("{expected_text}\n\nUpdate from Codex (task hasn't finished yet):");
    let expected_request = json!({
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [{
                "type": "input_text",
                "text": progress_text
            }]
        }
    });
    assert_eq!(request, &expected_request);
}
fn assert_v1_session_update(request: &Value) -> Result<()> {
assert_eq!(request["type"].as_str(), Some("session.update"));
assert_eq!(request["session"]["type"].as_str(), Some("quicksilver"));

View File

@@ -28,7 +28,7 @@ use serde_json::json;
const REALTIME_V2_OUTPUT_MODALITY_AUDIO: &str = "audio";
const REALTIME_V2_TOOL_CHOICE: &str = "auto";
const REALTIME_V2_CODEX_TOOL_NAME: &str = "codex";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Delegate a request to Codex and return the final result to the user. Use this as the default action. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.";
const REALTIME_V2_CODEX_TOOL_DESCRIPTION: &str = "Send a user request to Codex. Use this as the default action. If Codex is idle, this starts a new task and returns the final result to the user. If Codex is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.";
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
RealtimeOutboundMessage::ConversationItemCreate {

View File

@@ -62,6 +62,9 @@ const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64;
const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256;
const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000;
const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5";
/// Marker appended to handoff output text before it is sent to a Realtime V2
/// session as a conversation item, flagging the text as an in-progress update.
const REALTIME_V2_PROGRESS_UPDATE_SUFFIX: &str =
"\n\nUpdate from Codex (task hasn't finished yet):";
/// Handoff output sent for a Realtime V2 handoff request that arrives while
/// another handoff is already active.
const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous Codex task.";
const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
@@ -97,7 +100,7 @@ struct RealtimeHandoffState {
#[derive(Debug, PartialEq, Eq)]
enum HandoffOutput {
ImmediateAppend {
Immediate {
handoff_id: String,
output_text: String,
},
@@ -122,6 +125,7 @@ struct RealtimeInputTask {
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
}
impl RealtimeHandoffState {
@@ -195,7 +199,8 @@ impl RealtimeConversationManager {
model_client,
sdp,
} = start;
let session_kind = match session_config.event_parser {
let event_parser = session_config.event_parser;
let session_kind = match event_parser {
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
@@ -253,6 +258,7 @@ impl RealtimeConversationManager {
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
let mut guard = self.state.lock().await;
@@ -366,16 +372,14 @@ impl RealtimeConversationManager {
};
*handoff.last_output_text.lock().await = Some(output_text.clone());
if matches!(handoff.session_kind, RealtimeSessionKind::V1) {
handoff
.output_tx
.send(HandoffOutput::ImmediateAppend {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
}
handoff
.output_tx
.send(HandoffOutput::Immediate {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
}
@@ -855,6 +859,7 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
events_tx,
handoff_state,
session_kind,
event_parser,
} = input;
tokio::spawn(async move {
@@ -898,14 +903,28 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
match handoff_output {
Ok(handoff_output) => {
match handoff_output {
HandoffOutput::ImmediateAppend {
HandoffOutput::Immediate {
handoff_id,
output_text,
} => {
if let Err(err) = writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
{
let result = match event_parser {
RealtimeEventParser::V1 => {
writer
.send_conversation_handoff_append(
handoff_id,
output_text,
)
.await
}
RealtimeEventParser::RealtimeV2 => {
writer
.send_conversation_item_create(format!(
"{output_text}{REALTIME_V2_PROGRESS_UPDATE_SUFFIX}"
))
.await
}
};
if let Err(err) = result {
let mapped_error = map_api_error(err);
warn!("failed to send handoff output: {mapped_error}");
let _ = events_tx
@@ -1017,11 +1036,51 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
RealtimeEvent::HandoffRequested(handoff) => {
*handoff_state.active_handoff.lock().await =
Some(handoff.handoff_id.clone());
*handoff_state.last_output_text.lock().await = None;
response_in_progress = false;
output_audio_state = None;
*handoff_state.last_output_text.lock().await = None;
let acknowledge_handoff = {
let mut active_handoff = handoff_state.active_handoff.lock().await;
match session_kind {
RealtimeSessionKind::V2 if active_handoff.is_some() => true,
RealtimeSessionKind::V1 | RealtimeSessionKind::V2 => {
*active_handoff = Some(handoff.handoff_id.clone());
false
}
}
};
if acknowledge_handoff {
if let Err(err) = writer
.send_conversation_handoff_append(
handoff.handoff_id.clone(),
REALTIME_V2_STEER_ACKNOWLEDGEMENT.to_string(),
)
.await
{
let mapped_error = map_api_error(err);
warn!(
"failed to send handoff steering acknowledgement: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
break;
}
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
warn!(
"failed to send handoff steering response.create: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
break;
}
pending_response_create = false;
response_in_progress = true;
}
}
RealtimeEvent::Error(message)
if matches!(session_kind, RealtimeSessionKind::V2)