diff --git a/codex-rs/app-server-protocol/src/protocol/event_mapping.rs b/codex-rs/app-server-protocol/src/protocol/event_mapping.rs index 809f08050f..2880ce73f3 100644 --- a/codex-rs/app-server-protocol/src/protocol/event_mapping.rs +++ b/codex-rs/app-server-protocol/src/protocol/event_mapping.rs @@ -12,9 +12,6 @@ use crate::protocol::v2::DynamicToolCallStatus; use crate::protocol::v2::FileChangePatchUpdatedNotification; use crate::protocol::v2::ItemCompletedNotification; use crate::protocol::v2::ItemStartedNotification; -use crate::protocol::v2::McpToolCallError; -use crate::protocol::v2::McpToolCallResult; -use crate::protocol::v2::McpToolCallStatus; use crate::protocol::v2::PlanDeltaNotification; use crate::protocol::v2::ReasoningSummaryPartAddedNotification; use crate::protocol::v2::ReasoningSummaryTextDeltaNotification; @@ -23,7 +20,6 @@ use crate::protocol::v2::TerminalInteractionNotification; use crate::protocol::v2::ThreadItem; use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem; use codex_protocol::protocol::EventMsg; -use serde_json::Value as JsonValue; use std::collections::HashMap; /// Build the v2 app-server notification that directly corresponds to a single core event. @@ -75,64 +71,6 @@ pub fn item_event_to_server_notification( item, }) } - EventMsg::McpToolCallBegin(begin_event) => { - let item = ThreadItem::McpToolCall { - id: begin_event.call_id, - server: begin_event.invocation.server, - tool: begin_event.invocation.tool, - status: McpToolCallStatus::InProgress, - arguments: begin_event.invocation.arguments.unwrap_or(JsonValue::Null), - mcp_app_resource_uri: begin_event.mcp_app_resource_uri, - result: None, - error: None, - duration_ms: None, - }; - ServerNotification::ItemStarted(ItemStartedNotification { - thread_id, - turn_id, - item, - }) - } - EventMsg::McpToolCallEnd(end_event) => { - let status = if end_event.is_success() { - McpToolCallStatus::Completed - } else { - McpToolCallStatus::Failed - }; - let duration_ms = i64::try_from(end_event.duration.as_millis()).ok(); - let (result, error) = match &end_event.result { - Ok(value) => ( - Some(Box::new(McpToolCallResult { - content: value.content.clone(), - structured_content: value.structured_content.clone(), - meta: value.meta.clone(), - })), - None, - ), - Err(message) => ( - None, - Some(McpToolCallError { - message: message.clone(), - }), - ), - }; - let item = ThreadItem::McpToolCall { - id: end_event.call_id, - server: end_event.invocation.server, - tool: end_event.invocation.tool, - status, - arguments: end_event.invocation.arguments.unwrap_or(JsonValue::Null), - mcp_app_resource_uri: end_event.mcp_app_resource_uri, - result, - error, - duration_ms, - }; - ServerNotification::ItemCompleted(ItemCompletedNotification { - thread_id, - turn_id, - item, - }) - } EventMsg::CollabAgentSpawnBegin(begin_event) => { let item = ThreadItem::CollabAgentToolCall { id: begin_event.call_id, @@ -500,17 +438,11 @@ pub fn item_event_to_server_notification( mod tests { use super::*; use codex_protocol::ThreadId; - use codex_protocol::mcp::CallToolResult; use codex_protocol::protocol::CollabResumeBeginEvent; use codex_protocol::protocol::CollabResumeEndEvent; use codex_protocol::protocol::ExecCommandOutputDeltaEvent; use codex_protocol::protocol::ExecOutputStream; - use codex_protocol::protocol::McpInvocation; - use codex_protocol::protocol::McpToolCallBeginEvent; - use codex_protocol::protocol::McpToolCallEndEvent; use pretty_assertions::assert_eq; - use rmcp::model::Content; - use std::time::Duration; fn assert_item_started_server_notification( notification: ServerNotification, @@ -621,179 +553,6 @@ mod tests { ); } - #[test] - fn mcp_tool_call_begin_maps_to_item_started_notification_with_args() { - let begin_event = McpToolCallBeginEvent { - call_id: "call_123".to_string(), - invocation: McpInvocation { - server: "codex".to_string(), - tool: "list_mcp_resources".to_string(), - arguments: Some(serde_json::json!({"server": ""})), - }, - mcp_app_resource_uri: Some("ui://widget/list-resources.html".to_string()), - }; - - let notification = item_event_to_server_notification( - EventMsg::McpToolCallBegin(begin_event.clone()), - "thread-1", - "turn_1", - ); - assert_item_started_server_notification( - notification, - ItemStartedNotification { - thread_id: "thread-1".to_string(), - turn_id: "turn_1".to_string(), - item: ThreadItem::McpToolCall { - id: begin_event.call_id, - server: begin_event.invocation.server, - tool: begin_event.invocation.tool, - status: McpToolCallStatus::InProgress, - arguments: serde_json::json!({"server": ""}), - mcp_app_resource_uri: Some("ui://widget/list-resources.html".to_string()), - result: None, - error: None, - duration_ms: None, - }, - }, - ); - } - - #[test] - fn mcp_tool_call_begin_maps_to_item_started_notification_without_args() { - let begin_event = McpToolCallBeginEvent { - call_id: "call_456".to_string(), - invocation: McpInvocation { - server: "codex".to_string(), - tool: "list_mcp_resources".to_string(), - arguments: None, - }, - mcp_app_resource_uri: None, - }; - - let notification = item_event_to_server_notification( - EventMsg::McpToolCallBegin(begin_event.clone()), - "thread-2", - "turn_2", - ); - assert_item_started_server_notification( - notification, - ItemStartedNotification { - thread_id: "thread-2".to_string(), - turn_id: "turn_2".to_string(), - item: ThreadItem::McpToolCall { - id: begin_event.call_id, - server: begin_event.invocation.server, - tool: begin_event.invocation.tool, - status: McpToolCallStatus::InProgress, - arguments: JsonValue::Null, - mcp_app_resource_uri: None, - result: None, - error: None, - duration_ms: None, - }, - }, - ); - } - - #[test] - fn mcp_tool_call_end_maps_to_item_completed_notification_on_success() { - let content = vec![ - serde_json::to_value(Content::text("{\"resources\":[]}")) - .expect("content should serialize"), - ]; - let result = CallToolResult { - content: content.clone(), - is_error: Some(false), - structured_content: None, - meta: Some(serde_json::json!({ - "ui/resourceUri": "ui://widget/list-resources.html" - })), - }; - - let end_event = McpToolCallEndEvent { - call_id: "call_789".to_string(), - invocation: McpInvocation { - server: "codex".to_string(), - tool: "list_mcp_resources".to_string(), - arguments: Some(serde_json::json!({"server": ""})), - }, - mcp_app_resource_uri: Some("ui://widget/list-resources.html".to_string()), - duration: Duration::from_nanos(92708), - result: Ok(result), - }; - - let notification = item_event_to_server_notification( - EventMsg::McpToolCallEnd(end_event.clone()), - "thread-3", - "turn_3", - ); - assert_item_completed_server_notification( - notification, - ItemCompletedNotification { - thread_id: "thread-3".to_string(), - turn_id: "turn_3".to_string(), - item: ThreadItem::McpToolCall { - id: end_event.call_id, - server: end_event.invocation.server, - tool: end_event.invocation.tool, - status: McpToolCallStatus::Completed, - arguments: serde_json::json!({"server": ""}), - mcp_app_resource_uri: Some("ui://widget/list-resources.html".to_string()), - result: Some(Box::new(McpToolCallResult { - content, - structured_content: None, - meta: Some(serde_json::json!({ - "ui/resourceUri": "ui://widget/list-resources.html" - })), - })), - error: None, - duration_ms: Some(0), - }, - }, - ); - } - - #[test] - fn mcp_tool_call_end_maps_to_item_completed_notification_on_error() { - let end_event = McpToolCallEndEvent { - call_id: "call_err".to_string(), - invocation: McpInvocation { - server: "codex".to_string(), - tool: "list_mcp_resources".to_string(), - arguments: None, - }, - mcp_app_resource_uri: None, - duration: Duration::from_millis(1), - result: Err("boom".to_string()), - }; - - let notification = item_event_to_server_notification( - EventMsg::McpToolCallEnd(end_event.clone()), - "thread-4", - "turn_4", - ); - assert_item_completed_server_notification( - notification, - ItemCompletedNotification { - thread_id: "thread-4".to_string(), - turn_id: "turn_4".to_string(), - item: ThreadItem::McpToolCall { - id: end_event.call_id, - server: end_event.invocation.server, - tool: end_event.invocation.tool, - status: McpToolCallStatus::Failed, - arguments: JsonValue::Null, - mcp_app_resource_uri: None, - result: None, - error: Some(McpToolCallError { - message: "boom".to_string(), - }), - duration_ms: Some(1), - }, - }, - ); - } - #[test] fn exec_command_output_delta_maps_to_command_execution_output_delta() { let notification = item_event_to_server_notification( diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 64307c24bf..57eeedec34 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -359,6 +359,7 @@ impl ThreadHistoryBuilder { | codex_protocol::items::TurnItem::ImageView(_) | codex_protocol::items::TurnItem::ImageGeneration(_) | codex_protocol::items::TurnItem::FileChange(_) + | codex_protocol::items::TurnItem::McpToolCall(_) | codex_protocol::items::TurnItem::ContextCompaction(_) => {} } } @@ -382,6 +383,7 @@ impl ThreadHistoryBuilder { | codex_protocol::items::TurnItem::ImageView(_) | codex_protocol::items::TurnItem::ImageGeneration(_) | codex_protocol::items::TurnItem::FileChange(_) + | codex_protocol::items::TurnItem::McpToolCall(_) | codex_protocol::items::TurnItem::ContextCompaction(_) => {} } } diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 963ac69000..4eb33fa850 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -31,6 +31,8 @@ use codex_protocol::config_types::Verbosity; use codex_protocol::config_types::WebSearchMode; use codex_protocol::config_types::WebSearchToolConfig; use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent; +use codex_protocol::items::McpToolCallError as CoreMcpToolCallError; +use codex_protocol::items::McpToolCallStatus as CoreMcpToolCallStatus; use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::mcp::CallToolResult as CoreMcpCallToolResult; use codex_protocol::mcp::Resource as McpResource; @@ -2783,6 +2785,24 @@ impl From for McpServerToolCallResponse { } } +impl From for McpToolCallResult { + fn from(result: CoreMcpCallToolResult) -> Self { + Self { + content: result.content, + structured_content: result.structured_content, + meta: result.meta, + } + } +} + +impl From for McpToolCallError { + fn from(error: CoreMcpToolCallError) -> Self { + Self { + message: error.message, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -6483,6 +6503,23 @@ impl From for ThreadItem { .map(PatchApplyStatus::from) .unwrap_or(PatchApplyStatus::InProgress), }, + CoreTurnItem::McpToolCall(mcp) => { + let duration_ms = mcp + .duration + .and_then(|duration| i64::try_from(duration.as_millis()).ok()); + + ThreadItem::McpToolCall { + id: mcp.id, + server: mcp.server, + tool: mcp.tool, + status: McpToolCallStatus::from(mcp.status), + arguments: mcp.arguments, + mcp_app_resource_uri: mcp.mcp_app_resource_uri, + result: mcp.result.map(McpToolCallResult::from).map(Box::new), + error: mcp.error.map(McpToolCallError::from), + duration_ms, + } + } CoreTurnItem::ContextCompaction(compaction) => { ThreadItem::ContextCompaction { id: compaction.id } } @@ -6592,6 +6629,16 @@ impl From<&CorePatchApplyStatus> for PatchApplyStatus { } } +impl From for McpToolCallStatus { + fn from(value: CoreMcpToolCallStatus) -> Self { + match value { + CoreMcpToolCallStatus::InProgress => McpToolCallStatus::InProgress, + CoreMcpToolCallStatus::Completed => McpToolCallStatus::Completed, + CoreMcpToolCallStatus::Failed => McpToolCallStatus::Failed, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -8094,10 +8141,13 @@ mod tests { use codex_protocol::items::AgentMessageItem; use codex_protocol::items::FileChangeItem; use codex_protocol::items::ImageViewItem; + use codex_protocol::items::McpToolCallItem; + use codex_protocol::items::McpToolCallStatus as CoreMcpToolCallStatus; use codex_protocol::items::ReasoningItem; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; use codex_protocol::items::WebSearchItem; + use codex_protocol::mcp::CallToolResult; use codex_protocol::models::WebSearchAction as CoreWebSearchAction; use codex_protocol::protocol::NetworkAccess as CoreNetworkAccess; use codex_protocol::user_input::UserInput as CoreUserInput; @@ -8107,6 +8157,7 @@ mod tests { use serde_json::json; use std::num::NonZeroUsize; use std::path::PathBuf; + use std::time::Duration; fn absolute_path_string(path: &str) -> String { let path = format!("/{}", path.trim_start_matches('/')); @@ -10416,6 +10467,69 @@ mod tests { status: PatchApplyStatus::Completed, } ); + + let mcp_tool_call_item = TurnItem::McpToolCall(McpToolCallItem { + id: "mcp-1".to_string(), + server: "server".to_string(), + tool: "tool".to_string(), + arguments: json!({"arg": "value"}), + mcp_app_resource_uri: Some("app://connector".to_string()), + status: CoreMcpToolCallStatus::InProgress, + result: None, + error: None, + duration: None, + }); + + assert_eq!( + ThreadItem::from(mcp_tool_call_item), + ThreadItem::McpToolCall { + id: "mcp-1".to_string(), + server: "server".to_string(), + tool: "tool".to_string(), + status: McpToolCallStatus::InProgress, + arguments: json!({"arg": "value"}), + mcp_app_resource_uri: Some("app://connector".to_string()), + result: None, + error: None, + duration_ms: None, + } + ); + + let completed_mcp_tool_call_item = TurnItem::McpToolCall(McpToolCallItem { + id: "mcp-2".to_string(), + server: "server".to_string(), + tool: "tool".to_string(), + arguments: JsonValue::Null, + mcp_app_resource_uri: None, + status: CoreMcpToolCallStatus::Completed, + result: Some(CallToolResult { + content: vec![json!({"type": "text", "text": "ok"})], + structured_content: Some(json!({"ok": true})), + is_error: Some(false), + meta: Some(json!({"trace": "1"})), + }), + error: None, + duration: Some(Duration::from_millis(42)), + }); + + assert_eq!( + ThreadItem::from(completed_mcp_tool_call_item), + ThreadItem::McpToolCall { + id: "mcp-2".to_string(), + server: "server".to_string(), + tool: "tool".to_string(), + status: McpToolCallStatus::Completed, + arguments: JsonValue::Null, + mcp_app_resource_uri: None, + result: Some(Box::new(McpToolCallResult { + content: vec![json!({"type": "text", "text": "ok"})], + structured_content: Some(json!({"ok": true})), + meta: Some(json!({"trace": "1"})), + })), + error: None, + duration_ms: Some(42), + } + ); } #[test] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index e702152356..7e720bd681 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -836,9 +836,11 @@ pub(crate) async fn apply_bespoke_event_handling( crate::dynamic_tools::on_call_response(call_id, rx, conversation).await; }); } + EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) => { + // Deprecated MCP tool-call events are still fanned out for legacy clients. + // App-server v2 receives the canonical TurnItem::McpToolCall lifecycle instead. + } msg @ (EventMsg::DynamicToolCallResponse(_) - | EventMsg::McpToolCallBegin(_) - | EventMsg::McpToolCallEnd(_) | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentSpawnEnd(_) | EventMsg::CollabAgentInteractionBegin(_) diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 85fc939ba9..58f26cb25e 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -45,12 +45,13 @@ use codex_mcp::SandboxState; use codex_mcp::declared_openai_file_input_param_names; use codex_mcp::mcp_permission_prompt_is_auto_approved; use codex_otel::sanitize_metric_tag_value; +use codex_protocol::items::McpToolCallError; +use codex_protocol::items::McpToolCallItem; +use codex_protocol::items::McpToolCallStatus; +use codex_protocol::items::TurnItem; use codex_protocol::mcp::CallToolResult; use codex_protocol::openai_models::InputModality; -use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::McpInvocation; -use codex_protocol::protocol::McpToolCallBeginEvent; -use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::ReviewDecision; use codex_protocol::request_user_input::RequestUserInputAnswer; use codex_protocol::request_user_input::RequestUserInputArgs; @@ -87,8 +88,8 @@ const MCP_RESULT_TELEMETRY_SERVER_USER_FLOW_SPAN_ATTR: &str = const MCP_RESULT_TELEMETRY_TARGET_ID_MAX_CHARS: usize = 256; const MCP_TOOL_CALL_EVENT_RESULT_MAX_BYTES: usize = DEFAULT_OUTPUT_BYTES_CAP; -/// Handles the specified tool call dispatches the appropriate -/// `McpToolCallBegin` and `McpToolCallEnd` events to the `Session`. +/// Handles the specified tool call and dispatches the appropriate MCP tool-call +/// item lifecycle events to the `Session`. pub(crate) async fn handle_mcp_tool_call( sess: Arc, turn_context: &Arc, @@ -186,12 +187,14 @@ pub(crate) async fn handle_mcp_tool_call( .as_ref() .and_then(|metadata| metadata.connector_name.clone()); - let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent { - call_id: call_id.clone(), - invocation: invocation.clone(), - mcp_app_resource_uri: mcp_app_resource_uri.clone(), - }); - notify_mcp_tool_call_event(sess.as_ref(), turn_context.as_ref(), tool_call_begin_event).await; + notify_mcp_tool_call_started( + sess.as_ref(), + turn_context.as_ref(), + &call_id, + invocation.clone(), + mcp_app_resource_uri.clone(), + ) + .await; if let Some(decision) = maybe_request_mcp_tool_approval( &sess, @@ -362,14 +365,16 @@ async fn handle_approved_mcp_tool_call( tracing::warn!("MCP tool call error: {error:?}"); } let duration = start.elapsed(); - let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent { - call_id: call_id.to_string(), + notify_mcp_tool_call_completed( + sess, + turn_context, + call_id, invocation, mcp_app_resource_uri, duration, - result: truncate_mcp_tool_result_for_event(&result), - }); - notify_mcp_tool_call_event(sess, turn_context, tool_call_end_event.clone()).await; + truncate_mcp_tool_result_for_event(&result), + ) + .await; maybe_track_codex_app_used(sess, turn_context, &server, &tool_name).await; let status = if result.is_ok() { "ok" } else { "error" }; @@ -658,7 +663,7 @@ fn truncate_mcp_tool_result_for_event( ) -> Result { match result { Ok(call_tool_result) => { - // The app-server rebuilds `ThreadItem::McpToolCall` from this event, + // The app-server rebuilds `ThreadItem::McpToolCall` from this item, // so avoid persisting multi-megabyte results in rollout storage. let Ok(serialized) = serde_json::to_string(call_tool_result) else { return Ok(call_tool_result.clone()); @@ -697,8 +702,69 @@ fn truncate_mcp_tool_result_for_event( } } -async fn notify_mcp_tool_call_event(sess: &Session, turn_context: &TurnContext, event: EventMsg) { - sess.send_event(turn_context, event).await; +async fn notify_mcp_tool_call_started( + sess: &Session, + turn_context: &TurnContext, + call_id: &str, + invocation: McpInvocation, + mcp_app_resource_uri: Option, +) { + let McpInvocation { + server, + tool, + arguments, + } = invocation; + let item = TurnItem::McpToolCall(McpToolCallItem { + id: call_id.to_string(), + server, + tool, + arguments: arguments.unwrap_or(JsonValue::Null), + mcp_app_resource_uri, + status: McpToolCallStatus::InProgress, + result: None, + error: None, + duration: None, + }); + sess.emit_turn_item_started(turn_context, &item).await; +} + +async fn notify_mcp_tool_call_completed( + sess: &Session, + turn_context: &TurnContext, + call_id: &str, + invocation: McpInvocation, + mcp_app_resource_uri: Option, + duration: Duration, + result: Result, +) { + let (status, result, error) = match result { + Ok(result) if result.is_error.unwrap_or(false) => { + (McpToolCallStatus::Failed, Some(result), None) + } + Ok(result) => (McpToolCallStatus::Completed, Some(result), None), + Err(message) => ( + McpToolCallStatus::Failed, + None, + Some(McpToolCallError { message }), + ), + }; + let McpInvocation { + server, + tool, + arguments, + } = invocation; + let item = TurnItem::McpToolCall(McpToolCallItem { + id: call_id.to_string(), + server, + tool, + arguments: arguments.unwrap_or(JsonValue::Null), + mcp_app_resource_uri, + status, + result, + error, + duration: Some(duration), + }); + sess.emit_turn_item_completed(turn_context, item).await; } struct McpAppUsageMetadata { @@ -1979,22 +2045,26 @@ async fn notify_mcp_tool_call_skip( already_started: bool, ) -> Result { if !already_started { - let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent { - call_id: call_id.to_string(), - invocation: invocation.clone(), - mcp_app_resource_uri: mcp_app_resource_uri.clone(), - }); - notify_mcp_tool_call_event(sess, turn_context, tool_call_begin_event).await; + notify_mcp_tool_call_started( + sess, + turn_context, + call_id, + invocation.clone(), + mcp_app_resource_uri.clone(), + ) + .await; } - let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent { - call_id: call_id.to_string(), + notify_mcp_tool_call_completed( + sess, + turn_context, + call_id, invocation, mcp_app_resource_uri, - duration: Duration::ZERO, - result: truncate_mcp_tool_result_for_event(&Err(message.clone())), - }); - notify_mcp_tool_call_event(sess, turn_context, tool_call_end_event).await; + Duration::ZERO, + truncate_mcp_tool_result_for_event(&Err(message.clone())), + ) + .await; Err(message) } diff --git a/codex-rs/core/src/tools/handlers/mcp_resource.rs b/codex-rs/core/src/tools/handlers/mcp_resource.rs index fa4a066741..14f8db3a4c 100644 --- a/codex-rs/core/src/tools/handlers/mcp_resource.rs +++ b/codex-rs/core/src/tools/handlers/mcp_resource.rs @@ -3,6 +3,10 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use codex_protocol::items::McpToolCallError; +use codex_protocol::items::McpToolCallItem; +use codex_protocol::items::McpToolCallStatus; +use codex_protocol::items::TurnItem; use codex_protocol::mcp::CallToolResult; use codex_protocol::models::function_call_output_content_items_to_text; use rmcp::model::ListResourceTemplatesResult; @@ -25,10 +29,7 @@ use crate::tools::context::ToolInvocation; use crate::tools::context::ToolPayload; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; -use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::McpInvocation; -use codex_protocol::protocol::McpToolCallBeginEvent; -use codex_protocol::protocol::McpToolCallEndEvent; pub struct McpResourceHandler; @@ -564,16 +565,23 @@ async fn emit_tool_call_begin( call_id: &str, invocation: McpInvocation, ) { - session - .send_event( - turn, - EventMsg::McpToolCallBegin(McpToolCallBeginEvent { - call_id: call_id.to_string(), - invocation, - mcp_app_resource_uri: None, - }), - ) - .await; + let McpInvocation { + server, + tool, + arguments, + } = invocation; + let item = TurnItem::McpToolCall(McpToolCallItem { + id: call_id.to_string(), + server, + tool, + arguments: arguments.unwrap_or(Value::Null), + mcp_app_resource_uri: None, + status: McpToolCallStatus::InProgress, + result: None, + error: None, + duration: None, + }); + session.emit_turn_item_started(turn, &item).await; } async fn emit_tool_call_end( @@ -584,18 +592,34 @@ async fn emit_tool_call_end( duration: Duration, result: Result, ) { - session - .send_event( - turn, - EventMsg::McpToolCallEnd(McpToolCallEndEvent { - call_id: call_id.to_string(), - invocation, - mcp_app_resource_uri: None, - duration, - result, - }), - ) - .await; + let (status, result, error) = match result { + Ok(result) if result.is_error.unwrap_or(false) => { + (McpToolCallStatus::Failed, Some(result), None) + } + Ok(result) => (McpToolCallStatus::Completed, Some(result), None), + Err(message) => ( + McpToolCallStatus::Failed, + None, + Some(McpToolCallError { message }), + ), + }; + let McpInvocation { + server, + tool, + arguments, + } = invocation; + let item = TurnItem::McpToolCall(McpToolCallItem { + id: call_id.to_string(), + server, + tool, + arguments: arguments.unwrap_or(Value::Null), + mcp_app_resource_uri: None, + status, + result, + error, + duration: Some(duration), + }); + session.emit_turn_item_completed(turn, item).await; } fn normalize_optional_string(input: Option) -> Option { diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index fb8936ed11..499db6fc85 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -1,3 +1,4 @@ +use crate::mcp::CallToolResult; use crate::memory_citation::MemoryCitation; use crate::models::ContentItem; use crate::models::MessagePhase; @@ -10,6 +11,9 @@ use crate::protocol::ContextCompactedEvent; use crate::protocol::EventMsg; use crate::protocol::FileChange; use crate::protocol::ImageGenerationEndEvent; +use crate::protocol::McpInvocation; +use crate::protocol::McpToolCallBeginEvent; +use crate::protocol::McpToolCallEndEvent; use crate::protocol::PatchApplyBeginEvent; use crate::protocol::PatchApplyEndEvent; use crate::protocol::PatchApplyStatus; @@ -27,8 +31,10 @@ use serde::Deserialize; use serde::Serialize; use std::collections::HashMap; use std::path::PathBuf; +use std::time::Duration; use ts_rs::TS; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] #[serde(tag = "type")] #[ts(tag = "type")] @@ -42,6 +48,7 @@ pub enum TurnItem { ImageView(ImageViewItem), ImageGeneration(ImageGenerationItem), FileChange(FileChangeItem), + McpToolCall(McpToolCallItem), ContextCompaction(ContextCompactionItem), } @@ -160,6 +167,45 @@ pub struct FileChangeItem { pub stderr: Option, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema, PartialEq)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +pub struct McpToolCallItem { + pub id: String, + pub server: String, + pub tool: String, + pub arguments: serde_json::Value, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub mcp_app_resource_uri: Option, + pub status: McpToolCallStatus, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub result: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub error: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(type = "string", optional)] + pub duration: Option, +} + +#[derive(Debug, Clone, Copy, Deserialize, Serialize, TS, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +pub enum McpToolCallStatus { + InProgress, + Completed, + Failed, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[ts(rename_all = "camelCase")] +pub struct McpToolCallError { + pub message: String, +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ContextCompactionItem { pub id: String, @@ -438,6 +484,40 @@ impl FileChangeItem { } } +impl McpToolCallItem { + pub fn as_legacy_begin_event(&self) -> EventMsg { + EventMsg::McpToolCallBegin(McpToolCallBeginEvent { + call_id: self.id.clone(), + invocation: McpInvocation { + server: self.server.clone(), + tool: self.tool.clone(), + arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), + }, + mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), + }) + } + + pub fn as_legacy_end_event(&self) -> Option { + let result = match (&self.result, &self.error) { + (Some(result), _) => Ok(result.clone()), + (None, Some(error)) => Err(error.message.clone()), + (None, None) => return None, + }; + + Some(EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: self.id.clone(), + invocation: McpInvocation { + server: self.server.clone(), + tool: self.tool.clone(), + arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), + }, + mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), + duration: self.duration?, + result, + })) + } +} + impl TurnItem { pub fn id(&self) -> String { match self { @@ -450,6 +530,7 @@ impl TurnItem { TurnItem::ImageView(item) => item.id.clone(), TurnItem::ImageGeneration(item) => item.id.clone(), TurnItem::FileChange(item) => item.id.clone(), + TurnItem::McpToolCall(item) => item.id.clone(), TurnItem::ContextCompaction(item) => item.id.clone(), } } @@ -472,6 +553,7 @@ impl TurnItem { .as_legacy_end_event(String::new()) .into_iter() .collect(), + TurnItem::McpToolCall(item) => item.as_legacy_end_event().into_iter().collect(), TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), TurnItem::ContextCompaction(item) => vec![item.as_legacy_event()], } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 60137fa8b0..95f61eba88 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1843,6 +1843,7 @@ impl HasLegacyEvent for ItemStartedEvent { })] } TurnItem::FileChange(item) => vec![item.as_legacy_begin_event(self.turn_id.clone())], + TurnItem::McpToolCall(item) => vec![item.as_legacy_begin_event()], _ => Vec::new(), } } @@ -3938,8 +3939,11 @@ mod tests { use super::*; use crate::items::FileChangeItem; use crate::items::ImageGenerationItem; + use crate::items::McpToolCallItem; + use crate::items::McpToolCallStatus; use crate::items::UserMessageItem; use crate::items::WebSearchItem; + use crate::mcp::CallToolResult; use crate::permissions::FileSystemAccessMode; use crate::permissions::FileSystemPath; use crate::permissions::FileSystemSandboxEntry; @@ -4674,6 +4678,40 @@ mod tests { } } + #[test] + fn item_started_event_from_mcp_tool_call_emits_begin_event() { + let event = ItemStartedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + item: TurnItem::McpToolCall(McpToolCallItem { + id: "mcp-1".into(), + server: "server".into(), + tool: "tool".into(), + arguments: json!({"arg": "value"}), + mcp_app_resource_uri: Some("app://connector".into()), + status: McpToolCallStatus::InProgress, + result: None, + error: None, + duration: None, + }), + }; + + let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false); + assert_eq!(legacy_events.len(), 1); + match &legacy_events[0] { + EventMsg::McpToolCallBegin(event) => { + assert_eq!(event.call_id, "mcp-1"); + assert_eq!(event.invocation.server, "server"); + assert_eq!(event.invocation.tool, "tool"); + assert_eq!( + event.mcp_app_resource_uri.as_deref(), + Some("app://connector") + ); + } + _ => panic!("expected McpToolCallBegin event"), + } + } + #[test] fn item_completed_event_from_image_generation_emits_end_event() { let event = ItemCompletedEvent { @@ -4742,6 +4780,47 @@ mod tests { } } + #[test] + fn item_completed_event_from_mcp_tool_call_emits_end_event() { + let event = ItemCompletedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + item: TurnItem::McpToolCall(McpToolCallItem { + id: "mcp-1".into(), + server: "server".into(), + tool: "tool".into(), + arguments: json!({"arg": "value"}), + mcp_app_resource_uri: Some("app://connector".into()), + status: McpToolCallStatus::Completed, + result: Some(CallToolResult { + content: vec![json!({"type": "text", "text": "ok"})], + structured_content: None, + is_error: Some(false), + meta: None, + }), + error: None, + duration: Some(Duration::from_millis(42)), + }), + }; + + let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false); + assert_eq!(legacy_events.len(), 1); + match &legacy_events[0] { + EventMsg::McpToolCallEnd(event) => { + assert_eq!(event.call_id, "mcp-1"); + assert_eq!(event.invocation.server, "server"); + assert_eq!(event.invocation.tool, "tool"); + assert_eq!( + event.mcp_app_resource_uri.as_deref(), + Some("app://connector") + ); + assert_eq!(event.duration, Duration::from_millis(42)); + assert!(event.is_success()); + } + _ => panic!("expected McpToolCallEnd event"), + } + } + #[test] fn rollback_failed_error_does_not_affect_turn_status() { let event = ErrorEvent {