fix: cluster pool dead lock (#99)

This commit is contained in:
woolen-sheep
2025-09-13 22:01:09 +08:00
committed by GitHub
parent 41b0d7a73d
commit a185ab9242

View File

@@ -119,9 +119,6 @@ export class ClusterPool<T> extends EventEmitter {
// 启动 worker 进程
await this.startWorkers()
// 等待所有 worker 准备好
await this.waitForWorkersReady()
// 等待所有任务完成
return new Promise((resolve, reject) => {
this.on('allTasksCompleted', () => {
@@ -134,9 +131,6 @@ export class ClusterPool<T> extends EventEmitter {
})
this.on('error', reject)
// 开始分发任务
this.distributeInitialTasks()
})
}
@@ -200,25 +194,25 @@ export class ClusterPool<T> extends EventEmitter {
| { type: 'init-complete'; workerId: number },
) => {
switch (message.type) {
case 'ready':
case 'pong': {
this.handleWorkerReady(workerId, message as WorkerReadyMessage)
break;
}
case 'init-complete': {
this.handleWorkerInitComplete(workerId)
break;
}
case 'batch-result': {
this.handleWorkerBatchResult(workerId, message as BatchTaskResult)
break;
}
default: {
this.handleWorkerMessage(workerId, message as TaskResult)
}
case 'ready':
case 'pong': {
this.handleWorkerReady(workerId, message as WorkerReadyMessage)
break
}
case 'init-complete': {
this.handleWorkerInitComplete(workerId)
break
}
case 'batch-result': {
this.handleWorkerBatchResult(workerId, message as BatchTaskResult)
break
}
default: {
this.handleWorkerMessage(workerId, message as TaskResult)
}
}
},
)
@@ -305,13 +299,6 @@ export class ClusterPool<T> extends EventEmitter {
}
}
private distributeInitialTasks(): void {
// 为每个 worker 分配初始任务批次
for (const [workerId] of this.workers) {
this.assignBatchTasksToWorker(workerId)
}
}
private assignBatchTasksToWorker(workerId: number): void {
if (this.taskQueue.length === 0) return
@@ -521,27 +508,6 @@ export class ClusterPool<T> extends EventEmitter {
this.workerStats.clear()
}
private async waitForWorkersReady(): Promise<void> {
return new Promise((resolve) => {
const requiredWorkers = Math.ceil(
this.totalTasks / this.workerConcurrency,
)
const expectedWorkers = Math.min(this.concurrency, requiredWorkers)
const checkReady = () => {
if (this.readyWorkers.size >= expectedWorkers) {
this.logger.main.info(
`所有 ${expectedWorkers} 个 worker 进程已准备就绪`,
)
resolve()
}
}
this.on('workerReady', checkReady)
checkReady() // 立即检查一次
})
}
// 获取 worker 统计信息
getWorkerStats(): WorkerStats[] {
return Array.from(this.workerStats.values())