Compare commits

...

1 Commits

Author SHA1 Message Date
Josiah Grace
da432778b1 Add support for inference over jsonrpc 2025-10-31 19:49:57 -07:00
14 changed files with 1406 additions and 4 deletions

63
PLAN.md Normal file
View File

@@ -0,0 +1,63 @@
Title: Delegate /v1/responses over JSONRPC in appserver
Overview
Add an opt-in transport so Codex core (running inside appserver) delegates the actual HTTP POST to the OpenAI Responses API to the JSONRPC client. The appserver sends a server→client JSONRPC request carrying the full request body and headers; the client performs the HTTP call, streams SSE events back as client→server notifications, and finally replies to the original JSONRPC request with a terminal result.
Scope
- Protocol: add a ServerRequest for starting a Responses call and a ClientNotification for streaming SSE events.
- Core: add a small delegate hook that, when enabled, replaces the direct reqwest path in ModelClient::attempt_stream_responses. Provide an event processor that maps raw Responses events (JSON envelopes) into internal ResponseEvent, mirroring the existing SSE mapper.
- Appserver: register a delegate implementation that uses OutgoingMessageSender to send Requests, tracks call_id→event channel mappings, and forwards incoming ClientNotifications to core.
- Tests: end-to-end appserver test exercising the new path by simulating a client that responds to the server's request with a stream of Responses events.
Protocol changes
- ServerRequest variant: responsesApi/call
- Params: { conversationId, callId, url, headers: {k: v}, body: json, stream: bool }
- Response: { status: u16, requestId?: string, usage?: TokenUsage, error?: string }
- ClientNotification variant: responsesApi/event
- Params: { callId: string, event: json } (raw Responses API event JSON)
Core changes
- Add core::responses_delegate module with:
- Trait ResponsesHttpDelegate { start_call(call_id, params) -> Future<Result<()>>; incoming_event(call_id, event_json) }
- Global registration (OnceLock) and helpers to register and to deliver incoming events.
- Modify ModelClient::attempt_stream_responses to branch when:
- Wire API is Responses, feature toggle is enabled, and a delegate is registered.
- Generate a call_id, create an mpsc channel for raw event JSON, register it, and spawn a processor to map JSON events to ResponseEvent.
- Invoke delegate.start_call with the request (url, headers, payload) and return a ResponseStream tied to the mapped event channel.
- Implement a JSON event mapper mirroring the existing Responses SSE event handling (response.created, response.output_item.done, response.output_text.delta, response.reasoning_* deltas, response.failed, response.completed).
Appserver changes
- Implement AppServerResponsesDelegate that:
- Keeps an Arc<OutgoingMessageSender> and a map call_id→mpsc::Sender<serde_json::Value>.
- Implements start_call by sending ServerRequest::responsesApi/call and awaiting the JSONRPC response in a task.
- Implements incoming_event by looking up call_id and forwarding the event JSON to the registered sender.
- Wire registration at startup in run_main when feature toggle is enabled in config.
- Route client notifications in MessageProcessor::process_notification: parse to ClientNotification and forward responsesApi/event to core::responses_delegate::incoming_event.
Feature toggle
- Add a Features flag key: responses_http_over_jsonrpc (default false). Core checks this to decide whether to delegate.
Tests
- New integration test under codex-rs/app-server/tests/suite:
- Enable the feature via config.toml ([features] responses_http_over_jsonrpc = true).
- Start appserver (McpProcess); create a conversation; add listener with experimental_raw_events = true.
- Send a user message; expect a ServerRequest::responsesApi/call; stream responsesApi/event notifications back (response.created, response.output_item.done (assistant text), response.completed); reply to the server request with a 200 JSONRPC response.
- Assert the sendUserMessage call completes and raw_response_item notifications include the assistant message.
Out of scope
- Client-side implementation beyond tests (VS Code extension or SDK) — the protocol is additive.
- Persisting request metrics in the final JSONRPC response; core relies on streamed events (response.completed) for usage.
Rollout / Compatibility
- Fully opt-in behind the features.responses_http_over_jsonrpc flag.
- Fallback to existing HTTP path when the feature is disabled or no delegate is registered.

View File

@@ -300,6 +300,9 @@ server_request_definitions! {
ApplyPatchApproval,
/// Request to exec a command.
ExecCommandApproval,
/// Request the client to perform a Responses API call and stream events back.
#[serde(rename = "responsesApi/call")]
ResponsesApiCall,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -341,6 +344,27 @@ pub struct ApplyPatchApprovalResponse {
pub decision: ReviewDecision,
}
/// Parameters for the server → client `responsesApi/call` request asking the
/// client to perform the Responses API HTTP call on the server's behalf.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct ResponsesApiCallParams {
    /// Conversation this delegated call belongs to.
    pub conversation_id: ConversationId,
    /// Correlates `responsesApi/event` notifications with this call.
    pub call_id: String,
    /// Fully-qualified Responses API endpoint URL to POST to.
    pub url: String,
    /// HTTP headers the client should attach to the request.
    pub headers: HashMap<String, String>,
    /// JSON payload to send as the HTTP request body.
    pub body: serde_json::Value,
    /// Whether the client should request a streamed (SSE) response.
    pub stream: bool,
}
/// Terminal result the client returns for a `responsesApi/call` request once
/// streaming has finished (or failed).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct ResponsesApiCallResponse {
    /// HTTP status code returned by the upstream Responses API.
    pub status: u16,
    /// Optional request ID (e.g. cf-ray) propagated from upstream.
    pub request_id: Option<String>,
    /// Optional error text if the call failed without a response.completed.
    pub error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
@@ -413,6 +437,26 @@ impl TryFrom<JSONRPCNotification> for ServerNotification {
#[strum(serialize_all = "camelCase")]
pub enum ClientNotification {
Initialized,
/// Streamed Responses API event emitted while handling a `responsesApi/call`.
#[serde(rename = "responsesApi/event")]
#[ts(rename = "responsesApi/event")]
#[strum(serialize = "responsesApi/event")]
ResponsesApiEvent(ResponsesApiEventParams),
}
/// Payload of the client → server `responsesApi/event` notification carrying
/// one raw Responses API stream event.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct ResponsesApiEventParams {
    /// Identifies which in-flight `responsesApi/call` this event belongs to.
    pub call_id: String,
    /// Raw JSON event envelope exactly as emitted by the Responses API stream.
    pub event: serde_json::Value,
}
impl TryFrom<JSONRPCNotification> for ClientNotification {
    type Error = serde_json::Error;

