This commit is contained in:
GH Action - Upstream Sync
2025-08-16 01:21:21 +00:00
2 changed files with 109 additions and 18 deletions

View File

@@ -1,4 +1,5 @@
import { readIndex, getIndexInfo, rebuildIndex, getIndexStorageStats } from '../../utils/indexManager.js';
import { readIndex, mergeOperationsToIndex, deleteAllOperations, rebuildIndex,
getIndexInfo, getIndexStorageStats } from '../../utils/indexManager.js';
export async function onRequest(context) {
const { request, waitUntil } = context;
@@ -40,6 +41,24 @@ export async function onRequest(context) {
});
}
// 特殊操作:合并挂起的原子操作到索引
if (action === 'merge-operations') {
waitUntil(mergeOperationsToIndex(context));
return new Response('Operations merged into index asynchronously', {
headers: { "Content-Type": "text/plain" }
});
}
// 特殊操作:清除所有原子操作
if (action === 'delete-operations') {
waitUntil(deleteAllOperations(context));
return new Response('All operations deleted asynchronously', {
headers: { "Content-Type": "text/plain" }
});
}
// 特殊操作:获取索引存储信息
if (action === 'index-storage-stats') {
const stats = await getIndexStorageStats(context);

View File

@@ -284,7 +284,7 @@ export async function batchMoveFilesInIndex(context, moveOperations) {
* @returns {Object} 合并结果
*/
export async function mergeOperationsToIndex(context, options = {}) {
const { waitUntil } = context;
const { request } = context;
const { cleanupAfterMerge = true } = options;
try {
@@ -301,8 +301,11 @@ export async function mergeOperationsToIndex(context, options = {}) {
}
// 获取所有待处理的操作
const operations = await getAllPendingOperations(context, currentIndex.lastOperationId);
const operationsResult = await getAllPendingOperations(context, currentIndex.lastOperationId);
const operations = operationsResult.operations;
const isALLOperations = operationsResult.isAll;
if (operations.length === 0) {
console.log('No pending operations to merge');
return {
@@ -312,7 +315,7 @@ export async function mergeOperationsToIndex(context, options = {}) {
};
}
console.log(`Found ${operations.length} pending operations to merge`);
console.log(`Found ${operations.length} pending operations to merge. Is all operations: ${isALLOperations}, if there are remaining operations they will be processed in the next merge.`);
// 按时间戳排序操作,确保按正确顺序应用
operations.sort((a, b) => a.timestamp - b.timestamp);
@@ -405,7 +408,23 @@ export async function mergeOperationsToIndex(context, options = {}) {
// 清理已处理的操作记录
if (cleanupAfterMerge && processedOperationIds.length > 0) {
waitUntil(cleanupOperations(context, processedOperationIds));
await cleanupOperations(context, processedOperationIds);
}
// 如果未处理完所有操作,调用 merge-operations API 递归处理
if (!isALLOperations) {
console.log('There are remaining operations, will process them in subsequent calls.');
const headers = new Headers(request.headers);
const originUrl = new URL(request.url);
const mergeUrl = `${originUrl.protocol}//${originUrl.host}/api/manage/list?action=merge-operations`;
await fetch(mergeUrl, { method: 'GET', headers });
return {
success: false,
error: 'There are remaining operations, will process them in subsequent calls.'
};
}
const result = {
@@ -459,10 +478,17 @@ export async function readIndex(context, options = {}) {
const dirPrefix = directory === '' || directory.endsWith('/') ? directory : directory + '/';
// 处理挂起的操作
await mergeOperationsToIndex(context);
const mergeResult = await mergeOperationsToIndex(context);
if (!mergeResult.success) {
throw new Error('Failed to merge operations: ' + mergeResult.error);
}
// 获取当前索引
const index = await getIndex(context);
if (!index.success) {
throw new Error('Failed to get index');
}
let filteredFiles = index.files;
// 目录过滤
@@ -544,7 +570,7 @@ export async function readIndex(context, options = {}) {
totalCount: totalCount,
indexLastUpdated: index.lastUpdated,
returnedCount: resultFiles.length,
success: index.success ?? true
success: true
};
} catch (error) {
@@ -758,8 +784,12 @@ async function getAllPendingOperations(context, lastOperationId = null) {
const { env } = context;
const operations = [];
let cursor = null;
const MAX_OPERATION_COUNT = 30; // 单次获取的最大操作数量
let isALL = true; // 是否获取了所有操作
let operationCount = 0;
try {
while (true) {
const response = await env.img_url.list({
@@ -774,26 +804,36 @@ async function getAllPendingOperations(context, lastOperationId = null) {
continue;
}
if (operationCount >= MAX_OPERATION_COUNT) {
isALL = false; // 达到最大操作数量,停止获取
break;
}
try {
const operationData = await env.img_url.get(item.name);
if (operationData) {
const operation = JSON.parse(operationData);
operation.id = item.name.substring(OPERATION_KEY_PREFIX.length);
operations.push(operation);
operationCount++;
}
} catch (error) {
isALL = false;
console.warn(`Failed to parse operation ${item.name}:`, error);
}
}
cursor = response.cursor;
if (!cursor) break;
if (!cursor || operationCount >= MAX_OPERATION_COUNT) break;
}
} catch (error) {
console.error('Error getting pending operations:', error);
}
return operations;
return {
operations,
isAll: isALL,
}
}
/**
@@ -958,7 +998,7 @@ function applyBatchMoveOperation(index, data) {
}
/**
* 清理已处理的操作记录
* 并发清理指定的原子操作记录
* @param {Object} context - 上下文对象
* @param {Array} operationIds - 要清理的操作ID数组
* @param {number} concurrency - 并发数量默认为10
@@ -969,21 +1009,33 @@ async function cleanupOperations(context, operationIds, concurrency = 10) {
try {
console.log(`Cleaning up ${operationIds.length} processed operations with concurrency ${concurrency}...`);
let deletedCount = 0;
let errorCount = 0;
// 创建删除任务数组
const deleteTasks = operationIds.map(operationId => {
const operationKey = OPERATION_KEY_PREFIX + operationId;
return async () => {
try {
await env.img_url.delete(operationKey);
deletedCount++;
} catch (error) {
console.error(`Error deleting operation ${operationId}:`, error);
errorCount++;
}
};
});
// 使用并发控制执行删除操作
await promiseLimit(deleteTasks, concurrency);
console.log(`Successfully cleaned up ${operationIds.length} operations`);
console.log(`Successfully cleaned up ${deletedCount} operations, ${errorCount} operations failed.`);
return {
success: true,
deletedCount: deletedCount,
errorCount: errorCount,
};
} catch (error) {
console.error('Error cleaning up operations:', error);
}
@@ -995,7 +1047,7 @@ async function cleanupOperations(context, operationIds, concurrency = 10) {
* @returns {Object} 删除结果 { success, deletedCount, errors?, totalFound? }
*/
export async function deleteAllOperations(context) {
const { env } = context;
const { request, env } = context;
try {
console.log('Starting to delete all atomic operations...');
@@ -1033,11 +1085,31 @@ export async function deleteAllOperations(context) {
}
console.log(`Found ${totalFound} atomic operations to delete`);
// 批量删除原子操作
await cleanupOperations(context, allOperationIds);
console.log(`Delete all operations completed`);
// 限制单次删除的数量
const MAX_DELETE_BATCH = 40;
const toDeleteOperationIds = allOperationIds.slice(0, MAX_DELETE_BATCH);
// 批量删除原子操作
const cleanupResult = await cleanupOperations(context, toDeleteOperationIds);
// 剩余未删除的操作,调用 delete-operations API 进行递归删除
if (allOperationIds.length > MAX_DELETE_BATCH || cleanupResult.errorCount > 0) {
console.warn(`Too many operations (${allOperationIds.length}), only deleting first ${cleanupResult.deletedCount}. The remaining operations will be deleted in subsequent calls.`);
// 复制请求头,用于鉴权
const headers = new Headers(request.headers);
const originUrl = new URL(request.url);
const deleteUrl = `${originUrl.protocol}//${originUrl.host}/api/manage/list?action=delete-operations`
await fetch(deleteUrl, {
method: 'GET',
headers: headers
});
} else {
console.log(`Delete all operations completed`);
}
} catch (error) {
console.error('Error deleting all operations:', error);