Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
571889d8ab Mirror send_event text into realtime for exec and apply_patch 2026-02-20 20:28:13 -08:00
Ahmed Ibrahim
0fe7e9515c Add experimental realtime websocket backend prompt override 2026-02-20 20:00:38 -08:00
5 changed files with 246 additions and 10 deletions

View File

@@ -1541,6 +1541,10 @@
"experimental_compact_prompt_file": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"experimental_realtime_ws_backend_prompt": {
"description": "Experimental / do not use. Overrides only the realtime conversation websocket transport backend prompt (the `Op::RealtimeConversation` `/ws` session.create backend_prompt) without changing normal prompts.",
"type": "string"
},
"experimental_realtime_ws_base_url": {
"description": "Experimental / do not use. Overrides only the realtime conversation websocket transport base URL (the `Op::RealtimeConversation` `/ws` connection) without changing normal provider HTTP requests.",
"type": "string"

View File

@@ -182,6 +182,7 @@ use crate::protocol::ModelRerouteEvent;
use crate::protocol::ModelRerouteReason;
use crate::protocol::NetworkApprovalContext;
use crate::protocol::Op;
use crate::protocol::PatchApplyStatus;
use crate::protocol::PlanDeltaEvent;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::ReasoningContentDeltaEvent;
@@ -2075,6 +2076,8 @@ impl Session {
msg,
};
self.send_event_raw(event).await;
self.maybe_mirror_event_text_to_realtime(&legacy_source)
.await;
let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
@@ -2086,6 +2089,18 @@ impl Session {
}
}
/// Best-effort mirror of selected event text into an active realtime
/// conversation. Events without a textual representation, or arriving while
/// no realtime session is running, are silently skipped; send failures are
/// only logged so they never disturb normal event delivery.
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
    // Only a subset of events produce mirrorable text.
    let text = match realtime_text_for_event(msg) {
        Some(text) => text,
        None => return,
    };
    // Mirror only while a realtime conversation is actually running.
    if self.conversation.running_state().await.is_some() {
        // Best-effort: log and move on rather than propagating the error.
        if let Err(err) = self.conversation.text_in(text).await {
            debug!("failed to mirror event text to realtime conversation: {err}");
        }
    }
}
pub(crate) async fn send_event_raw(&self, event: Event) {
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
@@ -5263,6 +5278,51 @@ fn agent_message_text(item: &codex_protocol::items::AgentMessageItem) -> String
.collect()
}
/// Renders the realtime-mirror text for an event, when one exists.
///
/// Only exec-command begins and apply_patch begin/end events are mirrored;
/// every other event kind yields `None`. File lists are sorted for stable
/// output, and apply_patch output streams are appended only when non-empty.
fn realtime_text_for_event(msg: &EventMsg) -> Option<String> {
    match msg {
        EventMsg::ExecCommandBegin(begin) => Some(format!(
            "Exec command started: {}\nWorking directory: {}",
            begin.command.join(" "),
            begin.cwd.display()
        )),
        EventMsg::PatchApplyBegin(begin) => {
            // Sort paths so the mirrored text is deterministic regardless of
            // map iteration order.
            let mut paths: Vec<String> = begin
                .changes
                .keys()
                .map(|path| path.display().to_string())
                .collect();
            paths.sort();
            let listed = if paths.is_empty() {
                "none".to_string()
            } else {
                paths.join(", ")
            };
            Some(format!(
                "apply_patch started ({count} file change(s))\nFiles: {listed}",
                count = paths.len()
            ))
        }
        EventMsg::PatchApplyEnd(end) => {
            let outcome = match end.status {
                PatchApplyStatus::Completed => "completed",
                PatchApplyStatus::Failed => "failed",
                PatchApplyStatus::Declined => "declined",
            };
            let mut summary = format!("apply_patch {outcome}");
            // Include captured output streams only when they carry content.
            if !end.stdout.is_empty() {
                summary.push_str(&format!("\nstdout:\n{}", end.stdout));
            }
            if !end.stderr.is_empty() {
                summary.push_str(&format!("\nstderr:\n{}", end.stderr));
            }
            Some(summary)
        }
        _ => None,
    }
}
/// Split the stream into normal assistant text vs. proposed plan content.
/// Normal text becomes AgentMessage deltas; plan content becomes PlanDelta +
/// TurnItem::Plan.
@@ -5848,13 +5908,22 @@ mod tests {
use crate::protocol::CompactedItem;
use crate::protocol::CreditsSnapshot;
use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecCommandSource;
use crate::protocol::ExecOutputStream;
use crate::protocol::InitialHistory;
use crate::protocol::PatchApplyBeginEvent;
use crate::protocol::PatchApplyEndEvent;
use crate::protocol::PatchApplyStatus;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use crate::protocol::ResumedHistory;
use crate::protocol::TerminalInteractionEvent;
use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnDiffEvent;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -5934,6 +6003,87 @@ mod tests {
}
}
/// `ExecCommandBegin` events mirror a summary containing the joined command
/// line and the working directory.
#[test]
fn realtime_text_for_event_includes_exec_command_begin() {
    let begin = EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
        call_id: String::from("call-1"),
        process_id: None,
        turn_id: String::from("turn-1"),
        command: vec![String::from("echo"), String::from("hi")],
        cwd: PathBuf::from("/tmp"),
        parsed_cmd: Vec::new(),
        source: ExecCommandSource::Agent,
        interaction_input: None,
    });
    let mirrored = realtime_text_for_event(&begin).expect("expected mirrored text");
    assert!(mirrored.contains("Exec command started: echo hi"));
    assert!(mirrored.contains("Working directory: /tmp"));
}
/// apply_patch begin/end events mirror a sorted file list, the final status,
/// and any non-empty stdout/stderr captured from the patch application.
#[test]
fn realtime_text_for_event_includes_patch_apply_begin_and_end() {
    // Insert out of order to confirm the mirrored file list is sorted.
    let mut file_changes = HashMap::new();
    file_changes.insert(
        PathBuf::from("b.txt"),
        FileChange::Add {
            content: String::from("b"),
        },
    );
    file_changes.insert(
        PathBuf::from("a.txt"),
        FileChange::Delete {
            content: String::from("a"),
        },
    );
    let begin = EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
        call_id: String::from("patch-1"),
        turn_id: String::from("turn-1"),
        auto_approved: true,
        changes: file_changes.clone(),
    });
    let begin_text = realtime_text_for_event(&begin).expect("expected patch begin text");
    assert!(begin_text.contains("apply_patch started (2 file change(s))"));
    assert!(begin_text.contains("Files: a.txt, b.txt"));
    let end = EventMsg::PatchApplyEnd(PatchApplyEndEvent {
        call_id: String::from("patch-1"),
        turn_id: String::from("turn-1"),
        stdout: String::from("Updated files"),
        stderr: String::from("warning"),
        success: true,
        changes: file_changes,
        status: PatchApplyStatus::Completed,
    });
    let end_text = realtime_text_for_event(&end).expect("expected patch end text");
    assert!(end_text.contains("apply_patch completed"));
    assert!(end_text.contains("stdout:\nUpdated files"));
    assert!(end_text.contains("stderr:\nwarning"));
}
/// High-frequency or redundant events — turn diffs, exec output deltas, and
/// terminal interactions — must not be mirrored into realtime.
#[test]
fn realtime_text_for_event_skips_turn_diff_and_shell_deltas() {
    let diff_event = EventMsg::TurnDiff(TurnDiffEvent {
        unified_diff: String::from("--- a\n+++ b\n"),
    });
    assert!(realtime_text_for_event(&diff_event).is_none());
    let delta_event = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
        call_id: String::from("call-1"),
        stream: ExecOutputStream::Stdout,
        chunk: b"hello".to_vec(),
    });
    assert!(realtime_text_for_event(&delta_event).is_none());
    let interaction_event = EventMsg::TerminalInteraction(TerminalInteractionEvent {
        call_id: String::from("call-1"),
        process_id: String::from("proc-1"),
        stdin: String::from("ls\n"),
    });
    assert!(realtime_text_for_event(&interaction_event).is_none());
}
fn make_mcp_tool(
server_name: &str,
tool_name: &str,

View File

@@ -398,7 +398,10 @@ pub struct Config {
/// websocket transport base URL (the `Op::RealtimeConversation` `/ws`
/// connection) without changing normal provider HTTP requests.
pub experimental_realtime_ws_base_url: Option<String>,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport backend prompt (the `Op::RealtimeConversation`
/// `/ws` session.create backend_prompt) without changing normal prompts.
pub experimental_realtime_ws_backend_prompt: Option<String>,
/// When set, restricts ChatGPT login to a specific workspace identifier.
pub forced_chatgpt_workspace_id: Option<String>,
@@ -1128,7 +1131,10 @@ pub struct ConfigToml {
/// websocket transport base URL (the `Op::RealtimeConversation` `/ws`
/// connection) without changing normal provider HTTP requests.
pub experimental_realtime_ws_base_url: Option<String>,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport backend prompt (the `Op::RealtimeConversation`
/// `/ws` session.create backend_prompt) without changing normal prompts.
pub experimental_realtime_ws_backend_prompt: Option<String>,
pub projects: Option<HashMap<String, ProjectConfig>>,
/// Controls the web search tool mode: disabled, cached, or live.
@@ -2054,6 +2060,7 @@ impl Config {
.or(cfg.chatgpt_base_url)
.unwrap_or("https://chatgpt.com/backend-api/".to_string()),
experimental_realtime_ws_base_url: cfg.experimental_realtime_ws_base_url,
experimental_realtime_ws_backend_prompt: cfg.experimental_realtime_ws_backend_prompt,
forced_chatgpt_workspace_id,
forced_login_method,
include_apply_patch_tool: include_apply_patch_tool_flag,
@@ -4595,6 +4602,7 @@ model_verbosity = "high"
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_backend_prompt: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4715,6 +4723,7 @@ model_verbosity = "high"
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_backend_prompt: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4833,6 +4842,7 @@ model_verbosity = "high"
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_backend_prompt: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4937,6 +4947,7 @@ model_verbosity = "high"
personality: Some(Personality::Pragmatic),
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_realtime_ws_base_url: None,
experimental_realtime_ws_backend_prompt: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -5723,7 +5734,6 @@ trust_level = "untrusted"
);
Ok(())
}
#[test]
fn experimental_realtime_ws_base_url_loads_from_config_toml() -> std::io::Result<()> {
let cfg: ConfigToml = toml::from_str(
@@ -5751,6 +5761,34 @@ experimental_realtime_ws_base_url = "http://127.0.0.1:8011"
);
Ok(())
}
/// The `experimental_realtime_ws_backend_prompt` key deserializes from raw
/// TOML and survives conversion into the fully resolved `Config`.
#[test]
fn experimental_realtime_ws_backend_prompt_loads_from_config_toml() -> std::io::Result<()> {
    let raw = r#"
experimental_realtime_ws_backend_prompt = "prompt from config"
"#;
    let parsed: ConfigToml = toml::from_str(raw).expect("TOML deserialization should succeed");
    assert_eq!(
        parsed.experimental_realtime_ws_backend_prompt.as_deref(),
        Some("prompt from config")
    );
    // Resolving into a full Config must carry the override through unchanged.
    let home = TempDir::new()?;
    let resolved = Config::load_from_base_config_with_overrides(
        parsed,
        ConfigOverrides::default(),
        home.path().to_path_buf(),
    )?;
    assert_eq!(
        resolved.experimental_realtime_ws_backend_prompt.as_deref(),
        Some("prompt from config")
    );
    Ok(())
}
}
#[cfg(test)]

View File

@@ -174,18 +174,17 @@ pub(crate) async fn handle_start(
if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url {
api_provider.base_url = realtime_ws_base_url.clone();
}
let prompt = config
.experimental_realtime_ws_backend_prompt
.clone()
.unwrap_or(params.prompt);
let requested_session_id = params
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
let events_rx = match sess
.conversation
.start(
api_provider,
None,
params.prompt,
requested_session_id.clone(),
)
.start(api_provider, None, prompt, requested_session_id.clone())
.await
{
Ok(events_rx) => events_rx,

View File

@@ -358,7 +358,6 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -413,3 +412,49 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
realtime_server.shutdown().await;
Ok(())
}
/// End-to-end check that `experimental_realtime_ws_backend_prompt` from the
/// config overrides the prompt supplied in the `RealtimeConversationStart`
/// op when the realtime websocket session is created.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> Result<()> {
    skip_if_no_network!(Ok(()));
    // Scripted websocket server; presumably one script entry per expected
    // connection — the second replies with `session.created` (TODO confirm
    // against start_websocket_server's contract).
    let server = start_websocket_server(vec![
        vec![],
        vec![vec![json!({
            "type": "session.created",
            "session": { "id": "sess_override" }
        })]],
    ])
    .await;
    // Set the config override; the op-level prompt below should lose to it.
    let mut builder = test_codex().with_config(|config| {
        config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
    });
    let test = builder.build_with_websocket_server(&server).await?;
    // Wait for the first handshake before submitting the realtime start op.
    assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
    test.codex
        .submit(Op::RealtimeConversationStart(ConversationStartParams {
            prompt: "prompt from op".to_string(),
            session_id: None,
        }))
        .await?;
    // Block until the scripted server's session.created event reaches us.
    let session_created = wait_for_event_match(&test.codex, |msg| match msg {
        EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
            payload: RealtimeEvent::SessionCreated { session_id },
        }) => Some(session_id.clone()),
        _ => None,
    })
    .await;
    assert_eq!(session_created, "sess_override");
    // The second connection is the realtime one; its session.create request
    // must carry the config override, not the op-supplied prompt.
    let connections = server.connections();
    assert_eq!(connections.len(), 2);
    assert_eq!(
        connections[1][0].body_json()["session"]["backend_prompt"].as_str(),
        Some("prompt from config")
    );
    server.shutdown().await;
    Ok(())
}