mirror of
https://github.com/MarSeventh/CloudFlare-ImgBed.git
synced 2026-05-03 02:16:35 +00:00
1205 lines
38 KiB
JavaScript
1205 lines
38 KiB
JavaScript
/* Index manager */

/**
 * File index structure:
 * - key: manage@index
 * - value: JSON.stringify(fileIndex)
 * - fileIndex: {
 *     files: [
 *       {
 *         id: "file_unique_id",
 *         metadata: {}
 *       }
 *     ],
 *     lastUpdated: 1640995200000,
 *     totalCount: 1000,
 *     lastOperationId: "operation_timestamp_uuid"
 *   }
 *
 * Atomic operation structure:
 * - key: manage@index@operation_${timestamp}_${uuid}
 * - value: JSON.stringify(operation)
 * - operation: {
 *     type: "add" | "remove" | "move" | "batch_add" | "batch_remove" | "batch_move",
 *     timestamp: 1640995200000,
 *     data: {
 *       // contents depend on the operation type
 *     }
 *   }
 */

// KV key under which the serialized file index is stored.
const INDEX_KEY = 'manage@index';

// Key prefix shared by every pending atomic-operation record.
const OPERATION_KEY_PREFIX = 'manage@index@operation_';

// Page size used when listing KV keys.
const KV_LIST_LIMIT = 1000;

// Number of items handled per batch.
const BATCH_SIZE = 10;

/**
 * Record an "add" operation for a single file.
 *
 * The index itself is not modified here; only an operation record is written.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {string} fileId - File ID
 * @param {Object} metadata - File metadata; when null it is read from the KV entry for `fileId`
 * @returns {Object} `{ success: true, operationId }` or `{ success: false, error }`
 */
export async function addFileToIndex(context, fileId, metadata = null) {
    const env = context.env;

    try {
        // No metadata supplied: fall back to whatever is stored with the KV entry.
        const resolvedMetadata = metadata === null
            ? (await env.img_url.getWithMetadata(fileId)).metadata || {}
            : metadata;

        const operationId = await recordOperation(context, 'add', {
            fileId,
            metadata: resolvedMetadata,
        });

        console.log(`File ${fileId} add operation recorded with ID: ${operationId}`);
        return { success: true, operationId };
    } catch (error) {
        console.error('Error recording add file operation:', error);
        return { success: false, error: error.message };
    }
}

/**
 * Record a single "batch_add" operation covering several files.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Array} files - Array of `{ fileId, metadata }`; a falsy `metadata` is looked up in KV
 * @param {Object} options - Options
 * @param {boolean} options.skipExisting - Skip files already present in the index instead of
 *   updating them (default false)
 * @returns {Object} `{ success, operationId, totalProcessed }` or `{ success: false, error, totalProcessed: 0 }`
 */
export async function batchAddFilesToIndex(context, files, options = {}) {
    try {
        const { env } = context;
        const { skipExisting = false } = options;

        // Resolve metadata for one file; a failed KV lookup degrades to an empty object.
        const resolveMetadata = async (fileId, metadata) => {
            if (metadata) {
                return metadata;
            }
            try {
                const fileData = await env.img_url.getWithMetadata(fileId);
                return fileData.metadata || {};
            } catch (error) {
                console.warn(`Failed to get metadata for file ${fileId}:`, error);
                return {};
            }
        };

        // Lookups run one at a time, in input order.
        const processedFiles = [];
        for (const { fileId, metadata } of files) {
            processedFiles.push({
                fileId,
                metadata: await resolveMetadata(fileId, metadata),
            });
        }

        const operationId = await recordOperation(context, 'batch_add', {
            files: processedFiles,
            options: { skipExisting },
        });

        console.log(`Batch add operation recorded with ID: ${operationId}, ${files.length} files`);
        return {
            success: true,
            operationId,
            totalProcessed: files.length,
        };
    } catch (error) {
        console.error('Error recording batch add files operation:', error);
        return {
            success: false,
            error: error.message,
            totalProcessed: 0,
        };
    }
}

/**
 * Record a "remove" operation for a single file.
 *
 * @param {Object} context - Context object
 * @param {string} fileId - File ID
 * @returns {Object} `{ success: true, operationId }` or `{ success: false, error }`
 */
export async function removeFileFromIndex(context, fileId) {
    let operationId;

    try {
        operationId = await recordOperation(context, 'remove', { fileId });
        console.log(`File ${fileId} remove operation recorded with ID: ${operationId}`);
    } catch (error) {
        console.error('Error recording remove file operation:', error);
        return { success: false, error: error.message };
    }

    return { success: true, operationId };
}

/**
 * Record a single "batch_remove" operation covering several files.
 *
 * @param {Object} context - Context object
 * @param {Array} fileIds - Array of file IDs
 * @returns {Object} `{ success, operationId, totalProcessed }` or `{ success: false, error, totalProcessed: 0 }`
 */
