Compare commits

...

7 Commits

Author SHA1 Message Date
Ahmed Ibrahim
5c85133b55 Restore realtime backend prompt template 2026-04-20 19:34:04 -07:00
Ahmed Ibrahim
a1a5c65059 Make realtime extraction layout idiomatic 2026-04-20 19:22:15 -07:00
Ahmed Ibrahim
1896bff9fe refactor: consolidate realtime conversation move 2026-04-20 19:09:31 -07:00
Ahmed Ibrahim
06dd8530e8 refactor: drop obsolete core realtime file 2026-04-20 19:07:56 -07:00
Ahmed Ibrahim
672bb553a6 refactor: favor realtime file moves 2026-04-20 19:07:21 -07:00
Ahmed Ibrahim
e7fe25dc5e refactor: collapse core realtime adapters 2026-04-20 19:01:55 -07:00
Ahmed Ibrahim
b2b94fe755 refactor: move realtime out of codex-core 2026-04-20 18:58:40 -07:00
19 changed files with 1998 additions and 1663 deletions

28
codex-rs/Cargo.lock generated
View File

@@ -1947,6 +1947,7 @@ dependencies = [
"codex-otel",
"codex-plugin",
"codex-protocol",
"codex-realtime",
"codex-response-debug-context",
"codex-rmcp-client",
"codex-rollout",
@@ -2677,6 +2678,33 @@ dependencies = [
"uuid",
]
[[package]]
name = "codex-realtime"
version = "0.0.0"
dependencies = [
"anyhow",
"async-channel",
"base64 0.22.1",
"chrono",
"codex-api",
"codex-config",
"codex-exec-server",
"codex-git-utils",
"codex-login",
"codex-model-provider-info",
"codex-protocol",
"codex-thread-store",
"codex-utils-absolute-path",
"codex-utils-output-truncation",
"http 1.4.0",
"pretty_assertions",
"serde_json",
"tempfile",
"tokio",
"tracing",
"whoami",
]
[[package]]
name = "codex-realtime-webrtc"
version = "0.0.0"

View File

@@ -50,6 +50,7 @@ members = [
"ollama",
"process-hardening",
"protocol",
"realtime",
"realtime-webrtc",
"rollout",
"rmcp-client",
@@ -159,6 +160,7 @@ codex-plugin = { path = "plugin" }
codex-model-provider = { path = "model-provider" }
codex-process-hardening = { path = "process-hardening" }
codex-protocol = { path = "protocol" }
codex-realtime = { path = "realtime" }
codex-realtime-webrtc = { path = "realtime-webrtc" }
codex-responses-api-proxy = { path = "responses-api-proxy" }
codex-response-debug-context = { path = "response-debug-context" }

View File

@@ -55,6 +55,7 @@ codex-plugin = { workspace = true }
codex-model-provider = { workspace = true }
codex-protocol = { workspace = true }
codex-response-debug-context = { workspace = true }
codex-realtime = { workspace = true }
codex-rollout = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-sandboxing = { workspace = true }

View File

@@ -11,9 +11,7 @@ mod apps;
mod arc_monitor;
mod client;
mod client_common;
mod realtime_context;
mod realtime_conversation;
mod realtime_prompt;
mod realtime;
pub(crate) mod session;
pub use session::SteerInputError;
mod codex_thread;

View File

@@ -0,0 +1,529 @@
use crate::compact::content_items_to_text;
use crate::config::Config;
use crate::session::session::Session;
use codex_api::RealtimeEvent;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeWebsocketClient;
use codex_api::map_api_error;
use codex_app_server_protocol::AuthMode;
use codex_config::config_toml::RealtimeWsVersion;
use codex_login::default_client::default_headers;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ConversationStartTransport;
use codex_protocol::protocol::ConversationTextParams;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationSdpEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RealtimeVoice;
use codex_thread_store::ListThreadsParams;
use codex_thread_store::SortDirection;
use codex_thread_store::StoredThread;
use codex_thread_store::ThreadSortKey;
use codex_thread_store::ThreadStore;
use dirs::home_dir;
use std::mem::take;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
pub(crate) use codex_realtime::REALTIME_USER_TEXT_PREFIX;
pub(crate) use codex_realtime::RealtimeConversationManager;
use codex_realtime::RealtimeFeaturesConfig;
use codex_realtime::RealtimeStart;
use codex_realtime::RealtimeStartOutput;
use codex_realtime::build_realtime_session_config as build_session_config;
pub(crate) use codex_realtime::prefix_realtime_v2_text;
use codex_realtime::realtime_api_key;
use codex_realtime::realtime_delegation_from_handoff;
use codex_realtime::realtime_request_headers;
const MAX_RECENT_THREADS: usize = 40;
const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_300;
pub(crate) async fn build_realtime_startup_context(
sess: &Session,
budget_tokens: usize,
) -> Option<String> {
let config = sess.get_config().await;
let history = sess.clone_history().await;
let current_thread_turns = current_thread_turns(history.raw_items());
let recent_threads = load_recent_threads(sess).await;
let context = codex_realtime::RealtimeStartupContext {
cwd: config.cwd.clone(),
current_thread_turns,
recent_threads,
user_root: home_dir(),
};
codex_realtime::build_realtime_startup_context(&context, budget_tokens).await
}
async fn load_recent_threads(sess: &Session) -> Vec<StoredThread> {
match sess
.services
.thread_store
.list_threads(ListThreadsParams {
page_size: MAX_RECENT_THREADS,
cursor: None,
sort_key: ThreadSortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
archived: false,
search_term: None,
})
.await
{
Ok(page) => page.items,
Err(err) => {
warn!("failed to load realtime startup threads from thread store: {err}");
Vec::new()
}
}
}
fn current_thread_turns(items: &[ResponseItem]) -> Vec<codex_realtime::RealtimeStartupContextTurn> {
let mut turns = Vec::new();
let mut current_user = Vec::new();
let mut current_assistant = Vec::new();
for item in items {
match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
if crate::event_mapping::is_contextual_user_message_content(content) {
continue;
}
let Some(text) = content_items_to_text(content)
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
else {
continue;
};
if !current_user.is_empty() || !current_assistant.is_empty() {
turns.push(codex_realtime::RealtimeStartupContextTurn {
user_messages: take(&mut current_user),
assistant_messages: take(&mut current_assistant),
});
}
current_user.push(text);
}
ResponseItem::Message { role, content, .. } if role == "assistant" => {
let Some(text) = content_items_to_text(content)
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
else {
continue;
};
if current_user.is_empty() && current_assistant.is_empty() {
continue;
}
current_assistant.push(text);
}
_ => {}
}
}
if !current_user.is_empty() || !current_assistant.is_empty() {
turns.push(codex_realtime::RealtimeStartupContextTurn {
user_messages: current_user,
assistant_messages: current_assistant,
});
}
turns
}
#[cfg(test)]
fn build_current_thread_section(items: &[ResponseItem]) -> Option<String> {
codex_realtime::build_current_thread_section(&current_thread_turns(items))
}
#[cfg(test)]
pub(crate) use codex_realtime::CURRENT_THREAD_SECTION_TOKEN_BUDGET;
#[cfg(test)]
pub(crate) use codex_realtime::NOTES_SECTION_TOKEN_BUDGET;
pub(crate) use codex_realtime::REALTIME_TURN_TOKEN_BUDGET;
#[cfg(test)]
pub(crate) use codex_realtime::RECENT_WORK_SECTION_TOKEN_BUDGET;
#[cfg(test)]
pub(crate) use codex_realtime::STARTUP_CONTEXT_HEADER;
#[cfg(test)]
pub(crate) use codex_realtime::WORKSPACE_SECTION_TOKEN_BUDGET;
#[cfg(test)]
use codex_realtime::build_recent_work_section;
#[cfg(test)]
use codex_realtime::build_workspace_section_with_user_root;
#[cfg(test)]
use codex_realtime::format_section;
#[cfg(test)]
use codex_realtime::format_startup_context_blob;
pub(crate) use codex_realtime::truncate_realtime_text_to_token_budget;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RealtimeConversationEnd {
Requested,
TransportClosed,
Error,
}
pub(crate) async fn handle_start(
sess: &Arc<Session>,
sub_id: String,
params: ConversationStartParams,
) -> CodexResult<()> {
let prepared_start = match prepare_realtime_start(sess, params).await {
Ok(prepared_start) => prepared_start,
Err(err) => {
error!("failed to prepare realtime conversation: {err}");
let message = err.to_string();
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}),
})
.await;
return Ok(());
}
};
if let Err(err) = handle_start_inner(sess, &sub_id, prepared_start).await {
error!("failed to start realtime conversation: {err}");
let message = err.to_string();
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}),
})
.await;
}
Ok(())
}
struct PreparedRealtimeConversationStart {
requested_session_id: Option<String>,
version: RealtimeWsVersion,
start: RealtimeStart,
}
async fn prepare_realtime_start(
sess: &Arc<Session>,
params: ConversationStartParams,
) -> CodexResult<PreparedRealtimeConversationStart> {
let provider = sess.provider().await;
let auth_manager = sess
.services
.model_client
.auth_manager()
.unwrap_or_else(|| Arc::clone(&sess.services.auth_manager));
let auth = auth_manager.auth().await;
let config = sess.get_config().await;
let transport = params
.transport
.unwrap_or(ConversationStartTransport::Websocket);
let realtime_config = realtime_features_config(config.as_ref());
let version = realtime_config.session.version;
let mut api_provider = provider.to_api_provider(Some(AuthMode::ApiKey))?;
if let Some(realtime_ws_base_url) = &realtime_config.websocket_base_url {
api_provider.base_url = realtime_ws_base_url.clone();
}
let session_config = build_realtime_session_config(
sess,
&realtime_config,
params.prompt,
params.session_id,
params.output_modality,
params.voice,
)
.await?;
let requested_session_id = session_config.session_id.clone();
let event_parser = session_config.event_parser;
let client = RealtimeWebsocketClient::new(api_provider);
let start = match transport {
ConversationStartTransport::Websocket => {
let api_key = realtime_api_key(auth.as_ref(), &provider)?;
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), Some(api_key.as_str()))?
.unwrap_or_default();
let connection = client
.connect(session_config, extra_headers, default_headers())
.await
.map_err(map_api_error)?;
RealtimeStart {
writer: connection.writer(),
events: connection.events(),
event_parser,
sdp: None,
}
}
ConversationStartTransport::Webrtc { sdp } => {
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), /*api_key*/ None)?
.unwrap_or_default();
let call = sess
.services
.model_client
.create_realtime_call_with_headers(sdp, session_config.clone(), extra_headers)
.await?;
let connection = client
.connect_webrtc_sideband(
session_config,
&call.call_id,
call.sideband_headers,
default_headers(),
)
.await
.map_err(map_api_error)?;
RealtimeStart {
writer: connection.writer(),
events: connection.events(),
event_parser,
sdp: Some(call.sdp),
}
}
};
Ok(PreparedRealtimeConversationStart {
requested_session_id,
version,
start,
})
}
pub(crate) async fn build_realtime_session_config(
sess: &Arc<Session>,
config: &RealtimeFeaturesConfig,
prompt: Option<Option<String>>,
session_id: Option<String>,
output_modality: RealtimeOutputModality,
voice: Option<RealtimeVoice>,
) -> CodexResult<RealtimeSessionConfig> {
let startup_context = match &config.websocket_startup_context {
Some(startup_context) => startup_context.clone(),
None => {
build_realtime_startup_context(sess.as_ref(), REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET)
.await
.unwrap_or_default()
}
};
build_session_config(
config,
prompt,
Some(session_id.unwrap_or_else(|| sess.conversation_id.to_string())),
output_modality,
voice,
startup_context,
)
}
fn realtime_features_config(config: &Config) -> RealtimeFeaturesConfig {
RealtimeFeaturesConfig {
audio: config.realtime_audio.clone(),
session: config.realtime.clone(),
websocket_base_url: config.experimental_realtime_ws_base_url.clone(),
websocket_model: config.experimental_realtime_ws_model.clone(),
websocket_backend_prompt: config.experimental_realtime_ws_backend_prompt.clone(),
websocket_startup_context: config.experimental_realtime_ws_startup_context.clone(),
start_instructions: config.experimental_realtime_start_instructions.clone(),
}
}
async fn handle_start_inner(
sess: &Arc<Session>,
sub_id: &str,
prepared_start: PreparedRealtimeConversationStart,
) -> CodexResult<()> {
let PreparedRealtimeConversationStart {
requested_session_id,
version,
start,
} = prepared_start;
info!("starting realtime conversation");
let start_output = sess.conversation.start(start).await?;
info!("realtime conversation started");
sess.send_event_raw(Event {
id: sub_id.to_string(),
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
session_id: requested_session_id,
version,
}),
})
.await;
let RealtimeStartOutput {
realtime_active,
events_rx,
sdp,
} = start_output;
if let Some(sdp) = sdp {
sess.send_event_raw(Event {
id: sub_id.to_string(),
msg: EventMsg::RealtimeConversationSdp(RealtimeConversationSdpEvent { sdp }),
})
.await;
}
let sess_clone = Arc::clone(sess);
let sub_id = sub_id.to_string();
let fanout_realtime_active = Arc::clone(&realtime_active);
let fanout_task = tokio::spawn(async move {
let ev = |msg| Event {
id: sub_id.clone(),
msg,
};
let mut end = RealtimeConversationEnd::TransportClosed;
while let Ok(event) = events_rx.recv().await {
if !fanout_realtime_active.load(Ordering::Relaxed) {
break;
}
match &event {
RealtimeEvent::AudioOut(_) => {}
_ => {
info!(event = ?event, "received realtime conversation event");
}
}
if let RealtimeEvent::Error(_) = &event {
end = RealtimeConversationEnd::Error;
}
if let Some(text) = match &event {
RealtimeEvent::HandoffRequested(handoff) => {
realtime_delegation_from_handoff(handoff)
}
_ => None,
} {
debug!(text = %text, "[realtime-text] realtime conversation text output");
let sess_for_routed_text = Arc::clone(&sess_clone);
sess_for_routed_text.route_realtime_text_input(text).await;
}
if !fanout_realtime_active.load(Ordering::Relaxed) {
break;
}
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationRealtime(
RealtimeConversationRealtimeEvent {
payload: event.clone(),
},
)))
.await;
}
if fanout_realtime_active.swap(false, Ordering::Relaxed) {
match end {
RealtimeConversationEnd::TransportClosed => {
info!("realtime conversation transport closed");
}
RealtimeConversationEnd::Requested | RealtimeConversationEnd::Error => {}
}
sess_clone
.conversation
.finish_if_active(&fanout_realtime_active)
.await;
send_realtime_conversation_closed(&sess_clone, sub_id, end).await;
}
});
sess.conversation
.register_fanout_task(&realtime_active, fanout_task)
.await;
Ok(())
}
pub(crate) async fn handle_audio(
sess: &Arc<Session>,
sub_id: String,
params: ConversationAudioParams,
) {
if let Err(err) = sess.conversation.audio_in(params.frame).await {
error!("failed to append realtime audio: {err}");
if sess.conversation.running_state().await.is_some() {
warn!("realtime audio input failed while the session was already ending");
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
pub(crate) async fn handle_text(
sess: &Arc<Session>,
sub_id: String,
params: ConversationTextParams,
) {
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
if let Err(err) = sess.conversation.text_in(params.text).await {
error!("failed to append realtime text: {err}");
if sess.conversation.running_state().await.is_some() {
warn!("realtime text input failed while the session was already ending");
} else {
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest)
.await;
}
}
}
pub(crate) async fn handle_close(sess: &Arc<Session>, sub_id: String) {
end_realtime_conversation(sess, sub_id, RealtimeConversationEnd::Requested).await;
}
async fn send_conversation_error(
sess: &Arc<Session>,
sub_id: String,
message: String,
codex_error_info: CodexErrorInfo,
) {
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message,
codex_error_info: Some(codex_error_info),
}),
})
.await;
}
async fn end_realtime_conversation(
sess: &Arc<Session>,
sub_id: String,
end: RealtimeConversationEnd,
) {
let _ = sess.conversation.shutdown().await;
send_realtime_conversation_closed(sess, sub_id, end).await;
}
async fn send_realtime_conversation_closed(
sess: &Arc<Session>,
sub_id: String,
end: RealtimeConversationEnd,
) {
let reason = match end {
RealtimeConversationEnd::Requested => Some("requested".to_string()),
RealtimeConversationEnd::TransportClosed => Some("transport_closed".to_string()),
RealtimeConversationEnd::Error => Some("error".to_string()),
};
sess.send_event_raw(Event {
id: sub_id,
msg: EventMsg::RealtimeConversationClosed(RealtimeConversationClosedEvent { reason }),
})
.await;
}
#[cfg(test)]
#[path = "realtime_context_tests.rs"]
mod context_tests;

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
use crate::realtime_conversation::handle_audio as handle_realtime_conversation_audio;
use crate::realtime_conversation::handle_close as handle_realtime_conversation_close;
use crate::realtime_conversation::handle_start as handle_realtime_conversation_start;
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use crate::realtime::handle_audio as handle_realtime_conversation_audio;
use crate::realtime::handle_close as handle_realtime_conversation_close;
use crate::realtime::handle_start as handle_realtime_conversation_start;
use crate::realtime::handle_text as handle_realtime_conversation_text;
use async_channel::Receiver;
use codex_otel::set_parent_from_w3c_trace_context;
use codex_protocol::protocol::Submission;
@@ -17,10 +17,10 @@ use crate::config::Config;
use crate::config_loader::CloudRequirementsLoader;
use crate::config_loader::LoaderOverrides;
use crate::config_loader::load_config_layers_state;
use crate::realtime_context::REALTIME_TURN_TOKEN_BUDGET;
use crate::realtime_context::truncate_realtime_text_to_token_budget;
use crate::realtime_conversation::REALTIME_USER_TEXT_PREFIX;
use crate::realtime_conversation::prefix_realtime_v2_text;
use crate::realtime::REALTIME_TURN_TOKEN_BUDGET;
use crate::realtime::REALTIME_USER_TEXT_PREFIX;
use crate::realtime::prefix_realtime_v2_text;
use crate::realtime::truncate_realtime_text_to_token_budget;
use crate::session::spawn_review_thread;
use codex_exec_server::LOCAL_FS;
use codex_features::Feature;

