From b8ad178bee2d8be03634a67913393413c9060cd2 Mon Sep 17 00:00:00 2001
From: kevin-ramdass
Date: Sat, 31 Jan 2026 18:23:15 -0800
Subject: [PATCH] refactor: Pass and handle a dedicated timeout signal for
 streaming content generation and update mock to return an AsyncGenerator.

---
 packages/core/src/core/geminiChat.test.ts |  31 ++--
 packages/core/src/core/geminiChat.ts      | 174 ++++++++++++----------
 2 files changed, 118 insertions(+), 87 deletions(-)

diff --git a/packages/core/src/core/geminiChat.test.ts b/packages/core/src/core/geminiChat.test.ts
index 4a2de3c712..84135fcb82 100644
--- a/packages/core/src/core/geminiChat.test.ts
+++ b/packages/core/src/core/geminiChat.test.ts
@@ -1022,18 +1022,27 @@ describe('GeminiChat', () => {
 
     // 2. Mock generateContentStream to hang UNTIL aborted
     vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
-      (request) => new Promise((resolve, reject) => {
-        const config = request?.config;
-        if (config?.abortSignal) {
-          if (config.abortSignal.aborted) {
-            reject(new Error('Aborted'));
-            return;
+      async (request) => {
+        const signal = request.config?.abortSignal;
+        return {
+          async *[Symbol.asyncIterator]() {
+            if (signal) {
+              await new Promise((resolve, reject) => {
+                if (signal.aborted) {
+                  reject(new Error('Aborted'));
+                  return;
+                }
+                signal.addEventListener('abort', () => {
+                  reject(new Error('Aborted'));
+                });
+              });
+            } else {
+              await new Promise(() => {}); // Hang indefinitely
             }
-          config.abortSignal.addEventListener('abort', () => {
-            reject(new Error('Aborted'));
-          });
-        }
-      }),
+            yield {} as GenerateContentResponse; // Dummy yield to satisfy require-yield lint rule
+          },
+        } as AsyncGenerator;
+      },
     );
 
     // 3. Start the request
diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts
index d4b651578d..370bb6676d 100644
--- a/packages/core/src/core/geminiChat.ts
+++ b/packages/core/src/core/geminiChat.ts
@@ -592,14 +592,20 @@ export class GeminiChat {
       lastContentsToUse = contentsToUse;
 
       try {
-        return await this.config.getContentGenerator().generateContentStream(
-          {
-            model: modelToUse,
-            contents: contentsToUse,
-            config,
-          },
-          prompt_id,
-        );
+        const stream = await this.config
+          .getContentGenerator()
+          .generateContentStream(
+            {
+              model: modelToUse,
+              contents: contentsToUse,
+              config,
+            },
+            prompt_id,
+          );
+        return {
+          stream,
+          timeoutSignal,
+        };
       } catch (error) {
         if (timeoutSignal.aborted) {
           const timeoutError = new Error(
@@ -632,24 +638,27 @@ export class GeminiChat {
       );
     };
 
-    const streamResponse = await retryWithBackoff(apiCall, {
-      onPersistent429: onPersistent429Callback,
-      onValidationRequired: onValidationRequiredCallback,
-      authType: this.config.getContentGeneratorConfig()?.authType,
-      retryFetchErrors: this.config.getRetryFetchErrors(),
-      signal: abortSignal,
-      maxAttempts: availabilityMaxAttempts,
-      getAvailabilityContext,
-      onRetry: (attempt, error, delayMs) => {
-        coreEvents.emitRetryAttempt({
-          attempt,
-          maxAttempts: availabilityMaxAttempts ?? 10,
-          delayMs,
-          error: error instanceof Error ? error.message : String(error),
-          model: lastModelToUse,
-        });
+    const { stream: streamResponse, timeoutSignal } = await retryWithBackoff(
+      apiCall,
+      {
+        onPersistent429: onPersistent429Callback,
+        onValidationRequired: onValidationRequiredCallback,
+        authType: this.config.getContentGeneratorConfig()?.authType,
+        retryFetchErrors: this.config.getRetryFetchErrors(),
+        signal: abortSignal,
+        maxAttempts: availabilityMaxAttempts,
+        getAvailabilityContext,
+        onRetry: (attempt, error, delayMs) => {
+          coreEvents.emitRetryAttempt({
+            attempt,
+            maxAttempts: availabilityMaxAttempts ?? 10,
+            delayMs,
+            error: error instanceof Error ? error.message : String(error),
+            model: lastModelToUse,
+          });
+        },
       },
-    });
+    );
 
     // Store the original request for AfterModel hooks
     const originalRequest: GenerateContentParameters = {
@@ -662,6 +671,7 @@ export class GeminiChat {
       lastModelToUse,
       streamResponse,
      originalRequest,
+      timeoutSignal,
     );
   }
 
@@ -820,69 +830,81 @@ export class GeminiChat {
     model: string,
     streamResponse: AsyncGenerator,
    originalRequest: GenerateContentParameters,
+    timeoutSignal: AbortSignal,
   ): AsyncGenerator {
     const modelResponseParts: Part[] = [];
     let hasToolCall = false;
     let finishReason: FinishReason | undefined;
 
-    for await (const chunk of streamResponse) {
-      const candidateWithReason = chunk?.candidates?.find(
-        (candidate) => candidate.finishReason,
-      );
-      if (candidateWithReason) {
-        finishReason = candidateWithReason.finishReason as FinishReason;
-      }
-
-      if (isValidResponse(chunk)) {
-        const content = chunk.candidates?.[0]?.content;
-        if (content?.parts) {
-          if (content.parts.some((part) => part.thought)) {
-            // Record thoughts
-            this.recordThoughtFromContent(content);
-          }
-          if (content.parts.some((part) => part.functionCall)) {
-            hasToolCall = true;
-          }
-
-          modelResponseParts.push(
-            ...content.parts.filter((part) => !part.thought),
-          );
-        }
-      }
-
-      // Record token usage if this chunk has usageMetadata
-      if (chunk.usageMetadata) {
-        this.chatRecordingService.recordMessageTokens(chunk.usageMetadata);
-        if (chunk.usageMetadata.promptTokenCount !== undefined) {
-          this.lastPromptTokenCount = chunk.usageMetadata.promptTokenCount;
-        }
-      }
-
-      const hookSystem = this.config.getHookSystem();
-      if (originalRequest && chunk && hookSystem) {
-        const hookResult = await hookSystem.fireAfterModelEvent(
-          originalRequest,
-          chunk,
-        );
-
-        if (hookResult.stopped) {
-          throw new AgentExecutionStoppedError(
-            hookResult.reason || 'Agent execution stopped by hook',
-          );
-        }
-
-        if (hookResult.blocked) {
-          throw new AgentExecutionBlockedError(
-            hookResult.reason || 'Agent execution blocked by hook',
-            hookResult.response,
-          );
-        }
-
-        yield hookResult.response;
-      } else {
-        yield chunk;
+    try {
+      for await (const chunk of streamResponse) {
+        const candidateWithReason = chunk?.candidates?.find(
+          (candidate) => candidate.finishReason,
+        );
+        if (candidateWithReason) {
+          finishReason = candidateWithReason.finishReason as FinishReason;
+        }
+
+        if (isValidResponse(chunk)) {
+          const content = chunk.candidates?.[0]?.content;
+          if (content?.parts) {
+            if (content.parts.some((part) => part.thought)) {
+              // Record thoughts
+              this.recordThoughtFromContent(content);
+            }
+            if (content.parts.some((part) => part.functionCall)) {
+              hasToolCall = true;
+            }
+
+            modelResponseParts.push(
+              ...content.parts.filter((part) => !part.thought),
+            );
+          }
+        }
+
+        // Record token usage if this chunk has usageMetadata
+        if (chunk.usageMetadata) {
+          this.chatRecordingService.recordMessageTokens(chunk.usageMetadata);
+          if (chunk.usageMetadata.promptTokenCount !== undefined) {
+            this.lastPromptTokenCount = chunk.usageMetadata.promptTokenCount;
+          }
+        }
+
+        const hookSystem = this.config.getHookSystem();
+        if (originalRequest && chunk && hookSystem) {
+          const hookResult = await hookSystem.fireAfterModelEvent(
+            originalRequest,
+            chunk,
+          );
+
+          if (hookResult.stopped) {
+            throw new AgentExecutionStoppedError(
+              hookResult.reason || 'Agent execution stopped by hook',
+            );
+          }
+
+          if (hookResult.blocked) {
+            throw new AgentExecutionBlockedError(
+              hookResult.reason || 'Agent execution blocked by hook',
+              hookResult.response,
+            );
+          }
+
+          yield hookResult.response;
+        } else {
+          yield chunk;
+        }
       }
+    } catch (error) {
+      if (timeoutSignal.aborted) {
+        const timeoutError = new Error(
+          `Request timed out after ${TIMEOUT_MS}ms`,
+        );
+        (timeoutError as unknown as { code: string }).code = 'ETIMEDOUT';
+        throw timeoutError;
+      }
+      throw error;
    }
 
     // String thoughts and consolidate text parts.
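
Reviewer note (not part of the patch): the updated test hand-rolls an async generator whose iteration only settles once an AbortSignal fires. A minimal standalone sketch of that pattern, using hypothetical names (hangingStream is not an identifier from this repo) and assuming Node 18+ / DOM AbortSignal typings:

    // Returns a stream whose first next() stays pending until the signal
    // aborts, then rejects; with no signal it hangs indefinitely.
    function hangingStream(signal?: AbortSignal): AsyncGenerator<unknown> {
      return (async function* () {
        await new Promise<never>((_, reject) => {
          if (!signal) return; // promise never settles: hang forever
          if (signal.aborted) {
            reject(new Error('Aborted'));
            return;
          }
          signal.addEventListener('abort', () => reject(new Error('Aborted')));
        });
        yield undefined; // unreachable; keeps require-yield happy
      })();
    }

    // Usage sketch: next() stays pending until abort() is called.
    const controller = new AbortController();
    const pending = hangingStream(controller.signal).next();
    controller.abort();
    pending.catch((err) => console.error((err as Error).message)); // 'Aborted'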