From ff15db5d403ec699c74d3ef94ea37389599d2a3e Mon Sep 17 00:00:00 2001 From: Liang-Ting Jiang Date: Fri, 24 Apr 2026 16:04:06 -0700 Subject: [PATCH] Add OpenAI file download materialization and library upload --- codex-rs/codex-api/src/files.rs | 345 +++++++++++++++++- codex-rs/codex-api/src/lib.rs | 2 + codex-rs/core/src/codex_apps_file_download.rs | 341 +++++++++++++++++ codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/mcp_openai_file.rs | 36 +- codex-rs/core/src/mcp_tool_call.rs | 47 ++- codex-rs/core/src/mcp_tool_call_tests.rs | 14 + codex-rs/utils/plugins/src/mcp_connector.rs | 5 + 8 files changed, 765 insertions(+), 26 deletions(-) create mode 100644 codex-rs/core/src/codex_apps_file_download.rs diff --git a/codex-rs/codex-api/src/files.rs b/codex-rs/codex-api/src/files.rs index d1e2840066..9e308e01a4 100644 --- a/codex-rs/codex-api/src/files.rs +++ b/codex-rs/codex-api/src/files.rs @@ -10,6 +10,7 @@ use serde::Deserialize; use tokio::fs::File; use tokio::time::Instant; use tokio_util::io::ReaderStream; +use url::Url; pub const OPENAI_FILE_URI_PREFIX: &str = "sediment://"; pub const OPENAI_FILE_UPLOAD_LIMIT_BYTES: u64 = 512 * 1024 * 1024; @@ -18,16 +19,21 @@ const OPENAI_FILE_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); const OPENAI_FILE_FINALIZE_TIMEOUT: Duration = Duration::from_secs(30); const OPENAI_FILE_FINALIZE_RETRY_DELAY: Duration = Duration::from_millis(250); const OPENAI_FILE_USE_CASE: &str = "codex"; +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct OpenAiFileUploadOptions { + pub store_in_library: bool, +} #[derive(Debug, Clone, PartialEq, Eq)] pub struct UploadedOpenAiFile { pub file_id: String, pub uri: String, - pub download_url: String, + pub download_url: Option, pub file_name: String, pub file_size_bytes: u64, pub mime_type: Option, pub path: PathBuf, + pub library_file_id: Option, } #[derive(Debug, thiserror::Error)] @@ -68,6 +74,12 @@ pub enum OpenAiFileError { #[source] source: serde_json::Error, }, + #[error("failed to resolve OpenAI file URL `{url}`: {source}")] + InvalidUrl { + url: String, + #[source] + source: url::ParseError, + }, #[error("OpenAI file upload for `{file_id}` is not ready yet")] UploadNotReady { file_id: String }, #[error("OpenAI file upload for `{file_id}` failed: {message}")] @@ -90,14 +102,55 @@ struct DownloadLinkResponse { error_message: Option, } +#[derive(Deserialize)] +struct ProcessUploadStreamStatus { + event: Option, + message: Option, + #[serde(default)] + extra: Option, +} + +#[derive(Debug, Default, Deserialize)] +struct ProcessUploadStreamExtra { + #[serde(alias = "metadata_object_id", alias = "library_file_id")] + library_file_id: Option, + #[serde(alias = "library_file_name")] + file_name: Option, + mime_type: Option, +} + pub fn openai_file_uri(file_id: &str) -> String { format!("{OPENAI_FILE_URI_PREFIX}{file_id}") } +pub async fn download_openai_file( + base_url: &str, + auth: &impl AuthProvider, + download_url: &str, +) -> Result, OpenAiFileError> { + let resolved_url = resolve_openai_file_download_url(base_url, download_url)?; + let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) { + authorized_request(auth, reqwest::Method::GET, resolved_url.as_str()) + } else { + build_reqwest_client() + .request(reqwest::Method::GET, resolved_url.as_str()) + .timeout(OPENAI_FILE_REQUEST_TIMEOUT) + }; + let response = request_builder + .send() + .await + .map_err(|source| OpenAiFileError::Request { + url: resolved_url.to_string(), + source, + })?; + response_bytes(resolved_url.as_str(), response).await +} + pub async fn upload_local_file( base_url: &str, auth: &dyn AuthProvider, path: &Path, + options: &OpenAiFileUploadOptions, ) -> Result { let metadata = tokio::fs::metadata(path) .await @@ -128,13 +181,18 @@ pub async fn upload_local_file( .and_then(|value| value.to_str()) .unwrap_or("file") .to_string(); - let create_url = format!("{}/files", base_url.trim_end_matches('/')); + let base_url = base_url.trim_end_matches('/'); + let mut create_request = serde_json::json!({ + "file_name": file_name, + "file_size": metadata.len(), + "use_case": OPENAI_FILE_USE_CASE, + }); + if options.store_in_library { + create_request["store_in_library"] = serde_json::json!(true); + } + let create_url = format!("{base_url}/files"); let create_response = authorized_request(auth, reqwest::Method::POST, &create_url) - .json(&serde_json::json!({ - "file_name": file_name, - "file_size": metadata.len(), - "use_case": OPENAI_FILE_USE_CASE, - })) + .json(&create_request) .send() .await .map_err(|source| OpenAiFileError::Request { @@ -184,11 +242,29 @@ pub async fn upload_local_file( }); } - let finalize_url = format!( - "{}/files/{}/uploaded", - base_url.trim_end_matches('/'), - create_payload.file_id, - ); + if options.store_in_library { + let processed = + process_upload_stream(auth, base_url, &create_payload.file_id, &file_name).await?; + let library_file_id = + processed + .library_file_id + .ok_or_else(|| OpenAiFileError::UploadFailed { + file_id: create_payload.file_id.clone(), + message: "upload completed without creating a library_file_id".to_string(), + })?; + return Ok(UploadedOpenAiFile { + file_id: create_payload.file_id.clone(), + uri: openai_file_uri(&create_payload.file_id), + download_url: None, + file_name: processed.file_name.unwrap_or(file_name), + file_size_bytes: metadata.len(), + mime_type: processed.mime_type, + path: path.to_path_buf(), + library_file_id: Some(library_file_id), + }); + } + + let finalize_url = format!("{base_url}/files/{}/uploaded", create_payload.file_id); let finalize_started_at = Instant::now(); loop { let finalize_response = authorized_request(auth, reqwest::Method::POST, &finalize_url) @@ -219,16 +295,17 @@ pub async fn upload_local_file( return Ok(UploadedOpenAiFile { file_id: create_payload.file_id.clone(), uri: openai_file_uri(&create_payload.file_id), - download_url: finalize_payload.download_url.ok_or_else(|| { + download_url: Some(finalize_payload.download_url.ok_or_else(|| { OpenAiFileError::UploadFailed { file_id: create_payload.file_id.clone(), message: "missing download_url".to_string(), } - })?, + })?), file_name: finalize_payload.file_name.unwrap_or(file_name), file_size_bytes: metadata.len(), mime_type: finalize_payload.mime_type, path: path.to_path_buf(), + library_file_id: None, }); } "retry" => { @@ -251,6 +328,82 @@ pub async fn upload_local_file( } } +async fn process_upload_stream( + auth: &dyn AuthProvider, + base_url: &str, + file_id: &str, + file_name: &str, +) -> Result { + let process_url = format!("{base_url}/files/process_upload_stream"); + let process_response = authorized_request(auth, reqwest::Method::POST, &process_url) + .json(&serde_json::json!({ + "file_id": file_id, + "file_name": file_name, + "use_case": OPENAI_FILE_USE_CASE, + "index_for_retrieval": false, + "entry_surface": OPENAI_FILE_USE_CASE, + "metadata": { + "store_in_library": true, + }, + })) + .send() + .await + .map_err(|source| OpenAiFileError::Request { + url: process_url.clone(), + source, + })?; + let process_body = response_text(&process_url, process_response).await?; + + let mut result = ProcessUploadStreamExtra::default(); + for line in process_body.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let ProcessUploadStreamStatus { + event, + message, + extra, + } = serde_json::from_str(line).map_err(|source| OpenAiFileError::Decode { + url: process_url.clone(), + source, + })?; + let extra = extra.unwrap_or_default(); + + if let Some(event) = event.as_deref() + && is_process_upload_stream_error_event(event) + { + return Err(OpenAiFileError::UploadFailed { + file_id: file_id.to_string(), + message: message + .filter(|message| !message.is_empty()) + .unwrap_or_else(|| format!("process_upload_stream returned {event}")), + }); + } + + if result.library_file_id.is_none() { + result.library_file_id = non_empty_string(extra.library_file_id); + } + if result.file_name.is_none() { + result.file_name = non_empty_string(extra.file_name); + } + if result.mime_type.is_none() { + result.mime_type = non_empty_string(extra.mime_type); + } + } + + Ok(result) +} + +fn non_empty_string(value: Option) -> Option { + value.filter(|value| !value.is_empty()) +} + +fn is_process_upload_stream_error_event(event: &str) -> bool { + let event_tail = event.rsplit(['.', '_']).next().unwrap_or(event); + matches!(event_tail, "error" | "cancelled" | "unknown") +} + fn authorized_request( auth: &dyn AuthProvider, method: reqwest::Method, @@ -266,6 +419,82 @@ fn authorized_request( .headers(headers) } +async fn response_text(url: &str, response: reqwest::Response) -> Result { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + if !status.is_success() { + return Err(OpenAiFileError::UnexpectedStatus { + url: url.to_string(), + status, + body, + }); + } + Ok(body) +} + +async fn response_bytes( + url: &str, + response: reqwest::Response, +) -> Result, OpenAiFileError> { + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(OpenAiFileError::UnexpectedStatus { + url: url.to_string(), + status, + body, + }); + } + let bytes = response + .bytes() + .await + .map_err(|source| OpenAiFileError::Request { + url: url.to_string(), + source, + })?; + Ok(bytes.to_vec()) +} + +fn resolve_openai_file_download_url( + base_url: &str, + download_url: &str, +) -> Result { + match Url::parse(download_url) { + Ok(url) => Ok(url), + Err(url::ParseError::RelativeUrlWithoutBase) => { + let normalized_base_url = if base_url.ends_with('/') { + base_url.to_string() + } else { + format!("{base_url}/") + }; + let base = + Url::parse(&normalized_base_url).map_err(|source| OpenAiFileError::InvalidUrl { + url: normalized_base_url.clone(), + source, + })?; + base.join(download_url) + .map_err(|source| OpenAiFileError::InvalidUrl { + url: download_url.to_string(), + source, + }) + } + Err(source) => Err(OpenAiFileError::InvalidUrl { + url: download_url.to_string(), + source, + }), + } +} + +fn should_attach_auth_to_openai_file_url(download_url: &Url, base_url: &str) -> bool { + let Ok(base_url) = Url::parse(base_url) else { + return false; + }; + match (download_url.host_str(), base_url.host_str()) { + (Some(download_host), Some(base_host)) => download_host.eq_ignore_ascii_case(base_host), + _ => false, + } +} + fn build_reqwest_client() -> reqwest::Client { build_reqwest_client_with_custom_ca(reqwest::Client::builder()).unwrap_or_else(|error| { tracing::warn!(error = %error, "failed to build OpenAI file upload client"); @@ -363,18 +592,98 @@ mod tests { let path = dir.path().join("hello.txt"); tokio::fs::write(&path, b"hello").await.expect("write file"); - let uploaded = upload_local_file(&base_url, &chatgpt_auth(), &path) - .await - .expect("upload succeeds"); + let uploaded = upload_local_file( + &base_url, + &chatgpt_auth(), + &path, + &OpenAiFileUploadOptions::default(), + ) + .await + .expect("upload succeeds"); assert_eq!(uploaded.file_id, "file_123"); assert_eq!(uploaded.uri, "sediment://file_123"); assert_eq!( uploaded.download_url, - format!("{}/download/file_123", server.uri()) + Some(format!("{}/download/file_123", server.uri())) ); assert_eq!(uploaded.file_name, "hello.txt"); assert_eq!(uploaded.mime_type, Some("text/plain".to_string())); assert_eq!(finalize_attempts.load(Ordering::SeqCst), 2); } + + #[tokio::test] + async fn upload_local_file_stores_library_file_with_process_upload_stream() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/backend-api/files")) + .and(header("chatgpt-account-id", "account_id")) + .and(body_json(serde_json::json!({ + "file_name": "hello.txt", + "file_size": 5, + "use_case": "codex", + "store_in_library": true, + }))) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(serde_json::json!({"file_id": "file_123", "upload_url": format!("{}/upload/file_123", server.uri())})), + ) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/upload/file_123")) + .and(header("content-length", "5")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/backend-api/files/process_upload_stream")) + .and(body_json(serde_json::json!({ + "file_id": "file_123", + "file_name": "hello.txt", + "use_case": "codex", + "index_for_retrieval": false, + "entry_surface": "codex", + "metadata": { + "store_in_library": true, + }, + }))) + .respond_with(ResponseTemplate::new(200).set_body_bytes( + concat!( + "{\"file_id\":\"file_123\",\"event\":\"indexing.completed\",\"message\":\"\",", + "\"extra\":{\"metadata_object_id\":\"library_123\",", + "\"library_file_name\":\"hello.txt\",\"mime_type\":\"text/plain\"}}\n", + "{\"file_id\":\"file_123\",\"event\":\"completed\",", + "\"message\":\"Succeeded processing file file_123\",", + "\"progress\":100,\"extra\":null}\n", + ) + .as_bytes() + .to_vec(), + )) + .mount(&server) + .await; + + let base_url = base_url_for(&server); + let dir = TempDir::new().expect("temp dir"); + let path = dir.path().join("hello.txt"); + tokio::fs::write(&path, b"hello").await.expect("write file"); + + let uploaded = upload_local_file( + &base_url, + &chatgpt_auth(), + &path, + &OpenAiFileUploadOptions { + store_in_library: true, + }, + ) + .await + .expect("upload succeeds"); + + assert_eq!(uploaded.file_id, "file_123"); + assert_eq!(uploaded.uri, "sediment://file_123"); + assert_eq!(uploaded.download_url, None); + assert_eq!(uploaded.file_name, "hello.txt"); + assert_eq!(uploaded.mime_type, Some("text/plain".to_string())); + assert_eq!(uploaded.library_file_id, Some("library_123".to_string())); + } } diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 0b8aee266b..f1eb23a96e 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -57,6 +57,8 @@ pub use crate::endpoint::ResponsesWebsocketClient; pub use crate::endpoint::ResponsesWebsocketConnection; pub use crate::endpoint::session_update_session_json; pub use crate::error::ApiError; +pub use crate::files::OpenAiFileUploadOptions; +pub use crate::files::download_openai_file; pub use crate::files::upload_local_file; pub use crate::provider::Provider; pub use crate::provider::RetryConfig; diff --git a/codex-rs/core/src/codex_apps_file_download.rs b/codex-rs/core/src/codex_apps_file_download.rs new file mode 100644 index 0000000000..db09ba7d95 --- /dev/null +++ b/codex-rs/core/src/codex_apps_file_download.rs @@ -0,0 +1,341 @@ +use crate::session::session::Session; +use crate::session::turn_context::TurnContext; +use codex_api::download_openai_file; +use codex_login::CodexAuth; +use codex_model_provider::BearerAuthProvider; +use codex_protocol::mcp::CallToolResult; +use serde::Deserialize; +use serde::Serialize; +use serde_json::Map as JsonMap; +use serde_json::Value as JsonValue; +use tracing::warn; + +const CODEX_APPS_FILE_DOWNLOAD_ARTIFACTS_DIR: &str = ".tmp/codex_apps_downloads"; +const CODEX_APPS_PROVIDER_BUILTIN: &str = "builtin"; +const CODEX_APPS_META_PROVIDER_KEY: &str = "provider"; +const CODEX_APPS_META_MATERIALIZE_FILE_DOWNLOAD_KEY: &str = "materialize_file_download"; + +#[derive(Debug, Deserialize, Serialize)] +struct CodexAppsFileDownloadPayload { + file_id: String, + #[serde(default)] + file_name: Option, + file_uri: CodexAppsFileUri, +} + +#[derive(Debug, Deserialize, Serialize)] +struct CodexAppsFileUri { + download_url: String, + #[serde(default)] + file_name: Option, +} + +fn codex_apps_download_base_url(turn_context: &TurnContext) -> &str { + turn_context.config.chatgpt_base_url.as_str() +} + +fn should_materialize_codex_apps_file_download( + server: &str, + codex_apps_meta: Option<&JsonMap>, +) -> bool { + if server != codex_mcp::CODEX_APPS_MCP_SERVER_NAME { + return false; + } + + let Some(codex_apps_meta) = codex_apps_meta else { + return false; + }; + + codex_apps_meta + .get(CODEX_APPS_META_PROVIDER_KEY) + .and_then(JsonValue::as_str) + == Some(CODEX_APPS_PROVIDER_BUILTIN) + && codex_apps_meta + .get(CODEX_APPS_META_MATERIALIZE_FILE_DOWNLOAD_KEY) + .and_then(JsonValue::as_bool) + == Some(true) +} + +pub(crate) async fn maybe_materialize_codex_apps_file_download_result( + sess: &Session, + turn_context: &TurnContext, + server: &str, + codex_apps_meta: Option<&JsonMap>, + result: CallToolResult, +) -> CallToolResult { + let auth = sess.services.auth_manager.auth().await; + maybe_materialize_codex_apps_file_download_result_with_auth( + turn_context, + &sess.conversation_id.to_string(), + auth.as_ref(), + server, + codex_apps_meta, + result, + ) + .await +} + +async fn maybe_materialize_codex_apps_file_download_result_with_auth( + turn_context: &TurnContext, + session_id: &str, + auth: Option<&CodexAuth>, + server: &str, + codex_apps_meta: Option<&JsonMap>, + mut result: CallToolResult, +) -> CallToolResult { + if !should_materialize_codex_apps_file_download(server, codex_apps_meta) + || result.is_error == Some(true) + { + return result; + } + + let Some(payload) = extract_codex_apps_file_download_payload(&result) else { + return result; + }; + let download_base_url = codex_apps_download_base_url(turn_context); + if result.structured_content.is_none() + && let Ok(structured_content) = serde_json::to_value(&payload) + { + result.structured_content = Some(structured_content); + } + + let Some(auth) = auth else { + warn!( + "skipping codex_apps file download materialization because ChatGPT auth is unavailable" + ); + return result; + }; + let token_data = match auth.get_token_data() { + Ok(token_data) => token_data, + Err(error) => { + warn!(error = %error, "failed to read ChatGPT auth for codex_apps file download materialization"); + return result; + } + }; + let auth_provider = BearerAuthProvider { + token: Some(token_data.access_token), + account_id: token_data.account_id, + is_fedramp_account: auth.is_fedramp_account(), + }; + let downloaded = match download_openai_file( + download_base_url, + &auth_provider, + &payload.file_uri.download_url, + ) + .await + { + Ok(downloaded) => downloaded, + Err(error) => { + warn!( + error = %error, + file_id = payload.file_id, + "failed to materialize codex_apps file download via app-server", + ); + return result; + } + }; + + let artifact_path = codex_apps_file_download_artifact_path( + &turn_context.config.codex_home, + session_id, + &payload.file_id, + payload + .file_name + .as_deref() + .or(payload.file_uri.file_name.as_deref()) + .unwrap_or("downloaded_file"), + ); + if let Some(parent) = artifact_path.parent() + && let Err(error) = tokio::fs::create_dir_all(parent.as_path()).await + { + warn!( + error = %error, + path = %parent.display(), + "failed to create codex_apps file download artifact directory", + ); + return result; + } + if let Err(error) = tokio::fs::write(artifact_path.as_path(), &downloaded).await { + warn!( + error = %error, + path = %artifact_path.display(), + "failed to write codex_apps file download artifact", + ); + return result; + } + + let local_path = artifact_path.to_string_lossy().to_string(); + if let Some(JsonValue::Object(map)) = result.structured_content.as_mut() { + map.insert( + "local_path".to_string(), + JsonValue::String(local_path.clone()), + ); + } + result.content.push(serde_json::json!({ + "type": "text", + "text": format!("Downloaded file to local path: {local_path}"), + })); + result +} + +fn extract_codex_apps_file_download_payload( + result: &CallToolResult, +) -> Option { + if let Some(structured_content) = result.structured_content.clone() + && let Ok(payload) = + serde_json::from_value::(structured_content) + { + return Some(payload); + } + + result + .content + .iter() + .filter_map(|item| item.as_object()) + .find_map(|item| { + let text = item.get("text")?.as_str()?; + serde_json::from_str::(text).ok() + }) +} + +fn codex_apps_file_download_artifact_path( + codex_home: &codex_utils_absolute_path::AbsolutePathBuf, + session_id: &str, + file_id: &str, + file_name: &str, +) -> codex_utils_absolute_path::AbsolutePathBuf { + codex_home + .join(CODEX_APPS_FILE_DOWNLOAD_ARTIFACTS_DIR) + .join(sanitize_path_component(session_id, "session")) + .join(sanitize_path_component(file_id, "file")) + .join(sanitize_file_name(file_name)) +} + +fn sanitize_path_component(value: &str, fallback: &str) -> String { + let sanitized: String = value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' { + ch + } else { + '_' + } + }) + .collect(); + if sanitized.is_empty() { + fallback.to_string() + } else { + sanitized + } +} + +fn sanitize_file_name(value: &str) -> String { + let sanitized: String = value + .chars() + .map(|ch| { + if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.' { + ch + } else { + '_' + } + }) + .collect(); + if sanitized.is_empty() { + "downloaded_file".to_string() + } else { + sanitized + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::tests::make_session_and_context; + use codex_login::CodexAuth; + use pretty_assertions::assert_eq; + use std::sync::Arc; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::header; + use wiremock::matchers::method; + use wiremock::matchers::path; + + fn download_materialization_meta() -> JsonMap { + serde_json::json!({ + "provider": "builtin", + "materialize_file_download": true, + }) + .as_object() + .cloned() + .expect("_codex_apps metadata object") + } + + #[tokio::test] + async fn codex_apps_file_download_materialization_adds_local_path_for_marked_tools() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/download/file_123")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "text/plain") + .set_body_bytes(b"downloaded contents".to_vec()), + ) + .mount(&server) + .await; + + let (_, mut turn_context) = make_session_and_context().await; + let mut config = (*turn_context.config).clone(); + config.chatgpt_base_url = format!("{}/backend-api/codex", server.uri()); + turn_context.config = Arc::new(config); + let original = CallToolResult { + content: vec![serde_json::json!({ + "type": "text", + "text": "{\"file_id\":\"file_123\"}", + })], + structured_content: Some(serde_json::json!({ + "file_id": "file_123", + "file_name": "testing-file.txt", + "file_uri": { + "download_url": format!("{}/download/file_123", server.uri()), + "file_id": "file_123", + "file_name": "testing-file.txt", + "mime_type": "text/plain", + } + })), + is_error: Some(false), + meta: None, + }; + + let result = maybe_materialize_codex_apps_file_download_result_with_auth( + &turn_context, + "session-1", + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + codex_mcp::CODEX_APPS_MCP_SERVER_NAME, + Some(&download_materialization_meta()), + original, + ) + .await; + + let local_path = result + .structured_content + .as_ref() + .and_then(|value| value.get("local_path")) + .and_then(JsonValue::as_str) + .expect("local_path in structured content"); + assert!(local_path.contains("codex_apps_downloads")); + let saved = tokio::fs::read(local_path) + .await + .expect("saved local file should exist"); + assert_eq!(saved, b"downloaded contents".to_vec()); + assert!(result.content.iter().any(|block| { + block.get("type").and_then(JsonValue::as_str) == Some("text") + && block + .get("text") + .and_then(JsonValue::as_str) + .is_some_and(|text| text.contains("Downloaded file to local path:")) + })); + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 54fadc6fd3..f489f04622 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -21,6 +21,7 @@ pub use codex_thread::CodexThread; pub use codex_thread::CodexThreadTurnContextOverrides; pub use codex_thread::ThreadConfigSnapshot; mod agent; +mod codex_apps_file_download; mod codex_delegate; mod command_canonicalization; mod commit_attribution; diff --git a/codex-rs/core/src/mcp_openai_file.rs b/codex-rs/core/src/mcp_openai_file.rs index 0e0d4a6008..32480e0875 100644 --- a/codex-rs/core/src/mcp_openai_file.rs +++ b/codex-rs/core/src/mcp_openai_file.rs @@ -12,6 +12,7 @@ use crate::session::session::Session; use crate::session::turn_context::TurnContext; +use codex_api::OpenAiFileUploadOptions; use codex_api::upload_local_file; use codex_login::CodexAuth; use serde_json::Value as JsonValue; @@ -21,6 +22,7 @@ pub(crate) async fn rewrite_mcp_tool_arguments_for_openai_files( turn_context: &TurnContext, arguments_value: Option, openai_file_input_params: Option<&[String]>, + upload_options: Option<&OpenAiFileUploadOptions>, ) -> Result, String> { let Some(openai_file_input_params) = openai_file_input_params else { return Ok(arguments_value); @@ -39,9 +41,14 @@ pub(crate) async fn rewrite_mcp_tool_arguments_for_openai_files( let Some(value) = arguments.get(field_name) else { continue; }; - let Some(uploaded_value) = - rewrite_argument_value_for_openai_files(turn_context, auth.as_ref(), field_name, value) - .await? + let Some(uploaded_value) = rewrite_argument_value_for_openai_files( + turn_context, + auth.as_ref(), + field_name, + value, + upload_options, + ) + .await? else { continue; }; @@ -60,6 +67,7 @@ async fn rewrite_argument_value_for_openai_files( auth: Option<&CodexAuth>, field_name: &str, value: &JsonValue, + upload_options: Option<&OpenAiFileUploadOptions>, ) -> Result, String> { match value { JsonValue::String(path_or_file_ref) => { @@ -69,6 +77,7 @@ async fn rewrite_argument_value_for_openai_files( field_name, /*index*/ None, path_or_file_ref, + upload_options, ) .await?; Ok(Some(rewritten)) @@ -85,6 +94,7 @@ async fn rewrite_argument_value_for_openai_files( field_name, Some(index), path_or_file_ref, + upload_options, ) .await?; rewritten_values.push(rewritten); @@ -101,6 +111,7 @@ async fn build_uploaded_local_argument_value( field_name: &str, index: Option, file_path: &str, + upload_options: Option<&OpenAiFileUploadOptions>, ) -> Result { let resolved_path = turn_context.resolve_path(Some(file_path.to_string())); let Some(auth) = auth else { @@ -114,10 +125,12 @@ async fn build_uploaded_local_argument_value( ); } let upload_auth = codex_model_provider::auth_provider_from_auth(auth); + let default_upload_options = OpenAiFileUploadOptions::default(); let uploaded = upload_local_file( turn_context.config.chatgpt_base_url.trim_end_matches('/'), upload_auth.as_ref(), &resolved_path, + upload_options.unwrap_or(&default_upload_options), ) .await .map_err(|error| match index { @@ -126,14 +139,22 @@ async fn build_uploaded_local_argument_value( } None => format!("failed to upload `{file_path}` for `{field_name}`: {error}"), })?; - Ok(serde_json::json!({ + let mut uploaded_value = serde_json::json!({ "download_url": uploaded.download_url, "file_id": uploaded.file_id, + "library_file_id": uploaded.library_file_id, "mime_type": uploaded.mime_type, "file_name": uploaded.file_name, "uri": uploaded.uri, "file_size_bytes": uploaded.file_size_bytes, - })) + }); + if uploaded.library_file_id.is_none() { + uploaded_value + .as_object_mut() + .expect("uploaded value should be an object") + .remove("library_file_id"); + } + Ok(uploaded_value) } #[cfg(test)] @@ -157,6 +178,7 @@ mod tests { &Arc::new(turn_context), arguments.clone(), /*openai_file_input_params*/ None, + /*upload_options*/ None, ) .await .expect("rewrite should succeed"); @@ -228,6 +250,7 @@ mod tests { "file", /*index*/ None, "file_report.csv", + /*upload_options*/ None, ) .await .expect("rewrite should upload the local file"); @@ -307,6 +330,7 @@ mod tests { Some(&auth), "file", &serde_json::json!("file_report.csv"), + /*upload_options*/ None, ) .await .expect("rewrite should succeed"); @@ -421,6 +445,7 @@ mod tests { Some(&auth), "files", &serde_json::json!(["one.csv", "two.csv"]), + /*upload_options*/ None, ) .await .expect("rewrite should succeed"); @@ -461,6 +486,7 @@ mod tests { "file": "/definitely/missing/file.csv", })), Some(&["file".to_string()]), + /*upload_options*/ None, ) .await .expect_err("missing file should fail"); diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 7a76db9e4f..12b153b97d 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -12,6 +12,7 @@ use tracing::error; use crate::arc_monitor::ArcMonitorOutcome; use crate::arc_monitor::monitor_action; +use crate::codex_apps_file_download::maybe_materialize_codex_apps_file_download_result; use crate::config::Config; use crate::config::edit::ConfigEdit; use crate::config::edit::ConfigEditsBuilder; @@ -36,6 +37,7 @@ use crate::tools::sandboxing::PermissionRequestPayload; use codex_analytics::AppInvocation; use codex_analytics::InvocationType; use codex_analytics::build_track_events_context; +use codex_api::OpenAiFileUploadOptions; use codex_config::types::AppToolApproval; use codex_features::Feature; use codex_hooks::PermissionRequestDecision; @@ -309,6 +311,7 @@ async fn handle_approved_mcp_tool_call( turn_context, arguments_value.clone(), metadata.and_then(|metadata| metadata.openai_file_input_params.as_deref()), + metadata.and_then(|metadata| metadata.openai_file_upload_options.as_ref()), ) .await; let tool_input = match &rewrite { @@ -325,6 +328,7 @@ async fn handle_approved_mcp_tool_call( &server, &tool_name, rewritten_arguments, + metadata, request_meta, ) .await @@ -479,6 +483,7 @@ async fn execute_mcp_tool_call( server: &str, tool_name: &str, rewritten_arguments: Option, + metadata: Option<&McpToolApprovalMetadata>, request_meta: Option, ) -> Result { let request_meta = @@ -491,6 +496,14 @@ async fn execute_mcp_tool_call( .call_tool(server, tool_name, rewritten_arguments, request_meta) .await .map_err(|e| format!("tool call error: {e:?}"))?; + let result = maybe_materialize_codex_apps_file_download_result( + sess, + turn_context, + server, + metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref()), + result, + ) + .await; sanitize_mcp_tool_result_for_model( turn_context .model_info @@ -662,12 +675,37 @@ pub(crate) struct McpToolApprovalMetadata { mcp_app_resource_uri: Option, codex_apps_meta: Option>, openai_file_input_params: Option>, + openai_file_upload_options: Option, } -const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps"; +const CODEX_APPS_META_KEY: &str = "_codex_apps"; const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate"; const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri"; const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId"; +const MCP_TOOL_OPENAI_FILE_UPLOAD_CONFIG_KEY: &str = "openai/fileUploadConfig"; + +#[derive(Debug, Clone, Deserialize)] +struct RawOpenAiFileUploadConfig { + #[serde(default)] + store_in_library: bool, +} + +fn parse_openai_file_upload_options( + meta: Option<&serde_json::Map>, +) -> Option { + let raw = meta? + .get(MCP_TOOL_OPENAI_FILE_UPLOAD_CONFIG_KEY) + .cloned() + .and_then(|value| serde_json::from_value::(value).ok())?; + + if !raw.store_in_library { + return None; + } + + Some(OpenAiFileUploadOptions { + store_in_library: true, + }) +} fn custom_mcp_tool_approval_mode( turn_context: &TurnContext, @@ -719,7 +757,7 @@ fn build_mcp_tool_call_request_meta( serde_json::Value::String(call_id.to_string()), ); request_meta.insert( - MCP_TOOL_CODEX_APPS_META_KEY.to_string(), + CODEX_APPS_META_KEY.to_string(), serde_json::Value::Object(codex_apps_meta), ); } @@ -1191,13 +1229,16 @@ pub(crate) async fn lookup_mcp_tool_metadata( .tool .meta .as_ref() - .and_then(|meta| meta.get(MCP_TOOL_CODEX_APPS_META_KEY)) + .and_then(|meta| meta.get(CODEX_APPS_META_KEY)) .and_then(serde_json::Value::as_object) .cloned(), openai_file_input_params: Some(declared_openai_file_input_param_names( tool_info.tool.meta.as_deref(), )) .filter(|params| !params.is_empty()), + openai_file_upload_options: parse_openai_file_upload_options( + tool_info.tool.meta.as_deref(), + ), }) } diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index da0c549009..17a068d7e3 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -66,6 +66,7 @@ fn approval_metadata( mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, } } @@ -714,6 +715,7 @@ async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps .expect("_codex_apps metadata should be an object"), ), openai_file_input_params: None, + openai_file_upload_options: None, }; assert_eq!( @@ -921,6 +923,7 @@ fn guardian_mcp_review_request_includes_annotations_when_present() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let request = build_guardian_mcp_tool_review_request("call-1", &invocation, Some(&metadata)); @@ -1485,6 +1488,7 @@ async fn approve_mode_skips_when_annotations_do_not_require_approval() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -1558,6 +1562,7 @@ async fn guardian_mode_skips_auto_when_annotations_do_not_require_approval() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -1614,6 +1619,7 @@ async fn permission_request_hook_allows_mcp_tool_call() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -1745,6 +1751,7 @@ async fn permission_request_hook_runs_after_remembered_mcp_approval() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let remembered_key = session_mcp_tool_approval_key(&invocation, Some(&metadata), AppToolApproval::Auto) @@ -1831,6 +1838,7 @@ async fn guardian_mode_mcp_denial_returns_rationale_message() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -1884,6 +1892,7 @@ async fn prompt_mode_waits_for_approval_when_annotations_do_not_require_approval mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let mut approval_task = { @@ -1963,6 +1972,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_for_model() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -2035,6 +2045,7 @@ async fn custom_approve_mode_blocks_when_arc_returns_interrupt_for_model() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -2107,6 +2118,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_without_annotations() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( @@ -2187,6 +2199,7 @@ async fn full_access_mode_skips_arc_monitor_for_all_approval_modes() { mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; for approval_mode in [ @@ -2294,6 +2307,7 @@ async fn approve_mode_routes_arc_ask_user_to_guardian_when_guardian_reviewer_is_ mcp_app_resource_uri: None, codex_apps_meta: None, openai_file_input_params: None, + openai_file_upload_options: None, }; let decision = maybe_request_mcp_tool_approval( diff --git a/codex-rs/utils/plugins/src/mcp_connector.rs b/codex-rs/utils/plugins/src/mcp_connector.rs index 40fb0d4bbf..0f02258c16 100644 --- a/codex-rs/utils/plugins/src/mcp_connector.rs +++ b/codex-rs/utils/plugins/src/mcp_connector.rs @@ -11,6 +11,7 @@ const DISALLOWED_CONNECTOR_IDS: &[&str] = &[ ]; const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] = &["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"]; +const ALLOWED_OPENAI_CONNECTOR_IDS: &[&str] = &["connector_openai_library"]; const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_"; pub fn is_connector_id_allowed(connector_id: &str) -> bool { @@ -24,6 +25,10 @@ fn is_connector_id_allowed_for_originator(connector_id: &str, originator_value: DISALLOWED_CONNECTOR_IDS }; + if ALLOWED_OPENAI_CONNECTOR_IDS.contains(&connector_id) { + return true; + } + !connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX) && !disallowed_connector_ids.contains(&connector_id) }