mirror of
https://github.com/openai/codex.git
synced 2026-03-04 05:33:19 +00:00
Compare commits
4 Commits
fix/notify
...
codex/real
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d4dcadd7c | ||
|
|
be2529b0eb | ||
|
|
1cd8d5b717 | ||
|
|
faeec29511 |
@@ -162,6 +162,10 @@ pub enum SteerInputError {
|
||||
ExpectedTurnMismatch { expected: String, actual: String },
|
||||
EmptyInput,
|
||||
}
|
||||
|
||||
/// Developer-message text that `Session::emit_realtime_start_instructions` records when the
/// realtime instructions are switched on.
const REALTIME_START_DEVELOPER_INSTRUCTIONS: &str = "Realtime conversation handling is now active. Keep responses concise, low-latency, interruption-safe, and operationally safe for live interaction. This instruction applies only while realtime is active.";
|
||||
/// Developer-message text that `Session::emit_realtime_end_instructions` records when the
/// realtime instructions are switched off.
const REALTIME_END_DEVELOPER_INSTRUCTIONS: &str = "Realtime conversation handling has ended. The realtime-specific developer instruction no longer applies.";
|
||||
|
||||
use crate::exec_policy::ExecPolicyUpdateError;
|
||||
use crate::feedback_tags;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
@@ -3035,6 +3039,71 @@ impl Session {
|
||||
state.session_configuration.collaboration_mode.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn realtime_instructions_active(&self) -> bool {
|
||||
let state = self.state.lock().await;
|
||||
state.realtime_instructions_active()
|
||||
}
|
||||
|
||||
pub(crate) async fn emit_realtime_start_instructions(&self) {
|
||||
if self.realtime_instructions_active().await {
|
||||
return;
|
||||
}
|
||||
self.record_session_developer_message(REALTIME_START_DEVELOPER_INSTRUCTIONS)
|
||||
.await;
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_realtime_instructions_active(true);
|
||||
}
|
||||
|
||||
pub(crate) async fn emit_realtime_end_instructions(&self) {
|
||||
if !self.realtime_instructions_active().await {
|
||||
return;
|
||||
}
|
||||
self.record_session_developer_message(REALTIME_END_DEVELOPER_INSTRUCTIONS)
|
||||
.await;
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_realtime_instructions_active(false);
|
||||
}
|
||||
|
||||
async fn record_session_developer_message(&self, text: &str) {
|
||||
let message: ResponseItem = DeveloperInstructions::new(text.to_string()).into();
|
||||
|
||||
let active_turn_context = {
|
||||
let active = self.active_turn.lock().await;
|
||||
active
|
||||
.as_ref()
|
||||
.and_then(|active_turn| active_turn.tasks.first())
|
||||
.map(|(_, task)| Arc::clone(&task.turn_context))
|
||||
};
|
||||
if let Some(turn_context) = active_turn_context {
|
||||
self.record_conversation_items(turn_context.as_ref(), std::slice::from_ref(&message))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let session_configuration = {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.clone()
|
||||
};
|
||||
let per_turn_config = Self::build_per_turn_config(&session_configuration);
|
||||
let model_info = self
|
||||
.services
|
||||
.models_manager
|
||||
.get_model_info(
|
||||
session_configuration.collaboration_mode.model(),
|
||||
&per_turn_config,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.record_items(
|
||||
std::slice::from_ref(&message).iter(),
|
||||
model_info.truncation_policy.into(),
|
||||
);
|
||||
}
|
||||
self.persist_rollout_response_items(std::slice::from_ref(&message))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
for item in items {
|
||||
self.send_event(
|
||||
@@ -4551,6 +4620,7 @@ mod handlers {
|
||||
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
let _ = sess.conversation.shutdown().await;
|
||||
sess.emit_realtime_end_instructions().await;
|
||||
sess.services
|
||||
.unified_exec_manager
|
||||
.terminate_all_processes()
|
||||
@@ -8922,6 +8992,39 @@ mod tests {
|
||||
assert!(environment_update.contains("<timezone>Europe/Berlin</timezone>"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_emits_realtime_end_instructions() {
|
||||
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
|
||||
session.emit_realtime_start_instructions().await;
|
||||
assert!(session.realtime_instructions_active().await);
|
||||
|
||||
assert!(handlers::shutdown(&session, "shutdown-test".to_string()).await);
|
||||
assert!(!session.realtime_instructions_active().await);
|
||||
|
||||
let developer_texts = session
|
||||
.clone_history()
|
||||
.await
|
||||
.raw_items()
|
||||
.iter()
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::Message { role, content, .. } if role == "developer" => {
|
||||
content.iter().find_map(|content| match content {
|
||||
ContentItem::InputText { text } => Some(text.clone()),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
developer_texts,
|
||||
vec![
|
||||
REALTIME_START_DEVELOPER_INSTRUCTIONS.to_string(),
|
||||
REALTIME_END_DEVELOPER_INSTRUCTIONS.to_string(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_context_updates_and_set_reference_context_item_injects_full_context_when_baseline_missing()
|
||||
{
|
||||
|
||||
@@ -54,11 +54,7 @@ fn build_collaboration_mode_update_item(
|
||||
) -> Option<DeveloperInstructions> {
|
||||
let prev = previous?;
|
||||
if prev.collaboration_mode.as_ref() != Some(&next.collaboration_mode) {
|
||||
// If the next mode has empty developer instructions, this returns None and we emit no
|
||||
// update, so prior collaboration instructions remain in the prompt history.
|
||||
Some(DeveloperInstructions::from_collaboration_mode(
|
||||
&next.collaboration_mode,
|
||||
)?)
|
||||
DeveloperInstructions::from_collaboration_mode(&next.collaboration_mode)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
@@ -186,6 +186,9 @@ pub(crate) async fn handle_start(
|
||||
.session_id
|
||||
.or_else(|| Some(sess.conversation_id.to_string()));
|
||||
info!("starting realtime conversation");
|
||||
if sess.realtime_instructions_active().await {
|
||||
sess.emit_realtime_end_instructions().await;
|
||||
}
|
||||
let events_rx = match sess
|
||||
.conversation
|
||||
.start(api_provider, None, prompt, requested_session_id.clone())
|
||||
@@ -198,6 +201,7 @@ pub(crate) async fn handle_start(
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
sess.emit_realtime_start_instructions().await;
|
||||
|
||||
info!("realtime conversation started");
|
||||
|
||||
@@ -238,13 +242,13 @@ pub(crate) async fn handle_start(
|
||||
}
|
||||
if let Some(()) = sess_clone.conversation.running_state().await {
|
||||
info!("realtime conversation transport closed");
|
||||
sess_clone
|
||||
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
|
||||
RealtimeConversationClosedEvent {
|
||||
reason: Some("transport_closed".to_string()),
|
||||
},
|
||||
)))
|
||||
.await;
|
||||
close_realtime_and_emit_end_instruction(
|
||||
&sess_clone,
|
||||
ev,
|
||||
"transport_closed",
|
||||
Some(sub_id.clone()),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -298,20 +302,44 @@ pub(crate) async fn handle_text(
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
|
||||
match sess.conversation.shutdown().await {
|
||||
Ok(()) => {
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
|
||||
reason: Some("requested".to_string()),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error_sub_id = sub_id.clone();
|
||||
close_realtime_and_emit_end_instruction(
|
||||
sess,
|
||||
|msg| Event {
|
||||
id: sub_id.clone(),
|
||||
msg,
|
||||
},
|
||||
"requested",
|
||||
Some(error_sub_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn close_realtime_and_emit_end_instruction<F>(
|
||||
sess: &Arc<Session>,
|
||||
event_builder: F,
|
||||
reason: &str,
|
||||
error_sub_id: Option<String>,
|
||||
) where
|
||||
F: Fn(EventMsg) -> Event,
|
||||
{
|
||||
if let Err(err) = sess.conversation.shutdown().await {
|
||||
if let Some(sub_id) = error_sub_id {
|
||||
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
|
||||
} else {
|
||||
warn!("failed to shutdown realtime conversation: {err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
sess.emit_realtime_end_instructions().await;
|
||||
|
||||
sess.send_event_raw(event_builder(EventMsg::RealtimeConversationClosed(
|
||||
RealtimeConversationClosedEvent {
|
||||
reason: Some(reason.to_string()),
|
||||
},
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
|
||||
fn spawn_realtime_input_task(
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_protocol::protocol::TurnContextItem;
|
||||
pub(crate) struct SessionState {
|
||||
pub(crate) session_configuration: SessionConfiguration,
|
||||
pub(crate) history: ContextManager,
|
||||
pub(crate) realtime_instructions_active: bool,
|
||||
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||
pub(crate) server_reasoning_included: bool,
|
||||
pub(crate) dependency_env: HashMap<String, String>,
|
||||
@@ -40,6 +41,7 @@ impl SessionState {
|
||||
Self {
|
||||
session_configuration,
|
||||
history,
|
||||
realtime_instructions_active: false,
|
||||
latest_rate_limits: None,
|
||||
server_reasoning_included: false,
|
||||
dependency_env: HashMap::new(),
|
||||
@@ -71,6 +73,14 @@ impl SessionState {
|
||||
self.history.clone()
|
||||
}
|
||||
|
||||
/// Returns the current value of the `realtime_instructions_active` flag.
pub(crate) fn realtime_instructions_active(&self) -> bool {
|
||||
self.realtime_instructions_active
|
||||
}
|
||||
|
||||
/// Overwrites the `realtime_instructions_active` flag with `active`.
pub(crate) fn set_realtime_instructions_active(&mut self, active: bool) {
|
||||
self.realtime_instructions_active = active;
|
||||
}
|
||||
|
||||
pub(crate) fn replace_history(
|
||||
&mut self,
|
||||
items: Vec<ResponseItem>,
|
||||
|
||||
@@ -544,6 +544,88 @@ async fn collaboration_mode_update_emits_new_instruction_message_when_mode_chang
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn collaboration_mode_update_emits_empty_marker_when_instructions_clear() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let _req1 = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
let req2 = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let test = test_codex().build(&server).await?;
|
||||
let plan_text = "plan mode instructions";
|
||||
|
||||
test.codex
|
||||
.submit(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: Some(collab_mode_with_mode_and_instructions(
|
||||
ModeKind::Plan,
|
||||
Some(plan_text),
|
||||
)),
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello 1".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
test.codex
|
||||
.submit(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: Some(collab_mode_with_mode_and_instructions(
|
||||
ModeKind::Default,
|
||||
None,
|
||||
)),
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello 2".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let input = req2.single_request().input();
|
||||
let dev_texts = developer_texts(&input);
|
||||
assert_eq!(count_exact(&dev_texts, &collab_xml(plan_text)), 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn collaboration_mode_update_noop_does_not_append_when_mode_is_unchanged() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -700,58 +782,3 @@ async fn resume_replays_collaboration_instructions() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn empty_collaboration_instructions_are_ignored() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let req = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let test = test_codex().build(&server).await?;
|
||||
let current_model = test.session_configured.model.clone();
|
||||
|
||||
test.codex
|
||||
.submit(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: Some(CollaborationMode {
|
||||
mode: ModeKind::Default,
|
||||
settings: Settings {
|
||||
model: current_model,
|
||||
reasoning_effort: None,
|
||||
developer_instructions: Some("".to_string()),
|
||||
},
|
||||
}),
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let input = req.single_request().input();
|
||||
let dev_texts = developer_texts(&input);
|
||||
assert_eq!(dev_texts.len(), 1);
|
||||
let collab_text = collab_xml("");
|
||||
assert_eq!(count_exact(&dev_texts, &collab_text), 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -25,6 +25,25 @@ use serde_json::json;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
fn developer_texts(input: &[Value]) -> Vec<String> {
|
||||
input
|
||||
.iter()
|
||||
.filter_map(|item| {
|
||||
let role = item.get("role")?.as_str()?;
|
||||
if role != "developer" {
|
||||
return None;
|
||||
}
|
||||
let text = item
|
||||
.get("content")?
|
||||
.as_array()?
|
||||
.first()?
|
||||
.get("text")?
|
||||
.as_str()?;
|
||||
Some(text.to_string())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -76,7 +95,6 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
@@ -157,11 +175,65 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
closed.reason.as_deref(),
|
||||
Some("requested" | "transport_closed")
|
||||
));
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_persists_realtime_start_instruction_into_next_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&api_server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp_1"),
|
||||
responses::ev_completed("resp_1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_1" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
test.submit_turn("hello").await?;
|
||||
|
||||
let dev_texts = developer_texts(&response_mock.single_request().input());
|
||||
assert!(dev_texts.iter().any(|text| {
|
||||
text.contains("Realtime conversation handling is now active.")
|
||||
&& text.contains("This instruction applies only while realtime is active.")
|
||||
}));
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -191,7 +263,6 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
|
||||
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
@@ -207,11 +278,72 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
|
||||
})
|
||||
.await;
|
||||
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_close_persists_realtime_end_instruction_into_next_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&api_server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp_1"),
|
||||
responses::ev_completed("resp_1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_1" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _session_created = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) => Some(session_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
test.codex.submit(Op::RealtimeConversationClose).await?;
|
||||
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
test.submit_turn("hello").await?;
|
||||
|
||||
let dev_texts = developer_texts(&response_mock.single_request().input());
|
||||
assert!(dev_texts.iter().any(|text| {
|
||||
text.contains("Realtime conversation handling has ended.")
|
||||
&& text.contains("realtime-specific developer instruction no longer applies")
|
||||
}));
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -715,7 +847,6 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
|
||||
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
@@ -807,22 +938,9 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw trigger text={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
"delegate now"
|
||||
);
|
||||
|
||||
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
|
||||
let mirrored_request_body = mirrored_request.body_json();
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
mirrored_request_body["type"].as_str(),
|
||||
mirrored_request_body["item"]["role"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["text"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["data"].as_str(),
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["type"].as_str(),
|
||||
Some("conversation.item.create")
|
||||
@@ -839,13 +957,6 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
|
||||
start.elapsed().as_millis(),
|
||||
audio_out.data,
|
||||
audio_out.sample_rate,
|
||||
audio_out.num_channels
|
||||
);
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
@@ -856,10 +967,6 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
||||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] delegated completion resolved",
|
||||
start.elapsed().as_millis()
|
||||
);
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
---
|
||||
source: core/tests/suite/model_visible_layout.rs
|
||||
assertion_line: 361
|
||||
expression: "format_labeled_requests_snapshot(\"First post-resume turn where resumed config model differs from rollout and personality changes.\",\n&[(\"Last Request Before Resume\", &initial_request),\n(\"First Request After Resume\", &resumed_request),])"
|
||||
---
|
||||
Scenario: First post-resume turn where resumed config model differs from rollout and personality changes.
|
||||
@@ -18,8 +19,9 @@ Scenario: First post-resume turn where resumed config model differs from rollout
|
||||
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
|
||||
02:message/user:seed resume history
|
||||
03:message/assistant:recorded before resume
|
||||
04:message/developer[2]:
|
||||
04:message/developer[3]:
|
||||
[01] <model_switch>\nThe user was previously using a different model. Please continue the conversatio...
|
||||
[02] <PERMISSIONS_INSTRUCTIONS>
|
||||
[03] <collaboration_mode></collaboration_mode>
|
||||
05:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
|
||||
06:message/user:resume and change personality
|
||||
|
||||
Reference in New Issue
Block a user