export async function batchRemoveFilesFromIndex(context, fileIds) {
    try {
        const operationId = await recordOperation(context, 'batch_remove', { fileIds });

        // Read the length only after the record is written, as the success path reports it.
        const totalProcessed = fileIds.length;
        console.log(`Batch remove operation recorded with ID: ${operationId}, ${totalProcessed} files`);

        return { success: true, operationId, totalProcessed };
    } catch (error) {
        console.error('Error recording batch remove files operation:', error);
        return { success: false, error: error.message, totalProcessed: 0 };
    }
}

/**
 * Record a "move" operation (a change of file ID).
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {string} originalFileId - Original file ID
 * @param {string} newFileId - New file ID
 * @param {Object} newMetadata - New metadata; when null it is read from the KV entry for `newFileId`
 * @returns {Object} `{ success: true, operationId }` or `{ success: false, error }`
 */
export async function moveFileInIndex(context, originalFileId, newFileId, newMetadata = null) {
    try {
        const { env } = context;

        // Look up metadata stored under the new ID; a failed lookup degrades to {}.
        const fetchStoredMetadata = async () => {
            try {
                const fileData = await env.img_url.getWithMetadata(newFileId);
                return fileData.metadata || {};
            } catch (error) {
                console.warn(`Failed to get metadata for new file ${newFileId}:`, error);
                return {};
            }
        };

        const metadata = newMetadata === null ? await fetchStoredMetadata() : newMetadata;

        const operationId = await recordOperation(context, 'move', {
            originalFileId,
            newFileId,
            metadata,
        });

        console.log(`File move operation from ${originalFileId} to ${newFileId} recorded with ID: ${operationId}`);
        return { success: true, operationId };
    } catch (error) {
        console.error('Error recording move file operation:', error);
        return { success: false, error: error.message };
    }
}

/**
 * Record a single "batch_move" operation covering several moves.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Array} moveOperations - Array of `{ originalFileId, newFileId, metadata? }`;
 *   a null/undefined `metadata` is read from the KV entry for `newFileId`
 * @returns {Object} `{ success, operationId, totalProcessed }` or `{ success: false, error, totalProcessed: 0 }`
 */
export async function batchMoveFilesInIndex(context, moveOperations) {
    try {
        const { env } = context;

        // Look up metadata stored under an ID; a failed lookup degrades to {}.
        const fetchStoredMetadata = async (fileId) => {
            try {
                const fileData = await env.img_url.getWithMetadata(fileId);
                return fileData.metadata || {};
            } catch (error) {
                console.warn(`Failed to get metadata for new file ${fileId}:`, error);
                return {};
            }
        };

        // Lookups run one at a time, in input order.
        const processedOperations = [];
        for (const { originalFileId, newFileId, metadata } of moveOperations) {
            processedOperations.push({
                originalFileId,
                newFileId,
                metadata: metadata ?? (await fetchStoredMetadata(newFileId)),
            });
        }

        const operationId = await recordOperation(context, 'batch_move', {
            operations: processedOperations,
        });

        console.log(`Batch move operation recorded with ID: ${operationId}, ${moveOperations.length} operations`);
        return {
            success: true,
            operationId,
            totalProcessed: moveOperations.length,
        };
    } catch (error) {
        console.error('Error recording batch move files operation:', error);
        return {
            success: false,
            error: error.message,
            totalProcessed: 0,
        };
    }
}

/**
 * Merge all pending operation records into the stored index.
 *
 * Loads the index, fetches the pending operations newer than the index's
 * `lastOperationId`, applies them in timestamp order, writes the index back to
 * KV when at least one operation was applied, and optionally deletes the
 * applied operation records.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Object} options - Options
 * @param {boolean} options.cleanupAfterMerge - Delete applied operation records after merging (default true)
 * @returns {Object} On success `{ success: true, processedOperations, addedCount, updatedCount,
 *   removedCount, movedCount, totalFiles }`; `{ success: true, processedOperations: 0, message }`
 *   when nothing is pending; `{ success: false, error }` on failure
 */
