Compare commits

...

3 Commits

Author SHA1 Message Date
aibrahim-oai
26aff08990 Merge branch 'main' into codex/implement-mcp-support-for-apply-patch/exec 2025-07-18 11:49:46 -07:00
Ahmed Ibrahim
08124dfd70 approval 2025-07-17 22:38:52 -07:00
aibrahim-oai
ea2c30bd42 Add stub MCP approval handler 2025-07-17 21:39:58 -07:00
2 changed files with 174 additions and 57 deletions

View File

@@ -5,22 +5,31 @@
use codex_core::codex_wrapper::init_codex;
use codex_core::config::Config as CodexConfig;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_core::protocol::Submission;
use codex_core::protocol::TaskCompleteEvent;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification as McpNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use mcp_types::TextContent;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
/// Convert a Codex [`Event`] to an MCP notification.
///
/// NOTE: This helper is kept local because we only ever emit notifications
/// from within this worker. The implementation is intentionally infallible
/// serialization failures are treated as bugs.
fn codex_event_to_notification(event: &Event) -> JSONRPCMessage {
#[expect(clippy::expect_used)]
JSONRPCMessage::Notification(mcp_types::JSONRPCNotification {
@@ -39,6 +48,7 @@ pub async fn run_codex_tool_session(
initial_prompt: String,
config: CodexConfig,
outgoing: Sender<JSONRPCMessage>,
mut approval_rx: Receiver<ReviewDecision>,
) {
let (codex, first_event, _ctrl_c) = match init_codex(config).await {
Ok(res) => res,
@@ -76,7 +86,7 @@ pub async fn run_codex_tool_session(
};
let submission = Submission {
id: sub_id,
id: sub_id.clone(),
op: Op::UserInput {
items: vec![InputItem::Text {
text: initial_prompt.clone(),
@@ -90,8 +100,10 @@ pub async fn run_codex_tool_session(
let mut last_agent_message: Option<String> = None;
// Stream events until the task needs to pause for user interaction or
// completes.
// Stream events until the Codex task completes. When Codex asks for
// approval we pause, wait for a decision from the MCP client (delivered
// over `approval_rx` via `codex/approval`), forward the decision, and
// continue the session.
loop {
match codex.next_event().await {
Ok(event) => {
@@ -101,45 +113,77 @@ pub async fn run_codex_tool_session(
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
last_agent_message = Some(message.clone());
}
EventMsg::ExecApprovalRequest(_) => {
let result = CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
r#type: "text".to_string(),
text: "EXEC_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::ApplyPatchApprovalRequest(_) => {
let result = CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
r#type: "text".to_string(),
text: "PATCH_APPROVAL_REQUIRED".to_string(),
annotations: None,
})],
is_error: None,
};
let _ = outgoing
.send(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: id.clone(),
result: result.into(),
}))
.await;
break;
}
EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: _,
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
reason,
}) => {
// Dispatch an informational notification so the client can surface a UI.
// We intentionally send a *notification* rather than a *request* because most
// generic MCP clients (including the Inspector) do not implement a handler for
// custom server->client requests and will otherwise respond with -32601.
let params = serde_json::json!({
"id": sub_id,
"kind": "exec",
"command": command,
"cwd": cwd,
"reason": reason,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
// Wait for the MCP client to respond with an approval decision.
let decision = approval_rx.recv().await.unwrap_or_default();
// Forward to Codex.
if let Err(e) = codex
.submit(Op::ExecApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit ExecApproval op: {e}");
break;
}
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
reason,
grant_root,
..
}) => {
let params = serde_json::json!({
"id": sub_id,
"kind": "patch",
"reason": reason,
"grant_root": grant_root,
});
let _ = outgoing
.send(JSONRPCMessage::Notification(McpNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: "codex/approval".into(),
params: Some(params),
}))
.await;
let decision = approval_rx.recv().await.unwrap_or_default();
if let Err(e) = codex
.submit(Op::PatchApproval {
id: event.id.clone(),
decision,
})
.await
{
tracing::error!("failed to submit PatchApproval op: {e}");
break;
}
}
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => {
// Session finished send the final MCP response.
let result = if let Some(msg) = last_agent_message {
CallToolResult {
content: vec![CallToolResultContent::TextContent(TextContent {
@@ -169,15 +213,11 @@ pub async fn run_codex_tool_session(
break;
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
// Already surfaced above; ignore duplicates.
}
EventMsg::AgentMessageDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentReasoningDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::Error(_)
EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::Error(_)
| EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
@@ -189,12 +229,7 @@ pub async fn run_codex_tool_session(
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_) => {
// For now, we do not do anything extra for these
// events. Note that
// send(codex_event_to_notification(&event)) above has
// already dispatched these events as notifications,
// though we may want to do give different treatment to
// individual events in the future.
// No special handling.
}
}
}

View File

@@ -1,9 +1,14 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use crate::codex_tool_config::CodexToolCallParam;
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::codex_tool_runner::run_codex_tool_session;
use codex_core::config::Config as CodexConfig;
use codex_core::protocol::ReviewDecision;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;
use mcp_types::CallToolResultContent;
@@ -27,10 +32,19 @@ use serde_json::json;
use tokio::sync::mpsc;
use tokio::task;
#[derive(Debug, serde::Deserialize)]
struct ApprovalParams {
id: String,
decision: ReviewDecision,
}
pub(crate) struct MessageProcessor {
outgoing: mpsc::Sender<JSONRPCMessage>,
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
/// Map from Codex submission id (stringified MCP request id) -> channel used
/// to forward approval decisions to the corresponding running Codex tool session.
pending_approval_senders: Arc<Mutex<HashMap<String, mpsc::Sender<ReviewDecision>>>>,
}
impl MessageProcessor {
@@ -44,6 +58,7 @@ impl MessageProcessor {
outgoing,
initialized: false,
codex_linux_sandbox_exe,
pending_approval_senders: Arc::new(Mutex::new(HashMap::new())),
}
}
@@ -51,6 +66,20 @@ impl MessageProcessor {
// Hold on to the ID so we can respond.
let request_id = request.id.clone();
if request.method == "codex/approval" {
let params_json = request.params.unwrap_or(serde_json::Value::Null);
let params: ApprovalParams = match serde_json::from_value(params_json) {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to parse approval params: {e}");
return;
}
};
self.handle_codex_approval(request_id, params);
return;
}
let client_request = match ClientRequest::try_from(request) {
Ok(client_request) => client_request,
Err(e) => {
@@ -392,15 +421,31 @@ impl MessageProcessor {
}
};
// Create a channel to receive approval decisions for this Codex session.
// We stringify the MCP request id to use as the Codex submission id.
let sub_id_str = match &id {
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
let (approval_tx, approval_rx) = mpsc::channel::<ReviewDecision>(8);
{
let mut map = self.pending_approval_senders.lock().unwrap();
map.insert(sub_id_str.clone(), approval_tx);
}
// Clone outgoing sender to move into async task.
let outgoing = self.outgoing.clone();
let approval_map = self.pending_approval_senders.clone();
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
task::spawn(async move {
// Run the Codex session and stream events back to the client.
crate::codex_tool_runner::run_codex_tool_session(id, initial_prompt, config, outgoing)
.await;
run_codex_tool_session(id, initial_prompt, config, outgoing, approval_rx).await;
// Session finished; drop the sender entry so future approvals are ignored.
let mut map = approval_map.lock().unwrap();
map.remove(&sub_id_str);
});
}
@@ -418,6 +463,43 @@ impl MessageProcessor {
tracing::info!("completion/complete -> params: {:?}", params);
}
fn handle_codex_approval(&self, id: RequestId, params: ApprovalParams) {
tracing::info!("codex/approval -> params: {:?}", params);
// Forward the decision to the running Codex session (if any).
let mut delivered = false;
{
let mut map = self.pending_approval_senders.lock().unwrap();
if let Some(tx) = map.get_mut(&params.id) {
match tx.try_send(params.decision) {
Ok(()) => {
delivered = true;
}
Err(e) => {
tracing::warn!("failed to forward approval to session {}: {e}", params.id);
}
}
} else {
tracing::warn!(
"no pending Codex session found for approval id {}",
params.id
);
}
}
// Ack the JSON-RPC request regardless of whether we were able to deliver the decision.
// Include a boolean in the result so clients can detect delivery failures if desired.
let response = JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result: serde_json::json!({ "delivered": delivered }),
});
if let Err(e) = self.outgoing.try_send(response) {
tracing::error!("Failed to send approval response: {e}");
}
}
// ---------------------------------------------------------------------
// Notification handlers
// ---------------------------------------------------------------------