From 65d4bdfc24df129a06ea7eaecc4fc88e21e2ad6e Mon Sep 17 00:00:00 2001
From: Your Name
Date: Tue, 12 May 2026 19:09:58 +0000
Subject: [PATCH] feat(context): persist context engine snapshots across sessions

This commit implements an opaque state export/import pattern for the
ContextManager to ensure expensive LLM-derived snapshots are properly
rehydrated upon session resume.

The ContextManager now exposes `exportState` and `restoreState` methods,
delegating structural validation to the `SnapshotStateHelper`. During
active chat, the GeminiClient routinely passes the finalized context
state down to the ChatRecordingService, which seamlessly embeds it into
the existing JSONL metadata payload.

Upon resume, the saved snapshot is re-published as a draft to the
LiveInbox, allowing the synchronous pipeline to automatically and
deterministically splice it back into the raw graph without an
additional LLM call.
---
 packages/core/src/context/config/profiles.ts  |  24 ++++
 packages/core/src/context/config/types.ts     |   1 +
 packages/core/src/context/contextManager.ts   |  27 +++++
 .../stateSnapshotHydrationProcessor.ts        | 111 ++++++++++++++++++
 .../processors/stateSnapshotProcessor.ts      |  65 +---------
 .../src/context/utils/snapshotGenerator.ts    |  37 ++++++
 packages/core/src/core/client.ts              |  11 ++
 .../core/src/services/chatRecordingService.ts |  10 ++
 .../core/src/services/chatRecordingTypes.ts   |  15 ++-
 9 files changed, 236 insertions(+), 65 deletions(-)
 create mode 100644 packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts

diff --git a/packages/core/src/context/config/profiles.ts b/packages/core/src/context/config/profiles.ts
index b721c01ad0..356b13baf7 100644
--- a/packages/core/src/context/config/profiles.ts
+++ b/packages/core/src/context/config/profiles.ts
@@ -4,6 +4,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
+import { createStateSnapshotHydrationProcessor } from '../processors/stateSnapshotHydrationProcessor.js';
 import type {
   AsyncPipelineDef,
   ContextManagementConfig,
@@ -88,6 +89,29 @@ export const generalistProfile: ContextProfile = {
   ): PipelineDef[] =>
     // Helper to merge default options with dynamically loaded processorOptions by ID
     [
+      {
+        name: 'Initialization Hydration',
+        triggers: ['initialization'],
+        processors: [
+          createStateSnapshotHydrationProcessor('StateSnapshotHydration', env, {
+            target: (() => {
+              const res = resolveProcessorOptions(config, 'StateSnapshotSync', {
+                target: 'max',
+                maxStateTokens: 4000,
+                maxSummaryTurns: 5,
+              }).target;
+              if (
+                res === 'incremental' ||
+                res === 'freeNTokens' ||
+                res === 'max'
+              ) {
+                return res;
+              }
+              return undefined;
+            })(),
+          }),
+        ],
+      },
       {
         name: 'Immediate Sanitization',
         triggers: ['new_message'],
diff --git a/packages/core/src/context/config/types.ts b/packages/core/src/context/config/types.ts
index caa3aecfec..425e07f39b 100644
--- a/packages/core/src/context/config/types.ts
+++ b/packages/core/src/context/config/types.ts
@@ -7,6 +7,7 @@
 import type { ContextProcessor, AsyncContextProcessor } from '../pipeline.js';
 
 export type PipelineTrigger =
+  | 'initialization'
   | 'new_message'
   | 'retained_exceeded'
   | 'gc_backstop'
diff --git a/packages/core/src/context/contextManager.ts b/packages/core/src/context/contextManager.ts
index f161e0903b..9ccccb2d3a 100644
--- a/packages/core/src/context/contextManager.ts
+++ b/packages/core/src/context/contextManager.ts
@@ -16,6 +16,8 @@ import { HistoryObserver } from './historyObserver.js';
 import { render } from './graph/render.js';
 import { ContextWorkingBufferImpl } from './pipeline/contextWorkingBuffer.js';
 import { debugLogger } from '../utils/debugLogger.js';
+import { SnapshotStateHelper } from './utils/snapshotGenerator.js';
+import type { ContextEngineState } from '../services/chatRecordingTypes.js';
 import { hardenHistory } from '../utils/historyHardening.js';
 import { checkContextInvariants } from './utils/invariantChecker.js';
 import type { AdvancedTokenCalculator } from './utils/contextTokenCalculator.js';
@@ -429,4 +431,29 @@ export class ContextManager {
       );
     }
   }
+
+  exportState(): ContextEngineState {
+    return SnapshotStateHelper.exportState(this.buffer.nodes);
+  }
+
+  async restoreState(state: ContextEngineState): Promise<void> {
+    if (!state) return;
+    SnapshotStateHelper.restoreState(state, this.env.inbox);
+
+    // Explicitly run the initialization trigger to eagerly splice the restored snapshot
+    // into the graph *before* the first user message creates cache artifacts.
+    const nodes = this.buffer.nodes;
+    const hydratedNodes = await this.orchestrator.executeTriggerSync(
+      'initialization',
+      nodes,
+      new Set(), // No trigger targets needed, it just reads the inbox
+    );
+
+    // Create a pseudo-processor result to apply the hydration without duplicating logic
+    this.buffer = this.buffer.applyProcessorResult(
+      'StateSnapshotHydration',
+      nodes,
+      hydratedNodes,
+    );
+  }
 }