export async function mergeOperationsToIndex(context, options = {}) {
    const { env } = context;
    const { cleanupAfterMerge = true } = options;

    try {
        console.log('Starting operations merge...');

        // Load the current index; getIndex flags an unavailable index with success === false.
        const currentIndex = await getIndex(context);
        if (currentIndex.success === false) {
            console.error('Failed to get current index for merge');
            return {
                success: false,
                error: 'Failed to get current index'
            };
        }

        // Fetch every operation recorded after the index's watermark.
        const operations = await getAllPendingOperations(context, currentIndex.lastOperationId);

        if (operations.length === 0) {
            console.log('No pending operations to merge');
            return {
                success: true,
                processedOperations: 0,
                message: 'No pending operations'
            };
        }

        console.log(`Found ${operations.length} pending operations to merge`);

        // Apply in chronological order.
        operations.sort((a, b) => a.timestamp - b.timestamp);

        // This aliases the object returned by getIndex (it is not a copy); the
        // apply* helpers mutate it in place.
        const workingIndex = currentIndex;
        let operationsProcessed = 0;
        let addedCount = 0;
        let removedCount = 0;
        let movedCount = 0;
        let updatedCount = 0;
        const processedOperationIds = [];

        for (const operation of operations) {
            try {
                // Each case has its own block so its declarations stay scoped to it.
                switch (operation.type) {
                    case 'add': {
                        const addResult = applyAddOperation(workingIndex, operation.data);
                        if (addResult.added) addedCount++;
                        if (addResult.updated) updatedCount++;
                        break;
                    }

                    case 'remove': {
                        if (applyRemoveOperation(workingIndex, operation.data)) {
                            removedCount++;
                        }
                        break;
                    }

                    case 'move': {
                        if (applyMoveOperation(workingIndex, operation.data)) {
                            movedCount++;
                        }
                        break;
                    }

                    case 'batch_add': {
                        const batchAddResult = applyBatchAddOperation(workingIndex, operation.data);
                        addedCount += batchAddResult.addedCount;
                        updatedCount += batchAddResult.updatedCount;
                        break;
                    }

                    case 'batch_remove': {
                        removedCount += applyBatchRemoveOperation(workingIndex, operation.data);
                        break;
                    }

                    case 'batch_move': {
                        movedCount += applyBatchMoveOperation(workingIndex, operation.data);
                        break;
                    }

                    default:
                        // Unknown types are neither counted nor scheduled for cleanup.
                        console.warn(`Unknown operation type: ${operation.type}`);
                        continue;
                }

                operationsProcessed++;
                processedOperationIds.push(operation.id);

                // Yield to the event loop every few operations.
                if (operationsProcessed % 3 === 0) {
                    await new Promise(resolve => setTimeout(resolve, 0));
                }

            } catch (error) {
                // A failing operation is logged and skipped; the merge continues.
                console.error(`Error applying operation ${operation.id}:`, error);
            }
        }

        // Persist only when something was actually applied.
        if (operationsProcessed > 0) {
            workingIndex.lastUpdated = Date.now();
            workingIndex.totalCount = workingIndex.files.length;

            // getAllPendingOperations skips records by comparing key strings, so the
            // watermark must be the lexicographically greatest processed id. That is
            // not necessarily the last one in timestamp order.
            workingIndex.lastOperationId = processedOperationIds.reduce(
                (greatest, id) => (id > greatest ? id : greatest),
                processedOperationIds[0]
            );

            await env.img_url.put(INDEX_KEY, JSON.stringify(workingIndex));

            console.log(`Index updated: ${addedCount} added, ${updatedCount} updated, ${removedCount} removed, ${movedCount} moved`);
        }

        // Delete the operation records that were applied.
        if (cleanupAfterMerge && processedOperationIds.length > 0) {
            await cleanupOperations(context, processedOperationIds);
        }

        const result = {
            success: true,
            processedOperations: operationsProcessed,
            addedCount,
            updatedCount,
            removedCount,
            movedCount,
            totalFiles: workingIndex.totalCount
        };

        console.log('Operations merge completed:', result);
        return result;

    } catch (error) {
        console.error('Error merging operations:', error);
        return {
            success: false,
            error: error.message
        };
    }
}

/**
 * Read the file index with optional filtering, search and pagination.
 *
 * Filters are applied in this order: directory, channel, list type, search.
 * `totalCount` is the number of files left after those filters. The returned
 * `files` are narrowed further to files located directly in `directory`, and
 * pagination is applied to that narrowed list.
 *
 * @param {Object} context - Context object
 * @param {Object} options - Query options
 * @param {string} options.search - Case-insensitive keyword matched against `metadata.FileName` and the file id
 * @param {string} options.directory - Directory filter ('' means the root)
 * @param {number} options.start - Start offset into the direct-children list
 * @param {number} options.count - Number of files to return; -1 returns all
 * @param {string} options.channel - Channel filter (case-insensitive)
 * @param {string} options.listType - List type filter (exact match)
 * @param {boolean} options.countOnly - Return only `{ totalCount, indexLastUpdated }`
 * @returns {Object} `{ files, directories, totalCount, indexLastUpdated, returnedCount }`;
 *   on error the same shape with empty arrays and zero counts
 */
