mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
Compare commits
3 Commits
patch-guar
...
codex/impl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26aff08990 | ||
|
|
08124dfd70 | ||
|
|
ea2c30bd42 |
@@ -5,22 +5,31 @@
|
||||
use codex_core::codex_wrapper::init_codex;
|
||||
use codex_core::config::Config as CodexConfig;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use codex_core::protocol::Submission;
|
||||
use codex_core::protocol::TaskCompleteEvent;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::CallToolResultContent;
|
||||
use mcp_types::JSONRPC_VERSION;
|
||||
use mcp_types::JSONRPCMessage;
|
||||
use mcp_types::JSONRPCNotification as McpNotification;
|
||||
use mcp_types::JSONRPCResponse;
|
||||
use mcp_types::RequestId;
|
||||
use mcp_types::TextContent;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
/// Convert a Codex [`Event`] to an MCP notification.
|
||||
///
|
||||
/// NOTE: This helper is kept local because we only ever emit notifications
|
||||
/// from within this worker. The implementation is intentionally infallible –
|
||||
/// serialization failures are treated as bugs.
|
||||
fn codex_event_to_notification(event: &Event) -> JSONRPCMessage {
|
||||
#[expect(clippy::expect_used)]
|
||||
JSONRPCMessage::Notification(mcp_types::JSONRPCNotification {
|
||||
@@ -39,6 +48,7 @@ pub async fn run_codex_tool_session(
|
||||
initial_prompt: String,
|
||||
config: CodexConfig,
|
||||
outgoing: Sender<JSONRPCMessage>,
|
||||
mut approval_rx: Receiver<ReviewDecision>,
|
||||
) {
|
||||
let (codex, first_event, _ctrl_c) = match init_codex(config).await {
|
||||
Ok(res) => res,
|
||||
@@ -76,7 +86,7 @@ pub async fn run_codex_tool_session(
|
||||
};
|
||||
|
||||
let submission = Submission {
|
||||
id: sub_id,
|
||||
id: sub_id.clone(),
|
||||
op: Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: initial_prompt.clone(),
|
||||
@@ -90,8 +100,10 @@ pub async fn run_codex_tool_session(
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
|
||||
// Stream events until the task needs to pause for user interaction or
|
||||
// completes.
|
||||
// Stream events until the Codex task completes. When Codex asks for
|
||||
// approval we pause, wait for a decision from the MCP client (delivered
|
||||
// over `approval_rx` via `codex/approval`), forward the decision, and
|
||||
// continue the session.
|
||||
loop {
|
||||
match codex.next_event().await {
|
||||
Ok(event) => {
|
||||
@@ -101,45 +113,77 @@ pub async fn run_codex_tool_session(
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
|
||||
last_agent_message = Some(message.clone());
|
||||
}
|
||||
EventMsg::ExecApprovalRequest(_) => {
|
||||
let result = CallToolResult {
|
||||
content: vec![CallToolResultContent::TextContent(TextContent {
|
||||
r#type: "text".to_string(),
|
||||
text: "EXEC_APPROVAL_REQUIRED".to_string(),
|
||||
annotations: None,
|
||||
})],
|
||||
is_error: None,
|
||||
};
|
||||
let _ = outgoing
|
||||
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
id: id.clone(),
|
||||
result: result.into(),
|
||||
}))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(_) => {
|
||||
let result = CallToolResult {
|
||||
content: vec![CallToolResultContent::TextContent(TextContent {
|
||||
r#type: "text".to_string(),
|
||||
text: "PATCH_APPROVAL_REQUIRED".to_string(),
|
||||
annotations: None,
|
||||
})],
|
||||
is_error: None,
|
||||
};
|
||||
let _ = outgoing
|
||||
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
id: id.clone(),
|
||||
result: result.into(),
|
||||
}))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
EventMsg::TaskComplete(TaskCompleteEvent {
|
||||
last_agent_message: _,
|
||||
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
command,
|
||||
cwd,
|
||||
reason,
|
||||
}) => {
|
||||
// Dispatch an informational notification so the client can surface a UI.
|
||||
// We intentionally send a *notification* rather than a *request* because most
|
||||
// generic MCP clients (including the Inspector) do not implement a handler for
|
||||
// custom server->client requests and will otherwise respond with -32601.
|
||||
let params = serde_json::json!({
|
||||
"id": sub_id,
|
||||
"kind": "exec",
|
||||
"command": command,
|
||||
"cwd": cwd,
|
||||
"reason": reason,
|
||||
});
|
||||
let _ = outgoing
|
||||
.send(JSONRPCMessage::Notification(McpNotification {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
method: "codex/approval".into(),
|
||||
params: Some(params),
|
||||
}))
|
||||
.await;
|
||||
|
||||
// Wait for the MCP client to respond with an approval decision.
|
||||
let decision = approval_rx.recv().await.unwrap_or_default();
|
||||
// Forward to Codex.
|
||||
if let Err(e) = codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: event.id.clone(),
|
||||
decision,
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("failed to submit ExecApproval op: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
reason,
|
||||
grant_root,
|
||||
..
|
||||
}) => {
|
||||
let params = serde_json::json!({
|
||||
"id": sub_id,
|
||||
"kind": "patch",
|
||||
"reason": reason,
|
||||
"grant_root": grant_root,
|
||||
});
|
||||
let _ = outgoing
|
||||
.send(JSONRPCMessage::Notification(McpNotification {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
method: "codex/approval".into(),
|
||||
params: Some(params),
|
||||
}))
|
||||
.await;
|
||||
|
||||
let decision = approval_rx.recv().await.unwrap_or_default();
|
||||
if let Err(e) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: event.id.clone(),
|
||||
decision,
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("failed to submit PatchApproval op: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
EventMsg::TaskComplete(TaskCompleteEvent { .. }) => {
|
||||
// Session finished – send the final MCP response.
|
||||
let result = if let Some(msg) = last_agent_message {
|
||||
CallToolResult {
|
||||
content: vec![CallToolResultContent::TextContent(TextContent {
|
||||
@@ -169,15 +213,11 @@ pub async fn run_codex_tool_session(
|
||||
break;
|
||||
}
|
||||
EventMsg::SessionConfigured(_) => {
|
||||
tracing::error!("unexpected SessionConfigured event");
|
||||
// Already surfaced above; ignore duplicates.
|
||||
}
|
||||
EventMsg::AgentMessageDelta(_) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::AgentReasoningDelta(_) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::Error(_)
|
||||
EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::Error(_)
|
||||
| EventMsg::TaskStarted
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
@@ -189,12 +229,7 @@ pub async fn run_codex_tool_session(
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
| EventMsg::GetHistoryEntryResponse(_) => {
|
||||
// For now, we do not do anything extra for these
|
||||
// events. Note that
|
||||
// send(codex_event_to_notification(&event)) above has
|
||||
// already dispatched these events as notifications,
|
||||
// though we may want to do give different treatment to
|
||||
// individual events in the future.
|
||||
// No special handling.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::codex_tool_config::CodexToolCallParam;
|
||||
use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
|
||||
use crate::codex_tool_runner::run_codex_tool_session;
|
||||
|
||||
use codex_core::config::Config as CodexConfig;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use mcp_types::CallToolRequestParams;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::CallToolResultContent;
|
||||
@@ -27,10 +32,19 @@ use serde_json::json;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task;
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct ApprovalParams {
|
||||
id: String,
|
||||
decision: ReviewDecision,
|
||||
}
|
||||
|
||||
pub(crate) struct MessageProcessor {
|
||||
outgoing: mpsc::Sender<JSONRPCMessage>,
|
||||
initialized: bool,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
/// Map from Codex submission id (stringified MCP request id) -> channel used
|
||||
/// to forward approval decisions to the corresponding running Codex tool session.
|
||||
pending_approval_senders: Arc<Mutex<HashMap<String, mpsc::Sender<ReviewDecision>>>>,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -44,6 +58,7 @@ impl MessageProcessor {
|
||||
outgoing,
|
||||
initialized: false,
|
||||
codex_linux_sandbox_exe,
|
||||
pending_approval_senders: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +66,20 @@ impl MessageProcessor {
|
||||
// Hold on to the ID so we can respond.
|
||||
let request_id = request.id.clone();
|
||||
|
||||
if request.method == "codex/approval" {
|
||||
let params_json = request.params.unwrap_or(serde_json::Value::Null);
|
||||
let params: ApprovalParams = match serde_json::from_value(params_json) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to parse approval params: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.handle_codex_approval(request_id, params);
|
||||
return;
|
||||
}
|
||||
|
||||
let client_request = match ClientRequest::try_from(request) {
|
||||
Ok(client_request) => client_request,
|
||||
Err(e) => {
|
||||
@@ -392,15 +421,31 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// Create a channel to receive approval decisions for this Codex session.
|
||||
// We stringify the MCP request id to use as the Codex submission id.
|
||||
let sub_id_str = match &id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
let (approval_tx, approval_rx) = mpsc::channel::<ReviewDecision>(8);
|
||||
{
|
||||
let mut map = self.pending_approval_senders.lock().unwrap();
|
||||
map.insert(sub_id_str.clone(), approval_tx);
|
||||
}
|
||||
|
||||
// Clone outgoing sender to move into async task.
|
||||
let outgoing = self.outgoing.clone();
|
||||
let approval_map = self.pending_approval_senders.clone();
|
||||
|
||||
// Spawn an async task to handle the Codex session so that we do not
|
||||
// block the synchronous message-processing loop.
|
||||
task::spawn(async move {
|
||||
// Run the Codex session and stream events back to the client.
|
||||
crate::codex_tool_runner::run_codex_tool_session(id, initial_prompt, config, outgoing)
|
||||
.await;
|
||||
run_codex_tool_session(id, initial_prompt, config, outgoing, approval_rx).await;
|
||||
|
||||
// Session finished; drop the sender entry so future approvals are ignored.
|
||||
let mut map = approval_map.lock().unwrap();
|
||||
map.remove(&sub_id_str);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -418,6 +463,43 @@ impl MessageProcessor {
|
||||
tracing::info!("completion/complete -> params: {:?}", params);
|
||||
}
|
||||
|
||||
fn handle_codex_approval(&self, id: RequestId, params: ApprovalParams) {
|
||||
tracing::info!("codex/approval -> params: {:?}", params);
|
||||
|
||||
// Forward the decision to the running Codex session (if any).
|
||||
let mut delivered = false;
|
||||
{
|
||||
let mut map = self.pending_approval_senders.lock().unwrap();
|
||||
if let Some(tx) = map.get_mut(¶ms.id) {
|
||||
match tx.try_send(params.decision) {
|
||||
Ok(()) => {
|
||||
delivered = true;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to forward approval to session {}: {e}", params.id);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"no pending Codex session found for approval id {}",
|
||||
params.id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Ack the JSON-RPC request regardless of whether we were able to deliver the decision.
|
||||
// Include a boolean in the result so clients can detect delivery failures if desired.
|
||||
let response = JSONRPCMessage::Response(JSONRPCResponse {
|
||||
jsonrpc: JSONRPC_VERSION.into(),
|
||||
id,
|
||||
result: serde_json::json!({ "delivered": delivered }),
|
||||
});
|
||||
|
||||
if let Err(e) = self.outgoing.try_send(response) {
|
||||
tracing::error!("Failed to send approval response: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------
|
||||
// Notification handlers
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user