Add OpenAI file download materialization and library upload

This commit is contained in:
Liang-Ting Jiang
2026-04-24 16:04:06 -07:00
parent 8a559e7938
commit ff15db5d40
8 changed files with 765 additions and 26 deletions

View File

@@ -10,6 +10,7 @@ use serde::Deserialize;
use tokio::fs::File;
use tokio::time::Instant;
use tokio_util::io::ReaderStream;
use url::Url;
pub const OPENAI_FILE_URI_PREFIX: &str = "sediment://";
pub const OPENAI_FILE_UPLOAD_LIMIT_BYTES: u64 = 512 * 1024 * 1024;
@@ -18,16 +19,21 @@ const OPENAI_FILE_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
const OPENAI_FILE_FINALIZE_TIMEOUT: Duration = Duration::from_secs(30);
const OPENAI_FILE_FINALIZE_RETRY_DELAY: Duration = Duration::from_millis(250);
const OPENAI_FILE_USE_CASE: &str = "codex";
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct OpenAiFileUploadOptions {
pub store_in_library: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadedOpenAiFile {
pub file_id: String,
pub uri: String,
pub download_url: String,
pub download_url: Option<String>,
pub file_name: String,
pub file_size_bytes: u64,
pub mime_type: Option<String>,
pub path: PathBuf,
pub library_file_id: Option<String>,
}
#[derive(Debug, thiserror::Error)]
@@ -68,6 +74,12 @@ pub enum OpenAiFileError {
#[source]
source: serde_json::Error,
},
#[error("failed to resolve OpenAI file URL `{url}`: {source}")]
InvalidUrl {
url: String,
#[source]
source: url::ParseError,
},
#[error("OpenAI file upload for `{file_id}` is not ready yet")]
UploadNotReady { file_id: String },
#[error("OpenAI file upload for `{file_id}` failed: {message}")]
@@ -90,14 +102,55 @@ struct DownloadLinkResponse {
error_message: Option<String>,
}
#[derive(Deserialize)]
struct ProcessUploadStreamStatus {
event: Option<String>,
message: Option<String>,
#[serde(default)]
extra: Option<ProcessUploadStreamExtra>,
}
#[derive(Debug, Default, Deserialize)]
struct ProcessUploadStreamExtra {
#[serde(alias = "metadata_object_id", alias = "library_file_id")]
library_file_id: Option<String>,
#[serde(alias = "library_file_name")]
file_name: Option<String>,
mime_type: Option<String>,
}
pub fn openai_file_uri(file_id: &str) -> String {
format!("{OPENAI_FILE_URI_PREFIX}{file_id}")
}
pub async fn download_openai_file(
base_url: &str,
auth: &impl AuthProvider,
download_url: &str,
) -> Result<Vec<u8>, OpenAiFileError> {
let resolved_url = resolve_openai_file_download_url(base_url, download_url)?;
let request_builder = if should_attach_auth_to_openai_file_url(&resolved_url, base_url) {
authorized_request(auth, reqwest::Method::GET, resolved_url.as_str())
} else {
build_reqwest_client()
.request(reqwest::Method::GET, resolved_url.as_str())
.timeout(OPENAI_FILE_REQUEST_TIMEOUT)
};
let response = request_builder
.send()
.await
.map_err(|source| OpenAiFileError::Request {
url: resolved_url.to_string(),
source,
})?;
response_bytes(resolved_url.as_str(), response).await
}
pub async fn upload_local_file(
base_url: &str,
auth: &dyn AuthProvider,
path: &Path,
options: &OpenAiFileUploadOptions,
) -> Result<UploadedOpenAiFile, OpenAiFileError> {
let metadata = tokio::fs::metadata(path)
.await
@@ -128,13 +181,18 @@ pub async fn upload_local_file(
.and_then(|value| value.to_str())
.unwrap_or("file")
.to_string();
let create_url = format!("{}/files", base_url.trim_end_matches('/'));
let base_url = base_url.trim_end_matches('/');
let mut create_request = serde_json::json!({
"file_name": file_name,
"file_size": metadata.len(),
"use_case": OPENAI_FILE_USE_CASE,
});
if options.store_in_library {
create_request["store_in_library"] = serde_json::json!(true);
}
let create_url = format!("{base_url}/files");
let create_response = authorized_request(auth, reqwest::Method::POST, &create_url)
.json(&serde_json::json!({
"file_name": file_name,
"file_size": metadata.len(),
"use_case": OPENAI_FILE_USE_CASE,
}))
.json(&create_request)
.send()
.await
.map_err(|source| OpenAiFileError::Request {
@@ -184,11 +242,29 @@ pub async fn upload_local_file(
});
}
let finalize_url = format!(
"{}/files/{}/uploaded",
base_url.trim_end_matches('/'),
create_payload.file_id,
);
if options.store_in_library {
let processed =
process_upload_stream(auth, base_url, &create_payload.file_id, &file_name).await?;
let library_file_id =
processed
.library_file_id
.ok_or_else(|| OpenAiFileError::UploadFailed {
file_id: create_payload.file_id.clone(),
message: "upload completed without creating a library_file_id".to_string(),
})?;
return Ok(UploadedOpenAiFile {
file_id: create_payload.file_id.clone(),
uri: openai_file_uri(&create_payload.file_id),
download_url: None,
file_name: processed.file_name.unwrap_or(file_name),
file_size_bytes: metadata.len(),
mime_type: processed.mime_type,
path: path.to_path_buf(),
library_file_id: Some(library_file_id),
});
}
let finalize_url = format!("{base_url}/files/{}/uploaded", create_payload.file_id);
let finalize_started_at = Instant::now();
loop {
let finalize_response = authorized_request(auth, reqwest::Method::POST, &finalize_url)
@@ -219,16 +295,17 @@ pub async fn upload_local_file(
return Ok(UploadedOpenAiFile {
file_id: create_payload.file_id.clone(),
uri: openai_file_uri(&create_payload.file_id),
download_url: finalize_payload.download_url.ok_or_else(|| {
download_url: Some(finalize_payload.download_url.ok_or_else(|| {
OpenAiFileError::UploadFailed {
file_id: create_payload.file_id.clone(),
message: "missing download_url".to_string(),
}
})?,
})?),
file_name: finalize_payload.file_name.unwrap_or(file_name),
file_size_bytes: metadata.len(),
mime_type: finalize_payload.mime_type,
path: path.to_path_buf(),
library_file_id: None,
});
}
"retry" => {
@@ -251,6 +328,82 @@ pub async fn upload_local_file(
}
}
async fn process_upload_stream(
auth: &dyn AuthProvider,
base_url: &str,
file_id: &str,
file_name: &str,
) -> Result<ProcessUploadStreamExtra, OpenAiFileError> {
let process_url = format!("{base_url}/files/process_upload_stream");
let process_response = authorized_request(auth, reqwest::Method::POST, &process_url)
.json(&serde_json::json!({
"file_id": file_id,
"file_name": file_name,
"use_case": OPENAI_FILE_USE_CASE,
"index_for_retrieval": false,
"entry_surface": OPENAI_FILE_USE_CASE,
"metadata": {
"store_in_library": true,
},
}))
.send()
.await
.map_err(|source| OpenAiFileError::Request {
url: process_url.clone(),
source,
})?;
let process_body = response_text(&process_url, process_response).await?;
let mut result = ProcessUploadStreamExtra::default();
for line in process_body.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
let ProcessUploadStreamStatus {
event,
message,
extra,
} = serde_json::from_str(line).map_err(|source| OpenAiFileError::Decode {
url: process_url.clone(),
source,
})?;
let extra = extra.unwrap_or_default();
if let Some(event) = event.as_deref()
&& is_process_upload_stream_error_event(event)
{
return Err(OpenAiFileError::UploadFailed {
file_id: file_id.to_string(),
message: message
.filter(|message| !message.is_empty())
.unwrap_or_else(|| format!("process_upload_stream returned {event}")),
});
}
if result.library_file_id.is_none() {
result.library_file_id = non_empty_string(extra.library_file_id);
}
if result.file_name.is_none() {
result.file_name = non_empty_string(extra.file_name);
}
if result.mime_type.is_none() {
result.mime_type = non_empty_string(extra.mime_type);
}
}
Ok(result)
}
fn non_empty_string(value: Option<String>) -> Option<String> {
value.filter(|value| !value.is_empty())
}
fn is_process_upload_stream_error_event(event: &str) -> bool {
let event_tail = event.rsplit(['.', '_']).next().unwrap_or(event);
matches!(event_tail, "error" | "cancelled" | "unknown")
}
fn authorized_request(
auth: &dyn AuthProvider,
method: reqwest::Method,
@@ -266,6 +419,82 @@ fn authorized_request(
.headers(headers)
}
async fn response_text(url: &str, response: reqwest::Response) -> Result<String, OpenAiFileError> {
let status = response.status();
let body = response.text().await.unwrap_or_default();
if !status.is_success() {
return Err(OpenAiFileError::UnexpectedStatus {
url: url.to_string(),
status,
body,
});
}
Ok(body)
}
async fn response_bytes(
url: &str,
response: reqwest::Response,
) -> Result<Vec<u8>, OpenAiFileError> {
let status = response.status();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
return Err(OpenAiFileError::UnexpectedStatus {
url: url.to_string(),
status,
body,
});
}
let bytes = response
.bytes()
.await
.map_err(|source| OpenAiFileError::Request {
url: url.to_string(),
source,
})?;
Ok(bytes.to_vec())
}
fn resolve_openai_file_download_url(
base_url: &str,
download_url: &str,
) -> Result<Url, OpenAiFileError> {
match Url::parse(download_url) {
Ok(url) => Ok(url),
Err(url::ParseError::RelativeUrlWithoutBase) => {
let normalized_base_url = if base_url.ends_with('/') {
base_url.to_string()
} else {
format!("{base_url}/")
};
let base =
Url::parse(&normalized_base_url).map_err(|source| OpenAiFileError::InvalidUrl {
url: normalized_base_url.clone(),
source,
})?;
base.join(download_url)
.map_err(|source| OpenAiFileError::InvalidUrl {
url: download_url.to_string(),
source,
})
}
Err(source) => Err(OpenAiFileError::InvalidUrl {
url: download_url.to_string(),
source,
}),
}
}
fn should_attach_auth_to_openai_file_url(download_url: &Url, base_url: &str) -> bool {
let Ok(base_url) = Url::parse(base_url) else {
return false;
};
match (download_url.host_str(), base_url.host_str()) {
(Some(download_host), Some(base_host)) => download_host.eq_ignore_ascii_case(base_host),
_ => false,
}
}
fn build_reqwest_client() -> reqwest::Client {
build_reqwest_client_with_custom_ca(reqwest::Client::builder()).unwrap_or_else(|error| {
tracing::warn!(error = %error, "failed to build OpenAI file upload client");
@@ -363,18 +592,98 @@ mod tests {
let path = dir.path().join("hello.txt");
tokio::fs::write(&path, b"hello").await.expect("write file");
let uploaded = upload_local_file(&base_url, &chatgpt_auth(), &path)
.await
.expect("upload succeeds");
let uploaded = upload_local_file(
&base_url,
&chatgpt_auth(),
&path,
&OpenAiFileUploadOptions::default(),
)
.await
.expect("upload succeeds");
assert_eq!(uploaded.file_id, "file_123");
assert_eq!(uploaded.uri, "sediment://file_123");
assert_eq!(
uploaded.download_url,
format!("{}/download/file_123", server.uri())
Some(format!("{}/download/file_123", server.uri()))
);
assert_eq!(uploaded.file_name, "hello.txt");
assert_eq!(uploaded.mime_type, Some("text/plain".to_string()));
assert_eq!(finalize_attempts.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn upload_local_file_stores_library_file_with_process_upload_stream() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/backend-api/files"))
.and(header("chatgpt-account-id", "account_id"))
.and(body_json(serde_json::json!({
"file_name": "hello.txt",
"file_size": 5,
"use_case": "codex",
"store_in_library": true,
})))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(serde_json::json!({"file_id": "file_123", "upload_url": format!("{}/upload/file_123", server.uri())})),
)
.mount(&server)
.await;
Mock::given(method("PUT"))
.and(path("/upload/file_123"))
.and(header("content-length", "5"))
.respond_with(ResponseTemplate::new(200))
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/backend-api/files/process_upload_stream"))
.and(body_json(serde_json::json!({
"file_id": "file_123",
"file_name": "hello.txt",
"use_case": "codex",
"index_for_retrieval": false,
"entry_surface": "codex",
"metadata": {
"store_in_library": true,
},
})))
.respond_with(ResponseTemplate::new(200).set_body_bytes(
concat!(
"{\"file_id\":\"file_123\",\"event\":\"indexing.completed\",\"message\":\"\",",
"\"extra\":{\"metadata_object_id\":\"library_123\",",
"\"library_file_name\":\"hello.txt\",\"mime_type\":\"text/plain\"}}\n",
"{\"file_id\":\"file_123\",\"event\":\"completed\",",
"\"message\":\"Succeeded processing file file_123\",",
"\"progress\":100,\"extra\":null}\n",
)
.as_bytes()
.to_vec(),
))
.mount(&server)
.await;
let base_url = base_url_for(&server);
let dir = TempDir::new().expect("temp dir");
let path = dir.path().join("hello.txt");
tokio::fs::write(&path, b"hello").await.expect("write file");
let uploaded = upload_local_file(
&base_url,
&chatgpt_auth(),
&path,
&OpenAiFileUploadOptions {
store_in_library: true,
},
)
.await
.expect("upload succeeds");
assert_eq!(uploaded.file_id, "file_123");
assert_eq!(uploaded.uri, "sediment://file_123");
assert_eq!(uploaded.download_url, None);
assert_eq!(uploaded.file_name, "hello.txt");
assert_eq!(uploaded.mime_type, Some("text/plain".to_string()));
assert_eq!(uploaded.library_file_id, Some("library_123".to_string()));
}
}

View File

@@ -57,6 +57,8 @@ pub use crate::endpoint::ResponsesWebsocketClient;
pub use crate::endpoint::ResponsesWebsocketConnection;
pub use crate::endpoint::session_update_session_json;
pub use crate::error::ApiError;
pub use crate::files::OpenAiFileUploadOptions;
pub use crate::files::download_openai_file;
pub use crate::files::upload_local_file;
pub use crate::provider::Provider;
pub use crate::provider::RetryConfig;