From 3d7c528c450bccd750e3632d86d7178930816a90 Mon Sep 17 00:00:00 2001 From: Innei Date: Fri, 31 Oct 2025 23:36:42 +0800 Subject: [PATCH] feat(builder): implement photo execution context for improved logging and storage key normalization - Introduced a new `PhotoExecutionContext` to manage execution context for photo processing, enhancing logger management and storage key normalization. - Refactored photo processing methods to utilize the new context, replacing global logger usage with context-based loggers. - Updated related modules to ensure compatibility with the new execution context, improving modularity and reducing global state dependencies. Signed-off-by: Innei --- .../core/src/modules/photo/photo.service.ts | 38 ++- packages/builder/src/builder/builder.ts | 139 ++++++-- packages/builder/src/cli.ts | 50 ++- packages/builder/src/cli/tui.ts | 322 ++++++++++++++++++ packages/builder/src/logger/index.ts | 100 +++++- packages/builder/src/photo/README.md | 8 +- .../builder/src/photo/execution-context.ts | 80 +++++ packages/builder/src/photo/image-pipeline.ts | 23 +- packages/builder/src/photo/index.ts | 7 + packages/builder/src/photo/info-extractor.ts | 34 +- packages/builder/src/photo/logger-adapter.ts | 33 +- packages/builder/src/photo/processor.ts | 30 +- packages/builder/src/plugins/loader.ts | 30 +- packages/builder/src/utils/clone.ts | 76 ++++- packages/builder/src/worker/cluster-pool.ts | 23 +- packages/builder/src/worker/pool.ts | 23 +- 16 files changed, 877 insertions(+), 139 deletions(-) create mode 100644 packages/builder/src/cli/tui.ts create mode 100644 packages/builder/src/photo/execution-context.ts diff --git a/be/apps/core/src/modules/photo/photo.service.ts b/be/apps/core/src/modules/photo/photo.service.ts index 6810cf44..90074937 100644 --- a/be/apps/core/src/modules/photo/photo.service.ts +++ b/be/apps/core/src/modules/photo/photo.service.ts @@ -14,8 +14,11 @@ import { thumbnailStoragePlugin, } from '@afilmory/builder' import type { Logger as BuilderLogger } from '@afilmory/builder/logger/index.js' -import type { PhotoProcessingLoggers } from '@afilmory/builder/photo/index.js' -import { createPhotoProcessingLoggers, setGlobalLoggers } from '@afilmory/builder/photo/index.js' +import { + createPhotoProcessingLoggers, + createStorageKeyNormalizer, + runWithPhotoExecutionContext, +} from '@afilmory/builder/photo/index.js' import type { _Object } from '@aws-sdk/client-s3' import { injectable } from 'tsyringe' @@ -40,7 +43,6 @@ export type ProcessPhotoOptions = { export class PhotoBuilderService { private readonly baseLogger = coreLogger.extend('PhotoBuilder') private readonly builderLogger: BuilderLogger = createBuilderLoggerAdapter(this.baseLogger) - private photoLoggers: PhotoProcessingLoggers | null = null createBuilder(config: BuilderConfig): AfilmoryBuilder { const enhancedConfig = this.ensureThumbnailPlugin(config) @@ -56,7 +58,6 @@ export class PhotoBuilderService { options?: ProcessPhotoOptions, ): Promise>> { const { existingItem, livePhotoMap, processorOptions, builder, builderConfig } = options ?? {} - this.ensureGlobalPhotoLoggers() const activeBuilder = this.resolveBuilder(builder, builderConfig) await activeBuilder.ensurePluginsReady() @@ -65,6 +66,7 @@ export class PhotoBuilderService { ...processorOptions, } + const photoLoggers = createPhotoProcessingLoggers(0, this.builderLogger) const context: PhotoProcessingContext = { photoKey: object.key, obj: this.toLegacyObject(object), @@ -74,21 +76,22 @@ export class PhotoBuilderService { pluginData: {}, } - return await processPhotoWithPipeline( - context, - activeBuilder, - this.createPluginRuntime(activeBuilder, mergedOptions, builderConfig), + const runtime = this.createPluginRuntime(activeBuilder, mergedOptions, builderConfig) + const storageManager = activeBuilder.getStorageManager() + const storageConfig = activeBuilder.getConfig().storage + + return await runWithPhotoExecutionContext( + { + builder: activeBuilder, + storageManager, + storageConfig, + normalizeStorageKey: createStorageKeyNormalizer(storageConfig), + loggers: photoLoggers, + }, + async () => await processPhotoWithPipeline(context, runtime), ) } - private ensureGlobalPhotoLoggers(): void { - if (!this.photoLoggers) { - this.photoLoggers = createPhotoProcessingLoggers(0, this.builderLogger) - } - - setGlobalLoggers(this.photoLoggers) - } - private resolveBuilder(builder?: AfilmoryBuilder, builderConfig?: BuilderConfig): AfilmoryBuilder { if (builder) { return builder @@ -153,8 +156,7 @@ export class PhotoBuilderService { if (typeof entry === 'object' && entry !== null && THUMBNAIL_PLUGIN_SYMBOL in entry) { return true } - // Fallback: check by name property for backward compatibility - return entry?.name === 'afilmory:thumbnail-storage' + return false }) if (hasPlugin) { diff --git a/packages/builder/src/builder/builder.ts b/packages/builder/src/builder/builder.ts index f38aa566..eb57cf75 100644 --- a/packages/builder/src/builder/builder.ts +++ b/packages/builder/src/builder/builder.ts @@ -19,6 +19,7 @@ import type { BuilderConfig } from '../types/config.js' import type { AfilmoryManifest, CameraInfo, LensInfo } from '../types/manifest.js' import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js' import { ClusterPool } from '../worker/cluster-pool.js' +import type { TaskCompletedPayload } from '../worker/pool.js' import { WorkerPool } from '../worker/pool.js' export interface BuilderOptions { @@ -26,6 +27,7 @@ export interface BuilderOptions { isForceManifest: boolean isForceThumbnails: boolean concurrencyLimit?: number // 可选,如果未提供则使用配置文件中的默认值 + progressListener?: BuildProgressListener } export interface BuilderResult { @@ -37,6 +39,29 @@ export interface BuilderResult { totalPhotos: number } +export interface BuildProgressStartPayload { + total: number + mode: 'worker' | 'cluster' + concurrency: number +} + +export interface BuildProgressSnapshot { + total: number + completed: number + newCount: number + processedCount: number + skippedCount: number + failedCount: number + currentKey?: string +} + +export interface BuildProgressListener { + onStart?: (payload: BuildProgressStartPayload) => void + onProgress?: (snapshot: BuildProgressSnapshot) => void + onComplete?: (summary: BuildProgressSnapshot) => void + onError?: (error: unknown) => void +} + export class AfilmoryBuilder { private storageManager: StorageManager | null = null private config: BuilderConfig @@ -51,13 +76,6 @@ export class AfilmoryBuilder { this.pluginManager = new PluginManager(this.pluginReferences, { baseDir: process.cwd(), }) - - // 配置日志级别(保留接口以便未来扩展) - this.configureLogging() - } - - private configureLogging(): void { - // 日志配置在 logger 模块中处理,保留方法以兼容未来扩展 } async buildManifest(options: BuilderOptions): Promise { @@ -82,6 +100,7 @@ export class AfilmoryBuilder { let processedCount = 0 let skippedCount = 0 let newCount = 0 + let failedCount = 0 let deletedCount = 0 try { @@ -189,6 +208,7 @@ export class AfilmoryBuilder { const concurrency = options.concurrencyLimit ?? this.config.options.defaultConcurrency const { useClusterMode } = this.config.performance.worker const shouldUseCluster = useClusterMode && tasksToProcess.length >= concurrency * 2 + const {progressListener} = options await this.emitPluginEvent(runState, 'beforeProcessTasks', { options, @@ -213,12 +233,72 @@ export class AfilmoryBuilder { manifest.push(item) } } else { + const totalTasks = tasksToProcess.length + let completedTaskCount = 0 + + const applyResultCounters = (result: ProcessPhotoResult | null | undefined): void => { + if (!result) return + + switch (result.type) { + case 'new': { + newCount++ + processedCount++ + break + } + case 'processed': { + processedCount++ + break + } + case 'skipped': { + skippedCount++ + break + } + case 'failed': { + failedCount++ + break + } + } + } + + const emitProgress = (currentKey?: string): void => { + progressListener?.onProgress?.({ + total: totalTasks, + completed: completedTaskCount, + newCount, + processedCount, + skippedCount, + failedCount, + currentKey, + }) + } + + const handleTaskCompleted = ({ + result, + taskIndex, + completed, + }: TaskCompletedPayload): void => { + if (result) { + applyResultCounters(result) + } + + completedTaskCount = completed + const key = tasksToProcess[taskIndex]?.key + emitProgress(key) + } + + progressListener?.onStart?.({ + total: totalTasks, + mode: shouldUseCluster ? 'cluster' : 'worker', + concurrency, + }) + emitProgress() + + let results: ProcessPhotoResult[] + logger.main.info( `开始${shouldUseCluster ? '多进程' : '并发'}处理任务,${shouldUseCluster ? '进程' : 'Worker'}数:${concurrency}${shouldUseCluster ? `,每进程并发:${this.config.performance.worker.workerConcurrency}` : ''}`, ) - let results: ProcessPhotoResult[] - if (shouldUseCluster) { const clusterPool = new ClusterPool({ concurrency, @@ -235,6 +315,7 @@ export class AfilmoryBuilder { imageObjects: tasksToProcess, builderConfig: this.getConfig(), }, + onTaskCompleted: handleTaskCompleted, }) results = await clusterPool.execute() @@ -242,6 +323,7 @@ export class AfilmoryBuilder { const workerPool = new WorkerPool({ concurrency, totalTasks: tasksToProcess.length, + onTaskCompleted: handleTaskCompleted, }) results = await workerPool.execute(async (taskIndex, workerId) => { @@ -294,24 +376,19 @@ export class AfilmoryBuilder { }) manifest.push(result.item) - - switch (result.type) { - case 'new': { - newCount++ - processedCount++ - break - } - case 'processed': { - processedCount++ - break - } - case 'skipped': { - skippedCount++ - break - } - } } + completedTaskCount = Math.max(completedTaskCount, totalTasks) + emitProgress() + progressListener?.onComplete?.({ + total: totalTasks, + completed: completedTaskCount, + newCount, + processedCount, + skippedCount, + failedCount, + }) + for (const [key, item] of existingManifestMap) { if (s3ImageKeys.has(key) && !manifest.some((m) => m.s3Key === key)) { await this.emitPluginEvent(runState, 'beforeAddManifestItem', { @@ -327,6 +404,17 @@ export class AfilmoryBuilder { } } + if (tasksToProcess.length === 0 && progressListener) { + progressListener.onComplete?.({ + total: 0, + completed: 0, + newCount, + processedCount, + skippedCount, + failedCount, + }) + } + await this.emitPluginEvent(runState, 'afterProcessTasks', { options, tasks: tasksToProcess, @@ -399,6 +487,7 @@ export class AfilmoryBuilder { return result } catch (error) { + options.progressListener?.onError?.(error) await this.emitPluginEvent(runState, 'onError', { options, error, diff --git a/packages/builder/src/cli.ts b/packages/builder/src/cli.ts index b9264939..084d4c21 100644 --- a/packages/builder/src/cli.ts +++ b/packages/builder/src/cli.ts @@ -6,11 +6,14 @@ import { join } from 'node:path' import process from 'node:process' import { fileURLToPath } from 'node:url' +import type { BuildProgressListener } from './builder/builder.js' import { AfilmoryBuilder } from './builder/index.js' import { loadBuilderConfig } from './config/index.js' -import { logger } from './logger/index.js' +import { logger, setLogListener } from './logger/index.js' import { runAsWorker } from './runAsWorker.js' +type BuilderTUI = import('./cli/tui.js').BuilderTUI + async function main() { // 检查是否作为 cluster worker 运行 if (process.env.CLUSTER_WORKER === 'true' || process.argv.includes('--cluster-worker') || cluster.isWorker) { @@ -29,6 +32,7 @@ async function main() { const isForceMode = args.has('--force') const isForceManifest = args.has('--force-manifest') const isForceThumbnails = args.has('--force-thumbnails') + const disableUi = args.has('--no-ui') // 显示帮助信息 if (args.has('--help') || args.has('-h')) { @@ -43,6 +47,7 @@ async function main() { --force-thumbnails 强制重新生成缩略图 --config 显示当前配置信息 --help, -h 显示帮助信息 + --no-ui 使用传统日志输出(禁用 TUI) 示例: tsx src/core/cli.ts # 增量更新 @@ -118,6 +123,24 @@ async function main() { const concurrencyLimit = config.performance.worker.workerCount const finalConcurrency = concurrencyLimit ?? config.options.defaultConcurrency const processingMode = config.performance.worker.useClusterMode ? '多进程集群' : '并发线程池' + const processingModeKey = config.performance.worker.useClusterMode ? 'cluster' : 'worker' + + const useTui = process.stdout.isTTY && !disableUi + let tui: BuilderTUI | null = null + let progressListener: BuildProgressListener | undefined + + if (useTui) { + const { BuilderTUI } = await import('./cli/tui.js') + tui = new BuilderTUI() + tui.attach() + tui.setRunMetadata({ + runMode, + concurrency: finalConcurrency, + processingMode: processingModeKey, + }) + progressListener = tui.createProgressListener() + setLogListener((message) => tui?.handleLog(message), { forwardToConsole: false }) + } logger.main.info(`🚀 运行模式:${runMode}`) logger.main.info(`⚡ 最大并发数:${finalConcurrency}`) @@ -127,12 +150,25 @@ async function main() { environmentCheck() // 启动构建过程 - await cliBuilder.buildManifest({ - isForceMode, - isForceManifest, - isForceThumbnails, - concurrencyLimit, - }) + try { + const result = await cliBuilder.buildManifest({ + isForceMode, + isForceManifest, + isForceThumbnails, + concurrencyLimit, + progressListener, + }) + + tui?.markSuccess(result) + } catch (error) { + tui?.markError(error) + throw error + } finally { + if (useTui) { + setLogListener(null, { forwardToConsole: true }) + tui?.detach() + } + } // eslint-disable-next-line unicorn/no-process-exit process.exit(0) diff --git a/packages/builder/src/cli/tui.ts b/packages/builder/src/cli/tui.ts new file mode 100644 index 00000000..7e1edc6f --- /dev/null +++ b/packages/builder/src/cli/tui.ts @@ -0,0 +1,322 @@ +/* eslint-disable unicorn/prefer-single-call */ +import { format as utilFormat } from 'node:util' + +import type { + BuilderResult, + BuildProgressListener, + BuildProgressSnapshot, + BuildProgressStartPayload, +} from '../builder/builder.js' +import type { LogMessage } from '../logger/index.js' + +type RunStatus = 'idle' | 'running' | 'success' | 'error' + +interface RunMetadata { + runMode: string + concurrency: number + processingMode: 'cluster' | 'worker' +} + +interface ProgressState extends BuildProgressSnapshot {} + +interface SummaryState { + durationMs: number + result: BuilderResult +} + +interface TuiState { + status: RunStatus + runMeta: RunMetadata | null + progress: ProgressState | null + summary: SummaryState | null + error: string | null + logs: string[] +} + +const MAX_LOG_LINES = 15 + +export class BuilderTUI { + private readonly stream: NodeJS.WriteStream + private state: TuiState = { + status: 'idle', + runMeta: null, + progress: null, + summary: null, + error: null, + logs: [], + } + private lastFrame = '' + private isAttached = false + private startTimestamp: number | null = null + + constructor(options: { stream?: NodeJS.WriteStream } = {}) { + this.stream = options.stream ?? process.stdout + } + + attach(): void { + if (this.isAttached) return + this.isAttached = true + this.hideCursor() + this.render(true) + } + + detach(): void { + if (!this.isAttached) return + this.clearScreen() + this.showCursor() + this.isAttached = false + this.lastFrame = '' + + if (this.state.status === 'success' && this.state.summary) { + const summaryLine = this.formatSuccessSummary(this.state.summary) + this.stream.write(`${summaryLine}\n`) + } else if (this.state.status === 'error' && this.state.error) { + this.stream.write(`${color('构建失败:', 'red')}${this.state.error}\n`) + } + } + + setRunMetadata(meta: RunMetadata): void { + this.state.runMeta = meta + this.render() + } + + handleLog(message: LogMessage): void { + const timestamp = formatTime(message.timestamp) + const levelLabel = formatLevel(message.level) + const formatted = utilFormat(...(message.args as any[])) + const line = `${color(timestamp, 'gray')} ${levelLabel} ${color(`[${message.tag}]`, 'blue')} ${formatted}` + this.state.logs = [...this.state.logs, line].slice(-MAX_LOG_LINES) + this.render() + } + + createProgressListener(): BuildProgressListener { + return { + onStart: (payload: BuildProgressStartPayload) => { + this.startTimestamp = Date.now() + this.state.status = 'running' + this.state.progress = { + total: payload.total, + completed: 0, + newCount: 0, + processedCount: 0, + skippedCount: 0, + failedCount: 0, + } + this.render(true) + }, + onProgress: (snapshot: BuildProgressSnapshot) => { + this.state.progress = { ...snapshot } + this.render() + }, + onComplete: (snapshot: BuildProgressSnapshot) => { + this.state.progress = { ...snapshot } + this.render() + }, + onError: (error: unknown) => { + this.markError(error) + }, + } + } + + markSuccess(result: BuilderResult): void { + const durationMs = this.startTimestamp ? Date.now() - this.startTimestamp : 0 + this.state.status = 'success' + this.state.summary = { durationMs, result } + this.render() + } + + markError(error: unknown): void { + const message = error instanceof Error ? error.message : String(error) + this.state.status = 'error' + this.state.error = message + this.render() + } + + private render(force = false): void { + if (!this.isAttached) return + + const frame = this.composeFrame() + if (!force && frame === this.lastFrame) { + return + } + this.lastFrame = frame + + this.clearScreen() + this.stream.write(frame) + } + + private composeFrame(): string { + const columns = this.stream.columns ?? 80 + const lines: string[] = [] + + lines.push(this.composeHeader(columns)) + lines.push('') + lines.push(this.composeProgress(columns)) + lines.push('') + lines.push(this.composeStatusLine()) + lines.push('') + lines.push(color('最近日志:', 'gray')) + lines.push(...(this.state.logs.length > 0 ? this.state.logs : [color('暂无日志输出…', 'gray')])) + + return lines.join('\n') + } + + private composeHeader(columns: number): string { + const meta = this.state.runMeta + if (!meta) { + return color('📸 Afilmory Builder', 'cyan') + } + const parts = [ + color('📸 Afilmory Builder', 'cyan'), + `模式: ${color(meta.runMode, 'green')}`, + `处理: ${color(meta.processingMode === 'cluster' ? '多进程集群' : '并发线程池', 'green')}`, + `并发: ${color(String(meta.concurrency), 'green')}`, + ] + const header = parts.join(' · ') + return header.length > columns ? header.slice(0, columns - 1) : header + } + + private composeProgress(columns: number): string { + const { progress } = this.state + if (!progress) { + switch (this.state.status) { + case 'success': { + return color('构建已完成', 'green') + } + case 'error': { + return color('构建出现错误', 'red') + } + case 'running': { + return color('准备处理中…', 'yellow') + } + default: { + return color('等待开始…', 'yellow') + } + } + } + + const barWidth = Math.max(10, Math.min(40, columns - 40)) + const ratio = progress.total > 0 ? Math.min(1, Math.max(0, progress.completed / progress.total)) : 1 + const filledLength = Math.round(ratio * barWidth) + const bar = `${'█'.repeat(filledLength)}${'░'.repeat(Math.max(0, barWidth - filledLength))}` + const percent = progress.total > 0 ? (ratio * 100).toFixed(1) : '100' + const segments = [`新 ${progress.newCount}`, `更新 ${progress.processedCount}`, `跳过 ${progress.skippedCount}`] + if (progress.failedCount > 0) { + segments.push(`失败 ${progress.failedCount}`) + } + const counts = segments.join(' · ') + + const parts = [`处理照片 [${bar}] ${percent}% (${progress.completed}/${progress.total})`, counts] + if (progress.currentKey) { + parts.push(`当前: ${truncateMiddle(progress.currentKey, columns - 10)}`) + } + + return parts.join('\n') + } + + private composeStatusLine(): string { + switch (this.state.status) { + case 'success': { + if (this.state.summary) { + return this.formatSuccessSummary(this.state.summary) + } + return color('构建完成', 'green') + } + case 'error': { + return color(`构建失败:${this.state.error ?? '未知错误'}`, 'red') + } + case 'running': { + return color('构建进行中…', 'yellow') + } + default: { + return color('等待开始…', 'gray') + } + } + } + + private formatSuccessSummary(summary: SummaryState): string { + const { result, durationMs } = summary + const durationSeconds = Math.round(durationMs / 1000) + return color( + `✅ 构建成功 · 照片 ${result.totalPhotos} · 新增 ${result.newCount} · 更新 ${result.processedCount} · 删除 ${result.deletedCount} · 耗时 ${formatDuration(durationSeconds)}`, + 'green', + ) + } + + private clearScreen(): void { + this.stream.write('\u001b[0;0H') + this.stream.write('\u001b[2J') + } + + private hideCursor(): void { + this.stream.write('\u001b[?25l') + } + + private showCursor(): void { + this.stream.write('\u001b[?25h') + } +} + +function color(text: string, tone: 'cyan' | 'green' | 'yellow' | 'red' | 'gray' | 'blue' | 'magenta'): string { + const codes: Record = { + cyan: '\u001b[36m', + green: '\u001b[32m', + yellow: '\u001b[33m', + red: '\u001b[31m', + gray: '\u001b[90m', + blue: '\u001b[34m', + magenta: '\u001b[35m', + } as const + return `${codes[tone]}${text}\u001b[0m` +} + +function formatLevel(level: string): string { + const normalized = level.toLowerCase() + switch (normalized) { + case 'error': + case 'fatal': { + return color('ERR', 'red') + } + case 'warn': { + return color('WRN', 'yellow') + } + case 'success': { + return color('SUC', 'green') + } + case 'debug': + case 'trace': { + return color('DBG', 'magenta') + } + case 'start': { + return color('STA', 'cyan') + } + default: { + return color(normalized.slice(0, 3).toUpperCase(), 'gray') + } + } +} + +function formatTime(date: Date): string { + const hours = `${date.getHours()}`.padStart(2, '0') + const minutes = `${date.getMinutes()}`.padStart(2, '0') + const seconds = `${date.getSeconds()}`.padStart(2, '0') + return `${hours}:${minutes}:${seconds}` +} + +function formatDuration(totalSeconds: number): string { + const minutes = Math.floor(totalSeconds / 60) + const seconds = totalSeconds % 60 + if (minutes === 0) { + return `${seconds}秒` + } + return `${minutes}分${seconds}秒` +} + +function truncateMiddle(value: string, maxLength: number): string { + const limit = Math.max(10, maxLength) + if (value.length <= limit) { + return value + } + const half = Math.max(1, Math.floor((limit - 1) / 2)) + return `${value.slice(0, half)}…${value.slice(value.length - half)}` +} diff --git a/packages/builder/src/logger/index.ts b/packages/builder/src/logger/index.ts index 6b56c028..ab3c2a63 100644 --- a/packages/builder/src/logger/index.ts +++ b/packages/builder/src/logger/index.ts @@ -1,23 +1,89 @@ +import type {ConsolaInstance} from 'consola'; import consola from 'consola' -// 创建系统化的日志器 +export type LogLevel = 'log' | 'info' | 'success' | 'warn' | 'error' | 'debug' | 'trace' | 'start' | 'fatal' + +export interface LogMessage { + tag: string + level: LogLevel | string + args: unknown[] + timestamp: Date +} + +type LogListener = (message: LogMessage) => void + +let listener: LogListener | null = null +let forwardToConsole = true + +export function setLogListener(newListener: LogListener | null, options: { forwardToConsole?: boolean } = {}): void { + listener = newListener + if (options.forwardToConsole !== undefined) { + forwardToConsole = options.forwardToConsole + } +} + +export function setConsoleForwarding(enabled: boolean): void { + forwardToConsole = enabled +} + +function notifyListener(tag: string, level: LogLevel | string, args: unknown[]): void { + listener?.({ + tag, + level, + args, + timestamp: new Date(), + }) +} + +function combineTags(parentTag: string, childTag: string): string { + if (!parentTag) return childTag + return `${parentTag}/${childTag}` +} + +function wrapInstance(instance: ConsolaInstance, tag: string): ConsolaInstance { + return new Proxy(instance, { + get(target, property, receiver) { + const value = Reflect.get(target, property, receiver) + + if (property === 'withTag') { + return (...args: any[]) => { + const modifier = String(args[0] ?? '') + const nextTag = modifier ? combineTags(tag, modifier) : tag + const nextInstance = Reflect.apply(value, target, args) as ConsolaInstance + return wrapInstance(nextInstance, nextTag) + } + } + + if (typeof value !== 'function') { + return value + } + + return (...args: any[]) => { + notifyListener(tag, String(property), args) + + if (forwardToConsole) { + return Reflect.apply(value, target, args) + } + + return + } + }, + }) +} + +function createTaggedLogger(tag: string): ConsolaInstance { + return wrapInstance(consola.withTag(tag), tag) +} + export const logger = { - // 主进程日志 - main: consola.withTag('MAIN'), - // S3 操作日志 - s3: consola.withTag('S3'), - // 图片处理日志 - image: consola.withTag('IMAGE'), - // 缩略图处理日志 - thumbnail: consola.withTag('THUMBNAIL'), - // Blurhash 处理日志 - blurhash: consola.withTag('BLURHASH'), - // EXIF 处理日志 - exif: consola.withTag('EXIF'), - // 文件系统操作日志 - fs: consola.withTag('FS'), - // Worker 日志(动态创建) - worker: (id: number) => consola.withTag(`WORKER-${id}`), + main: createTaggedLogger('MAIN'), + s3: createTaggedLogger('S3'), + image: createTaggedLogger('IMAGE'), + thumbnail: createTaggedLogger('THUMBNAIL'), + blurhash: createTaggedLogger('BLURHASH'), + exif: createTaggedLogger('EXIF'), + fs: createTaggedLogger('FS'), + worker: (id: number) => createTaggedLogger(`WORKER-${id}`), } export type Logger = typeof logger diff --git a/packages/builder/src/photo/README.md b/packages/builder/src/photo/README.md index 5223bafa..8bd37518 100644 --- a/packages/builder/src/photo/README.md +++ b/packages/builder/src/photo/README.md @@ -50,7 +50,7 @@ class CompatibleLoggerAdapter implements PhotoLogger { ### 主要改进 1. **模块化分离**: 将不同的处理逻辑分离到专门的模块中 -2. **Logger 适配器**: 不再通过参数传递 logger,使用全局 logger 适配器 +2. **Logger 适配器**: 使用异步执行上下文管理 logger,避免全局状态污染 3. **缓存管理**: 统一管理各种数据的缓存和复用逻辑 4. **Live Photo 处理**: 专门的模块处理 Live Photo 检测和匹配 5. **类型安全**: 完善的 TypeScript 类型定义 @@ -60,13 +60,10 @@ class CompatibleLoggerAdapter implements PhotoLogger { #### 基本使用 ```typescript -import { processPhoto, setGlobalLoggers, createPhotoProcessingLoggers } from './index.js' +import { processPhoto, createPhotoProcessingLoggers } from './index.js' -// 设置全局 logger const loggers = createPhotoProcessingLoggers(workerId, baseLogger) -setGlobalLoggers(loggers) -// 处理照片 const result = await processPhoto( obj, index, @@ -76,6 +73,7 @@ const result = await processPhoto( livePhotoMap, options, builder, + pluginRuntime, ) ``` diff --git a/packages/builder/src/photo/execution-context.ts b/packages/builder/src/photo/execution-context.ts new file mode 100644 index 00000000..8dd2763b --- /dev/null +++ b/packages/builder/src/photo/execution-context.ts @@ -0,0 +1,80 @@ +import { AsyncLocalStorage } from 'node:async_hooks' + +import type { AfilmoryBuilder } from '../builder/builder.js' +import type { StorageManager } from '../storage/index.js' +import type { StorageConfig } from '../storage/interfaces.js' +import type { PhotoProcessingLoggers } from './logger-adapter.js' + +export interface PhotoExecutionContext { + builder: AfilmoryBuilder + storageManager: StorageManager + storageConfig: StorageConfig + normalizeStorageKey: (key: string) => string + loggers?: PhotoProcessingLoggers +} + +const photoContextStorage = new AsyncLocalStorage() + +export function runWithPhotoExecutionContext( + context: PhotoExecutionContext, + callback: () => T | Promise, +): T | Promise { + return photoContextStorage.run(context, callback) +} + +export function getPhotoExecutionContext(): PhotoExecutionContext { + const context = photoContextStorage.getStore() + if (!context) { + throw new Error('Photo execution context is not available') + } + return context +} + +function sanitizeStoragePath(value: string | undefined | null): string { + if (!value) return '' + return value.replaceAll('\\', '/').replaceAll(/\/+/g, '/').replace(/^\/+/, '').replace(/\/+$/, '') +} + +/** + * 创建一个用于标准化存储键值的函数 + * 会去除配置中的 prefix/path 等前缀,并统一路径分隔符 + */ +export function createStorageKeyNormalizer(storageConfig: StorageConfig): (key: string) => string { + let basePrefix = '' + + switch (storageConfig.provider) { + case 's3': { + basePrefix = sanitizeStoragePath(storageConfig.prefix) + break + } + case 'github': { + basePrefix = sanitizeStoragePath(storageConfig.path) + break + } + default: { + basePrefix = '' + } + } + + const prefixWithSlash = basePrefix ? `${basePrefix}/` : '' + + return (rawKey: string): string => { + if (!rawKey) return '' + + const sanitizedKey = rawKey.replaceAll('\\', '/').replaceAll(/\/+/g, '/').replace(/^\/+/, '') + + if (!basePrefix) { + return sanitizedKey + } + + if (sanitizedKey === basePrefix) { + return '' + } + + if (sanitizedKey.startsWith(prefixWithSlash)) { + return sanitizedKey.slice(prefixWithSlash.length) + } + + return sanitizedKey + } +} diff --git a/packages/builder/src/photo/image-pipeline.ts b/packages/builder/src/photo/image-pipeline.ts index eeeb3a07..6a5fa76b 100644 --- a/packages/builder/src/photo/image-pipeline.ts +++ b/packages/builder/src/photo/image-pipeline.ts @@ -5,7 +5,7 @@ import { compressUint8Array } from '@afilmory/utils' import type { _Object } from '@aws-sdk/client-s3' import sharp from 'sharp' -import type { AfilmoryBuilder, BuilderOptions } from '../builder/builder.js' +import type { BuilderOptions } from '../builder/builder.js' import { convertBmpToJpegSharpInstance, getImageMetadataWithSharp, @@ -17,6 +17,7 @@ import { THUMBNAIL_PLUGIN_DATA_KEY } from '../plugins/thumbnail-storage/shared.j import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js' import { shouldProcessPhoto } from './cache-manager.js' import { processExifData, processThumbnailAndBlurhash, processToneAnalysis } from './data-processors.js' +import { getPhotoExecutionContext } from './execution-context.js' import { extractPhotoInfo } from './info-extractor.js' import { processLivePhoto } from './live-photo-handler.js' import { getGlobalLoggers } from './logger-adapter.js' @@ -43,13 +44,13 @@ export interface PhotoProcessingContext { */ export async function preprocessImage( photoKey: string, - builder: AfilmoryBuilder, ): Promise<{ rawBuffer: Buffer; processedBuffer: Buffer } | null> { const loggers = getGlobalLoggers() + const { storageManager } = getPhotoExecutionContext() try { // 获取图片数据 - const rawImageBuffer = await builder.getStorageManager().getFile(photoKey) + const rawImageBuffer = await storageManager.getFile(photoKey) if (!rawImageBuffer) { loggers.image.error(`无法获取图片数据:${photoKey}`) return null @@ -122,7 +123,8 @@ export async function processImageWithSharp(imageBuffer: Buffer, photoKey: strin * @param s3Key S3 键 * @returns 带摘要后缀的 ID */ -async function generatePhotoId(s3Key: string, builder: AfilmoryBuilder): Promise { +async function generatePhotoId(s3Key: string): Promise { + const { builder } = getPhotoExecutionContext() const { options } = builder.getConfig() const { digestSuffixLength } = options if (!digestSuffixLength || digestSuffixLength <= 0) { @@ -141,17 +143,16 @@ async function generatePhotoId(s3Key: string, builder: AfilmoryBuilder): Promise */ export async function executePhotoProcessingPipeline( context: PhotoProcessingContext, - builder: AfilmoryBuilder, ): Promise { const { photoKey, obj, existingItem, livePhotoMap, options } = context + const { storageManager } = getPhotoExecutionContext() const loggers = getGlobalLoggers() // Generate the actual photo ID with digest suffix - const photoId = await generatePhotoId(photoKey, builder) - const storageManager = builder.getStorageManager() + const photoId = await generatePhotoId(photoKey) try { // 1. 预处理图片 - const imageData = await preprocessImage(photoKey, builder) + const imageData = await preprocessImage(photoKey) if (!imageData) return null // 2. 处理图片并创建 Sharp 实例 @@ -223,7 +224,6 @@ export async function executePhotoProcessingPipeline( */ export async function processPhotoWithPipeline( context: PhotoProcessingContext, - builder: AfilmoryBuilder, runtime: { runState: PluginRunState; builderOptions: BuilderOptions }, ): Promise<{ item: PhotoManifestItem | null @@ -231,9 +231,10 @@ export async function processPhotoWithPipeline( pluginData: Record }> { const { photoKey, existingItem, obj, options } = context + const { builder } = getPhotoExecutionContext() const loggers = getGlobalLoggers() - const photoId = await generatePhotoId(photoKey, builder) + const photoId = await generatePhotoId(photoKey) await builder.emitPluginEvent(runtime.runState, 'beforePhotoProcess', { options: runtime.builderOptions, @@ -270,7 +271,7 @@ export async function processPhotoWithPipeline( let resultType: ProcessPhotoResult['type'] = isNewPhoto ? 'new' : 'processed' try { - processedItem = await executePhotoProcessingPipeline(context, builder) + processedItem = await executePhotoProcessingPipeline(context) if (!processedItem) { resultType = 'failed' } diff --git a/packages/builder/src/photo/index.ts b/packages/builder/src/photo/index.ts index 8c6d2892..14f98af8 100644 --- a/packages/builder/src/photo/index.ts +++ b/packages/builder/src/photo/index.ts @@ -14,6 +14,13 @@ export { processExifData, processThumbnailAndBlurhash, processToneAnalysis } fro export type { LivePhotoResult } from './live-photo-handler.js' export { createLivePhotoMap, processLivePhoto } from './live-photo-handler.js' +// 执行上下文 +export { + createStorageKeyNormalizer, + getPhotoExecutionContext, + runWithPhotoExecutionContext, +} from './execution-context.js' + // Logger 适配器 export type { PhotoLogger, PhotoProcessingLoggers } from './logger-adapter.js' export { diff --git a/packages/builder/src/photo/info-extractor.ts b/packages/builder/src/photo/info-extractor.ts index 390ab027..a15f6780 100644 --- a/packages/builder/src/photo/info-extractor.ts +++ b/packages/builder/src/photo/info-extractor.ts @@ -1,17 +1,21 @@ import path from 'node:path' -import { env } from '@env' - import type { PhotoInfo, PickedExif } from '../types/photo.js' +import { getPhotoExecutionContext } from './execution-context.js' import { getGlobalLoggers } from './logger-adapter.js' // 从文件名提取照片信息 export function extractPhotoInfo(key: string, exifData?: PickedExif | null): PhotoInfo { const log = getGlobalLoggers().image + const { normalizeStorageKey } = getPhotoExecutionContext() log.info(`提取照片信息:${key}`) - const fileName = path.basename(key, path.extname(key)) + const sanitizedKey = key.replaceAll('\\', '/') + const relativeKey = normalizeStorageKey(sanitizedKey) + const keyForParsing = relativeKey || sanitizedKey + const extname = path.posix.extname(keyForParsing) + const fileName = path.posix.basename(keyForParsing, extname) // 尝试从文件名解析信息,格式示例:"2024-01-15_城市夜景_1250views" let title = fileName @@ -20,22 +24,16 @@ export function extractPhotoInfo(key: string, exifData?: PickedExif | null): Pho let tags: string[] = [] // 从目录路径中提取 tags - const dirPath = path.dirname(key) - if (dirPath && dirPath !== '.' && dirPath !== '/') { - // 移除前缀(如果有的话) - let relativePath = dirPath - if (env.S3_PREFIX && dirPath.startsWith(env.S3_PREFIX)) { - relativePath = dirPath.slice(env.S3_PREFIX.length) - } + const dirPathRaw = relativeKey ? path.posix.dirname(relativeKey) : path.posix.dirname(keyForParsing) + const dirPath = dirPathRaw === '.' || dirPathRaw === '/' ? '' : dirPathRaw + if (dirPath) { + const relativePath = dirPath.replaceAll(/^\/+|\/+$/g, '') - // 清理路径分隔符 - relativePath = relativePath.replaceAll(/^\/+|\/+$/g, '') - - if (relativePath) { - // 分割路径并过滤空字符串 - const pathParts = relativePath.split('/').filter((part) => part.trim() !== '') - tags = pathParts.map((part) => part.trim()) + // 分割路径并过滤空字符串 + const pathParts = relativePath.split('/').filter((part) => part.trim() !== '') + tags = pathParts.map((part) => part.trim()) + if (tags.length > 0) { log.info(`从路径提取标签:[${tags.join(', ')}]`) } } @@ -80,7 +78,7 @@ export function extractPhotoInfo(key: string, exifData?: PickedExif | null): Pho // 如果标题为空,使用文件名 if (!title) { - title = path.basename(key, path.extname(key)) + title = path.posix.basename(keyForParsing, extname) } log.info(`照片信息提取完成:"${title}"`) diff --git a/packages/builder/src/photo/logger-adapter.ts b/packages/builder/src/photo/logger-adapter.ts index 7fb2951a..1f800280 100644 --- a/packages/builder/src/photo/logger-adapter.ts +++ b/packages/builder/src/photo/logger-adapter.ts @@ -1,6 +1,7 @@ import type { ConsolaInstance } from 'consola' import type { Logger, WorkerLogger } from '../logger/index.js' +import { getPhotoExecutionContext } from './execution-context.js' /** * 通用 Logger 接口 @@ -118,23 +119,39 @@ export function createPhotoProcessingLoggers(workerId: number, baseLogger: Logge } /** - * 全局 Logger 实例 + * 遗留的全局 Logger(仅用于兼容旧代码) */ -let globalLoggers: PhotoProcessingLoggers | null = null +let legacyLoggers: PhotoProcessingLoggers | null = null +let hasWarnedLegacyLoggerUsage = false /** - * 设置全局 Logger + * @deprecated 使用执行上下文替代 */ export function setGlobalLoggers(loggers: PhotoProcessingLoggers): void { - globalLoggers = loggers + legacyLoggers = loggers } /** - * 获取全局 Logger + * 获取当前上下文中的 Logger 集合 + * 会优先从执行上下文中获取;若未初始化则回退到遗留的全局实例 */ export function getGlobalLoggers(): PhotoProcessingLoggers { - if (!globalLoggers) { - throw new Error('Global loggers not initialized. Call setGlobalLoggers first.') + try { + const context = getPhotoExecutionContext() + if (context.loggers) { + return context.loggers + } + } catch { + // 忽略上下文不存在的错误,继续尝试使用遗留 logger } - return globalLoggers + + if (legacyLoggers) { + if (!hasWarnedLegacyLoggerUsage) { + legacyLoggers.image.warn('使用遗留的全局 logger,请尽快迁移到执行上下文模式。') + hasWarnedLegacyLoggerUsage = true + } + return legacyLoggers + } + + throw new Error('Photo loggers not initialized. Ensure runWithPhotoExecutionContext is used.') } diff --git a/packages/builder/src/photo/processor.ts b/packages/builder/src/photo/processor.ts index 784bec1d..f0dd167b 100644 --- a/packages/builder/src/photo/processor.ts +++ b/packages/builder/src/photo/processor.ts @@ -4,9 +4,10 @@ import type { AfilmoryBuilder, BuilderOptions } from '../builder/builder.js' import { logger } from '../logger/index.js' import type { PluginRunState } from '../plugins/manager.js' import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js' +import { createStorageKeyNormalizer, runWithPhotoExecutionContext } from './execution-context.js' import type { PhotoProcessingContext } from './image-pipeline.js' import { processPhotoWithPipeline } from './image-pipeline.js' -import { createPhotoProcessingLoggers, setGlobalLoggers } from './logger-adapter.js' +import { createPhotoProcessingLoggers } from './logger-adapter.js' export interface PhotoProcessorOptions { isForceMode: boolean @@ -37,12 +38,6 @@ export async function processPhoto( const existingItem = existingManifestMap.get(key) - // 创建并设置全局 logger - const photoLoggers = createPhotoProcessingLoggers(workerId, logger) - setGlobalLoggers(photoLoggers) - - photoLoggers.image.info(`📸 [${index + 1}/${totalImages}] ${key}`) - // 构建处理上下文 const context: PhotoProcessingContext = { photoKey: key, @@ -53,6 +48,23 @@ export async function processPhoto( pluginData: {}, } - // 使用处理管道 - return await processPhotoWithPipeline(context, builder, pluginRuntime) + const storageManager = builder.getStorageManager() + const storageConfig = builder.getConfig().storage + const photoLoggers = createPhotoProcessingLoggers(workerId, logger) + + return await runWithPhotoExecutionContext( + { + builder, + storageManager, + storageConfig, + normalizeStorageKey: createStorageKeyNormalizer(storageConfig), + loggers: photoLoggers, + }, + async () => { + photoLoggers.image.info(`📸 [${index + 1}/${totalImages}] ${key}`) + + // 使用处理管道 + return await processPhotoWithPipeline(context, pluginRuntime) + }, + ) } diff --git a/packages/builder/src/plugins/loader.ts b/packages/builder/src/plugins/loader.ts index 9577b1ab..6f216911 100644 --- a/packages/builder/src/plugins/loader.ts +++ b/packages/builder/src/plugins/loader.ts @@ -25,6 +25,8 @@ interface NormalizedDescriptor { options?: unknown } +function normalizeDescriptor(ref: string): NormalizedDescriptor +function normalizeDescriptor(ref: BuilderPluginReference): NormalizedDescriptor | BuilderPluginESMImporter function normalizeDescriptor(ref: BuilderPluginReference): NormalizedDescriptor | BuilderPluginESMImporter { if (typeof ref === 'string') { return { specifier: ref } @@ -111,23 +113,23 @@ export async function loadPlugins( const results: LoadedPluginDefinition[] = [] for (const entry of entries) { + if (isPluginESMImporter(entry)) { + const { default: pluginFactoryOrPlugin } = await entry() + const plugin = await instantiatePlugin(pluginFactoryOrPlugin) + const hooks = normalizeHooks(plugin) + const name = plugin.name || `lazy-loaded-plugin-${results.length}` + + results.push({ + name, + hooks, + pluginOptions: undefined, + }) + continue + } + if (typeof entry === 'string') { const descriptor = normalizeDescriptor(entry) - if (isPluginESMImporter(descriptor)) { - const { default: pluginFactoryOrPlugin } = await descriptor() - const plugin = await instantiatePlugin(pluginFactoryOrPlugin) - const hooks = normalizeHooks(plugin) - const name = plugin.name || `lazy-loaded-plugin-${results.length}` - - results.push({ - name, - hooks, - pluginOptions: undefined, - }) - continue - } - const { resolvedPath } = resolveSpecifier(descriptor.specifier, baseDir) const mod = await importModule(resolvedPath) diff --git a/packages/builder/src/utils/clone.ts b/packages/builder/src/utils/clone.ts index c71aa1f9..b0640693 100644 --- a/packages/builder/src/utils/clone.ts +++ b/packages/builder/src/utils/clone.ts @@ -1,5 +1,69 @@ import { deserialize as v8Deserialize, serialize as v8Serialize } from 'node:v8' +function manualClone(value: T, seen = new WeakMap()): T { + if (value === null || typeof value !== 'object') { + return value + } + + if (typeof value === 'function') { + return value + } + + if (value instanceof Date) { + return new Date(value) as unknown as T + } + + if (value instanceof RegExp) { + return new RegExp(value.source, value.flags) as unknown as T + } + + if (value instanceof Map) { + const result = new Map() + seen.set(value, result) + for (const [key, val] of value.entries()) { + result.set(manualClone(key, seen), manualClone(val, seen)) + } + return result as unknown as T + } + + if (value instanceof Set) { + const result = new Set() + seen.set(value, result) + for (const item of value.values()) { + result.add(manualClone(item, seen)) + } + return result as unknown as T + } + + if (seen.has(value as object)) { + return seen.get(value as object) as T + } + + if (Array.isArray(value)) { + const result: unknown[] = [] + seen.set(value, result) + for (const item of value) { + result.push(manualClone(item, seen)) + } + return result as unknown as T + } + + const proto = Object.getPrototypeOf(value) + const result = Object.create(proto ?? Object.prototype) + seen.set(value as object, result) + + for (const key of Reflect.ownKeys(value)) { + const descriptor = Object.getOwnPropertyDescriptor(value, key) + if (!descriptor) continue + if ('value' in descriptor) { + descriptor.value = manualClone(descriptor.value, seen) + } + Object.defineProperty(result, key, descriptor) + } + + return result as T +} + export function clone(value: T): T { const maybeStructuredClone = ( globalThis as typeof globalThis & { @@ -8,8 +72,16 @@ export function clone(value: T): T { ).structuredClone if (typeof maybeStructuredClone === 'function') { - return maybeStructuredClone(value) + try { + return maybeStructuredClone(value) + } catch { + // Fall through to other clone strategies when structuredClone cannot handle the value + } } - return v8Deserialize(v8Serialize(value)) + try { + return v8Deserialize(v8Serialize(value)) + } catch { + return manualClone(value) + } } diff --git a/packages/builder/src/worker/cluster-pool.ts b/packages/builder/src/worker/cluster-pool.ts index 07dc49f5..db94a3ce 100644 --- a/packages/builder/src/worker/cluster-pool.ts +++ b/packages/builder/src/worker/cluster-pool.ts @@ -8,8 +8,9 @@ import { serialize } from 'node:v8' import type { Logger } from '../logger/index.js' import { logger } from '../logger/index.js' import type { BuilderConfig } from '../types/config.js' +import type { TaskCompletedPayload } from './pool.js' -export interface ClusterPoolOptions { +export interface ClusterPoolOptions { concurrency: number totalTasks: number workerEnv?: Record // 传递给 worker 的环境变量 @@ -21,6 +22,7 @@ export interface ClusterPoolOptions { imageObjects: any[] builderConfig: BuilderConfig } + onTaskCompleted?: (payload: TaskCompletedPayload) => void } export interface WorkerReadyMessage { @@ -78,7 +80,8 @@ export class ClusterPool extends EventEmitter { private workerEnv: Record private workerConcurrency: number private logger: Logger - private sharedData?: ClusterPoolOptions['sharedData'] + private sharedData?: ClusterPoolOptions['sharedData'] + private onTaskCompleted?: (payload: TaskCompletedPayload) => void private taskQueue: Array<{ taskIndex: number }> = [] private workers = new Map() @@ -95,7 +98,7 @@ export class ClusterPool extends EventEmitter { Map // taskId -> taskIndex >() // 跟踪每个 worker 正在处理的任务,以便在崩溃时重入队 - constructor(options: ClusterPoolOptions) { + constructor(options: ClusterPoolOptions) { super() this.concurrency = options.concurrency this.totalTasks = options.totalTasks @@ -103,6 +106,7 @@ export class ClusterPool extends EventEmitter { this.workerConcurrency = options.workerConcurrency || 5 // 默认每个 worker 同时处理 5 个任务 this.logger = logger this.sharedData = options.sharedData + this.onTaskCompleted = options.onTaskCompleted this.results = Array.from({ length: this.totalTasks }) } @@ -406,6 +410,13 @@ export class ClusterPool extends EventEmitter { successfulInBatch++ this.completedTasks++ + + this.onTaskCompleted?.({ + taskIndex, + completed: this.completedTasks, + total: this.totalTasks, + result: taskResult.result as T, + }) } else if (taskResult.type === 'error') { workerLogger.error(`任务执行失败:${taskResult.taskId}`, taskResult.error) pendingTask.reject(new Error(taskResult.error)) @@ -461,6 +472,12 @@ export class ClusterPool extends EventEmitter { stats.processedTasks++ this.completedTasks++ + this.onTaskCompleted?.({ + taskIndex, + completed: this.completedTasks, + total: this.totalTasks, + result: message.result as T, + }) workerLogger.info( `完成任务 ${taskIndex + 1}/${this.totalTasks} (已完成:${this.completedTasks},当前处理中:${newTaskCount})`, ) diff --git a/packages/builder/src/worker/pool.ts b/packages/builder/src/worker/pool.ts index a1ca76e0..0ae11758 100644 --- a/packages/builder/src/worker/pool.ts +++ b/packages/builder/src/worker/pool.ts @@ -1,9 +1,17 @@ import type { Logger } from '../logger/index.js' import { logger } from '../logger/index.js' -export interface WorkerPoolOptions { +export interface TaskCompletedPayload { + taskIndex: number + completed: number + total: number + result: T +} + +export interface WorkerPoolOptions { concurrency: number totalTasks: number + onTaskCompleted?: (payload: TaskCompletedPayload) => void } export type TaskFunction = (taskIndex: number, workerId: number) => Promise @@ -14,11 +22,14 @@ export class WorkerPool { private totalTasks: number private taskIndex = 0 private logger: Logger + private completedTasks = 0 + private onTaskCompleted?: (payload: TaskCompletedPayload) => void - constructor(options: WorkerPoolOptions) { + constructor(options: WorkerPoolOptions) { this.concurrency = options.concurrency this.totalTasks = options.totalTasks this.logger = logger + this.onTaskCompleted = options.onTaskCompleted } async execute(taskFunction: TaskFunction): Promise { @@ -45,6 +56,14 @@ export class WorkerPool { results[currentIndex] = result processedByWorker++ + this.completedTasks++ + + this.onTaskCompleted?.({ + taskIndex: currentIndex, + completed: this.completedTasks, + total: this.totalTasks, + result, + }) workerLogger.info(`完成任务 ${currentIndex + 1}/${this.totalTasks} - ${duration}ms`) }