start of hooks engine (#13276)

(Experimental)

This PR adds a first MVP for hooks, with SessionStart and Stop

The core design is:

- hooks live in a dedicated engine under codex-rs/hooks
- each hook type has its own event-specific file
- hook execution is synchronous and blocks normal turn progression while
running
- matching hooks run in parallel, then their results are aggregated into
a normalized HookRunSummary

On the AppServer side, hooks are exposed as operational metadata rather
than transcript-native items:

- new live notifications: hook/started, hook/completed
- persisted/replayed hook results live on Turn.hookRuns
- we intentionally did not add hook-specific ThreadItem variants

Hooks messages are not persisted, they remain ephemeral. The context
changes they add are (they get appended to the user's prompt)
This commit is contained in:
Andrei Eternal
2026-03-09 21:11:31 -07:00
committed by GitHub
parent da616136cc
commit 244b2d53f4
73 changed files with 4791 additions and 483 deletions

View File

@@ -1560,6 +1560,25 @@ impl Session {
(None, None)
};
let mut hook_shell_argv = default_shell.derive_exec_args("", false);
let hook_shell_program = hook_shell_argv.remove(0);
let _ = hook_shell_argv.pop();
let hooks = Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),
feature_enabled: config.features.enabled(Feature::CodexHooks),
config_layer_stack: Some(config.config_layer_stack.clone()),
shell_program: Some(hook_shell_program),
shell_args: hook_shell_argv,
});
for warning in hooks.startup_warnings() {
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Warning(WarningEvent {
message: warning.clone(),
}),
});
}
let services = SessionServices {
// Initialize the MCP connection manager with an uninitialized
// instance. It will be replaced with one created via
@@ -1581,9 +1600,7 @@ impl Session {
Arc::clone(&config),
Arc::clone(&auth_manager),
),
hooks: Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),
}),
hooks,
rollout: Mutex::new(rollout_recorder),
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
@@ -1730,9 +1747,19 @@ impl Session {
}
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
.await;
let session_start_source = match &initial_history {
InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume,
InitialHistory::New | InitialHistory::Forked(_) => {
codex_hooks::SessionStartSource::Startup
}
};
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
sess.record_initial_history(initial_history).await;
{
let mut state = sess.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source));
}
memories::start_memories_startup_task(
&sess,
@@ -3838,6 +3865,21 @@ impl Session {
Arc::clone(&self.services.user_shell)
}
pub(crate) async fn current_rollout_path(&self) -> Option<PathBuf> {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
recorder.map(|recorder| recorder.rollout_path().to_path_buf())
}
pub(crate) async fn take_pending_session_start_source(
&self,
) -> Option<codex_hooks::SessionStartSource> {
let mut state = self.state.lock().await;
state.take_pending_session_start_source()
}
async fn refresh_mcp_servers_inner(
&self,
turn_context: &TurnContext,
@@ -5467,6 +5509,8 @@ pub(crate) async fn run_turn(
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
let mut last_agent_message: Option<String> = None;
let mut stop_hook_active = false;
let mut pending_stop_hook_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
@@ -5478,6 +5522,55 @@ pub(crate) async fn run_turn(
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
loop {
if let Some(session_start_source) = sess.take_pending_session_start_source().await {
let session_start_permission_mode = match turn_context.approval_policy.value() {
AskForApproval::Never => "bypassPermissions",
AskForApproval::UnlessTrusted
| AskForApproval::OnFailure
| AskForApproval::OnRequest
| AskForApproval::Reject(_) => "default",
}
.to_string();
let session_start_request = codex_hooks::SessionStartRequest {
session_id: sess.conversation_id,
cwd: turn_context.cwd.clone(),
transcript_path: sess.current_rollout_path().await,
model: turn_context.model_info.slug.clone(),
permission_mode: session_start_permission_mode,
source: session_start_source,
};
for run in sess.hooks().preview_session_start(&session_start_request) {
sess.send_event(
&turn_context,
EventMsg::HookStarted(crate::protocol::HookStartedEvent {
turn_id: Some(turn_context.sub_id.clone()),
run,
}),
)
.await;
}
let session_start_outcome = sess
.hooks()
.run_session_start(session_start_request, Some(turn_context.sub_id.clone()))
.await;
for completed in session_start_outcome.hook_events {
sess.send_event(&turn_context, EventMsg::HookCompleted(completed))
.await;
}
if session_start_outcome.should_stop {
break;
}
if let Some(additional_context) = session_start_outcome.additional_context {
let developer_message: ResponseItem =
DeveloperInstructions::new(additional_context).into();
sess.record_conversation_items(
&turn_context,
std::slice::from_ref(&developer_message),
)
.await;
}
}
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
@@ -5509,11 +5602,14 @@ pub(crate) async fn run_turn(
}
// Construct the input that we will send to the model.
let sampling_request_input: Vec<ResponseItem> = {
let mut sampling_request_input: Vec<ResponseItem> = {
sess.clone_history()
.await
.for_prompt(&turn_context.model_info.input_modalities)
};
if let Some(stop_hook_message) = pending_stop_hook_message.take() {
sampling_request_input.push(DeveloperInstructions::new(stop_hook_message).into());
}
let sampling_request_input_messages = sampling_request_input
.iter()
@@ -5576,6 +5672,57 @@ pub(crate) async fn run_turn(
if !needs_follow_up {
last_agent_message = sampling_request_last_agent_message;
let stop_hook_permission_mode = match turn_context.approval_policy.value() {
AskForApproval::Never => "bypassPermissions",
AskForApproval::UnlessTrusted
| AskForApproval::OnFailure
| AskForApproval::OnRequest
| AskForApproval::Reject(_) => "default",
}
.to_string();
let stop_request = codex_hooks::StopRequest {
session_id: sess.conversation_id,
turn_id: turn_context.sub_id.clone(),
cwd: turn_context.cwd.clone(),
transcript_path: sess.current_rollout_path().await,
model: turn_context.model_info.slug.clone(),
permission_mode: stop_hook_permission_mode,
stop_hook_active,
last_assistant_message: last_agent_message.clone(),
};
for run in sess.hooks().preview_stop(&stop_request) {
sess.send_event(
&turn_context,
EventMsg::HookStarted(crate::protocol::HookStartedEvent {
turn_id: Some(turn_context.sub_id.clone()),
run,
}),
)
.await;
}
let stop_outcome = sess.hooks().run_stop(stop_request).await;
for completed in stop_outcome.hook_events {
sess.send_event(&turn_context, EventMsg::HookCompleted(completed))
.await;
}
if stop_outcome.should_block {
if stop_hook_active {
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
message: "Stop hook blocked twice in the same turn; ignoring the second block to avoid an infinite loop.".to_string(),
}),
)
.await;
} else {
stop_hook_active = true;
pending_stop_hook_message = stop_outcome.block_message_for_model;
continue;
}
}
if stop_outcome.should_stop {
break;
}
let hook_outcomes = sess
.hooks()
.dispatch(HookPayload {
@@ -6392,6 +6539,8 @@ fn realtime_text_for_event(msg: &EventMsg) -> Option<String> {
| EventMsg::ExitedReviewMode(_)
| EventMsg::RawResponseItem(_)
| EventMsg::ItemStarted(_)
| EventMsg::HookStarted(_)
| EventMsg::HookCompleted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)