mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Wrap realtime delegations in XML
This commit is contained in:
@@ -71,6 +71,8 @@ const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str =
|
||||
"This was sent to steer the previous background agent task.";
|
||||
const REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX: &str =
|
||||
"Conversation already has an active response in progress:";
|
||||
const REALTIME_DELEGATION_OPEN_TAG: &str = "<realtime_delegation>\n <input>";
|
||||
const REALTIME_DELEGATION_CLOSE_TAG: &str = "</input>\n</realtime_delegation>";
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum RealtimeConversationEnd {
|
||||
@@ -862,9 +864,16 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
|
||||
.map(|entry| format!("{role}: {text}", role = entry.role, text = entry.text))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
(!active_transcript.is_empty())
|
||||
let text = (!active_transcript.is_empty())
|
||||
.then_some(active_transcript)
|
||||
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
|
||||
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))?;
|
||||
let text = text
|
||||
.replace('&', "&")
|
||||
.replace('<', "<")
|
||||
.replace('>', ">");
|
||||
Some(format!(
|
||||
"{REALTIME_DELEGATION_OPEN_TAG}{text}{REALTIME_DELEGATION_CLOSE_TAG}"
|
||||
))
|
||||
}
|
||||
|
||||
fn realtime_api_key(auth: Option<&CodexAuth>, provider: &ModelProviderInfo) -> CodexResult<String> {
|
||||
|
||||
@@ -63,12 +63,20 @@ const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
|
||||
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
|
||||
const EXPECTED_REALTIME_DELEGATION_OPEN_TAG: &str = "<realtime_delegation>\n <input>";
|
||||
const EXPECTED_REALTIME_DELEGATION_CLOSE_TAG: &str = "</input>\n</realtime_delegation>";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct RealtimeCallRequestCapture {
|
||||
requests: Arc<Mutex<Vec<WiremockRequest>>>,
|
||||
}
|
||||
|
||||
fn expected_realtime_delegation(input: &str) -> String {
|
||||
format!(
|
||||
"{EXPECTED_REALTIME_DELEGATION_OPEN_TAG}{input}{EXPECTED_REALTIME_DELEGATION_CLOSE_TAG}"
|
||||
)
|
||||
}
|
||||
|
||||
impl RealtimeCallRequestCapture {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
@@ -2351,15 +2359,11 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.input_transcript.delta",
|
||||
"delta": "text from realtime"
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.handoff.requested",
|
||||
"handoff_id": "handoff_inbound",
|
||||
"item_id": "item_inbound",
|
||||
"input_transcript": "text from realtime"
|
||||
"input_transcript": "text <from> & realtime"
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
@@ -2396,7 +2400,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.handoff_id == "handoff_inbound"
|
||||
&& handoff.input_transcript == "text from realtime" =>
|
||||
&& handoff.input_transcript == "text <from> & realtime" =>
|
||||
{
|
||||
Some(())
|
||||
}
|
||||
@@ -2411,10 +2415,93 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> {
|
||||
|
||||
let request = response_mock.single_request();
|
||||
let user_texts = request.message_input_texts("user");
|
||||
assert!(
|
||||
user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: text from realtime")
|
||||
assert_eq!(
|
||||
user_texts,
|
||||
vec![expected_realtime_delegation(
|
||||
"text <from> & realtime"
|
||||
)]
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_handoff_request_from_realtime_v2_starts_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&api_server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "ok"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let arguments = serde_json::to_string(&json!({ "prompt": "text <from> & realtime" }))?;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_inbound_v2", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.done",
|
||||
"item": {
|
||||
"id": "item_inbound_v2",
|
||||
"type": "function_call",
|
||||
"name": "background_agent",
|
||||
"call_id": "handoff_inbound_v2",
|
||||
"arguments": arguments
|
||||
}
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.realtime.version = RealtimeWsVersion::V2;
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
output_modality: RealtimeOutputModality::Audio,
|
||||
prompt: Some(Some("backend prompt".to_string())),
|
||||
session_id: None,
|
||||
transport: None,
|
||||
voice: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::HandoffRequested(handoff),
|
||||
}) if handoff.handoff_id == "handoff_inbound_v2"
|
||||
&& handoff.input_transcript == "text <from> & realtime" =>
|
||||
{
|
||||
Some(())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let request = response_mock.single_request();
|
||||
let user_texts = request.message_input_texts("user");
|
||||
assert_eq!(
|
||||
user_texts,
|
||||
vec![expected_realtime_delegation(
|
||||
"text <from> & realtime"
|
||||
)]
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
@@ -2496,8 +2583,12 @@ async fn inbound_handoff_request_uses_active_transcript() -> Result<()> {
|
||||
|
||||
let request = response_mock.single_request();
|
||||
let user_texts = request.message_input_texts("user");
|
||||
assert!(user_texts.iter().any(|text| text
|
||||
== "assistant: assistant context\nuser: delegated query\nassistant: assist confirm"));
|
||||
assert_eq!(
|
||||
user_texts,
|
||||
vec![expected_realtime_delegation(
|
||||
"assistant: assistant context\nuser: delegated query\nassistant: assist confirm"
|
||||
)]
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
@@ -2611,22 +2702,20 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -
|
||||
assert_eq!(requests.len(), 2);
|
||||
|
||||
let first_user_texts = requests[0].message_input_texts("user");
|
||||
assert!(
|
||||
first_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: first question")
|
||||
assert_eq!(
|
||||
first_user_texts,
|
||||
vec![expected_realtime_delegation("user: first question")]
|
||||
);
|
||||
|
||||
let second_user_texts = requests[1].message_input_texts("user");
|
||||
assert!(
|
||||
second_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: second question")
|
||||
.any(|text| text == &expected_realtime_delegation("user: second question"))
|
||||
);
|
||||
assert!(
|
||||
!second_user_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: first question\nuser: second question")
|
||||
!second_user_texts.iter().any(|text| text
|
||||
== &expected_realtime_delegation("user: first question\nuser: second question"))
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
@@ -3146,17 +3235,13 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
|
||||
let first_texts = message_input_texts(&first_body, "user");
|
||||
let second_texts = message_input_texts(&second_body, "user");
|
||||
|
||||
assert!(first_texts.iter().any(|text| text == "first prompt"));
|
||||
assert!(
|
||||
!first_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: steer via realtime")
|
||||
);
|
||||
assert!(second_texts.iter().any(|text| text == "first prompt"));
|
||||
assert!(
|
||||
second_texts
|
||||
.iter()
|
||||
.any(|text| text == "user: steer via realtime")
|
||||
assert_eq!(first_texts, vec!["first prompt".to_string()]);
|
||||
assert_eq!(
|
||||
second_texts,
|
||||
vec![
|
||||
"first prompt".to_string(),
|
||||
expected_realtime_delegation("user: steer via realtime"),
|
||||
]
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
@@ -3272,8 +3357,12 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio()
|
||||
assert_eq!(requests.len(), 1);
|
||||
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
|
||||
let first_texts = message_input_texts(&first_body, "user");
|
||||
let expected_text = format!("user: {delegated_text}");
|
||||
assert!(first_texts.iter().any(|text| text == &expected_text));
|
||||
assert_eq!(
|
||||
first_texts,
|
||||
vec![expected_realtime_delegation(&format!(
|
||||
"user: {delegated_text}"
|
||||
))]
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
api_server.shutdown().await;
|
||||
|
||||
Reference in New Issue
Block a user