[hooks] simplify pending-input cleanup

This commit is contained in:
Andrei Eternal
2026-04-29 21:22:16 -07:00
parent f7c60411db
commit 4ed64b48ad
7 changed files with 292 additions and 210 deletions

View File

@@ -45,14 +45,14 @@ pub(crate) struct HookRuntimeOutcome {
pub additional_contexts: Vec<String>,
}
pub(crate) enum InputItemHookDisposition {
Accepted(Box<AcceptedInputItem>),
pub(crate) enum PendingInputHookDisposition {
Accepted(Box<PendingInputRecord>),
Blocked { additional_contexts: Vec<String> },
}
pub(crate) enum AcceptedInputItem {
pub(crate) enum PendingInputRecord {
UserMessage {
// Fresh-turn user input still carries UI-only text spans. Preserve that
// Queued user input still carries UI-only text spans. Preserve that
// payload when the caller has it instead of rebuilding from model input.
content: Vec<UserInput>,
response_item: ResponseItem,
@@ -278,61 +278,63 @@ pub(crate) async fn run_user_prompt_submit_hooks(
.await
}
pub(crate) async fn inspect_input_item(
pub(crate) async fn inspect_pending_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
input_item: PendingInputItem,
) -> InputItemHookDisposition {
let input_item = match input_item {
pending_input_item: PendingInputItem,
) -> PendingInputHookDisposition {
let pending_input_item = match pending_input_item {
PendingInputItem::UserInput(content) => {
let prompt = UserMessageItem::new(&content).message();
let response_item = ResponseItem::from(ResponseInputItem::from(content.clone()));
let user_prompt_submit_outcome =
run_user_prompt_submit_hooks(sess, turn_context, prompt).await;
if user_prompt_submit_outcome.should_stop {
return InputItemHookDisposition::Blocked {
return PendingInputHookDisposition::Blocked {
additional_contexts: user_prompt_submit_outcome.additional_contexts,
};
}
return InputItemHookDisposition::Accepted(Box::new(AcceptedInputItem::UserMessage {
content,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}));
return PendingInputHookDisposition::Accepted(Box::new(
PendingInputRecord::UserMessage {
content,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
},
));
}
PendingInputItem::ResponseInput(input_item) => input_item,
};
let response_item = ResponseItem::from(input_item);
let response_item = ResponseItem::from(pending_input_item);
if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) {
let user_prompt_submit_outcome =
run_user_prompt_submit_hooks(sess, turn_context, user_message.message()).await;
if user_prompt_submit_outcome.should_stop {
InputItemHookDisposition::Blocked {
PendingInputHookDisposition::Blocked {
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}
} else {
InputItemHookDisposition::Accepted(Box::new(AcceptedInputItem::UserMessage {
PendingInputHookDisposition::Accepted(Box::new(PendingInputRecord::UserMessage {
content: user_message.content,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}))
}
} else {
InputItemHookDisposition::Accepted(Box::new(AcceptedInputItem::ConversationItem {
PendingInputHookDisposition::Accepted(Box::new(PendingInputRecord::ConversationItem {
response_item,
}))
}
}
pub(crate) async fn record_input_item(
pub(crate) async fn record_pending_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
input_item: AcceptedInputItem,
pending_input: PendingInputRecord,
) {
match input_item {
AcceptedInputItem::UserMessage {
match pending_input {
PendingInputRecord::UserMessage {
content,
response_item,
additional_contexts,
@@ -345,7 +347,7 @@ pub(crate) async fn record_input_item(
.await;
record_additional_contexts(sess, turn_context, additional_contexts).await;
}
AcceptedInputItem::ConversationItem { response_item } => {
PendingInputRecord::ConversationItem { response_item } => {
sess.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
.await;
}

View File

@@ -2998,7 +2998,7 @@ impl Session {
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_user_input(input);
turn_state.push_pending_input_item(PendingInputItem::UserInput(input));
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
}
@@ -3120,7 +3120,10 @@ impl Session {
self.take_pending_input_items()
.await
.into_iter()
.map(PendingInputItem::into_response_input_item)
.map(|item| match item {
PendingInputItem::UserInput(input) => input.into(),
PendingInputItem::ResponseInput(input) => input,
})
.collect()
}

View File

@@ -19,12 +19,13 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::feedback_tags;
use crate::hook_runtime::InputItemHookDisposition;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::emit_hook_completed_events;
use crate::hook_runtime::inspect_input_item;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_input_item;
use crate::hook_runtime::record_pending_input;
use crate::hook_runtime::run_pending_session_start_hooks;
use crate::hook_runtime::run_user_prompt_submit_hooks;
use crate::injection::ToolMentionKind;
use crate::injection::app_id_from_path;
use crate::injection::tool_kind_for_path;
@@ -41,7 +42,6 @@ use crate::resolve_skill_dependencies_for_turn;
use crate::session::PreviousTurnSettings;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::PendingInputItem;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
@@ -76,6 +76,7 @@ use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::PlanItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::items::build_hook_prompt_message;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
@@ -300,25 +301,30 @@ pub(crate) async fn run_turn(
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
if !input.is_empty() {
match inspect_input_item(
let additional_contexts = if input.is_empty() {
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
PendingInputItem::UserInput(input.clone()),
UserMessageItem::new(&input).message(),
)
.await
{
InputItemHookDisposition::Accepted(input_item) => {
record_input_item(&sess, &turn_context, *input_item).await;
}
InputItemHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
return None;
}
.await;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
}
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
user_prompt_submit_outcome.additional_contexts
};
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
@@ -329,6 +335,7 @@ pub(crate) async fn run_turn(
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
if !input.is_empty() {
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
@@ -384,11 +391,11 @@ pub(crate) async fn run_turn(
if !pending_input.is_empty() {
let mut pending_input_iter = pending_input.into_iter();
while let Some(pending_input_item) = pending_input_iter.next() {
match inspect_input_item(&sess, &turn_context, pending_input_item).await {
InputItemHookDisposition::Accepted(pending_input) => {
match inspect_pending_input(&sess, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
accepted_pending_input.push(*pending_input);
}
InputItemHookDisposition::Blocked {
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
@@ -408,7 +415,7 @@ pub(crate) async fn run_turn(
let has_accepted_pending_input = !accepted_pending_input.is_empty();
for pending_input in accepted_pending_input {
record_input_item(&sess, &turn_context, pending_input).await;
record_pending_input(&sess, &turn_context, pending_input).await;
}
record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await;

View File

@@ -91,16 +91,6 @@ pub(crate) enum PendingInputItem {
ResponseInput(ResponseInputItem),
}
impl PendingInputItem {
#[cfg(test)]
pub(crate) fn into_response_input_item(self) -> ResponseInputItem {
match self {
Self::UserInput(input) => input.into(),
Self::ResponseInput(input) => input,
}
}
}
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
@@ -240,10 +230,6 @@ impl TurnState {
.push(PendingInputItem::ResponseInput(input));
}
pub(crate) fn push_pending_user_input(&mut self, input: Vec<UserInput>) {
self.pending_input.push(PendingInputItem::UserInput(input));
}
pub(crate) fn push_pending_input_item(&mut self, input: PendingInputItem) {
self.pending_input.push(input);
}
@@ -271,7 +257,10 @@ impl TurnState {
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
self.take_pending_input_items()
.into_iter()
.map(PendingInputItem::into_response_input_item)
.map(|item| match item {
PendingInputItem::UserInput(input) => input.into(),
PendingInputItem::ResponseInput(input) => input,
})
.collect()
}

View File

@@ -23,10 +23,10 @@ use tracing::warn;
use crate::config::Config;
use crate::context::ContextualUserFragment;
use crate::goals::GoalRuntimeEvent;
use crate::hook_runtime::InputItemHookDisposition;
use crate::hook_runtime::inspect_input_item;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_input_item;
use crate::hook_runtime::record_pending_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
@@ -594,11 +594,11 @@ impl Session {
}
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_input_item(self, &turn_context, pending_input_item).await {
InputItemHookDisposition::Accepted(pending_input) => {
record_input_item(self, &turn_context, *pending_input).await;
match inspect_pending_input(self, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(self, &turn_context, *pending_input).await;
}
InputItemHookDisposition::Blocked {
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(self, &turn_context, additional_contexts).await;

View File

@@ -1349,146 +1349,6 @@ async fn blocked_queued_prompt_does_not_strand_earlier_accepted_prompt() -> Resu
Ok(())
}
#[tokio::test]
async fn queued_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_added("msg-1", "")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_done("msg-1", "first response")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(ev_completed("resp-1")),
},
];
let second_chunks = vec![StreamingSseChunk {
gate: None,
body: sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "accepted queued prompt handled"),
ev_completed("resp-2"),
]),
}];
let (server, _completions) =
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
let mut builder = test_codex()
.with_model("gpt-5.4")
.with_pre_build_hook(|home| {
if let Err(error) = write_user_prompt_submit_hook(
home,
HOOK_PROMPT_XML_USER_TEXT,
BLOCKED_PROMPT_CONTEXT,
) {
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build_with_streaming_server(&server).await?;
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "initial prompt".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
for text in ["accepted queued prompt", HOOK_PROMPT_XML_USER_TEXT] {
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
}
sleep(Duration::from_millis(100)).await;
let _ = gate_completed_tx.send(());
let requests = tokio::time::timeout(Duration::from_secs(30), async {
loop {
let requests = server.requests().await;
if requests.len() >= 2 {
break requests;
}
sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("second request should arrive")
.into_iter()
.collect::<Vec<_>>();
sleep(Duration::from_millis(100)).await;
assert_eq!(requests.len(), 2);
let second_user_texts = request_message_input_texts(&requests[1], "user");
assert!(
second_user_texts.contains(&"accepted queued prompt".to_string()),
"second request should include the accepted queued prompt",
);
assert!(
!second_user_texts.contains(&HOOK_PROMPT_XML_USER_TEXT.to_string()),
"second request should not include the blocked XML-shaped queued prompt",
);
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("queued prompt hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
"initial prompt".to_string(),
"accepted queued prompt".to_string(),
HOOK_PROMPT_XML_USER_TEXT.to_string(),
],
);
server.shutdown().await;
Ok(())
}
#[tokio::test]
async fn permission_request_hook_allows_shell_command_without_user_approval() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -1,6 +1,14 @@
use std::sync::Arc;
#[cfg(not(target_os = "windows"))]
use std::time::Duration;
#[cfg(not(target_os = "windows"))]
use anyhow::Context;
#[cfg(not(target_os = "windows"))]
use anyhow::Result;
use codex_core::CodexThread;
#[cfg(not(target_os = "windows"))]
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::items::TurnItem;
use codex_protocol::models::PermissionProfile;
@@ -20,6 +28,8 @@ use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_response_created;
#[cfg(not(target_os = "windows"))]
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::StreamingSseServer;
use core_test_support::streaming_sse::start_streaming_sse_server;
@@ -32,6 +42,14 @@ use serde_json::Value;
use serde_json::from_slice;
use serde_json::json;
use tokio::sync::oneshot;
#[cfg(not(target_os = "windows"))]
use tokio::time::sleep;
#[cfg(not(target_os = "windows"))]
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
#[cfg(not(target_os = "windows"))]
const HOOK_PROMPT_XML_USER_TEXT: &str =
r#"<hook_prompt hook_run_id="hook-run-1">blocked xml prompt</hook_prompt>"#;
fn ev_message_item_done(id: &str, text: &str) -> Value {
serde_json::json!({
@@ -84,6 +102,66 @@ fn response_completed_chunks(response_id: &str) -> Vec<StreamingSseChunk> {
]
}
#[cfg(not(target_os = "windows"))]
fn write_user_prompt_submit_hook(
home: &std::path::Path,
blocked_prompt: &str,
additional_context: &str,
) -> Result<()> {
let script_path = home.join("user_prompt_submit_hook.py");
let log_path = home.join("user_prompt_submit_hook_log.jsonl");
let log_path = log_path.display();
let blocked_prompt_json =
serde_json::to_string(blocked_prompt).context("serialize blocked prompt for test")?;
let additional_context_json = serde_json::to_string(additional_context)
.context("serialize user prompt submit additional context for test")?;
let script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
if payload.get("prompt") == {blocked_prompt_json}:
print(json.dumps({{
"decision": "block",
"reason": "blocked by hook",
"hookSpecificOutput": {{
"hookEventName": "UserPromptSubmit",
"additionalContext": {additional_context_json}
}}
}}))
"#,
);
let hooks = serde_json::json!({
"hooks": {
"UserPromptSubmit": [{
"hooks": [{
"type": "command",
"command": format!("python3 {}", script_path.display()),
"statusMessage": "running user prompt submit hook",
}]
}]
}
});
std::fs::write(&script_path, script).context("write user prompt submit hook script")?;
std::fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
Ok(())
}
#[cfg(not(target_os = "windows"))]
fn read_user_prompt_submit_hook_inputs(home: &std::path::Path) -> Result<Vec<Value>> {
std::fs::read_to_string(home.join("user_prompt_submit_hook_log.jsonl"))
.context("read user prompt submit hook log")?
.lines()
.filter(|line| !line.trim().is_empty())
.map(|line| serde_json::from_str(line).context("parse user prompt submit hook log line"))
.collect()
}
async fn build_codex(server: &StreamingSseServer) -> Arc<CodexThread> {
test_codex()
.with_model("gpt-5.4")
@@ -487,6 +565,149 @@ async fn user_input_does_not_preempt_after_reasoning_item() {
server.shutdown().await;
}
#[cfg(not(target_os = "windows"))]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_added("msg-1", "")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_done("msg-1", "first response")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(ev_completed("resp-1")),
},
];
let second_chunks = vec![StreamingSseChunk {
gate: None,
body: responses::sse(vec![
ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "accepted queued prompt handled"),
ev_completed("resp-2"),
]),
}];
let (server, _completions) =
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
let mut builder = test_codex()
.with_model("gpt-5.4")
.with_pre_build_hook(|home| {
if let Err(error) = write_user_prompt_submit_hook(
home,
HOOK_PROMPT_XML_USER_TEXT,
BLOCKED_PROMPT_CONTEXT,
) {
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build_with_streaming_server(&server).await?;
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "initial prompt".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
for text in ["accepted queued prompt", HOOK_PROMPT_XML_USER_TEXT] {
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
}
sleep(Duration::from_millis(100)).await;
let _ = gate_completed_tx.send(());
let requests = tokio::time::timeout(Duration::from_secs(30), async {
loop {
let requests = server.requests().await;
if requests.len() >= 2 {
break requests;
}
sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("second request should arrive")
.into_iter()
.collect::<Vec<_>>();
sleep(Duration::from_millis(100)).await;
assert_eq!(requests.len(), 2);
let second_body: Value =
from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse second request: {err}"));
let second_user_texts = message_input_texts(&second_body, "user");
assert!(
second_user_texts.contains(&"accepted queued prompt".to_string()),
"second request should include the accepted queued prompt",
);
assert!(
!second_user_texts.contains(&HOOK_PROMPT_XML_USER_TEXT.to_string()),
"second request should not include the blocked XML-shaped queued prompt",
);
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("queued prompt hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
"initial prompt".to_string(),
"accepted queued prompt".to_string(),
HOOK_PROMPT_XML_USER_TEXT.to_string(),
],
);
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_for_model_continuation_after_mid_turn_compact() {
let first_chunks = vec![