Compare commits

...

9 Commits

Author SHA1 Message Date
Andrei Eternal
fea1b0a8de [hooks] use default runtime for queued hook test 2026-04-29 21:27:34 -07:00
Andrei Eternal
4ed64b48ad [hooks] simplify pending-input cleanup 2026-04-29 21:22:16 -07:00
Andrei Eternal
f7c60411db Preserve user prompt hook provenance 2026-04-28 19:39:01 -07:00
Andrei Eternal
f22e540c8b Remove hook ordering characterization test 2026-04-28 17:53:49 -07:00
Andrei Eternal
30873d8da5 Merge remote-tracking branch 'origin/main' into codex/remove-duplicate-session-start-hook-call
# Conflicts:
#	codex-rs/core/tests/suite/hooks.rs
2026-04-28 17:17:58 -07:00
Andrei Eternal
dc7fa04a76 Merge branch 'main' of https://github.com/openai/codex into codex/remove-duplicate-session-start-hook-call
# Conflicts:
#	codex-rs/core/src/tasks/mod.rs
2026-04-27 19:28:44 -07:00
Andrei Eternal
d7464d82d3 codex: fix CI failure on PR #18902 2026-04-21 20:43:24 -07:00
Andrei Eternal
8b2778a80b hooks: share input-item hook handling 2026-04-21 15:57:41 -07:00
Andrei Eternal
56ff187df5 core: remove duplicate session-start hook check 2026-04-21 15:38:51 -07:00
9 changed files with 401 additions and 37 deletions

View File

@@ -17,6 +17,7 @@ use codex_hooks::UserPromptSubmitRequest;
use codex_otel::HOOK_RUN_DURATION_METRIC;
use codex_otel::HOOK_RUN_METRIC;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
@@ -35,6 +36,7 @@ use crate::context::HookAdditionalContext;
use crate::event_mapping::parse_turn_item;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::PendingInputItem;
use crate::tools::hook_names::HookToolName;
use crate::tools::sandboxing::PermissionRequestPayload;
@@ -50,6 +52,8 @@ pub(crate) enum PendingInputHookDisposition {
pub(crate) enum PendingInputRecord {
UserMessage {
// Queued user input still carries UI-only text spans. Preserve that
// payload when the caller has it instead of rebuilding from model input.
content: Vec<UserInput>,
response_item: ResponseItem,
additional_contexts: Vec<String>,
@@ -277,8 +281,31 @@ pub(crate) async fn run_user_prompt_submit_hooks(
pub(crate) async fn inspect_pending_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pending_input_item: ResponseInputItem,
pending_input_item: PendingInputItem,
) -> PendingInputHookDisposition {
let pending_input_item = match pending_input_item {
PendingInputItem::UserInput(content) => {
let prompt = UserMessageItem::new(&content).message();
let response_item = ResponseItem::from(ResponseInputItem::from(content.clone()));
let user_prompt_submit_outcome =
run_user_prompt_submit_hooks(sess, turn_context, prompt).await;
if user_prompt_submit_outcome.should_stop {
return PendingInputHookDisposition::Blocked {
additional_contexts: user_prompt_submit_outcome.additional_contexts,
};
}
return PendingInputHookDisposition::Accepted(Box::new(
PendingInputRecord::UserMessage {
content,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
},
));
}
PendingInputItem::ResponseInput(input_item) => input_item,
};
let response_item = ResponseItem::from(pending_input_item);
if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) {
let user_prompt_submit_outcome =

View File

@@ -281,6 +281,7 @@ use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingInputItem;
use crate::state::PendingRequestPermissions;
use crate::state::SessionServices;
use crate::state::SessionState;
@@ -2997,7 +2998,7 @@ impl Session {
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.push_pending_input_item(PendingInputItem::UserInput(input));
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
}
@@ -3084,34 +3085,60 @@ impl Session {
self.mailbox_rx.lock().await.has_pending()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
#[cfg(test)]
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input(input);
Ok(())
}
None => Err(()),
}
self.prepend_pending_input_items(
input
.into_iter()
.map(PendingInputItem::ResponseInput)
.collect(),
)
.await
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn prepend_pending_input_items(
&self,
input: Vec<PendingInputItem>,
) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input_items(input);
Ok(())
}
None => Err(()),
}
}
#[cfg(test)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
self.take_pending_input_items()
.await
.into_iter()
.map(|item| match item {
PendingInputItem::UserInput(input) => input.into(),
PendingInputItem::ResponseInput(input) => input,
})
.collect()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn take_pending_input_items(&self) -> Vec<PendingInputItem> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
(
ts.take_pending_input(),
ts.take_pending_input_items(),
ts.accepts_mailbox_delivery_for_current_turn(),
)
}
@@ -3127,6 +3154,7 @@ impl Session {
.drain()
.into_iter()
.map(|mail| mail.to_response_input_item())
.map(PendingInputItem::ResponseInput)
.collect::<Vec<_>>()
};
if pending_input.is_empty() {
@@ -3147,10 +3175,10 @@ impl Session {
}
let mut idle_pending_input = self.idle_pending_input.lock().await;
idle_pending_input.extend(items);
idle_pending_input.extend(items.into_iter().map(PendingInputItem::ResponseInput));
}
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
pub(crate) async fn take_queued_input_items_for_next_turn(&self) -> Vec<PendingInputItem> {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}