View File

@@ -24,7 +24,7 @@ use crate::exec_policy::ExecPolicyManager;
use crate::installation_id::resolve_installation_id;
use crate::parse_turn_item;
use crate::path_utils::normalize_for_native_workdir;
use crate::realtime_conversation::RealtimeConversationManager;
use crate::realtime::RealtimeConversationManager;
use crate::render_skills_section;
use crate::rollout::find_thread_name_by_id;
use crate::session_prefix::format_subagent_notification_message;

View File

@@ -0,0 +1,7 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "realtime",
crate_name = "codex_realtime",
compile_data = glob(["templates/**/*.md"]),
)

View File

@@ -0,0 +1,37 @@
[package]
name = "codex-realtime"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
name = "codex_realtime"
path = "src/lib.rs"
[dependencies]
anyhow = { workspace = true }
async-channel = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
codex-api = { workspace = true }
codex-config = { workspace = true }
codex-exec-server = { workspace = true }
codex-git-utils = { workspace = true }
codex-login = { workspace = true }
codex-model-provider-info = { workspace = true }
codex-protocol = { workspace = true }
codex-thread-store = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-output-truncation = { workspace = true }
http = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tracing = { workspace = true }
whoami = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,13 @@
use codex_config::config_toml::RealtimeAudioConfig;
use codex_config::config_toml::RealtimeConfig;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RealtimeFeaturesConfig {
pub audio: RealtimeAudioConfig,
pub session: RealtimeConfig,
pub websocket_base_url: Option<String>,
pub websocket_model: Option<String>,
pub websocket_backend_prompt: Option<String>,
pub websocket_startup_context: Option<String>,
pub start_instructions: Option<String>,
}

