mirror of
https://github.com/MarSeventh/CloudFlare-ImgBed.git
synced 2026-04-24 22:25:07 +00:00
Opt:优化索引更新策略
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { readIndex, getIndexInfo, rebuildIndex, getIndexStorageStats } from '../../utils/indexManager.js';
|
||||
import { readIndex, mergeOperationsToIndex, deleteAllOperations, rebuildIndex,
|
||||
getIndexInfo, getIndexStorageStats } from '../../utils/indexManager.js';
|
||||
|
||||
export async function onRequest(context) {
|
||||
const { request, waitUntil } = context;
|
||||
@@ -40,6 +41,24 @@ export async function onRequest(context) {
|
||||
});
|
||||
}
|
||||
|
||||
// 特殊操作:合并挂起的原子操作到索引
|
||||
if (action === 'merge-operations') {
|
||||
waitUntil(mergeOperationsToIndex(context));
|
||||
|
||||
return new Response('Operations merged into index asynchronously', {
|
||||
headers: { "Content-Type": "text/plain" }
|
||||
});
|
||||
}
|
||||
|
||||
// 特殊操作:清除所有原子操作
|
||||
if (action === 'delete-operations') {
|
||||
waitUntil(deleteAllOperations(context));
|
||||
|
||||
return new Response('All operations deleted asynchronously', {
|
||||
headers: { "Content-Type": "text/plain" }
|
||||
});
|
||||
}
|
||||
|
||||
// 特殊操作:获取索引存储信息
|
||||
if (action === 'index-storage-stats') {
|
||||
const stats = await getIndexStorageStats(context);
|
||||
|
||||
@@ -284,7 +284,7 @@ export async function batchMoveFilesInIndex(context, moveOperations) {
|
||||
* @returns {Object} 合并结果
|
||||
*/
|
||||
export async function mergeOperationsToIndex(context, options = {}) {
|
||||
const { waitUntil } = context;
|
||||
const { request } = context;
|
||||
const { cleanupAfterMerge = true } = options;
|
||||
|
||||
try {
|
||||
@@ -301,8 +301,11 @@ export async function mergeOperationsToIndex(context, options = {}) {
|
||||
}
|
||||
|
||||
// 获取所有待处理的操作
|
||||
const operations = await getAllPendingOperations(context, currentIndex.lastOperationId);
|
||||
|
||||
const operationsResult = await getAllPendingOperations(context, currentIndex.lastOperationId);
|
||||
|
||||
const operations = operationsResult.operations;
|
||||
const isALLOperations = operationsResult.isAll;
|
||||
|
||||
if (operations.length === 0) {
|
||||
console.log('No pending operations to merge');
|
||||
return {
|
||||
@@ -312,7 +315,7 @@ export async function mergeOperationsToIndex(context, options = {}) {
|
||||
};
|
||||
}
|
||||
|
||||
console.log(`Found ${operations.length} pending operations to merge`);
|
||||
console.log(`Found ${operations.length} pending operations to merge. Is all operations: ${isALLOperations}, if there are remaining operations they will be processed in the next merge.`);
|
||||
|
||||
// 按时间戳排序操作,确保按正确顺序应用
|
||||
operations.sort((a, b) => a.timestamp - b.timestamp);
|
||||
@@ -405,7 +408,23 @@ export async function mergeOperationsToIndex(context, options = {}) {
|
||||
|
||||
// 清理已处理的操作记录
|
||||
if (cleanupAfterMerge && processedOperationIds.length > 0) {
|
||||
waitUntil(cleanupOperations(context, processedOperationIds));
|
||||
await cleanupOperations(context, processedOperationIds);
|
||||
}
|
||||
|
||||
// 如果未处理完所有操作,调用 merge-operations API 递归处理
|
||||
if (!isALLOperations) {
|
||||
console.log('There are remaining operations, will process them in subsequent calls.');
|
||||
|
||||
const headers = new Headers(request.headers);
|
||||
const originUrl = new URL(request.url);
|
||||
const mergeUrl = `${originUrl.protocol}//${originUrl.host}/api/manage/list?action=merge-operations`;
|
||||
|
||||
await fetch(mergeUrl, { method: 'GET', headers });
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: 'There are remaining operations, will process them in subsequent calls.'
|
||||
};
|
||||
}
|
||||
|
||||
const result = {
|
||||
@@ -459,10 +478,17 @@ export async function readIndex(context, options = {}) {
|
||||
const dirPrefix = directory === '' || directory.endsWith('/') ? directory : directory + '/';
|
||||
|
||||
// 处理挂起的操作
|
||||
await mergeOperationsToIndex(context);
|
||||
const mergeResult = await mergeOperationsToIndex(context);
|
||||
if (!mergeResult.success) {
|
||||
throw new Error('Failed to merge operations: ' + mergeResult.error);
|
||||
}
|
||||
|
||||
// 获取当前索引
|
||||
const index = await getIndex(context);
|
||||
if (!index.success) {
|
||||
throw new Error('Failed to get index');
|
||||
}
|
||||
|
||||
let filteredFiles = index.files;
|
||||
|
||||
// 目录过滤
|
||||
@@ -544,7 +570,7 @@ export async function readIndex(context, options = {}) {
|
||||
totalCount: totalCount,
|
||||
indexLastUpdated: index.lastUpdated,
|
||||
returnedCount: resultFiles.length,
|
||||
success: index.success ?? true
|
||||
success: true
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
@@ -758,8 +784,12 @@ async function getAllPendingOperations(context, lastOperationId = null) {
|
||||
const { env } = context;
|
||||
|
||||
const operations = [];
|
||||
|
||||
let cursor = null;
|
||||
|
||||
const MAX_OPERATION_COUNT = 30; // 单次获取的最大操作数量
|
||||
let isALL = true; // 是否获取了所有操作
|
||||
let operationCount = 0;
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const response = await env.img_url.list({
|
||||
@@ -774,26 +804,36 @@ async function getAllPendingOperations(context, lastOperationId = null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (operationCount >= MAX_OPERATION_COUNT) {
|
||||
isALL = false; // 达到最大操作数量,停止获取
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
const operationData = await env.img_url.get(item.name);
|
||||
if (operationData) {
|
||||
const operation = JSON.parse(operationData);
|
||||
operation.id = item.name.substring(OPERATION_KEY_PREFIX.length);
|
||||
operations.push(operation);
|
||||
operationCount++;
|
||||
}
|
||||
} catch (error) {
|
||||
isALL = false;
|
||||
console.warn(`Failed to parse operation ${item.name}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
cursor = response.cursor;
|
||||
if (!cursor) break;
|
||||
if (!cursor || operationCount >= MAX_OPERATION_COUNT) break;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error getting pending operations:', error);
|
||||
}
|
||||
|
||||
return operations;
|
||||
return {
|
||||
operations,
|
||||
isAll: isALL,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -958,7 +998,7 @@ function applyBatchMoveOperation(index, data) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理已处理的操作记录
|
||||
* 并发清理指定的原子操作记录
|
||||
* @param {Object} context - 上下文对象
|
||||
* @param {Array} operationIds - 要清理的操作ID数组
|
||||
* @param {number} concurrency - 并发数量,默认为10
|
||||
@@ -969,21 +1009,33 @@ async function cleanupOperations(context, operationIds, concurrency = 10) {
|
||||
try {
|
||||
console.log(`Cleaning up ${operationIds.length} processed operations with concurrency ${concurrency}...`);
|
||||
|
||||
let deletedCount = 0;
|
||||
let errorCount = 0;
|
||||
|
||||
// 创建删除任务数组
|
||||
const deleteTasks = operationIds.map(operationId => {
|
||||
const operationKey = OPERATION_KEY_PREFIX + operationId;
|
||||
return async () => {
|
||||
try {
|
||||
await env.img_url.delete(operationKey);
|
||||
deletedCount++;
|
||||
} catch (error) {
|
||||
console.error(`Error deleting operation ${operationId}:`, error);
|
||||
errorCount++;
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
// 使用并发控制执行删除操作
|
||||
await promiseLimit(deleteTasks, concurrency);
|
||||
console.log(`Successfully cleaned up ${operationIds.length} operations`);
|
||||
|
||||
console.log(`Successfully cleaned up ${deletedCount} operations, ${errorCount} operations failed.`);
|
||||
return {
|
||||
success: true,
|
||||
deletedCount: deletedCount,
|
||||
errorCount: errorCount,
|
||||
};
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error cleaning up operations:', error);
|
||||
}
|
||||
@@ -995,7 +1047,7 @@ async function cleanupOperations(context, operationIds, concurrency = 10) {
|
||||
* @returns {Object} 删除结果 { success, deletedCount, errors?, totalFound? }
|
||||
*/
|
||||
export async function deleteAllOperations(context) {
|
||||
const { env } = context;
|
||||
const { request, env } = context;
|
||||
|
||||
try {
|
||||
console.log('Starting to delete all atomic operations...');
|
||||
@@ -1033,11 +1085,31 @@ export async function deleteAllOperations(context) {
|
||||
}
|
||||
|
||||
console.log(`Found ${totalFound} atomic operations to delete`);
|
||||
|
||||
// 批量删除原子操作
|
||||
await cleanupOperations(context, allOperationIds);
|
||||
|
||||
console.log(`Delete all operations completed`);
|
||||
// 限制单次删除的数量
|
||||
const MAX_DELETE_BATCH = 40;
|
||||
const toDeleteOperationIds = allOperationIds.slice(0, MAX_DELETE_BATCH);
|
||||
|
||||
// 批量删除原子操作
|
||||
const cleanupResult = await cleanupOperations(context, toDeleteOperationIds);
|
||||
|
||||
// 剩余未删除的操作,调用 delete-operations API 进行递归删除
|
||||
if (allOperationIds.length > MAX_DELETE_BATCH || cleanupResult.errorCount > 0) {
|
||||
console.warn(`Too many operations (${allOperationIds.length}), only deleting first ${cleanupResult.deletedCount}. The remaining operations will be deleted in subsequent calls.`);
|
||||
// 复制请求头,用于鉴权
|
||||
const headers = new Headers(request.headers);
|
||||
|
||||
const originUrl = new URL(request.url);
|
||||
const deleteUrl = `${originUrl.protocol}//${originUrl.host}/api/manage/list?action=delete-operations`
|
||||
|
||||
await fetch(deleteUrl, {
|
||||
method: 'GET',
|
||||
headers: headers
|
||||
});
|
||||
|
||||
} else {
|
||||
console.log(`Delete all operations completed`);
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error deleting all operations:', error);
|
||||
|
||||
Reference in New Issue
Block a user