View File

@@ -24,7 +24,7 @@ pub(crate) struct Session {
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(super) mailbox: Mailbox,
pub(super) mailbox_rx: Mutex<MailboxReceiver>,
pub(super) idle_pending_input: Mutex<Vec<ResponseInputItem>>, // TODO (jif) merge with mailbox!
pub(super) idle_pending_input: Mutex<Vec<PendingInputItem>>, // TODO (jif) merge with mailbox!
pub(crate) goal_runtime: GoalRuntimeState,
pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices,

View File

@@ -296,6 +296,8 @@ pub(crate) async fn run_turn(
})
.collect::<Vec<_>>();
// Run SessionStart before UserPromptSubmit so startup hooks can shape the
// turn seen by prompt-submit hooks.
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
@@ -303,7 +305,7 @@ pub(crate) async fn run_turn(
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let response_item: ResponseItem = initial_input_for_turn.into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
@@ -373,15 +375,11 @@ pub(crate) async fn run_turn(
let mut can_drain_pending_input = input.is_empty();
loop {
if run_pending_session_start_hooks(&sess, &turn_context).await {
break;
}
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = if can_drain_pending_input {
sess.get_pending_input().await
sess.take_pending_input_items().await
} else {
Vec::new()
};
@@ -402,7 +400,9 @@ pub(crate) async fn run_turn(
} => {
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
if !remaining_pending_input.is_empty() {
let _ = sess.prepend_pending_input(remaining_pending_input).await;
let _ = sess
.prepend_pending_input_items(remaining_pending_input)
.await;
requeued_pending_input = true;
}
blocked_pending_input_contexts = additional_contexts;

View File

@@ -6,6 +6,7 @@ pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::MailboxDeliveryPhase;
pub(crate) use turn::PendingInputItem;
pub(crate) use turn::PendingRequestPermissions;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;

View File

@@ -24,6 +24,7 @@ use crate::tasks::AnySessionTask;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
/// Metadata about the currently running turn.
pub(crate) struct ActiveTurn {
@@ -84,6 +85,12 @@ pub(crate) struct RemovedTask {
pub(crate) active_turn_is_empty: bool,
}
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum PendingInputItem {
UserInput(Vec<UserInput>),
ResponseInput(ResponseInputItem),
}
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
@@ -113,7 +120,7 @@ pub(crate) struct TurnState {
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<ResponseInputItem>,
pending_input: Vec<PendingInputItem>,
mailbox_delivery_phase: MailboxDeliveryPhase,
granted_permissions: Option<AdditionalPermissionProfile>,
strict_auto_review_enabled: bool,
@@ -219,10 +226,15 @@ impl TurnState {
}
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
self.pending_input
.push(PendingInputItem::ResponseInput(input));
}
pub(crate) fn push_pending_input_item(&mut self, input: PendingInputItem) {
self.pending_input.push(input);
}
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
pub(crate) fn prepend_pending_input_items(&mut self, mut input: Vec<PendingInputItem>) {
if input.is_empty() {
return;
}
@@ -231,7 +243,7 @@ impl TurnState {
self.pending_input = input;
}
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
pub(crate) fn take_pending_input_items(&mut self) -> Vec<PendingInputItem> {
if self.pending_input.is_empty() {
Vec::with_capacity(0)
} else {
@@ -241,6 +253,17 @@ impl TurnState {
}
}
#[cfg(test)]
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
self.take_pending_input_items()
.into_iter()
.map(|item| match item {
PendingInputItem::UserInput(input) => input.into(),
PendingInputItem::ResponseInput(input) => input,
})
.collect()
}
pub(crate) fn has_pending_input(&self) -> bool {
!self.pending_input.is_empty()
}

View File

@@ -30,6 +30,7 @@ use crate::hook_runtime::record_pending_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::PendingInputItem;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
@@ -41,7 +42,6 @@ use codex_otel::TURN_MEMORY_METRIC;
use codex_otel::TURN_NETWORK_PROXY_METRIC;
use codex_otel::TURN_TOKEN_USAGE_METRIC;
use codex_otel::TURN_TOOL_CALL_METRIC;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -339,8 +339,8 @@ impl Session {
{
warn!("failed to apply goal runtime turn-start event: {err}");
}
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
let mailbox_items = self.get_pending_input().await;
let queued_input_items = self.take_queued_input_items_for_next_turn().await;
let mailbox_items = self.take_pending_input_items().await;
let turn_state = {
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
@@ -350,11 +350,11 @@ impl Session {
{
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
turn_state.push_pending_input(item);
for item in queued_input_items {
turn_state.push_pending_input_item(item);
}
for item in mailbox_items {
turn_state.push_pending_input(item);
turn_state.push_pending_input_item(item);
}
}
@@ -562,7 +562,7 @@ impl Session {
.turn_metadata_state
.cancel_git_enrichment_task();
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut pending_input = Vec::<PendingInputItem>::new();
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_had_memory_citation = false;
@@ -587,7 +587,7 @@ impl Session {
};
if let Some(turn_state) = turn_state.as_ref() {
let mut ts = turn_state.lock().await;
pending_input = ts.take_pending_input();
pending_input = ts.take_pending_input_items();
turn_had_memory_citation = ts.has_memory_citation;
turn_tool_calls = ts.tool_calls;
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());

View File

@@ -45,6 +45,8 @@ use tokio::time::timeout;
const FIRST_CONTINUATION_PROMPT: &str = "Retry with exactly the phrase meow meow meow.";
const SECOND_CONTINUATION_PROMPT: &str = "Now tighten it to just: meow.";
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
const HOOK_PROMPT_XML_USER_TEXT: &str =
r#"<hook_prompt hook_run_id="hook-run-1">blocked xml prompt</hook_prompt>"#;
const PERMISSION_REQUEST_HOOK_MATCHER: &str = "^Bash$";
const PERMISSION_REQUEST_ALLOW_REASON: &str = "should not be used for allow";
@@ -1111,6 +1113,77 @@ async fn blocked_user_prompt_submit_persists_additional_context_for_next_turn()
Ok(())
}
#[tokio::test]
async fn fresh_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let response = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "second prompt handled"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex()
.with_pre_build_hook(|home| {
if let Err(error) = write_user_prompt_submit_hook(
home,
HOOK_PROMPT_XML_USER_TEXT,
BLOCKED_PROMPT_CONTEXT,
) {
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
test.submit_turn(HOOK_PROMPT_XML_USER_TEXT).await?;
test.submit_turn("second prompt").await?;
let request = response.single_request();
assert!(
request
.message_input_texts("developer")
.contains(&BLOCKED_PROMPT_CONTEXT.to_string()),
"second request should include context persisted from the blocked XML-shaped prompt",
);
assert!(
request
.message_input_texts("user")
.iter()
.all(|text| !text.contains(HOOK_PROMPT_XML_USER_TEXT)),
"blocked XML-shaped prompt should not be sent to the model",
);
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("user prompt submit hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
HOOK_PROMPT_XML_USER_TEXT.to_string(),
"second prompt".to_string(),
],
);
Ok(())
}
#[tokio::test]
async fn blocked_queued_prompt_does_not_strand_earlier_accepted_prompt() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -1,6 +1,10 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use codex_core::CodexThread;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::items::TurnItem;
use codex_protocol::models::PermissionProfile;
@@ -20,6 +24,8 @@ use core_test_support::responses::ev_output_text_delta;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::ev_reasoning_item_added;
use core_test_support::responses::ev_response_created;
use core_test_support::skip_if_no_network;
use core_test_support::skip_if_windows;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::StreamingSseServer;
use core_test_support::streaming_sse::start_streaming_sse_server;
@@ -32,6 +38,11 @@ use serde_json::Value;
use serde_json::from_slice;
use serde_json::json;
use tokio::sync::oneshot;
use tokio::time::sleep;
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
const HOOK_PROMPT_XML_USER_TEXT: &str =
r#"<hook_prompt hook_run_id="hook-run-1">blocked xml prompt</hook_prompt>"#;
fn ev_message_item_done(id: &str, text: &str) -> Value {
serde_json::json!({
@@ -84,6 +95,64 @@ fn response_completed_chunks(response_id: &str) -> Vec<StreamingSseChunk> {
]
}
fn write_user_prompt_submit_hook(
home: &std::path::Path,
blocked_prompt: &str,
additional_context: &str,
) -> Result<()> {
let script_path = home.join("user_prompt_submit_hook.py");
let log_path = home.join("user_prompt_submit_hook_log.jsonl");
let log_path = log_path.display();
let blocked_prompt_json =
serde_json::to_string(blocked_prompt).context("serialize blocked prompt for test")?;
let additional_context_json = serde_json::to_string(additional_context)
.context("serialize user prompt submit additional context for test")?;
let script = format!(
r#"import json
from pathlib import Path
import sys
payload = json.load(sys.stdin)
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload) + "\n")
if payload.get("prompt") == {blocked_prompt_json}:
print(json.dumps({{
"decision": "block",
"reason": "blocked by hook",
"hookSpecificOutput": {{
"hookEventName": "UserPromptSubmit",
"additionalContext": {additional_context_json}
}}
}}))
"#,
);
let hooks = serde_json::json!({
"hooks": {
"UserPromptSubmit": [{
"hooks": [{
"type": "command",
"command": format!("python3 {}", script_path.display()),
"statusMessage": "running user prompt submit hook",
}]
}]
}
});
std::fs::write(&script_path, script).context("write user prompt submit hook script")?;
std::fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
Ok(())
}
fn read_user_prompt_submit_hook_inputs(home: &std::path::Path) -> Result<Vec<Value>> {
std::fs::read_to_string(home.join("user_prompt_submit_hook_log.jsonl"))
.context("read user prompt submit hook log")?
.lines()
.filter(|line| !line.trim().is_empty())
.map(|line| serde_json::from_str(line).context("parse user prompt submit hook log line"))
.collect()
}
async fn build_codex(server: &StreamingSseServer) -> Arc<CodexThread> {
test_codex()
.with_model("gpt-5.4")
@@ -487,6 +556,149 @@ async fn user_input_does_not_preempt_after_reasoning_item() {
server.shutdown().await;
}
#[tokio::test]
async fn queued_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
skip_if_windows!(Ok(()));
skip_if_no_network!(Ok(()));
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
let first_chunks = vec![
StreamingSseChunk {
gate: None,
body: sse_event(ev_response_created("resp-1")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_added("msg-1", "")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_output_text_delta("first ")),
},
StreamingSseChunk {
gate: None,
body: sse_event(ev_message_item_done("msg-1", "first response")),
},
StreamingSseChunk {
gate: Some(gate_completed_rx),
body: sse_event(ev_completed("resp-1")),
},
];
let second_chunks = vec![StreamingSseChunk {
gate: None,
body: responses::sse(vec![
ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "accepted queued prompt handled"),
ev_completed("resp-2"),
]),
}];
let (server, _completions) =
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
let mut builder = test_codex()
.with_model("gpt-5.4")
.with_pre_build_hook(|home| {
if let Err(error) = write_user_prompt_submit_hook(
home,
HOOK_PROMPT_XML_USER_TEXT,
BLOCKED_PROMPT_CONTEXT,
) {
panic!("failed to write user prompt submit hook test fixture: {error}");
}
})
.with_config(|config| {
config
.features
.enable(Feature::CodexHooks)
.expect("test config should allow feature update");
});
let test = builder.build_with_streaming_server(&server).await?;
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "initial prompt".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::AgentMessageContentDelta(_))
})
.await;
for text in ["accepted queued prompt", HOOK_PROMPT_XML_USER_TEXT] {
test.codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await?;
}
sleep(Duration::from_millis(100)).await;
let _ = gate_completed_tx.send(());
let requests = tokio::time::timeout(Duration::from_secs(30), async {
loop {
let requests = server.requests().await;
if requests.len() >= 2 {
break requests;
}
sleep(Duration::from_millis(50)).await;
}
})
.await
.expect("second request should arrive")
.into_iter()
.collect::<Vec<_>>();
sleep(Duration::from_millis(100)).await;
assert_eq!(requests.len(), 2);
let second_body: Value =
from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse second request: {err}"));
let second_user_texts = message_input_texts(&second_body, "user");
assert!(
second_user_texts.contains(&"accepted queued prompt".to_string()),
"second request should include the accepted queued prompt",
);
assert!(
!second_user_texts.contains(&HOOK_PROMPT_XML_USER_TEXT.to_string()),
"second request should not include the blocked XML-shaped queued prompt",
);
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
assert_eq!(
hook_inputs
.iter()
.map(|input| {
input["prompt"]
.as_str()
.expect("queued prompt hook prompt")
.to_string()
})
.collect::<Vec<_>>(),
vec![
"initial prompt".to_string(),
"accepted queued prompt".to_string(),
HOOK_PROMPT_XML_USER_TEXT.to_string(),
],
);
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_for_model_continuation_after_mid_turn_compact() {
let first_chunks = vec![