[codex] Extract turn skill and plugin injections (#23396)

## Why

`run_turn` had accumulated the turn-scoped skill, plugin, app, MCP,
connector-selection, and analytics setup inline. That made the
orchestration path harder to scan even though the actual turn item
injection still needs to stay in `run_turn` so ordering is explicit.

## What changed

This extracts that setup into `build_skills_and_plugins`, which returns
the combined injection `ResponseItem`s and the explicitly enabled
connector IDs. `run_turn` now keeps the required orchestration pieces:
context update recording, user input handling, connector selection
merge, and the explicit per-item `record_conversation_items` calls for
injection items.

The refactor keeps the change LOC-neutral in `core/src/session/turn.rs`
and preserves the existing response-item based injection path.

## Validation

- `cargo test -p codex-core collect_explicit_app_ids_from_skill_items`
- `just fix -p codex-core`
This commit is contained in:
pakrym-oai
2026-05-18 20:33:27 -07:00
committed by GitHub
parent 1a25d8b6e5
commit 9e9a62dc28

View File

@@ -134,10 +134,6 @@ use tracing::warn;
/// - If the model sends only an assistant message, we record it in the
/// conversation history and consider the turn complete.
///
#[expect(
clippy::await_holding_invalid_type,
reason = "turn execution must keep active-turn state transitions atomic"
)]
pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -172,131 +168,11 @@ pub(crate) async fn run_turn(
return None;
}
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let loaded_plugins = sess
.services
.plugins_manager
.plugins_for_config(&turn_context.config.plugins_config_input())
.await;
// Structured plugin:// mentions are resolved from the current session's
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries());
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.or_cancel(&cancellation_token)
.await
{
Ok(mcp_tools) => mcp_tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) => Vec::new(),
}
} else {
Vec::new()
};
let available_connectors = if turn_context.apps_enabled() {
let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible(
loaded_plugins
.effective_apps()
.into_iter()
.map(|connector_id| connector_id.0),
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
);
connectors::with_app_enabled_state(connectors, &turn_context.config)
} else {
Vec::new()
};
let connector_slug_counts = build_connector_slug_counts(&available_connectors);
let skill_name_counts_lower = skills_outcome
.as_ref()
.map_or_else(HashMap::new, |outcome| {
build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1
});
let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| {
collect_explicit_skill_mentions(
&input,
&outcome.skills,
&outcome.disabled_paths,
&connector_slug_counts,
)
});
maybe_prompt_and_install_mcp_dependencies(
sess.as_ref(),
turn_context.as_ref(),
&cancellation_token,
&mentioned_skills,
Some(sess.mcp_elicitation_reviewer()),
)
.await;
let session_telemetry = turn_context.session_telemetry.clone();
let thread_id = sess.conversation_id.to_string();
let tracking = build_track_events_context(
turn_context.model_info.slug.clone(),
thread_id,
turn_context.sub_id.clone(),
);
let SkillInjections {
items: skill_injections,
warnings: skill_warnings,
} = build_skill_injections(
&mentioned_skills,
skills_outcome,
Some(&session_telemetry),
&sess.services.analytics_events_client,
tracking.clone(),
)
.await;
for message in skill_warnings {
sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
}
let skill_items: Vec<ResponseItem> = skill_injections
.iter()
.map(|skill| ContextualUserFragment::into(crate::context::SkillInstructions::from(skill)))
.collect();
let plugin_items =
build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors);
let mentioned_plugin_metadata = mentioned_plugins
.iter()
.filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata)
.collect::<Vec<_>>();
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
&skill_items,
&available_connectors,
&skill_name_counts_lower,
));
let connector_names_by_id = available_connectors
.iter()
.map(|connector| (connector.id.as_str(), connector.name.as_str()))
.collect::<HashMap<&str, &str>>();
let mentioned_app_invocations = explicitly_enabled_connectors
.iter()
.map(|connector_id| AppInvocation {
connector_id: Some(connector_id.clone()),
app_name: connector_names_by_id
.get(connector_id.as_str())
.map(|name| (*name).to_string()),
invocation_type: Some(InvocationType::Explicit),
})
.collect::<Vec<_>>();
let (injection_items, explicitly_enabled_connectors) =
build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?;
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
@@ -325,14 +201,6 @@ pub(crate) async fn run_turn(
.await;
user_prompt_submit_outcome.additional_contexts
};
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
for plugin in mentioned_plugin_metadata {
sess.services
.analytics_events_client
.track_plugin_used(tracking.clone(), plugin);
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
@@ -346,12 +214,8 @@ pub(crate) async fn run_turn(
}))
.await;
}
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
.await;
}
if !plugin_items.is_empty() {
sess.record_conversation_items(&turn_context, &plugin_items)
for response_item in injection_items {
sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item))
.await;
}
@@ -698,6 +562,142 @@ pub(crate) async fn run_turn(
last_agent_message
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP tool listing borrows the read guard across cancellation-aware await"
)]
async fn build_skills_and_plugins(
sess: &Arc<Session>,
turn_context: &TurnContext,
input: &[UserInput],
cancellation_token: &CancellationToken,
) -> Option<(Vec<ResponseItem>, HashSet<String>)> {
let tracking = build_track_events_context(
turn_context.model_info.slug.clone(),
sess.conversation_id.to_string(),
turn_context.sub_id.clone(),
);
let loaded_plugins = sess
.services
.plugins_manager
.plugins_for_config(&turn_context.config.plugins_config_input())
.await;
// Structured plugin:// mentions are resolved from the current session's
// enabled plugins, then converted into turn-scoped guidance below.
let mentioned_plugins =
collect_explicit_plugin_mentions(input, loaded_plugins.capability_summaries());
let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() {
// Plugin mentions need raw MCP/app inventory even when app tools
// are normally hidden so we can describe the plugin's currently
// usable capabilities for this turn.
match sess
.services
.mcp_connection_manager
.read()
.await
.list_all_tools()
.or_cancel(cancellation_token)
.await
{
Ok(mcp_tools) => mcp_tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) => Vec::new(),
}
} else {
Vec::new()
};
let available_connectors = if turn_context.apps_enabled() {
let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible(
loaded_plugins
.effective_apps()
.into_iter()
.map(|connector_id| connector_id.0),
connectors::accessible_connectors_from_mcp_tools(&mcp_tools),
);
connectors::with_app_enabled_state(connectors, &turn_context.config)
} else {
Vec::new()
};
let skills_outcome = turn_context.turn_skills.outcome.as_ref();
let connector_slug_counts = build_connector_slug_counts(&available_connectors);
let skill_name_counts_lower =
build_skill_name_counts(&skills_outcome.skills, &skills_outcome.disabled_paths).1;
let mentioned_skills = collect_explicit_skill_mentions(
input,
&skills_outcome.skills,
&skills_outcome.disabled_paths,
&connector_slug_counts,
);
maybe_prompt_and_install_mcp_dependencies(
sess,
turn_context,
cancellation_token,
&mentioned_skills,
Some(sess.mcp_elicitation_reviewer()),
)
.await;
let SkillInjections {
items: skill_injections,
warnings: skill_warnings,
} = build_skill_injections(
&mentioned_skills,
Some(skills_outcome),
Some(&turn_context.session_telemetry),
&sess.services.analytics_events_client,
tracking.clone(),
)
.await;
for message in skill_warnings {
sess.send_event(turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
}
let skill_items: Vec<ResponseItem> = skill_injections
.iter()
.map(|skill| ContextualUserFragment::into(crate::context::SkillInstructions::from(skill)))
.collect();
let skill_connector_ids = collect_explicit_app_ids_from_skill_items(
&skill_items,
&available_connectors,
&skill_name_counts_lower,
);
let plugin_items =
build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors);
let mut explicitly_enabled_connectors = collect_explicit_app_ids(input);
explicitly_enabled_connectors.extend(skill_connector_ids);
let connector_names_by_id = available_connectors
.iter()
.map(|connector| (connector.id.as_str(), connector.name.as_str()))
.collect::<HashMap<&str, &str>>();
let mentioned_app_invocations = explicitly_enabled_connectors
.iter()
.map(|connector_id| AppInvocation {
connector_id: Some(connector_id.clone()),
app_name: connector_names_by_id
.get(connector_id.as_str())
.map(|name| (*name).to_string()),
invocation_type: Some(InvocationType::Explicit),
})
.collect::<Vec<_>>();
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
for plugin in mentioned_plugins
.iter()
.filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata)
{
sess.services
.analytics_events_client
.track_plugin_used(tracking.clone(), plugin);
}
let mut injection_items = skill_items;
injection_items.extend(plugin_items);
Some((injection_items, explicitly_enabled_connectors))
}
async fn track_turn_resolved_config_analytics(
sess: &Session,
turn_context: &TurnContext,