diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 64fb0d0ad4..0b133cd1a6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -83,7 +83,9 @@ use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::items::PlanItem; use codex_protocol::items::TurnItem; +use codex_protocol::items::TurnItemMetadata; use codex_protocol::items::UserMessageItem; +use codex_protocol::items::UserMessageType; use codex_protocol::mcp::CallToolResult; use codex_protocol::models::BaseInstructions; use codex_protocol::models::PermissionProfile; @@ -3655,13 +3657,17 @@ impl Session { turn_context: &TurnContext, input: &[UserInput], response_item: ResponseItem, + user_message_type: Option, ) { // Persist the user message to history, but emit the turn item from `UserInput` so // UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry // those spans, and `record_response_item_and_emit_turn_item` would drop them. self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) .await; - let turn_item = TurnItem::UserMessage(UserMessageItem::new(input)); + let metadata = user_message_type.map(|kind| TurnItemMetadata { + user_message_type: Some(kind), + }); + let turn_item = TurnItem::UserMessage(UserMessageItem::new_with_metadata(input, metadata)); self.emit_turn_item_started(turn_context, &turn_item).await; self.emit_turn_item_completed(turn_context, turn_item).await; self.ensure_rollout_materialized().await; @@ -3755,7 +3761,7 @@ impl Session { } let mut turn_state = active_turn.turn_state.lock().await; - turn_state.push_pending_input(input.into()); + turn_state.push_pending_input(input.into(), Some(UserMessageType::PromptSteering)); Ok(active_turn_id.clone()) } @@ -3769,7 +3775,11 @@ impl Session { Some(at) => { let mut ts = at.turn_state.lock().await; for item in input { - ts.push_pending_input(item); + let user_message_type = match &item { + ResponseInputItem::Message { .. } => Some(UserMessageType::PromptQueued), + _ => None, + }; + ts.push_pending_input(item, user_message_type); } Ok(()) } @@ -3777,12 +3787,17 @@ impl Session { } } - pub async fn get_pending_input(&self) -> Vec { + pub async fn get_pending_input_with_metadata( + &self, + ) -> Vec<(ResponseInputItem, Option)> { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { let mut ts = at.turn_state.lock().await; - ts.take_pending_input() + ts.take_pending_input_with_metadata() + .into_iter() + .map(|item| (item.input, item.user_message_type)) + .collect() } None => Vec::with_capacity(0), } @@ -5504,8 +5519,13 @@ pub(crate) async fn run_turn( let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone()); let response_item: ResponseItem = initial_input_for_turn.clone().into(); - sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) - .await; + sess.record_user_prompt_and_emit_turn_item( + turn_context.as_ref(), + &input, + response_item, + Some(UserMessageType::Prompt), + ) + .await; // Track the previous-turn baseline from the regular user-turn path only so // standalone tasks (compact/shell/review/undo) cannot suppress future // model/realtime injections. @@ -5592,21 +5612,18 @@ pub(crate) async fn run_turn( // Note that pending_input would be something like a message the user // submitted through the UI while the model was running. Though the UI // may support this, the model might not. - let pending_response_items = sess - .get_pending_input() - .await - .into_iter() - .map(ResponseItem::from) - .collect::>(); + let pending_response_items = sess.get_pending_input_with_metadata().await; if !pending_response_items.is_empty() { - for response_item in pending_response_items { + for (pending_input, user_message_type) in pending_response_items { + let response_item = ResponseItem::from(pending_input); if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) { // todo(aibrahim): move pending input to be UserInput only to keep TextElements. context: https://github.com/openai/codex/pull/10656#discussion_r2765522480 sess.record_user_prompt_and_emit_turn_item( turn_context.as_ref(), &user_message.content, response_item, + user_message_type, ) .await; } else { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 69ce86b61b..9d3919be4b 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -3737,12 +3737,19 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() assert!(matches!( second.msg, EventMsg::ItemStarted(ItemStartedEvent { - item: TurnItem::UserMessage(UserMessageItem { content, .. }), + item: TurnItem::UserMessage(UserMessageItem { + content, + metadata, + .. + }), .. }) if content == vec![UserInput::Text { text: "late pending input".to_string(), text_elements: Vec::new(), - }] + }] && metadata + .as_ref() + .and_then(|item_metadata| item_metadata.user_message_type.as_ref()) + == Some(&codex_protocol::items::UserMessageType::PromptQueued) )); let third = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) @@ -3752,12 +3759,19 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input() assert!(matches!( third.msg, EventMsg::ItemCompleted(ItemCompletedEvent { - item: TurnItem::UserMessage(UserMessageItem { content, .. }), + item: TurnItem::UserMessage(UserMessageItem { + content, + metadata, + .. + }), .. }) if content == vec![UserInput::Text { text: "late pending input".to_string(), text_elements: Vec::new(), - }] + }] && metadata + .as_ref() + .and_then(|item_metadata| item_metadata.user_message_type.as_ref()) + == Some(&codex_protocol::items::UserMessageType::PromptQueued) )); let fourth = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) @@ -3871,6 +3885,14 @@ async fn steer_input_returns_active_turn_id() { assert_eq!(turn_id, tc.sub_id); assert!(sess.has_pending_input().await); + let pending_input = sess.get_pending_input_with_metadata().await; + assert_eq!(pending_input.len(), 1); + assert_eq!( + pending_input + .first() + .and_then(|(_, user_message_type)| user_message_type.as_ref()), + Some(&codex_protocol::items::UserMessageType::PromptSteering) + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index e2e141d387..ac3b69b2d6 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -9,6 +9,7 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::AbortOnDropHandle; use codex_protocol::dynamic_tools::DynamicToolResponse; +use codex_protocol::items::UserMessageType; use codex_protocol::models::ResponseInputItem; use codex_protocol::request_permissions::RequestPermissionsResponse; use codex_protocol::request_user_input::RequestUserInputResponse; @@ -56,6 +57,12 @@ pub(crate) struct RunningTask { pub(crate) _timer: Option, } +#[derive(Debug, Clone)] +pub(crate) struct PendingInputItem { + pub(crate) input: ResponseInputItem, + pub(crate) user_message_type: Option, +} + impl ActiveTurn { pub(crate) fn add_task(&mut self, task: RunningTask) { let sub_id = task.turn_context.sub_id.clone(); @@ -80,7 +87,7 @@ pub(crate) struct TurnState { pending_user_input: HashMap>, pending_elicitations: HashMap<(String, RequestId), oneshot::Sender>, pending_dynamic_tools: HashMap>, - pending_input: Vec, + pending_input: Vec, granted_permissions: Option, pub(crate) tool_calls: u64, pub(crate) token_usage_at_turn_start: TokenUsage, @@ -175,11 +182,18 @@ impl TurnState { self.pending_dynamic_tools.remove(key) } - pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) { - self.pending_input.push(input); + pub(crate) fn push_pending_input( + &mut self, + input: ResponseInputItem, + user_message_type: Option, + ) { + self.pending_input.push(PendingInputItem { + input, + user_message_type, + }); } - pub(crate) fn take_pending_input(&mut self) -> Vec { + pub(crate) fn take_pending_input_with_metadata(&mut self) -> Vec { if self.pending_input.is_empty() { Vec::with_capacity(0) } else { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 638cb3febd..848679eb3e 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -40,7 +40,6 @@ use codex_otel::metrics::names::TURN_TOKEN_USAGE_METRIC; use codex_otel::metrics::names::TURN_TOOL_CALL_METRIC; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; -use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; @@ -242,7 +241,7 @@ impl Session { .cancel_git_enrichment_task(); let mut active = self.active_turn.lock().await; - let mut pending_input = Vec::::new(); + let mut pending_input = Vec::new(); let mut should_clear_active_turn = false; let mut token_usage_at_turn_start = None; let mut turn_tool_calls = 0_u64; @@ -250,7 +249,7 @@ impl Session { && at.remove_task(&turn_context.sub_id) { let mut ts = at.turn_state.lock().await; - pending_input = ts.take_pending_input(); + pending_input = ts.take_pending_input_with_metadata(); turn_tool_calls = ts.tool_calls; token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); should_clear_active_turn = true; @@ -260,11 +259,11 @@ impl Session { } drop(active); if !pending_input.is_empty() { - let pending_response_items = pending_input + for (pending_input, user_message_type) in pending_input .into_iter() - .map(ResponseItem::from) - .collect::>(); - for response_item in pending_response_items { + .map(|item| (item.input, item.user_message_type)) + { + let response_item = ResponseItem::from(pending_input); if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) { // Keep leftover user input on the same persistence + lifecycle path as the // normal pre-sampling drain. This helper records the response item once, then @@ -273,6 +272,7 @@ impl Session { turn_context.as_ref(), &user_message.content, response_item, + user_message_type, ) .await; } else { diff --git a/codex-rs/core/src/tools/js_repl/mod.rs b/codex-rs/core/src/tools/js_repl/mod.rs index d8d043a7d9..494e0667f5 100644 --- a/codex-rs/core/src/tools/js_repl/mod.rs +++ b/codex-rs/core/src/tools/js_repl/mod.rs @@ -2557,7 +2557,7 @@ console.log(out.type); .await?; assert!(result.output.contains("function_call_output")); assert!(result.content_items.is_empty()); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2623,7 +2623,7 @@ console.log(out.type); }] .as_slice() ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2678,7 +2678,7 @@ await codex.emitImage({ bytes: png, mimeType: "image/png" }); }] .as_slice() ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2742,7 +2742,7 @@ await codex.emitImage( ] .as_slice() ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2797,7 +2797,7 @@ console.log("cell-complete"); }] .as_slice() ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2841,7 +2841,7 @@ console.log("cell-complete"); .await .expect_err("unawaited invalid emitImage should fail"); assert!(err.to_string().contains("expected non-empty bytes")); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2890,7 +2890,7 @@ console.log("cell-complete"); assert!(result.output.contains("expected non-empty bytes")); assert!(result.output.contains("cell-complete")); assert!(result.content_items.is_empty()); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2937,7 +2937,7 @@ await codex.emitImage({ bytes: png }); .await .expect_err("missing mimeType should fail"); assert!(err.to_string().contains("expected a non-empty mimeType")); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -2980,7 +2980,7 @@ await codex.emitImage("https://example.com/image.png"); .await .expect_err("non-data URLs should fail"); assert!(err.to_string().contains("only accepts data URLs")); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -3029,7 +3029,7 @@ await codex.emitImage("DATA:image/png;base64,AAA"); }] .as_slice() ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -3076,7 +3076,7 @@ await codex.emitImage({ bytes: png, mimeType: "image/png", detail: "ultra" }); .await .expect_err("invalid detail should fail"); assert!(err.to_string().contains("expected detail to be one of")); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } @@ -3160,7 +3160,7 @@ await codex.emitImage(out); err.to_string() .contains("does not accept mixed text and image content") ); - assert!(session.get_pending_input().await.is_empty()); + assert!(session.get_pending_input_with_metadata().await.is_empty()); Ok(()) } diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 113a946019..e18488b4c8 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -83,6 +83,20 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { assert_eq!(started_item.id, completed_item.id); assert_eq!(started_item.content, vec![expected_input.clone()]); assert_eq!(completed_item.content, vec![expected_input]); + assert_eq!( + started_item + .metadata + .as_ref() + .and_then(|metadata| metadata.user_message_type.as_ref()), + Some(&codex_protocol::items::UserMessageType::Prompt) + ); + assert_eq!( + completed_item + .metadata + .as_ref() + .and_then(|metadata| metadata.user_message_type.as_ref()), + Some(&codex_protocol::items::UserMessageType::Prompt) + ); let legacy_message = wait_for_event_match(&codex, |ev| match ev { EventMsg::UserMessage(event) => Some(event.clone()), diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index 824d10a148..81a852fa08 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -157,10 +157,14 @@ impl Default for ContextCompactionItem { impl UserMessageItem { pub fn new(content: &[UserInput]) -> Self { + Self::new_with_metadata(content, None) + } + + pub fn new_with_metadata(content: &[UserInput], metadata: Option) -> Self { Self { id: uuid::Uuid::new_v4().to_string(), content: content.to_vec(), - metadata: None, + metadata, } }