From af49d38373d637c858b3510d9f0cd3da96c52856 Mon Sep 17 00:00:00 2001 From: Abhinav Date: Wed, 20 May 2026 13:46:19 -0700 Subject: [PATCH] Support compact SessionStart hooks (#21272) # Why Compaction replaces the live conversation history, so hooks that use `SessionStart` to re-inject durable model context need a way to run again after that rewrite. Related - #19905 adds dedicated compact lifecycle hooks # What - add `compact` as a supported `SessionStart` source and matcher value - change pending `SessionStart` state from a single slot to a small FIFO queue so `resume` / `startup` / `clear` can be preserved alongside a later `compact` - drain all queued `SessionStart` sources before the next model request, preserving their original order # Testing The new integration coverage verifies both the basic `compact` matcher path and the stacked `resume` -> `compact` case where both hooks contribute `additionalContext` to the next model turn. --- codex-rs/core/src/hook_runtime.rs | 95 ++-- codex-rs/core/src/session/mod.rs | 4 + codex-rs/core/src/session/session.rs | 2 +- codex-rs/core/src/state/session.rs | 13 +- codex-rs/core/tests/suite/hooks.rs | 410 ++++++++++++++++++ .../session-start.command.input.schema.json | 3 +- codex-rs/hooks/src/events/session_start.rs | 2 + codex-rs/hooks/src/schema.rs | 2 +- 8 files changed, 476 insertions(+), 55 deletions(-) diff --git a/codex-rs/core/src/hook_runtime.rs b/codex-rs/core/src/hook_runtime.rs index 72bfc1de1c..3708ea6089 100644 --- a/codex-rs/core/src/hook_runtime.rs +++ b/codex-rs/core/src/hook_runtime.rs @@ -99,54 +99,57 @@ pub(crate) async fn run_pending_session_start_hooks( sess: &Arc, turn_context: &Arc, ) -> bool { - let Some(session_start_source) = sess.take_pending_session_start_source().await else { - return false; - }; - - // Pending session-start hooks are reused to dispatch thread-spawn subagent - // starts. Other subagent sessions are internal/system work and do not run - // start hooks. - let target = match &turn_context.session_source { - SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_role, .. }) - if matches!( - session_start_source, - codex_hooks::SessionStartSource::Startup - ) => - { - let agent_type = agent_role - .clone() - .unwrap_or_else(|| crate::agent::role::DEFAULT_ROLE_NAME.to_string()); - StartHookTarget::SubagentStart { - turn_id: turn_context.sub_id.clone(), - agent_id: sess.thread_id().to_string(), - agent_type, + while let Some(session_start_source) = sess.take_pending_session_start_source().await { + // Pending session-start hooks are reused to dispatch thread-spawn subagent + // starts. Other subagent sessions are internal/system work and do not run + // start hooks. + let target = match &turn_context.session_source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_role, .. }) + if matches!( + session_start_source, + codex_hooks::SessionStartSource::Startup + ) => + { + let agent_type = agent_role + .clone() + .unwrap_or_else(|| crate::agent::role::DEFAULT_ROLE_NAME.to_string()); + StartHookTarget::SubagentStart { + turn_id: turn_context.sub_id.clone(), + agent_id: sess.thread_id().to_string(), + agent_type, + } } + SessionSource::SubAgent(_) => return false, + _ => StartHookTarget::SessionStart { + source: session_start_source, + }, + }; + let request = codex_hooks::SessionStartRequest { + session_id: sess.session_id().into(), + #[allow(deprecated)] + cwd: turn_context.cwd.clone(), + transcript_path: sess.hook_transcript_path().await, + model: turn_context.model_info.slug.clone(), + permission_mode: hook_permission_mode(turn_context), + target, + }; + let hooks = sess.hooks(); + let preview_runs = hooks.preview_session_start(&request); + if run_context_injecting_hook( + sess, + turn_context, + preview_runs, + hooks.run_session_start(request, Some(turn_context.sub_id.clone())), + ) + .await + .record_additional_contexts(sess, turn_context) + .await + { + return true; } - SessionSource::SubAgent(_) => return false, - _ => StartHookTarget::SessionStart { - source: session_start_source, - }, - }; - let request = codex_hooks::SessionStartRequest { - session_id: sess.session_id().into(), - #[allow(deprecated)] - cwd: turn_context.cwd.clone(), - transcript_path: sess.hook_transcript_path().await, - model: turn_context.model_info.slug.clone(), - permission_mode: hook_permission_mode(turn_context), - target, - }; - let hooks = sess.hooks(); - let preview_runs = hooks.preview_session_start(&request); - run_context_injecting_hook( - sess, - turn_context, - preview_runs, - hooks.run_session_start(request, Some(turn_context.sub_id.clone())), - ) - .await - .record_additional_contexts(sess, turn_context) - .await + } + + false } /// Runs matching `PreToolUse` hooks before a tool executes. diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index bdffd4e3ca..b710e095ef 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -2609,6 +2609,10 @@ impl Session { self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item)]) .await; } + { + let mut state = self.state.lock().await; + state.queue_pending_session_start_source(codex_hooks::SessionStartSource::Compact); + } self.services.model_client.advance_window_generation(); } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 8945880c33..88958b39ef 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1184,7 +1184,7 @@ impl Session { Box::pin(sess.record_initial_history(initial_history)).await; { let mut state = sess.state.lock().await; - state.set_pending_session_start_source(Some(session_start_source)); + state.queue_pending_session_start_source(session_start_source); } Ok(sess) diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 0903d0d073..baeaddbbf9 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -4,6 +4,7 @@ use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::models::ResponseItem; use codex_sandboxing::policy_transforms::merge_permission_profiles; use std::collections::HashSet; +use std::collections::VecDeque; use super::auto_compact_window::AutoCompactWindow; use super::auto_compact_window::AutoCompactWindowSnapshot; @@ -33,7 +34,7 @@ pub(crate) struct SessionState { /// Startup prewarmed session prepared during session initialization. pub(crate) startup_prewarm: Option, pub(crate) active_connector_selection: HashSet, - pub(crate) pending_session_start_source: Option, + pub(crate) pending_session_start_sources: VecDeque, granted_permissions: Option, next_turn_is_first: bool, } @@ -52,7 +53,7 @@ impl SessionState { auto_compact_window: AutoCompactWindow::new(), startup_prewarm: None, active_connector_selection: HashSet::new(), - pending_session_start_source: None, + pending_session_start_sources: VecDeque::new(), granted_permissions: None, next_turn_is_first: true, } @@ -218,17 +219,17 @@ impl SessionState { self.active_connector_selection.clear(); } - pub(crate) fn set_pending_session_start_source( + pub(crate) fn queue_pending_session_start_source( &mut self, - value: Option, + value: codex_hooks::SessionStartSource, ) { - self.pending_session_start_source = value; + self.pending_session_start_sources.push_back(value); } pub(crate) fn take_pending_session_start_source( &mut self, ) -> Option { - self.pending_session_start_source.take() + self.pending_session_start_sources.pop_front() } pub(crate) fn record_granted_permissions(&mut self, permissions: AdditionalPermissionProfile) { diff --git a/codex-rs/core/tests/suite/hooks.rs b/codex-rs/core/tests/suite/hooks.rs index 4f376573b3..e9067b8a43 100644 --- a/codex-rs/core/tests/suite/hooks.rs +++ b/codex-rs/core/tests/suite/hooks.rs @@ -6,6 +6,8 @@ use anyhow::Result; use codex_core::config::Config; use codex_core::config::Constrained; use codex_features::Feature; +use codex_model_provider_info::ModelProviderInfo; +use codex_model_provider_info::built_in_model_providers; use codex_plugin::PluginHookSource; use codex_plugin::PluginId; use codex_protocol::items::parse_hook_prompt_fragment; @@ -26,11 +28,13 @@ use core_test_support::managed_network_requirements_loader; use core_test_support::responses::ev_apply_patch_custom_tool_call; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_completed_with_tokens; use core_test_support::responses::ev_custom_tool_call; use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_message_item_added; use core_test_support::responses::ev_output_text_delta; use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_compact_json_once; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; @@ -74,6 +78,15 @@ fn network_workspace_write_profile() -> PermissionProfile { ) } +fn non_openai_model_provider(server: &wiremock::MockServer) -> ModelProviderInfo { + let mut provider = + built_in_model_providers(/* openai_base_url */ /*openai_base_url*/ None)["openai"].clone(); + provider.name = "OpenAI (test)".into(); + provider.base_url = Some(format!("{}/v1", server.uri())); + provider.supports_websockets = false; + provider +} + fn trust_plugin_hooks(config: &mut Config, plugin_hook_sources: Vec) { if let Err(err) = config.features.enable(Feature::CodexHooks) { panic!("test config should allow feature update: {err}"); @@ -229,6 +242,66 @@ if payload.get("prompt") == {blocked_prompt_json}: Ok(()) } +fn write_session_start_and_user_prompt_submit_order_hooks(home: &Path) -> Result<()> { + let session_start_script_path = home.join("session_start_order_hook.py"); + let user_prompt_submit_script_path = home.join("user_prompt_submit_order_hook.py"); + let log_path = home.join("hook_order_log.jsonl"); + + let session_start_script = format!( + r#"import json +from pathlib import Path +import sys + +payload = json.load(sys.stdin) +with Path(r"{log_path}").open("a", encoding="utf-8") as handle: + handle.write(json.dumps({{ + "hook_event_name": payload.get("hook_event_name"), + "source": payload.get("source"), + }}) + "\n") +"#, + log_path = log_path.display(), + ); + let user_prompt_submit_script = format!( + r#"import json +from pathlib import Path +import sys + +payload = json.load(sys.stdin) +with Path(r"{log_path}").open("a", encoding="utf-8") as handle: + handle.write(json.dumps({{ + "hook_event_name": payload.get("hook_event_name"), + "prompt": payload.get("prompt"), + }}) + "\n") +"#, + log_path = log_path.display(), + ); + let hooks = serde_json::json!({ + "hooks": { + "SessionStart": [{ + "hooks": [{ + "type": "command", + "command": format!("python3 {}", session_start_script_path.display()), + "statusMessage": "running session start order hook", + }] + }], + "UserPromptSubmit": [{ + "hooks": [{ + "type": "command", + "command": format!("python3 {}", user_prompt_submit_script_path.display()), + "statusMessage": "running user prompt submit order hook", + }] + }] + } + }); + + fs::write(&session_start_script_path, session_start_script) + .context("write session start order hook script")?; + fs::write(&user_prompt_submit_script_path, user_prompt_submit_script) + .context("write user prompt submit order hook script")?; + fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?; + Ok(()) +} + fn write_pre_tool_use_hook( home: &Path, matcher: Option<&str>, @@ -705,6 +778,109 @@ print(json.dumps({{ Ok(()) } +fn write_compact_session_start_hook_with_context( + home: &Path, + additional_context: &str, +) -> Result<()> { + let script_path = home.join("compact_session_start_hook.py"); + let log_path = home.join("session_start_hook_log.jsonl"); + let additional_context_json = serde_json::to_string(additional_context) + .context("serialize compact session start additional context for test")?; + let script = format!( + r#"import json +from pathlib import Path +import sys + +payload = json.load(sys.stdin) +with Path(r"{log_path}").open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload) + "\n") + +print(json.dumps({{ + "hookSpecificOutput": {{ + "hookEventName": "SessionStart", + "additionalContext": {additional_context_json} + }} +}})) +"#, + log_path = log_path.display(), + ); + let hooks = serde_json::json!({ + "hooks": { + "SessionStart": [{ + "matcher": "compact", + "hooks": [{ + "type": "command", + "command": format!("python3 {}", script_path.display()), + "statusMessage": "running compact session start hook", + }] + }] + } + }); + + fs::write(&script_path, script).context("write compact session start hook script")?; + fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?; + Ok(()) +} + +fn write_resume_and_compact_session_start_hook_with_context( + home: &Path, + resume_context: &str, + compact_context: &str, +) -> Result<()> { + let script_path = home.join("resume_and_compact_session_start_hook.py"); + let log_path = home.join("session_start_hook_log.jsonl"); + let resume_context_json = serde_json::to_string(resume_context) + .context("serialize resume session start additional context for test")?; + let compact_context_json = serde_json::to_string(compact_context) + .context("serialize compact session start additional context for test")?; + let script = format!( + r#"import json +from pathlib import Path +import sys + +payload = json.load(sys.stdin) +with Path(r"{log_path}").open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload) + "\n") + +contexts = {{ + "resume": {resume_context_json}, + "compact": {compact_context_json}, +}} +print(json.dumps({{ + "hookSpecificOutput": {{ + "hookEventName": "SessionStart", + "additionalContext": contexts[payload["source"]] + }} +}})) +"#, + log_path = log_path.display(), + ); + let hooks = serde_json::json!({ + "hooks": { + "SessionStart": [{ + "matcher": "resume", + "hooks": [{ + "type": "command", + "command": format!("python3 {}", script_path.display()), + "statusMessage": "running resume session start hook", + }] + }, { + "matcher": "compact", + "hooks": [{ + "type": "command", + "command": format!("python3 {}", script_path.display()), + "statusMessage": "running compact session start hook", + }] + }] + } + }); + + fs::write(&script_path, script) + .context("write resume and compact session start hook script")?; + fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?; + Ok(()) +} + fn rollout_hook_prompt_texts(text: &str) -> Result> { let mut texts = Vec::new(); for line in text.lines() { @@ -837,6 +1013,10 @@ fn read_user_prompt_submit_hook_inputs(home: &Path) -> Result Result> { + read_hook_inputs_from_log(home.join("hook_order_log.jsonl").as_path()) +} + fn ev_message_item_done(id: &str, text: &str) -> Value { serde_json::json!({ "type": "response.output_item.done", @@ -1020,6 +1200,54 @@ async fn session_start_hook_sees_materialized_transcript_path() -> Result<()> { Ok(()) } +#[tokio::test] +async fn session_start_runs_before_user_prompt_submit_on_first_turn() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let _response = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello after hooks"), + ev_completed("resp-1"), + ]), + ) + .await; + + let mut builder = test_codex() + .with_pre_build_hook(|home| { + if let Err(error) = write_session_start_and_user_prompt_submit_order_hooks(home) { + panic!("failed to write hook ordering fixtures: {error}"); + } + }) + .with_config(trust_discovered_hooks); + let test = builder.build(&server).await?; + + test.submit_turn("hello").await?; + + let hook_inputs = read_hook_order_inputs(test.codex_home_path())?; + assert_eq!( + hook_inputs + .iter() + .map(|input| input["hook_event_name"] + .as_str() + .expect("hook input event name")) + .collect::>(), + vec!["SessionStart", "UserPromptSubmit"], + ); + assert_eq!( + hook_inputs[0].get("source").and_then(Value::as_str), + Some("startup") + ); + assert_eq!( + hook_inputs[1].get("prompt").and_then(Value::as_str), + Some("hello") + ); + + Ok(()) +} + #[tokio::test] async fn session_start_hook_spills_large_additional_context() -> Result<()> { skip_if_no_network!(Ok(())); @@ -1125,6 +1353,188 @@ async fn pre_tool_use_hook_spills_large_additional_context() -> Result<()> { Ok(()) } +#[tokio::test] +async fn compact_session_start_hook_records_additional_context_for_next_turn() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let request_log = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello before compact"), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "summary after compact"), + ev_completed("resp-2"), + ]), + sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-3", "hello after compact"), + ev_completed("resp-3"), + ]), + ], + ) + .await; + let additional_context = "remember the compacted reef"; + let model_provider = non_openai_model_provider(&server); + + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + if let Err(error) = + write_compact_session_start_hook_with_context(home, additional_context) + { + panic!("failed to write compact session start hook fixture: {error}"); + } + }) + .with_config(move |config| { + config.model_provider = model_provider; + trust_discovered_hooks(config); + }); + let test = builder.build(&server).await?; + + test.submit_turn("hello before compact").await?; + test.codex.submit(Op::Compact).await?; + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + test.submit_turn("hello after compact").await?; + + let requests = request_log.requests(); + assert_eq!(requests.len(), 3); + assert!( + !requests[0] + .message_input_texts("developer") + .iter() + .any(|message| message == additional_context), + "compact matcher should not run for initial startup", + ); + assert!( + requests[2] + .message_input_texts("developer") + .iter() + .any(|message| message == additional_context), + "compact matcher should inject additional context before the next model turn", + ); + + let hook_inputs = read_session_start_hook_inputs(test.codex_home_path())?; + assert_eq!(hook_inputs.len(), 1); + assert_eq!( + hook_inputs[0].get("source").and_then(Value::as_str), + Some("compact") + ); + + Ok(()) +} + +#[tokio::test] +async fn resumed_thread_runs_resume_then_compact_session_start_hooks() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let limit = 200_000; + let over_limit_tokens = 250_000; + let remote_summary = "remote compact summary"; + let resume_context = "remember the resumed reef"; + let compact_context = "remember the compacted reef"; + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: remote_summary.to_string(), + }], + phase: None, + }, + ResponseItem::Compaction { + encrypted_content: "encrypted compact summary".to_string(), + }, + ]; + let compact_mock = + mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; + + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + if let Err(error) = write_resume_and_compact_session_start_hook_with_context( + home, + resume_context, + compact_context, + ) { + panic!("failed to write resume/compact session start hook fixture: {error}"); + } + }) + .with_config(move |config| { + config.model_auto_compact_token_limit = Some(limit); + trust_discovered_hooks(config); + }); + let initial = builder.build(&server).await?; + let home = initial.home.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .context("rollout path")?; + + mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello before resume"), + ev_completed_with_tokens("resp-1", over_limit_tokens), + ]), + ) + .await; + initial.submit_turn("hello before resume").await?; + assert!(compact_mock.requests().is_empty()); + + let mut resume_builder = test_codex().with_config(move |config| { + config.model_auto_compact_token_limit = Some(limit); + trust_discovered_hooks(config); + }); + let resumed = resume_builder.resume(&server, home, rollout_path).await?; + let follow_up = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "hello after resume"), + ev_completed("resp-2"), + ]), + ) + .await; + + resumed.submit_turn("hello after resume").await?; + + assert_eq!(compact_mock.requests().len(), 1); + let developer_messages = follow_up.single_request().message_input_texts("developer"); + assert!( + developer_messages + .iter() + .any(|message| message == resume_context), + "resume matcher should inject additional context before the next model turn", + ); + assert!( + developer_messages + .iter() + .any(|message| message == compact_context), + "compact matcher should inject additional context before the next model turn", + ); + + let hook_inputs = read_session_start_hook_inputs(resumed.codex_home_path())?; + assert_eq!( + hook_inputs + .iter() + .filter_map(|input| input.get("source").and_then(Value::as_str)) + .collect::>(), + vec!["resume", "compact"], + ); + + Ok(()) +} + #[tokio::test] async fn stop_hook_spills_large_continuation_prompt() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/hooks/schema/generated/session-start.command.input.schema.json b/codex-rs/hooks/schema/generated/session-start.command.input.schema.json index 385932c364..d460fdb583 100644 --- a/codex-rs/hooks/schema/generated/session-start.command.input.schema.json +++ b/codex-rs/hooks/schema/generated/session-start.command.input.schema.json @@ -37,7 +37,8 @@ "enum": [ "startup", "resume", - "clear" + "clear", + "compact" ], "type": "string" }, diff --git a/codex-rs/hooks/src/events/session_start.rs b/codex-rs/hooks/src/events/session_start.rs index 2904497b40..bd1aa2096f 100644 --- a/codex-rs/hooks/src/events/session_start.rs +++ b/codex-rs/hooks/src/events/session_start.rs @@ -24,6 +24,7 @@ pub enum SessionStartSource { Startup, Resume, Clear, + Compact, } impl SessionStartSource { @@ -32,6 +33,7 @@ impl SessionStartSource { Self::Startup => "startup", Self::Resume => "resume", Self::Clear => "clear", + Self::Compact => "compact", } } } diff --git a/codex-rs/hooks/src/schema.rs b/codex-rs/hooks/src/schema.rs index 96d5f790be..15a655c976 100644 --- a/codex-rs/hooks/src/schema.rs +++ b/codex-rs/hooks/src/schema.rs @@ -673,7 +673,7 @@ fn permission_mode_schema(_gen: &mut SchemaGenerator) -> Schema { } fn session_start_source_schema(_gen: &mut SchemaGenerator) -> Schema { - string_enum_schema(&["startup", "resume", "clear"]) + string_enum_schema(&["startup", "resume", "clear", "compact"]) } fn compaction_trigger_schema(_gen: &mut SchemaGenerator) -> Schema {