mirror of
https://github.com/openai/codex.git
synced 2026-05-24 04:54:52 +00:00
Stream file download materialization to disk
This commit is contained in:
@@ -19,7 +19,7 @@ reqwest = { workspace = true, features = ["json", "stream"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "sync", "time"] }
|
||||
tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt", "sync", "time"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tungstenite = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -4,10 +4,12 @@ use std::time::Duration;
|
||||
|
||||
use crate::AuthProvider;
|
||||
use codex_client::build_reqwest_client_with_custom_ca;
|
||||
use futures::StreamExt;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::header::CONTENT_LENGTH;
|
||||
use serde::Deserialize;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use url::Url;
|
||||
@@ -48,6 +50,12 @@ pub enum OpenAiFileError {
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error("path `{path}` cannot be written: {source}")]
|
||||
WriteFile {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error(
|
||||
"file `{path}` is too large: {size_bytes} bytes exceeds the limit of {limit_bytes} bytes"
|
||||
)]
|
||||
@@ -128,27 +136,23 @@ pub async fn download_openai_file(
|
||||
auth: &impl AuthProvider,
|
||||
download_url: &str,
|
||||
) -> Result<Vec<u8>, OpenAiFileError> {
|
||||
let resolved_url = Url::parse(download_url).map_err(|source| OpenAiFileError::InvalidUrl {
|
||||
url: download_url.to_string(),
|
||||
source,
|
||||
})?;
|
||||
let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) {
|
||||
authorized_request(auth, reqwest::Method::GET, resolved_url.as_str())
|
||||
} else {
|
||||
build_reqwest_client()
|
||||
.request(reqwest::Method::GET, resolved_url.as_str())
|
||||
.timeout(OPENAI_FILE_REQUEST_TIMEOUT)
|
||||
};
|
||||
let response = request_builder
|
||||
.send()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::Request {
|
||||
url: resolved_url.to_string(),
|
||||
source,
|
||||
})?;
|
||||
let (resolved_url, response) =
|
||||
send_openai_file_download_request(base_url, auth, download_url).await?;
|
||||
response_bytes(resolved_url.as_str(), response).await
|
||||
}
|
||||
|
||||
pub async fn download_openai_file_to_path(
|
||||
base_url: &str,
|
||||
auth: &impl AuthProvider,
|
||||
download_url: &str,
|
||||
path: &Path,
|
||||
max_size_bytes: u64,
|
||||
) -> Result<(), OpenAiFileError> {
|
||||
let (resolved_url, response) =
|
||||
send_openai_file_download_request(base_url, auth, download_url).await?;
|
||||
response_to_file(resolved_url.as_str(), response, path, max_size_bytes).await
|
||||
}
|
||||
|
||||
pub async fn upload_local_file(
|
||||
base_url: &str,
|
||||
auth: &dyn AuthProvider,
|
||||
@@ -425,6 +429,32 @@ fn authorized_request(
|
||||
.headers(headers)
|
||||
}
|
||||
|
||||
async fn send_openai_file_download_request(
|
||||
base_url: &str,
|
||||
auth: &impl AuthProvider,
|
||||
download_url: &str,
|
||||
) -> Result<(Url, reqwest::Response), OpenAiFileError> {
|
||||
let resolved_url = Url::parse(download_url).map_err(|source| OpenAiFileError::InvalidUrl {
|
||||
url: download_url.to_string(),
|
||||
source,
|
||||
})?;
|
||||
let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) {
|
||||
authorized_request(auth, reqwest::Method::GET, resolved_url.as_str())
|
||||
} else {
|
||||
build_reqwest_client()
|
||||
.request(reqwest::Method::GET, resolved_url.as_str())
|
||||
.timeout(OPENAI_FILE_REQUEST_TIMEOUT)
|
||||
};
|
||||
let response = request_builder
|
||||
.send()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::Request {
|
||||
url: resolved_url.to_string(),
|
||||
source,
|
||||
})?;
|
||||
Ok((resolved_url, response))
|
||||
}
|
||||
|
||||
async fn response_text(url: &str, response: reqwest::Response) -> Result<String, OpenAiFileError> {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
@@ -461,6 +491,73 @@ async fn response_bytes(
|
||||
Ok(bytes.to_vec())
|
||||
}
|
||||
|
||||
async fn response_to_file(
|
||||
url: &str,
|
||||
response: reqwest::Response,
|
||||
path: &Path,
|
||||
max_size_bytes: u64,
|
||||
) -> Result<(), OpenAiFileError> {
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(OpenAiFileError::UnexpectedStatus {
|
||||
url: url.to_string(),
|
||||
status,
|
||||
body,
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(content_length) = response.content_length()
|
||||
&& content_length > max_size_bytes
|
||||
{
|
||||
return Err(OpenAiFileError::FileTooLarge {
|
||||
path: path.to_path_buf(),
|
||||
size_bytes: content_length,
|
||||
limit_bytes: max_size_bytes,
|
||||
});
|
||||
}
|
||||
|
||||
let mut file = File::create(path)
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::WriteFile {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
let mut size_bytes = 0_u64;
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk.map_err(|source| OpenAiFileError::Request {
|
||||
url: url.to_string(),
|
||||
source,
|
||||
})?;
|
||||
size_bytes += chunk.len() as u64;
|
||||
if size_bytes > max_size_bytes {
|
||||
let _ = tokio::fs::remove_file(path).await;
|
||||
return Err(OpenAiFileError::FileTooLarge {
|
||||
path: path.to_path_buf(),
|
||||
size_bytes,
|
||||
limit_bytes: max_size_bytes,
|
||||
});
|
||||
}
|
||||
if let Err(source) = file.write_all(&chunk).await {
|
||||
let _ = tokio::fs::remove_file(path).await;
|
||||
return Err(OpenAiFileError::WriteFile {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
file.flush()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::WriteFile {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn should_attach_auth_to_openai_file_url(download_url: &Url, base_url: &str) -> bool {
|
||||
let Ok(base_url) = Url::parse(base_url) else {
|
||||
return false;
|
||||
@@ -592,6 +689,41 @@ mod tests {
|
||||
assert_eq!(finalize_attempts.load(Ordering::SeqCst), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn download_openai_file_to_path_enforces_size_limit() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/download/file_123"))
|
||||
.and(header("authorization", "Bearer token"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_bytes(b"hello".to_vec()))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let dir = TempDir::new().expect("temp dir");
|
||||
let path = dir.path().join("downloaded.txt");
|
||||
|
||||
let error = download_openai_file_to_path(
|
||||
&base_url_for(&server),
|
||||
&chatgpt_auth(),
|
||||
&format!("{}/download/file_123", server.uri()),
|
||||
&path,
|
||||
4,
|
||||
)
|
||||
.await
|
||||
.expect_err("download should fail");
|
||||
|
||||
assert!(matches!(
|
||||
error,
|
||||
OpenAiFileError::FileTooLarge {
|
||||
size_bytes: 5,
|
||||
limit_bytes: 4,
|
||||
..
|
||||
}
|
||||
));
|
||||
assert!(!path.exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn upload_local_file_stores_library_file_with_process_upload_stream() {
|
||||
let server = MockServer::start().await;
|
||||
@@ -704,14 +836,9 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let base_url = base_url_for(&server);
|
||||
let error = process_upload_stream(
|
||||
&chatgpt_auth(),
|
||||
&base_url,
|
||||
"file_123",
|
||||
"hello.txt",
|
||||
)
|
||||
.await
|
||||
.expect_err("stream processing should fail");
|
||||
let error = process_upload_stream(&chatgpt_auth(), &base_url, "file_123", "hello.txt")
|
||||
.await
|
||||
.expect_err("stream processing should fail");
|
||||
|
||||
assert!(matches!(
|
||||
error,
|
||||
|
||||
@@ -57,8 +57,10 @@ pub use crate::endpoint::ResponsesWebsocketClient;
|
||||
pub use crate::endpoint::ResponsesWebsocketConnection;
|
||||
pub use crate::endpoint::session_update_session_json;
|
||||
pub use crate::error::ApiError;
|
||||
pub use crate::files::OPENAI_FILE_UPLOAD_LIMIT_BYTES;
|
||||
pub use crate::files::OpenAiFileUploadOptions;
|
||||
pub use crate::files::download_openai_file;
|
||||
pub use crate::files::download_openai_file_to_path;
|
||||
pub use crate::files::upload_local_file;
|
||||
pub use crate::provider::Provider;
|
||||
pub use crate::provider::RetryConfig;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use codex_api::download_openai_file;
|
||||
use codex_api::OPENAI_FILE_UPLOAD_LIMIT_BYTES;
|
||||
use codex_api::download_openai_file_to_path;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_model_provider::BearerAuthProvider;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
@@ -111,24 +112,6 @@ async fn materialize_codex_apps_file_download_result_with_auth(
|
||||
account_id: token_data.account_id,
|
||||
is_fedramp_account: auth.is_fedramp_account(),
|
||||
};
|
||||
let downloaded = match download_openai_file(
|
||||
download_base_url,
|
||||
&auth_provider,
|
||||
&payload.file_uri.download_url,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(downloaded) => downloaded,
|
||||
Err(error) => {
|
||||
warn!(
|
||||
error = %error,
|
||||
file_id = payload.file_id,
|
||||
"failed to materialize codex_apps file download via app-server",
|
||||
);
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
let artifact_path = codex_apps_file_download_artifact_path(
|
||||
&turn_context.config.codex_home,
|
||||
session_id,
|
||||
@@ -149,11 +132,21 @@ async fn materialize_codex_apps_file_download_result_with_auth(
|
||||
);
|
||||
return result;
|
||||
}
|
||||
if let Err(error) = tokio::fs::write(artifact_path.as_path(), &downloaded).await {
|
||||
|
||||
if let Err(error) = download_openai_file_to_path(
|
||||
download_base_url,
|
||||
&auth_provider,
|
||||
&payload.file_uri.download_url,
|
||||
artifact_path.as_path(),
|
||||
OPENAI_FILE_UPLOAD_LIMIT_BYTES,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
error = %error,
|
||||
file_id = payload.file_id,
|
||||
path = %artifact_path.display(),
|
||||
"failed to write codex_apps file download artifact",
|
||||
"failed to materialize codex_apps file download via app-server",
|
||||
);
|
||||
return result;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user