View File

@@ -0,0 +1,54 @@
use codex_protocol::protocol::RealtimeHandoffRequested;
pub const REALTIME_USER_TEXT_PREFIX: &str = "[USER] ";
pub const REALTIME_BACKEND_TEXT_PREFIX: &str = "[BACKEND] ";
pub fn prefix_realtime_v2_text(text: String, prefix: &str) -> String {
if text.is_empty() || text.starts_with(prefix) {
return text;
}
format!("{prefix}{text}")
}
fn realtime_transcript_delta_from_handoff(handoff: &RealtimeHandoffRequested) -> Option<String> {
let active_transcript = handoff
.active_transcript
.iter()
.map(|entry| format!("{role}: {text}", role = entry.role, text = entry.text))
.collect::<Vec<_>>()
.join("\n");
(!active_transcript.is_empty()).then_some(active_transcript)
}
pub fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option<String> {
(!handoff.input_transcript.is_empty())
.then_some(handoff.input_transcript.clone())
.or_else(|| realtime_transcript_delta_from_handoff(handoff))
}
pub fn realtime_delegation_from_handoff(handoff: &RealtimeHandoffRequested) -> Option<String> {
let input = realtime_text_from_handoff_request(handoff)?;
Some(wrap_realtime_delegation_input(
&input,
realtime_transcript_delta_from_handoff(handoff).as_deref(),
))
}
pub fn wrap_realtime_delegation_input(input: &str, transcript_delta: Option<&str>) -> String {
let input = escape_xml_text(input);
if let Some(transcript_delta) = transcript_delta.filter(|text| !text.is_empty()) {
let transcript_delta = escape_xml_text(transcript_delta);
return format!(
"<realtime_delegation>\n <input>{input}</input>\n <transcript_delta>{transcript_delta}</transcript_delta>\n</realtime_delegation>"
);
}
format!("<realtime_delegation>\n <input>{input}</input>\n</realtime_delegation>")
}
fn escape_xml_text(input: &str) -> String {
input
.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
}

View File

@@ -0,0 +1,39 @@
mod config;
mod handoff;
mod prompt;
mod realtime_conversation;
mod session_config;
mod startup_context;
pub use config::RealtimeFeaturesConfig;
pub use handoff::REALTIME_BACKEND_TEXT_PREFIX;
pub use handoff::REALTIME_USER_TEXT_PREFIX;
pub use handoff::prefix_realtime_v2_text;
pub use handoff::realtime_delegation_from_handoff;
pub use handoff::realtime_text_from_handoff_request;
pub use handoff::wrap_realtime_delegation_input;
pub use prompt::prepare_realtime_backend_prompt;
pub use realtime_conversation::RealtimeConversationManager;
pub use realtime_conversation::RealtimeStart;
pub use realtime_conversation::RealtimeStartOutput;
pub use session_config::DEFAULT_REALTIME_MODEL;
pub use session_config::build_realtime_session_config;
pub use session_config::default_realtime_voice;
pub use session_config::realtime_api_key;
pub use session_config::realtime_request_headers;
pub use session_config::validate_realtime_voice;
pub use startup_context::CURRENT_THREAD_SECTION_TOKEN_BUDGET;
pub use startup_context::NOTES_SECTION_TOKEN_BUDGET;
pub use startup_context::REALTIME_TURN_TOKEN_BUDGET;
pub use startup_context::RECENT_WORK_SECTION_TOKEN_BUDGET;
pub use startup_context::RealtimeStartupContext;
pub use startup_context::RealtimeStartupContextTurn;
pub use startup_context::STARTUP_CONTEXT_HEADER;
pub use startup_context::WORKSPACE_SECTION_TOKEN_BUDGET;
pub use startup_context::build_current_thread_section;
pub use startup_context::build_realtime_startup_context;
pub use startup_context::build_recent_work_section;
pub use startup_context::build_workspace_section_with_user_root;
pub use startup_context::format_section;
pub use startup_context::format_startup_context_blob;
pub use startup_context::truncate_realtime_text_to_token_budget;

View File

@@ -0,0 +1,82 @@
const BACKEND_PROMPT: &str = include_str!("../templates/realtime/backend_prompt.md");
const DEFAULT_USER_FIRST_NAME: &str = "there";
const USER_FIRST_NAME_PLACEHOLDER: &str = "{{ user_first_name }}";
pub fn prepare_realtime_backend_prompt(
prompt: Option<Option<String>>,
config_prompt: Option<String>,
) -> String {
if let Some(config_prompt) = config_prompt
&& !config_prompt.trim().is_empty()
{
return config_prompt;
}
match prompt {
Some(Some(prompt)) => return prompt,
Some(None) => return String::new(),
None => {}
}
BACKEND_PROMPT
.trim_end()
.replace(USER_FIRST_NAME_PLACEHOLDER, &current_user_first_name())
}
fn current_user_first_name() -> String {
[whoami::realname(), whoami::username()]
.into_iter()
.filter_map(|name| name.split_whitespace().next().map(str::to_string))
.find(|name| !name.is_empty())
.unwrap_or_else(|| DEFAULT_USER_FIRST_NAME.to_string())
}
#[cfg(test)]
mod tests {
use super::prepare_realtime_backend_prompt;
#[test]
fn prepare_realtime_backend_prompt_prefers_config_override() {
assert_eq!(
prepare_realtime_backend_prompt(
Some(Some("prompt from request".to_string())),
Some("prompt from config".to_string()),
),
"prompt from config"
);
}
#[test]
fn prepare_realtime_backend_prompt_uses_request_prompt() {
assert_eq!(
prepare_realtime_backend_prompt(
Some(Some("prompt from request".to_string())),
/*config_prompt*/ None,
),
"prompt from request"
);
}
#[test]
fn prepare_realtime_backend_prompt_preserves_empty_request_prompt() {
assert_eq!(
prepare_realtime_backend_prompt(Some(Some(String::new())), /*config_prompt*/ None),
""
);
assert_eq!(
prepare_realtime_backend_prompt(Some(None), /*config_prompt*/ None),
""
);
}
#[test]
fn prepare_realtime_backend_prompt_renders_default() {
let prompt =
prepare_realtime_backend_prompt(/*prompt*/ None, /*config_prompt*/ None);
assert!(prompt.starts_with("## Identity, tone, and role"));
assert!(prompt.contains("You are Codex, an OpenAI general-purpose agentic assistant"));
assert!(prompt.contains("The user's name is "));
assert!(!prompt.contains("{{ user_first_name }}"));
}
}

