From fbd4efa9ed6b9fe13dacd56247cc714903df72b7 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 22 May 2026 15:21:08 -0700 Subject: [PATCH] [codex] Use TurnInput for session task input (#24151) ## Why The idea here is to erase the difference between initial and followup inputs to a turn. Followup inputs are already represented as TurnInput. Eventual goal is not to have explicit on task input at all and pull everything from input Q. ## What Changed - Changes `SessionTask::run` and the erased `AnySessionTask::run` path to accept `Vec`. - Wraps user-submitted spawn input as `TurnInput::UserInput` at the session task start boundary. - Updates `run_turn` to record initial `TurnInput` using the same hook and recording path used for pending input. - Keeps review-specific conversion local to `ReviewTask`, where the sub-Codex one-shot API still expects `Vec`. - Moves the synthetic compact prompt into `CompactTask` and starts compact tasks with empty task input. ## Validation - `cargo check -p codex-core` - `just test -p codex-core -E 'test(task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input) | test(queued_response_items_for_next_turn_move_into_next_active_turn) | test(steered_input_reopens_mailbox_delivery_for_current_turn)'` --- codex-rs/core/src/session/handlers.rs | 12 +-- codex-rs/core/src/session/tests.rs | 6 +- codex-rs/core/src/session/turn.rs | 104 +++++++++++++------------- codex-rs/core/src/tasks/compact.rs | 8 +- codex-rs/core/src/tasks/mod.rs | 13 +++- codex-rs/core/src/tasks/regular.rs | 4 +- codex-rs/core/src/tasks/review.rs | 13 +++- codex-rs/core/src/tasks/user_shell.rs | 4 +- 8 files changed, 89 insertions(+), 75 deletions(-) diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 3d57bd7074..b1e36b0347 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -460,16 +460,8 @@ pub async fn reload_user_config(sess: &Arc) { pub async fn compact(sess: &Arc, sub_id: String) { let turn_context = sess.new_default_turn_with_sub_id(sub_id).await; - sess.spawn_task( - Arc::clone(&turn_context), - vec![UserInput::Text { - text: turn_context.compact_prompt().to_string(), - // Compaction prompt is synthesized; no UI element ranges to preserve. - text_elements: Vec::new(), - }], - CompactTask, - ) - .await; + sess.spawn_task(Arc::clone(&turn_context), Vec::new(), CompactTask) + .await; } pub async fn thread_rollback(sess: &Arc, sub_id: String, num_turns: u32) { diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index df7559a7d9..30d26191d7 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -5738,7 +5738,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() { self: Arc, _session: Arc, _ctx: Arc, - _input: Vec, + _input: Vec, _cancellation_token: CancellationToken, ) -> Option { let mut trace = self @@ -7814,7 +7814,7 @@ impl SessionTask for NeverEndingTask { self: Arc, _session: Arc, _ctx: Arc, - _input: Vec, + _input: Vec, cancellation_token: CancellationToken, ) -> Option { if self.listen_to_cancellation_token { @@ -7843,7 +7843,7 @@ impl SessionTask for GuardianDeniedApprovalTask { self: Arc, session: Arc, ctx: Arc, - _input: Vec, + _input: Vec, cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 26ba845460..f81b2b1f74 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -114,8 +114,8 @@ use tracing::trace; use tracing::trace_span; use tracing::warn; -/// Takes a user message as input and runs a loop where, at each sampling request, the model -/// replies with either: +/// Takes initial turn input and runs a loop where, at each sampling request, +/// the model replies with either: /// /// - requested function calls /// - an assistant message @@ -132,7 +132,7 @@ pub(crate) async fn run_turn( sess: Arc, turn_context: Arc, turn_extension_data: Arc, - input: Vec, + input: Vec, prewarmed_client_session: Option, cancellation_token: CancellationToken, ) -> Option { @@ -172,26 +172,9 @@ pub(crate) async fn run_turn( if run_pending_session_start_hooks(&sess, &turn_context).await { return None; } - if !input.is_empty() { - let initial_turn_input = TurnInput::UserInput(input.clone()); - let user_prompt_submit_outcome = - inspect_pending_input(&sess, &turn_context, &initial_turn_input).await; - if user_prompt_submit_outcome.should_stop { - record_additional_contexts( - &sess, - &turn_context, - user_prompt_submit_outcome.additional_contexts, - ) - .await; - return None; - } - record_pending_input( - &sess, - &turn_context, - initial_turn_input, - user_prompt_submit_outcome.additional_contexts, - ) - .await; + let mut can_drain_pending_input = input.is_empty(); + if run_hooks_and_record_inputs(&sess, &turn_context, &input).await { + return None; } sess.merge_connector_selection(explicitly_enabled_connectors.clone()) @@ -232,9 +215,8 @@ pub(crate) async fn run_turn( // one instance across retries within this turn. // Pending input is drained into history before building the next model request. // However, we defer that drain until after sampling in two cases: - // 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first. + // 1. At the start of a turn, so the fresh turn input in `input` gets sampled first. // 2. After auto-compact, when model/tool continuation needs to resume before any steer. - let mut can_drain_pending_input = input.is_empty(); loop { // Note that pending_input would be something like a message the user @@ -246,27 +228,7 @@ pub(crate) async fn run_turn( Vec::new() }; - let mut blocked_pending_input = false; - let mut accepted_pending_input = false; - for pending_input_item in pending_input { - let hook_outcome = - inspect_pending_input(&sess, &turn_context, &pending_input_item).await; - if hook_outcome.should_stop { - blocked_pending_input = true; - record_additional_contexts(&sess, &turn_context, hook_outcome.additional_contexts) - .await; - } else { - accepted_pending_input = true; - record_pending_input( - &sess, - &turn_context, - pending_input_item, - hook_outcome.additional_contexts, - ) - .await; - } - } - if blocked_pending_input && !accepted_pending_input { + if run_hooks_and_record_inputs(&sess, &turn_context, &pending_input).await { break; } @@ -450,6 +412,32 @@ pub(crate) async fn run_turn( last_agent_message } +async fn run_hooks_and_record_inputs( + sess: &Arc, + turn_context: &Arc, + input: &[TurnInput], +) -> bool { + let mut blocked_input = false; + let mut accepted_input = false; + for input_item in input { + let hook_outcome = inspect_pending_input(sess, turn_context, input_item).await; + if hook_outcome.should_stop { + blocked_input = true; + record_additional_contexts(sess, turn_context, hook_outcome.additional_contexts).await; + } else { + accepted_input = true; + record_pending_input( + sess, + turn_context, + input_item.clone(), + hook_outcome.additional_contexts, + ) + .await; + } + } + blocked_input && !accepted_input +} + #[expect( clippy::await_holding_invalid_type, reason = "MCP tool listing borrows the read guard across cancellation-aware await" @@ -457,9 +445,18 @@ pub(crate) async fn run_turn( async fn build_skills_and_plugins( sess: &Arc, turn_context: &TurnContext, - input: &[UserInput], + input: &[TurnInput], cancellation_token: &CancellationToken, ) -> Option<(Vec, HashSet)> { + let user_input = input + .iter() + .filter_map(|item| match item { + TurnInput::UserInput(content) => Some(content.as_slice()), + TurnInput::ResponseInputItem(_) => None, + }) + .flatten() + .cloned() + .collect::>(); let tracking = build_track_events_context( turn_context.model_info.slug.clone(), sess.conversation_id.to_string(), @@ -473,7 +470,7 @@ async fn build_skills_and_plugins( // Structured plugin:// mentions are resolved from the current session's // enabled plugins, then converted into turn-scoped guidance below. let mentioned_plugins = - collect_explicit_plugin_mentions(input, loaded_plugins.capability_summaries()); + collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries()); let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { // Plugin mentions need raw MCP/app inventory even when app tools // are normally hidden so we can describe the plugin's currently @@ -511,7 +508,7 @@ async fn build_skills_and_plugins( let skill_name_counts_lower = build_skill_name_counts(&skills_outcome.skills, &skills_outcome.disabled_paths).1; let mentioned_skills = collect_explicit_skill_mentions( - input, + &user_input, &skills_outcome.skills, &skills_outcome.disabled_paths, &connector_slug_counts, @@ -553,7 +550,7 @@ async fn build_skills_and_plugins( ); let plugin_items = build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors); - let mut explicitly_enabled_connectors = collect_explicit_app_ids(input); + let mut explicitly_enabled_connectors = collect_explicit_app_ids(&user_input); explicitly_enabled_connectors.extend(skill_connector_ids); let connector_names_by_id = available_connectors .iter() @@ -589,7 +586,7 @@ async fn build_skills_and_plugins( async fn track_turn_resolved_config_analytics( sess: &Session, turn_context: &TurnContext, - input: &[UserInput], + input: &[TurnInput], ) { let thread_config = { let state = sess.state.lock().await; @@ -606,6 +603,11 @@ async fn track_turn_resolved_config_analytics( thread_id: sess.conversation_id.to_string(), num_input_images: input .iter() + .filter_map(|item| match item { + TurnInput::UserInput(content) => Some(content.as_slice()), + TurnInput::ResponseInputItem(_) => None, + }) + .flatten() .filter(|item| { matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. }) }) diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index dddf46391e..77914633a2 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; +use crate::session::TurnInput; use crate::session::turn_context::TurnContext; use crate::state::TaskKind; use codex_protocol::user_input::UserInput; @@ -23,7 +24,7 @@ impl SessionTask for CompactTask { self: Arc, session: Arc, ctx: Arc, - input: Vec, + _input: Vec, _cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); @@ -47,6 +48,11 @@ impl SessionTask for CompactTask { /*inc*/ 1, &[("type", "local")], ); + let input = vec![UserInput::Text { + text: ctx.compact_prompt().to_string(), + // Compaction prompt is synthesized; no UI element ranges to preserve. + text_elements: Vec::new(), + }]; crate::compact::run_compact_task(session.clone(), ctx, input).await }; None diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 9cb6b5f0f2..4d6db8acf2 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -214,7 +214,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static { self: Arc, session: Arc, ctx: Arc, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> impl std::future::Future> + Send; @@ -245,7 +245,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static { self: Arc, session: Arc, ctx: Arc, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> BoxFuture<'static, Option>; @@ -276,7 +276,7 @@ where self: Arc, session: Arc, ctx: Arc, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> BoxFuture<'static, Option> { Box::pin(SessionTask::run( @@ -380,6 +380,11 @@ impl Session { )); let ctx = Arc::clone(&turn_context); let task_for_run = Arc::clone(&task); + let task_input = if input.is_empty() { + Vec::new() + } else { + vec![TurnInput::UserInput(input)] + }; let task_cancellation_token = cancellation_token.child_token(); // Task-owned turn spans keep a core-owned span open for the // full task lifecycle after the submission dispatch span ends. @@ -405,7 +410,7 @@ impl Session { .run( Arc::clone(&session_ctx), ctx, - input, + task_input, task_cancellation_token.child_token(), ) .await; diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 531d5d7da7..6c6a0e5b09 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -2,13 +2,13 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; +use crate::session::TurnInput; use crate::session::turn::run_turn; use crate::session::turn_context::TurnContext; use crate::session_startup_prewarm::SessionStartupPrewarmResolution; use crate::state::TaskKind; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnStartedEvent; -use codex_protocol::user_input::UserInput; use tracing::Instrument; use tracing::trace_span; @@ -41,7 +41,7 @@ impl SessionTask for RegularTask { self: Arc, session: Arc, ctx: Arc, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> Option { let sess = session.clone_session(); diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index f89c7d062f..ff7f38a7ea 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -20,6 +20,7 @@ use crate::codex_delegate::run_codex_thread_one_shot; use crate::config::Constrained; use crate::review_format::format_review_findings_block; use crate::review_format::render_review_output_text; +use crate::session::TurnInput; use crate::session::session::Session; use crate::session::turn_context::TurnContext; use crate::state::TaskKind; @@ -59,7 +60,7 @@ impl SessionTask for ReviewTask { self: Arc, session: Arc, ctx: Arc, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> Option { session.session.services.session_telemetry.counter( @@ -68,11 +69,19 @@ impl SessionTask for ReviewTask { &[], ); + let mut user_input = Vec::new(); + for item in input { + match item { + TurnInput::UserInput(mut content) => user_input.append(&mut content), + TurnInput::ResponseInputItem(_) => {} + } + } + // Start sub-codex conversation and get the receiver for events. let output = match start_review_conversation( session.clone(), ctx.clone(), - input, + user_input, cancellation_token.clone(), ) .await diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 2ce4056957..396aecbeea 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -7,7 +7,6 @@ use codex_network_proxy::PROXY_ACTIVE_ENV_KEY; use codex_network_proxy::PROXY_ENV_KEYS; #[cfg(target_os = "macos")] use codex_network_proxy::PROXY_GIT_SSH_COMMAND_ENV_KEY; -use codex_protocol::user_input::UserInput; use tokio_util::sync::CancellationToken; use tracing::error; use uuid::Uuid; @@ -17,6 +16,7 @@ use crate::exec::StdoutStream; use crate::exec::execute_exec_request; use crate::exec_env::create_env; use crate::sandboxing::ExecRequest; +use crate::session::TurnInput; use crate::session::turn_context::TurnContext; use crate::state::TaskKind; use crate::tools::format_exec_output_str; @@ -77,7 +77,7 @@ impl SessionTask for UserShellCommandTask { self: Arc, session: Arc, turn_context: Arc, - _input: Vec, + _input: Vec, cancellation_token: CancellationToken, ) -> Option { execute_user_shell_command(