export async function readIndex(context, options = {}) {
    try {
        const {
            search = '',
            directory = '',
            start = 0,
            count = 50,
            channel = '',
            listType = '',
            countOnly = false
        } = options;

        const index = await getIndex(context);
        let filteredFiles = index.files;

        // Directory of a file: explicit metadata wins, otherwise it is derived from the id.
        const directoryOf = (file) =>
            (file.metadata.Directory ? file.metadata.Directory : extractDirectory(file.id));
        const withTrailingSlash = (dir) => (dir.endsWith('/') ? dir : `${dir}/`);

        // Directory filter
        if (directory) {
            const normalizedDir = withTrailingSlash(directory);
            filteredFiles = filteredFiles.filter(file =>
                directoryOf(file).startsWith(normalizedDir) || file.metadata.Directory === directory
            );
        }

        // Channel filter. Files without a Channel simply do not match; without
        // the optional call one such file would throw and fail the whole read.
        if (channel) {
            const channelLower = channel.toLowerCase();
            filteredFiles = filteredFiles.filter(file =>
                file.metadata.Channel?.toLowerCase() === channelLower
            );
        }

        // List type filter
        if (listType) {
            filteredFiles = filteredFiles.filter(file =>
                file.metadata.ListType === listType
            );
        }

        // Search filter
        if (search) {
            const searchLower = search.toLowerCase();
            filteredFiles = filteredFiles.filter(file => {
                return file.metadata.FileName?.toLowerCase().includes(searchLower) ||
                    file.id.toLowerCase().includes(searchLower);
            });
        }

        // Count-only request
        if (countOnly) {
            return {
                totalCount: filteredFiles.length,
                indexLastUpdated: index.lastUpdated
            };
        }

        const totalCount = filteredFiles.length;

        // Keep only the files that sit directly in the requested directory.
        const dirPrefix = directory === '' ? directory : withTrailingSlash(directory);
        let resultFiles = filteredFiles.filter(file => directoryOf(file) === dirPrefix);

        // Pagination
        if (count !== -1) {
            const startIndex = Math.max(0, start);
            const endIndex = startIndex + Math.max(1, count);
            resultFiles = resultFiles.slice(startIndex, endIndex);
        }

        // Collect sub-directory names
        const directories = new Set();
        if (directory === '') {
            // Root query: collect the first path segment of every file's directory.
            filteredFiles.forEach(file => {
                const fileDir = directoryOf(file);
                if (fileDir) {
                    const firstSlashIndex = fileDir.indexOf('/');
                    const topLevelDir = firstSlashIndex === -1 ?
                        fileDir :
                        fileDir.substring(0, firstSlashIndex);
                    directories.add(topLevelDir);
                }
            });
        } else {
            // Specific directory: collect the next path segment below it.
            filteredFiles.forEach(file => {
                const fileDir = directoryOf(file);
                if (fileDir && fileDir.startsWith(dirPrefix)) {
                    const relativePath = fileDir.substring(dirPrefix.length);
                    const firstSlashIndex = relativePath.indexOf('/');
                    if (firstSlashIndex !== -1) {
                        const subDir = dirPrefix + relativePath.substring(0, firstSlashIndex);
                        directories.add(subDir);
                    }
                }
            });
        }

        return {
            files: resultFiles,
            directories: Array.from(directories),
            totalCount: totalCount,
            indexLastUpdated: index.lastUpdated,
            returnedCount: resultFiles.length
        };

    } catch (error) {
        console.error('Error reading index:', error);
        return {
            files: [],
            directories: [],
            totalCount: 0,
            indexLastUpdated: Date.now(),
            returnedCount: 0
        };
    }
}

/**
 * Rebuild the index from scratch by listing every key in KV.
 *
 * Keys starting with `manage@` or `chunk_`, and keys without a
 * `metadata.TimeStamp`, are skipped. The new index is sorted by timestamp
 * (newest first), written to KV, and all operation records are then deleted.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Function} progressCallback - Optional; called with the running count every 100 indexed files
 * @returns {Object} `{ success: true, processedCount, indexedCount }` or `{ success: false, error }`
 */
export async function rebuildIndex(context, progressCallback = null) {
    const { env } = context;

    try {
        console.log('Starting index rebuild...');

        const newIndex = {
            files: [],
            lastUpdated: Date.now(),
            totalCount: 0,
            lastOperationId: null
        };
        let processedCount = 0;
        let cursor = null;

        // Walk the whole namespace one page at a time.
        do {
            const page = await env.img_url.list({
                limit: KV_LIST_LIMIT,
                cursor: cursor
            });
            cursor = page.cursor;

            for (const item of page.keys) {
                const isInternalKey = item.name.startsWith('manage@') || item.name.startsWith('chunk_');
                const hasTimestamp = Boolean(item.metadata && item.metadata.TimeStamp);
                if (isInternalKey || !hasTimestamp) {
                    continue;
                }

                newIndex.files.push({
                    id: item.name,
                    metadata: item.metadata
                });
                processedCount++;

                // Report progress every 100 indexed files.
                if (progressCallback && processedCount % 100 === 0) {
                    progressCallback(processedCount);
                }
            }

            // Brief pause between pages so other work can run.
            if (cursor) {
                await new Promise(resolve => setTimeout(resolve, 10));
            }
        } while (cursor);

        // Newest first.
        newIndex.files.sort((a, b) => b.metadata.TimeStamp - a.metadata.TimeStamp);
        newIndex.totalCount = newIndex.files.length;

        await env.img_url.put(INDEX_KEY, JSON.stringify(newIndex));

        // Drop all operation records once the rebuilt index is stored.
        await deleteAllOperations(context, { force: true });

        console.log(`Index rebuild completed. Processed ${processedCount} files, indexed ${newIndex.totalCount} files.`);
        return {
            success: true,
            processedCount,
            indexedCount: newIndex.totalCount
        };
    } catch (error) {
        console.error('Error rebuilding index:', error);
        return {
            success: false,
            error: error.message
        };
    }
}

