Compare commits

...

4 Commits

Author SHA1 Message Date
Ahmed Ibrahim
8d4dcadd7c Use transient developer instructions for realtime sessions 2026-02-27 11:35:25 -08:00
Ahmed Ibrahim
be2529b0eb Remove empty collaboration update fallback 2026-02-27 10:16:48 -08:00
Ahmed Ibrahim
1cd8d5b717 Update app-server schema fixtures for realtime mode 2026-02-27 10:16:48 -08:00
Ahmed Ibrahim
faeec29511 Add internal realtime collaboration mode 2026-02-27 10:16:48 -08:00
7 changed files with 381 additions and 108 deletions

View File

@@ -162,6 +162,10 @@ pub enum SteerInputError {
ExpectedTurnMismatch { expected: String, actual: String },
EmptyInput,
}
const REALTIME_START_DEVELOPER_INSTRUCTIONS: &str = "Realtime conversation handling is now active. Keep responses concise, low-latency, interruption-safe, and operationally safe for live interaction. This instruction applies only while realtime is active.";
const REALTIME_END_DEVELOPER_INSTRUCTIONS: &str = "Realtime conversation handling has ended. The realtime-specific developer instruction no longer applies.";
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
@@ -3035,6 +3039,71 @@ impl Session {
state.session_configuration.collaboration_mode.clone()
}
pub(crate) async fn realtime_instructions_active(&self) -> bool {
let state = self.state.lock().await;
state.realtime_instructions_active()
}
pub(crate) async fn emit_realtime_start_instructions(&self) {
if self.realtime_instructions_active().await {
return;
}
self.record_session_developer_message(REALTIME_START_DEVELOPER_INSTRUCTIONS)
.await;
let mut state = self.state.lock().await;
state.set_realtime_instructions_active(true);
}
pub(crate) async fn emit_realtime_end_instructions(&self) {
if !self.realtime_instructions_active().await {
return;
}
self.record_session_developer_message(REALTIME_END_DEVELOPER_INSTRUCTIONS)
.await;
let mut state = self.state.lock().await;
state.set_realtime_instructions_active(false);
}
async fn record_session_developer_message(&self, text: &str) {
let message: ResponseItem = DeveloperInstructions::new(text.to_string()).into();
let active_turn_context = {
let active = self.active_turn.lock().await;
active
.as_ref()
.and_then(|active_turn| active_turn.tasks.first())
.map(|(_, task)| Arc::clone(&task.turn_context))
};
if let Some(turn_context) = active_turn_context {
self.record_conversation_items(turn_context.as_ref(), std::slice::from_ref(&message))
.await;
return;
}
let session_configuration = {
let state = self.state.lock().await;
state.session_configuration.clone()
};
let per_turn_config = Self::build_per_turn_config(&session_configuration);
let model_info = self
.services
.models_manager
.get_model_info(
session_configuration.collaboration_mode.model(),
&per_turn_config,
)
.await;
{
let mut state = self.state.lock().await;
state.record_items(
std::slice::from_ref(&message).iter(),
model_info.truncation_policy.into(),
);
}
self.persist_rollout_response_items(std::slice::from_ref(&message))
.await;
}
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(
@@ -4551,6 +4620,7 @@ mod handlers {
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let _ = sess.conversation.shutdown().await;
sess.emit_realtime_end_instructions().await;
sess.services
.unified_exec_manager
.terminate_all_processes()
@@ -8922,6 +8992,39 @@ mod tests {
assert!(environment_update.contains("<timezone>Europe/Berlin</timezone>"));
}
#[tokio::test]
async fn shutdown_emits_realtime_end_instructions() {
let (session, _turn_context, _rx) = make_session_and_context_with_rx().await;
session.emit_realtime_start_instructions().await;
assert!(session.realtime_instructions_active().await);
assert!(handlers::shutdown(&session, "shutdown-test".to_string()).await);
assert!(!session.realtime_instructions_active().await);
let developer_texts = session
.clone_history()
.await
.raw_items()
.iter()
.filter_map(|item| match item {
ResponseItem::Message { role, content, .. } if role == "developer" => {
content.iter().find_map(|content| match content {
ContentItem::InputText { text } => Some(text.clone()),
_ => None,
})
}
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(
developer_texts,
vec![
REALTIME_START_DEVELOPER_INSTRUCTIONS.to_string(),
REALTIME_END_DEVELOPER_INSTRUCTIONS.to_string(),
]
);
}
#[tokio::test]
async fn record_context_updates_and_set_reference_context_item_injects_full_context_when_baseline_missing()
{

View File

@@ -54,11 +54,7 @@ fn build_collaboration_mode_update_item(
) -> Option<DeveloperInstructions> {
let prev = previous?;
if prev.collaboration_mode.as_ref() != Some(&next.collaboration_mode) {
// If the next mode has empty developer instructions, this returns None and we emit no
// update, so prior collaboration instructions remain in the prompt history.
Some(DeveloperInstructions::from_collaboration_mode(
&next.collaboration_mode,
)?)
DeveloperInstructions::from_collaboration_mode(&next.collaboration_mode)
} else {
None
}

View File

@@ -186,6 +186,9 @@ pub(crate) async fn handle_start(
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
info!("starting realtime conversation");
if sess.realtime_instructions_active().await {
sess.emit_realtime_end_instructions().await;
}
let events_rx = match sess
.conversation
.start(api_provider, None, prompt, requested_session_id.clone())
@@ -198,6 +201,7 @@ pub(crate) async fn handle_start(
return Ok(());
}
};
sess.emit_realtime_start_instructions().await;
info!("realtime conversation started");
@@ -238,13 +242,13 @@ pub(crate) async fn handle_start(
}
if let Some(()) = sess_clone.conversation.running_state().await {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
reason: Some("transport_closed".to_string()),
},
)))
.await;
close_realtime_and_emit_end_instruction(
&sess_clone,
ev,
"transport_closed",
Some(sub_id.clone()),
)
.await;
}
});
@@ -298,20 +302,44 @@ pub(crate) async fn handle_text(
}
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
match sess.conversation.shutdown().await {
Ok(()) => {
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent {
reason: Some("requested".to_string()),
}),
})
.await;
}
Err(err) => {
let error_sub_id = sub_id.clone();
close_realtime_and_emit_end_instruction(
sess,
|msg| Event {
id: sub_id.clone(),
msg,
},
"requested",
Some(error_sub_id),
)
.await;
}
async fn close_realtime_and_emit_end_instruction<F>(
sess: &Arc<Session>,
event_builder: F,
reason: &str,
error_sub_id: Option<String>,
) where
F: Fn(EventMsg) -> Event,
{
if let Err(err) = sess.conversation.shutdown().await {
if let Some(sub_id) = error_sub_id {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
} else {
warn!("failed to shutdown realtime conversation: {err}");
}
return;
}
sess.emit_realtime_end_instructions().await;
sess.send_event_raw(event_builder(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
reason: Some(reason.to_string()),
},
)))
.await;
}
fn spawn_realtime_input_task(

View File

@@ -19,6 +19,7 @@ use codex_protocol::protocol::TurnContextItem;
pub(crate) struct SessionState {
pub(crate) session_configuration: SessionConfiguration,
pub(crate) history: ContextManager,
pub(crate) realtime_instructions_active: bool,
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
pub(crate) server_reasoning_included: bool,
pub(crate) dependency_env: HashMap<String, String>,
@@ -40,6 +41,7 @@ impl SessionState {
Self {
session_configuration,
history,
realtime_instructions_active: false,
latest_rate_limits: None,
server_reasoning_included: false,
dependency_env: HashMap::new(),
@@ -71,6 +73,14 @@ impl SessionState {
self.history.clone()
}
pub(crate) fn realtime_instructions_active(&self) -> bool {
self.realtime_instructions_active
}
pub(crate) fn set_realtime_instructions_active(&mut self, active: bool) {
self.realtime_instructions_active = active;
}
pub(crate) fn replace_history(
&mut self,
items: Vec<ResponseItem>,

View File

@@ -544,6 +544,88 @@ async fn collaboration_mode_update_emits_new_instruction_message_when_mode_chang
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collaboration_mode_update_emits_empty_marker_when_instructions_clear() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _req1 = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let req2 = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
)
.await;
let test = test_codex().build(&server).await?;
let plan_text = "plan mode instructions";
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_mode_and_instructions(
ModeKind::Plan,
Some(plan_text),
)),
personality: None,
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 1".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_mode_and_instructions(
ModeKind::Default,
None,
)),
personality: None,
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 2".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req2.single_request().input();
let dev_texts = developer_texts(&input);
assert_eq!(count_exact(&dev_texts, &collab_xml(plan_text)), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collaboration_mode_update_noop_does_not_append_when_mode_is_unchanged() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -700,58 +782,3 @@ async fn resume_replays_collaboration_instructions() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn empty_collaboration_instructions_are_ignored() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let test = test_codex().build(&server).await?;
let current_model = test.session_configured.model.clone();
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model: current_model,
reasoning_effort: None,
developer_instructions: Some("".to_string()),
},
}),
personality: None,
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
assert_eq!(dev_texts.len(), 1);
let collab_text = collab_xml("");
assert_eq!(count_exact(&dev_texts, &collab_text), 0);
Ok(())
}

