Compare commits

...

5 Commits

Author SHA1 Message Date
aibrahim-oai
26aff08990 Merge branch 'main' into codex/implement-mcp-support-for-apply-patch/exec 2025-07-18 11:49:46 -07:00
Michael Bolin
cc874c9205 chore: use AtomicBool instead of Mutex<bool> (#1616) 2025-07-18 11:13:34 -07:00
pakrym-oai
6f2b01bb6b feat: ensure session ID header is sent in Response API request (#1614)
Include the current session id in Responses API requests.
2025-07-18 09:59:07 -07:00
Ahmed Ibrahim
08124dfd70 approval 2025-07-17 22:38:52 -07:00
aibrahim-oai
ea2c30bd42 Add stub MCP approval handler 2025-07-17 21:39:58 -07:00
6 changed files with 309 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ use tokio_util::io::ReaderStream;
use tracing::debug;
use tracing::trace;
use tracing::warn;
use uuid::Uuid;
use crate::chat_completions::AggregateStreamExt;
use crate::chat_completions::stream_chat_completions;
@@ -44,6 +45,7 @@ pub struct ModelClient {
config: Arc<Config>,
client: reqwest::Client,
provider: ModelProviderInfo,
session_id: Uuid,
effort: ReasoningEffortConfig,
summary: ReasoningSummaryConfig,
}
@@ -54,11 +56,13 @@ impl ModelClient {
provider: ModelProviderInfo,
effort: ReasoningEffortConfig,
summary: ReasoningSummaryConfig,
session_id: Uuid,
) -> Self {
Self {
config,
client: reqwest::Client::new(),
provider,
session_id,
effort,
summary,
}
@@ -143,6 +147,7 @@ impl ModelClient {
.provider
.create_request_builder(&self.client)?
.header("OpenAI-Beta", "responses=experimental")
.header("session_id", self.session_id.to_string())
.header(reqwest::header::ACCEPT, "text/event-stream")
.json(&payload);

View File

@@ -591,6 +591,7 @@ async fn submission_loop(
provider.clone(),
model_reasoning_effort,
model_reasoning_summary,
session_id,
);
// abort any current running session and clone its state

View File

@@ -0,0 +1,117 @@
use std::time::Duration;
use codex_core::Codex;
use codex_core::ModelProviderInfo;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SessionConfiguredEvent;
mod test_support;
use tempfile::TempDir;
use test_support::load_default_config_for_test;
use test_support::load_sse_fixture_with_id;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
/// Build minimal SSE stream with completed marker using the JSON fixture.
fn sse_completed(id: &str) -> String {
load_sse_fixture_with_id("tests/fixtures/completed_template.json", id)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_session_id_and_model_headers_in_request() {
#![allow(clippy::unwrap_used)]
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
// Environment
// Update environment `set_var` is `unsafe` starting with the 2024
// edition so we group the calls into a single `unsafe { … }` block.
unsafe {
std::env::set_var("OPENAI_REQUEST_MAX_RETRIES", "0");
std::env::set_var("OPENAI_STREAM_MAX_RETRIES", "0");
}
let model_provider = ModelProviderInfo {
name: "openai".into(),
base_url: format!("{}/v1", server.uri()),
// Environment variable that should exist in the test environment.
// ModelClient will return an error if the environment variable for the
// provider is not set.
env_key: Some("PATH".into()),
env_key_instructions: None,
wire_api: codex_core::WireApi::Responses,
query_params: None,
http_headers: Some(
[("originator".to_string(), "codex_cli_rs".to_string())]
.into_iter()
.collect(),
),
env_http_headers: None,
};
// Init session
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
let (codex, _init_id) = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
let mut current_session_id = None;
// Wait for TaskComplete
loop {
let ev = timeout(Duration::from_secs(1), codex.next_event())
.await
.unwrap()
.unwrap();
if let EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, .. }) = ev.msg {
current_session_id = Some(session_id.to_string());
}
if matches!(ev.msg, EventMsg::TaskComplete(_)) {
break;
}
}
// get request from the server
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.headers.get("session_id").unwrap();
let originator = request.headers.get("originator").unwrap();
assert!(current_session_id.is_some());
assert_eq!(request_body.to_str().unwrap(), &current_session_id.unwrap());
assert_eq!(originator.to_str().unwrap(), "codex_cli_rs");
}

View File

@@ -5,22 +5,31 @@
use codex_core::codex_wrapper::init_codex;
use codex_core::config::Config as CodexConfig;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_core::protocol::Submission;
use codex_core::protocol::TaskCompleteEvent;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification as McpNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use mcp_types::TextContent;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
/// Convert a Codex [`Event`] to an MCP notification.
///
/// NOTE: This helper is kept local because we only ever emit notifications
/// from within this worker. The implementation is intentionally infallible
/// serialization failures are treated as bugs.
fn codex_event_to_notification(event: &Event) -> JSONRPCMessage {
#[expect(clippy::expect_used)]
JSONRPCMessage::Notification(mcp_types::JSONRPCNotification {
@@ -39,6 +48,7 @@ pub async fn run_codex_tool_session(
initial_prompt: String,
config: CodexConfig,
outgoing: Sender<JSONRPCMessage>,
mut approval_rx: Receiver<ReviewDecision>,
) {
let (codex, first_event, _ctrl_c) = match init_codex(config).await {
Ok(res) => res,
@@ -76,7 +86,7 @@ pub async fn run_codex_tool_session(
};
let submission = Submission {
id: sub_id,
id: sub_id.clone(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: initial_prompt.clone(),
@@ -90,8 +100,10 @@ pub async fn run_codex_tool_session(
let mut last_agent_message: Option<String> = None;
// Stream events until the task needs to pause for user interaction or
// completes.
// Stream events until the Codex task completes. When Codex asks for
// approval we pause, wait for a decision from the MCP client (delivered
// over `approval_rx` via `codex/approval`), forward the decision, and
// continue the session.
loop {
match codex.next_event().await {
Ok(event) => {
@@ -101,45 +113,77 @@ pub async fn run_codex_tool_session(
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
last_agent_message = Some(message.clone());
}
EventMsg::ExecApprovalRequest(_) => {
let result = CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
r#type: "text".to_string(),
text: "EXEC_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::ApplyPatchApprovalRequest(_) => {
let result = CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
r#type: "text".to_string(),
text: "PATCH_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
reason,
}) => {
// Dispatch an informational notification so the client can surface a UI.
// We intentionally send a *notification* rather than a *request* because most
// generic MCP clients (including the Inspector) do not implement a handler for
// custom server->client requests and will otherwise respond with -32601.
let params = serde_json::json!({
"id": sub_id,
"kind": "exec",
"command": command,
"cwd": cwd,
"reason": reason,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
// Wait for the MCP client to respond with an approval decision.
let decision = approval_rx.recv().await.unwrap_or_default();
// Forward to Codex.
if let Err(e) = codex
.submit(Op::ExecApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit ExecApproval op: {e}");
break;
}
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
reason,
grant_root,
..
}) => {
let params = serde_json::json!({
"id": sub_id,
"kind": "patch",
"reason": reason,
"grant_root": grant_root,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
let decision = approval_rx.recv().await.unwrap_or_default();
if let Err(e) = codex
.submit(Op::PatchApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit PatchApproval op: {e}");
break;
}
}
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => {
// Session finished send the final MCP response.
let result = if let Some(msg) = last_agent_message {
CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
@@ -169,15 +213,11 @@ pub async fn run_codex_tool_session(
break;
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
// Already surfaced above; ignore duplicates.
}
EventMsg::AgentMessageDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentReasoningDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::Error(_)
EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::Error(_)
| EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
@@ -189,12 +229,7 @@ pub async fn run_codex_tool_session(
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_) => {
// For now, we do not do anything extra for these
// events. Note that
// send(codex_event_to_notification(&event)) above has
// already dispatched these events as notifications,
// though we may want to do give different treatment to
// individual events in the future.
// No special handling.
}
}
}

View File

@@ -1,9 +1,14 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::codex_tool_runner::run_codex_tool_session;
use codex_core::config::Config as CodexConfig;
use codex_core::protocol::ReviewDecision;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
@@ -27,10 +32,19 @@ use serde_json::json;
use tokio::sync::mpsc;
use tokio::task;
#[derive(Debug, serde::Deserialize)]
struct ApprovalParams {
id: String,
decision: ReviewDecision,
}
pub(crate) struct MessageProcessor {
outgoing: mpsc::Sender<JSONRPCMessage>,
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
/// Map from Codex submission id (stringified MCP request id) -> channel used
/// to forward approval decisions to the corresponding running Codex tool session.
pending_approval_senders: Arc<Mutex<HashMap<String, mpsc::Sender<ReviewDecision>>>>,
}
impl MessageProcessor {
@@ -44,6 +58,7 @@ impl MessageProcessor {
outgoing,
initialized: false,
codex_linux_sandbox_exe,
pending_approval_senders: Arc::new(Mutex::new(HashMap::new())),
}
}
@@ -51,6 +66,20 @@ impl MessageProcessor {
// Hold on to the ID so we can respond.
let request_id = request.id.clone();
if request.method == "codex/approval" {
let params_json = request.params.unwrap_or(serde_json::Value::Null);
let params: ApprovalParams = match serde_json::from_value(params_json) {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to parse approval params: {e}");
return;
}
};
self.handle_codex_approval(request_id, params);
return;
}
let client_request = match ClientRequest::try_from(request) {
Ok(client_request) => client_request,
Err(e) => {
@@ -392,15 +421,31 @@ impl MessageProcessor {
}
};
// Create a channel to receive approval decisions for this Codex session.
// We stringify the MCP request id to use as the Codex submission id.
let sub_id_str = match &id {
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
let (approval_tx, approval_rx) = mpsc::channel::<ReviewDecision>(8);
{
let mut map = self.pending_approval_senders.lock().unwrap();
map.insert(sub_id_str.clone(), approval_tx);
}
// Clone outgoing sender to move into async task.
let outgoing = self.outgoing.clone();
let approval_map = self.pending_approval_senders.clone();
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
task::spawn(async move {
// Run the Codex session and stream events back to the client.
crate::codex_tool_runner::run_codex_tool_session(id, initial_prompt, config, outgoing)
.await;
run_codex_tool_session(id, initial_prompt, config, outgoing, approval_rx).await;
// Session finished; drop the sender entry so future approvals are ignored.
let mut map = approval_map.lock().unwrap();
map.remove(&sub_id_str);
});
}
@@ -418,6 +463,43 @@ impl MessageProcessor {
tracing::info!("completion/complete -> params: {:?}", params);
}
fn handle_codex_approval(&self, id: RequestId, params: ApprovalParams) {
tracing::info!("codex/approval -> params: {:?}", params);
// Forward the decision to the running Codex session (if any).
let mut delivered = false;
{
let mut map = self.pending_approval_senders.lock().unwrap();
if let Some(tx) = map.get_mut(&params.id) {
match tx.try_send(params.decision) {
Ok(()) => {
delivered = true;
}
Err(e) => {
tracing::warn!("failed to forward approval to session {}: {e}", params.id);
}
}
} else {
tracing::warn!(
"no pending Codex session found for approval id {}",
params.id
);
}
}
// Ack the JSON-RPC request regardless of whether we were able to deliver the decision.
// Include a boolean in the result so clients can detect delivery failures if desired.
let response = JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: serde_json::json!({ "delivered": delivered }),
});
if let Err(e) = self.outgoing.try_send(response) {
tracing::error!("Failed to send approval response: {e}");
}
}
// ---------------------------------------------------------------------
// Notification handlers
// ---------------------------------------------------------------------

View File

@@ -19,7 +19,8 @@ use crossterm::event::MouseEvent;
use crossterm::event::MouseEventKind;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::channel;
use std::thread;
@@ -54,7 +55,7 @@ pub(crate) struct App<'a> {
file_search: FileSearchManager,
/// True when a redraw has been scheduled but not yet executed.
pending_redraw: Arc<Mutex<bool>>,
pending_redraw: Arc<AtomicBool>,
/// Stored parameters needed to instantiate the ChatWidget later, e.g.,
/// after dismissing the Git-repo warning.
@@ -80,7 +81,7 @@ impl App<'_> {
) -> Self {
let (app_event_tx, app_event_rx) = channel();
let app_event_tx = AppEventSender::new(app_event_tx);
let pending_redraw = Arc::new(Mutex::new(false));
let pending_redraw = Arc::new(AtomicBool::new(false));
let scroll_event_helper = ScrollEventHelper::new(app_event_tx.clone());
// Spawn a dedicated thread for reading the crossterm event loop and
@@ -177,13 +178,14 @@ impl App<'_> {
/// Schedule a redraw if one is not already pending.
#[allow(clippy::unwrap_used)]
fn schedule_redraw(&self) {
// Attempt to set the flag to `true`. If it was already `true`, another
// redraw is already pending so we can return early.
if self
.pending_redraw
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
#[allow(clippy::unwrap_used)]
let mut flag = self.pending_redraw.lock().unwrap();
if *flag {
return;
}
*flag = true;
return;
}
let tx = self.app_event_tx.clone();
@@ -191,9 +193,7 @@ impl App<'_> {
thread::spawn(move || {
thread::sleep(REDRAW_DEBOUNCE);
tx.send(AppEvent::Redraw);
#[allow(clippy::unwrap_used)]
let mut f = pending_redraw.lock().unwrap();
*f = false;
pending_redraw.store(false, Ordering::SeqCst);
});
}