/**
 * Summarise the index: totals plus per-channel, per-directory and per-list-type counts.
 *
 * @param {Object} context - Context object
 * @returns {Object|null} `{ success: true, totalFiles, lastUpdated, channelStats, directoryStats,
 *   typeStats, oldestFile, newestFile }`; `{ success: false, error, message }` when the index
 *   is unavailable; `null` when an exception is thrown
 */
export async function getIndexInfo(context) {
    try {
        const index = await getIndex(context);

        // getIndex flags an unavailable index with success === false.
        if (index.success === false) {
            return {
                success: false,
                error: 'Failed to retrieve index',
                message: 'Index is not available or corrupted'
            };
        }

        const channelStats = {};
        const directoryStats = {};
        const typeStats = {};
        const increment = (tally, key) => {
            tally[key] = (tally[key] || 0) + 1;
        };

        for (const file of index.files) {
            increment(channelStats, file.metadata.Channel || 'Unknown');
            increment(directoryStats, file.metadata.Directory || extractDirectory(file.id) || '/');
            // ListType is used as-is, so files without one are tallied under "undefined".
            increment(typeStats, file.metadata.ListType);
        }

        return {
            success: true,
            totalFiles: index.totalCount,
            lastUpdated: index.lastUpdated,
            channelStats,
            directoryStats,
            typeStats,
            oldestFile: index.files[index.files.length - 1],
            newestFile: index.files[0]
        };
    } catch (error) {
        console.error('Error getting index info:', error);
        return null;
    }
}

/* ============= Atomic operation helpers ============= */

/**
 * Generate an operation ID of the form `<millisecond timestamp>_<random base-36 suffix>`.
 *
 * @returns {string} The operation ID
 */
function generateOperationId() {
    const createdAt = Date.now();
    // Up to seven base-36 characters taken from after the "0." of the random number.
    const suffix = Math.random().toString(36).slice(2, 9);
    return [createdAt, suffix].join('_');
}

/**
 * Persist one operation record to KV under `OPERATION_KEY_PREFIX + operationId`.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {string} type - Operation type
 * @param {Object} data - Operation payload
 * @returns {Promise<string>} The generated operation ID
 */
async function recordOperation(context, type, data) {
    const { env } = context;

    const operationId = generateOperationId();
    const payload = JSON.stringify({
        type,
        timestamp: Date.now(),
        data,
    });

    await env.img_url.put(`${OPERATION_KEY_PREFIX}${operationId}`, payload);
    return operationId;
}

/**
 * Load every pending operation record from KV.
 *
 * Records whose key sorts at or before `OPERATION_KEY_PREFIX + lastOperationId`
 * are skipped. A record that cannot be read or parsed is logged and skipped. If
 * listing fails, the operations collected so far are returned.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {string} lastOperationId - ID of the last operation already merged, or null
 * @returns {Promise<Array>} Parsed operations, each with an added `id` field
 */
async function getAllPendingOperations(context, lastOperationId = null) {
    const { env } = context;
    const pending = [];

    try {
        let cursor = null;
        do {
            const page = await env.img_url.list({
                prefix: OPERATION_KEY_PREFIX,
                limit: KV_LIST_LIMIT,
                cursor: cursor
            });

            for (const { name } of page.keys) {
                // Key strings are compared directly against the watermark key.
                const alreadyMerged = lastOperationId && name <= OPERATION_KEY_PREFIX + lastOperationId;
                if (alreadyMerged) {
                    continue;
                }

                try {
                    const raw = await env.img_url.get(name);
                    if (raw) {
                        const operation = JSON.parse(raw);
                        operation.id = name.substring(OPERATION_KEY_PREFIX.length);
                        pending.push(operation);
                    }
                } catch (error) {
                    console.warn(`Failed to parse operation ${name}:`, error);
                }
            }

            cursor = page.cursor;
        } while (cursor);
    } catch (error) {
        console.error('Error getting pending operations:', error);
    }

    return pending;
}

/**
 * Apply an "add" operation to the in-memory index.
 *
 * An existing entry with the same id is replaced in place; otherwise the file
 * is inserted at its timestamp-ordered position.
 *
 * @param {Object} index - Index object (mutated)
 * @param {Object} data - Operation data `{ fileId, metadata }`
 * @returns {Object} `{ added, updated }` flags
 */
