Add SubagentStop hook

This commit is contained in:
Abhinav Vedmala
2026-05-15 18:01:57 +00:00
parent bd8ee83848
commit 48110f91dc
43 changed files with 853 additions and 84 deletions

View File

@@ -1,9 +1,19 @@
use anyhow::Result;
use codex_core::StartThreadOptions;
use codex_core::ThreadConfigSnapshot;
use codex_core::config::AgentRoleConfig;
use codex_features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::models::PermissionProfile;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use core_test_support::hooks::trust_discovered_hooks;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -17,6 +27,8 @@ use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::test_codex::turn_permission_fields;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::fs;
@@ -38,6 +50,8 @@ const REQUESTED_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::Low;
const ROLE_MODEL: &str = "gpt-5.4";
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
const SUBAGENT_START_CONTEXT: &str = "subagent start context reaches child";
const SUBAGENT_STOP_CONTINUATION: &str = "continue only the child";
const INTERNAL_SUBAGENT_PROMPT: &str = "internal subagent: review";
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
let is_zstd = req
@@ -111,7 +125,11 @@ fn write_home_skill(codex_home: &Path, dir: &str, name: &str, description: &str)
Ok(())
}
fn write_subagent_start_hooks(home: &Path) -> Result<()> {
fn write_subagent_lifecycle_hooks(
home: &Path,
stop_prompts: &[&str],
subagent_stop_matcher: &str,
) -> Result<()> {
let session_start_script_path = home.join("session_start_hook.py");
let session_start_log_path = home.join("session_start_hook_log.jsonl");
let session_start_script = format!(
@@ -143,6 +161,51 @@ print(json.dumps({{"hookSpecificOutput": {{"hookEventName": "SubagentStart", "ad
start_log_path = start_log_path.display(),
);
let subagent_stop_script_path = home.join("subagent_stop_hook.py");
let subagent_stop_log_path = home.join("subagent_stop_hook_log.jsonl");
let prompts_json = serde_json::to_string(stop_prompts)?;
let subagent_stop_script = format!(
r#"import json
from pathlib import Path
import sys
log_path = Path(r"{subagent_stop_log_path}")
block_prompts = {prompts_json}
payload = json.load(sys.stdin)
existing = []
if log_path.exists():
existing = [line for line in log_path.read_text(encoding="utf-8").splitlines() if line.strip()]
with log_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
invocation_index = len(existing)
if invocation_index < len(block_prompts):
print(json.dumps({{"decision": "block", "reason": block_prompts[invocation_index]}}))
else:
print(json.dumps({{"systemMessage": f"subagent stop pass {{invocation_index + 1}} complete"}}))
"#,
subagent_stop_log_path = subagent_stop_log_path.display(),
prompts_json = prompts_json,
);
let stop_script_path = home.join("stop_hook.py");
let stop_log_path = home.join("stop_hook_log.jsonl");
let stop_script = format!(
r#"import json
from pathlib import Path
import sys
log_path = Path(r"{stop_log_path}")
payload = json.load(sys.stdin)
with log_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
print(json.dumps({{"systemMessage": "root stop complete"}}))
"#,
stop_log_path = stop_log_path.display(),
);
let hooks = serde_json::json!({
"hooks": {
"SessionStart": [{
@@ -158,12 +221,27 @@ print(json.dumps({{"hookSpecificOutput": {{"hookEventName": "SubagentStart", "ad
"type": "command",
"command": format!("python3 {}", start_script_path.display()),
}]
}],
"SubagentStop": [{
"matcher": subagent_stop_matcher,
"hooks": [{
"type": "command",
"command": format!("python3 {}", subagent_stop_script_path.display()),
}]
}],
"Stop": [{
"hooks": [{
"type": "command",
"command": format!("python3 {}", stop_script_path.display()),
}]
}]
}
});
fs::write(&session_start_script_path, session_start_script)?;
fs::write(&start_script_path, start_script)?;
fs::write(&subagent_stop_script_path, subagent_stop_script)?;
fs::write(&stop_script_path, stop_script)?;
fs::write(home.join("hooks.json"), hooks.to_string())?;
Ok(())
}
@@ -180,6 +258,27 @@ fn read_hook_log(home: &Path, filename: &str) -> Result<Vec<serde_json::Value>>
.collect()
}
async fn wait_for_hook_log_entries(
home: &Path,
filename: &str,
expected_len: usize,
) -> Result<Vec<serde_json::Value>> {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let entries = read_hook_log(home, filename)?;
if entries.len() >= expected_len {
return Ok(entries);
}
if Instant::now() >= deadline {
anyhow::bail!(
"expected at least {expected_len} entries in {filename}, got {}",
entries.len()
);
}
sleep(Duration::from_millis(10)).await;
}
}
async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
@@ -400,12 +499,12 @@ async fn subagent_start_replaces_session_start_and_injects_context() -> Result<(
let test = test_codex()
.with_pre_build_hook(|home| {
if let Err(error) = write_subagent_start_hooks(home) {
if let Err(error) = write_subagent_lifecycle_hooks(home, &[], "worker") {
panic!("failed to write subagent hook fixture: {error}");
}
})
.with_config(|config| {
core_test_support::hooks::trust_discovered_hooks(config);
trust_discovered_hooks(config);
config
.features
.enable(Feature::Collab)
@@ -439,6 +538,207 @@ async fn subagent_start_replaces_session_start_and_injects_context() -> Result<(
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_stop_replaces_stop_and_skips_internal_subagents() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
"task_name": "child",
"agent_type": "worker",
}))?;
mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
sse(vec![
ev_response_created("resp-turn1-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
ev_completed("resp-turn1-1"),
]),
)
.await;
let first_child_request = mount_sse_once_match(
&server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
sse(vec![
ev_response_created("resp-child-1"),
ev_assistant_message("msg-child-1", "child done first"),
ev_completed("resp-child-1"),
]),
)
.await;
let second_child_request = mount_sse_once_match(
&server,
|req: &wiremock::Request| {
body_contains(req, SUBAGENT_STOP_CONTINUATION) && !body_contains(req, SPAWN_CALL_ID)
},
sse(vec![
ev_response_created("resp-child-2"),
ev_assistant_message("msg-child-2", "child done final"),
ev_completed("resp-child-2"),
]),
)
.await;
let _turn1_followup = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-turn1-2"),
ev_assistant_message("msg-turn1-2", "parent done"),
ev_completed("resp-turn1-2"),
]),
)
.await;
let internal_request = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, INTERNAL_SUBAGENT_PROMPT),
sse(vec![
ev_response_created("resp-internal-1"),
ev_assistant_message("msg-internal-1", "internal subagent done"),
ev_completed("resp-internal-1"),
]),
)
.await;
let test = test_codex()
.with_pre_build_hook(|home| {
if let Err(error) =
write_subagent_lifecycle_hooks(home, &[SUBAGENT_STOP_CONTINUATION], "")
{
panic!("failed to write subagent hook fixture: {error}");
}
})
.with_config(|config| {
trust_discovered_hooks(config);
config
.features
.enable(Feature::Collab)
.expect("test config should allow feature update");
})
.build(&server)
.await?;
test.submit_turn(TURN_1_PROMPT).await?;
let _ = wait_for_requests(&first_child_request).await?;
let _ = wait_for_requests(&second_child_request).await?;
let subagent_stop_inputs = wait_for_hook_log_entries(
test.codex_home_path(),
"subagent_stop_hook_log.jsonl",
/*expected_len*/ 2,
)
.await?;
assert_eq!(subagent_stop_inputs.len(), 2);
assert_eq!(
subagent_stop_inputs
.iter()
.map(|input| input["stop_hook_active"].as_bool())
.collect::<Vec<_>>(),
vec![Some(false), Some(true)]
);
assert_eq!(
subagent_stop_inputs[0]["agent_type"].as_str(),
Some("worker")
);
let parent_transcript_path = subagent_stop_inputs[0]["transcript_path"]
.as_str()
.expect("SubagentStop should include parent transcript_path");
let agent_transcript_path = subagent_stop_inputs[0]["agent_transcript_path"]
.as_str()
.expect("SubagentStop should include agent_transcript_path");
assert_ne!(parent_transcript_path, agent_transcript_path);
assert_eq!(
subagent_stop_inputs[1]["transcript_path"].as_str(),
Some(parent_transcript_path)
);
assert_eq!(
subagent_stop_inputs[1]["agent_transcript_path"].as_str(),
Some(agent_transcript_path)
);
assert_eq!(
subagent_stop_inputs[0]["last_assistant_message"].as_str(),
Some("child done first")
);
let stop_inputs = read_hook_log(test.codex_home_path(), "stop_hook_log.jsonl")?;
assert!(
stop_inputs
.iter()
.all(|input| input["last_assistant_message"].as_str() != Some("child done first")),
"child completion should not invoke the normal Stop hook"
);
let stop_input_count = stop_inputs.len();
// This matcher would catch the old synthetic "review" SubagentStop target
// because the SubagentStop hook above intentionally matches all agent types.
let internal_thread = test
.thread_manager
.start_thread_with_options(StartThreadOptions {
config: test.config.clone(),
initial_history: InitialHistory::New,
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await?;
let (sandbox_policy, permission_profile) =
turn_permission_fields(PermissionProfile::Disabled, test.cwd_path());
internal_thread
.thread
.submit(Op::UserTurn {
environments: None,
items: vec![UserInput::Text {
text: INTERNAL_SUBAGENT_PROMPT.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.config.cwd.to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy,
permission_profile,
model: internal_thread.session_configured.model.clone(),
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await?;
let turn_id = wait_for_event_match(internal_thread.thread.as_ref(), |event| match event {
EventMsg::TurnStarted(event) => Some(event.turn_id.clone()),
_ => None,
})
.await;
wait_for_event_match(internal_thread.thread.as_ref(), |event| match event {
EventMsg::TurnComplete(event) if event.turn_id == turn_id => Some(()),
_ => None,
})
.await;
let requests = wait_for_requests(&internal_request).await?;
assert_eq!(requests.len(), 1);
let subagent_stop_inputs_after_internal =
read_hook_log(test.codex_home_path(), "subagent_stop_hook_log.jsonl")?;
assert_eq!(subagent_stop_inputs_after_internal, subagent_stop_inputs);
let stop_inputs_after_internal = read_hook_log(test.codex_home_path(), "stop_hook_log.jsonl")?;
assert_eq!(stop_inputs_after_internal.len(), stop_input_count);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_notification_is_included_without_wait() -> Result<()> {
skip_if_no_network!(Ok(()));