diff --git a/packages/core/src/context/config/profiles.ts b/packages/core/src/context/config/profiles.ts
index b721c01ad0..356b13baf7 100644
--- a/packages/core/src/context/config/profiles.ts
+++ b/packages/core/src/context/config/profiles.ts
@@ -4,6 +4,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+import { createStateSnapshotHydrationProcessor } from '../processors/stateSnapshotHydrationProcessor.js';
 import type {
   AsyncPipelineDef,
   ContextManagementConfig,
@@ -88,6 +89,29 @@ export const generalistProfile: ContextProfile = {
   ): PipelineDef[] =>
     // Helper to merge default options with dynamically loaded processorOptions by ID
     [
+      {
+        name: 'Initialization Hydration',
+        triggers: ['initialization'],
+        processors: [
+          createStateSnapshotHydrationProcessor('StateSnapshotHydration', env, {
+            target: (() => {
+              const res = resolveProcessorOptions(config, 'StateSnapshotSync', {
+                target: 'max',
+                maxStateTokens: 4000,
+                maxSummaryTurns: 5,
+              }).target;
+              if (
+                res === 'incremental' ||
+                res === 'freeNTokens' ||
+                res === 'max'
+              ) {
+                return res;
+              }
+              return undefined;
+            })(),
+          }),
+        ],
+      },
       {
         name: 'Immediate Sanitization',
         triggers: ['new_message'],
diff --git a/packages/core/src/context/config/types.ts b/packages/core/src/context/config/types.ts
index caa3aecfec..425e07f39b 100644
--- a/packages/core/src/context/config/types.ts
+++ b/packages/core/src/context/config/types.ts
@@ -7,6 +7,7 @@
 import type { ContextProcessor, AsyncContextProcessor } from '../pipeline.js';
 
 export type PipelineTrigger =
+  | 'initialization'
   | 'new_message'
   | 'retained_exceeded'
   | 'gc_backstop'
diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts
index f161e0903b..9ccccb2d3a 100644
--- a/packages/core/src/context/contextManager.ts
+++ b/packages/core/src/context/contextManager.ts
@@ -16,6 +16,8 @@ import { HistoryObserver } from './historyObserver.js';
 import { render } from './graph/render.js';
 import { ContextWorkingBufferImpl } from './pipeline/contextWorkingBuffer.js';
 import { debugLogger } from '../utils/debugLogger.js';
+import { SnapshotStateHelper } from './utils/snapshotGenerator.js';
+import type { ContextEngineState } from '../services/chatRecordingTypes.js';
 import { hardenHistory } from '../utils/historyHardening.js';
 import { checkContextInvariants } from './utils/invariantChecker.js';
 import type { AdvancedTokenCalculator } from './utils/contextTokenCalculator.js';
@@ -429,4 +431,38 @@ export class ContextManager {
       );
     }
   }
+
+  /**
+   * Exports the Context Engine state derived from the current buffer nodes
+   * so it can be persisted with the conversation.
+   */
+  exportState(): ContextEngineState {
+    return SnapshotStateHelper.exportState(this.buffer.nodes);
+  }
+
+  /**
+   * Restores previously exported state: republishes the saved snapshot to the
+   * inbox, runs the 'initialization' trigger, and applies the result to the
+   * buffer. Does nothing when `state` is falsy.
+   */
+  async restoreState(state: ContextEngineState): Promise<void> {
+    if (!state) return;
+    SnapshotStateHelper.restoreState(state, this.env.inbox);
+
+    // Explicitly run the initialization trigger to eagerly splice the restored snapshot
+    // into the graph *before* the first user message creates cache artifacts.
+    const nodes = this.buffer.nodes;
+    const hydratedNodes = await this.orchestrator.executeTriggerSync(
+      'initialization',
+      nodes,
+      new Set(), // No trigger targets needed, it just reads the inbox
+    );
+
+    // Create a pseudo-processor result to apply the hydration without duplicating logic
+    this.buffer = this.buffer.applyProcessorResult(
+      'StateSnapshotHydration',
+      nodes,
+      hydratedNodes,
+    );
+  }
 }