function applyAddOperation(index, data) {
    const { fileId, metadata } = data;
    const entry = { id: fileId, metadata: metadata || {} };

    const position = index.files.findIndex(({ id }) => id === fileId);
    if (position === -1) {
        // New file: insert at its sorted position.
        insertFileInOrder(index.files, entry);
        return { added: true, updated: false };
    }

    // Known file: overwrite it where it is.
    index.files[position] = entry;
    return { added: false, updated: true };
}

/**
 * Apply a "remove" operation to the in-memory index.
 *
 * @param {Object} index - Index object; `index.files` is replaced with a filtered array
 * @param {Object} data - Operation data `{ fileId }`
 * @returns {boolean} true when at least one entry was removed
 */
function applyRemoveOperation(index, data) {
    const targetId = data.fileId;
    const countBefore = index.files.length;

    const survivors = index.files.filter(({ id }) => id !== targetId);
    index.files = survivors;

    return survivors.length < countBefore;
}

/**
 * Apply a "move" operation (a change of file id) to the in-memory index.
 *
 * The entry keeps its position in the list. If another entry already carries
 * the new id, that entry is dropped so the id stays unique in the index.
 *
 * @param {Object} index - Index object (mutated)
 * @param {Object} data - Operation data `{ originalFileId, newFileId, metadata }`;
 *   a falsy `metadata` keeps the entry's existing metadata
 * @returns {boolean} true when the original file was found and moved, otherwise false
 */
function applyMoveOperation(index, data) {
    const { originalFileId, newFileId, metadata } = data;

    const originalIndex = index.files.findIndex(file => file.id === originalFileId);
    if (originalIndex === -1) {
        return false; // Original file is not in the index
    }

    const movedItem = {
        id: newFileId,
        metadata: metadata || index.files[originalIndex].metadata
    };
    index.files[originalIndex] = movedItem;

    // The move overwrites whatever already had the new id; without removing that
    // stale entry the index would contain the same id twice.
    const hasStaleEntry = index.files.some(file => file !== movedItem && file.id === newFileId);
    if (hasStaleEntry) {
        index.files = index.files.filter(file => file === movedItem || file.id !== newFileId);
    }

    return true;
}

/**
 * Apply a "batch_add" operation to the in-memory index.
 *
 * Files already in the index are replaced in place (or left alone when
 * `skipExisting` is set). New files are collected first and inserted at their
 * timestamp-ordered positions afterwards, so the id→position lookup built at
 * the start stays valid and does not need rebuilding after every insertion.
 *
 * @param {Object} index - Index object (mutated)
 * @param {Object} data - Operation data `{ files: [{ fileId, metadata }], options: { skipExisting } }`
 * @returns {Object} `{ addedCount, updatedCount }`
 */
function applyBatchAddOperation(index, data) {
    const { files, options } = data;
    const { skipExisting = false } = options || {};

    let addedCount = 0;
    let updatedCount = 0;

    // Position lookup for files already in the index. No insertion happens while
    // it is in use, so the positions cannot go stale.
    const existingPositions = new Map(index.files.map((file, position) => [file.id, position]));

    // New files awaiting insertion, keyed by id so a repeated id within the same
    // batch is treated as an update of the pending entry.
    const pendingInserts = new Map();

    for (const { fileId, metadata } of files) {
        const fileItem = {
            id: fileId,
            metadata: metadata || {}
        };

        const existingPosition = existingPositions.get(fileId);

        if (existingPosition !== undefined) {
            if (!skipExisting) {
                index.files[existingPosition] = fileItem;
                updatedCount++;
            }
        } else if (pendingInserts.has(fileId)) {
            if (!skipExisting) {
                pendingInserts.set(fileId, fileItem);
                updatedCount++;
            }
        } else {
            pendingInserts.set(fileId, fileItem);
            addedCount++;
        }
    }

    // Insert the new files in the order they first appeared in the batch.
    for (const fileItem of pendingInserts.values()) {
        insertFileInOrder(index.files, fileItem);
    }

    return { addedCount, updatedCount };
}

/**
 * Apply a "batch_remove" operation to the in-memory index.
 *
 * @param {Object} index - Index object; `index.files` is replaced with a filtered array
 * @param {Object} data - Operation data `{ fileIds }`
 * @returns {number} Number of entries removed
 */
function applyBatchRemoveOperation(index, data) {
    const doomedIds = new Set(data.fileIds);
    const countBefore = index.files.length;

    index.files = index.files.filter(({ id }) => !doomedIds.has(id));

    return countBefore - index.files.length;
}

/**
 * Apply a "batch_move" operation to the in-memory index.
 *
 * Moves are applied in order and each moved entry keeps its list position.
 * When a move targets an id that another entry already carries, that entry is
 * dropped so the id stays unique in the index.
 *
 * @param {Object} index - Index object (mutated)
 * @param {Object} data - Operation data `{ operations: [{ originalFileId, newFileId, metadata }] }`;
 *   a falsy `metadata` keeps the entry's existing metadata
 * @returns {number} Number of files moved
 */
