Support compact SessionStart hooks (#21272)

# Why

Compaction replaces the live conversation history, so hooks that use
`SessionStart` to re-inject durable model context need a way to run
again after that rewrite.

Related - #19905 adds dedicated compact lifecycle hooks

# What

- add `compact` as a supported `SessionStart` source and matcher value
- change pending `SessionStart` state from a single slot to a small FIFO
queue so `resume` / `startup` / `clear` can be preserved alongside a
later `compact`
- drain all queued `SessionStart` sources before the next model request,
preserving their original order

# Testing

The new integration coverage verifies both the basic `compact` matcher
path and the stacked `resume` -> `compact` case where both hooks
contribute `additionalContext` to the next model turn.
This commit is contained in:
Abhinav
2026-05-20 13:46:19 -07:00
committed by GitHub
parent 9265701b7f
commit af49d38373
8 changed files with 476 additions and 55 deletions

View File

@@ -99,54 +99,57 @@ pub(crate) async fn run_pending_session_start_hooks(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> bool {
let Some(session_start_source) = sess.take_pending_session_start_source().await else {
return false;
};
// Pending session-start hooks are reused to dispatch thread-spawn subagent
// starts. Other subagent sessions are internal/system work and do not run
// start hooks.
let target = match &turn_context.session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_role, .. })
if matches!(
session_start_source,
codex_hooks::SessionStartSource::Startup
) =>
{
let agent_type = agent_role
.clone()
.unwrap_or_else(|| crate::agent::role::DEFAULT_ROLE_NAME.to_string());
StartHookTarget::SubagentStart {
turn_id: turn_context.sub_id.clone(),
agent_id: sess.thread_id().to_string(),
agent_type,
while let Some(session_start_source) = sess.take_pending_session_start_source().await {
// Pending session-start hooks are reused to dispatch thread-spawn subagent
// starts. Other subagent sessions are internal/system work and do not run
// start hooks.
let target = match &turn_context.session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_role, .. })
if matches!(
session_start_source,
codex_hooks::SessionStartSource::Startup
) =>
{
let agent_type = agent_role
.clone()
.unwrap_or_else(|| crate::agent::role::DEFAULT_ROLE_NAME.to_string());
StartHookTarget::SubagentStart {
turn_id: turn_context.sub_id.clone(),
agent_id: sess.thread_id().to_string(),
agent_type,
}
}
SessionSource::SubAgent(_) => return false,
_ => StartHookTarget::SessionStart {
source: session_start_source,
},
};
let request = codex_hooks::SessionStartRequest {
session_id: sess.session_id().into(),
#[allow(deprecated)]
cwd: turn_context.cwd.clone(),
transcript_path: sess.hook_transcript_path().await,
model: turn_context.model_info.slug.clone(),
permission_mode: hook_permission_mode(turn_context),
target,
};
let hooks = sess.hooks();
let preview_runs = hooks.preview_session_start(&request);
if run_context_injecting_hook(
sess,
turn_context,
preview_runs,
hooks.run_session_start(request, Some(turn_context.sub_id.clone())),
)
.await
.record_additional_contexts(sess, turn_context)
.await
{
return true;
}
SessionSource::SubAgent(_) => return false,
_ => StartHookTarget::SessionStart {
source: session_start_source,
},
};
let request = codex_hooks::SessionStartRequest {
session_id: sess.session_id().into(),
#[allow(deprecated)]
cwd: turn_context.cwd.clone(),
transcript_path: sess.hook_transcript_path().await,
model: turn_context.model_info.slug.clone(),
permission_mode: hook_permission_mode(turn_context),
target,
};
let hooks = sess.hooks();
let preview_runs = hooks.preview_session_start(&request);
run_context_injecting_hook(
sess,
turn_context,
preview_runs,
hooks.run_session_start(request, Some(turn_context.sub_id.clone())),
)
.await
.record_additional_contexts(sess, turn_context)
.await
}
false
}
/// Runs matching `PreToolUse` hooks before a tool executes.

View File

@@ -2609,6 +2609,10 @@ impl Session {
self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item)])
.await;
}
{
let mut state = self.state.lock().await;
state.queue_pending_session_start_source(codex_hooks::SessionStartSource::Compact);
}
self.services.model_client.advance_window_generation();
}

View File

@@ -1184,7 +1184,7 @@ impl Session {
Box::pin(sess.record_initial_history(initial_history)).await;
{
let mut state = sess.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source));
state.queue_pending_session_start_source(session_start_source);
}
Ok(sess)

