mirror of
https://github.com/openai/codex.git
synced 2026-05-17 01:32:32 +00:00
fix(app-server): thread history redaction for remote clients (#22178)
## Summary Remote clients can still receive large `thread/resume` histories when prior turns include MCP tool call payloads or image-generation results. This adds a temporary response-only redaction path for the known remote client names. Longer term we will move towards fully paginated APIs backed by SQLite. ## Changes - Redact MCP tool call payload-bearing fields in `thread/resume` responses for `codex_chatgpt_android_remote` and `codex_chatgpt_ios_remote`. - Drop `imageGeneration` items from those `thread/resume` responses. - Keep redaction out of persisted rollout files, `thread/read`, `thread/turns/list`, live notifications, and token usage replay. - Cover the behavior with app-server helper tests and a v2 resume integration test that checks both remote clients plus a non-target control client. ## Testing - `cargo test -p codex-app-server thread_resume_redaction` - `cargo test -p codex-app-server thread_resume_redacts_payloads_for_chatgpt_remote_clients`
This commit is contained in:
@@ -488,12 +488,14 @@ mod config_errors;
|
||||
mod request_errors;
|
||||
mod thread_goal_processor;
|
||||
mod thread_lifecycle;
|
||||
mod thread_resume_redaction;
|
||||
mod thread_summary;
|
||||
|
||||
use self::config_errors::*;
|
||||
use self::request_errors::*;
|
||||
use self::thread_goal_processor::api_thread_goal_from_state;
|
||||
use self::thread_lifecycle::*;
|
||||
use self::thread_resume_redaction::*;
|
||||
use self::thread_summary::*;
|
||||
|
||||
pub(crate) use self::thread_lifecycle::populate_thread_turns_from_history;
|
||||
|
||||
@@ -557,6 +557,10 @@ pub(super) async fn handle_pending_thread_resume_request(
|
||||
thread_status,
|
||||
has_live_in_progress_turn,
|
||||
);
|
||||
let token_usage_thread = pending.include_turns.then(|| thread.clone());
|
||||
if pending.redact_resume_payloads {
|
||||
redact_thread_resume_payloads(&mut thread);
|
||||
}
|
||||
|
||||
{
|
||||
let pending_thread_unloads = pending_thread_unloads.lock().await;
|
||||
@@ -624,7 +628,6 @@ pub(super) async fn handle_pending_thread_resume_request(
|
||||
active_permission_profile,
|
||||
reasoning_effort,
|
||||
};
|
||||
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
|
||||
outgoing.send_response(request_id, response).await;
|
||||
// Match cold resume: metadata-only resume should attach the listener without
|
||||
// paying the cost of turn reconstruction for historical usage replay.
|
||||
|
||||
@@ -2333,6 +2333,8 @@ impl ThreadRequestProcessor {
|
||||
self.send_persist_extended_history_deprecation_notice(request_id.connection_id)
|
||||
.await;
|
||||
}
|
||||
let redact_resume_payloads =
|
||||
should_redact_thread_resume_payloads(app_server_client_name.as_deref());
|
||||
|
||||
let _thread_list_state_permit = match self.acquire_thread_list_state_permit().await {
|
||||
Ok(permit) => permit,
|
||||
@@ -2527,6 +2529,10 @@ impl ThreadRequestProcessor {
|
||||
let active_permission_profile = thread_response_active_permission_profile(
|
||||
config_snapshot.active_permission_profile,
|
||||
);
|
||||
let token_usage_thread = include_turns.then(|| thread.clone());
|
||||
if redact_resume_payloads {
|
||||
redact_thread_resume_payloads(&mut thread);
|
||||
}
|
||||
|
||||
let response = ThreadResumeResponse {
|
||||
thread,
|
||||
@@ -2544,7 +2550,6 @@ impl ThreadRequestProcessor {
|
||||
};
|
||||
|
||||
let connection_id = request_id.connection_id;
|
||||
let token_usage_thread = include_turns.then(|| response.thread.clone());
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
// `excludeTurns` is explicitly the cheap resume path, so avoid
|
||||
// rebuilding history only to attribute a replayed usage update.
|
||||
@@ -2664,6 +2669,8 @@ impl ThreadRequestProcessor {
|
||||
};
|
||||
|
||||
if let Some((existing_thread_id, existing_thread, source_thread)) = running_thread {
|
||||
let redact_resume_payloads =
|
||||
should_redact_thread_resume_payloads(app_server_client_name.as_deref());
|
||||
let history_items = source_thread
|
||||
.history
|
||||
.as_ref()
|
||||
@@ -2738,6 +2745,7 @@ impl ThreadRequestProcessor {
|
||||
emit_thread_goal_update,
|
||||
thread_goal_state_db,
|
||||
include_turns: !params.exclude_turns,
|
||||
redact_resume_payloads,
|
||||
}),
|
||||
);
|
||||
if listener_command_tx.send(command).is_err() {
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
use codex_app_server_protocol::McpToolCallResult;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use serde_json::Value as JsonValue;
|
||||
|
||||
// Temporary bandaid for remote clients: thread/resume can include large MCP and
|
||||
// image-generation payloads. Keep this response-only so persisted rollout
|
||||
// history, model resume history, and other APIs stay unchanged.
|
||||
const REDACTED_PAYLOAD: &str = "[redacted]";
|
||||
const CHATGPT_REMOTE_CLIENT_NAMES: &[&str] =
|
||||
&["codex_chatgpt_android_remote", "codex_chatgpt_ios_remote"];
|
||||
|
||||
pub(super) fn should_redact_thread_resume_payloads(client_name: Option<&str>) -> bool {
|
||||
client_name.is_some_and(|client_name| CHATGPT_REMOTE_CLIENT_NAMES.contains(&client_name))
|
||||
}
|
||||
|
||||
pub(super) fn redact_thread_resume_payloads(thread: &mut Thread) {
|
||||
for turn in &mut thread.turns {
|
||||
turn.items.retain_mut(|item| match item {
|
||||
ThreadItem::McpToolCall {
|
||||
arguments,
|
||||
result,
|
||||
error,
|
||||
..
|
||||
} => {
|
||||
*arguments = JsonValue::String(REDACTED_PAYLOAD.to_string());
|
||||
if result.is_some() {
|
||||
*result = Some(Box::new(redacted_mcp_tool_call_result()));
|
||||
}
|
||||
if let Some(error) = error {
|
||||
error.message = REDACTED_PAYLOAD.to_string();
|
||||
}
|
||||
true
|
||||
}
|
||||
ThreadItem::ImageGeneration { .. } => false,
|
||||
_ => true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn redacted_mcp_tool_call_result() -> McpToolCallResult {
|
||||
McpToolCallResult {
|
||||
content: vec![serde_json::json!({
|
||||
"type": "text",
|
||||
"text": REDACTED_PAYLOAD,
|
||||
})],
|
||||
structured_content: None,
|
||||
meta: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_app_server_protocol::McpToolCallError;
|
||||
use codex_app_server_protocol::McpToolCallStatus;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnItemsView;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_utils_absolute_path::test_support::PathBufExt;
|
||||
use codex_utils_absolute_path::test_support::test_path_buf;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn redacts_mcp_success_result_and_removes_image_generation() {
|
||||
let mut thread = test_thread(vec![
|
||||
ThreadItem::AgentMessage {
|
||||
id: "agent-1".to_string(),
|
||||
text: "kept".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
},
|
||||
ThreadItem::McpToolCall {
|
||||
id: "mcp-1".to_string(),
|
||||
server: "docs".to_string(),
|
||||
tool: "lookup".to_string(),
|
||||
status: McpToolCallStatus::Completed,
|
||||
arguments: serde_json::json!({"secret":"argument"}),
|
||||
mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()),
|
||||
result: Some(Box::new(McpToolCallResult {
|
||||
content: vec![serde_json::json!({
|
||||
"type": "text",
|
||||
"text": "secret result"
|
||||
})],
|
||||
structured_content: Some(serde_json::json!({"secret":"structured"})),
|
||||
meta: Some(serde_json::json!({"secret":"meta"})),
|
||||
})),
|
||||
error: None,
|
||||
duration_ms: Some(8),
|
||||
},
|
||||
ThreadItem::ImageGeneration {
|
||||
id: "ig-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("revised".to_string()),
|
||||
result: "base64-result".to_string(),
|
||||
saved_path: Some(test_path_buf("/tmp/ig-1.png").abs()),
|
||||
},
|
||||
]);
|
||||
|
||||
redact_thread_resume_payloads(&mut thread);
|
||||
|
||||
assert_eq!(thread.turns[0].items.len(), 2);
|
||||
assert_eq!(
|
||||
thread.turns[0].items[0],
|
||||
ThreadItem::AgentMessage {
|
||||
id: "agent-1".to_string(),
|
||||
text: "kept".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
thread.turns[0].items[1],
|
||||
ThreadItem::McpToolCall {
|
||||
id: "mcp-1".to_string(),
|
||||
server: "docs".to_string(),
|
||||
tool: "lookup".to_string(),
|
||||
status: McpToolCallStatus::Completed,
|
||||
arguments: JsonValue::String(REDACTED_PAYLOAD.to_string()),
|
||||
mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()),
|
||||
result: Some(Box::new(redacted_mcp_tool_call_result())),
|
||||
error: None,
|
||||
duration_ms: Some(8),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redacts_mcp_error_message() {
|
||||
let mut thread = test_thread(vec![ThreadItem::McpToolCall {
|
||||
id: "mcp-1".to_string(),
|
||||
server: "docs".to_string(),
|
||||
tool: "lookup".to_string(),
|
||||
status: McpToolCallStatus::Failed,
|
||||
arguments: serde_json::json!({"secret":"argument"}),
|
||||
mcp_app_resource_uri: None,
|
||||
result: None,
|
||||
error: Some(McpToolCallError {
|
||||
message: "secret error".to_string(),
|
||||
}),
|
||||
duration_ms: Some(8),
|
||||
}]);
|
||||
|
||||
redact_thread_resume_payloads(&mut thread);
|
||||
|
||||
assert_eq!(
|
||||
thread.turns[0].items[0],
|
||||
ThreadItem::McpToolCall {
|
||||
id: "mcp-1".to_string(),
|
||||
server: "docs".to_string(),
|
||||
tool: "lookup".to_string(),
|
||||
status: McpToolCallStatus::Failed,
|
||||
arguments: JsonValue::String(REDACTED_PAYLOAD.to_string()),
|
||||
mcp_app_resource_uri: None,
|
||||
result: None,
|
||||
error: Some(McpToolCallError {
|
||||
message: REDACTED_PAYLOAD.to_string(),
|
||||
}),
|
||||
duration_ms: Some(8),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
fn test_thread(items: Vec<ThreadItem>) -> Thread {
|
||||
Thread {
|
||||
id: "thread-1".to_string(),
|
||||
session_id: "session-1".to_string(),
|
||||
forked_from_id: None,
|
||||
preview: "preview".to_string(),
|
||||
ephemeral: false,
|
||||
model_provider: "mock_provider".to_string(),
|
||||
created_at: 0,
|
||||
updated_at: 0,
|
||||
status: ThreadStatus::Idle,
|
||||
path: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
thread_source: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
git_info: None,
|
||||
name: None,
|
||||
turns: vec![Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items,
|
||||
items_view: TurnItemsView::Full,
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
}],
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,7 @@ pub(crate) struct PendingThreadResumeRequest {
|
||||
pub(crate) emit_thread_goal_update: bool,
|
||||
pub(crate) thread_goal_state_db: Option<StateDbHandle>,
|
||||
pub(crate) include_turns: bool,
|
||||
pub(crate) redact_resume_payloads: bool,
|
||||
}
|
||||
|
||||
// ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes.
|
||||
|
||||
@@ -15,6 +15,7 @@ use app_test_support::to_response;
|
||||
use app_test_support::write_chatgpt_auth;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::AskForApproval;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::FileChangeApprovalDecision;
|
||||
@@ -51,10 +52,14 @@ use codex_config::types::AuthCredentialsStoreMode;
|
||||
use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ImageGenerationEndEvent;
|
||||
use codex_protocol::protocol::McpInvocation;
|
||||
use codex_protocol::protocol::McpToolCallEndEvent;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
|
||||
@@ -77,6 +82,7 @@ use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
@@ -376,6 +382,203 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_redacts_payloads_for_chatgpt_remote_clients() -> Result<()> {
|
||||
for client_name in ["codex_chatgpt_android_remote", "codex_chatgpt_ios_remote"] {
|
||||
let remote_thread = resume_redaction_fixture(Some(client_name)).await?;
|
||||
let remote_turn = remote_thread
|
||||
.turns
|
||||
.first()
|
||||
.expect("remote resume should include a turn");
|
||||
let remote_mcp_item = remote_turn
|
||||
.items
|
||||
.iter()
|
||||
.find(|item| matches!(item, ThreadItem::McpToolCall { .. }))
|
||||
.expect("remote resume should include redacted MCP item");
|
||||
let ThreadItem::McpToolCall {
|
||||
arguments,
|
||||
result,
|
||||
error,
|
||||
..
|
||||
} = remote_mcp_item
|
||||
else {
|
||||
unreachable!("matched MCP item");
|
||||
};
|
||||
assert_eq!(arguments, &json!("[redacted]"));
|
||||
let result = result.as_ref().expect("redacted MCP result");
|
||||
assert_eq!(
|
||||
result.content,
|
||||
vec![json!({
|
||||
"type": "text",
|
||||
"text": "[redacted]",
|
||||
})]
|
||||
);
|
||||
assert_eq!(result.structured_content, None);
|
||||
assert_eq!(result.meta, None);
|
||||
assert_eq!(error, &None);
|
||||
assert!(
|
||||
!remote_turn
|
||||
.items
|
||||
.iter()
|
||||
.any(|item| matches!(item, ThreadItem::ImageGeneration { .. })),
|
||||
"remote resume should drop image generation items for {client_name}"
|
||||
);
|
||||
}
|
||||
|
||||
let normal_thread = resume_redaction_fixture(Some("some_other_client")).await?;
|
||||
let normal_turn = normal_thread
|
||||
.turns
|
||||
.first()
|
||||
.expect("normal resume should include a turn");
|
||||
let normal_mcp_item = normal_turn
|
||||
.items
|
||||
.iter()
|
||||
.find(|item| matches!(item, ThreadItem::McpToolCall { .. }))
|
||||
.expect("normal resume should include MCP item");
|
||||
let ThreadItem::McpToolCall {
|
||||
arguments, result, ..
|
||||
} = normal_mcp_item
|
||||
else {
|
||||
unreachable!("matched MCP item");
|
||||
};
|
||||
assert_eq!(arguments, &json!({"secret":"argument"}));
|
||||
let result = result.as_ref().expect("normal MCP result");
|
||||
assert_eq!(
|
||||
result.content,
|
||||
vec![json!({
|
||||
"type": "text",
|
||||
"text": "secret result",
|
||||
})]
|
||||
);
|
||||
assert_eq!(
|
||||
result.structured_content,
|
||||
Some(json!({"secret":"structured"}))
|
||||
);
|
||||
assert_eq!(result.meta, Some(json!({"secret":"meta"})));
|
||||
assert!(
|
||||
normal_turn.items.iter().any(|item| matches!(
|
||||
item,
|
||||
ThreadItem::ImageGeneration {
|
||||
result,
|
||||
revised_prompt,
|
||||
..
|
||||
} if result == "base64-image-result"
|
||||
&& revised_prompt.as_deref() == Some("secret revised prompt")
|
||||
)),
|
||||
"normal resume should keep image generation items"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn resume_redaction_fixture(
|
||||
client_name: Option<&str>,
|
||||
) -> Result<codex_app_server_protocol::Thread> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let conversation_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
append_resume_redaction_history(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
&conversation_id,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
if let Some(client_name) = client_name {
|
||||
let _ = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.initialize_with_client_info(ClientInfo {
|
||||
name: client_name.to_string(),
|
||||
title: None,
|
||||
version: "0.1.0".to_string(),
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
} else {
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
}
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
fn append_resume_redaction_history(
|
||||
codex_home: &Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
conversation_id: &str,
|
||||
) -> Result<()> {
|
||||
let rollout_file_path = rollout_path(codex_home, filename_ts, conversation_id);
|
||||
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
||||
let appended_rollout = [
|
||||
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
||||
call_id: "mcp-1".to_string(),
|
||||
invocation: McpInvocation {
|
||||
server: "docs".to_string(),
|
||||
tool: "lookup".to_string(),
|
||||
arguments: Some(json!({"secret":"argument"})),
|
||||
},
|
||||
mcp_app_resource_uri: Some("ui://widget/lookup.html".to_string()),
|
||||
duration: Duration::from_millis(8),
|
||||
result: Ok(CallToolResult {
|
||||
content: vec![json!({
|
||||
"type": "text",
|
||||
"text": "secret result",
|
||||
})],
|
||||
structured_content: Some(json!({"secret":"structured"})),
|
||||
is_error: Some(false),
|
||||
meta: Some(json!({"secret":"meta"})),
|
||||
}),
|
||||
}),
|
||||
EventMsg::ImageGenerationEnd(ImageGenerationEndEvent {
|
||||
call_id: "ig-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("secret revised prompt".to_string()),
|
||||
result: "base64-image-result".to_string(),
|
||||
saved_path: Some(test_absolute_path("/tmp/ig-1.png")),
|
||||
}),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|payload| {
|
||||
Ok(json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(payload)?,
|
||||
})
|
||||
.to_string())
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?
|
||||
.join("\n");
|
||||
std::fs::write(
|
||||
&rollout_file_path,
|
||||
format!("{persisted_rollout}{appended_rollout}\n"),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
Reference in New Issue
Block a user