function applyBatchMoveOperation(index, data) {
    const { operations } = data;
    let movedCount = 0;

    // id → position lookup, kept in step with the renames below.
    const positionById = new Map(index.files.map((file, position) => [file.id, position]));

    // Positions of entries overwritten by a move; removed in one pass at the end
    // so the positions stored in the map stay valid during the loop.
    const overwrittenPositions = new Set();

    for (const { originalFileId, newFileId, metadata } of operations) {
        const position = positionById.get(originalFileId);
        if (position === undefined) {
            continue; // Original file is not in the index
        }

        const clashPosition = positionById.get(newFileId);
        if (clashPosition !== undefined && clashPosition !== position) {
            overwrittenPositions.add(clashPosition);
        }

        positionById.delete(originalFileId);
        positionById.set(newFileId, position);

        index.files[position] = {
            id: newFileId,
            metadata: metadata || index.files[position].metadata
        };

        movedCount++;
    }

    if (overwrittenPositions.size > 0) {
        index.files = index.files.filter((_, position) => !overwrittenPositions.has(position));
    }

    return movedCount;
}

/**
 * Delete the KV records of operations that have been merged.
 *
 * Errors are logged and not rethrown.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Array} operationIds - IDs of the operations to delete
 * @param {number} concurrency - Maximum number of deletions in flight (default 10)
 */
async function cleanupOperations(context, operationIds, concurrency = 10) {
    const { env } = context;

    try {
        console.log(`Cleaning up ${operationIds.length} processed operations with concurrency ${concurrency}...`);

        // One deferred delete per operation; promiseLimit starts them as capacity allows.
        const deleteTasks = [];
        for (const operationId of operationIds) {
            const operationKey = OPERATION_KEY_PREFIX + operationId;
            deleteTasks.push(() => env.img_url.delete(operationKey));
        }

        await promiseLimit(deleteTasks, concurrency);
        console.log(`Successfully cleaned up ${operationIds.length} operations`);
    } catch (error) {
        console.error('Error cleaning up operations:', error);
    }
}

/**
 * Delete every atomic-operation record from KV.
 *
 * Keys are listed first, then deleted in batches of `BATCH_SIZE` with the
 * deletions inside a batch running in parallel.
 *
 * @param {Object} context - Context object; `context.env.img_url` is the KV binding
 * @param {Object} options - Options
 * @param {boolean} options.force - Keep going when a deletion fails, collecting the failures
 *   (default false: the first failing batch aborts the run)
 * @param {Function} options.progressCallback - Optional; called with `(processed, total)` after each batch
 * @returns {Object} `{ success, deletedCount, totalFound, message, errors?, errorCount? }`,
 *   or `{ success: false, error, deletedCount, totalFound? }` on failure
 */
export async function deleteAllOperations(context, options = {}) {
    const { env } = context;
    const { force = false, progressCallback = null } = options;

    try {
        console.log('Starting to delete all atomic operations...');

        // Phase 1: collect every operation key, following the list cursor.
        const operationKeys = [];
        let cursor = null;
        do {
            const page = await env.img_url.list({
                prefix: OPERATION_KEY_PREFIX,
                limit: KV_LIST_LIMIT,
                cursor: cursor
            });
            for (const item of page.keys) {
                operationKeys.push(item.name);
            }
            cursor = page.cursor;
        } while (cursor);

        const totalFound = operationKeys.length;
        if (totalFound === 0) {
            console.log('No atomic operations found to delete');
            return {
                success: true,
                deletedCount: 0,
                totalFound: 0,
                message: 'No operations to delete'
            };
        }

        console.log(`Found ${totalFound} atomic operations to delete`);

        // Delete one key. In force mode a failure becomes a result object;
        // otherwise the error propagates and fails the batch.
        const deleteKey = async (operationKey) => {
            try {
                await env.img_url.delete(operationKey);
                return { success: true, key: operationKey };
            } catch (error) {
                const failure = {
                    success: false,
                    key: operationKey,
                    error: error.message
                };
                if (!force) {
                    throw error;
                }
                return failure;
            }
        };

        // Phase 2: delete batch by batch.
        let deletedCount = 0;
        const errors = [];

        for (let offset = 0; offset < operationKeys.length; offset += BATCH_SIZE) {
            const batch = operationKeys.slice(offset, offset + BATCH_SIZE);
            const inFlight = batch.map((operationKey) => deleteKey(operationKey));

            try {
                const outcomes = await Promise.all(inFlight);

                for (const outcome of outcomes) {
                    if (outcome.success) {
                        deletedCount++;
                    } else {
                        errors.push(outcome);
                        console.warn(`Failed to delete operation ${outcome.key}: ${outcome.error}`);
                    }
                }
            } catch (error) {
                if (!force) {
                    console.error(`Error deleting operations batch:`, error);
                    return {
                        success: false,
                        error: error.message,
                        deletedCount,
                        totalFound
                    };
                }
            }

            if (progressCallback) {
                progressCallback(Math.min(deletedCount + errors.length, totalFound), totalFound);
            }

            // Brief pause between batches so other work can run.
            const hasMoreBatches = offset + BATCH_SIZE < operationKeys.length;
            if (hasMoreBatches) {
                await new Promise(resolve => setTimeout(resolve, 10));
            }
        }

        const result = {
            success: true,
            deletedCount,
            totalFound,
            message: `Successfully deleted ${deletedCount} out of ${totalFound} operations`
        };

        // Attach failure details when any deletions failed.
        if (errors.length > 0) {
            result.errors = errors;
            result.errorCount = errors.length;
            result.message += `, ${errors.length} operations failed to delete`;
        }

        console.log(`Delete all operations completed: ${result.message}`);
        return result;

    } catch (error) {
        console.error('Error deleting all operations:', error);
        return {
            success: false,
            error: error.message,
            deletedCount: 0
        };
    }
}