View File

@@ -25,6 +25,25 @@ use serde_json::json;
use std::time::Duration;
use tokio::sync::oneshot;
fn developer_texts(input: &[Value]) -> Vec<String> {
input
.iter()
.filter_map(|item| {
let role = item.get("role")?.as_str()?;
if role != "developer" {
return None;
}
let text = item
.get("content")?
.as_array()?
.first()?
.get("text")?
.as_str()?;
Some(text.to_string())
})
.collect()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -76,7 +95,6 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionCreated { session_id },
@@ -157,11 +175,65 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
closed.reason.as_deref(),
Some("requested" | "transport_closed")
));
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_persists_realtime_start_instruction_into_next_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_1"),
responses::ev_completed("resp_1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.created",
"session": { "id": "sess_1" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _session_created = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionCreated { session_id },
}) => Some(session_id.clone()),
_ => None,
})
.await;
test.submit_turn("hello").await?;
let dev_texts = developer_texts(&response_mock.single_request().input());
assert!(dev_texts.iter().any(|text| {
text.contains("Realtime conversation handling is now active.")
&& text.contains("This instruction applies only while realtime is active.")
}));
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_transport_close_emits_closed_event() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -191,7 +263,6 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
assert!(started.session_id.is_some());
let session_created = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionCreated { session_id },
@@ -207,11 +278,72 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> {
})
.await;
assert_eq!(closed.reason.as_deref(), Some("transport_closed"));
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_close_persists_realtime_end_instruction_into_next_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let api_server = start_mock_server().await;
let response_mock = responses::mount_sse_once(
&api_server,
responses::sse(vec![
responses::ev_response_created("resp_1"),
responses::ev_completed("resp_1"),
]),
)
.await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.created",
"session": { "id": "sess_1" }
})]]])
.await;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build(&api_server).await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
let _session_created = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionCreated { session_id },
}) => Some(session_id.clone()),
_ => None,
})
.await;
test.codex.submit(Op::RealtimeConversationClose).await?;
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
test.submit_turn("hello").await?;
let dev_texts = developer_texts(&response_mock.single_request().input());
assert!(dev_texts.iter().any(|text| {
text.contains("Realtime conversation handling has ended.")
&& text.contains("realtime-specific developer instruction no longer applies")
}));
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_audio_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -715,7 +847,6 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
{
skip_if_no_network!(Ok(()));
let start = std::time::Instant::now();
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
@@ -807,22 +938,9 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
_ => None,
})
.await;
eprintln!(
"[realtime test +{}ms] saw trigger text={:?}",
start.elapsed().as_millis(),
"delegate now"
);
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
let mirrored_request_body = mirrored_request.body_json();
eprintln!(
"[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}",
start.elapsed().as_millis(),
mirrored_request_body["type"].as_str(),
mirrored_request_body["item"]["role"].as_str(),
mirrored_request_body["item"]["content"][0]["text"].as_str(),
mirrored_request_body["item"]["content"][0]["data"].as_str(),
);
assert_eq!(
mirrored_request_body["type"].as_str(),
Some("conversation.item.create")
@@ -839,13 +957,6 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
_ => None,
})
.await;
eprintln!(
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
start.elapsed().as_millis(),
audio_out.data,
audio_out.sample_rate,
audio_out.num_channels
);
assert_eq!(audio_out.data, "AQID");
let completion = completions
@@ -856,10 +967,6 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
completion
.await
.expect("delegated turn request did not complete");
eprintln!(
"[realtime test +{}ms] delegated completion resolved",
start.elapsed().as_millis()
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})

View File

@@ -1,5 +1,6 @@
---
source: core/tests/suite/model_visible_layout.rs
assertion_line: 361
expression: "format_labeled_requests_snapshot(\"First post-resume turn where resumed config model differs from rollout and personality changes.\",\n&[(\"Last Request Before Resume\", &initial_request),\n(\"First Request After Resume\", &resumed_request),])"
---
Scenario: First post-resume turn where resumed config model differs from rollout and personality changes.
@@ -18,8 +19,9 @@ Scenario: First post-resume turn where resumed config model differs from rollout
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/user:seed resume history
03:message/assistant:recorded before resume
04:message/developer[2]:
04:message/developer[3]:
[01] <model_switch>\nThe user was previously using a different model. Please continue the conversatio...
[02] <PERMISSIONS_INSTRUCTIONS>
[03] <collaboration_mode></collaboration_mode>
05:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
06:message/user:resume and change personality