mirror of
https://github.com/openai/codex.git
synced 2026-05-04 11:26:33 +00:00
fix(core): truncate large mcp tool outputs in rollouts (#20260)
## Why
Large MCP tool call outputs can make rollout JSONL files enormous. In
the session that motivated this change, the biggest JSONL records were:
- `event_msg/mcp_tool_call_end`
- `response_item/function_call_output`
both containing the same unbounded MCP payloads — just three MCP tool calls,
each hundreds of MB in size 😱
This PR truncates both of those JSONL records.
## How
#### For `response_item/function_call_output`
Unified exec already bounds tool output before it is injected into
model-facing history, which also keeps the corresponding rollout
`response_item/function_call_output` records small.
MCP should follow the same pattern: truncate the model-facing tool
output at the tool-output boundary, while leaving code-mode/raw hook
consumers alone.
#### For `event_msg/mcp_tool_call_end`
`McpToolCallEnd` also needs its own bounded event copy because it is the
app-server/replay/UI event shape that backs `ThreadItem::McpToolCall`.
Unfortunately this is _not_ downstream of the `ToolOutput` trait.
## Model behavior
Model behavior is actually unchanged as a result of this PR.
Before this PR, MCP output was:
1. Converted to `FunctionCallOutput`.
2. Recorded into in-memory history.
3. Truncated by `ContextManager::record_items()` before later model
turns saw it.
After this change, MCP output is truncated earlier, in
`McpToolOutput::response_payload()`, using the same helper.
`ContextManager::record_items()` then sees an already-truncated output and
has little or no additional work to do.
So the model should still see the same kind of truncated function-call
output. The practical difference is where truncation happens: earlier,
before rollout persistence/app-server emission can see the giant
payload.
## Verification
- `cargo test -p codex-core mcp_tool_output`
- `cargo test -p codex-core
mcp_tool_call::tests::truncate_mcp_tool_result_for_event`
- `cargo test -p codex-core
mcp_post_tool_use_payload_uses_model_tool_name_args_and_result`
- `just fmt`
- `just fix -p codex-core`
- `git diff --check`
This commit is contained in:
@@ -5,16 +5,25 @@ use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_mock_responses_config_toml;
|
||||
use axum::Router;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::McpServerToolCallParams;
|
||||
use codex_app_server_protocol::McpServerToolCallResponse;
|
||||
use codex_app_server_protocol::McpToolCallStatus;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::handler::server::ServerHandler;
|
||||
@@ -42,6 +51,7 @@ use tokio::time::timeout;
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const TEST_SERVER_NAME: &str = "tool_server";
|
||||
const TEST_TOOL_NAME: &str = "echo_tool";
|
||||
const LARGE_RESPONSE_MESSAGE: &str = "large";
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mcp_server_tool_call_returns_tool_result() -> Result<()> {
|
||||
@@ -161,6 +171,137 @@ async fn mcp_server_tool_call_returns_error_for_unknown_thread() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// End-to-end check that a huge MCP tool result is truncated before it reaches
/// the app-server `item/completed` notification: the completed
/// `ThreadItem::McpToolCall` must carry a bounded, text-only result (no
/// structured content, no meta) whose serialized size stays near
/// `DEFAULT_OUTPUT_BYTES_CAP` instead of the multi-hundred-MB raw payload.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mcp_tool_call_completion_notification_contains_truncated_large_result() -> Result<()> {
    let call_id = "call-large-mcp";
    let namespace = format!("mcp__{TEST_SERVER_NAME}__");
    // Mock model turn: one function call that invokes the MCP tool with the
    // magic LARGE_RESPONSE_MESSAGE argument (triggers the oversized reply in
    // the test MCP server), then a final assistant message.
    let responses = vec![
        responses::sse(vec![
            responses::ev_response_created("resp-1"),
            responses::ev_function_call_with_namespace(
                call_id,
                &namespace,
                TEST_TOOL_NAME,
                &serde_json::to_string(&json!({
                    "message": LARGE_RESPONSE_MESSAGE,
                }))?,
            ),
            responses::ev_completed("resp-1"),
        ]),
        create_final_assistant_message_sse_response("done")?,
    ];
    let responses_server = create_mock_responses_server_sequence(responses).await;
    let (mcp_server_url, mcp_server_handle) = start_mcp_server().await?;
    let codex_home = TempDir::new()?;
    write_mock_responses_config_toml(
        codex_home.path(),
        &responses_server.uri(),
        &BTreeMap::new(),
        /*auto_compact_limit*/ 1_000_000,
        /*requires_openai_auth*/ None,
        "mock_provider",
        "compact",
    )?;

    // Register the freshly started MCP server in config.toml so the thread
    // picks it up under TEST_SERVER_NAME.
    let config_path = codex_home.path().join("config.toml");
    let mut config_toml = std::fs::read_to_string(&config_path)?;
    config_toml.push_str(&format!(
        r#"
[mcp_servers.{TEST_SERVER_NAME}]
url = "{mcp_server_url}/mcp"
"#
    ));
    std::fs::write(config_path, config_toml)?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a thread, then a turn whose user input nudges the mock model to
    // perform the (scripted) tool call.
    let thread_start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let thread_start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
    )
    .await??;
    let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;

    let turn_start_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id,
            input: vec![V2UserInput::Text {
                text: "Call the large MCP tool".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let turn_start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
    )
    .await??;
    let TurnStartResponse { turn, .. } = to_response(turn_start_resp)?;

    // Wait for the completed MCP tool call item matching our call_id.
    let completed = wait_for_mcp_tool_call_completed(&mut mcp, call_id).await?;
    assert_eq!(completed.turn_id, turn.id);

    let ThreadItem::McpToolCall {
        id,
        server,
        tool,
        status,
        result: Some(result),
        error,
        ..
    } = completed.item
    else {
        panic!("expected completed MCP tool call item");
    };
    assert_eq!(id, call_id);
    assert_eq!(server, TEST_SERVER_NAME);
    assert_eq!(tool, TEST_TOOL_NAME);
    assert_eq!(status, McpToolCallStatus::Completed);
    assert_eq!(error, None);
    // Truncation drops structured content and meta entirely and collapses the
    // payload to a single text content block.
    assert_eq!(result.structured_content, None);
    assert_eq!(result.meta, None);
    assert_eq!(result.content.len(), 1);

    let text = result.content[0]
        .get("text")
        .and_then(serde_json::Value::as_str)
        .expect("truncated MCP event result should be represented as text content");
    assert!(text.contains("truncated"));
    // Small slack over the cap allows for the truncation marker text.
    assert!(text.len() < DEFAULT_OUTPUT_BYTES_CAP + 1024);

    // Re-serializing the full item bounds the size of the JSONL record that
    // would be persisted to the rollout (the motivating scenario for this
    // test): 2x the cap plus slack for JSON escaping and envelope fields.
    let serialized_item = serde_json::to_string(&ThreadItem::McpToolCall {
        id,
        server,
        tool,
        status,
        arguments: json!({ "message": LARGE_RESPONSE_MESSAGE }),
        mcp_app_resource_uri: None,
        result: Some(result),
        error: None,
        duration_ms: None,
    })?;
    assert!(serialized_item.len() < DEFAULT_OUTPUT_BYTES_CAP * 2 + 2048);

    // Drain the turn so the process shuts down cleanly.
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;

    mcp_server_handle.abort();
    let _ = mcp_server_handle.await;

    Ok(())
}
|
||||
|
||||
/// Minimal in-process MCP server used by these tests; behavior lives in its
/// `ServerHandler` impl below (echoes input, or returns an oversized result
/// when called with `LARGE_RESPONSE_MESSAGE`).
#[derive(Clone, Default)]
struct ToolAppsMcpServer;
|
||||
|
||||
@@ -224,6 +365,16 @@ impl ServerHandler for ToolAppsMcpServer {
|
||||
let mut meta = Meta::new();
|
||||
meta.0.insert("calledBy".to_string(), json!("mcp-app"));
|
||||
|
||||
if message == LARGE_RESPONSE_MESSAGE {
|
||||
let large_text = "large-mcp-content-".repeat(DEFAULT_OUTPUT_BYTES_CAP / 8);
|
||||
let mut result = CallToolResult::structured(json!({
|
||||
"large": "structured-value-".repeat(DEFAULT_OUTPUT_BYTES_CAP / 8),
|
||||
}));
|
||||
result.content = vec![Content::text(large_text)];
|
||||
result.meta = Some(meta);
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
let mut result = CallToolResult::structured(json!({
|
||||
"echoed": message,
|
||||
"threadId": thread_id,
|
||||
@@ -250,3 +401,23 @@ async fn start_mcp_server() -> Result<(String, JoinHandle<()>)> {
|
||||
|
||||
Ok((format!("http://{addr}"), handle))
|
||||
}
|
||||
|
||||
async fn wait_for_mcp_tool_call_completed(
|
||||
mcp: &mut McpProcess,
|
||||
call_id: &str,
|
||||
) -> Result<ItemCompletedNotification> {
|
||||
loop {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("item/completed"),
|
||||
)
|
||||
.await??;
|
||||
let Some(params) = notification.params else {
|
||||
continue;
|
||||
};
|
||||
let completed: ItemCompletedNotification = serde_json::from_value(params)?;
|
||||
if matches!(&completed.item, ThreadItem::McpToolCall { id, .. } if id == call_id) {
|
||||
return Ok(completed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user