    /// Round-trips the generic JSON-RPC notification through a
    /// `serde_json::Value` to deserialize it into a typed `ClientNotification`.
    fn try_from(value: JSONRPCNotification) -> Result<Self, Self::Error> {
        let json = serde_json::to_value(value)?;
        serde_json::from_value(json)
    }
}
#[cfg(test)]
@@ -545,6 +589,61 @@ mod tests {
Ok(())
}
#[test]
fn serialize_responses_api_call_request() -> Result<()> {
    // A delegated call must serialize as method `responsesApi/call` with
    // camelCase parameter keys on the wire.
    let conversation_id = ConversationId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")?;
    let mut headers = std::collections::HashMap::new();
    headers.insert("accept".to_string(), "text/event-stream".to_string());
    let request = ServerRequest::ResponsesApiCall {
        request_id: RequestId::Integer(9),
        params: ResponsesApiCallParams {
            conversation_id,
            call_id: "call-99".to_string(),
            url: "https://api.openai.com/v1/responses".to_string(),
            headers,
            body: serde_json::json!({"model":"gpt-5","input": []}),
            stream: true,
        },
    };
    let expected = json!({
        "method": "responsesApi/call",
        "id": 9,
        "params": {
            "conversationId": "67e55044-10b1-426f-9247-bb680e5fe0c8",
            "callId": "call-99",
            "url": "https://api.openai.com/v1/responses",
            "headers": {"accept": "text/event-stream"},
            "body": {"model":"gpt-5","input": []},
            "stream": true
        }
    });
    assert_eq!(expected, serde_json::to_value(&request)?);
    Ok(())
}
#[test]
fn serialize_responses_api_event_notification() -> Result<()> {
    // Streamed events carry the call id plus the raw Responses API envelope.
    let params = ResponsesApiEventParams {
        call_id: "call-1".to_string(),
        event: serde_json::json!({"type":"response.created","response":{}}),
    };
    let actual = serde_json::to_value(ClientNotification::ResponsesApiEvent(params))?;
    let expected = json!({
        "method": "responsesApi/event",
        "params": {
            "callId": "call-1",
            "event": {"type":"response.created","response":{}}
        }
    });
    assert_eq!(expected, actual);
    Ok(())
}
#[test]
fn serialize_get_account_rate_limits() -> Result<()> {
let request = ClientRequest::GetAccountRateLimits {

View File

@@ -6,6 +6,28 @@
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication, streaming JSONL over stdio. The protocol is JSON-RPC 2.0, though the `"jsonrpc":"2.0"` header is omitted.
### Delegating OpenAI Responses HTTP over JSONRPC (Experimental)
When the `responses_http_over_jsonrpc` feature is enabled in Codex (see `~/.codex/config.toml`), the appserver will delegate the actual HTTP `POST /v1/responses` to the JSONRPC client. This keeps all agent logic inside appserver, while allowing an integrating UI to control the network call (e.g., to supply credentials and enforce policy) and stream responses back.
- Server → Client request:
- Method: `responsesApi/call`
- Params: `{ conversationId, callId, url, headers: {k: v}, body: <json>, stream: true }`
- The client should perform the HTTP request and stream SSE events back via notifications (see below).
- After streaming completes, the client must send a JSONRPC response with `{ status, requestId?, error? }`.
- Client → Server notifications (during stream):
- Method: `responsesApi/event`
- Params: `{ callId, event }` where `event` is the raw JSON object emitted by the Responses API SSE stream (e.g. `{ "type": "response.output_item.done", ... }`).
- Feature flag (opt-in): in `~/.codex/config.toml` add
```toml
[features]
responses_http_over_jsonrpc = true
```
The JSON envelopes are mapped 1:1 onto Codex's internal streaming events, so downstream behavior (streaming assistant text, tool calls, final completion) matches the direct HTTP path.
## Message Schema
Currently, you can dump a TypeScript version of the schema using `codex generate-ts`. It is specific to the version of Codex you used to run `generate-ts`, so the two are guaranteed to be compatible.

View File

@@ -13,6 +13,7 @@ use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::JSONRPCMessage;
use codex_feedback::CodexFeedback;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
@@ -34,6 +35,7 @@ mod fuzzy_file_search;
mod message_processor;
mod models;
mod outgoing_message;
mod responses_delegate;
/// Size of the bounded channels used to communicate between tasks. The value
/// is a balance between throughput and memory usage 128 messages should be
@@ -119,8 +121,18 @@ pub async fn run_main(
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
// Register JSON-RPC Responses delegate if enabled
if config
.features
.enabled(codex_core::features::Feature::ResponsesHttpOverJsonRpc)
{
let delegate = responses_delegate::AppServerResponsesDelegate::new(
outgoing_message_sender.clone(),
);
let _ = codex_core::responses_delegate::register_delegate(Arc::new(delegate));
}
let mut processor = MessageProcessor::new_with_arc(
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),

View File

@@ -62,6 +62,37 @@ impl MessageProcessor {
}
}
/// Like `new`, but accepts an already-shared `OutgoingMessageSender` so the
/// same sender can also be handed to other components (e.g. the Responses
/// HTTP delegate) before the processor is constructed.
pub(crate) fn new_with_arc(
    outgoing: Arc<OutgoingMessageSender>,
    codex_linux_sandbox_exe: Option<PathBuf>,
    config: Arc<Config>,
    feedback: CodexFeedback,
) -> Self {
    // Shared auth state backing both the conversation manager and processor.
    let auth = AuthManager::shared(
        config.codex_home.clone(),
        false,
        config.cli_auth_credentials_store_mode,
    );
    let conversations = Arc::new(ConversationManager::new(auth.clone(), SessionSource::VSCode));
    let processor = CodexMessageProcessor::new(
        auth,
        conversations,
        outgoing.clone(),
        codex_linux_sandbox_exe,
        config,
        feedback,
    );
    Self {
        outgoing,
        codex_message_processor: processor,
        initialized: false,
    }
}
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
let request_id = request.id.clone();
let request_json = match serde_json::to_value(&request) {
@@ -140,9 +171,17 @@ impl MessageProcessor {
}
/// Handle a JSON-RPC notification sent by the client.
///
/// Every notification is logged; `responsesApi/event` notifications are
/// additionally parsed and forwarded into core so an in-flight delegated
/// Responses call can consume the streamed event.
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
    // Currently, we do not expect to receive any notifications from the
    // client, so we just log them.
    tracing::info!("<- notification: {:?}", notification);
    // Route client-originated Responses API events to core when present.
    if let Ok(client_notif) =
        codex_app_server_protocol::ClientNotification::try_from(notification)
        && let codex_app_server_protocol::ClientNotification::ResponsesApiEvent(params) = client_notif {
        // Return value ignored: `false` only means no active call has this id.
        let _ = codex_core::responses_delegate::incoming_event(
            &params.call_id,
            params.event,
        );
    }
}
/// Handle a standalone JSON-RPC response originating from the peer.

View File

@@ -0,0 +1,32 @@
use std::sync::Arc;
use codex_app_server_protocol::ResponsesApiCallParams;
use codex_app_server_protocol::ServerRequestPayload;
use tokio::task::JoinHandle;
use crate::outgoing_message::OutgoingMessageSender;
/// App-server implementation of the core Responses HTTP delegate: forwards
/// delegated `/v1/responses` calls to the connected JSON-RPC client.
pub(crate) struct AppServerResponsesDelegate {
    // Sender used to emit server → client JSON-RPC requests.
    outgoing: Arc<OutgoingMessageSender>,
}

impl AppServerResponsesDelegate {
    /// Creates a delegate that sends requests through `outgoing`.
    pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>) -> Self {
        Self { outgoing }
    }
}
impl codex_core::responses_delegate::ResponsesHttpDelegate for AppServerResponsesDelegate {
fn start_call(&self, params: ResponsesApiCallParams) {
let outgoing = self.outgoing.clone();
let call_id = params.call_id.clone();
// Fire-and-forget: send the request and await the terminal JSON-RPC response.
let _handle: JoinHandle<()> = tokio::spawn(async move {
let rx = outgoing
.send_request(ServerRequestPayload::ResponsesApiCall(params))
.await;
let _ = rx.await; // Ignore content; events are streamed separately.
codex_core::responses_delegate::finish_call(&call_id);
});
}
}

View File

@@ -0,0 +1,167 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ResponsesApiCallResponse;
use codex_app_server_protocol::ResponsesApiEventParams;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::ServerRequest;
use codex_protocol::ConversationId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// End-to-end: with `responses_http_over_jsonrpc` enabled, the app-server must
/// delegate each Responses API call to this JSON-RPC client, accept a synthetic
/// event stream in reply, and surface the streamed assistant message.
#[tokio::test]
async fn test_delegate_responses_over_jsonrpc() -> Result<()> {
    // Create temp Codex home and enable the JSON-RPC delegation feature.
    let codex_home = TempDir::new()?;
    write_feature_flag(codex_home.path())?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // Start a conversation using defaults (OpenAI provider: Responses API).
    let new_conv_id = mcp
        .send_new_conversation_request(NewConversationParams { ..Default::default() })
        .await?;
    let new_conv_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)),
    )
    .await??;
    let NewConversationResponse { conversation_id, .. } = to_response::<_>(new_conv_resp)?;
    // Subscribe to conversation events (raw) so we can assert stream behaviour.
    let add_listener_id = mcp
        .send_add_conversation_listener_request(AddConversationListenerParams {
            conversation_id,
            experimental_raw_events: true,
        })
        .await?;
    let add_listener_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)),
    )
    .await??;
    let _add_listener_ok: AddConversationSubscriptionResponse =
        to_response::<_>(add_listener_resp)?;
    // Kick off a user message — expect two delegated calls (session start and message).
    let send_id = mcp
        .send_send_user_message_request(SendUserMessageParams {
            conversation_id,
            items: vec![codex_app_server_protocol::InputItem::Text {
                text: "Hello from test".to_string(),
            }],
        })
        .await?;
    for _ in 0..2 {
        let request = mcp.read_stream_until_request_message().await?;
        let ServerRequest::ResponsesApiCall { request_id, params } = request else {
            panic!("expected ResponsesApiCall request");
        };
        // Stream Responses API events back to the server.
        // Minimal happy-path sequence: created → assistant message → completed.
        let created = serde_json::json!({
            "type": "response.created",
            "response": {"id": "resp_test"}
        });
        let msg = serde_json::json!({
            "type": "response.output_item.done",
            "item": {
                "type": "message",
                "role": "assistant",
                "content": [{"type":"output_text","text":"Done"}]
            }
        });
        let completed = serde_json::json!({
            "type": "response.completed",
            "response": {"id": "resp_test"}
        });
        mcp
            .send_notification(ClientNotification::ResponsesApiEvent(
                ResponsesApiEventParams { call_id: params.call_id.clone(), event: created },
            ))
            .await?;
        mcp
            .send_notification(ClientNotification::ResponsesApiEvent(
                ResponsesApiEventParams { call_id: params.call_id.clone(), event: msg },
            ))
            .await?;
        mcp
            .send_notification(ClientNotification::ResponsesApiEvent(
                ResponsesApiEventParams { call_id: params.call_id.clone(), event: completed },
            ))
            .await?;
        // Finalize the delegated request.
        let result = serde_json::to_value(ResponsesApiCallResponse {
            status: 200,
            request_id: None,
            error: None,
        })?;
        mcp.send_response(request_id, result).await?;
    }
    // Verify sendUserMessage returns OK.
    let resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(send_id)),
    )
    .await??;
    let _ok: SendUserMessageResponse = to_response::<_>(resp)?;
    // Expect at least one raw output item matching assistant Done.
    let raw = read_raw_item(&mut mcp, conversation_id).await;
    assert!(matches!(
        raw,
        ResponseItem::Message { role, content, .. }
            if role == "assistant" && content.iter().any(|c| matches!(c, ContentItem::OutputText { text } if text == "Done"))
    ));
    Ok(())
}
/// Writes a minimal `config.toml` under `codex_home` that enables the
/// `responses_http_over_jsonrpc` feature flag.
fn write_feature_flag(codex_home: &std::path::Path) -> std::io::Result<()> {
    std::fs::create_dir_all(codex_home)?;
    let contents = "[features]\nresponses_http_over_jsonrpc = true\n";
    std::fs::write(codex_home.join("config.toml"), contents)
}
/// Waits for the next `codex/event/raw_response_item` notification, asserts it
/// targets `conversation_id`, and returns the decoded `ResponseItem`.
async fn read_raw_item(mcp: &mut McpProcess, conversation_id: ConversationId) -> ResponseItem {
    let notification: JSONRPCNotification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("codex/event/raw_response_item"),
    )
    .await
    .expect("raw item notify")
    .expect("raw item notify inner");
    // Params must be a JSON object keyed by conversationId/item.
    let serde_json::Value::Object(map) = notification.params.expect("params") else {
        panic!("object")
    };
    assert_eq!(
        map.get("conversationId"),
        Some(&serde_json::Value::String(conversation_id.to_string()))
    );
    let item = map.get("item").cloned().expect("item");
    serde_json::from_value::<ResponseItem>(item).expect("response item")
}

