From 7bddb3083d677bf36a1cd65ecff9fe81c5f0bdf1 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Mon, 11 May 2026 11:45:25 -0700 Subject: [PATCH] fix(app-server): thread history redaction for remote clients (#22178) ## Summary Remote clients can still receive large `thread/resume` histories when prior turns include MCP tool call payloads or image-generation results. This adds a temporary response-only redaction path for the known remote client names. Longer term we will move towards fully paginated APIs backed by SQLite. ## Changes - Redact MCP tool call payload-bearing fields in `thread/resume` responses for `codex_chatgpt_android_remote` and `codex_chatgpt_ios_remote`. - Drop `imageGeneration` items from those `thread/resume` responses. - Keep redaction out of persisted rollout files, `thread/read`, `thread/turns/list`, live notifications, and token usage replay. - Cover the behavior with app-server helper tests and a v2 resume integration test that checks both remote clients plus a non-target control client. ## Testing - `cargo test -p codex-app-server thread_resume_redaction` - `cargo test -p codex-app-server thread_resume_redacts_payloads_for_chatgpt_remote_clients` --- codex-rs/app-server/src/request_processors.rs | 2 + .../request_processors/thread_lifecycle.rs | 5 +- .../request_processors/thread_processor.rs | 10 +- .../thread_resume_redaction.rs | 198 +++++++++++++++++ codex-rs/app-server/src/thread_state.rs | 1 + .../tests/suite/v2/thread_resume.rs | 203 ++++++++++++++++++ 6 files changed, 417 insertions(+), 2 deletions(-) create mode 100644 codex-rs/app-server/src/request_processors/thread_resume_redaction.rs diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index f2e3444f88..f24dcaa34f 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -488,12 +488,14 @@ mod config_errors; mod request_errors; mod thread_goal_processor; mod thread_lifecycle; +mod thread_resume_redaction; mod thread_summary; use self::config_errors::*; use self::request_errors::*; use self::thread_goal_processor::api_thread_goal_from_state; use self::thread_lifecycle::*; +use self::thread_resume_redaction::*; use self::thread_summary::*; pub(crate) use self::thread_lifecycle::populate_thread_turns_from_history; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 45031490b0..7dab206d85 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -557,6 +557,10 @@ pub(super) async fn handle_pending_thread_resume_request( thread_status, has_live_in_progress_turn, ); + let token_usage_thread = pending.include_turns.then(|| thread.clone()); + if pending.redact_resume_payloads { + redact_thread_resume_payloads(&mut thread); + } { let pending_thread_unloads = pending_thread_unloads.lock().await; @@ -624,7 +628,6 @@ pub(super) async fn handle_pending_thread_resume_request( active_permission_profile, reasoning_effort, }; - let token_usage_thread = pending.include_turns.then(|| response.thread.clone()); outgoing.send_response(request_id, response).await; // Match cold resume: metadata-only resume should attach the listener without // paying the cost of turn reconstruction for historical usage replay. diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index dc0c169fab..4c14229729 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2333,6 +2333,8 @@ impl ThreadRequestProcessor { self.send_persist_extended_history_deprecation_notice(request_id.connection_id) .await; } + let redact_resume_payloads = + should_redact_thread_resume_payloads(app_server_client_name.as_deref()); let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await { Ok(permit) => permit, @@ -2527,6 +2529,10 @@ impl ThreadRequestProcessor { let active_permission_profile = thread_response_active_permission_profile( config_snapshot.active_permission_profile, ); + let token_usage_thread = include_turns.then(|| thread.clone()); + if redact_resume_payloads { + redact_thread_resume_payloads(&mut thread); + } let response = ThreadResumeResponse { thread, @@ -2544,7 +2550,6 @@ impl ThreadRequestProcessor { }; let connection_id = request_id.connection_id; - let token_usage_thread = include_turns.then(|| response.thread.clone()); self.outgoing.send_response(request_id, response).await; // `excludeTurns` is explicitly the cheap resume path, so avoid // rebuilding history only to attribute a replayed usage update. @@ -2664,6 +2669,8 @@ impl ThreadRequestProcessor { }; if let Some((existing_thread_id, existing_thread, source_thread)) = running_thread { + let redact_resume_payloads = + should_redact_thread_resume_payloads(app_server_client_name.as_deref()); let history_items = source_thread .history .as_ref() @@ -2738,6 +2745,7 @@ impl ThreadRequestProcessor { emit_thread_goal_update, thread_goal_state_db, include_turns: !params.exclude_turns, + redact_resume_payloads, }), ); if listener_command_tx.send(command).is_err() { diff --git a/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs new file mode 100644 index 0000000000..d547aef3f3 --- /dev/null +++ b/codex-rs/app-server/src/request_processors/thread_resume_redaction.rs @@ -0,0 +1,198 @@ +use codex_app_server_protocol::McpToolCallResult; +use codex_app_server_protocol::Thread; +use codex_app_server_protocol::ThreadItem; +use serde_json::Value as JsonValue; + +// Temporary bandaid for remote clients: thread/resume can include large MCP and +// image-generation payloads. Keep this response-only so persisted rollout +// history, model resume history, and other APIs stay unchanged. +const REDACTED_PAYLOAD: &str = "[redacted]"; +const CHATGPT_REMOTE_CLIENT_NAMES: &[&str] = + &["codex_chatgpt_android_remote", "codex_chatgpt_ios_remote"]; + +pub(super) fn should_redact_thread_resume_payloads(client_name: Option<&str>) -> bool { + client_name.is_some_and(|client_name| CHATGPT_REMOTE_CLIENT_NAMES.contains(&client_name)) +} + +pub(super) fn redact_thread_resume_payloads(thread: &mut Thread) { + for turn in &mut thread.turns { + turn.items.retain_mut(|item| match item { + ThreadItem::McpToolCall { + arguments, + result, + error, + .. + } => { + *arguments = JsonValue::String(REDACTED_PAYLOAD.to_string()); + if result.is_some() { + *result = Some(Box::new(redacted_mcp_tool_call_result())); + } + if let Some(error) = error { + error.message = REDACTED_PAYLOAD.to_string(); + } + true + } + ThreadItem::ImageGeneration { .. } => false, + _ => true, + }); + } +} + +fn redacted_mcp_tool_call_result() -> McpToolCallResult { + McpToolCallResult { + content: vec![serde_json::json!({ + "type": "text", + "text": REDACTED_PAYLOAD, + })], + structured_content: None, + meta: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use codex_app_server_protocol::McpToolCallError; + use codex_app_server_protocol::McpToolCallStatus; + use codex_app_server_protocol::SessionSource; + use codex_app_server_protocol::ThreadStatus; + use codex_app_server_protocol::Turn; + use codex_app_server_protocol::TurnItemsView; + use codex_app_server_protocol::TurnStatus; + use codex_utils_absolute_path::test_support::PathBufExt; + use codex_utils_absolute_path::test_support::test_path_buf; + use pretty_assertions::assert_eq; + + #[test] + fn redacts_mcp_success_result_and_removes_image_generation() { + let mut thread = test_thread(vec![ + ThreadItem::AgentMessage { + id: "agent-1".to_string(), + text: "kept".to_string(), + phase: None, + memory_citation: None, + }, + ThreadItem::McpToolCall { + id: "mcp-1".to_string(), + server: "docs".to_string(), + tool: "lookup".to_string(), + status: McpToolCallStatus::Completed, + arguments: serde_json::json!({"secret":"argument"}), + mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()), + result: Some(Box::new(McpToolCallResult { + content: vec![serde_json::json!({ + "type": "text", + "text": "secret result" + })], + structured_content: Some(serde_json::json!({"secret":"structured"})), + meta: Some(serde_json::json!({"secret":"meta"})), + })), + error: None, + duration_ms: Some(8), + }, + ThreadItem::ImageGeneration { + id: "ig-1".to_string(), + status: "completed".to_string(), + revised_prompt: Some("revised".to_string()), + result: "base64-result".to_string(), + saved_path: Some(test_path_buf("/tmp/ig-1.png").abs()), + }, + ]); + + redact_thread_resume_payloads(&mut thread); + + assert_eq!(thread.turns[0].items.len(), 2); + assert_eq!( + thread.turns[0].items[0], + ThreadItem::AgentMessage { + id: "agent-1".to_string(), + text: "kept".to_string(), + phase: None, + memory_citation: None, + } + ); + assert_eq!( + thread.turns[0].items[1], + ThreadItem::McpToolCall { + id: "mcp-1".to_string(), + server: "docs".to_string(), + tool: "lookup".to_string(), + status: McpToolCallStatus::Completed, + arguments: JsonValue::String(REDACTED_PAYLOAD.to_string()), + mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()), + result: Some(Box::new(redacted_mcp_tool_call_result())), + error: None, + duration_ms: Some(8), + } + ); + } + + #[test] + fn redacts_mcp_error_message() { + let mut thread = test_thread(vec![ThreadItem::McpToolCall { + id: "mcp-1".to_string(), + server: "docs".to_string(), + tool: "lookup".to_string(), + status: McpToolCallStatus::Failed, + arguments: serde_json::json!({"secret":"argument"}), + mcp_app_resource_uri: None, + result: None, + error: Some(McpToolCallError { + message: "secret error".to_string(), + }), + duration_ms: Some(8), + }]); + + redact_thread_resume_payloads(&mut thread); + + assert_eq!( + thread.turns[0].items[0], + ThreadItem::McpToolCall { + id: "mcp-1".to_string(), + server: "docs".to_string(), + tool: "lookup".to_string(), + status: McpToolCallStatus::Failed, + arguments: JsonValue::String(REDACTED_PAYLOAD.to_string()), + mcp_app_resource_uri: None, + result: None, + error: Some(McpToolCallError { + message: REDACTED_PAYLOAD.to_string(), + }), + duration_ms: Some(8), + } + ); + } + + fn test_thread(items: Vec) -> Thread { + Thread { + id: "thread-1".to_string(), + session_id: "session-1".to_string(), + forked_from_id: None, + preview: "preview".to_string(), + ephemeral: false, + model_provider: "mock_provider".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: test_path_buf("/tmp").abs(), + cli_version: "0.0.0".to_string(), + source: SessionSource::Cli, + thread_source: None, + agent_nickname: None, + agent_role: None, + git_info: None, + name: None, + turns: vec![Turn { + id: "turn-1".to_string(), + items, + items_view: TurnItemsView::Full, + status: TurnStatus::Completed, + error: None, + started_at: None, + completed_at: None, + duration_ms: None, + }], + } + } +} diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index f0dbb0e326..32dfcc325d 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -34,6 +34,7 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) emit_thread_goal_update: bool, pub(crate) thread_goal_state_db: Option, pub(crate) include_turns: bool, + pub(crate) redact_resume_payloads: bool, } // ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes. diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 36627ac6e7..64dfe0beb7 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -15,6 +15,7 @@ use app_test_support::to_response; use app_test_support::write_chatgpt_auth; use chrono::Utc; use codex_app_server_protocol::AskForApproval; +use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::FileChangeApprovalDecision; @@ -51,10 +52,14 @@ use codex_config::types::AuthCredentialsStoreMode; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::ThreadId; use codex_protocol::config_types::Personality; +use codex_protocol::mcp::CallToolResult; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ImageGenerationEndEvent; +use codex_protocol::protocol::McpInvocation; +use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource as RolloutSessionSource; @@ -77,6 +82,7 @@ use std::io::Write; use std::path::Path; use std::path::PathBuf; use std::process::Command; +use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; use uuid::Uuid; @@ -376,6 +382,203 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_redacts_payloads_for_chatgpt_remote_clients() -> Result<()> { + for client_name in ["codex_chatgpt_android_remote", "codex_chatgpt_ios_remote"] { + let remote_thread = resume_redaction_fixture(Some(client_name)).await?; + let remote_turn = remote_thread + .turns + .first() + .expect("remote resume should include a turn"); + let remote_mcp_item = remote_turn + .items + .iter() + .find(|item| matches!(item, ThreadItem::McpToolCall { .. })) + .expect("remote resume should include redacted MCP item"); + let ThreadItem::McpToolCall { + arguments, + result, + error, + .. + } = remote_mcp_item + else { + unreachable!("matched MCP item"); + }; + assert_eq!(arguments, &json!("[redacted]")); + let result = result.as_ref().expect("redacted MCP result"); + assert_eq!( + result.content, + vec![json!({ + "type": "text", + "text": "[redacted]", + })] + ); + assert_eq!(result.structured_content, None); + assert_eq!(result.meta, None); + assert_eq!(error, &None); + assert!( + !remote_turn + .items + .iter() + .any(|item| matches!(item, ThreadItem::ImageGeneration { .. })), + "remote resume should drop image generation items for {client_name}" + ); + } + + let normal_thread = resume_redaction_fixture(Some("some_other_client")).await?; + let normal_turn = normal_thread + .turns + .first() + .expect("normal resume should include a turn"); + let normal_mcp_item = normal_turn + .items + .iter() + .find(|item| matches!(item, ThreadItem::McpToolCall { .. })) + .expect("normal resume should include MCP item"); + let ThreadItem::McpToolCall { + arguments, result, .. + } = normal_mcp_item + else { + unreachable!("matched MCP item"); + }; + assert_eq!(arguments, &json!({"secret":"argument"})); + let result = result.as_ref().expect("normal MCP result"); + assert_eq!( + result.content, + vec![json!({ + "type": "text", + "text": "secret result", + })] + ); + assert_eq!( + result.structured_content, + Some(json!({"secret":"structured"})) + ); + assert_eq!(result.meta, Some(json!({"secret":"meta"}))); + assert!( + normal_turn.items.iter().any(|item| matches!( + item, + ThreadItem::ImageGeneration { + result, + revised_prompt, + .. + } if result == "base64-image-result" + && revised_prompt.as_deref() == Some("secret revised prompt") + )), + "normal resume should keep image generation items" + ); + + Ok(()) +} + +async fn resume_redaction_fixture( + client_name: Option<&str>, +) -> Result { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let filename_ts = "2025-01-05T12-00-00"; + let meta_rfc3339 = "2025-01-05T12:00:00Z"; + let conversation_id = create_fake_rollout( + codex_home.path(), + filename_ts, + meta_rfc3339, + "Saved user message", + Some("mock_provider"), + /*git_info*/ None, + )?; + append_resume_redaction_history( + codex_home.path(), + filename_ts, + meta_rfc3339, + &conversation_id, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + if let Some(client_name) = client_name { + let _ = timeout( + DEFAULT_READ_TIMEOUT, + mcp.initialize_with_client_info(ClientInfo { + name: client_name.to_string(), + title: None, + version: "0.1.0".to_string(), + }), + ) + .await??; + } else { + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + } + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + Ok(thread) +} + +fn append_resume_redaction_history( + codex_home: &Path, + filename_ts: &str, + meta_rfc3339: &str, + conversation_id: &str, +) -> Result<()> { + let rollout_file_path = rollout_path(codex_home, filename_ts, conversation_id); + let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?; + let appended_rollout = [ + EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: "mcp-1".to_string(), + invocation: McpInvocation { + server: "docs".to_string(), + tool: "lookup".to_string(), + arguments: Some(json!({"secret":"argument"})), + }, + mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()), + duration: Duration::from_millis(8), + result: Ok(CallToolResult { + content: vec![json!({ + "type": "text", + "text": "secret result", + })], + structured_content: Some(json!({"secret":"structured"})), + is_error: Some(false), + meta: Some(json!({"secret":"meta"})), + }), + }), + EventMsg::ImageGenerationEnd(ImageGenerationEndEvent { + call_id: "ig-1".to_string(), + status: "completed".to_string(), + revised_prompt: Some("secret revised prompt".to_string()), + result: "base64-image-result".to_string(), + saved_path: Some(test_absolute_path("/tmp/ig-1.png")), + }), + ] + .into_iter() + .map(|payload| { + Ok(json!({ + "timestamp": meta_rfc3339, + "type": "event_msg", + "payload": serde_json::to_value(payload)?, + }) + .to_string()) + }) + .collect::>>()? + .join("\n"); + std::fs::write( + &rollout_file_path, + format!("{persisted_rollout}{appended_rollout}\n"), + )?; + Ok(()) +} + #[tokio::test] async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;