feat(builder): implement photo execution context for improved logging and storage key normalization

- Introduced a new `PhotoExecutionContext` to manage execution context for photo processing, enhancing logger management and storage key normalization.
- Refactored photo processing methods to utilize the new context, replacing global logger usage with context-based loggers.
- Updated related modules to ensure compatibility with the new execution context, improving modularity and reducing global state dependencies.

Signed-off-by: Innei <tukon479@gmail.com>
This commit is contained in:
Innei
2025-10-31 23:36:42 +08:00
parent e27b45ec49
commit 3d7c528c45
16 changed files with 877 additions and 139 deletions

View File

@@ -14,8 +14,11 @@ import {
thumbnailStoragePlugin,
} from '@afilmory/builder'
import type { Logger as BuilderLogger } from '@afilmory/builder/logger/index.js'
import type { PhotoProcessingLoggers } from '@afilmory/builder/photo/index.js'
import { createPhotoProcessingLoggers, setGlobalLoggers } from '@afilmory/builder/photo/index.js'
import {
createPhotoProcessingLoggers,
createStorageKeyNormalizer,
runWithPhotoExecutionContext,
} from '@afilmory/builder/photo/index.js'
import type { _Object } from '@aws-sdk/client-s3'
import { injectable } from 'tsyringe'
@@ -40,7 +43,6 @@ export type ProcessPhotoOptions = {
export class PhotoBuilderService {
private readonly baseLogger = coreLogger.extend('PhotoBuilder')
private readonly builderLogger: BuilderLogger = createBuilderLoggerAdapter(this.baseLogger)
private photoLoggers: PhotoProcessingLoggers | null = null
createBuilder(config: BuilderConfig): AfilmoryBuilder {
const enhancedConfig = this.ensureThumbnailPlugin(config)
@@ -56,7 +58,6 @@ export class PhotoBuilderService {
options?: ProcessPhotoOptions,
): Promise<Awaited<ReturnType<typeof processPhotoWithPipeline>>> {
const { existingItem, livePhotoMap, processorOptions, builder, builderConfig } = options ?? {}
this.ensureGlobalPhotoLoggers()
const activeBuilder = this.resolveBuilder(builder, builderConfig)
await activeBuilder.ensurePluginsReady()
@@ -65,6 +66,7 @@ export class PhotoBuilderService {
...processorOptions,
}
const photoLoggers = createPhotoProcessingLoggers(0, this.builderLogger)
const context: PhotoProcessingContext = {
photoKey: object.key,
obj: this.toLegacyObject(object),
@@ -74,21 +76,22 @@ export class PhotoBuilderService {
pluginData: {},
}
return await processPhotoWithPipeline(
context,
activeBuilder,
this.createPluginRuntime(activeBuilder, mergedOptions, builderConfig),
const runtime = this.createPluginRuntime(activeBuilder, mergedOptions, builderConfig)
const storageManager = activeBuilder.getStorageManager()
const storageConfig = activeBuilder.getConfig().storage
return await runWithPhotoExecutionContext(
{
builder: activeBuilder,
storageManager,
storageConfig,
normalizeStorageKey: createStorageKeyNormalizer(storageConfig),
loggers: photoLoggers,
},
async () => await processPhotoWithPipeline(context, runtime),
)
}
private ensureGlobalPhotoLoggers(): void {
if (!this.photoLoggers) {
this.photoLoggers = createPhotoProcessingLoggers(0, this.builderLogger)
}
setGlobalLoggers(this.photoLoggers)
}
private resolveBuilder(builder?: AfilmoryBuilder, builderConfig?: BuilderConfig): AfilmoryBuilder {
if (builder) {
return builder
@@ -153,8 +156,7 @@ export class PhotoBuilderService {
if (typeof entry === 'object' && entry !== null && THUMBNAIL_PLUGIN_SYMBOL in entry) {
return true
}
// Fallback: check by name property for backward compatibility
return entry?.name === 'afilmory:thumbnail-storage'
return false
})
if (hasPlugin) {

View File

@@ -19,6 +19,7 @@ import type { BuilderConfig } from '../types/config.js'
import type { AfilmoryManifest, CameraInfo, LensInfo } from '../types/manifest.js'
import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js'
import { ClusterPool } from '../worker/cluster-pool.js'
import type { TaskCompletedPayload } from '../worker/pool.js'
import { WorkerPool } from '../worker/pool.js'
export interface BuilderOptions {
@@ -26,6 +27,7 @@ export interface BuilderOptions {
isForceManifest: boolean
isForceThumbnails: boolean
concurrencyLimit?: number // 可选,如果未提供则使用配置文件中的默认值
progressListener?: BuildProgressListener
}
export interface BuilderResult {
@@ -37,6 +39,29 @@ export interface BuilderResult {
totalPhotos: number
}
export interface BuildProgressStartPayload {
total: number
mode: 'worker' | 'cluster'
concurrency: number
}
export interface BuildProgressSnapshot {
total: number
completed: number
newCount: number
processedCount: number
skippedCount: number
failedCount: number
currentKey?: string
}
export interface BuildProgressListener {
onStart?: (payload: BuildProgressStartPayload) => void
onProgress?: (snapshot: BuildProgressSnapshot) => void
onComplete?: (summary: BuildProgressSnapshot) => void
onError?: (error: unknown) => void
}
export class AfilmoryBuilder {
private storageManager: StorageManager | null = null
private config: BuilderConfig
@@ -51,13 +76,6 @@ export class AfilmoryBuilder {
this.pluginManager = new PluginManager(this.pluginReferences, {
baseDir: process.cwd(),
})
// 配置日志级别(保留接口以便未来扩展)
this.configureLogging()
}
private configureLogging(): void {
// 日志配置在 logger 模块中处理,保留方法以兼容未来扩展
}
async buildManifest(options: BuilderOptions): Promise<BuilderResult> {
@@ -82,6 +100,7 @@ export class AfilmoryBuilder {
let processedCount = 0
let skippedCount = 0
let newCount = 0
let failedCount = 0
let deletedCount = 0
try {
@@ -189,6 +208,7 @@ export class AfilmoryBuilder {
const concurrency = options.concurrencyLimit ?? this.config.options.defaultConcurrency
const { useClusterMode } = this.config.performance.worker
const shouldUseCluster = useClusterMode && tasksToProcess.length >= concurrency * 2
const {progressListener} = options
await this.emitPluginEvent(runState, 'beforeProcessTasks', {
options,
@@ -213,12 +233,72 @@ export class AfilmoryBuilder {
manifest.push(item)
}
} else {
const totalTasks = tasksToProcess.length
let completedTaskCount = 0
const applyResultCounters = (result: ProcessPhotoResult | null | undefined): void => {
if (!result) return
switch (result.type) {
case 'new': {
newCount++
processedCount++
break
}
case 'processed': {
processedCount++
break
}
case 'skipped': {
skippedCount++
break
}
case 'failed': {
failedCount++
break
}
}
}
const emitProgress = (currentKey?: string): void => {
progressListener?.onProgress?.({
total: totalTasks,
completed: completedTaskCount,
newCount,
processedCount,
skippedCount,
failedCount,
currentKey,
})
}
const handleTaskCompleted = ({
result,
taskIndex,
completed,
}: TaskCompletedPayload<ProcessPhotoResult>): void => {
if (result) {
applyResultCounters(result)
}
completedTaskCount = completed
const key = tasksToProcess[taskIndex]?.key
emitProgress(key)
}
progressListener?.onStart?.({
total: totalTasks,
mode: shouldUseCluster ? 'cluster' : 'worker',
concurrency,
})
emitProgress()
let results: ProcessPhotoResult[]
logger.main.info(
`开始${shouldUseCluster ? '多进程' : '并发'}处理任务,${shouldUseCluster ? '进程' : 'Worker'}数:${concurrency}${shouldUseCluster ? `,每进程并发:${this.config.performance.worker.workerConcurrency}` : ''}`,
)
let results: ProcessPhotoResult[]
if (shouldUseCluster) {
const clusterPool = new ClusterPool<ProcessPhotoResult>({
concurrency,
@@ -235,6 +315,7 @@ export class AfilmoryBuilder {
imageObjects: tasksToProcess,
builderConfig: this.getConfig(),
},
onTaskCompleted: handleTaskCompleted,
})
results = await clusterPool.execute()
@@ -242,6 +323,7 @@ export class AfilmoryBuilder {
const workerPool = new WorkerPool<ProcessPhotoResult>({
concurrency,
totalTasks: tasksToProcess.length,
onTaskCompleted: handleTaskCompleted,
})
results = await workerPool.execute(async (taskIndex, workerId) => {
@@ -294,24 +376,19 @@ export class AfilmoryBuilder {
})
manifest.push(result.item)
switch (result.type) {
case 'new': {
newCount++
processedCount++
break
}
case 'processed': {
processedCount++
break
}
case 'skipped': {
skippedCount++
break
}
}
}
completedTaskCount = Math.max(completedTaskCount, totalTasks)
emitProgress()
progressListener?.onComplete?.({
total: totalTasks,
completed: completedTaskCount,
newCount,
processedCount,
skippedCount,
failedCount,
})
for (const [key, item] of existingManifestMap) {
if (s3ImageKeys.has(key) && !manifest.some((m) => m.s3Key === key)) {
await this.emitPluginEvent(runState, 'beforeAddManifestItem', {
@@ -327,6 +404,17 @@ export class AfilmoryBuilder {
}
}
if (tasksToProcess.length === 0 && progressListener) {
progressListener.onComplete?.({
total: 0,
completed: 0,
newCount,
processedCount,
skippedCount,
failedCount,
})
}
await this.emitPluginEvent(runState, 'afterProcessTasks', {
options,
tasks: tasksToProcess,
@@ -399,6 +487,7 @@ export class AfilmoryBuilder {
return result
} catch (error) {
options.progressListener?.onError?.(error)
await this.emitPluginEvent(runState, 'onError', {
options,
error,

View File

@@ -6,11 +6,14 @@ import { join } from 'node:path'
import process from 'node:process'
import { fileURLToPath } from 'node:url'
import type { BuildProgressListener } from './builder/builder.js'
import { AfilmoryBuilder } from './builder/index.js'
import { loadBuilderConfig } from './config/index.js'
import { logger } from './logger/index.js'
import { logger, setLogListener } from './logger/index.js'
import { runAsWorker } from './runAsWorker.js'
type BuilderTUI = import('./cli/tui.js').BuilderTUI
async function main() {
// 检查是否作为 cluster worker 运行
if (process.env.CLUSTER_WORKER === 'true' || process.argv.includes('--cluster-worker') || cluster.isWorker) {
@@ -29,6 +32,7 @@ async function main() {
const isForceMode = args.has('--force')
const isForceManifest = args.has('--force-manifest')
const isForceThumbnails = args.has('--force-thumbnails')
const disableUi = args.has('--no-ui')
// 显示帮助信息
if (args.has('--help') || args.has('-h')) {
@@ -43,6 +47,7 @@ async function main() {
--force-thumbnails 强制重新生成缩略图
--config 显示当前配置信息
--help, -h 显示帮助信息
--no-ui 使用传统日志输出(禁用 TUI
示例:
tsx src/core/cli.ts # 增量更新
@@ -118,6 +123,24 @@ async function main() {
const concurrencyLimit = config.performance.worker.workerCount
const finalConcurrency = concurrencyLimit ?? config.options.defaultConcurrency
const processingMode = config.performance.worker.useClusterMode ? '多进程集群' : '并发线程池'
const processingModeKey = config.performance.worker.useClusterMode ? 'cluster' : 'worker'
const useTui = process.stdout.isTTY && !disableUi
let tui: BuilderTUI | null = null
let progressListener: BuildProgressListener | undefined
if (useTui) {
const { BuilderTUI } = await import('./cli/tui.js')
tui = new BuilderTUI()
tui.attach()
tui.setRunMetadata({
runMode,
concurrency: finalConcurrency,
processingMode: processingModeKey,
})
progressListener = tui.createProgressListener()
setLogListener((message) => tui?.handleLog(message), { forwardToConsole: false })
}
logger.main.info(`🚀 运行模式:${runMode}`)
logger.main.info(`⚡ 最大并发数:${finalConcurrency}`)
@@ -127,12 +150,25 @@ async function main() {
environmentCheck()
// 启动构建过程
await cliBuilder.buildManifest({
isForceMode,
isForceManifest,
isForceThumbnails,
concurrencyLimit,
})
try {
const result = await cliBuilder.buildManifest({
isForceMode,
isForceManifest,
isForceThumbnails,
concurrencyLimit,
progressListener,
})
tui?.markSuccess(result)
} catch (error) {
tui?.markError(error)
throw error
} finally {
if (useTui) {
setLogListener(null, { forwardToConsole: true })
tui?.detach()
}
}
// eslint-disable-next-line unicorn/no-process-exit
process.exit(0)

View File

@@ -0,0 +1,322 @@
/* eslint-disable unicorn/prefer-single-call */
import { format as utilFormat } from 'node:util'
import type {
BuilderResult,
BuildProgressListener,
BuildProgressSnapshot,
BuildProgressStartPayload,
} from '../builder/builder.js'
import type { LogMessage } from '../logger/index.js'
type RunStatus = 'idle' | 'running' | 'success' | 'error'
interface RunMetadata {
runMode: string
concurrency: number
processingMode: 'cluster' | 'worker'
}
interface ProgressState extends BuildProgressSnapshot {}
interface SummaryState {
durationMs: number
result: BuilderResult
}
interface TuiState {
status: RunStatus
runMeta: RunMetadata | null
progress: ProgressState | null
summary: SummaryState | null
error: string | null
logs: string[]
}
const MAX_LOG_LINES = 15
export class BuilderTUI {
private readonly stream: NodeJS.WriteStream
private state: TuiState = {
status: 'idle',
runMeta: null,
progress: null,
summary: null,
error: null,
logs: [],
}
private lastFrame = ''
private isAttached = false
private startTimestamp: number | null = null
constructor(options: { stream?: NodeJS.WriteStream } = {}) {
this.stream = options.stream ?? process.stdout
}
attach(): void {
if (this.isAttached) return
this.isAttached = true
this.hideCursor()
this.render(true)
}
detach(): void {
if (!this.isAttached) return
this.clearScreen()
this.showCursor()
this.isAttached = false
this.lastFrame = ''
if (this.state.status === 'success' && this.state.summary) {
const summaryLine = this.formatSuccessSummary(this.state.summary)
this.stream.write(`${summaryLine}\n`)
} else if (this.state.status === 'error' && this.state.error) {
this.stream.write(`${color('构建失败:', 'red')}${this.state.error}\n`)
}
}
setRunMetadata(meta: RunMetadata): void {
this.state.runMeta = meta
this.render()
}
handleLog(message: LogMessage): void {
const timestamp = formatTime(message.timestamp)
const levelLabel = formatLevel(message.level)
const formatted = utilFormat(...(message.args as any[]))
const line = `${color(timestamp, 'gray')} ${levelLabel} ${color(`[${message.tag}]`, 'blue')} ${formatted}`
this.state.logs = [...this.state.logs, line].slice(-MAX_LOG_LINES)
this.render()
}
createProgressListener(): BuildProgressListener {
return {
onStart: (payload: BuildProgressStartPayload) => {
this.startTimestamp = Date.now()
this.state.status = 'running'
this.state.progress = {
total: payload.total,
completed: 0,
newCount: 0,
processedCount: 0,
skippedCount: 0,
failedCount: 0,
}
this.render(true)
},
onProgress: (snapshot: BuildProgressSnapshot) => {
this.state.progress = { ...snapshot }
this.render()
},
onComplete: (snapshot: BuildProgressSnapshot) => {
this.state.progress = { ...snapshot }
this.render()
},
onError: (error: unknown) => {
this.markError(error)
},
}
}
markSuccess(result: BuilderResult): void {
const durationMs = this.startTimestamp ? Date.now() - this.startTimestamp : 0
this.state.status = 'success'
this.state.summary = { durationMs, result }
this.render()
}
markError(error: unknown): void {
const message = error instanceof Error ? error.message : String(error)
this.state.status = 'error'
this.state.error = message
this.render()
}
private render(force = false): void {
if (!this.isAttached) return
const frame = this.composeFrame()
if (!force && frame === this.lastFrame) {
return
}
this.lastFrame = frame
this.clearScreen()
this.stream.write(frame)
}
private composeFrame(): string {
const columns = this.stream.columns ?? 80
const lines: string[] = []
lines.push(this.composeHeader(columns))
lines.push('')
lines.push(this.composeProgress(columns))
lines.push('')
lines.push(this.composeStatusLine())
lines.push('')
lines.push(color('最近日志:', 'gray'))
lines.push(...(this.state.logs.length > 0 ? this.state.logs : [color('暂无日志输出…', 'gray')]))
return lines.join('\n')
}
private composeHeader(columns: number): string {
const meta = this.state.runMeta
if (!meta) {
return color('📸 Afilmory Builder', 'cyan')
}
const parts = [
color('📸 Afilmory Builder', 'cyan'),
`模式: ${color(meta.runMode, 'green')}`,
`处理: ${color(meta.processingMode === 'cluster' ? '多进程集群' : '并发线程池', 'green')}`,
`并发: ${color(String(meta.concurrency), 'green')}`,
]
const header = parts.join(' · ')
return header.length > columns ? header.slice(0, columns - 1) : header
}
private composeProgress(columns: number): string {
const { progress } = this.state
if (!progress) {
switch (this.state.status) {
case 'success': {
return color('构建已完成', 'green')
}
case 'error': {
return color('构建出现错误', 'red')
}
case 'running': {
return color('准备处理中…', 'yellow')
}
default: {
return color('等待开始…', 'yellow')
}
}
}
const barWidth = Math.max(10, Math.min(40, columns - 40))
const ratio = progress.total > 0 ? Math.min(1, Math.max(0, progress.completed / progress.total)) : 1
const filledLength = Math.round(ratio * barWidth)
const bar = `${'█'.repeat(filledLength)}${'░'.repeat(Math.max(0, barWidth - filledLength))}`
const percent = progress.total > 0 ? (ratio * 100).toFixed(1) : '100'
const segments = [`${progress.newCount}`, `更新 ${progress.processedCount}`, `跳过 ${progress.skippedCount}`]
if (progress.failedCount > 0) {
segments.push(`失败 ${progress.failedCount}`)
}
const counts = segments.join(' · ')
const parts = [`处理照片 [${bar}] ${percent}% (${progress.completed}/${progress.total})`, counts]
if (progress.currentKey) {
parts.push(`当前: ${truncateMiddle(progress.currentKey, columns - 10)}`)
}
return parts.join('\n')
}
private composeStatusLine(): string {
switch (this.state.status) {
case 'success': {
if (this.state.summary) {
return this.formatSuccessSummary(this.state.summary)
}
return color('构建完成', 'green')
}
case 'error': {
return color(`构建失败:${this.state.error ?? '未知错误'}`, 'red')
}
case 'running': {
return color('构建进行中…', 'yellow')
}
default: {
return color('等待开始…', 'gray')
}
}
}
private formatSuccessSummary(summary: SummaryState): string {
const { result, durationMs } = summary
const durationSeconds = Math.round(durationMs / 1000)
return color(
`✅ 构建成功 · 照片 ${result.totalPhotos} · 新增 ${result.newCount} · 更新 ${result.processedCount} · 删除 ${result.deletedCount} · 耗时 ${formatDuration(durationSeconds)}`,
'green',
)
}
private clearScreen(): void {
this.stream.write('\u001b[0;0H')
this.stream.write('\u001b[2J')
}
private hideCursor(): void {
this.stream.write('\u001b[?25l')
}
private showCursor(): void {
this.stream.write('\u001b[?25h')
}
}
function color(text: string, tone: 'cyan' | 'green' | 'yellow' | 'red' | 'gray' | 'blue' | 'magenta'): string {
const codes: Record<typeof tone, string> = {
cyan: '\u001b[36m',
green: '\u001b[32m',
yellow: '\u001b[33m',
red: '\u001b[31m',
gray: '\u001b[90m',
blue: '\u001b[34m',
magenta: '\u001b[35m',
} as const
return `${codes[tone]}${text}\u001b[0m`
}
function formatLevel(level: string): string {
const normalized = level.toLowerCase()
switch (normalized) {
case 'error':
case 'fatal': {
return color('ERR', 'red')
}
case 'warn': {
return color('WRN', 'yellow')
}
case 'success': {
return color('SUC', 'green')
}
case 'debug':
case 'trace': {
return color('DBG', 'magenta')
}
case 'start': {
return color('STA', 'cyan')
}
default: {
return color(normalized.slice(0, 3).toUpperCase(), 'gray')
}
}
}
function formatTime(date: Date): string {
const hours = `${date.getHours()}`.padStart(2, '0')
const minutes = `${date.getMinutes()}`.padStart(2, '0')
const seconds = `${date.getSeconds()}`.padStart(2, '0')
return `${hours}:${minutes}:${seconds}`
}
function formatDuration(totalSeconds: number): string {
const minutes = Math.floor(totalSeconds / 60)
const seconds = totalSeconds % 60
if (minutes === 0) {
return `${seconds}`
}
return `${minutes}${seconds}`
}
function truncateMiddle(value: string, maxLength: number): string {
const limit = Math.max(10, maxLength)
if (value.length <= limit) {
return value
}
const half = Math.max(1, Math.floor((limit - 1) / 2))
return `${value.slice(0, half)}${value.slice(value.length - half)}`
}

View File

@@ -1,23 +1,89 @@
import type {ConsolaInstance} from 'consola';
import consola from 'consola'
// 创建系统化的日志器
export type LogLevel = 'log' | 'info' | 'success' | 'warn' | 'error' | 'debug' | 'trace' | 'start' | 'fatal'
export interface LogMessage {
tag: string
level: LogLevel | string
args: unknown[]
timestamp: Date
}
type LogListener = (message: LogMessage) => void
let listener: LogListener | null = null
let forwardToConsole = true
export function setLogListener(newListener: LogListener | null, options: { forwardToConsole?: boolean } = {}): void {
listener = newListener
if (options.forwardToConsole !== undefined) {
forwardToConsole = options.forwardToConsole
}
}
export function setConsoleForwarding(enabled: boolean): void {
forwardToConsole = enabled
}
function notifyListener(tag: string, level: LogLevel | string, args: unknown[]): void {
listener?.({
tag,
level,
args,
timestamp: new Date(),
})
}
function combineTags(parentTag: string, childTag: string): string {
if (!parentTag) return childTag
return `${parentTag}/${childTag}`
}
function wrapInstance(instance: ConsolaInstance, tag: string): ConsolaInstance {
return new Proxy(instance, {
get(target, property, receiver) {
const value = Reflect.get(target, property, receiver)
if (property === 'withTag') {
return (...args: any[]) => {
const modifier = String(args[0] ?? '')
const nextTag = modifier ? combineTags(tag, modifier) : tag
const nextInstance = Reflect.apply(value, target, args) as ConsolaInstance
return wrapInstance(nextInstance, nextTag)
}
}
if (typeof value !== 'function') {
return value
}
return (...args: any[]) => {
notifyListener(tag, String(property), args)
if (forwardToConsole) {
return Reflect.apply(value, target, args)
}
return
}
},
})
}
function createTaggedLogger(tag: string): ConsolaInstance {
return wrapInstance(consola.withTag(tag), tag)
}
export const logger = {
// 主进程日志
main: consola.withTag('MAIN'),
// S3 操作日志
s3: consola.withTag('S3'),
// 图片处理日志
image: consola.withTag('IMAGE'),
// 缩略图处理日志
thumbnail: consola.withTag('THUMBNAIL'),
// Blurhash 处理日志
blurhash: consola.withTag('BLURHASH'),
// EXIF 处理日志
exif: consola.withTag('EXIF'),
// 文件系统操作日志
fs: consola.withTag('FS'),
// Worker 日志(动态创建)
worker: (id: number) => consola.withTag(`WORKER-${id}`),
main: createTaggedLogger('MAIN'),
s3: createTaggedLogger('S3'),
image: createTaggedLogger('IMAGE'),
thumbnail: createTaggedLogger('THUMBNAIL'),
blurhash: createTaggedLogger('BLURHASH'),
exif: createTaggedLogger('EXIF'),
fs: createTaggedLogger('FS'),
worker: (id: number) => createTaggedLogger(`WORKER-${id}`),
}
export type Logger = typeof logger

View File

@@ -50,7 +50,7 @@ class CompatibleLoggerAdapter implements PhotoLogger {
### 主要改进
1. **模块化分离**: 将不同的处理逻辑分离到专门的模块中
2. **Logger 适配器**: 不再通过参数传递 logger使用全局 logger 适配器
2. **Logger 适配器**: 使用异步执行上下文管理 logger避免全局状态污染
3. **缓存管理**: 统一管理各种数据的缓存和复用逻辑
4. **Live Photo 处理**: 专门的模块处理 Live Photo 检测和匹配
5. **类型安全**: 完善的 TypeScript 类型定义
@@ -60,13 +60,10 @@ class CompatibleLoggerAdapter implements PhotoLogger {
#### 基本使用
```typescript
import { processPhoto, setGlobalLoggers, createPhotoProcessingLoggers } from './index.js'
import { processPhoto, createPhotoProcessingLoggers } from './index.js'
// 设置全局 logger
const loggers = createPhotoProcessingLoggers(workerId, baseLogger)
setGlobalLoggers(loggers)
// 处理照片
const result = await processPhoto(
obj,
index,
@@ -76,6 +73,7 @@ const result = await processPhoto(
livePhotoMap,
options,
builder,
pluginRuntime,
)
```

View File

@@ -0,0 +1,80 @@
import { AsyncLocalStorage } from 'node:async_hooks'
import type { AfilmoryBuilder } from '../builder/builder.js'
import type { StorageManager } from '../storage/index.js'
import type { StorageConfig } from '../storage/interfaces.js'
import type { PhotoProcessingLoggers } from './logger-adapter.js'
export interface PhotoExecutionContext {
builder: AfilmoryBuilder
storageManager: StorageManager
storageConfig: StorageConfig
normalizeStorageKey: (key: string) => string
loggers?: PhotoProcessingLoggers
}
const photoContextStorage = new AsyncLocalStorage<PhotoExecutionContext>()
export function runWithPhotoExecutionContext<T>(
context: PhotoExecutionContext,
callback: () => T | Promise<T>,
): T | Promise<T> {
return photoContextStorage.run(context, callback)
}
export function getPhotoExecutionContext(): PhotoExecutionContext {
const context = photoContextStorage.getStore()
if (!context) {
throw new Error('Photo execution context is not available')
}
return context
}
function sanitizeStoragePath(value: string | undefined | null): string {
if (!value) return ''
return value.replaceAll('\\', '/').replaceAll(/\/+/g, '/').replace(/^\/+/, '').replace(/\/+$/, '')
}
/**
* 创建一个用于标准化存储键值的函数
* 会去除配置中的 prefix/path 等前缀,并统一路径分隔符
*/
export function createStorageKeyNormalizer(storageConfig: StorageConfig): (key: string) => string {
let basePrefix = ''
switch (storageConfig.provider) {
case 's3': {
basePrefix = sanitizeStoragePath(storageConfig.prefix)
break
}
case 'github': {
basePrefix = sanitizeStoragePath(storageConfig.path)
break
}
default: {
basePrefix = ''
}
}
const prefixWithSlash = basePrefix ? `${basePrefix}/` : ''
return (rawKey: string): string => {
if (!rawKey) return ''
const sanitizedKey = rawKey.replaceAll('\\', '/').replaceAll(/\/+/g, '/').replace(/^\/+/, '')
if (!basePrefix) {
return sanitizedKey
}
if (sanitizedKey === basePrefix) {
return ''
}
if (sanitizedKey.startsWith(prefixWithSlash)) {
return sanitizedKey.slice(prefixWithSlash.length)
}
return sanitizedKey
}
}

View File

@@ -5,7 +5,7 @@ import { compressUint8Array } from '@afilmory/utils'
import type { _Object } from '@aws-sdk/client-s3'
import sharp from 'sharp'
import type { AfilmoryBuilder, BuilderOptions } from '../builder/builder.js'
import type { BuilderOptions } from '../builder/builder.js'
import {
convertBmpToJpegSharpInstance,
getImageMetadataWithSharp,
@@ -17,6 +17,7 @@ import { THUMBNAIL_PLUGIN_DATA_KEY } from '../plugins/thumbnail-storage/shared.j
import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js'
import { shouldProcessPhoto } from './cache-manager.js'
import { processExifData, processThumbnailAndBlurhash, processToneAnalysis } from './data-processors.js'
import { getPhotoExecutionContext } from './execution-context.js'
import { extractPhotoInfo } from './info-extractor.js'
import { processLivePhoto } from './live-photo-handler.js'
import { getGlobalLoggers } from './logger-adapter.js'
@@ -43,13 +44,13 @@ export interface PhotoProcessingContext {
*/
export async function preprocessImage(
photoKey: string,
builder: AfilmoryBuilder,
): Promise<{ rawBuffer: Buffer; processedBuffer: Buffer } | null> {
const loggers = getGlobalLoggers()
const { storageManager } = getPhotoExecutionContext()
try {
// 获取图片数据
const rawImageBuffer = await builder.getStorageManager().getFile(photoKey)
const rawImageBuffer = await storageManager.getFile(photoKey)
if (!rawImageBuffer) {
loggers.image.error(`无法获取图片数据:${photoKey}`)
return null
@@ -122,7 +123,8 @@ export async function processImageWithSharp(imageBuffer: Buffer, photoKey: strin
* @param s3Key S3 键
* @returns 带摘要后缀的 ID
*/
async function generatePhotoId(s3Key: string, builder: AfilmoryBuilder): Promise<string> {
async function generatePhotoId(s3Key: string): Promise<string> {
const { builder } = getPhotoExecutionContext()
const { options } = builder.getConfig()
const { digestSuffixLength } = options
if (!digestSuffixLength || digestSuffixLength <= 0) {
@@ -141,17 +143,16 @@ async function generatePhotoId(s3Key: string, builder: AfilmoryBuilder): Promise
*/
export async function executePhotoProcessingPipeline(
context: PhotoProcessingContext,
builder: AfilmoryBuilder,
): Promise<PhotoManifestItem | null> {
const { photoKey, obj, existingItem, livePhotoMap, options } = context
const { storageManager } = getPhotoExecutionContext()
const loggers = getGlobalLoggers()
// Generate the actual photo ID with digest suffix
const photoId = await generatePhotoId(photoKey, builder)
const storageManager = builder.getStorageManager()
const photoId = await generatePhotoId(photoKey)
try {
// 1. 预处理图片
const imageData = await preprocessImage(photoKey, builder)
const imageData = await preprocessImage(photoKey)
if (!imageData) return null
// 2. 处理图片并创建 Sharp 实例
@@ -223,7 +224,6 @@ export async function executePhotoProcessingPipeline(
*/
export async function processPhotoWithPipeline(
context: PhotoProcessingContext,
builder: AfilmoryBuilder,
runtime: { runState: PluginRunState; builderOptions: BuilderOptions },
): Promise<{
item: PhotoManifestItem | null
@@ -231,9 +231,10 @@ export async function processPhotoWithPipeline(
pluginData: Record<string, unknown>
}> {
const { photoKey, existingItem, obj, options } = context
const { builder } = getPhotoExecutionContext()
const loggers = getGlobalLoggers()
const photoId = await generatePhotoId(photoKey, builder)
const photoId = await generatePhotoId(photoKey)
await builder.emitPluginEvent(runtime.runState, 'beforePhotoProcess', {
options: runtime.builderOptions,
@@ -270,7 +271,7 @@ export async function processPhotoWithPipeline(
let resultType: ProcessPhotoResult['type'] = isNewPhoto ? 'new' : 'processed'
try {
processedItem = await executePhotoProcessingPipeline(context, builder)
processedItem = await executePhotoProcessingPipeline(context)
if (!processedItem) {
resultType = 'failed'
}

View File

@@ -14,6 +14,13 @@ export { processExifData, processThumbnailAndBlurhash, processToneAnalysis } fro
export type { LivePhotoResult } from './live-photo-handler.js'
export { createLivePhotoMap, processLivePhoto } from './live-photo-handler.js'
// 执行上下文
export {
createStorageKeyNormalizer,
getPhotoExecutionContext,
runWithPhotoExecutionContext,
} from './execution-context.js'
// Logger 适配器
export type { PhotoLogger, PhotoProcessingLoggers } from './logger-adapter.js'
export {

View File

@@ -1,17 +1,21 @@
import path from 'node:path'
import { env } from '@env'
import type { PhotoInfo, PickedExif } from '../types/photo.js'
import { getPhotoExecutionContext } from './execution-context.js'
import { getGlobalLoggers } from './logger-adapter.js'
// 从文件名提取照片信息
export function extractPhotoInfo(key: string, exifData?: PickedExif | null): PhotoInfo {
const log = getGlobalLoggers().image
const { normalizeStorageKey } = getPhotoExecutionContext()
log.info(`提取照片信息:${key}`)
const fileName = path.basename(key, path.extname(key))
const sanitizedKey = key.replaceAll('\\', '/')
const relativeKey = normalizeStorageKey(sanitizedKey)
const keyForParsing = relativeKey || sanitizedKey
const extname = path.posix.extname(keyForParsing)
const fileName = path.posix.basename(keyForParsing, extname)
// 尝试从文件名解析信息,格式示例:"2024-01-15_城市夜景_1250views"
let title = fileName
@@ -20,22 +24,16 @@ export function extractPhotoInfo(key: string, exifData?: PickedExif | null): Pho
let tags: string[] = []
// 从目录路径中提取 tags
const dirPath = path.dirname(key)
if (dirPath && dirPath !== '.' && dirPath !== '/') {
// 移除前缀(如果有的话)
let relativePath = dirPath
if (env.S3_PREFIX && dirPath.startsWith(env.S3_PREFIX)) {
relativePath = dirPath.slice(env.S3_PREFIX.length)
}
const dirPathRaw = relativeKey ? path.posix.dirname(relativeKey) : path.posix.dirname(keyForParsing)
const dirPath = dirPathRaw === '.' || dirPathRaw === '/' ? '' : dirPathRaw
if (dirPath) {
const relativePath = dirPath.replaceAll(/^\/+|\/+$/g, '')
// 清理路径分隔符
relativePath = relativePath.replaceAll(/^\/+|\/+$/g, '')
if (relativePath) {
// 分割路径并过滤空字符串
const pathParts = relativePath.split('/').filter((part) => part.trim() !== '')
tags = pathParts.map((part) => part.trim())
// 分割路径并过滤空字符串
const pathParts = relativePath.split('/').filter((part) => part.trim() !== '')
tags = pathParts.map((part) => part.trim())
if (tags.length > 0) {
log.info(`从路径提取标签:[${tags.join(', ')}]`)
}
}
@@ -80,7 +78,7 @@ export function extractPhotoInfo(key: string, exifData?: PickedExif | null): Pho
// 如果标题为空,使用文件名
if (!title) {
title = path.basename(key, path.extname(key))
title = path.posix.basename(keyForParsing, extname)
}
log.info(`照片信息提取完成:"${title}"`)

View File

@@ -1,6 +1,7 @@
import type { ConsolaInstance } from 'consola'
import type { Logger, WorkerLogger } from '../logger/index.js'
import { getPhotoExecutionContext } from './execution-context.js'
/**
* 通用 Logger 接口
@@ -118,23 +119,39 @@ export function createPhotoProcessingLoggers(workerId: number, baseLogger: Logge
}
/**
* 全局 Logger 实例
* 遗留的全局 Logger(仅用于兼容旧代码)
*/
let globalLoggers: PhotoProcessingLoggers | null = null
let legacyLoggers: PhotoProcessingLoggers | null = null
let hasWarnedLegacyLoggerUsage = false
/**
* 设置全局 Logger
* @deprecated 使用执行上下文替代
*/
export function setGlobalLoggers(loggers: PhotoProcessingLoggers): void {
globalLoggers = loggers
legacyLoggers = loggers
}
/**
* 获取全局 Logger
* 获取当前上下文中的 Logger 集合
* 会优先从执行上下文中获取;若未初始化则回退到遗留的全局实例
*/
export function getGlobalLoggers(): PhotoProcessingLoggers {
if (!globalLoggers) {
throw new Error('Global loggers not initialized. Call setGlobalLoggers first.')
try {
const context = getPhotoExecutionContext()
if (context.loggers) {
return context.loggers
}
} catch {
// 忽略上下文不存在的错误,继续尝试使用遗留 logger
}
return globalLoggers
if (legacyLoggers) {
if (!hasWarnedLegacyLoggerUsage) {
legacyLoggers.image.warn('使用遗留的全局 logger请尽快迁移到执行上下文模式。')
hasWarnedLegacyLoggerUsage = true
}
return legacyLoggers
}
throw new Error('Photo loggers not initialized. Ensure runWithPhotoExecutionContext is used.')
}

View File

@@ -4,9 +4,10 @@ import type { AfilmoryBuilder, BuilderOptions } from '../builder/builder.js'
import { logger } from '../logger/index.js'
import type { PluginRunState } from '../plugins/manager.js'
import type { PhotoManifestItem, ProcessPhotoResult } from '../types/photo.js'
import { createStorageKeyNormalizer, runWithPhotoExecutionContext } from './execution-context.js'
import type { PhotoProcessingContext } from './image-pipeline.js'
import { processPhotoWithPipeline } from './image-pipeline.js'
import { createPhotoProcessingLoggers, setGlobalLoggers } from './logger-adapter.js'
import { createPhotoProcessingLoggers } from './logger-adapter.js'
export interface PhotoProcessorOptions {
isForceMode: boolean
@@ -37,12 +38,6 @@ export async function processPhoto(
const existingItem = existingManifestMap.get(key)
// 创建并设置全局 logger
const photoLoggers = createPhotoProcessingLoggers(workerId, logger)
setGlobalLoggers(photoLoggers)
photoLoggers.image.info(`📸 [${index + 1}/${totalImages}] ${key}`)
// 构建处理上下文
const context: PhotoProcessingContext = {
photoKey: key,
@@ -53,6 +48,23 @@ export async function processPhoto(
pluginData: {},
}
// 使用处理管道
return await processPhotoWithPipeline(context, builder, pluginRuntime)
const storageManager = builder.getStorageManager()
const storageConfig = builder.getConfig().storage
const photoLoggers = createPhotoProcessingLoggers(workerId, logger)
return await runWithPhotoExecutionContext(
{
builder,
storageManager,
storageConfig,
normalizeStorageKey: createStorageKeyNormalizer(storageConfig),
loggers: photoLoggers,
},
async () => {
photoLoggers.image.info(`📸 [${index + 1}/${totalImages}] ${key}`)
// 使用处理管道
return await processPhotoWithPipeline(context, pluginRuntime)
},
)
}

View File

@@ -25,6 +25,8 @@ interface NormalizedDescriptor {
options?: unknown
}
function normalizeDescriptor(ref: string): NormalizedDescriptor
function normalizeDescriptor(ref: BuilderPluginReference): NormalizedDescriptor | BuilderPluginESMImporter
function normalizeDescriptor(ref: BuilderPluginReference): NormalizedDescriptor | BuilderPluginESMImporter {
if (typeof ref === 'string') {
return { specifier: ref }
@@ -111,23 +113,23 @@ export async function loadPlugins(
const results: LoadedPluginDefinition[] = []
for (const entry of entries) {
if (isPluginESMImporter(entry)) {
const { default: pluginFactoryOrPlugin } = await entry()
const plugin = await instantiatePlugin(pluginFactoryOrPlugin)
const hooks = normalizeHooks(plugin)
const name = plugin.name || `lazy-loaded-plugin-${results.length}`
results.push({
name,
hooks,
pluginOptions: undefined,
})
continue
}
if (typeof entry === 'string') {
const descriptor = normalizeDescriptor(entry)
if (isPluginESMImporter(descriptor)) {
const { default: pluginFactoryOrPlugin } = await descriptor()
const plugin = await instantiatePlugin(pluginFactoryOrPlugin)
const hooks = normalizeHooks(plugin)
const name = plugin.name || `lazy-loaded-plugin-${results.length}`
results.push({
name,
hooks,
pluginOptions: undefined,
})
continue
}
const { resolvedPath } = resolveSpecifier(descriptor.specifier, baseDir)
const mod = await importModule(resolvedPath)

View File

@@ -1,5 +1,69 @@
import { deserialize as v8Deserialize, serialize as v8Serialize } from 'node:v8'
function manualClone<T>(value: T, seen = new WeakMap<object, unknown>()): T {
if (value === null || typeof value !== 'object') {
return value
}
if (typeof value === 'function') {
return value
}
if (value instanceof Date) {
return new Date(value) as unknown as T
}
if (value instanceof RegExp) {
return new RegExp(value.source, value.flags) as unknown as T
}
if (value instanceof Map) {
const result = new Map()
seen.set(value, result)
for (const [key, val] of value.entries()) {
result.set(manualClone(key, seen), manualClone(val, seen))
}
return result as unknown as T
}
if (value instanceof Set) {
const result = new Set()
seen.set(value, result)
for (const item of value.values()) {
result.add(manualClone(item, seen))
}
return result as unknown as T
}
if (seen.has(value as object)) {
return seen.get(value as object) as T
}
if (Array.isArray(value)) {
const result: unknown[] = []
seen.set(value, result)
for (const item of value) {
result.push(manualClone(item, seen))
}
return result as unknown as T
}
const proto = Object.getPrototypeOf(value)
const result = Object.create(proto ?? Object.prototype)
seen.set(value as object, result)
for (const key of Reflect.ownKeys(value)) {
const descriptor = Object.getOwnPropertyDescriptor(value, key)
if (!descriptor) continue
if ('value' in descriptor) {
descriptor.value = manualClone(descriptor.value, seen)
}
Object.defineProperty(result, key, descriptor)
}
return result as T
}
export function clone<T>(value: T): T {
const maybeStructuredClone = (
globalThis as typeof globalThis & {
@@ -8,8 +72,16 @@ export function clone<T>(value: T): T {
).structuredClone
if (typeof maybeStructuredClone === 'function') {
return maybeStructuredClone(value)
try {
return maybeStructuredClone(value)
} catch {
// Fall through to other clone strategies when structuredClone cannot handle the value
}
}
return v8Deserialize(v8Serialize(value))
try {
return v8Deserialize(v8Serialize(value))
} catch {
return manualClone(value)
}
}

View File

@@ -8,8 +8,9 @@ import { serialize } from 'node:v8'
import type { Logger } from '../logger/index.js'
import { logger } from '../logger/index.js'
import type { BuilderConfig } from '../types/config.js'
import type { TaskCompletedPayload } from './pool.js'
export interface ClusterPoolOptions {
export interface ClusterPoolOptions<T> {
concurrency: number
totalTasks: number
workerEnv?: Record<string, string> // 传递给 worker 的环境变量
@@ -21,6 +22,7 @@ export interface ClusterPoolOptions {
imageObjects: any[]
builderConfig: BuilderConfig
}
onTaskCompleted?: (payload: TaskCompletedPayload<T>) => void
}
export interface WorkerReadyMessage {
@@ -78,7 +80,8 @@ export class ClusterPool<T> extends EventEmitter {
private workerEnv: Record<string, string>
private workerConcurrency: number
private logger: Logger
private sharedData?: ClusterPoolOptions['sharedData']
private sharedData?: ClusterPoolOptions<T>['sharedData']
private onTaskCompleted?: (payload: TaskCompletedPayload<T>) => void
private taskQueue: Array<{ taskIndex: number }> = []
private workers = new Map<number, Worker>()
@@ -95,7 +98,7 @@ export class ClusterPool<T> extends EventEmitter {
Map<string, number> // taskId -> taskIndex
>() // 跟踪每个 worker 正在处理的任务,以便在崩溃时重入队
constructor(options: ClusterPoolOptions) {
constructor(options: ClusterPoolOptions<T>) {
super()
this.concurrency = options.concurrency
this.totalTasks = options.totalTasks
@@ -103,6 +106,7 @@ export class ClusterPool<T> extends EventEmitter {
this.workerConcurrency = options.workerConcurrency || 5 // 默认每个 worker 同时处理 5 个任务
this.logger = logger
this.sharedData = options.sharedData
this.onTaskCompleted = options.onTaskCompleted
this.results = Array.from({ length: this.totalTasks })
}
@@ -406,6 +410,13 @@ export class ClusterPool<T> extends EventEmitter {
successfulInBatch++
this.completedTasks++
this.onTaskCompleted?.({
taskIndex,
completed: this.completedTasks,
total: this.totalTasks,
result: taskResult.result as T,
})
} else if (taskResult.type === 'error') {
workerLogger.error(`任务执行失败:${taskResult.taskId}`, taskResult.error)
pendingTask.reject(new Error(taskResult.error))
@@ -461,6 +472,12 @@ export class ClusterPool<T> extends EventEmitter {
stats.processedTasks++
this.completedTasks++
this.onTaskCompleted?.({
taskIndex,
completed: this.completedTasks,
total: this.totalTasks,
result: message.result as T,
})
workerLogger.info(
`完成任务 ${taskIndex + 1}/${this.totalTasks} (已完成:${this.completedTasks},当前处理中:${newTaskCount})`,
)

View File

@@ -1,9 +1,17 @@
import type { Logger } from '../logger/index.js'
import { logger } from '../logger/index.js'
export interface WorkerPoolOptions {
export interface TaskCompletedPayload<T> {
taskIndex: number
completed: number
total: number
result: T
}
export interface WorkerPoolOptions<T> {
concurrency: number
totalTasks: number
onTaskCompleted?: (payload: TaskCompletedPayload<T>) => void
}
export type TaskFunction<T> = (taskIndex: number, workerId: number) => Promise<T>
@@ -14,11 +22,14 @@ export class WorkerPool<T> {
private totalTasks: number
private taskIndex = 0
private logger: Logger
private completedTasks = 0
private onTaskCompleted?: (payload: TaskCompletedPayload<T>) => void
constructor(options: WorkerPoolOptions) {
constructor(options: WorkerPoolOptions<T>) {
this.concurrency = options.concurrency
this.totalTasks = options.totalTasks
this.logger = logger
this.onTaskCompleted = options.onTaskCompleted
}
async execute(taskFunction: TaskFunction<T>): Promise<T[]> {
@@ -45,6 +56,14 @@ export class WorkerPool<T> {
results[currentIndex] = result
processedByWorker++
this.completedTasks++
this.onTaskCompleted?.({
taskIndex: currentIndex,
completed: this.completedTasks,
total: this.totalTasks,
result,
})
workerLogger.info(`完成任务 ${currentIndex + 1}/${this.totalTasks} - ${duration}ms`)
}