View File

@@ -22,6 +22,7 @@ use reqwest::header::HeaderMap;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
@@ -49,6 +50,7 @@ use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::error::UsageLimitReachedError;
use crate::features::Feature;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_family::ModelFamily;
use crate::model_provider_info::ModelProviderInfo;
@@ -57,6 +59,7 @@ use crate::openai_model_info::get_model_info;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use crate::protocol::TokenUsage;
use crate::responses_delegate;
use crate::token_data::PlanType;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::backoff;
@@ -289,6 +292,71 @@ impl ModelClient {
payload_json: &Value,
auth_manager: &Option<Arc<AuthManager>>,
) -> std::result::Result<ResponseStream, StreamAttemptError> {
// Optional JSON-RPC delegation path: when enabled, forward the HTTP call to the client
// and consume streamed events via client notifications.
if self
.config
.features
.enabled(Feature::ResponsesHttpOverJsonRpc)
&& responses_delegate::has_delegate()
{
// Always fetch the latest auth for URL construction.
let auth = auth_manager.as_ref().and_then(|m| m.auth());
let url = self.provider.get_full_url(&auth);
// Build headers the same way as the direct path.
let mut headers: HashMap<String, String> = HashMap::new();
headers.insert("accept".to_string(), "text/event-stream".to_string());
headers.insert(
"conversation_id".to_string(),
self.conversation_id.to_string(),
);
headers.insert("session_id".to_string(), self.conversation_id.to_string());
if let SessionSource::SubAgent(sub) = &self.session_source {
let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
headers.insert("x-openai-subagent".to_string(), subagent);
}
// Prepare event channels: JSON envelopes (from client) -> ResponseEvent (for core).
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let (tx_json, rx_json) = mpsc::channel::<serde_json::Value>(1600);
// Register this call so incoming events can be delivered.
let call_id = uuid::Uuid::new_v4().to_string();
responses_delegate::register_call_channel(call_id.clone(), tx_json);
// Start processing JSON events into internal ResponseEvent stream.
let provider = self.provider.clone();
let otel = self.otel_event_manager.clone();
tokio::spawn(async move {
process_responses_json_events(rx_json, tx_event, provider, otel).await;
// cleanup is handled by whoever drops the channel; no op here
});
// Kick off the delegated call via the registered delegate.
if let Some(()) = responses_delegate::with_delegate(|d| {
d.start_call(codex_app_server_protocol::ResponsesApiCallParams {
conversation_id: self.conversation_id,
call_id: call_id.clone(),
url: url.clone(),
headers: headers.clone(),
body: payload_json.clone(),
stream: true,
});
}) {
// no-op; d.start_call is fire-and-forget
}
return Ok(ResponseStream { rx_event });
}
// Always fetch the latest auth in case a prior attempt refreshed the token.
let auth = auth_manager.as_ref().and_then(|m| m.auth());
@@ -928,6 +996,166 @@ async fn stream_from_fixture(
Ok(ResponseStream { rx_event })
}
/// Process Responses API events delivered as raw JSON envelopes (via JSON-RPC)
/// into internal `ResponseEvent`s, mirroring the SSE handler.
///
/// Runs until `rx_json` closes. Envelopes that fail to parse — and event kinds
/// with no mapping — are skipped with a debug log rather than aborting the
/// stream. When the channel closes, a terminal `Completed` event is emitted if
/// a `response.completed` envelope was seen; otherwise the recorded (or a
/// synthesized) stream error is sent instead.
pub(crate) async fn process_responses_json_events(
    mut rx_json: mpsc::Receiver<serde_json::Value>,
    tx_event: mpsc::Sender<Result<ResponseEvent>>,
    // Unused here; presumably kept to mirror the SSE processor's signature.
    _provider: ModelProviderInfo,
    otel_event_manager: OtelEventManager,
) {
    // Terminal state accumulated while draining the stream.
    let mut response_completed: Option<ResponseCompleted> = None;
    let mut response_error: Option<CodexErr> = None;
    while let Some(raw) = rx_json.recv().await {
        let event: SseEvent = match serde_json::from_value(raw) {
            Ok(event) => event,
            Err(e) => {
                debug!("Failed to parse Responses event JSON: {e}");
                continue;
            }
        };
        match event.kind.as_str() {
            "response.output_item.done" => {
                let Some(item_val) = event.item else { continue };
                let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
                    debug!("failed to parse ResponseItem from output_item.done");
                    continue;
                };
                let ev = ResponseEvent::OutputItemDone(item);
                // A send failure means the consumer hung up; stop processing.
                if tx_event.send(Ok(ev)).await.is_err() {
                    return;
                }
            }
            "response.output_text.delta" => {
                if let Some(delta) = event.delta {
                    let ev = ResponseEvent::OutputTextDelta(delta);
                    if tx_event.send(Ok(ev)).await.is_err() {
                        return;
                    }
                }
            }
            "response.reasoning_summary_text.delta" => {
                if let Some(delta) = event.delta {
                    let ev = ResponseEvent::ReasoningSummaryDelta(delta);
                    if tx_event.send(Ok(ev)).await.is_err() {
                        return;
                    }
                }
            }
            "response.reasoning_text.delta" => {
                if let Some(delta) = event.delta {
                    let ev = ResponseEvent::ReasoningContentDelta(delta);
                    if tx_event.send(Ok(ev)).await.is_err() {
                        return;
                    }
                }
            }
            "response.created" => {
                if event.response.is_some() {
                    let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
                }
            }
            "response.failed" => {
                if let Some(resp_val) = event.response {
                    // Fallback error in case the payload carries no parsable `error`.
                    response_error = Some(CodexErr::Stream(
                        "response.failed event received".to_string(),
                        None,
                    ));
                    let error = resp_val.get("error");
                    if let Some(error) = error {
                        match serde_json::from_value::<Error>(error.clone()) {
                            Ok(error) => {
                                if is_context_window_error(&error) {
                                    response_error = Some(CodexErr::ContextWindowExceeded);
                                } else {
                                    let delay = try_parse_retry_after(&error);
                                    let message = error.message.clone().unwrap_or_default();
                                    response_error = Some(CodexErr::Stream(message, delay));
                                }
                            }
                            Err(e) => {
                                let msg = format!("failed to parse ErrorResponse: {e}");
                                debug!(msg);
                                response_error = Some(CodexErr::Stream(msg, None))
                            }
                        }
                    }
                }
            }
            "response.completed" => {
                if let Some(resp_val) = event.response {
                    match serde_json::from_value::<ResponseCompleted>(resp_val) {
                        Ok(r) => {
                            response_completed = Some(r);
                        }
                        Err(e) => {
                            let error = format!("failed to parse ResponseCompleted: {e}");
                            debug!(error);
                            response_error = Some(CodexErr::Stream(error, None));
                            continue;
                        }
                    };
                };
            }
            "response.output_item.added" => {
                let Some(item_val) = event.item else { continue };
                let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
                    debug!("failed to parse ResponseItem from output_item.added");
                    continue;
                };
                let ev = ResponseEvent::OutputItemAdded(item);
                if tx_event.send(Ok(ev)).await.is_err() {
                    return;
                }
            }
            "response.reasoning_summary_part.added" => {
                let ev = ResponseEvent::ReasoningSummaryPartAdded;
                if tx_event.send(Ok(ev)).await.is_err() {
                    return;
                }
            }
            // Any other event kind carries nothing this pipeline consumes.
            _ => {}
        }
    }
    // Channel closed — synthesize a final event or emit an error consistent with SSE path.
    match response_completed {
        Some(ResponseCompleted {
            id: response_id,
            usage,
        }) => {
            // Record token usage telemetry before emitting the terminal event.
            if let Some(u) = usage.as_ref() {
                otel_event_manager.sse_event_completed(
                    u.input_tokens,
                    u.output_tokens,
                    u.input_tokens_details.as_ref().map(|d| d.cached_tokens),
                    u.output_tokens_details.as_ref().map(|d| d.reasoning_tokens),
                    u.total_tokens,
                );
            }
            let _ = tx_event
                .send(Ok(ResponseEvent::Completed {
                    response_id,
                    token_usage: usage.map(Into::into),
                }))
                .await;
        }
        None => {
            let error = response_error.unwrap_or(CodexErr::Stream(
                "stream closed before response.completed".into(),
                None,
            ));
            // NOTE(review): `see_event_completed_failed` reads like a typo of
            // `sse_event_completed_failed` — confirm against OtelEventManager's API.
            otel_event_manager.see_event_completed_failed(&error);
            let _ = tx_event.send(Err(error)).await;
        }
    }
}
fn rate_limit_regex() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();

