mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
- Added support for message and reasoning deltas - Skipped adding the support in the cli and tui for later - Commented a failing test (wrong merge) that needs fix in a separate PR. Side note: I think we need to disable merge when the CI don't pass.
222 lines
9.1 KiB
Rust
222 lines
9.1 KiB
Rust
//! Asynchronous worker that executes a **Codex** tool-call inside a spawned
|
|
//! Tokio task. Separated from `message_processor.rs` to keep that file small
|
|
//! and to make future feature-growth easier to manage.
|
|
|
|
use codex_core::codex_wrapper::init_codex;
|
|
use codex_core::config::Config as CodexConfig;
|
|
use codex_core::protocol::AgentMessageEvent;
|
|
use codex_core::protocol::Event;
|
|
use codex_core::protocol::EventMsg;
|
|
use codex_core::protocol::InputItem;
|
|
use codex_core::protocol::Op;
|
|
use codex_core::protocol::Submission;
|
|
use codex_core::protocol::TaskCompleteEvent;
|
|
use mcp_types::CallToolResult;
|
|
use mcp_types::CallToolResultContent;
|
|
use mcp_types::JSONRPC_VERSION;
|
|
use mcp_types::JSONRPCMessage;
|
|
use mcp_types::JSONRPCResponse;
|
|
use mcp_types::RequestId;
|
|
use mcp_types::TextContent;
|
|
use tokio::sync::mpsc::Sender;
|
|
|
|
/// Convert a Codex [`Event`] to an MCP notification.
|
|
fn codex_event_to_notification(event: &Event) -> JSONRPCMessage {
|
|
#[expect(clippy::expect_used)]
|
|
JSONRPCMessage::Notification(mcp_types::JSONRPCNotification {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
method: "codex/event".into(),
|
|
params: Some(serde_json::to_value(event).expect("Event must serialize")),
|
|
})
|
|
}
|
|
|
|
/// Run a complete Codex session and stream events back to the client.
|
|
///
|
|
/// On completion (success or error) the function sends the appropriate
|
|
/// `tools/call` response so the LLM can continue the conversation.
|
|
pub async fn run_codex_tool_session(
|
|
id: RequestId,
|
|
initial_prompt: String,
|
|
config: CodexConfig,
|
|
outgoing: Sender<JSONRPCMessage>,
|
|
) {
|
|
let (codex, first_event, _ctrl_c) = match init_codex(config).await {
|
|
Ok(res) => res,
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Failed to start Codex session: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
};
|
|
let _ = outgoing
|
|
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
id,
|
|
result: result.into(),
|
|
}))
|
|
.await;
|
|
return;
|
|
}
|
|
};
|
|
|
|
// Send initial SessionConfigured event.
|
|
let _ = outgoing
|
|
.send(codex_event_to_notification(&first_event))
|
|
.await;
|
|
|
|
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
|
// any events emitted for this tool-call can be correlated with the
|
|
// originating `tools/call` request.
|
|
let sub_id = match &id {
|
|
RequestId::String(s) => s.clone(),
|
|
RequestId::Integer(n) => n.to_string(),
|
|
};
|
|
|
|
let submission = Submission {
|
|
id: sub_id,
|
|
op: Op::UserInput {
|
|
items: vec![InputItem::Text {
|
|
text: initial_prompt.clone(),
|
|
}],
|
|
},
|
|
};
|
|
|
|
if let Err(e) = codex.submit_with_id(submission).await {
|
|
tracing::error!("Failed to submit initial prompt: {e}");
|
|
}
|
|
|
|
let mut last_agent_message: Option<String> = None;
|
|
|
|
// Stream events until the task needs to pause for user interaction or
|
|
// completes.
|
|
loop {
|
|
match codex.next_event().await {
|
|
Ok(event) => {
|
|
let _ = outgoing.send(codex_event_to_notification(&event)).await;
|
|
|
|
match &event.msg {
|
|
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
|
|
last_agent_message = Some(message.clone());
|
|
}
|
|
EventMsg::ExecApprovalRequest(_) => {
|
|
let result = CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: "EXEC_APPROVAL_REQUIRED".to_string(),
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
};
|
|
let _ = outgoing
|
|
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
id: id.clone(),
|
|
result: result.into(),
|
|
}))
|
|
.await;
|
|
break;
|
|
}
|
|
EventMsg::ApplyPatchApprovalRequest(_) => {
|
|
let result = CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: "PATCH_APPROVAL_REQUIRED".to_string(),
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
};
|
|
let _ = outgoing
|
|
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
id: id.clone(),
|
|
result: result.into(),
|
|
}))
|
|
.await;
|
|
break;
|
|
}
|
|
EventMsg::TaskComplete(TaskCompleteEvent {
|
|
last_agent_message: _,
|
|
}) => {
|
|
let result = if let Some(msg) = last_agent_message {
|
|
CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: msg,
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
}
|
|
} else {
|
|
CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: String::new(),
|
|
annotations: None,
|
|
})],
|
|
is_error: None,
|
|
}
|
|
};
|
|
let _ = outgoing
|
|
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
id: id.clone(),
|
|
result: result.into(),
|
|
}))
|
|
.await;
|
|
break;
|
|
}
|
|
EventMsg::SessionConfigured(_) => {
|
|
tracing::error!("unexpected SessionConfigured event");
|
|
}
|
|
EventMsg::AgentMessageDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::AgentReasoningDelta(_) => {
|
|
// TODO: think how we want to support this in the MCP
|
|
}
|
|
EventMsg::Error(_)
|
|
| EventMsg::TaskStarted
|
|
| EventMsg::TokenCount(_)
|
|
| EventMsg::AgentReasoning(_)
|
|
| EventMsg::McpToolCallBegin(_)
|
|
| EventMsg::McpToolCallEnd(_)
|
|
| EventMsg::ExecCommandBegin(_)
|
|
| EventMsg::ExecCommandEnd(_)
|
|
| EventMsg::BackgroundEvent(_)
|
|
| EventMsg::PatchApplyBegin(_)
|
|
| EventMsg::PatchApplyEnd(_)
|
|
| EventMsg::GetHistoryEntryResponse(_) => {
|
|
// For now, we do not do anything extra for these
|
|
// events. Note that
|
|
// send(codex_event_to_notification(&event)) above has
|
|
// already dispatched these events as notifications,
|
|
// though we may want to do give different treatment to
|
|
// individual events in the future.
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let result = CallToolResult {
|
|
content: vec![CallToolResultContent::TextContent(TextContent {
|
|
r#type: "text".to_string(),
|
|
text: format!("Codex runtime error: {e}"),
|
|
annotations: None,
|
|
})],
|
|
is_error: Some(true),
|
|
};
|
|
let _ = outgoing
|
|
.send(JSONRPCMessage::Response(JSONRPCResponse {
|
|
jsonrpc: JSONRPC_VERSION.into(),
|
|
id: id.clone(),
|
|
result: result.into(),
|
|
}))
|
|
.await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|