View File

@@ -0,0 +1,840 @@
use crate::handoff::REALTIME_BACKEND_TEXT_PREFIX;
use crate::handoff::REALTIME_USER_TEXT_PREFIX;
use crate::handoff::prefix_realtime_v2_text;
use anyhow::Context;
use async_channel::Receiver;
use async_channel::RecvError;
use async_channel::Sender;
use async_channel::TrySendError;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_api::ApiError;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeWebsocketEvents;
use codex_api::RealtimeWebsocketWriter;
use codex_api::map_api_error;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::warn;
const AUDIO_IN_QUEUE_CAPACITY: usize = 256;
const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64;
const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64;
const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256;
const REALTIME_V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT: &str =
"Background agent finished. Use the preceding [BACKEND] messages as the result.";
const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str =
"This was sent to steer the previous background agent task.";
const REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX: &str =
"Conversation already has an active response in progress:";
enum RealtimeFanoutTaskStop {
Abort,
Detach,
}
pub struct RealtimeConversationManager {
state: Mutex<Option<ConversationState>>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RealtimeSessionKind {
V1,
V2,
}
#[derive(Clone, Debug)]
struct RealtimeHandoffState {
output_tx: Sender<HandoffOutput>,
active_handoff: Arc<Mutex<Option<String>>>,
last_output_text: Arc<Mutex<Option<String>>>,
session_kind: RealtimeSessionKind,
}
#[derive(Debug, PartialEq, Eq)]
enum HandoffOutput {
ProgressUpdate {
handoff_id: String,
output_text: String,
},
FinalUpdate {
handoff_id: String,
output_text: String,
},
}
#[derive(Debug, PartialEq, Eq)]
struct OutputAudioState {
item_id: String,
audio_end_ms: u32,
}
#[derive(Default)]
struct RealtimeResponseCreateQueue {
active_default_response: bool,
pending_create: bool,
}
impl RealtimeResponseCreateQueue {
async fn request_create(
&mut self,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
reason: &str,
) -> anyhow::Result<()> {
if self.active_default_response {
self.pending_create = true;
return Ok(());
}
self.send_create_now(writer, events_tx, reason).await
}
fn mark_started(&mut self) {
self.active_default_response = true;
}
async fn mark_finished(
&mut self,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
reason: &str,
) -> anyhow::Result<()> {
self.active_default_response = false;
if !self.pending_create {
return Ok(());
}
self.pending_create = false;
self.send_create_now(writer, events_tx, reason).await
}
async fn send_create_now(
&mut self,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
reason: &str,
) -> anyhow::Result<()> {
if let Err(err) = writer.send_response_create().await {
let mapped_error = map_api_error(err);
let error_message = mapped_error.to_string();
if error_message.starts_with(REALTIME_ACTIVE_RESPONSE_ERROR_PREFIX) {
warn!("realtime response.create raced an active response; deferring");
self.active_default_response = true;
self.pending_create = true;
return Ok(());
}
warn!("failed to send {reason} response.create: {mapped_error}");
let _ = events_tx.send(RealtimeEvent::Error(error_message)).await;
return Err(mapped_error.into());
}
self.active_default_response = true;
Ok(())
}
}
struct RealtimeInputTask {
writer: RealtimeWebsocketWriter,
events: RealtimeWebsocketEvents,
user_text_rx: Receiver<String>,
handoff_output_rx: Receiver<HandoffOutput>,
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
event_parser: RealtimeEventParser,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>, session_kind: RealtimeSessionKind) -> Self {
Self {
output_tx,
active_handoff: Arc::new(Mutex::new(None)),
last_output_text: Arc::new(Mutex::new(None)),
session_kind,
}
}
}
struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
user_text_tx: Sender<String>,
session_kind: RealtimeSessionKind,
handoff: RealtimeHandoffState,
input_task: JoinHandle<()>,
fanout_task: Option<JoinHandle<()>>,
realtime_active: Arc<AtomicBool>,
}
pub struct RealtimeStart {
pub writer: RealtimeWebsocketWriter,
pub events: RealtimeWebsocketEvents,
pub event_parser: RealtimeEventParser,
pub sdp: Option<String>,
}
pub struct RealtimeStartOutput {
pub realtime_active: Arc<AtomicBool>,
pub events_rx: Receiver<RealtimeEvent>,
pub sdp: Option<String>,
}
impl Default for RealtimeConversationManager {
fn default() -> Self {
Self::new()
}
}
impl RealtimeConversationManager {
pub fn new() -> Self {
Self {
state: Mutex::new(None),
}
}
pub async fn running_state(&self) -> Option<()> {
let state = self.state.lock().await;
state
.as_ref()
.and_then(|state| state.realtime_active.load(Ordering::Relaxed).then_some(()))
}
pub async fn is_running_v2(&self) -> bool {
let state = self.state.lock().await;
matches!(
state.as_ref(),
Some(state)
if state.realtime_active.load(Ordering::Relaxed)
&& state.session_kind == RealtimeSessionKind::V2
)
}
pub async fn start(&self, start: RealtimeStart) -> CodexResult<RealtimeStartOutput> {
let previous_state = {
let mut guard = self.state.lock().await;
guard.take()
};
if let Some(state) = previous_state {
stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await;
}
self.start_inner(start).await
}
async fn start_inner(&self, start: RealtimeStart) -> CodexResult<RealtimeStartOutput> {
let RealtimeStart {
writer,
events,
event_parser,
sdp,
} = start;
let session_kind = match event_parser {
RealtimeEventParser::V1 => RealtimeSessionKind::V1,
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let (audio_tx, audio_rx) =
async_channel::bounded::<RealtimeAudioFrame>(AUDIO_IN_QUEUE_CAPACITY);
let (user_text_tx, user_text_rx) =
async_channel::bounded::<String>(USER_TEXT_IN_QUEUE_CAPACITY);
let (handoff_output_tx, handoff_output_rx) =
async_channel::bounded::<HandoffOutput>(HANDOFF_OUT_QUEUE_CAPACITY);
let (events_tx, events_rx) =
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: writer.clone(),
events,
user_text_rx,
handoff_output_rx,
audio_rx,
events_tx,
handoff_state: handoff.clone(),
session_kind,
event_parser,
});
let mut guard = self.state.lock().await;
*guard = Some(ConversationState {
audio_tx,
user_text_tx,
session_kind,
handoff,
input_task: task,
fanout_task: None,
realtime_active: Arc::clone(&realtime_active),
});
Ok(RealtimeStartOutput {
realtime_active,
events_rx,
sdp,
})
}
pub async fn register_fanout_task(
&self,
realtime_active: &Arc<AtomicBool>,
fanout_task: JoinHandle<()>,
) {
let mut fanout_task = Some(fanout_task);
{
let mut guard = self.state.lock().await;
if let Some(state) = guard.as_mut()
&& Arc::ptr_eq(&state.realtime_active, realtime_active)
{
state.fanout_task = fanout_task.take();
}
}
if let Some(fanout_task) = fanout_task {
fanout_task.abort();
let _ = fanout_task.await;
}
}
pub async fn finish_if_active(&self, realtime_active: &Arc<AtomicBool>) {
let state = {
let mut guard = self.state.lock().await;
match guard.as_ref() {
Some(state) if Arc::ptr_eq(&state.realtime_active, realtime_active) => guard.take(),
_ => None,
}
};
if let Some(state) = state {
stop_conversation_state(state, RealtimeFanoutTaskStop::Detach).await;
}
}
pub async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> {
let sender = {
let guard = self.state.lock().await;
guard.as_ref().map(|state| state.audio_tx.clone())
};
let Some(sender) = sender else {
return Err(CodexErr::InvalidRequest(
"conversation is not running".to_string(),
));
};
match sender.try_send(frame) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
warn!("dropping input audio frame due to full queue");
Ok(())
}
Err(TrySendError::Closed(_)) => Err(CodexErr::InvalidRequest(
"conversation is not running".to_string(),
)),
}
}
pub async fn text_in(&self, text: String) -> CodexResult<()> {
let sender = {
let guard = self.state.lock().await;
guard
.as_ref()
.map(|state| (state.user_text_tx.clone(), state.session_kind))
};
let Some((sender, session_kind)) = sender else {
return Err(CodexErr::InvalidRequest(
"conversation is not running".to_string(),
));
};
let text = if session_kind == RealtimeSessionKind::V2 {
prefix_realtime_v2_text(text, REALTIME_USER_TEXT_PREFIX)
} else {
text
};
sender
.send(text)
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
}
pub async fn handoff_out(&self, output_text: String) -> CodexResult<()> {
let handoff = {
let guard = self.state.lock().await;
let Some(state) = guard.as_ref() else {
return Err(CodexErr::InvalidRequest(
"conversation is not running".to_string(),
));
};
state.handoff.clone()
};
let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else {
return Ok(());
};
let output_text = if handoff.session_kind == RealtimeSessionKind::V2 {
prefix_realtime_v2_text(output_text, REALTIME_BACKEND_TEXT_PREFIX)
} else {
output_text
};
*handoff.last_output_text.lock().await = Some(output_text.clone());
handoff
.output_tx
.send(HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
}
pub async fn handoff_complete(&self) -> CodexResult<()> {
let handoff = {
let guard = self.state.lock().await;
guard.as_ref().map(|state| state.handoff.clone())
};
let Some(handoff) = handoff else {
return Ok(());
};
match handoff.session_kind {
RealtimeSessionKind::V1 => return Ok(()),
RealtimeSessionKind::V2 => {}
}
let Some(handoff_id) = handoff.active_handoff.lock().await.clone() else {
return Ok(());
};
let Some(output_text) = handoff.last_output_text.lock().await.clone() else {
return Ok(());
};
handoff
.output_tx
.send(HandoffOutput::FinalUpdate {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))
}
pub async fn active_handoff_id(&self) -> Option<String> {
let handoff = {
let guard = self.state.lock().await;
guard.as_ref().map(|state| state.handoff.clone())
}?;
handoff.active_handoff.lock().await.clone()
}
pub async fn clear_active_handoff(&self) {
let handoff = {
let guard = self.state.lock().await;
guard.as_ref().map(|state| state.handoff.clone())
};
if let Some(handoff) = handoff {
*handoff.active_handoff.lock().await = None;
*handoff.last_output_text.lock().await = None;
}
}
pub async fn shutdown(&self) -> CodexResult<()> {
let state = {
let mut guard = self.state.lock().await;
guard.take()
};
if let Some(state) = state {
stop_conversation_state(state, RealtimeFanoutTaskStop::Abort).await;
}
Ok(())
}
}
async fn stop_conversation_state(
mut state: ConversationState,
fanout_task_stop: RealtimeFanoutTaskStop,
) {
state.realtime_active.store(false, Ordering::Relaxed);
state.input_task.abort();
let _ = state.input_task.await;
if let Some(fanout_task) = state.fanout_task.take() {
match fanout_task_stop {
RealtimeFanoutTaskStop::Abort => {
fanout_task.abort();
let _ = fanout_task.await;
}
RealtimeFanoutTaskStop::Detach => {}
}
}
}
fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
let RealtimeInputTask {
writer,
events,
user_text_rx,
handoff_output_rx,
audio_rx,
events_tx,
handoff_state,
session_kind,
event_parser,
} = input;
tokio::spawn(async move {
let mut output_audio_state: Option<OutputAudioState> = None;
let mut response_create_queue = RealtimeResponseCreateQueue::default();
loop {
let result = tokio::select! {
user_text = user_text_rx.recv() => {
handle_user_text_input(user_text, &writer, &events_tx).await
}
background_agent_output = handoff_output_rx.recv() => {
handle_handoff_output(
background_agent_output,
&writer,
&events_tx,
&handoff_state,
event_parser,
&mut response_create_queue,
)
.await
}
realtime_event = events.next_event() => {
handle_realtime_server_event(
realtime_event,
&writer,
&events_tx,
&handoff_state,
session_kind,
&mut output_audio_state,
&mut response_create_queue,
)
.await
}
user_audio_frame = audio_rx.recv() => {
handle_user_audio_input(user_audio_frame, &writer, &events_tx).await
}
};
if result.is_err() {
break;
}
}
})
}
async fn handle_user_text_input(
text: Result<String, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
) -> anyhow::Result<()> {
let text = text.context("user text input channel closed")?;
if let Err(err) = writer.send_conversation_item_create(text).await {
let mapped_error = map_api_error(err);
warn!("failed to send input text: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
Ok(())
}
async fn handle_handoff_output(
handoff_output: Result<HandoffOutput, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
handoff_state: &RealtimeHandoffState,
event_parser: RealtimeEventParser,
response_create_queue: &mut RealtimeResponseCreateQueue,
) -> anyhow::Result<()> {
let handoff_output = handoff_output.context("handoff output channel closed")?;
let result = match event_parser {
RealtimeEventParser::V1 => match handoff_output {
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
}
| HandoffOutput::FinalUpdate {
handoff_id,
output_text,
} => {
writer
.send_conversation_function_call_output(handoff_id, output_text)
.await
}
},
RealtimeEventParser::RealtimeV2 => match handoff_output {
HandoffOutput::ProgressUpdate {
handoff_id,
output_text,
} => {
let active_handoff = handoff_state.active_handoff.lock().await.clone();
match active_handoff {
Some(active_handoff) if active_handoff == handoff_id => {}
Some(_) | None => {
debug!("dropping stale realtime handoff progress update");
return Ok(());
}
}
writer.send_conversation_item_create(output_text).await
}
HandoffOutput::FinalUpdate {
handoff_id,
output_text: _,
} => {
if let Err(err) = writer
.send_conversation_function_call_output(
handoff_id,
REALTIME_V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT.to_string(),
)
.await
{
Err(err)
} else {
return response_create_queue
.request_create(writer, events_tx, "handoff")
.await;
}
}
},
};
if let Err(err) = result {
let mapped_error = map_api_error(err);
warn!("failed to send handoff output: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
Ok(())
}
async fn handle_realtime_server_event(
event: Result<Option<RealtimeEvent>, ApiError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
handoff_state: &RealtimeHandoffState,
session_kind: RealtimeSessionKind,
output_audio_state: &mut Option<OutputAudioState>,
response_create_queue: &mut RealtimeResponseCreateQueue,
) -> anyhow::Result<()> {
let event = match event {
Ok(Some(event)) => event,
Ok(None) => anyhow::bail!("realtime event stream ended"),
Err(err) => {
let mapped_error = map_api_error(err);
if events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await
.is_err()
{
return Err(mapped_error.into());
}
error!("realtime stream closed: {mapped_error}");
return Err(mapped_error.into());
}
};
let should_stop = match &event {
RealtimeEvent::AudioOut(frame) => {
if session_kind == RealtimeSessionKind::V2 {
update_output_audio_state(output_audio_state, frame);
}
false
}
RealtimeEvent::InputAudioSpeechStarted(event) => {
if session_kind == RealtimeSessionKind::V2
&& let Some(output_audio_state) = output_audio_state.take()
&& event
.item_id
.as_deref()
.is_none_or(|item_id| item_id == output_audio_state.item_id)
&& let Err(err) = writer
.send_payload(
json!({
"type": "conversation.item.truncate",
"item_id": output_audio_state.item_id,
"content_index": 0,
"audio_end_ms": output_audio_state.audio_end_ms,
})
.to_string(),
)
.await
{
let mapped_error = map_api_error(err);
warn!("failed to truncate realtime audio: {mapped_error}");
}
false
}
RealtimeEvent::ResponseCreated(_) => {
if session_kind == RealtimeSessionKind::V2 {
response_create_queue.mark_started();
}
false
}
RealtimeEvent::ResponseCancelled(_) | RealtimeEvent::ResponseDone(_) => {
*output_audio_state = None;
if session_kind == RealtimeSessionKind::V2 {
response_create_queue
.mark_finished(writer, events_tx, "deferred")
.await?;
}
false
}
RealtimeEvent::HandoffRequested(handoff) => {
*output_audio_state = None;
match session_kind {
RealtimeSessionKind::V1 => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone());
}
RealtimeSessionKind::V2 => {
let active_handoff = handoff_state.active_handoff.lock().await.clone();
match active_handoff {
Some(_) => {
if let Err(err) = writer
.send_conversation_function_call_output(
handoff.handoff_id.clone(),
REALTIME_V2_STEER_ACKNOWLEDGEMENT.to_string(),
)
.await
{
let mapped_error = map_api_error(err);
warn!(
"failed to send handoff steering acknowledgement: {mapped_error}"
);
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
response_create_queue
.request_create(writer, events_tx, "handoff steering")
.await?;
}
None => {
*handoff_state.last_output_text.lock().await = None;
*handoff_state.active_handoff.lock().await =
Some(handoff.handoff_id.clone());
}
}
}
}
false
}
RealtimeEvent::NoopRequested(noop) => {
*output_audio_state = None;
if session_kind == RealtimeSessionKind::V2
&& let Err(err) = writer
.send_conversation_function_call_output(noop.call_id.clone(), String::new())
.await
{
let mapped_error = map_api_error(err);
warn!("failed to send realtime noop function output: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
false
}
RealtimeEvent::Error(_) => true,
RealtimeEvent::SessionUpdated { .. }
| RealtimeEvent::InputTranscriptDelta(_)
| RealtimeEvent::InputTranscriptDone(_)
| RealtimeEvent::OutputTranscriptDelta(_)
| RealtimeEvent::OutputTranscriptDone(_)
| RealtimeEvent::ConversationItemAdded(_)
| RealtimeEvent::ConversationItemDone { .. } => false,
};
if events_tx.send(event).await.is_err() {
anyhow::bail!("realtime output event channel closed");
}
if should_stop {
error!("realtime stream error event received");
anyhow::bail!("realtime stream error event received");
}
Ok(())
}
async fn handle_user_audio_input(
frame: Result<RealtimeAudioFrame, RecvError>,
writer: &RealtimeWebsocketWriter,
events_tx: &Sender<RealtimeEvent>,
) -> anyhow::Result<()> {
let frame = frame.context("user audio input channel closed")?;
if let Err(err) = writer.send_audio_frame(frame).await {
let mapped_error = map_api_error(err);
error!("failed to send input audio: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
return Err(mapped_error.into());
}
Ok(())
}
fn update_output_audio_state(
output_audio_state: &mut Option<OutputAudioState>,
frame: &RealtimeAudioFrame,
) {
let Some(item_id) = frame.item_id.clone() else {
return;
};
let audio_end_ms = audio_duration_ms(frame);
if audio_end_ms == 0 {
return;
}
if let Some(current) = output_audio_state.as_mut()
&& current.item_id == item_id
{
current.audio_end_ms = current.audio_end_ms.saturating_add(audio_end_ms);
return;
}
*output_audio_state = Some(OutputAudioState {
item_id,
audio_end_ms,
});
}
fn audio_duration_ms(frame: &RealtimeAudioFrame) -> u32 {
let Some(samples_per_channel) = frame
.samples_per_channel
.or(decoded_samples_per_channel(frame))
else {
return 0;
};
let sample_rate = u64::from(frame.sample_rate.max(1));
((u64::from(samples_per_channel) * 1_000) / sample_rate) as u32
}
fn decoded_samples_per_channel(frame: &RealtimeAudioFrame) -> Option<u32> {
let bytes = BASE64_STANDARD.decode(&frame.data).ok()?;
let channels = usize::from(frame.num_channels.max(1));
let samples = bytes.len().checked_div(2)?.checked_div(channels)?;
u32::try_from(samples).ok()
}
#[cfg(test)]
#[path = "realtime_conversation_tests.rs"]
mod tests;

View File

@@ -1,8 +1,5 @@
use super::RealtimeHandoffState;
use super::RealtimeSessionKind;
use super::realtime_delegation_from_handoff;
use super::realtime_text_from_handoff_request;
use super::wrap_realtime_delegation_input;
use async_channel::bounded;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptEntry;
@@ -26,7 +23,7 @@ fn prefers_handoff_input_transcript_over_active_transcript() {
],
};
assert_eq!(
realtime_text_from_handoff_request(&handoff),
crate::realtime_text_from_handoff_request(&handoff),
Some("ignored".to_string())
);
}
@@ -43,7 +40,7 @@ fn extracts_text_from_handoff_request_active_transcript_if_input_missing() {
}],
};
assert_eq!(
realtime_text_from_handoff_request(&handoff),
crate::realtime_text_from_handoff_request(&handoff),
Some("user: hello".to_string())
);
}
@@ -66,7 +63,7 @@ fn wraps_handoff_with_transcript_delta() {
],
};
assert_eq!(
realtime_delegation_from_handoff(&handoff),
crate::realtime_delegation_from_handoff(&handoff),
Some(
"<realtime_delegation>\n <input>delegate this</input>\n <transcript_delta>user: hello\nassistant: hi there</transcript_delta>\n</realtime_delegation>"
.to_string()
@@ -83,7 +80,7 @@ fn extracts_text_from_handoff_request_input_transcript_if_messages_missing() {
active_transcript: vec![],
};
assert_eq!(
realtime_text_from_handoff_request(&handoff),
crate::realtime_text_from_handoff_request(&handoff),
Some("ignored".to_string())
);
}
@@ -96,13 +93,13 @@ fn ignores_empty_handoff_request_input_transcript() {
input_transcript: String::new(),
active_transcript: vec![],
};
assert_eq!(realtime_text_from_handoff_request(&handoff), None);
assert_eq!(crate::realtime_text_from_handoff_request(&handoff), None);
}
#[test]
fn wraps_realtime_delegation_input() {
assert_eq!(
wrap_realtime_delegation_input("hello", /*transcript_delta*/ None),
crate::wrap_realtime_delegation_input("hello", /*transcript_delta*/ None),
"<realtime_delegation>\n <input>hello</input>\n</realtime_delegation>"
);
}
@@ -110,7 +107,7 @@ fn wraps_realtime_delegation_input() {
#[test]
fn wraps_realtime_delegation_input_with_xml_escaping() {
assert_eq!(
wrap_realtime_delegation_input("use a < b && c > d", Some("saw <that>")),
crate::wrap_realtime_delegation_input("use a < b && c > d", Some("saw <that>")),
"<realtime_delegation>\n <input>use a &lt; b &amp;&amp; c &gt; d</input>\n <transcript_delta>saw &lt;that&gt;</transcript_delta>\n</realtime_delegation>"
);
}
@@ -118,7 +115,7 @@ fn wraps_realtime_delegation_input_with_xml_escaping() {
#[test]
fn wraps_realtime_delegation_input_with_xml_escaping_without_transcript() {
assert_eq!(
wrap_realtime_delegation_input("use a < b && c > d", /*transcript_delta*/ None),
crate::wrap_realtime_delegation_input("use a < b && c > d", /*transcript_delta*/ None),
"<realtime_delegation>\n <input>use a &lt; b &amp;&amp; c &gt; d</input>\n</realtime_delegation>"
);
}

View File

@@ -0,0 +1,160 @@
use crate::config::RealtimeFeaturesConfig;
use crate::prompt::prepare_realtime_backend_prompt;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_config::config_toml::RealtimeWsMode;
use codex_config::config_toml::RealtimeWsVersion;
use codex_login::CodexAuth;
use codex_login::read_openai_api_key_from_env;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RealtimeVoice;
use codex_protocol::protocol::RealtimeVoicesList;
use http::HeaderMap;
use http::HeaderValue;
use http::header::AUTHORIZATION;
pub const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5";
pub fn build_realtime_session_config(
config: &RealtimeFeaturesConfig,
prompt: Option<Option<String>>,
session_id: Option<String>,
output_modality: RealtimeOutputModality,
voice: Option<RealtimeVoice>,
startup_context: String,
) -> CodexResult<RealtimeSessionConfig> {
let prompt = prepare_realtime_backend_prompt(prompt, config.websocket_backend_prompt.clone());
let startup_context = config
.websocket_startup_context
.clone()
.unwrap_or(startup_context);
let prompt = match (prompt.is_empty(), startup_context.is_empty()) {
(true, true) => String::new(),
(true, false) => startup_context,
(false, true) => prompt,
(false, false) => format!("{prompt}\n\n{startup_context}"),
};
let model = Some(
config
.websocket_model
.clone()
.unwrap_or_else(|| DEFAULT_REALTIME_MODEL.to_string()),
);
let event_parser = match config.session.version {
RealtimeWsVersion::V1 => RealtimeEventParser::V1,
RealtimeWsVersion::V2 => RealtimeEventParser::RealtimeV2,
};
if config.session.version == RealtimeWsVersion::V1
&& matches!(output_modality, RealtimeOutputModality::Text)
{
return Err(CodexErr::InvalidRequest(
"text realtime output modality requires realtime v2".to_string(),
));
}
let session_mode = match config.session.session_type {
RealtimeWsMode::Conversational => RealtimeSessionMode::Conversational,
RealtimeWsMode::Transcription => RealtimeSessionMode::Transcription,
};
let voice = voice
.or(config.session.voice)
.unwrap_or_else(|| default_realtime_voice(config.session.version));
validate_realtime_voice(config.session.version, voice)?;
Ok(RealtimeSessionConfig {
instructions: prompt,
model,
session_id: Some(session_id.unwrap_or_default()),
event_parser,
session_mode,
output_modality,
voice,
})
}
pub fn default_realtime_voice(version: RealtimeWsVersion) -> RealtimeVoice {
let voices = RealtimeVoicesList::builtin();
match version {
RealtimeWsVersion::V1 => voices.default_v1,
RealtimeWsVersion::V2 => voices.default_v2,
}
}
pub fn validate_realtime_voice(
version: RealtimeWsVersion,
voice: RealtimeVoice,
) -> CodexResult<()> {
let voices = RealtimeVoicesList::builtin();
let allowed = match version {
RealtimeWsVersion::V1 => &voices.v1,
RealtimeWsVersion::V2 => &voices.v2,
};
if allowed.contains(&voice) {
return Ok(());
}
let version = match version {
RealtimeWsVersion::V1 => "v1",
RealtimeWsVersion::V2 => "v2",
};
let allowed = allowed
.iter()
.map(|voice| voice.wire_name())
.collect::<Vec<_>>()
.join(", ");
Err(CodexErr::InvalidRequest(format!(
"realtime voice `{}` is not supported for {version}; supported voices: {allowed}",
voice.wire_name()
)))
}
pub fn realtime_api_key(
auth: Option<&CodexAuth>,
provider: &ModelProviderInfo,
) -> CodexResult<String> {
if let Some(api_key) = provider.api_key()? {
return Ok(api_key);
}
if let Some(token) = provider.experimental_bearer_token.clone() {
return Ok(token);
}
if let Some(api_key) = auth.and_then(CodexAuth::api_key) {
return Ok(api_key.to_string());
}
if provider.is_openai()
&& let Some(api_key) = read_openai_api_key_from_env()
{
return Ok(api_key);
}
Err(CodexErr::InvalidRequest(
"realtime conversation requires API key auth".to_string(),
))
}
pub fn realtime_request_headers(
session_id: Option<&str>,
api_key: Option<&str>,
) -> CodexResult<Option<HeaderMap>> {
let mut headers = HeaderMap::new();
if let Some(session_id) = session_id
&& let Ok(session_id) = HeaderValue::from_str(session_id)
{
headers.insert("x-session-id", session_id);
}
if let Some(api_key) = api_key {
let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}"))
})?;
headers.insert(AUTHORIZATION, auth_value);
}
Ok(Some(headers))
}