View File

@@ -45,6 +45,8 @@ pub enum Feature {
GhostCommit,
/// Enable Windows sandbox (restricted token) on Windows.
WindowsSandbox,
/// Delegate `/v1/responses` HTTP over app-server JSON-RPC.
ResponsesHttpOverJsonRpc,
}
impl Feature {
@@ -300,4 +302,10 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::ResponsesHttpOverJsonRpc,
key: "responses_http_over_jsonrpc",
stage: Stage::Experimental,
default_enabled: false,
},
];

View File

@@ -13,6 +13,7 @@ mod client;
mod client_common;
pub mod codex;
mod codex_conversation;
pub mod responses_delegate;
pub use codex_conversation::CodexConversation;
mod codex_delegate;
mod command_safety;

View File

@@ -0,0 +1,58 @@
use std::collections::HashMap;
use std::sync::Arc;
use codex_app_server_protocol::ResponsesApiCallParams;
use std::sync::LazyLock;
use tokio::sync::mpsc;
/// Trait implemented by the app-server to delegate `/v1/responses` HTTP over JSON-RPC.
pub trait ResponsesHttpDelegate: Send + Sync {
    /// Start a delegated call. The implementor should send a JSON-RPC request to the client
    /// with the provided params and keep streaming events back via `incoming_event`.
    ///
    /// Returns no value: outcomes surface through the streamed events and
    /// `finish_call`, not through this method.
    fn start_call(&self, params: ResponsesApiCallParams);
}
/// Globally registered delegate, set at most once at startup (by the app-server).
/// A plain `OnceLock` suffices: `OnceLock::new()` is `const`, so the previous
/// `LazyLock<OnceLock<...>>` wrapper added an extra layer for no benefit while
/// exposing the exact same `get`/`get_or_init`/`set` API to callers.
static DELEGATE: std::sync::OnceLock<Arc<dyn ResponsesHttpDelegate>> =
    std::sync::OnceLock::new();