/* ============= Utility functions ============= */

/**
 * Load the index from KV (internal).
 *
 * When no index is stored, a rebuild is started and handed to
 * `context.waitUntil`. When the index is missing or reading it fails, an empty
 * index flagged with `success: false` is returned.
 *
 * @param {Object} context - Context object providing `env` and `waitUntil`
 * @returns {Promise<Object>} The parsed index, or an empty index with `success: false`
 */
async function getIndex(context) {
    const { env, waitUntil } = context;

    try {
        const stored = await env.img_url.get(INDEX_KEY);
        if (stored) {
            return JSON.parse(stored);
        }
        // Nothing stored yet: start a rebuild and hand the promise to waitUntil.
        waitUntil(rebuildIndex(context));
    } catch (error) {
        console.warn('Error reading index, creating new one:', error);
    }

    // Placeholder index; success === false tells callers it is not the real one.
    const emptyIndex = {
        files: [],
        lastUpdated: Date.now(),
        totalCount: 0,
        lastOperationId: null,
        success: false,
    };
    return emptyIndex;
}

/**
 * Extract the directory part of a file path (internal).
 *
 * @param {string} filePath - File path
 * @returns {string} Everything up to and including the last '/', or '' when the path has no '/'
 */
function extractDirectory(filePath) {
    // lastIndexOf yields -1 when there is no slash, so the cut point becomes 0.
    const cutPoint = filePath.lastIndexOf('/') + 1;
    return cutPoint === 0 ? '' : filePath.slice(0, cutPoint);
}

/**
 * Insert a file into an array that is sorted by timestamp, newest first.
 *
 * A missing or falsy `metadata.TimeStamp` counts as 0.
 *
 * @param {Array} sortedFiles - Files sorted by timestamp in descending order (mutated)
 * @param {Object} fileItem - File entry to insert
 */
function insertFileInOrder(sortedFiles, fileItem) {
    const timestampOf = (item) => item.metadata.TimeStamp || 0;
    const incoming = timestampOf(fileItem);

    // Fast path: empty list, or at least as new as the current head.
    if (sortedFiles.length === 0 || incoming >= timestampOf(sortedFiles[0])) {
        sortedFiles.unshift(fileItem);
        return;
    }

    // Fast path: at least as old as the current tail.
    if (incoming <= timestampOf(sortedFiles[sortedFiles.length - 1])) {
        sortedFiles.push(fileItem);
        return;
    }

    // Binary search for the first position whose timestamp is not newer than the incoming one.
    let low = 0;
    let high = sortedFiles.length;
    while (low < high) {
        const middle = Math.floor((low + high) / 2);
        if (incoming >= timestampOf(sortedFiles[middle])) {
            high = middle;
        } else {
            low = middle + 1;
        }
    }

    sortedFiles.splice(low, 0, fileItem);
}

/**
 * Run promise-returning tasks with a cap on how many are in flight at once.
 *
 * @param {Array} tasks - Array of functions, each returning a value or a Promise
 * @param {number} concurrency - Maximum number of tasks in flight
 * @returns {Promise<Array>} Results in the same order as `tasks`
 */
async function promiseLimit(tasks, concurrency = BATCH_SIZE) {
    const results = [];
    const inFlight = new Set();

    for (const [position, task] of tasks.entries()) {
        // Start the task on a later microtask, store its result by position, and
        // remove it from the in-flight set once it settles.
        const tracked = Promise.resolve()
            .then(() => task())
            .then((value) => {
                results[position] = value;
                return value;
            })
            .finally(() => {
                inFlight.delete(tracked);
            });

        inFlight.add(tracked);

        // At capacity: wait for any one task to settle before starting another.
        if (inFlight.size >= concurrency) {
            await Promise.race(inFlight);
        }
    }

    // Wait for whatever is still running.
    await Promise.all(inFlight);
    return results;
}