diff --git a/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts b/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts
new file mode 100644
index 0000000000..cd3a690b25
--- /dev/null
+++ b/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts
@@ -0,0 +1,129 @@
+/**
+ * @license
+ * Copyright 2026 Google LLC
+ * SPDX-License-Identifier: Apache-2.0
+ */
+import { randomUUID } from 'node:crypto';
+import type { JSONSchemaType } from 'ajv';
+import type { ContextProcessor, ProcessArgs } from '../pipeline.js';
+import type { ContextEnvironment } from '../pipeline/environment.js';
+import { type Snapshot, NodeType } from '../graph/types.js';
+
+/** Options accepted by the state snapshot hydration processor. */
+export interface StateSnapshotHydrationProcessorOptions {
+  target?: 'incremental' | 'freeNTokens' | 'max';
+}
+
+/** JSON schema describing {@link StateSnapshotHydrationProcessorOptions}. */
+export const StateSnapshotHydrationProcessorOptionsSchema: JSONSchemaType<StateSnapshotHydrationProcessorOptions> =
+  {
+    type: 'object',
+    properties: {
+      target: {
+        type: 'string',
+        enum: ['incremental', 'freeNTokens', 'max'],
+        nullable: true,
+      },
+    },
+    required: [],
+  };
+
+/**
+ * Creates a processor that applies a snapshot proposed through the inbox.
+ *
+ * It reads 'PROPOSED_SNAPSHOT' messages, keeps those whose type matches the
+ * configured strategy ('incremental' -> 'point-in-time', otherwise
+ * 'accumulate'), and applies the newest one whose consumed IDs are all present
+ * in the targets: the consumed nodes are replaced by a single snapshot node
+ * and the message is consumed. Otherwise the targets are returned unchanged.
+ *
+ * @param id Identifier assigned to the returned processor.
+ * @param env Context environment (currently unused by this processor).
+ * @param options Selects which snapshot type is eligible for hydration.
+ * @returns The hydration processor.
+ */
+export function createStateSnapshotHydrationProcessor(
+  id: string,
+  env: ContextEnvironment,
+  options: StateSnapshotHydrationProcessorOptions,
+): ContextProcessor {
+  return {
+    id,
+    name: 'StateSnapshotHydrationProcessor',
+    process: async ({ targets, inbox }: ProcessArgs) => {
+      if (targets.length === 0) {
+        return targets;
+      }
+
+      // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
+      const strategy = options.target ?? 'max';
+      const expectedType =
+        strategy === 'incremental' ? 'point-in-time' : 'accumulate';
+
+      // 1. Check Inbox for a completed Snapshot (The Fast Path)
+      const proposedSnapshots = inbox.getMessages<{
+        newText: string;
+        consumedIds: string[];
+        type: string;
+        timestamp: number;
+      }>('PROPOSED_SNAPSHOT');
+
+      if (proposedSnapshots.length > 0) {
+        // Filter for the snapshot type that matches our processor mode
+        const matchingSnapshots = proposedSnapshots.filter(
+          (s) => s.payload.type === expectedType,
+        );
+
+        // Sort by newest timestamp first (we want the most accumulated snapshot)
+        const sorted = [...matchingSnapshots].sort(
+          (a, b) => b.timestamp - a.timestamp,
+        );
+
+        // Target IDs do not change between candidates, so index them once.
+        const targetIds = new Set(targets.map((t) => t.id));
+
+        for (const proposed of sorted) {
+          const { consumedIds, newText, timestamp } = proposed.payload;
+
+          // Verify all consumed IDs are still present in targets (order is not checked)
+          const isValid = consumedIds.every((id) => targetIds.has(id));
+
+          if (isValid) {
+            // If valid, apply it!
+            const newId = randomUUID();
+
+            const snapshotNode: Snapshot = {
+              id: newId,
+              turnId: newId,
+              type: NodeType.SNAPSHOT,
+              timestamp: timestamp ?? Date.now(),
+              role: 'user',
+              payload: { text: newText },
+              abstractsIds: consumedIds,
+            };
+
+            // Remove the consumed nodes and insert the snapshot at the earliest index
+            const returnedNodes = targets.filter(
+              (t) => !consumedIds.includes(t.id),
+            );
+            const firstRemovedIdx = targets.findIndex((t) =>
+              consumedIds.includes(t.id),
+            );
+
+            if (firstRemovedIdx !== -1) {
+              const idx = Math.max(0, firstRemovedIdx);
+              returnedNodes.splice(idx, 0, snapshotNode);
+            } else {
+              returnedNodes.unshift(snapshotNode);
+            }
+
+            inbox.consume(proposed.id);
+            return returnedNodes;
+          }
+        }
+      }
+
+      return targets;
+    },
+  };
+}
diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts
index 3ad3001a16..df6b7f0a52 100644
--- a/packages/core/src/context/processors/stateSnapshotProcessor.ts
+++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts
@@ -53,76 +53,13 @@ export function createStateSnapshotProcessor(
   return {
     id,
     name: 'StateSnapshotProcessor',
-    process: async ({ targets, inbox }: ProcessArgs) => {
+    process: async ({ targets }: ProcessArgs) => {
       if (targets.length === 0) {
         return targets;
       }
 
       // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
       const strategy = options.target ?? 'max';
-      const expectedType =
-        strategy === 'incremental' ? 'point-in-time' : 'accumulate';
-
-      // 1. Check Inbox for a completed Snapshot (The Fast Path)
-      const proposedSnapshots = inbox.getMessages<{
-        newText: string;
-        consumedIds: string[];
-        type: string;
-        timestamp: number;
-      }>('PROPOSED_SNAPSHOT');
-
-      if (proposedSnapshots.length > 0) {
-        // Filter for the snapshot type that matches our processor mode
-        const matchingSnapshots = proposedSnapshots.filter(
-          (s) => s.payload.type === expectedType,
-        );
-
-        // Sort by newest timestamp first (we want the most accumulated snapshot)
-        const sorted = [...matchingSnapshots].sort(
-          (a, b) => b.timestamp - a.timestamp,
-        );
-
-        for (const proposed of sorted) {
-          const { consumedIds, newText, timestamp } = proposed.payload;
-
-          // Verify all consumed IDs still exist sequentially in targets
-          const targetIds = new Set(targets.map((t) => t.id));
-          const isValid = consumedIds.every((id) => targetIds.has(id));
-
-          if (isValid) {
-            // If valid, apply it!
-            const newId = randomUUID();
-
-            const snapshotNode: Snapshot = {
-              id: newId,
-              turnId: newId,
-              type: NodeType.SNAPSHOT,
-              timestamp: timestamp ?? Date.now(),
-              role: 'user',
-              payload: { text: newText },
-              abstractsIds: consumedIds,
-            };
-
-            // Remove the consumed nodes and insert the snapshot at the earliest index
-            const returnedNodes = targets.filter(
-              (t) => !consumedIds.includes(t.id),
-            );
-            const firstRemovedIdx = targets.findIndex((t) =>
-              consumedIds.includes(t.id),
-            );
-
-            if (firstRemovedIdx !== -1) {
-              const idx = Math.max(0, firstRemovedIdx);
-              returnedNodes.splice(idx, 0, snapshotNode);
-            } else {
-              returnedNodes.unshift(snapshotNode);
-            }
-
-            inbox.consume(proposed.id);
-            return returnedNodes;
-          }
-        }
-      }
 
       // 2. The Synchronous Backstop (The Slow Path)
       let targetTokensToRemove = 0;
diff --git a/packages/core/src/context/utils/snapshotGenerator.ts b/packages/core/src/context/utils/snapshotGenerator.ts
index 1d268e1228..b0c6c6f8a6 100644
--- a/packages/core/src/context/utils/snapshotGenerator.ts
+++ b/packages/core/src/context/utils/snapshotGenerator.ts
@@ -52,6 +52,7 @@ export interface BaselineSnapshotInfo {
   text: string;
   abstractsIds: string[];
   id: string;
+  timestamp: number;
 }
 
 /**
@@ -72,11 +73,56 @@ export function findLatestSnapshotBaseline(
         ? [...lastSnapshotNode.abstractsIds]
         : [],
       id: lastSnapshotNode.id,
+      timestamp: lastSnapshotNode.timestamp,
     };
   }
+
   return undefined;
 }
 
+import type { LiveInbox } from '../pipeline/inbox.js';
+import type { ContextEngineState } from '../../services/chatRecordingTypes.js';
+
+/**
+ * Converts between the latest snapshot in the node graph and the persisted
+ * {@link ContextEngineState}.
+ */
+export const SnapshotStateHelper = {
+  /** Captures the latest snapshot baseline, or `{}` when there is none. */
+  exportState(nodes: readonly ConcreteNode[]): ContextEngineState {
+    const baseline = findLatestSnapshotBaseline(nodes);
+    if (!baseline) return {};
+
+    return {
+      snapshot: {
+        text: baseline.text,
+        consumedIds: baseline.abstractsIds,
+        timestamp: baseline.timestamp,
+      },
+    };
+  },
+
+  /**
+   * Republishes a persisted snapshot to the inbox as an 'accumulate'
+   * 'PROPOSED_SNAPSHOT'; malformed snapshots are ignored.
+   */
+  restoreState(state: ContextEngineState, inbox: LiveInbox): void {
+    if (!state.snapshot) return;
+
+    if (
+      typeof state.snapshot.text === 'string' &&
+      Array.isArray(state.snapshot.consumedIds)
+    ) {
+      inbox.publish('PROPOSED_SNAPSHOT', {
+        newText: state.snapshot.text,
+        consumedIds: state.snapshot.consumedIds,
+        type: 'accumulate',
+        timestamp: state.snapshot.timestamp ?? Date.now(),
+      });
+    }
+  },
+};
+
 export class SnapshotGenerator {
   constructor(private readonly env: ContextEnvironment) {}
 
diff --git a/packages/core/src/core/client.ts b/packages/core/src/core/client.ts
index 302b89d7f0..da3335aa30 100644
--- a/packages/core/src/core/client.ts
+++ b/packages/core/src/core/client.ts
@@ -414,6 +414,15 @@ export class GeminiClient {
         chat,
         this.lastPromptId,
       );
+      if (
+        this.contextManager &&
+        resumedSessionData?.conversation.contextState
+      ) {
+        // Await hydration so it completes (or fails here) before the chat is used.
+        await this.contextManager.restoreState(
+          resumedSessionData.conversation.contextState,
+        );
+      }
       return chat;
     } catch (error) {
       await reportError(
@@ -819,6 +828,9 @@ export class GeminiClient {
             promptBaseUnits: currentBaseUnits,
           });
         }
+        this.chat
+          ?.getChatRecordingService()
+          ?.saveContextState(this.contextManager.exportState());
       }
       this.updateTelemetryTokenCount();
       if (event.type === GeminiEventType.Error) {
diff --git a/packages/core/src/services/chatRecordingService.ts b/packages/core/src/services/chatRecordingService.ts
index e070a1c542..4c1b47fdcf 100644
--- a/packages/core/src/services/chatRecordingService.ts
+++ b/packages/core/src/services/chatRecordingService.ts
@@ -36,6 +36,7 @@ import {
   type RewindRecord,
   type MetadataUpdateRecord,
   type PartialMetadataRecord,
+  type ContextEngineState,
 } from './chatRecordingTypes.js';
 
 export * from './chatRecordingTypes.js';
@@ -646,6 +647,19 @@ export class ChatRecordingService {
     }
   }
 
+  /**
+   * Persists the Context Engine state in the conversation metadata.
+   * Failures are logged rather than thrown.
+   */
+  saveContextState(contextState: ContextEngineState): void {
+    if (!this.conversationFile) return;
+    try {
+      this.updateMetadata({ contextState });
+    } catch (e: unknown) {
+      debugLogger.error('Error saving context state to chat history.', e);
+    }
+  }
+
   saveSummary(summary: string): void {
     if (!this.conversationFile) return;
     try {
diff --git a/packages/core/src/services/chatRecordingTypes.ts b/packages/core/src/services/chatRecordingTypes.ts
index ae5dca8026..926bd5b8a5 100644
--- a/packages/core/src/services/chatRecordingTypes.ts
+++ b/packages/core/src/services/chatRecordingTypes.ts
@@ -89,6 +89,17 @@ export type MessageRecord = BaseMessageRecord & ConversationRecordExtra;
+/**
+ * Serializable Context Engine state persisted with a conversation.
+ */
+export interface ContextEngineState {
+  snapshot?: {
+    text: string;
+    consumedIds: string[];
+    timestamp?: number;
+  };
+}
+
 /**
  * Complete conversation record stored in session files.
  */
 export interface ConversationRecord {
   sessionId: string;
   projectHash: string;
@@ -101,8 +112,10 @@ export interface ConversationRecord {
   directories?: string[];
   /** The kind of conversation (main agent or subagent) */
   kind?: 'main' | 'subagent';
+  /** Opaque state object representing Context Engine state (e.g. snapshots) */
+  contextState?: ContextEngineState;
 }
 
 /**
  * Data structure for resuming an existing session.
  */
@@ -137,4 +150,5 @@ export interface PartialMetadataRecord {
   memoryScratchpad?: MemoryScratchpad;
   directories?: string[];
   kind?: 'main' | 'subagent';
+  contextState?: ContextEngineState;
 }