From 92d6d45dc53fbcb97904677624f2c58e2602e802 Mon Sep 17 00:00:00 2001 From: Liang-Ting Jiang Date: Fri, 24 Apr 2026 17:35:51 -0700 Subject: [PATCH] Stream file download materialization to disk --- codex-rs/codex-api/Cargo.toml | 2 +- codex-rs/codex-api/src/files.rs | 179 +++++++++++++++--- codex-rs/codex-api/src/lib.rs | 2 + codex-rs/core/src/codex_apps_file_download.rs | 35 ++-- 4 files changed, 170 insertions(+), 48 deletions(-) diff --git a/codex-rs/codex-api/Cargo.toml b/codex-rs/codex-api/Cargo.toml index 14340af1eb..4b955e880d 100644 --- a/codex-rs/codex-api/Cargo.toml +++ b/codex-rs/codex-api/Cargo.toml @@ -19,7 +19,7 @@ reqwest = { workspace = true, features = ["json", "stream"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "sync", "time"] } +tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt", "sync", "time"] } tokio-tungstenite = { workspace = true } tungstenite = { workspace = true } tracing = { workspace = true } diff --git a/codex-rs/codex-api/src/files.rs b/codex-rs/codex-api/src/files.rs index 23f882a4be..c8c2ea7d22 100644 --- a/codex-rs/codex-api/src/files.rs +++ b/codex-rs/codex-api/src/files.rs @@ -4,10 +4,12 @@ use std::time::Duration; use crate::AuthProvider; use codex_client::build_reqwest_client_with_custom_ca; +use futures::StreamExt; use reqwest::StatusCode; use reqwest::header::CONTENT_LENGTH; use serde::Deserialize; use tokio::fs::File; +use tokio::io::AsyncWriteExt; use tokio::time::Instant; use tokio_util::io::ReaderStream; use url::Url; @@ -48,6 +50,12 @@ pub enum OpenAiFileError { #[source] source: std::io::Error, }, + #[error("path `{path}` cannot be written: {source}")] + WriteFile { + path: PathBuf, + #[source] + source: std::io::Error, + }, #[error( "file `{path}` is too large: {size_bytes} bytes exceeds the limit of {limit_bytes} bytes" )] @@ -128,27 +136,23 @@ pub async fn download_openai_file( auth: &impl AuthProvider, download_url: &str, ) -> Result, OpenAiFileError> { - let resolved_url = Url::parse(download_url).map_err(|source| OpenAiFileError::InvalidUrl { - url: download_url.to_string(), - source, - })?; - let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) { - authorized_request(auth, reqwest::Method::GET, resolved_url.as_str()) - } else { - build_reqwest_client() - .request(reqwest::Method::GET, resolved_url.as_str()) - .timeout(OPENAI_FILE_REQUEST_TIMEOUT) - }; - let response = request_builder - .send() - .await - .map_err(|source| OpenAiFileError::Request { - url: resolved_url.to_string(), - source, - })?; + let (resolved_url, response) = + send_openai_file_download_request(base_url, auth, download_url).await?; response_bytes(resolved_url.as_str(), response).await } +pub async fn download_openai_file_to_path( + base_url: &str, + auth: &impl AuthProvider, + download_url: &str, + path: &Path, + max_size_bytes: u64, +) -> Result<(), OpenAiFileError> { + let (resolved_url, response) = + send_openai_file_download_request(base_url, auth, download_url).await?; + response_to_file(resolved_url.as_str(), response, path, max_size_bytes).await +} + pub async fn upload_local_file( base_url: &str, auth: &dyn AuthProvider, @@ -425,6 +429,32 @@ fn authorized_request( .headers(headers) } +async fn send_openai_file_download_request( + base_url: &str, + auth: &impl AuthProvider, + download_url: &str, +) -> Result<(Url, reqwest::Response), OpenAiFileError> { + let resolved_url = Url::parse(download_url).map_err(|source| OpenAiFileError::InvalidUrl { + url: download_url.to_string(), + source, + })?; + let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) { + authorized_request(auth, reqwest::Method::GET, resolved_url.as_str()) + } else { + build_reqwest_client() + .request(reqwest::Method::GET, resolved_url.as_str()) + .timeout(OPENAI_FILE_REQUEST_TIMEOUT) + }; + let response = request_builder + .send() + .await + .map_err(|source| OpenAiFileError::Request { + url: resolved_url.to_string(), + source, + })?; + Ok((resolved_url, response)) +} + async fn response_text(url: &str, response: reqwest::Response) -> Result { let status = response.status(); let body = response.text().await.unwrap_or_default(); @@ -461,6 +491,73 @@ async fn response_bytes( Ok(bytes.to_vec()) } +async fn response_to_file( + url: &str, + response: reqwest::Response, + path: &Path, + max_size_bytes: u64, +) -> Result<(), OpenAiFileError> { + let status = response.status(); + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(OpenAiFileError::UnexpectedStatus { + url: url.to_string(), + status, + body, + }); + } + + if let Some(content_length) = response.content_length() + && content_length > max_size_bytes + { + return Err(OpenAiFileError::FileTooLarge { + path: path.to_path_buf(), + size_bytes: content_length, + limit_bytes: max_size_bytes, + }); + } + + let mut file = File::create(path) + .await + .map_err(|source| OpenAiFileError::WriteFile { + path: path.to_path_buf(), + source, + })?; + let mut size_bytes = 0_u64; + let mut stream = response.bytes_stream(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(|source| OpenAiFileError::Request { + url: url.to_string(), + source, + })?; + size_bytes += chunk.len() as u64; + if size_bytes > max_size_bytes { + let _ = tokio::fs::remove_file(path).await; + return Err(OpenAiFileError::FileTooLarge { + path: path.to_path_buf(), + size_bytes, + limit_bytes: max_size_bytes, + }); + } + if let Err(source) = file.write_all(&chunk).await { + let _ = tokio::fs::remove_file(path).await; + return Err(OpenAiFileError::WriteFile { + path: path.to_path_buf(), + source, + }); + } + } + + file.flush() + .await + .map_err(|source| OpenAiFileError::WriteFile { + path: path.to_path_buf(), + source, + })?; + Ok(()) +} + fn should_attach_auth_to_openai_file_url(download_url: &Url, base_url: &str) -> bool { let Ok(base_url) = Url::parse(base_url) else { return false; @@ -592,6 +689,41 @@ mod tests { assert_eq!(finalize_attempts.load(Ordering::SeqCst), 2); } + #[tokio::test] + async fn download_openai_file_to_path_enforces_size_limit() { + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/download/file_123")) + .and(header("authorization", "Bearer token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_bytes(b"hello".to_vec())) + .mount(&server) + .await; + + let dir = TempDir::new().expect("temp dir"); + let path = dir.path().join("downloaded.txt"); + + let error = download_openai_file_to_path( + &base_url_for(&server), + &chatgpt_auth(), + &format!("{}/download/file_123", server.uri()), + &path, + 4, + ) + .await + .expect_err("download should fail"); + + assert!(matches!( + error, + OpenAiFileError::FileTooLarge { + size_bytes: 5, + limit_bytes: 4, + .. + } + )); + assert!(!path.exists()); + } + #[tokio::test] async fn upload_local_file_stores_library_file_with_process_upload_stream() { let server = MockServer::start().await; @@ -704,14 +836,9 @@ mod tests { .await; let base_url = base_url_for(&server); - let error = process_upload_stream( - &chatgpt_auth(), - &base_url, - "file_123", - "hello.txt", - ) - .await - .expect_err("stream processing should fail"); + let error = process_upload_stream(&chatgpt_auth(), &base_url, "file_123", "hello.txt") + .await + .expect_err("stream processing should fail"); assert!(matches!( error, diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index f1eb23a96e..0203297c04 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -57,8 +57,10 @@ pub use crate::endpoint::ResponsesWebsocketClient; pub use crate::endpoint::ResponsesWebsocketConnection; pub use crate::endpoint::session_update_session_json; pub use crate::error::ApiError; +pub use crate::files::OPENAI_FILE_UPLOAD_LIMIT_BYTES; pub use crate::files::OpenAiFileUploadOptions; pub use crate::files::download_openai_file; +pub use crate::files::download_openai_file_to_path; pub use crate::files::upload_local_file; pub use crate::provider::Provider; pub use crate::provider::RetryConfig; diff --git a/codex-rs/core/src/codex_apps_file_download.rs b/codex-rs/core/src/codex_apps_file_download.rs index 526f2b8482..164a79a107 100644 --- a/codex-rs/core/src/codex_apps_file_download.rs +++ b/codex-rs/core/src/codex_apps_file_download.rs @@ -1,6 +1,7 @@ use crate::session::session::Session; use crate::session::turn_context::TurnContext; -use codex_api::download_openai_file; +use codex_api::OPENAI_FILE_UPLOAD_LIMIT_BYTES; +use codex_api::download_openai_file_to_path; use codex_login::CodexAuth; use codex_model_provider::BearerAuthProvider; use codex_protocol::mcp::CallToolResult; @@ -111,24 +112,6 @@ async fn materialize_codex_apps_file_download_result_with_auth( account_id: token_data.account_id, is_fedramp_account: auth.is_fedramp_account(), }; - let downloaded = match download_openai_file( - download_base_url, - &auth_provider, - &payload.file_uri.download_url, - ) - .await - { - Ok(downloaded) => downloaded, - Err(error) => { - warn!( - error = %error, - file_id = payload.file_id, - "failed to materialize codex_apps file download via app-server", - ); - return result; - } - }; - let artifact_path = codex_apps_file_download_artifact_path( &turn_context.config.codex_home, session_id, @@ -149,11 +132,21 @@ async fn materialize_codex_apps_file_download_result_with_auth( ); return result; } - if let Err(error) = tokio::fs::write(artifact_path.as_path(), &downloaded).await { + + if let Err(error) = download_openai_file_to_path( + download_base_url, + &auth_provider, + &payload.file_uri.download_url, + artifact_path.as_path(), + OPENAI_FILE_UPLOAD_LIMIT_BYTES, + ) + .await + { warn!( error = %error, + file_id = payload.file_id, path = %artifact_path.display(), - "failed to write codex_apps file download artifact", + "failed to materialize codex_apps file download via app-server", ); return result; }