[codex] Use TurnInput for session task input (#24151)

## Why

The idea here is to erase the difference between initial and followup
inputs to a turn. Followup inputs are already represented as TurnInput.

Eventual goal is not to have explicit on task input at all and pull
everything from input Q.

## What Changed

- Changes `SessionTask::run` and the erased `AnySessionTask::run` path
to accept `Vec<TurnInput>`.
- Wraps user-submitted spawn input as `TurnInput::UserInput` at the
session task start boundary.
- Updates `run_turn` to record initial `TurnInput` using the same hook
and recording path used for pending input.
- Keeps review-specific conversion local to `ReviewTask`, where the
sub-Codex one-shot API still expects `Vec<UserInput>`.
- Moves the synthetic compact prompt into `CompactTask` and starts
compact tasks with empty task input.

## Validation

- `cargo check -p codex-core`
- `just test -p codex-core -E
'test(task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input)
| test(queued_response_items_for_next_turn_move_into_next_active_turn) |
test(steered_input_reopens_mailbox_delivery_for_current_turn)'`
This commit is contained in:
pakrym-oai
2026-05-22 15:21:08 -07:00
committed by GitHub
parent 195ba3eb88
commit fbd4efa9ed
8 changed files with 89 additions and 75 deletions

View File

@@ -460,16 +460,8 @@ pub async fn reload_user_config(sess: &Arc<Session>) {
pub async fn compact(sess: &Arc<Session>, sub_id: String) {
let turn_context = sess.new_default_turn_with_sub_id(sub_id).await;
sess.spawn_task(
Arc::clone(&turn_context),
vec![UserInput::Text {
text: turn_context.compact_prompt().to_string(),
// Compaction prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}],
CompactTask,
)
.await;
sess.spawn_task(Arc::clone(&turn_context), Vec::new(), CompactTask)
.await;
}
pub async fn thread_rollback(sess: &Arc<Session>, sub_id: String, num_turns: u32) {

View File

@@ -5738,7 +5738,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
let mut trace = self
@@ -7814,7 +7814,7 @@ impl SessionTask for NeverEndingTask {
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
if self.listen_to_cancellation_token {
@@ -7843,7 +7843,7 @@ impl SessionTask for GuardianDeniedApprovalTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();

View File

@@ -114,8 +114,8 @@ use tracing::trace;
use tracing::trace_span;
use tracing::warn;
/// Takes a user message as input and runs a loop where, at each sampling request, the model
/// replies with either:
/// Takes initial turn input and runs a loop where, at each sampling request,
/// the model replies with either:
///
/// - requested function calls
/// - an assistant message
@@ -132,7 +132,7 @@ pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_extension_data: Arc<codex_extension_api::ExtensionData>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -172,26 +172,9 @@ pub(crate) async fn run_turn(
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
if !input.is_empty() {
let initial_turn_input = TurnInput::UserInput(input.clone());
let user_prompt_submit_outcome =
inspect_pending_input(&sess, &turn_context, &initial_turn_input).await;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
}
record_pending_input(
&sess,
&turn_context,
initial_turn_input,
user_prompt_submit_outcome.additional_contexts,
)
.await;
let mut can_drain_pending_input = input.is_empty();
if run_hooks_and_record_inputs(&sess, &turn_context, &input).await {
return None;
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
@@ -232,9 +215,8 @@ pub(crate) async fn run_turn(
// one instance across retries within this turn.
// Pending input is drained into history before building the next model request.
// However, we defer that drain until after sampling in two cases:
// 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first.
// 1. At the start of a turn, so the fresh turn input in `input` gets sampled first.
// 2. After auto-compact, when model/tool continuation needs to resume before any steer.
let mut can_drain_pending_input = input.is_empty();
loop {
// Note that pending_input would be something like a message the user
@@ -246,27 +228,7 @@ pub(crate) async fn run_turn(
Vec::new()
};
let mut blocked_pending_input = false;
let mut accepted_pending_input = false;
for pending_input_item in pending_input {
let hook_outcome =
inspect_pending_input(&sess, &turn_context, &pending_input_item).await;
if hook_outcome.should_stop {
blocked_pending_input = true;
record_additional_contexts(&sess, &turn_context, hook_outcome.additional_contexts)
.await;
} else {
accepted_pending_input = true;
record_pending_input(
&sess,
&turn_context,
pending_input_item,
hook_outcome.additional_contexts,
)
.await;
}
}
if blocked_pending_input && !accepted_pending_input {
if run_hooks_and_record_inputs(&sess, &turn_context, &pending_input).await {
break;
}
@@ -450,6 +412,32 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_hooks_and_record_inputs(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
input: &[TurnInput],
) -> bool {
let mut blocked_input = false;
let mut accepted_input = false;
for input_item in input {
let hook_outcome = inspect_pending_input(sess, turn_context, input_item).await;
if hook_outcome.should_stop {
blocked_input = true;
record_additional_contexts(sess, turn_context, hook_outcome.additional_contexts).await;
} else {
accepted_input = true;
record_pending_input(
sess,
turn_context,
input_item.clone(),
hook_outcome.additional_contexts,
)
.await;
}
}
blocked_input && !accepted_input
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP tool listing borrows the read guard across cancellation-aware await"
@@ -457,9 +445,18 @@ pub(crate) async fn run_turn(
async fn build_skills_and_plugins(
sess: &Arc<Session>,
turn_context: &TurnContext,
input: &[UserInput],
input: &[TurnInput],
cancellation_token: &CancellationToken,
) -> Option<(Vec<ResponseItem>, HashSet<String>)> {
let user_input = input
.iter()
.filter_map(|item| match item {
TurnInput::UserInput(content) => Some(content.as_slice()),
TurnInput::ResponseInputItem(_) => None,
})
.flatten()
.cloned()
.collect::<Vec<_>>();
let tracking = build_track_events_context(
turn_context.model_info.slug.clone(),
sess.conversation_id.to_string(),
@@ -473,7 +470,7 @@ async fn build_skills_and_plugins(
// Structured plugin:// mentions are resolved from the current session's
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(input, loaded_plugins.capability_summaries());
collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries());
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
@@ -511,7 +508,7 @@ async fn build_skills_and_plugins(
let skill_name_counts_lower =
build_skill_name_counts(&skills_outcome.skills, &skills_outcome.disabled_paths).1;
let mentioned_skills = collect_explicit_skill_mentions(
input,
&user_input,
&skills_outcome.skills,
&skills_outcome.disabled_paths,
&connector_slug_counts,
@@ -553,7 +550,7 @@ async fn build_skills_and_plugins(
);
let plugin_items =
build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors);
let mut explicitly_enabled_connectors = collect_explicit_app_ids(input);
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&user_input);
explicitly_enabled_connectors.extend(skill_connector_ids);
let connector_names_by_id = available_connectors
.iter()
@@ -589,7 +586,7 @@ async fn build_skills_and_plugins(
async fn track_turn_resolved_config_analytics(
sess: &Session,
turn_context: &TurnContext,
input: &[UserInput],
input: &[TurnInput],
) {
let thread_config = {
let state = sess.state.lock().await;
@@ -606,6 +603,11 @@ async fn track_turn_resolved_config_analytics(
thread_id: sess.conversation_id.to_string(),
num_input_images: input
.iter()
.filter_map(|item| match item {
TurnInput::UserInput(content) => Some(content.as_slice()),
TurnInput::ResponseInputItem(_) => None,
})
.flatten()
.filter(|item| {
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::session::TurnInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
@@ -23,7 +24,7 @@ impl SessionTask for CompactTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
_input: Vec<TurnInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
@@ -47,6 +48,11 @@ impl SessionTask for CompactTask {
/*inc*/ 1,
&[("type", "local")],
);
let input = vec![UserInput::Text {
text: ctx.compact_prompt().to_string(),
// Compaction prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}];
crate::compact::run_compact_task(session.clone(), ctx, input).await
};
None

View File

@@ -214,7 +214,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = Option<String>> + Send;
@@ -245,7 +245,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>>;
@@ -276,7 +276,7 @@ where
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>> {
Box::pin(SessionTask::run(
@@ -380,6 +380,11 @@ impl Session {
));
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_input = if input.is_empty() {
Vec::new()
} else {
vec![TurnInput::UserInput(input)]
};
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
// full task lifecycle after the submission dispatch span ends.
@@ -405,7 +410,7 @@ impl Session {
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_input,
task_cancellation_token.child_token(),
)
.await;

View File

@@ -2,13 +2,13 @@ use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::session::TurnInput;
use crate::session::turn::run_turn;
use crate::session::turn_context::TurnContext;
use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
use crate::state::TaskKind;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::UserInput;
use tracing::Instrument;
use tracing::trace_span;
@@ -41,7 +41,7 @@ impl SessionTask for RegularTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();

View File

@@ -20,6 +20,7 @@ use crate::codex_delegate::run_codex_thread_one_shot;
use crate::config::Constrained;
use crate::review_format::format_review_findings_block;
use crate::review_format::render_review_output_text;
use crate::session::TurnInput;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
@@ -59,7 +60,7 @@ impl SessionTask for ReviewTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
session.session.services.session_telemetry.counter(
@@ -68,11 +69,19 @@ impl SessionTask for ReviewTask {
&[],
);
let mut user_input = Vec::new();
for item in input {
match item {
TurnInput::UserInput(mut content) => user_input.append(&mut content),
TurnInput::ResponseInputItem(_) => {}
}
}
// Start sub-codex conversation and get the receiver for events.
let output = match start_review_conversation(
session.clone(),
ctx.clone(),
input,
user_input,
cancellation_token.clone(),
)
.await

View File

@@ -7,7 +7,6 @@ use codex_network_proxy::PROXY_ACTIVE_ENV_KEY;
use codex_network_proxy::PROXY_ENV_KEYS;
#[cfg(target_os = "macos")]
use codex_network_proxy::PROXY_GIT_SSH_COMMAND_ENV_KEY;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::error;
use uuid::Uuid;
@@ -17,6 +16,7 @@ use crate::exec::StdoutStream;
use crate::exec::execute_exec_request;
use crate::exec_env::create_env;
use crate::sandboxing::ExecRequest;
use crate::session::TurnInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use crate::tools::format_exec_output_str;
@@ -77,7 +77,7 @@ impl SessionTask for UserShellCommandTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: Vec<TurnInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
execute_user_shell_command(