mirror of
https://github.com/openai/codex.git
synced 2026-05-07 04:47:13 +00:00
Compare commits
9 Commits
rust-v0.12
...
codex/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fea1b0a8de | ||
|
|
4ed64b48ad | ||
|
|
f7c60411db | ||
|
|
f22e540c8b | ||
|
|
30873d8da5 | ||
|
|
dc7fa04a76 | ||
|
|
d7464d82d3 | ||
|
|
8b2778a80b | ||
|
|
56ff187df5 |
@@ -17,6 +17,7 @@ use codex_hooks::UserPromptSubmitRequest;
|
||||
use codex_otel::HOOK_RUN_DURATION_METRIC;
|
||||
use codex_otel::HOOK_RUN_METRIC;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::items::UserMessageItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
@@ -35,6 +36,7 @@ use crate::context::HookAdditionalContext;
|
||||
use crate::event_mapping::parse_turn_item;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::PendingInputItem;
|
||||
use crate::tools::hook_names::HookToolName;
|
||||
use crate::tools::sandboxing::PermissionRequestPayload;
|
||||
|
||||
@@ -50,6 +52,8 @@ pub(crate) enum PendingInputHookDisposition {
|
||||
|
||||
pub(crate) enum PendingInputRecord {
|
||||
UserMessage {
|
||||
// Queued user input still carries UI-only text spans. Preserve that
|
||||
// payload when the caller has it instead of rebuilding from model input.
|
||||
content: Vec<UserInput>,
|
||||
response_item: ResponseItem,
|
||||
additional_contexts: Vec<String>,
|
||||
@@ -277,8 +281,31 @@ pub(crate) async fn run_user_prompt_submit_hooks(
|
||||
pub(crate) async fn inspect_pending_input(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
pending_input_item: ResponseInputItem,
|
||||
pending_input_item: PendingInputItem,
|
||||
) -> PendingInputHookDisposition {
|
||||
let pending_input_item = match pending_input_item {
|
||||
PendingInputItem::UserInput(content) => {
|
||||
let prompt = UserMessageItem::new(&content).message();
|
||||
let response_item = ResponseItem::from(ResponseInputItem::from(content.clone()));
|
||||
let user_prompt_submit_outcome =
|
||||
run_user_prompt_submit_hooks(sess, turn_context, prompt).await;
|
||||
if user_prompt_submit_outcome.should_stop {
|
||||
return PendingInputHookDisposition::Blocked {
|
||||
additional_contexts: user_prompt_submit_outcome.additional_contexts,
|
||||
};
|
||||
}
|
||||
|
||||
return PendingInputHookDisposition::Accepted(Box::new(
|
||||
PendingInputRecord::UserMessage {
|
||||
content,
|
||||
response_item,
|
||||
additional_contexts: user_prompt_submit_outcome.additional_contexts,
|
||||
},
|
||||
));
|
||||
}
|
||||
PendingInputItem::ResponseInput(input_item) => input_item,
|
||||
};
|
||||
|
||||
let response_item = ResponseItem::from(pending_input_item);
|
||||
if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) {
|
||||
let user_prompt_submit_outcome =
|
||||
|
||||
@@ -281,6 +281,7 @@ use crate::skills_watcher::SkillsWatcher;
|
||||
use crate::skills_watcher::SkillsWatcherEvent;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::MailboxDeliveryPhase;
|
||||
use crate::state::PendingInputItem;
|
||||
use crate::state::PendingRequestPermissions;
|
||||
use crate::state::SessionServices;
|
||||
use crate::state::SessionState;
|
||||
@@ -2997,7 +2998,7 @@ impl Session {
|
||||
}
|
||||
|
||||
let mut turn_state = active_turn.turn_state.lock().await;
|
||||
turn_state.push_pending_input(input.into());
|
||||
turn_state.push_pending_input_item(PendingInputItem::UserInput(input));
|
||||
turn_state.accept_mailbox_delivery_for_current_turn();
|
||||
Ok(active_turn_id.clone())
|
||||
}
|
||||
@@ -3084,34 +3085,60 @@ impl Session {
|
||||
self.mailbox_rx.lock().await.has_pending()
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
)]
|
||||
#[cfg(test)]
|
||||
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.prepend_pending_input(input);
|
||||
Ok(())
|
||||
}
|
||||
None => Err(()),
|
||||
}
|
||||
self.prepend_pending_input_items(
|
||||
input
|
||||
.into_iter()
|
||||
.map(PendingInputItem::ResponseInput)
|
||||
.collect(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
)]
|
||||
pub(crate) async fn prepend_pending_input_items(
|
||||
&self,
|
||||
input: Vec<PendingInputItem>,
|
||||
) -> Result<(), ()> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.prepend_pending_input_items(input);
|
||||
Ok(())
|
||||
}
|
||||
None => Err(()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||
self.take_pending_input_items()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|item| match item {
|
||||
PendingInputItem::UserInput(input) => input.into(),
|
||||
PendingInputItem::ResponseInput(input) => input,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
)]
|
||||
pub(crate) async fn take_pending_input_items(&self) -> Vec<PendingInputItem> {
|
||||
let (pending_input, accepts_mailbox_delivery) = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
(
|
||||
ts.take_pending_input(),
|
||||
ts.take_pending_input_items(),
|
||||
ts.accepts_mailbox_delivery_for_current_turn(),
|
||||
)
|
||||
}
|
||||
@@ -3127,6 +3154,7 @@ impl Session {
|
||||
.drain()
|
||||
.into_iter()
|
||||
.map(|mail| mail.to_response_input_item())
|
||||
.map(PendingInputItem::ResponseInput)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
if pending_input.is_empty() {
|
||||
@@ -3147,10 +3175,10 @@ impl Session {
|
||||
}
|
||||
|
||||
let mut idle_pending_input = self.idle_pending_input.lock().await;
|
||||
idle_pending_input.extend(items);
|
||||
idle_pending_input.extend(items.into_iter().map(PendingInputItem::ResponseInput));
|
||||
}
|
||||
|
||||
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
|
||||
pub(crate) async fn take_queued_input_items_for_next_turn(&self) -> Vec<PendingInputItem> {
|
||||
std::mem::take(&mut *self.idle_pending_input.lock().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ pub(crate) struct Session {
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(super) mailbox: Mailbox,
|
||||
pub(super) mailbox_rx: Mutex<MailboxReceiver>,
|
||||
pub(super) idle_pending_input: Mutex<Vec<ResponseInputItem>>, // TODO (jif) merge with mailbox!
|
||||
pub(super) idle_pending_input: Mutex<Vec<PendingInputItem>>, // TODO (jif) merge with mailbox!
|
||||
pub(crate) goal_runtime: GoalRuntimeState,
|
||||
pub(crate) guardian_review_session: GuardianReviewSessionManager,
|
||||
pub(crate) services: SessionServices,
|
||||
|
||||
@@ -296,6 +296,8 @@ pub(crate) async fn run_turn(
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Run SessionStart before UserPromptSubmit so startup hooks can shape the
|
||||
// turn seen by prompt-submit hooks.
|
||||
if run_pending_session_start_hooks(&sess, &turn_context).await {
|
||||
return None;
|
||||
}
|
||||
@@ -303,7 +305,7 @@ pub(crate) async fn run_turn(
|
||||
Vec::new()
|
||||
} else {
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.clone().into();
|
||||
let response_item: ResponseItem = initial_input_for_turn.into();
|
||||
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
|
||||
&sess,
|
||||
&turn_context,
|
||||
@@ -373,15 +375,11 @@ pub(crate) async fn run_turn(
|
||||
let mut can_drain_pending_input = input.is_empty();
|
||||
|
||||
loop {
|
||||
if run_pending_session_start_hooks(&sess, &turn_context).await {
|
||||
break;
|
||||
}
|
||||
|
||||
// Note that pending_input would be something like a message the user
|
||||
// submitted through the UI while the model was running. Though the UI
|
||||
// may support this, the model might not.
|
||||
let pending_input = if can_drain_pending_input {
|
||||
sess.get_pending_input().await
|
||||
sess.take_pending_input_items().await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
@@ -402,7 +400,9 @@ pub(crate) async fn run_turn(
|
||||
} => {
|
||||
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
|
||||
if !remaining_pending_input.is_empty() {
|
||||
let _ = sess.prepend_pending_input(remaining_pending_input).await;
|
||||
let _ = sess
|
||||
.prepend_pending_input_items(remaining_pending_input)
|
||||
.await;
|
||||
requeued_pending_input = true;
|
||||
}
|
||||
blocked_pending_input_contexts = additional_contexts;
|
||||
|
||||
@@ -6,6 +6,7 @@ pub(crate) use service::SessionServices;
|
||||
pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::ActiveTurn;
|
||||
pub(crate) use turn::MailboxDeliveryPhase;
|
||||
pub(crate) use turn::PendingInputItem;
|
||||
pub(crate) use turn::PendingRequestPermissions;
|
||||
pub(crate) use turn::RunningTask;
|
||||
pub(crate) use turn::TaskKind;
|
||||
|
||||
@@ -24,6 +24,7 @@ use crate::tasks::AnySessionTask;
|
||||
use codex_protocol::models::AdditionalPermissionProfile;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
|
||||
/// Metadata about the currently running turn.
|
||||
pub(crate) struct ActiveTurn {
|
||||
@@ -84,6 +85,12 @@ pub(crate) struct RemovedTask {
|
||||
pub(crate) active_turn_is_empty: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) enum PendingInputItem {
|
||||
UserInput(Vec<UserInput>),
|
||||
ResponseInput(ResponseInputItem),
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
pub(crate) fn add_task(&mut self, task: RunningTask) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
@@ -113,7 +120,7 @@ pub(crate) struct TurnState {
|
||||
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
|
||||
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
|
||||
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
|
||||
pending_input: Vec<ResponseInputItem>,
|
||||
pending_input: Vec<PendingInputItem>,
|
||||
mailbox_delivery_phase: MailboxDeliveryPhase,
|
||||
granted_permissions: Option<AdditionalPermissionProfile>,
|
||||
strict_auto_review_enabled: bool,
|
||||
@@ -219,10 +226,15 @@ impl TurnState {
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||
self.pending_input
|
||||
.push(PendingInputItem::ResponseInput(input));
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input_item(&mut self, input: PendingInputItem) {
|
||||
self.pending_input.push(input);
|
||||
}
|
||||
|
||||
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
|
||||
pub(crate) fn prepend_pending_input_items(&mut self, mut input: Vec<PendingInputItem>) {
|
||||
if input.is_empty() {
|
||||
return;
|
||||
}
|
||||
@@ -231,7 +243,7 @@ impl TurnState {
|
||||
self.pending_input = input;
|
||||
}
|
||||
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||
pub(crate) fn take_pending_input_items(&mut self) -> Vec<PendingInputItem> {
|
||||
if self.pending_input.is_empty() {
|
||||
Vec::with_capacity(0)
|
||||
} else {
|
||||
@@ -241,6 +253,17 @@ impl TurnState {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||
self.take_pending_input_items()
|
||||
.into_iter()
|
||||
.map(|item| match item {
|
||||
PendingInputItem::UserInput(input) => input.into(),
|
||||
PendingInputItem::ResponseInput(input) => input,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn has_pending_input(&self) -> bool {
|
||||
!self.pending_input.is_empty()
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::hook_runtime::record_pending_input;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::PendingInputItem;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use codex_analytics::TurnTokenUsageFact;
|
||||
@@ -41,7 +42,6 @@ use codex_otel::TURN_MEMORY_METRIC;
|
||||
use codex_otel::TURN_NETWORK_PROXY_METRIC;
|
||||
use codex_otel::TURN_TOKEN_USAGE_METRIC;
|
||||
use codex_otel::TURN_TOOL_CALL_METRIC;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -339,8 +339,8 @@ impl Session {
|
||||
{
|
||||
warn!("failed to apply goal runtime turn-start event: {err}");
|
||||
}
|
||||
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
|
||||
let mailbox_items = self.get_pending_input().await;
|
||||
let queued_input_items = self.take_queued_input_items_for_next_turn().await;
|
||||
let mailbox_items = self.take_pending_input_items().await;
|
||||
let turn_state = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
let turn = active.get_or_insert_with(ActiveTurn::default);
|
||||
@@ -350,11 +350,11 @@ impl Session {
|
||||
{
|
||||
let mut turn_state = turn_state.lock().await;
|
||||
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
|
||||
for item in queued_response_items {
|
||||
turn_state.push_pending_input(item);
|
||||
for item in queued_input_items {
|
||||
turn_state.push_pending_input_item(item);
|
||||
}
|
||||
for item in mailbox_items {
|
||||
turn_state.push_pending_input(item);
|
||||
turn_state.push_pending_input_item(item);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,7 +562,7 @@ impl Session {
|
||||
.turn_metadata_state
|
||||
.cancel_git_enrichment_task();
|
||||
|
||||
let mut pending_input = Vec::<ResponseInputItem>::new();
|
||||
let mut pending_input = Vec::<PendingInputItem>::new();
|
||||
let mut should_clear_active_turn = false;
|
||||
let mut token_usage_at_turn_start = None;
|
||||
let mut turn_had_memory_citation = false;
|
||||
@@ -587,7 +587,7 @@ impl Session {
|
||||
};
|
||||
if let Some(turn_state) = turn_state.as_ref() {
|
||||
let mut ts = turn_state.lock().await;
|
||||
pending_input = ts.take_pending_input();
|
||||
pending_input = ts.take_pending_input_items();
|
||||
turn_had_memory_citation = ts.has_memory_citation;
|
||||
turn_tool_calls = ts.tool_calls;
|
||||
token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone());
|
||||
|
||||
@@ -45,6 +45,8 @@ use tokio::time::timeout;
|
||||
const FIRST_CONTINUATION_PROMPT: &str = "Retry with exactly the phrase meow meow meow.";
|
||||
const SECOND_CONTINUATION_PROMPT: &str = "Now tighten it to just: meow.";
|
||||
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
|
||||
const HOOK_PROMPT_XML_USER_TEXT: &str =
|
||||
r#"<hook_prompt hook_run_id="hook-run-1">blocked xml prompt</hook_prompt>"#;
|
||||
const PERMISSION_REQUEST_HOOK_MATCHER: &str = "^Bash$";
|
||||
const PERMISSION_REQUEST_ALLOW_REASON: &str = "should not be used for allow";
|
||||
|
||||
@@ -1111,6 +1113,77 @@ async fn blocked_user_prompt_submit_persists_additional_context_for_next_turn()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fresh_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "second prompt handled"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_pre_build_hook(|home| {
|
||||
if let Err(error) = write_user_prompt_submit_hook(
|
||||
home,
|
||||
HOOK_PROMPT_XML_USER_TEXT,
|
||||
BLOCKED_PROMPT_CONTEXT,
|
||||
) {
|
||||
panic!("failed to write user prompt submit hook test fixture: {error}");
|
||||
}
|
||||
})
|
||||
.with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::CodexHooks)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn(HOOK_PROMPT_XML_USER_TEXT).await?;
|
||||
test.submit_turn("second prompt").await?;
|
||||
|
||||
let request = response.single_request();
|
||||
assert!(
|
||||
request
|
||||
.message_input_texts("developer")
|
||||
.contains(&BLOCKED_PROMPT_CONTEXT.to_string()),
|
||||
"second request should include context persisted from the blocked XML-shaped prompt",
|
||||
);
|
||||
assert!(
|
||||
request
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.all(|text| !text.contains(HOOK_PROMPT_XML_USER_TEXT)),
|
||||
"blocked XML-shaped prompt should not be sent to the model",
|
||||
);
|
||||
|
||||
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
|
||||
assert_eq!(
|
||||
hook_inputs
|
||||
.iter()
|
||||
.map(|input| {
|
||||
input["prompt"]
|
||||
.as_str()
|
||||
.expect("user prompt submit hook prompt")
|
||||
.to_string()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
HOOK_PROMPT_XML_USER_TEXT.to_string(),
|
||||
"second prompt".to_string(),
|
||||
],
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn blocked_queued_prompt_does_not_strand_earlier_accepted_prompt() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_core::CodexThread;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
@@ -20,6 +24,8 @@ use core_test_support::responses::ev_output_text_delta;
|
||||
use core_test_support::responses::ev_reasoning_item;
|
||||
use core_test_support::responses::ev_reasoning_item_added;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::skip_if_windows;
|
||||
use core_test_support::streaming_sse::StreamingSseChunk;
|
||||
use core_test_support::streaming_sse::StreamingSseServer;
|
||||
use core_test_support::streaming_sse::start_streaming_sse_server;
|
||||
@@ -32,6 +38,11 @@ use serde_json::Value;
|
||||
use serde_json::from_slice;
|
||||
use serde_json::json;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::sleep;
|
||||
|
||||
const BLOCKED_PROMPT_CONTEXT: &str = "Remember the blocked lighthouse note.";
|
||||
const HOOK_PROMPT_XML_USER_TEXT: &str =
|
||||
r#"<hook_prompt hook_run_id="hook-run-1">blocked xml prompt</hook_prompt>"#;
|
||||
|
||||
fn ev_message_item_done(id: &str, text: &str) -> Value {
|
||||
serde_json::json!({
|
||||
@@ -84,6 +95,64 @@ fn response_completed_chunks(response_id: &str) -> Vec<StreamingSseChunk> {
|
||||
]
|
||||
}
|
||||
|
||||
fn write_user_prompt_submit_hook(
|
||||
home: &std::path::Path,
|
||||
blocked_prompt: &str,
|
||||
additional_context: &str,
|
||||
) -> Result<()> {
|
||||
let script_path = home.join("user_prompt_submit_hook.py");
|
||||
let log_path = home.join("user_prompt_submit_hook_log.jsonl");
|
||||
let log_path = log_path.display();
|
||||
let blocked_prompt_json =
|
||||
serde_json::to_string(blocked_prompt).context("serialize blocked prompt for test")?;
|
||||
let additional_context_json = serde_json::to_string(additional_context)
|
||||
.context("serialize user prompt submit additional context for test")?;
|
||||
let script = format!(
|
||||
r#"import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
payload = json.load(sys.stdin)
|
||||
with Path(r"{log_path}").open("a", encoding="utf-8") as handle:
|
||||
handle.write(json.dumps(payload) + "\n")
|
||||
|
||||
if payload.get("prompt") == {blocked_prompt_json}:
|
||||
print(json.dumps({{
|
||||
"decision": "block",
|
||||
"reason": "blocked by hook",
|
||||
"hookSpecificOutput": {{
|
||||
"hookEventName": "UserPromptSubmit",
|
||||
"additionalContext": {additional_context_json}
|
||||
}}
|
||||
}}))
|
||||
"#,
|
||||
);
|
||||
let hooks = serde_json::json!({
|
||||
"hooks": {
|
||||
"UserPromptSubmit": [{
|
||||
"hooks": [{
|
||||
"type": "command",
|
||||
"command": format!("python3 {}", script_path.display()),
|
||||
"statusMessage": "running user prompt submit hook",
|
||||
}]
|
||||
}]
|
||||
}
|
||||
});
|
||||
|
||||
std::fs::write(&script_path, script).context("write user prompt submit hook script")?;
|
||||
std::fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_user_prompt_submit_hook_inputs(home: &std::path::Path) -> Result<Vec<Value>> {
|
||||
std::fs::read_to_string(home.join("user_prompt_submit_hook_log.jsonl"))
|
||||
.context("read user prompt submit hook log")?
|
||||
.lines()
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.map(|line| serde_json::from_str(line).context("parse user prompt submit hook log line"))
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn build_codex(server: &StreamingSseServer) -> Arc<CodexThread> {
|
||||
test_codex()
|
||||
.with_model("gpt-5.4")
|
||||
@@ -487,6 +556,149 @@ async fn user_input_does_not_preempt_after_reasoning_item() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queued_hook_prompt_xml_text_still_runs_user_prompt_submit() -> Result<()> {
|
||||
skip_if_windows!(Ok(()));
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(ev_response_created("resp-1")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(ev_message_item_added("msg-1", "")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(ev_output_text_delta("first ")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(ev_message_item_done("msg-1", "first response")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: Some(gate_completed_rx),
|
||||
body: sse_event(ev_completed("resp-1")),
|
||||
},
|
||||
];
|
||||
let second_chunks = vec![StreamingSseChunk {
|
||||
gate: None,
|
||||
body: responses::sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "accepted queued prompt handled"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
}];
|
||||
let (server, _completions) =
|
||||
start_streaming_sse_server(vec![first_chunks, second_chunks]).await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_model("gpt-5.4")
|
||||
.with_pre_build_hook(|home| {
|
||||
if let Err(error) = write_user_prompt_submit_hook(
|
||||
home,
|
||||
HOOK_PROMPT_XML_USER_TEXT,
|
||||
BLOCKED_PROMPT_CONTEXT,
|
||||
) {
|
||||
panic!("failed to write user prompt submit hook test fixture: {error}");
|
||||
}
|
||||
})
|
||||
.with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::CodexHooks)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build_with_streaming_server(&server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: "initial prompt".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::AgentMessageContentDelta(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
for text in ["accepted queued prompt", HOOK_PROMPT_XML_USER_TEXT] {
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
environments: None,
|
||||
items: vec![UserInput::Text {
|
||||
text: text.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
let _ = gate_completed_tx.send(());
|
||||
|
||||
let requests = tokio::time::timeout(Duration::from_secs(30), async {
|
||||
loop {
|
||||
let requests = server.requests().await;
|
||||
if requests.len() >= 2 {
|
||||
break requests;
|
||||
}
|
||||
sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("second request should arrive")
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
|
||||
assert_eq!(requests.len(), 2);
|
||||
let second_body: Value =
|
||||
from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse second request: {err}"));
|
||||
let second_user_texts = message_input_texts(&second_body, "user");
|
||||
assert!(
|
||||
second_user_texts.contains(&"accepted queued prompt".to_string()),
|
||||
"second request should include the accepted queued prompt",
|
||||
);
|
||||
assert!(
|
||||
!second_user_texts.contains(&HOOK_PROMPT_XML_USER_TEXT.to_string()),
|
||||
"second request should not include the blocked XML-shaped queued prompt",
|
||||
);
|
||||
|
||||
let hook_inputs = read_user_prompt_submit_hook_inputs(test.codex_home_path())?;
|
||||
assert_eq!(
|
||||
hook_inputs
|
||||
.iter()
|
||||
.map(|input| {
|
||||
input["prompt"]
|
||||
.as_str()
|
||||
.expect("queued prompt hook prompt")
|
||||
.to_string()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
"initial prompt".to_string(),
|
||||
"accepted queued prompt".to_string(),
|
||||
HOOK_PROMPT_XML_USER_TEXT.to_string(),
|
||||
],
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn steered_user_input_waits_for_model_continuation_after_mid_turn_compact() {
|
||||
let first_chunks = vec![
|
||||
|
||||
Reference in New Issue
Block a user