View File

@@ -4,6 +4,7 @@ use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use std::collections::HashSet;
use std::collections::VecDeque;
use super::auto_compact_window::AutoCompactWindow;
use super::auto_compact_window::AutoCompactWindowSnapshot;
@@ -33,7 +34,7 @@ pub(crate) struct SessionState {
/// Startup prewarmed session prepared during session initialization.
pub(crate) startup_prewarm: Option<SessionStartupPrewarmHandle>,
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
pub(crate) pending_session_start_sources: VecDeque<codex_hooks::SessionStartSource>,
granted_permissions: Option<AdditionalPermissionProfile>,
next_turn_is_first: bool,
}
@@ -52,7 +53,7 @@ impl SessionState {
auto_compact_window: AutoCompactWindow::new(),
startup_prewarm: None,
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
pending_session_start_sources: VecDeque::new(),
granted_permissions: None,
next_turn_is_first: true,
}
@@ -218,17 +219,17 @@ impl SessionState {
self.active_connector_selection.clear();
}
pub(crate) fn set_pending_session_start_source(
pub(crate) fn queue_pending_session_start_source(
&mut self,
value: Option<codex_hooks::SessionStartSource>,
value: codex_hooks::SessionStartSource,
) {
self.pending_session_start_source = value;
self.pending_session_start_sources.push_back(value);
}
pub(crate) fn take_pending_session_start_source(
&mut self,
) -> Option<codex_hooks::SessionStartSource> {
self.pending_session_start_source.take()
self.pending_session_start_sources.pop_front()
}
pub(crate) fn record_granted_permissions(&mut self, permissions: AdditionalPermissionProfile) {

View File

@@ -6,6 +6,8 @@ use anyhow::Result;
use codex_core::config::Config;
use codex_core::config::Constrained;
use codex_features::Feature;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::built_in_model_providers;
use codex_plugin::PluginHookSource;
use codex_plugin::PluginId;
use codex_protocol::items::parse_hook_prompt_fragment;
@@ -26,11 +28,13 @@ use core_test_support::managed_network_requirements_loader;
use core_test_support::responses::ev_apply_patch_custom_tool_call;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_custom_tool_call;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_compact_json_once;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
@@ -74,6 +78,15 @@ fn network_workspace_write_profile() -> PermissionProfile {
)
}
fn non_openai_model_provider(server: &wiremock::MockServer) -> ModelProviderInfo {
let mut provider =
built_in_model_providers(/* openai_base_url */ /*openai_base_url*/ None)["openai"].clone();
provider.name = "OpenAI (test)".into();
provider.base_url = Some(format!("{}/v1", server.uri()));
provider.supports_websockets = false;
provider
}
fn trust_plugin_hooks(config: &mut Config, plugin_hook_sources: Vec<PluginHookSource>) {
if let Err(err) = config.features.enable(Feature::CodexHooks) {
panic!("test config should allow feature update: {err}");
@@ -229,6 +242,66 @@ if payload.get("prompt") == {blocked_prompt_json}:
Ok(())
}
fn write_session_start_and_user_prompt_submit_order_hooks(home: &Path) -> Result<()> {
let session_start_script_path = home.join("session_start_order_hook.py");
let user_prompt_submit_script_path = home.join("user_prompt_submit_order_hook.py");
let log_path = home.join("hook_order_log.jsonl");
let session_start_script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps({{
"hook_event_name": payload.get("hook_event_name"),
"source": payload.get("source"),
}}) + "\n")
"#,
log_path = log_path.display(),
);
let user_prompt_submit_script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps({{
"hook_event_name": payload.get("hook_event_name"),
"prompt": payload.get("prompt"),
}}) + "\n")
"#,
log_path = log_path.display(),
);
let hooks = serde_json::json!({
"hooks": {
"SessionStart": [{
"hooks": [{
"type": "command",
"command": format!("python3 {}", session_start_script_path.display()),
"statusMessage": "running session start order hook",
}]
}],
"UserPromptSubmit": [{
"hooks": [{
"type": "command",
"command": format!("python3 {}", user_prompt_submit_script_path.display()),
"statusMessage": "running user prompt submit order hook",
}]
}]
}
});
fs::write(&session_start_script_path, session_start_script)
.context("write session start order hook script")?;
fs::write(&user_prompt_submit_script_path, user_prompt_submit_script)
.context("write user prompt submit order hook script")?;
fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
Ok(())
}
fn write_pre_tool_use_hook(
home: &Path,
matcher: Option<&str>,
@@ -705,6 +778,109 @@ print(json.dumps({{
Ok(())
}
fn write_compact_session_start_hook_with_context(
home: &Path,
additional_context: &str,
) -> Result<()> {
let script_path = home.join("compact_session_start_hook.py");
let log_path = home.join("session_start_hook_log.jsonl");
let additional_context_json = serde_json::to_string(additional_context)
.context("serialize compact session start additional context for test")?;
let script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
print(json.dumps({{
"hookSpecificOutput": {{
"hookEventName": "SessionStart",
"additionalContext": {additional_context_json}
}}
}}))
"#,
log_path = log_path.display(),
);
let hooks = serde_json::json!({
"hooks": {
"SessionStart": [{
"matcher": "compact",
"hooks": [{
"type": "command",
"command": format!("python3 {}", script_path.display()),
"statusMessage": "running compact session start hook",
}]
}]
}
});
fs::write(&script_path, script).context("write compact session start hook script")?;
fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
Ok(())
}
fn write_resume_and_compact_session_start_hook_with_context(
home: &Path,
resume_context: &str,
compact_context: &str,
) -> Result<()> {
let script_path = home.join("resume_and_compact_session_start_hook.py");
let log_path = home.join("session_start_hook_log.jsonl");
let resume_context_json = serde_json::to_string(resume_context)
.context("serialize resume session start additional context for test")?;
let compact_context_json = serde_json::to_string(compact_context)
.context("serialize compact session start additional context for test")?;
let script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
contexts = {{
"resume": {resume_context_json},
"compact": {compact_context_json},
}}
print(json.dumps({{
"hookSpecificOutput": {{
"hookEventName": "SessionStart",
"additionalContext": contexts[payload["source"]]
}}
}}))
"#,
log_path = log_path.display(),
);
let hooks = serde_json::json!({
"hooks": {
"SessionStart": [{
"matcher": "resume",
"hooks": [{
"type": "command",
"command": format!("python3 {}", script_path.display()),
"statusMessage": "running resume session start hook",
}]
}, {
"matcher": "compact",
"hooks": [{
"type": "command",
"command": format!("python3 {}", script_path.display()),
"statusMessage": "running compact session start hook",
}]
}]
}
});
fs::write(&script_path, script)
.context("write resume and compact session start hook script")?;
fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
Ok(())
}
fn rollout_hook_prompt_texts(text: &str) -> Result<Vec<String>> {
let mut texts = Vec::new();
for line in text.lines() {
@@ -837,6 +1013,10 @@ fn read_user_prompt_submit_hook_inputs(home: &Path) -> Result<Vec<serde_json::Va
.collect()
}
fn read_hook_order_inputs(home: &Path) -> Result<Vec<serde_json::Value>> {
read_hook_inputs_from_log(home.join("hook_order_log.jsonl").as_path())
}
fn ev_message_item_done(id: &str, text: &str) -> Value {
serde_json::json!({
"type": "response.output_item.done",
@@ -1020,6 +1200,54 @@ async fn session_start_hook_sees_materialized_transcript_path() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn session_start_runs_before_user_prompt_submit_on_first_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _response = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hello after hooks"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex()
.with_pre_build_hook(|home| {
if let Err(error) = write_session_start_and_user_prompt_submit_order_hooks(home) {
panic!("failed to write hook ordering fixtures: {error}");
}
})
.with_config(trust_discovered_hooks);
let test = builder.build(&server).await?;
test.submit_turn("hello").await?;
let hook_inputs = read_hook_order_inputs(test.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.map(|input| input["hook_event_name"]
.as_str()
.expect("hook input event name"))
.collect::<Vec<_>>(),
vec!["SessionStart", "UserPromptSubmit"],
);
assert_eq!(
hook_inputs[0].get("source").and_then(Value::as_str),
Some("startup")
);
assert_eq!(
hook_inputs[1].get("prompt").and_then(Value::as_str),
Some("hello")
);
Ok(())
}
#[tokio::test]
async fn session_start_hook_spills_large_additional_context() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1125,6 +1353,188 @@ async fn pre_tool_use_hook_spills_large_additional_context() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn compact_session_start_hook_records_additional_context_for_next_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let request_log = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hello before compact"),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "summary after compact"),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-3", "hello after compact"),
ev_completed("resp-3"),
]),
],
)
.await;
let additional_context = "remember the compacted reef";
let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex()
.with_pre_build_hook(move |home| {
if let Err(error) =
write_compact_session_start_hook_with_context(home, additional_context)
{
panic!("failed to write compact session start hook fixture: {error}");
}
})
.with_config(move |config| {
config.model_provider = model_provider;
trust_discovered_hooks(config);
});
let test = builder.build(&server).await?;
test.submit_turn("hello before compact").await?;
test.codex.submit(Op::Compact).await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
test.submit_turn("hello after compact").await?;
let requests = request_log.requests();
assert_eq!(requests.len(), 3);
assert!(
!requests[0]
.message_input_texts("developer")
.iter()
.any(|message| message == additional_context),
"compact matcher should not run for initial startup",
);
assert!(
requests[2]
.message_input_texts("developer")
.iter()
.any(|message| message == additional_context),
"compact matcher should inject additional context before the next model turn",
);
let hook_inputs = read_session_start_hook_inputs(test.codex_home_path())?;
assert_eq!(hook_inputs.len(), 1);
assert_eq!(
hook_inputs[0].get("source").and_then(Value::as_str),
Some("compact")
);
Ok(())
}
#[tokio::test]
async fn resumed_thread_runs_resume_then_compact_session_start_hooks() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let limit = 200_000;
let over_limit_tokens = 250_000;
let remote_summary = "remote compact summary";
let resume_context = "remember the resumed reef";
let compact_context = "remember the compacted reef";
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: remote_summary.to_string(),
}],
phase: None,
},
ResponseItem::Compaction {
encrypted_content: "encrypted compact summary".to_string(),
},
];
let compact_mock =
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
let mut builder = test_codex()
.with_pre_build_hook(move |home| {
if let Err(error) = write_resume_and_compact_session_start_hook_with_context(
home,
resume_context,
compact_context,
) {
panic!("failed to write resume/compact session start hook fixture: {error}");
}
})
.with_config(move |config| {
config.model_auto_compact_token_limit = Some(limit);
trust_discovered_hooks(config);
});
let initial = builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.context("rollout path")?;
mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "hello before resume"),
ev_completed_with_tokens("resp-1", over_limit_tokens),
]),
)
.await;
initial.submit_turn("hello before resume").await?;
assert!(compact_mock.requests().is_empty());
let mut resume_builder = test_codex().with_config(move |config| {
config.model_auto_compact_token_limit = Some(limit);
trust_discovered_hooks(config);
});
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
let follow_up = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "hello after resume"),
ev_completed("resp-2"),
]),
)
.await;
resumed.submit_turn("hello after resume").await?;
assert_eq!(compact_mock.requests().len(), 1);
let developer_messages = follow_up.single_request().message_input_texts("developer");
assert!(
developer_messages
.iter()
.any(|message| message == resume_context),
"resume matcher should inject additional context before the next model turn",
);
assert!(
developer_messages
.iter()
.any(|message| message == compact_context),
"compact matcher should inject additional context before the next model turn",
);
let hook_inputs = read_session_start_hook_inputs(resumed.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.filter_map(|input| input.get("source").and_then(Value::as_str))
.collect::<Vec<_>>(),
vec!["resume", "compact"],
);
Ok(())
}
#[tokio::test]
async fn stop_hook_spills_large_continuation_prompt() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -37,7 +37,8 @@
"enum": [
"startup",
"resume",
"clear"
"clear",
"compact"
],
"type": "string"
},

View File

@@ -24,6 +24,7 @@ pub enum SessionStartSource {
Startup,
Resume,
Clear,
Compact,
}
impl SessionStartSource {
@@ -32,6 +33,7 @@ impl SessionStartSource {
Self::Startup => "startup",
Self::Resume => "resume",
Self::Clear => "clear",
Self::Compact => "compact",
}
}
}

View File

@@ -673,7 +673,7 @@ fn permission_mode_schema(_gen: &mut SchemaGenerator) -> Schema {
}
fn session_start_source_schema(_gen: &mut SchemaGenerator) -> Schema {
string_enum_schema(&["startup", "resume", "clear"])
string_enum_schema(&["startup", "resume", "clear", "compact"])
}
fn compaction_trigger_schema(_gen: &mut SchemaGenerator) -> Schema {