Compare commits

...

3 Commits

Author SHA1 Message Date
aibrahim-oai
26aff08990 Merge branch 'main' into codex/implement-mcp-support-for-apply-patch/exec 2025-07-18 11:49:46 -07:00
Ahmed Ibrahim
08124dfd70 approval 2025-07-17 22:38:52 -07:00
aibrahim-oai
ea2c30bd42 Add stub MCP approval handler 2025-07-17 21:39:58 -07:00
2 changed files with 174 additions and 57 deletions

View File

@@ -5,22 +5,31 @@
use codex_core::codex_wrapper::init_codex; use codex_core::codex_wrapper::init_codex;
use codex_core::config::Config as CodexConfig; use codex_core::config::Config as CodexConfig;
use codex_core::protocol::AgentMessageEvent; use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event; use codex_core::protocol::Event;
use codex_core::protocol::EventMsg; use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::InputItem; use codex_core::protocol::InputItem;
use codex_core::protocol::Op; use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_core::protocol::Submission; use codex_core::protocol::Submission;
use codex_core::protocol::TaskCompleteEvent; use codex_core::protocol::TaskCompleteEvent;
use mcp_types::CallToolResult; use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent; use mcp_types::CallToolResultContent;
use mcp_types::JSONRPC_VERSION; use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage; use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification as McpNotification;
use mcp_types::JSONRPCResponse; use mcp_types::JSONRPCResponse;
use mcp_types::RequestId; use mcp_types::RequestId;
use mcp_types::TextContent; use mcp_types::TextContent;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
/// Convert a Codex [`Event`] to an MCP notification. /// Convert a Codex [`Event`] to an MCP notification.
///
/// NOTE: This helper is kept local because we only ever emit notifications
/// from within this worker. The implementation is intentionally infallible
/// serialization failures are treated as bugs.
fn codex_event_to_notification(event: &Event) -> JSONRPCMessage { fn codex_event_to_notification(event: &Event) -> JSONRPCMessage {
#[expect(clippy::expect_used)] #[expect(clippy::expect_used)]
JSONRPCMessage::Notification(mcp_types::JSONRPCNotification { JSONRPCMessage::Notification(mcp_types::JSONRPCNotification {
@@ -39,6 +48,7 @@ pub async fn run_codex_tool_session(
initial_prompt: String, initial_prompt: String,
config: CodexConfig, config: CodexConfig,
outgoing: Sender<JSONRPCMessage>, outgoing: Sender<JSONRPCMessage>,
mut approval_rx: Receiver<ReviewDecision>,
) { ) {
let (codex, first_event, _ctrl_c) = match init_codex(config).await { let (codex, first_event, _ctrl_c) = match init_codex(config).await {
Ok(res) => res, Ok(res) => res,
@@ -76,7 +86,7 @@ pub async fn run_codex_tool_session(
}; };
let submission = Submission { let submission = Submission {
id: sub_id, id: sub_id.clone(),
op: Op::UserInput { op: Op::UserInput {
items: vec![InputItem::Text { items: vec![InputItem::Text {
text: initial_prompt.clone(), text: initial_prompt.clone(),
@@ -90,8 +100,10 @@ pub async fn run_codex_tool_session(
let mut last_agent_message: Option<String> = None; let mut last_agent_message: Option<String> = None;
// Stream events until the task needs to pause for user interaction or // Stream events until the Codex task completes. When Codex asks for
// completes. // approval we pause, wait for a decision from the MCP client (delivered
// over `approval_rx` via `codex/approval`), forward the decision, and
// continue the session.
loop { loop {
match codex.next_event().await { match codex.next_event().await {
Ok(event) => { Ok(event) => {
@@ -101,45 +113,77 @@ pub async fn run_codex_tool_session(
EventMsg::AgentMessage(AgentMessageEvent { message }) => { EventMsg::AgentMessage(AgentMessageEvent { message }) => {
last_agent_message = Some(message.clone()); last_agent_message = Some(message.clone());
} }
EventMsg::ExecApprovalRequest(_) => { EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
let result = CallToolResult { command,
content: vec![CallToolResultContent::TextContent(TextContent { cwd,
r#type: "text".to_string(), reason,
text: "EXEC_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::ApplyPatchApprovalRequest(_) => {
let result = CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
r#type: "text".to_string(),
text: "PATCH_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
}) => { }) => {
// Dispatch an informational notification so the client can surface a UI.
// We intentionally send a *notification* rather than a *request* because most
// generic MCP clients (including the Inspector) do not implement a handler for
// custom server->client requests and will otherwise respond with -32601.
let params = serde_json::json!({
"id": sub_id,
"kind": "exec",
"command": command,
"cwd": cwd,
"reason": reason,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
// Wait for the MCP client to respond with an approval decision.
let decision = approval_rx.recv().await.unwrap_or_default();
// Forward to Codex.
if let Err(e) = codex
.submit(Op::ExecApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit ExecApproval op: {e}");
break;
}
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
reason,
grant_root,
..
}) => {
let params = serde_json::json!({
"id": sub_id,
"kind": "patch",
"reason": reason,
"grant_root": grant_root,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
let decision = approval_rx.recv().await.unwrap_or_default();
if let Err(e) = codex
.submit(Op::PatchApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit PatchApproval op: {e}");
break;
}
}
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => {
// Session finished send the final MCP response.
let result = if let Some(msg) = last_agent_message { let result = if let Some(msg) = last_agent_message {
CallToolResult { CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent { content: vec![CallToolResultContent::TextContent(TextContent {
@@ -169,15 +213,11 @@ pub async fn run_codex_tool_session(
break; break;
} }
EventMsg::SessionConfigured(_) => { EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event"); // Already surfaced above; ignore duplicates.
} }
EventMsg::AgentMessageDelta(_) => { EventMsg::AgentMessageDelta(_)
// TODO: think how we want to support this in the MCP | EventMsg::AgentReasoningDelta(_)
} | EventMsg::Error(_)
EventMsg::AgentReasoningDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::Error(_)
| EventMsg::TaskStarted | EventMsg::TaskStarted
| EventMsg::TokenCount(_) | EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_) | EventMsg::AgentReasoning(_)
@@ -189,12 +229,7 @@ pub async fn run_codex_tool_session(
| EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_) | EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_) => { | EventMsg::GetHistoryEntryResponse(_) => {
// For now, we do not do anything extra for these // No special handling.
// events. Note that
// send(codex_event_to_notification(&event)) above has
// already dispatched these events as notifications,
// though we may want to do give different treatment to
// individual events in the future.
} }
} }
} }

View File

@@ -1,9 +1,14 @@
use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use crate::codex_tool_config::CodexToolCallParam; use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param; use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::codex_tool_runner::run_codex_tool_session;
use codex_core::config::Config as CodexConfig; use codex_core::config::Config as CodexConfig;
use codex_core::protocol::ReviewDecision;
use mcp_types::CallToolRequestParams; use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult; use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent; use mcp_types::CallToolResultContent;
@@ -27,10 +32,19 @@ use serde_json::json;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task; use tokio::task;
#[derive(Debug, serde::Deserialize)]
struct ApprovalParams {
id: String,
decision: ReviewDecision,
}
pub(crate) struct MessageProcessor { pub(crate) struct MessageProcessor {
outgoing: mpsc::Sender<JSONRPCMessage>, outgoing: mpsc::Sender<JSONRPCMessage>,
initialized: bool, initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>, codex_linux_sandbox_exe: Option<PathBuf>,
/// Map from Codex submission id (stringified MCP request id) -> channel used
/// to forward approval decisions to the corresponding running Codex tool session.
pending_approval_senders: Arc<Mutex<HashMap<String, mpsc::Sender<ReviewDecision>>>>,
} }
impl MessageProcessor { impl MessageProcessor {
@@ -44,6 +58,7 @@ impl MessageProcessor {
outgoing, outgoing,
initialized: false, initialized: false,
codex_linux_sandbox_exe, codex_linux_sandbox_exe,
pending_approval_senders: Arc::new(Mutex::new(HashMap::new())),
} }
} }
@@ -51,6 +66,20 @@ impl MessageProcessor {
// Hold on to the ID so we can respond. // Hold on to the ID so we can respond.
let request_id = request.id.clone(); let request_id = request.id.clone();
if request.method == "codex/approval" {
let params_json = request.params.unwrap_or(serde_json::Value::Null);
let params: ApprovalParams = match serde_json::from_value(params_json) {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to parse approval params: {e}");
return;
}
};
self.handle_codex_approval(request_id, params);
return;
}
let client_request = match ClientRequest::try_from(request) { let client_request = match ClientRequest::try_from(request) {
Ok(client_request) => client_request, Ok(client_request) => client_request,
Err(e) => { Err(e) => {
@@ -392,15 +421,31 @@ impl MessageProcessor {
} }
}; };
// Create a channel to receive approval decisions for this Codex session.
// We stringify the MCP request id to use as the Codex submission id.
let sub_id_str = match &id {
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
let (approval_tx, approval_rx) = mpsc::channel::<ReviewDecision>(8);
{
let mut map = self.pending_approval_senders.lock().unwrap();
map.insert(sub_id_str.clone(), approval_tx);
}
// Clone outgoing sender to move into async task. // Clone outgoing sender to move into async task.
let outgoing = self.outgoing.clone(); let outgoing = self.outgoing.clone();
let approval_map = self.pending_approval_senders.clone();
// Spawn an async task to handle the Codex session so that we do not // Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop. // block the synchronous message-processing loop.
task::spawn(async move { task::spawn(async move {
// Run the Codex session and stream events back to the client. // Run the Codex session and stream events back to the client.
crate::codex_tool_runner::run_codex_tool_session(id, initial_prompt, config, outgoing) run_codex_tool_session(id, initial_prompt, config, outgoing, approval_rx).await;
.await;
// Session finished; drop the sender entry so future approvals are ignored.
let mut map = approval_map.lock().unwrap();
map.remove(&sub_id_str);
}); });
} }
@@ -418,6 +463,43 @@ impl MessageProcessor {
tracing::info!("completion/complete -> params: {:?}", params); tracing::info!("completion/complete -> params: {:?}", params);
} }
fn handle_codex_approval(&self, id: RequestId, params: ApprovalParams) {
tracing::info!("codex/approval -> params: {:?}", params);
// Forward the decision to the running Codex session (if any).
let mut delivered = false;
{
let mut map = self.pending_approval_senders.lock().unwrap();
if let Some(tx) = map.get_mut(&params.id) {
match tx.try_send(params.decision) {
Ok(()) => {
delivered = true;
}
Err(e) => {
tracing::warn!("failed to forward approval to session {}: {e}", params.id);
}
}
} else {
tracing::warn!(
"no pending Codex session found for approval id {}",
params.id
);
}
}
// Ack the JSON-RPC request regardless of whether we were able to deliver the decision.
// Include a boolean in the result so clients can detect delivery failures if desired.
let response = JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: serde_json::json!({ "delivered": delivered }),
});
if let Err(e) = self.outgoing.try_send(response) {
tracing::error!("Failed to send approval response: {e}");
}
}
// --------------------------------------------------------------------- // ---------------------------------------------------------------------
// Notification handlers // Notification handlers
// --------------------------------------------------------------------- // ---------------------------------------------------------------------