/// Map of call_id -> sender for raw Responses event JSON.
/// `LazyLock` is required here because `HashMap::new()` is not `const`.
static CALL_EVENT_SENDERS: LazyLock<
    std::sync::Mutex<HashMap<String, mpsc::Sender<serde_json::Value>>>,
> = LazyLock::new(|| std::sync::Mutex::new(HashMap::new()));
/// Registers the global Responses HTTP delegate.
///
/// Returns `Err(())` when a delegate was already registered — the existing one
/// is kept and `delegate` is dropped. The previous implementation used
/// `get_or_init`, which also kept the first registration but always returned
/// `Ok(())`, silently hiding the conflict from the caller.
pub fn register_delegate(delegate: Arc<dyn ResponsesHttpDelegate>) -> Result<(), ()> {
    DELEGATE.set(delegate).map_err(|_| ())
}
/// Returns true once a delegate has been registered.
pub fn has_delegate() -> bool {
    DELEGATE.get().is_some()
}

/// Runs `f` against the registered delegate, if any, returning its result.
pub fn with_delegate<R>(f: impl FnOnce(&dyn ResponsesHttpDelegate) -> R) -> Option<R> {
    DELEGATE.get().map(|d| f(d.as_ref()))
}
/// Register a sender to receive JSON events for a call_id.
pub fn register_call_channel(call_id: String, tx: mpsc::Sender<serde_json::Value>) {
    // A poisoned mutex is silently ignored; registration is best-effort.
    if let Ok(mut map) = CALL_EVENT_SENDERS.lock() {
        map.insert(call_id, tx);
    }
}

/// Drops the event channel for a finished call so late events are discarded.
pub fn finish_call(call_id: &str) {
    if let Ok(mut map) = CALL_EVENT_SENDERS.lock() {
        map.remove(call_id);
    }
}
/// Forward a raw Responses event (JSON envelope) from the app-server client into core.
/// Returns true if an active call was found.
pub fn incoming_event(call_id: &str, event: serde_json::Value) -> bool {
    if let Ok(map) = CALL_EVENT_SENDERS.lock()
        && let Some(tx) = map.get(call_id) {
        // NOTE(review): try_send silently drops the event when the channel is
        // full — confirm the channel capacity chosen at registration is large
        // enough that stream events cannot be lost under backpressure.
        let _ = tx.try_send(event);
        return true;
    }
    false
}

View File

@@ -0,0 +1,56 @@
#!/usr/bin/env bash
set -euo pipefail

# Native Linux GNU build for Codex CLI on x86_64-unknown-linux-gnu.
#
# This script is intended to be run on an x86_64 Linux host (or in an
# x86_64 container/VM). It builds the `codex` binary and packages it into a
# tarball matching the GitHub Releases naming:
#   codex-x86_64-unknown-linux-gnu.tar.gz
#
# Usage:
#   cd codex-rs
#   ./scripts/build-linux-gnu.sh
#
# Optional: Pin the toolchain via the workspace rust-toolchain.toml (default).

# Refuse to run on non-Linux hosts: this is a native (not cross) build.
if [[ "${OSTYPE}" != linux* ]]; then
    echo "[!] This script is for native Linux builds. Detected OSTYPE='${OSTYPE}'." >&2
    echo " Run this inside a Linux VM/container or use the Docker approach described in the README." >&2
    exit 2
fi

# Work from the codex-rs root regardless of the invocation directory.
ROOT_DIR=$(cd "$(dirname "$0")/.." && pwd)
cd "$ROOT_DIR"

BIN=codex
TARGET=x86_64-unknown-linux-gnu
OUTDIR="${ROOT_DIR}/dist/linux-gnu/${TARGET}"
mkdir -p "$OUTDIR"