diff --git a/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts b/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts
new file mode 100644
index 0000000000..cd3a690b25
--- /dev/null
+++ b/packages/core/src/context/processors/stateSnapshotHydrationProcessor.ts
@@ -0,0 +1,111 @@
+/**
+ * @license
+ * Copyright 2026 Google LLC
+ * SPDX-License-Identifier: Apache-2.0
+ */
+import { randomUUID } from 'node:crypto';
+import type { JSONSchemaType } from 'ajv';
+import type { ContextProcessor, ProcessArgs } from '../pipeline.js';
+import type { ContextEnvironment } from '../pipeline/environment.js';
+import { type Snapshot, NodeType } from '../graph/types.js';
+
+export interface StateSnapshotHydrationProcessorOptions {
+  target?: 'incremental' | 'freeNTokens' | 'max';
+}
+
+export const StateSnapshotHydrationProcessorOptionsSchema: JSONSchemaType<StateSnapshotHydrationProcessorOptions> =
+  {
+    type: 'object',
+    properties: {
+      target: {
+        type: 'string',
+        enum: ['incremental', 'freeNTokens', 'max'],
+        nullable: true,
+      },
+    },
+    required: [],
+  };
+
+export function createStateSnapshotHydrationProcessor(
+  id: string,
+  env: ContextEnvironment,
+  options: StateSnapshotHydrationProcessorOptions,
+): ContextProcessor {
+  return {
+    id,
+    name: 'StateSnapshotHydrationProcessor',
+    process: async ({ targets, inbox }: ProcessArgs) => {
+      if (targets.length === 0) {
+        return targets;
+      }
+
+      // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
+      const strategy = options.target ?? 'max';
+      const expectedType =
+        strategy === 'incremental' ? 'point-in-time' : 'accumulate';
+
+      // 1. Check Inbox for a completed Snapshot (The Fast Path)
+      const proposedSnapshots = inbox.getMessages<{
+        newText: string;
+        consumedIds: string[];
+        type: string;
+        timestamp: number;
+      }>('PROPOSED_SNAPSHOT');
+
+      if (proposedSnapshots.length > 0) {
+        // Filter for the snapshot type that matches our processor mode
+        const matchingSnapshots = proposedSnapshots.filter(
+          (s) => s.payload.type === expectedType,
+        );
+
+        // Sort by newest timestamp first (we want the most accumulated snapshot)
+        const sorted = [...matchingSnapshots].sort(
+          (a, b) => b.timestamp - a.timestamp,
+        );
+
+        for (const proposed of sorted) {
+          const { consumedIds, newText, timestamp } = proposed.payload;
+
+          // Verify all consumed IDs still exist sequentially in targets
+          const targetIds = new Set(targets.map((t) => t.id));
+          const isValid = consumedIds.every((id) => targetIds.has(id));
+
+          if (isValid) {
+            // If valid, apply it!
+            const newId = randomUUID();
+
+            const snapshotNode: Snapshot = {
+              id: newId,
+              turnId: newId,
+              type: NodeType.SNAPSHOT,
+              timestamp: timestamp ?? Date.now(),
+              role: 'user',
+              payload: { text: newText },
+              abstractsIds: consumedIds,
+            };
+
+            // Remove the consumed nodes and insert the snapshot at the earliest index
+            const returnedNodes = targets.filter(
+              (t) => !consumedIds.includes(t.id),
+            );
+            const firstRemovedIdx = targets.findIndex((t) =>
+              consumedIds.includes(t.id),
+            );
+
+            if (firstRemovedIdx !== -1) {
+              const idx = Math.max(0, firstRemovedIdx);
+              returnedNodes.splice(idx, 0, snapshotNode);
+            } else {
+              returnedNodes.unshift(snapshotNode);
+            }
+
+            inbox.consume(proposed.id);
+            return returnedNodes;
+          }
+        }
+      }
+
+      return targets;
+    },
+  };
+}
diff --git a/packages/core/src/context/processors/stateSnapshotProcessor.ts b/packages/core/src/context/processors/stateSnapshotProcessor.ts
index 3ad3001a16..df6b7f0a52 100644
--- a/packages/core/src/context/processors/stateSnapshotProcessor.ts
+++ b/packages/core/src/context/processors/stateSnapshotProcessor.ts
@@ -53,76 +53,13 @@ export function createStateSnapshotProcessor(
   return {
     id,
     name: 'StateSnapshotProcessor',
-    process: async ({ targets, inbox }: ProcessArgs) => {
+    process: async ({ targets }: ProcessArgs) => {
       if (targets.length === 0) {
         return targets;
       }
 
       // Determine what mode we are looking for: 'incremental' -> 'point-in-time', 'max' -> 'accumulate'
       const strategy = options.target ?? 'max';
-      const expectedType =
-        strategy === 'incremental' ? 'point-in-time' : 'accumulate';
-
-      // 1. Check Inbox for a completed Snapshot (The Fast Path)
-      const proposedSnapshots = inbox.getMessages<{
-        newText: string;
-        consumedIds: string[];
-        type: string;
-        timestamp: number;
-      }>('PROPOSED_SNAPSHOT');
-
-      if (proposedSnapshots.length > 0) {
-        // Filter for the snapshot type that matches our processor mode
-        const matchingSnapshots = proposedSnapshots.filter(
-          (s) => s.payload.type === expectedType,
-        );
-
-        // Sort by newest timestamp first (we want the most accumulated snapshot)
-        const sorted = [...matchingSnapshots].sort(
-          (a, b) => b.timestamp - a.timestamp,
-        );
-
-        for (const proposed of sorted) {
-          const { consumedIds, newText, timestamp } = proposed.payload;
-
-          // Verify all consumed IDs still exist sequentially in targets
-          const targetIds = new Set(targets.map((t) => t.id));
-          const isValid = consumedIds.every((id) => targetIds.has(id));
-
-          if (isValid) {
-            // If valid, apply it!
-            const newId = randomUUID();
-
-            const snapshotNode: Snapshot = {
-              id: newId,
-              turnId: newId,
-              type: NodeType.SNAPSHOT,
-              timestamp: timestamp ?? Date.now(),
-              role: 'user',
-              payload: { text: newText },
-              abstractsIds: consumedIds,
-            };
-
-            // Remove the consumed nodes and insert the snapshot at the earliest index
-            const returnedNodes = targets.filter(
-              (t) => !consumedIds.includes(t.id),
-            );
-            const firstRemovedIdx = targets.findIndex((t) =>
-              consumedIds.includes(t.id),
-            );
-
-            if (firstRemovedIdx !== -1) {
-              const idx = Math.max(0, firstRemovedIdx);
-              returnedNodes.splice(idx, 0, snapshotNode);
-            } else {
-              returnedNodes.unshift(snapshotNode);
-            }
-
-            inbox.consume(proposed.id);
-            return returnedNodes;
-          }
-        }
-      }
 
       // 2. The Synchronous Backstop (The Slow Path)
       let targetTokensToRemove = 0;
diff --git a/packages/core/src/context/utils/snapshotGenerator.ts b/packages/core/src/context/utils/snapshotGenerator.ts
index 1d268e1228..b0c6c6f8a6 100644
--- a/packages/core/src/context/utils/snapshotGenerator.ts
+++ b/packages/core/src/context/utils/snapshotGenerator.ts
@@ -52,6 +52,7 @@ export interface BaselineSnapshotInfo {
   text: string;
   abstractsIds: string[];
   id: string;
+  timestamp: number;
 }
 
 /**
@@ -72,11 +73,47 @@ export function findLatestSnapshotBaseline(
         ? [...lastSnapshotNode.abstractsIds]
         : [],
       id: lastSnapshotNode.id,
+      timestamp: lastSnapshotNode.timestamp,
     };
   }
+
   return undefined;
 }
 
+import type { LiveInbox } from '../pipeline/inbox.js';
+import type { ContextEngineState } from '../../services/chatRecordingTypes.js';
+
+export const SnapshotStateHelper = {
+  exportState(nodes: readonly ConcreteNode[]): ContextEngineState {
+    const baseline = findLatestSnapshotBaseline(nodes);
+    if (!baseline) return {};
+
+    return {
+      snapshot: {
+        text: baseline.text,
+        consumedIds: baseline.abstractsIds,
+        timestamp: baseline.timestamp,
+      },
+    };
+  },
+
+  restoreState(state: ContextEngineState, inbox: LiveInbox): void {
+    if (!state.snapshot) return;
+
+    if (
+      typeof state.snapshot.text === 'string' &&
+      Array.isArray(state.snapshot.consumedIds)
+    ) {
+      inbox.publish('PROPOSED_SNAPSHOT', {
+        newText: state.snapshot.text,
+        consumedIds: state.snapshot.consumedIds,
+        type: 'accumulate',
+        timestamp: state.snapshot.timestamp ?? Date.now(),
+      });
+    }
+  },
+};
+
 export class SnapshotGenerator {
   constructor(private readonly env: ContextEnvironment) {}
 
diff --git a/packages/core/src/core/client.ts b/packages/core/src/core/client.ts
index 302b89d7f0..da3335aa30 100644
--- a/packages/core/src/core/client.ts
+++ b/packages/core/src/core/client.ts
@@ -414,6 +414,14 @@ export class GeminiClient {
         chat,
         this.lastPromptId,
       );
+      if (
+        this.contextManager &&
+        resumedSessionData?.conversation.contextState
+      ) {
+        await this.contextManager.restoreState(
+          resumedSessionData.conversation.contextState,
+        );
+      }
       return chat;
     } catch (error) {
       await reportError(
@@ -819,6 +827,9 @@ export class GeminiClient {
             promptBaseUnits: currentBaseUnits,
           });
         }
+        this.chat
+          ?.getChatRecordingService()
+          ?.saveContextState(this.contextManager.exportState());
       }
       this.updateTelemetryTokenCount();
       if (event.type === GeminiEventType.Error) {
diff --git a/packages/core/src/services/chatRecordingService.ts b/packages/core/src/services/chatRecordingService.ts
index e070a1c542..4c1b47fdcf 100644
--- a/packages/core/src/services/chatRecordingService.ts
+++ b/packages/core/src/services/chatRecordingService.ts
@@ -36,6 +36,7 @@ import {
   type RewindRecord,
   type MetadataUpdateRecord,
   type PartialMetadataRecord,
+  type ContextEngineState,
 } from './chatRecordingTypes.js';
 
 export * from './chatRecordingTypes.js';
@@ -646,6 +647,15 @@ export class ChatRecordingService {
     }
   }
 
+  saveContextState(contextState: ContextEngineState): void {
+    if (!this.conversationFile) return;
+    try {
+      this.updateMetadata({ contextState });
+    } catch (e: unknown) {
+      debugLogger.error('Error saving context state to chat history.', e);
+    }
+  }
+
   saveSummary(summary: string): void {
     if (!this.conversationFile) return;
     try {
diff --git a/packages/core/src/services/chatRecordingTypes.ts b/packages/core/src/services/chatRecordingTypes.ts
index ae5dca8026..926bd5b8a5 100644
--- a/packages/core/src/services/chatRecordingTypes.ts
+++ b/packages/core/src/services/chatRecordingTypes.ts
@@ -89,6 +89,17 @@ export type MessageRecord = BaseMessageRecord & ConversationRecordExtra;
 /**
+ * Serializable Context Engine state (latest snapshot) persisted with a session.
+ */
+export interface ContextEngineState {
+  snapshot?: {
+    text: string;
+    consumedIds: string[];
+    timestamp?: number;
+  };
+}
+
+/**
  * Complete conversation record stored in session files.
  */
 export interface ConversationRecord {
   sessionId: string;
   projectHash: string;
@@ -101,8 +112,9 @@ export interface ConversationRecord {
   directories?: string[];
   /** The kind of conversation (main agent or subagent) */
   kind?: 'main' | 'subagent';
+  /** Opaque state object representing Context Engine state (e.g. snapshots) */
+  contextState?: ContextEngineState;
 }
-
 /**
  * Data structure for resuming an existing session.
  */
@@ -137,4 +149,5 @@ export interface PartialMetadataRecord {
   memoryScratchpad?: MemoryScratchpad;
   directories?: string[];
   kind?: 'main' | 'subagent';
+  contextState?: ContextEngineState;
 }