Opt:优化大文件上传

This commit is contained in:
MarSeventh
2025-07-17 21:43:24 +08:00
parent 0245f91f64
commit fd002366e0
21 changed files with 454 additions and 58 deletions

View File

@@ -119,6 +119,7 @@ export async function handleChunkUpload(context) {
// 立即创建分块记录,标记为"uploading"状态
const chunkKey = `chunk_${uploadId}_${chunkIndex.toString().padStart(3, '0')}`;
const chunkData = await chunk.arrayBuffer();
const uploadStartTime = Date.now();
const initialChunkMetadata = {
uploadId,
chunkIndex,
@@ -126,9 +127,11 @@ export async function handleChunkUpload(context) {
originalFileName,
originalFileType,
chunkSize: chunkData.byteLength,
uploadTime: Date.now(),
uploadTime: uploadStartTime,
uploadStartTime: uploadStartTime,
status: 'uploading',
uploadChannel
uploadChannel,
timeoutThreshold: uploadStartTime + 60000 // 1分钟超时阈值
};
// 立即保存分块记录和数据,设置过期时间
@@ -137,8 +140,8 @@ export async function handleChunkUpload(context) {
expirationTtl: 3600 // 1小时过期
});
// 异步上传分块到存储端
waitUntil(uploadChunkToStorage(context, chunk, chunkIndex, totalChunks, uploadId, originalFileName, originalFileType, uploadChannel));
// 异步上传分块到存储端,添加超时保护
waitUntil(uploadChunkToStorageWithTimeout(context, chunk, chunkIndex, totalChunks, uploadId, originalFileName, originalFileType, uploadChannel));
return createResponse(JSON.stringify({
success: true,
@@ -155,8 +158,84 @@ export async function handleChunkUpload(context) {
}
}
// Handle a cleanup request: force-remove all data belonging to an upload.
// Returns a JSON response describing the outcome (400 on missing uploadId,
// 200 on success, 500 on failure).
export async function handleCleanupRequest(env, uploadId, totalChunks) {
    // Small helper: serialize a body and attach the JSON Content-Type header.
    const jsonResponse = (body, status) => createResponse(
        JSON.stringify(body),
        { status, headers: { 'Content-Type': 'application/json' } }
    );

    try {
        // Reject requests that do not identify an upload.
        if (!uploadId) {
            return jsonResponse({ error: 'Missing uploadId parameter' }, 400);
        }

        // Force-clean every record associated with this upload.
        await forceCleanupUpload(env, uploadId, totalChunks);

        return jsonResponse({
            success: true,
            message: `Cleanup completed for upload ${uploadId}`,
            uploadId: uploadId,
            cleanedChunks: totalChunks
        }, 200);
    } catch (error) {
        return jsonResponse({
            error: `Cleanup failed: ${error.message}`,
            uploadId: uploadId
        }, 500);
    }
}
/* ======= 单个分块上传到不同渠道的存储端 ======= */
// Upload a chunk to the storage backend with timeout protection.
// Races the real upload against a 45s timer; on failure or timeout the
// chunk record is re-saved with an error status ('timeout'/'failed') while
// keeping the original chunk data so it can be retried later.
async function uploadChunkToStorageWithTimeout(context, chunk, chunkIndex, totalChunks, uploadId, originalFileName, originalFileType, uploadChannel) {
    const { env } = context;
    const chunkKey = `chunk_${uploadId}_${chunkIndex.toString().padStart(3, '0')}`;
    const UPLOAD_TIMEOUT = 45000; // 45-second upload timeout

    let timeoutId;
    try {
        // Timeout promise: rejects after UPLOAD_TIMEOUT ms.
        const timeoutPromise = new Promise((_, reject) => {
            timeoutId = setTimeout(() => reject(new Error('Upload timeout')), UPLOAD_TIMEOUT);
        });
        // Actual upload to the configured channel.
        const uploadPromise = uploadChunkToStorage(context, chunk, chunkIndex, totalChunks, uploadId, originalFileName, originalFileType, uploadChannel);
        // First to settle wins.
        await Promise.race([uploadPromise, timeoutPromise]);
    } catch (error) {
        console.error(`Chunk ${chunkIndex} upload failed or timed out:`, error);
        // On timeout/failure, mark the chunk record accordingly.
        try {
            const chunkRecord = await env.img_url.getWithMetadata(chunkKey, { type: 'arrayBuffer' });
            if (chunkRecord && chunkRecord.metadata) {
                const isTimeout = error.message === 'Upload timeout';
                const errorMetadata = {
                    ...chunkRecord.metadata,
                    status: isTimeout ? 'timeout' : 'failed',
                    error: error.message,
                    failedTime: Date.now(),
                    isTimeout: isTimeout
                };
                // Keep the original chunk data so a retry is possible.
                await env.img_url.put(chunkKey, chunkRecord.value, {
                    metadata: errorMetadata,
                    expirationTtl: 3600
                });
            }
        } catch (metaError) {
            console.error('Failed to save timeout/error metadata:', metaError);
        }
    } finally {
        // BUGFIX: clear the timer so it doesn't keep running (and keep the
        // waitUntil task alive) for the full 45s after the race has settled.
        clearTimeout(timeoutId);
    }
}
// 异步上传分块到存储端
async function uploadChunkToStorage(context, chunk, chunkIndex, totalChunks, uploadId, originalFileName, originalFileType, uploadChannel) {
const { env, url } = context;
@@ -569,9 +648,10 @@ async function uploadSingleChunkToTelegram(context, chunkData, chunkIndex, total
export async function retryFailedChunks(context, failedChunks, uploadChannel) {
const { env } = context;
const maxRetries = 3;
const RETRY_TIMEOUT = 30000; // 30秒重试超时
for (const chunk of failedChunks) {
// 只重试真正失败且有数据的分块
// 只重试真正失败、超时且有数据的分块
if (!chunk.hasData) {
console.warn(`Chunk ${chunk.index} has no data, skipping retry (status: ${chunk.status})`);
continue;
@@ -582,6 +662,11 @@ export async function retryFailedChunks(context, failedChunks, uploadChannel) {
continue;
}
// 跳过已完成的分块
if (chunk.status === 'completed') {
continue;
}
let retryCount = 0;
let success = false;
@@ -599,16 +684,39 @@ export async function retryFailedChunks(context, failedChunks, uploadChannel) {
const uploadId = chunkRecord.metadata?.uploadId;
const totalChunks = chunkRecord.metadata?.totalChunks || 1;
// 更新重试状态
const retryMetadata = {
...chunkRecord.metadata,
status: 'retrying',
retryCount: retryCount + 1,
retryStartTime: Date.now(),
retryTimeoutThreshold: Date.now() + RETRY_TIMEOUT
};
await env.img_url.put(chunk.key, chunkData, {
metadata: retryMetadata,
expirationTtl: 3600
});
let uploadResult = null;
// 根据渠道重新上传
if (uploadChannel === 'cfr2') {
uploadResult = await uploadSingleChunkToR2Multipart(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
} else if (uploadChannel === 's3') {
uploadResult = await uploadSingleChunkToS3Multipart(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
} else if (uploadChannel === 'telegram') {
uploadResult = await uploadSingleChunkToTelegram(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
}
// 根据渠道重新上传,添加超时保护
const retryPromise = (async () => {
if (uploadChannel === 'cfr2') {
return await uploadSingleChunkToR2Multipart(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
} else if (uploadChannel === 's3') {
return await uploadSingleChunkToS3Multipart(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
} else if (uploadChannel === 'telegram') {
return await uploadSingleChunkToTelegram(context, chunkData, chunk.index, totalChunks, uploadId, originalFileName, originalFileType);
}
return null;
})();
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('Retry timeout')), RETRY_TIMEOUT);
});
uploadResult = await Promise.race([retryPromise, timeoutPromise]);
if (uploadResult && uploadResult.success) {
// 更新状态为成功
@@ -616,7 +724,8 @@ export async function retryFailedChunks(context, failedChunks, uploadChannel) {
...chunkRecord.metadata,
status: 'completed',
uploadResult: uploadResult,
retryCount: retryCount + 1
retryCount: retryCount + 1,
completedTime: Date.now()
};
// 删除原始数据,只保留上传结果,设置过期时间
@@ -627,17 +736,35 @@ export async function retryFailedChunks(context, failedChunks, uploadChannel) {
success = true;
console.log(`Chunk ${chunk.index} retry successful after ${retryCount + 1} attempts`);
} else {
retryCount++;
console.warn(`Chunk ${chunk.index} retry ${retryCount} failed: ${uploadResult?.error || 'Unknown error'}`);
if (retryCount < maxRetries) {
await new Promise(resolve => setTimeout(resolve, 1000 * retryCount)); // 递增延迟
}
throw new Error(uploadResult?.error || 'Unknown retry error');
}
} catch (error) {
retryCount++;
console.error(`Chunk ${chunk.index} retry ${retryCount} error:`, error);
const isTimeout = error.message === 'Retry timeout';
console.warn(`Chunk ${chunk.index} retry ${retryCount} ${isTimeout ? 'timed out' : 'failed'}: ${error.message}`);
// 更新重试失败状态
try {
const chunkRecord = await env.img_url.getWithMetadata(chunk.key, { type: 'arrayBuffer' });
if (chunkRecord) {
const failedRetryMetadata = {
...chunkRecord.metadata,
status: isTimeout ? 'retry_timeout' : 'retry_failed',
retryCount: retryCount,
lastRetryError: error.message,
lastRetryTime: Date.now(),
isRetryTimeout: isTimeout
};
await env.img_url.put(chunk.key, chunkRecord.value, {
metadata: failedRetryMetadata,
expirationTtl: 3600
});
}
} catch (metaError) {
console.error(`Failed to update retry error metadata for chunk ${chunk.index}:`, metaError);
}
if (retryCount < maxRetries) {
await new Promise(resolve => setTimeout(resolve, 1000 * retryCount)); // 递增延迟
@@ -708,20 +835,39 @@ export async function cleanupFailedMultipartUploads(context, uploadId, uploadCha
// 检查分块上传状态
export async function checkChunkUploadStatuses(env, uploadId, totalChunks) {
const chunkStatuses = [];
const currentTime = Date.now();
for (let i = 0; i < totalChunks; i++) {
const chunkKey = `chunk_${uploadId}_${i.toString().padStart(3, '0')}`;
try {
const chunkRecord = await env.img_url.getWithMetadata(chunkKey);
if (chunkRecord && chunkRecord.metadata) {
const status = chunkRecord.metadata.status || 'unknown';
let status = chunkRecord.metadata.status || 'unknown';
// 检查上传超时:如果状态是 uploading 但超过了超时阈值,标记为超时
if (status === 'uploading' && chunkRecord.metadata.timeoutThreshold && currentTime > chunkRecord.metadata.timeoutThreshold) {
status = 'timeout';
// 异步更新状态为超时,不阻塞主流程
const timeoutMetadata = {
...chunkRecord.metadata,
status: 'timeout',
error: 'Upload timeout detected',
timeoutDetectedTime: currentTime
};
env.img_url.put(chunkKey, chunkRecord.value, {
metadata: timeoutMetadata,
expirationTtl: 3600
}).catch(err => console.warn(`Failed to update timeout status for chunk ${i}:`, err));
}
let hasData = false;
if (status === 'completed') {
// 已完成的分块通过uploadResult判断
hasData = !!chunkRecord.metadata.uploadResult;
} else if (status === 'uploading' || status === 'failed') {
// 正在上传失败的分块通过原始数据判断
} else if (status === 'uploading' || status === 'failed' || status === 'timeout') {
// 正在上传失败或超时的分块通过原始数据判断
hasData = (chunkRecord.value && chunkRecord.value.byteLength > 0);
} else {
// 其他状态也检查是否有数据
@@ -737,7 +883,10 @@ export async function checkChunkUploadStatuses(env, uploadId, totalChunks) {
hasData: hasData,
chunkSize: chunkRecord.metadata.chunkSize,
uploadTime: chunkRecord.metadata.uploadTime,
uploadChannel: chunkRecord.metadata.uploadChannel
uploadStartTime: chunkRecord.metadata.uploadStartTime,
timeoutThreshold: chunkRecord.metadata.timeoutThreshold,
uploadChannel: chunkRecord.metadata.uploadChannel,
isTimeout: status === 'timeout'
});
} else {
chunkStatuses.push({
@@ -792,6 +941,85 @@ export async function cleanupUploadSession(env, uploadId) {
}
}
// Clean up chunk records stuck in timeout/failed states, plus "uploading"
// chunks that are more than 5 minutes past their timeout threshold.
// Per-chunk errors are logged and never abort the sweep.
export async function cleanupTimeoutChunks(env, uploadId, totalChunks) {
    try {
        const now = Date.now();
        // Statuses that always qualify for removal.
        const terminalStatuses = new Set(['timeout', 'failed', 'retry_timeout', 'retry_failed']);

        // One independent cleanup task per chunk index.
        const tasks = Array.from({ length: totalChunks }, (_, index) => (async () => {
            const chunkKey = `chunk_${uploadId}_${index.toString().padStart(3, '0')}`;
            try {
                const record = await env.img_url.getWithMetadata(chunkKey);
                if (!record || !record.metadata) {
                    return;
                }
                const { status, timeoutThreshold } = record.metadata;
                // An "uploading" chunk is reclaimed 5 minutes past its threshold.
                const overdue = status === 'uploading' && timeoutThreshold && now > timeoutThreshold + 300000;
                if (terminalStatuses.has(status) || overdue) {
                    await env.img_url.delete(chunkKey);
                    console.log(`Cleaned up timeout/failed chunk ${index} for ${uploadId}`);
                }
            } catch (chunkError) {
                console.warn(`Failed to cleanup chunk ${index}:`, chunkError);
            }
        })());

        await Promise.allSettled(tasks);
        console.log(`Cleanup completed for timeout chunks of ${uploadId}`);
    } catch (cleanupError) {
        console.warn('Failed to cleanup timeout chunks:', cleanupError);
    }
}
// Force-remove every record associated with an upload: all chunk entries
// plus the session/bookkeeping keys. Individual delete failures are logged
// and never abort the overall cleanup.
export async function forceCleanupUpload(env, uploadId, totalChunks) {
    try {
        // Chunk records: chunk_<uploadId>_000 ... chunk_<uploadId>_NNN
        const chunkDeletions = Array.from({ length: totalChunks }, (_, i) =>
            env.img_url
                .delete(`chunk_${uploadId}_${i.toString().padStart(3, '0')}`)
                .catch(err => console.warn(`Failed to delete chunk ${i}:`, err))
        );

        // Session/bookkeeping keys tied to this upload.
        const sessionDeletions = [
            `upload_session_${uploadId}`,
            `multipart_${uploadId}`,
            `merge_status_${uploadId}`
        ].map(key =>
            env.img_url
                .delete(key)
                .catch(err => console.warn(`Failed to delete key ${key}:`, err))
        );

        await Promise.allSettled([...chunkDeletions, ...sessionDeletions]);
        console.log(`Force cleanup completed for ${uploadId}`);
    } catch (cleanupError) {
        console.warn('Failed to force cleanup upload:', cleanupError);
    }
}
/* ======= 单个大文件大文件分块上传到Telegram ======= */
export async function uploadLargeFileToTelegram(context, file, fullId, metadata, fileName, fileType, returnLink, tgBotToken, tgChatId, tgChannel) {
const { env } = context;