# Honor an explicit RUSTUP_TOOLCHAIN; otherwise extract the pinned channel from
# rust-toolchain.toml (empty string when the file or key is absent).
TOOLCHAIN=${RUSTUP_TOOLCHAIN:-"$(grep -Eo 'channel\s*=\s*"[^"]+"' rust-toolchain.toml 2>/dev/null | sed -E 's/.*"(.*)"/\1/; s/channel\s*=\s*//g' || true)"}
if [[ -n "$TOOLCHAIN" ]]; then
    export RUSTUP_TOOLCHAIN="$TOOLCHAIN"
    echo "==> Using toolchain: $RUSTUP_TOOLCHAIN"
else
    echo "==> Using default rustup toolchain"
fi

echo "==> Ensuring target installed: $TARGET"
rustup target add "$TARGET" >/dev/null

echo "==> Building $BIN for $TARGET (release)"
cargo build --release --bin "$BIN" --target "$TARGET"

SRC_BIN="${ROOT_DIR}/target/${TARGET}/release/${BIN}"
if [[ ! -f "$SRC_BIN" ]]; then
    echo "Build succeeded but binary not found: $SRC_BIN" >&2
    exit 1
fi

# Package the binary under the GitHub-Releases artifact name.
FINAL_NAME="${BIN}-${TARGET}"
cp -f "$SRC_BIN" "$OUTDIR/$FINAL_NAME"
tar -C "$OUTDIR" -czf "$OUTDIR/${FINAL_NAME}.tar.gz" "$FINAL_NAME"
echo "==> Artifact: $OUTDIR/${FINAL_NAME}.tar.gz"

60
codex-rs/scripts/build-linux.sh Executable file
View File

@@ -0,0 +1,60 @@
#!/usr/bin/env bash
set -euo pipefail

# Cross-compile selected Codex binaries for Linux (musl) from macOS.
#
# Usage examples:
#   scripts/build-linux.sh app-server
#   scripts/build-linux.sh codex app-server
#   scripts/build-linux.sh all
#
# Produces artifacts under: codex-rs/dist/linux/<target>/<binary>

# Work from the codex-rs root regardless of the invocation directory.
ROOT_DIR=$(cd "$(dirname "$0")/.." && pwd)
cd "$ROOT_DIR"

# Default to building only the app-server; positional args override the set.
BINARIES=("codex-app-server")
if [[ $# -gt 0 ]]; then
    if [[ "$1" == "all" ]]; then
        BINARIES=("codex" "codex-app-server")
    else
        BINARIES=()
        for arg in "$@"; do
            case "$arg" in
                codex) BINARIES+=("codex");;
                app-server) BINARIES+=("codex-app-server");;
                *) echo "Unknown binary: $arg" >&2; exit 2;;
            esac
        done
    fi
fi

echo "==> Using rustup toolchain from rust-toolchain.toml (if present)"
# Honor an explicit RUSTUP_TOOLCHAIN; otherwise extract the pinned channel.
TOOLCHAIN=${RUSTUP_TOOLCHAIN:-"$(grep -Eo 'channel\s*=\s*"[^"]+"' rust-toolchain.toml 2>/dev/null | sed -E 's/.*"(.*)"/\1/; s/channel\s*=\s*//g' || true)"}
if [[ -n "$TOOLCHAIN" ]]; then
    export RUSTUP_TOOLCHAIN="$TOOLCHAIN"
    echo "==> TOOLCHAIN=$RUSTUP_TOOLCHAIN"
fi

echo "==> Installing musl targets"
rustup target add x86_64-unknown-linux-musl aarch64-unknown-linux-musl >/dev/null

TARGETS=("x86_64-unknown-linux-musl" "aarch64-unknown-linux-musl")
mkdir -p "$ROOT_DIR/dist/linux"
for T in "${TARGETS[@]}"; do
    for BIN in "${BINARIES[@]}"; do
        echo "==> Building $BIN for $T (release)"
        RUSTFLAGS="-C target-feature=-crt-static" \
            cargo build --release --bin "$BIN" --target "$T"
        OUTDIR="$ROOT_DIR/dist/linux/$T"
        mkdir -p "$OUTDIR"
        # NOTE(review): `|| true` swallows copy failures despite `set -e` —
        # confirm this best-effort behavior is intended.
        cp -f "$ROOT_DIR/target/$T/release/$BIN" "$OUTDIR/" || true
        echo " -> $OUTDIR/$BIN"
    done
done
echo "==> Done"

View File

@@ -0,0 +1,557 @@
#!/usr/bin/env python3
"""
Build and run a single non-interactive Codex turn over app-server JSONRPC
and stub Responses API HTTP via the experimental responsesApi/call delegate.
This always performs `cargo build` and then runs
`cargo run --bin codex -- app-server` (per docs/install.md).
Equivalent usage to:
codex exec --skip-git-repo-check "my prompt here"
This script:
- Starts `codex-app-server` as a child process (stdio JSON-RPC, JSONL).
- Performs the initialize → newConversation → addConversationListener flow.
- Sends your prompt via sendUserMessage.
- Hooks server→client requests with method `responsesApi/call` and, instead of
performing real HTTP, streams a tiny synthetic Responses event sequence:
response.created → response.output_item.done (assistant: "Hello, world!") → response.completed
- Prints the stubbed assistant message to stdout.
Requirements:
- Rust toolchain + Cargo installed locally.
- To exercise the delegate path, ensure Codex has the feature enabled. You can
either add to your ~/.codex/config.toml:
[features]
responses_http_over_jsonrpc = true
or run with --with-temp-codex-home to isolate config and auto-enable it.
Note: This script purposely does not perform any real network calls.
"""
from __future__ import annotations
import argparse
import json
import os
import queue
import sys
import tempfile
import threading
import subprocess
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
# ------------------------- JSON-RPC wire helpers -------------------------
# JSON-RPC 2.0 allows either an integer or a string request id.
RequestId = int | str
@dataclass
class JSONRPCRequest:
    """A client→server JSON-RPC request: id, method, and optional params."""

    id: RequestId
    method: str
    params: Optional[Dict[str, Any]] = None

    def to_wire(self) -> Dict[str, Any]:
        """Serialize to the wire dict, omitting ``params`` when absent."""
        base: Dict[str, Any] = {"id": self.id, "method": self.method}
        return base if self.params is None else {**base, "params": self.params}
@dataclass
class JSONRPCResponse:
    """A JSON-RPC response carrying a (non-error) result for a request id."""

    id: RequestId
    result: Dict[str, Any]

    def to_wire(self) -> Dict[str, Any]:
        """Serialize to the wire dict (id + result)."""
        return dict(id=self.id, result=self.result)
@dataclass
class JSONRPCNotification:
    """A JSON-RPC notification: method plus optional params, no id."""

    method: str
    params: Optional[Dict[str, Any]] = None

    def to_wire(self) -> Dict[str, Any]:
        """Serialize to the wire dict, omitting ``params`` when absent."""
        wire: Dict[str, Any] = {"method": self.method}
        if self.params is not None:
            wire["params"] = self.params
        return wire
class JsonRpcIO:
    """Line-delimited JSON-RPC transport over a child process's stdio.

    A daemon reader thread parses each stdout line into a dict and pushes it
    onto an internal queue; senders write one compact-JSON object per line to
    the child's stdin. Ids are allocated monotonically under a lock.
    """

    def __init__(self, proc: subprocess.Popen[bytes], trace: bool = True):
        self.proc = proc
        self.trace = trace
        self._next_id = 0
        self._lock = threading.Lock()
        self._incoming: "queue.Queue[Dict[str, Any]]" = queue.Queue()
        self._reader = threading.Thread(target=self._read_loop, daemon=True)
        self._reader.start()

    def _read_loop(self) -> None:
        # Runs on the daemon thread; exits when the child closes stdout.
        assert self.proc.stdout is not None
        for chunk in self.proc.stdout:
            text = chunk.decode("utf-8", errors="replace").rstrip("\n")
            if not text:
                continue
            if self.trace:
                print(f"[jsonrpc <-] {text}", file=sys.stderr)
            try:
                parsed = json.loads(text)
            except Exception as e:
                print(f"[client] failed to parse line: {e}: {text}", file=sys.stderr)
                continue
            self._incoming.put(parsed)

    def next_id(self) -> int:
        """Allocate the next monotonically increasing request id."""
        with self._lock:
            allocated = self._next_id
            self._next_id += 1
        return allocated

    def send_obj(self, obj: Dict[str, Any]) -> None:
        """Write one compact-JSON object as a single JSONL line to stdin."""
        payload = json.dumps(obj, separators=(",", ":"))
        if self.trace:
            print(f"[jsonrpc ->] {payload}", file=sys.stderr)
        assert self.proc.stdin is not None
        self.proc.stdin.write(payload.encode("utf-8") + b"\n")
        self.proc.stdin.flush()

    def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> RequestId:
        """Send a request with a fresh id; returns that id for matching."""
        rid = self.next_id()
        self.send_obj(JSONRPCRequest(id=rid, method=method, params=params).to_wire())
        return rid

    def send_response(self, id: RequestId, result: Dict[str, Any]) -> None:
        """Answer a server→client request identified by ``id``."""
        self.send_obj(JSONRPCResponse(id=id, result=result).to_wire())

    def send_notification(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
        """Send a fire-and-forget notification (no id, no reply)."""
        self.send_obj(JSONRPCNotification(method=method, params=params).to_wire())

    def recv(self, timeout: Optional[float] = None) -> Optional[Dict[str, Any]]:
        """Pop the next incoming message, or None on timeout."""
        try:
            return self._incoming.get(timeout=timeout)
        except queue.Empty:
            return None
# ----------------------------- App flow logic ----------------------------
def _exe_suffix() -> str:
return ".exe" if os.name == "nt" else ""
def project_root() -> str:
    """Absolute path of the repo root (scripts/ sits directly under it)."""
    here = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(here, os.pardir))
def codex_rs_dir() -> str:
    """Path to the codex-rs workspace directory under the repo root."""
    root = project_root()
    return os.path.join(root, "codex-rs")
def default_server_path(release: bool) -> str:
    """Expected location of the codex-app-server binary for the build profile."""
    profile = "release" if release else "debug"
    binary = f"codex-app-server{_exe_suffix()}"
    return os.path.join(codex_rs_dir(), "target", profile, binary)
def build_app_server(release: bool, offline: bool) -> str:
    """Build codex-app-server if needed and return the path to the binary.

    Follows docs/install.md guidance by invoking `cargo build` from the
    codex-rs workspace root. An existing binary short-circuits the build.
    Exits via SystemExit on missing cargo (127), build failure (cargo's
    return code), or a successful build that produced no binary (2).
    """
    target_bin = default_server_path(release)
    # Fast path: a previous build already produced the binary.
    if os.path.exists(target_bin):
        return target_bin

    build_cmd = ["cargo", "build"]
    if release:
        build_cmd.append("--release")
    if offline:
        build_cmd.append("--offline")
    print(f"[client] building app-server: {' '.join(build_cmd)} (cwd={codex_rs_dir()})", file=sys.stderr)
    try:
        subprocess.run(build_cmd, cwd=codex_rs_dir(), check=True)
    except FileNotFoundError:
        print(
            "[client] cargo not found. Install Rust per docs/install.md (rustup) and re-run.",
            file=sys.stderr,
        )
        raise SystemExit(127)
    except subprocess.CalledProcessError as e:
        if offline:
            raise SystemExit(e.returncode)
        # Best-effort retry against the local cargo cache before giving up.
        print("[client] build failed; retrying with --offline", file=sys.stderr)
        try:
            subprocess.run(build_cmd + ["--offline"], cwd=codex_rs_dir(), check=True)
        except subprocess.CalledProcessError:
            raise SystemExit(e.returncode)

    if not os.path.exists(target_bin):
        print(
            f"[client] build succeeded but binary not found at {target_bin}.\n"
            f" If your workspace default-members omit app-server, try: cargo build -p codex-app-server",
            file=sys.stderr,
        )
        raise SystemExit(2)
    return target_bin
def write_temp_config(codex_home: str) -> None:
    """Create ``codex_home`` and write a minimal config.toml there.

    Only the responses_http_over_jsonrpc feature toggle is written;
    provider/model defaults are left to the user's environment.
    """
    os.makedirs(codex_home, exist_ok=True)
    config_text = "[features]\nresponses_http_over_jsonrpc = true\n"
    with open(os.path.join(codex_home, "config.toml"), "w", encoding="utf-8") as fh:
        fh.write(config_text)
def run_exec_over_jsonrpc(
    prompt: str,
    server: Optional[str],
    hello_text: str,
    use_temp_home: bool,
    cwd: Optional[str],
    auto_build: bool,
    release: bool,
    offline: bool,
    trace_jsonrpc: bool,
) -> int:
    """Drive one non-interactive turn over app-server JSONRPC with a stubbed
    Responses backend.

    Flow (as implemented below): spawn app-server → initialize/initialized →
    newConversation → addConversationListener → sendUserMessage → event loop
    that intercepts `responsesApi/call`, streams synthetic Responses events
    back, and collects assistant text from raw_response_item notifications.

    Returns 0 when sendUserMessage completed, 1 otherwise, 2 on timeouts or
    protocol errors.
    """
    env = os.environ.copy()
    tmpdir: Optional[tempfile.TemporaryDirectory[str]] = None
    if use_temp_home:
        # Isolated CODEX_HOME with the delegate feature pre-enabled.
        tmpdir = tempfile.TemporaryDirectory(prefix="codex-home-")
        env["CODEX_HOME"] = tmpdir.name
        write_temp_config(tmpdir.name)
    # Make server a little chattier for troubleshooting.
    env.setdefault("RUST_LOG", "info")
    # Spawn app-server either via cargo run (preferred) or direct binary.
    # NOTE(review): spawn_server_process currently always uses cargo run.
    proc, used_cargo_run = spawn_server_process(server, env, release, offline, auto_build)
    rpc = JsonRpcIO(proc, trace=trace_jsonrpc)
    # initialize
    init_id = rpc.send_request(
        "initialize",
        {
            "clientInfo": {
                "name": "jsonrpc-stub-client",
                "title": None,
                "version": "0.1.0",
            }
        },
    )
    # Wait for initialize response before sending initialized notification.
    # cargo run may compile first, so allow a much longer timeout there.
    init_timeout = 180 if used_cargo_run else 30
    while True:
        msg = rpc.recv(timeout=init_timeout)
        if msg is None:
            print(
                "Timed out waiting for initialize response. If building via cargo run, try again or use --release for faster startup.",
                file=sys.stderr,
            )
            return 2
        if "result" in msg and msg.get("id") == init_id:
            break
        if "error" in msg and msg.get("id") == init_id:
            print(f"initialize error: {msg['error']}", file=sys.stderr)
            return 2
        # Buffer/ignore anything else until we get the init response.
    rpc.send_notification("initialized")
    # newConversation (default settings); optionally override cwd.
    new_conv_params: Dict[str, Any] = {}
    if cwd:
        new_conv_params["cwd"] = cwd
    # Important: always send a params object (even if empty) to satisfy the
    # ClientRequest::NewConversation schema.
    new_conv_id = rpc.send_request("newConversation", new_conv_params)
    conversation_id: Optional[str] = None
    while True:
        msg = rpc.recv(timeout=30)
        if msg is None:
            print("Timed out waiting for newConversation response", file=sys.stderr)
            return 2
        if "result" in msg and msg.get("id") == new_conv_id:
            try:
                conversation_id = msg["result"]["conversationId"]
            except Exception as e:
                print(f"Malformed newConversation response: {e}", file=sys.stderr)
                return 2
            break
        if "error" in msg and msg.get("id") == new_conv_id:
            print(f"newConversation error: {msg['error']}", file=sys.stderr)
            return 2
    # Subscribe to raw events (handy for debugging/consuming output items if desired).
    rpc.send_request(
        "addConversationListener",
        {"conversationId": conversation_id, "experimentalRawEvents": True},
    )
    # Send the user message with your prompt.
    send_id = rpc.send_request(
        "sendUserMessage",
        {
            "conversationId": conversation_id,
            "items": [
                {
                    "type": "text",
                    "data": {"text": prompt},
                }
            ],
        },
    )
    # Handle server→client delegated HTTP calls and stream notifications until
    # the task completes. Accumulate assistant text from raw_response_item.
    final_ok = False
    saw_responses_call = False
    assistant_chunks: list[str] = []
    while True:
        msg = rpc.recv(timeout=120)
        if msg is None:
            print("Timed out waiting for server messages", file=sys.stderr)
            return 2
        # Server → Client request: responsesApi/call
        if msg.get("method") == "responsesApi/call" and "id" in msg:
            req_id = msg["id"]
            params = msg.get("params", {})
            call_id = params.get("callId", "resp_stub")
            url = params.get("url", "")
            saw_responses_call = True
            print(f"[client] intercept responsesApi/call callId={call_id} url={url}", file=sys.stderr)
            # Stream hello-world output back as synthetic Responses events:
            # created → output_item.done (assistant message) → completed.
            created = {"type": "response.created", "response": {"id": call_id}}
            message_done = {
                "type": "response.output_item.done",
                "item": {
                    "type": "message",
                    "role": "assistant",
                    "content": [
                        {"type": "output_text", "text": hello_text},
                    ],
                },
            }
            completed = {"type": "response.completed", "response": {"id": call_id}}
            rpc.send_notification(
                "responsesApi/event",
                {"callId": call_id, "event": created},
            )
            rpc.send_notification(
                "responsesApi/event",
                {"callId": call_id, "event": message_done},
            )
            rpc.send_notification(
                "responsesApi/event",
                {"callId": call_id, "event": completed},
            )
            # Finalize the delegated request with a 200 status.
            rpc.send_response(req_id, {"status": 200, "requestId": None, "error": None})
            continue
        # Collect assistant text from raw events if present.
        if msg.get("method") == "codex/event/raw_response_item" and msg.get("params"):
            try:
                params = msg["params"]
                item = params.get("item")
                if isinstance(item, dict) and item.get("type") == "message" and item.get("role") == "assistant":
                    for c in item.get("content", []) or []:
                        if isinstance(c, dict) and c.get("type") == "output_text" and isinstance(c.get("text"), str):
                            assistant_chunks.append(c["text"])
            except Exception:
                # Best-effort extraction; malformed events are ignored.
                pass
        # End when the task completes.
        if msg.get("method") == "codex/event/task_complete":
            break
        # sendUserMessage completed
        if "result" in msg and msg.get("id") == send_id:
            final_ok = True
            continue
        if "error" in msg and msg.get("id") == send_id:
            print(f"sendUserMessage error: {msg['error']}", file=sys.stderr)
            return 2
        # Optionally, one could watch for raw output items here:
        # if msg.get("method") == "codex/event/raw_response_item": print(msg)
    # Print the assistant text like `codex exec` would.
    output = "".join(assistant_chunks).strip()
    if output:
        print(output)
    elif saw_responses_call:
        # Fallback if raw events were somehow missed but we did stub the call.
        print(hello_text)
    else:
        print("[no assistant output received]", file=sys.stderr)
        print(
            "[hint] No responsesApi/call observed. Enable the feature: --with-temp-codex-home or set [features].responses_http_over_jsonrpc=true in config.toml",
            file=sys.stderr,
        )
    # Clean up child process.
    try:
        proc.terminate()
    except Exception:
        pass
    if tmpdir is not None:
        tmpdir.cleanup()
    return 0 if final_ok else 1
def spawn_server_process(
    server: Optional[str], env: Dict[str, str], release: bool, offline: bool, auto_build: bool
) -> Tuple[subprocess.Popen[bytes], bool]:
    """Start app-server by building the workspace and running via Cargo.

    Always follows: `cargo build` then `cargo run --bin codex -- app-server`.
    Returns (process, used_cargo_run=True).

    NOTE(review): `server` and `auto_build` are accepted but never read here —
    the function unconditionally builds and runs via cargo, so the --server
    and --no-build CLI flags have no effect on spawning. Confirm whether they
    should be honored.
    """
    # Build workspace first.
    _ = build_app_server(release=release, offline=offline)
    # Run via cargo
    cmd = ["cargo", "run"]
    if release:
        cmd.append("--release")
    if offline:
        cmd.append("--offline")
    cmd += [
        "--bin",
        "codex",
        "--",
        # Force-enable the delegate feature so the client sees responsesApi/call.
        "--enable",
        "responses_http_over_jsonrpc",
        "app-server",
    ]
    print(
        f"[client] starting via cargo run: {' '.join(cmd)} (cwd={codex_rs_dir()})",
        file=sys.stderr,
    )
    try:
        # stdin/stdout are piped for JSONL JSON-RPC; stderr passes through.
        proc = subprocess.Popen(
            cmd,
            cwd=codex_rs_dir(),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=sys.stderr,
            env=env,
        )
    except FileNotFoundError:
        print(
            "[client] cargo not found. Install Rust per docs/install.md (rustup).",
            file=sys.stderr,
        )
        raise SystemExit(127)
    return proc, True
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse CLI arguments for the stub client.

    Exposes the prompt positional plus server/hello-text/home/cwd options,
    a "build" group (--build/--no-build, --release, --offline; build defaults
    to True) and a "trace" group (--trace-jsonrpc on by default).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.set_defaults(build=True)
    parser.add_argument("prompt", help="Prompt to send (like codex exec)")
    parser.add_argument(
        "--server",
        default=None,
        help="Path to codex-app-server binary (default: auto-build + local target)",
    )
    parser.add_argument(
        "--hello-text",
        default="Hello, world!",
        help="Stubbed assistant text streamed back via Responses events",
    )
    parser.add_argument(
        "--with-temp-codex-home",
        action="store_true",
        help="Use a temp CODEX_HOME and auto-enable responses_http_over_jsonrpc",
    )
    parser.add_argument(
        "--cwd",
        default=None,
        help="Set conversation working directory (server-side)",
    )

    build_opts = parser.add_argument_group("build")
    build_opts.add_argument(
        "--build",
        dest="build",
        action="store_true",
        help="Build codex-app-server before running (default when no --server is provided)",
    )
    build_opts.add_argument(
        "--no-build",
        dest="build",
        action="store_false",
        help="Do not build automatically (expects --server or existing local binary)",
    )
    build_opts.add_argument(
        "--release",
        action="store_true",
        help="Build/run the release binary",
    )
    build_opts.add_argument(
        "--offline",
        action="store_true",
        help="Pass --offline to cargo build (use local cache only)",
    )

    trace_opts = parser.add_argument_group("trace")
    trace_opts.add_argument(
        "--trace-jsonrpc",
        dest="trace_jsonrpc",
        action="store_true",
        default=True,
        help="Log every JSON-RPC message sent/received (default: on)",
    )
    trace_opts.add_argument(
        "--no-trace-jsonrpc",
        dest="trace_jsonrpc",
        action="store_false",
        help="Disable JSON-RPC message tracing",
    )
    return parser.parse_args(argv)
def main(argv: list[str]) -> int:
    """Entry point: parse args and run one JSONRPC exec turn."""
    args = parse_args(argv)
    # Fall back to an isolated temp CODEX_HOME when the caller has none
    # configured, for a turnkey experience.
    temp_home = bool(args.with_temp_codex_home) or ("CODEX_HOME" not in os.environ)
    return run_exec_over_jsonrpc(
        prompt=args.prompt,
        server=args.server,
        hello_text=args.hello_text,
        use_temp_home=temp_home,
        cwd=args.cwd,
        auto_build=bool(args.build or (args.server is None)),
        release=bool(args.release),
        offline=bool(args.offline),
        trace_jsonrpc=bool(args.trace_jsonrpc),
    )
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main(sys.argv[1:]))