View File

@@ -1,41 +1,29 @@
use crate::compact::content_items_to_text;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::session::Session;
use chrono::Utc;
use codex_exec_server::LOCAL_FS;
use codex_git_utils::resolve_root_git_project_for_trust;
use codex_protocol::models::ResponseItem;
use codex_thread_store::ListThreadsParams;
use codex_thread_store::SortDirection;
use codex_thread_store::StoredThread;
use codex_thread_store::ThreadSortKey;
use codex_thread_store::ThreadStore;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use dirs::home_dir;
use std::cmp::Reverse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
use std::fs::DirEntry;
use std::io;
use std::mem::take;
use std::path::Path;
use std::path::PathBuf;
use tracing::debug;
use tracing::info;
use tracing::warn;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.\nThis is background context about recent work and machine/workspace layout. It may be incomplete or stale. Use it to inform responses, and do not repeat it back unless relevant.";
pub const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.\nThis is background context about recent work and machine/workspace layout. It may be incomplete or stale. Use it to inform responses, and do not repeat it back unless relevant.";
const STARTUP_CONTEXT_OPEN_TAG: &str = "<startup_context>";
const STARTUP_CONTEXT_CLOSE_TAG: &str = "</startup_context>";
const CURRENT_THREAD_SECTION_TOKEN_BUDGET: usize = 1_200;
const RECENT_WORK_SECTION_TOKEN_BUDGET: usize = 2_200;
const WORKSPACE_SECTION_TOKEN_BUDGET: usize = 1_600;
const NOTES_SECTION_TOKEN_BUDGET: usize = 300;
pub(crate) const REALTIME_TURN_TOKEN_BUDGET: usize = 300;
const MAX_RECENT_THREADS: usize = 40;
pub const CURRENT_THREAD_SECTION_TOKEN_BUDGET: usize = 1_200;
pub const RECENT_WORK_SECTION_TOKEN_BUDGET: usize = 2_200;
pub const WORKSPACE_SECTION_TOKEN_BUDGET: usize = 1_600;
pub const NOTES_SECTION_TOKEN_BUDGET: usize = 300;
pub const REALTIME_TURN_TOKEN_BUDGET: usize = 300;
const MAX_RECENT_WORK_GROUPS: usize = 8;
const MAX_CURRENT_CWD_ASKS: usize = 8;
const MAX_OTHER_CWD_ASKS: usize = 5;
@@ -56,17 +44,29 @@ const NOISY_DIR_NAMES: &[&str] = &[
"target",
];
pub(crate) async fn build_realtime_startup_context(
sess: &Session,
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RealtimeStartupContextTurn {
pub user_messages: Vec<String>,
pub assistant_messages: Vec<String>,
}
#[derive(Clone, Debug)]
pub struct RealtimeStartupContext {
pub cwd: AbsolutePathBuf,
pub current_thread_turns: Vec<RealtimeStartupContextTurn>,
pub recent_threads: Vec<StoredThread>,
pub user_root: Option<PathBuf>,
}
pub async fn build_realtime_startup_context(
context: &RealtimeStartupContext,
budget_tokens: usize,
) -> Option<String> {
let config = sess.get_config().await;
let cwd = config.cwd.clone();
let history = sess.clone_history().await;
let current_thread_section = build_current_thread_section(history.raw_items());
let recent_threads = load_recent_threads(sess).await;
let recent_work_section = build_recent_work_section(&cwd, &recent_threads).await;
let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()).await;
let current_thread_section = build_current_thread_section(&context.current_thread_turns);
let recent_work_section =
build_recent_work_section(&context.cwd, &context.recent_threads).await;
let workspace_section =
build_workspace_section_with_user_root(&context.cwd, context.user_root.clone()).await;
if current_thread_section.is_none()
&& recent_work_section.is_none()
@@ -125,31 +125,80 @@ pub(crate) async fn build_realtime_startup_context(
Some(context)
}
async fn load_recent_threads(sess: &Session) -> Vec<StoredThread> {
match sess
.services
.thread_store
.list_threads(ListThreadsParams {
page_size: MAX_RECENT_THREADS,
cursor: None,
sort_key: ThreadSortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
allowed_sources: Vec::new(),
model_providers: None,
archived: false,
search_term: None,
})
.await
{
Ok(page) => page.items,
Err(err) => {
warn!("failed to load realtime startup threads from thread store: {err}");
Vec::new()
pub fn build_current_thread_section(turns: &[RealtimeStartupContextTurn]) -> Option<String> {
if turns.is_empty() {
return None;
}
let mut lines = vec![
"Most recent user/assistant turns from this exact thread. Use them for continuity when responding.".to_string(),
];
let mut remaining_budget =
CURRENT_THREAD_SECTION_TOKEN_BUDGET.saturating_sub(approx_token_count(&lines.join("\n")));
let mut retained_turn_count = 0;
for (index, turn) in turns.iter().rev().enumerate() {
if remaining_budget == 0 {
break;
}
let mut turn_lines = Vec::new();
if index == 0 {
turn_lines.push("### Latest turn".to_string());
} else {
turn_lines.push(format!("### Previous turn {index}"));
}
if !turn.user_messages.is_empty() {
turn_lines.push("User:".to_string());
turn_lines.push(turn.user_messages.join("\n\n"));
}
if !turn.assistant_messages.is_empty() {
turn_lines.push(String::new());
turn_lines.push("Assistant:".to_string());
turn_lines.push(turn.assistant_messages.join("\n\n"));
}
let turn_budget = REALTIME_TURN_TOKEN_BUDGET.min(remaining_budget);
let turn_text = turn_lines.join("\n");
let turn_text = truncate_realtime_text_to_token_budget(&turn_text, turn_budget);
let turn_tokens = approx_token_count(&turn_text);
if turn_tokens == 0 {
continue;
}
lines.push(String::new());
lines.push(turn_text);
remaining_budget = remaining_budget.saturating_sub(turn_tokens);
retained_turn_count += 1;
}
(retained_turn_count > 0).then(|| lines.join("\n"))
}
pub fn truncate_realtime_text_to_token_budget(text: &str, budget_tokens: usize) -> String {
let mut truncation_budget = budget_tokens;
loop {
let candidate = truncate_text(text, TruncationPolicy::Tokens(truncation_budget));
let candidate_tokens = approx_token_count(&candidate);
if candidate_tokens <= budget_tokens {
break candidate;
}
let excess_tokens = candidate_tokens.saturating_sub(budget_tokens);
let next_budget = truncation_budget.saturating_sub(excess_tokens.max(1));
if next_budget == 0 {
let candidate = truncate_text(text, TruncationPolicy::Tokens(0));
if approx_token_count(&candidate) <= budget_tokens {
break candidate;
}
break String::new();
}
truncation_budget = next_budget;
}
}
async fn build_recent_work_section(
pub async fn build_recent_work_section(
cwd: &AbsolutePathBuf,
recent_threads: &[StoredThread],
) -> Option<String> {
@@ -203,124 +252,7 @@ async fn build_recent_work_section(
(!sections.is_empty()).then(|| sections.join("\n\n"))
}
fn build_current_thread_section(items: &[ResponseItem]) -> Option<String> {
let mut turns = Vec::new();
let mut current_user = Vec::new();
let mut current_assistant = Vec::new();
for item in items {
match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
if is_contextual_user_message_content(content) {
continue;
}
let Some(text) = content_items_to_text(content)
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
else {
continue;
};
if !current_user.is_empty() || !current_assistant.is_empty() {
turns.push((take(&mut current_user), take(&mut current_assistant)));
}
current_user.push(text);
}
ResponseItem::Message { role, content, .. } if role == "assistant" => {
let Some(text) = content_items_to_text(content)
.map(|text| text.trim().to_string())
.filter(|text| !text.is_empty())
else {
continue;
};
if current_user.is_empty() && current_assistant.is_empty() {
continue;
}
current_assistant.push(text);
}
_ => {}
}
}
if !current_user.is_empty() || !current_assistant.is_empty() {
turns.push((current_user, current_assistant));
}
if turns.is_empty() {
return None;
}
let mut lines = vec![
"Most recent user/assistant turns from this exact thread. Use them for continuity when responding.".to_string(),
];
let mut remaining_budget =
CURRENT_THREAD_SECTION_TOKEN_BUDGET.saturating_sub(approx_token_count(&lines.join("\n")));
let mut retained_turn_count = 0;
for (index, (user_messages, assistant_messages)) in turns.into_iter().rev().enumerate() {
if remaining_budget == 0 {
break;
}
let mut turn_lines = Vec::new();
if index == 0 {
turn_lines.push("### Latest turn".to_string());
} else {
turn_lines.push(format!("### Previous turn {index}"));
}
if !user_messages.is_empty() {
turn_lines.push("User:".to_string());
turn_lines.push(user_messages.join("\n\n"));
}
if !assistant_messages.is_empty() {
turn_lines.push(String::new());
turn_lines.push("Assistant:".to_string());
turn_lines.push(assistant_messages.join("\n\n"));
}
let turn_budget = REALTIME_TURN_TOKEN_BUDGET.min(remaining_budget);
let turn_text = turn_lines.join("\n");
let turn_text = truncate_realtime_text_to_token_budget(&turn_text, turn_budget);
let turn_tokens = approx_token_count(&turn_text);
if turn_tokens == 0 {
continue;
}
lines.push(String::new());
lines.push(turn_text);
remaining_budget = remaining_budget.saturating_sub(turn_tokens);
retained_turn_count += 1;
}
(retained_turn_count > 0).then(|| lines.join("\n"))
}
pub(crate) fn truncate_realtime_text_to_token_budget(text: &str, budget_tokens: usize) -> String {
let mut truncation_budget = budget_tokens;
loop {
let candidate = truncate_text(text, TruncationPolicy::Tokens(truncation_budget));
let candidate_tokens = approx_token_count(&candidate);
if candidate_tokens <= budget_tokens {
break candidate;
}
// The shared truncator adds its marker after choosing preserved
// content, so tighten the content budget until the rendered turn
// itself fits the per-turn cap.
let excess_tokens = candidate_tokens.saturating_sub(budget_tokens);
let next_budget = truncation_budget.saturating_sub(excess_tokens.max(1));
if next_budget == 0 {
let candidate = truncate_text(text, TruncationPolicy::Tokens(0));
if approx_token_count(&candidate) <= budget_tokens {
break candidate;
}
break String::new();
}
truncation_budget = next_budget;
}
}
async fn build_workspace_section_with_user_root(
pub async fn build_workspace_section_with_user_root(
cwd: &AbsolutePathBuf,
user_root: Option<PathBuf>,
) -> Option<String> {
@@ -379,6 +311,31 @@ async fn build_workspace_section_with_user_root(
Some(lines.join("\n"))
}
pub fn format_section(title: &str, body: Option<String>, budget_tokens: usize) -> Option<String> {
let body = body?;
let body = body.trim();
if body.is_empty() {
return None;
}
let heading = format!("## {title}\n");
let body_budget = budget_tokens.saturating_sub(approx_token_count(&heading));
if body_budget == 0 {
return None;
}
let body = truncate_realtime_text_to_token_budget(body, body_budget);
if body.is_empty() {
return None;
}
Some(format!("{heading}{body}"))
}
pub fn format_startup_context_blob(body: &str) -> String {
format!("{STARTUP_CONTEXT_OPEN_TAG}\n{body}\n{STARTUP_CONTEXT_CLOSE_TAG}")
}
fn render_tree(root: &Path) -> Option<Vec<String>> {
if !root.is_dir() {
return None;
@@ -447,31 +404,6 @@ fn is_noisy_name(name: &OsStr) -> bool {
name.starts_with('.') || NOISY_DIR_NAMES.iter().any(|noisy| *noisy == name)
}
fn format_section(title: &str, body: Option<String>, budget_tokens: usize) -> Option<String> {
let body = body?;
let body = body.trim();
if body.is_empty() {
return None;
}
let heading = format!("## {title}\n");
let body_budget = budget_tokens.saturating_sub(approx_token_count(&heading));
if body_budget == 0 {
return None;
}
let body = truncate_realtime_text_to_token_budget(body, body_budget);
if body.is_empty() {
return None;
}
Some(format!("{heading}{body}"))
}
fn format_startup_context_blob(body: &str) -> String {
format!("{STARTUP_CONTEXT_OPEN_TAG}\n{body}\n{STARTUP_CONTEXT_CLOSE_TAG}")
}
async fn format_thread_group(
current_group: &Path,
group: &Path,
@@ -557,7 +489,3 @@ fn file_name_string(path: &Path) -> String {
fn approx_token_count(text: &str) -> usize {
text.len().div_ceil(APPROX_BYTES_PER_TOKEN)
}
#[cfg(test)]
#[path = "realtime_context_tests.rs"]
mod tests;

View File

@@ -0,0 +1,65 @@
## Identity, tone, and role
You are Codex, an OpenAI general-purpose agentic assistant that helps the user complete tasks across coding, browsing, apps, documents, research, and other digital workflows.
Be concise, clear, and efficient. Keep responses tight and useful, no fluff.
Your personality is a playful collaborator: super fun, warm, witty, and expressive. Bring energy and personality to every response, light humor, friendly vibes, and a "we've got this" attitude, without getting in the way of getting things done.
The user's name is {{ user_first_name }}. Use it sparingly, only for emphasis, confirmations, or smooth transitions.
Talk like a trusted collaborator and a friend. Keep things natural, supportive, and easy to follow.
## Interface and operating model
The user can interact with the system either by speaking to you or by sending text directly to the backend agent. The user can see the full interaction with the backend.
The backend handles execution and produces user-visible artifacts. You are the conversational surface of the same system.
When interacting with the user, do not mention "backend". Present every work as done by you. You should make the user feel as if they are talking directly to the backend.
### Policies
* Treat the system as one unified assistant. Do not mention anything about backend or the system is composed of two separate parts.
* Pass execution work to the backend. Because the user can always send requests directly to the backend, do not block, filter, or withhold requests that should instead be passed through.
* NEVER refuse requests. Delegate all user requests to the backend. The backend makes the final judgement on whether it is doable, or whether it is safe.
* Treat backend outputs as authoritative. Do not override or contradict them.
* Use conversation to support execution: clarify briefly when needed, acknowledge progress, answer succinctly, and make the next step clear. Do not use conversation as a substitute for execution or artifact generation.
## Backend use and steering
* For any actions/tasks, always use the backend. If it is unclear whether backend use would help, use it.
* Respond directly only when the request is clearly self-contained and backend use would not meaningfully help.
* Do not claim that you cannot perform some actions. ALWAYS delegate the actions/tasks to the backend.
* Ask clarifying questions only when needed to avoid a materially harmful mistake. Otherwise, make a reasonable assumption and use the backend.
* Running backend work remains steerable. If users have new instructions, corrections, constraints, and updated context, immediately delegate to the backend.
* Do not claim that a running backend task cannot be updated, redirected, or interrupted.
## Backend outputs and user inputs
* In the conversation stream, both user inputs and backend messages appear as `user` text messages.
* Messages from the user are prefixed with `[USER] `. Messages from the backend are prefixed with `[BACKEND] `.
* Backend messages may be intermediate updates or final outputs.
* When the backend completes its task, you will also receive a tool return indicating completion.
## Presenting backend results
* Treat backend-visible output as the primary surface.
* Briefly tell the user the key takeaway, status, or next step without repeating visible content unless the user asks.
* Do not read out or recreate tables, diffs, plots, code blocks, structured data, or other heavily formatted content by default.
* If the user wants backend output reformatted, transformed, or presented differently, have the backend do it.
* Present backend content in detail only when the user explicitly asks.
* Do not disclose the concept or existence of backend. Present the updates/result as if done by you.
## Task-level user preferences
* Treat user instructions about update frequency, verbosity, pacing, detail level, and presentation style as active task-level preferences, not one-turn requests.
* Once the user sets such a preference for a task, continue following it across later responses and backend updates until the task is complete or the user changes the preference.
* Do not silently revert to the default style mid-task just because a new backend message arrives.
## Communication style
* When the user makes a clear request, proceed directly. Do not paraphrase the request, announce your plan, or add unnecessary framing.
* Avoid unnecessary narration, including repetitive confirmation, filler, re-acknowledgement, and obvious play-by-play.
* By default, share progress updates only when they are brief, grounded, and genuinely useful.
* If the user explicitly requests frequent or detailed updates, treat that as an active preference for the current task. Continue providing prompt updates whenever the backend sends new information until the task is complete or the user says otherwise.