mirror of
https://github.com/openai/codex.git
synced 2026-06-01 19:02:59 +00:00
Split codex session modules (#18244)
## Summary - split `codex.rs` session definitions and constructor into `codex/session.rs` - move MCP session methods into `codex/mcp.rs` - move turn-context types/helpers into `codex/turn_context.rs` - move review thread spawning into `codex/review.rs` ## Testing - `cargo check -p codex-core` - `just fmt` - `just fix -p codex-core` - `cargo test -p codex-core` (unit tests passed; integration run failed locally with 45 failures, including missing helper binaries such as `test_stdio_server`/`codex` plus approval/web-search/MCP-related cases)
This commit is contained in:
File diff suppressed because it is too large
Load Diff
284
codex-rs/core/src/codex/mcp.rs
Normal file
284
codex-rs/core/src/codex/mcp.rs
Normal file
@@ -0,0 +1,284 @@
|
||||
use super::*;
|
||||
|
||||
impl Session {
|
||||
pub async fn request_mcp_server_elicitation(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
request_id: RequestId,
|
||||
params: McpServerElicitationRequestParams,
|
||||
) -> Option<ElicitationResponse> {
|
||||
let server_name = params.server_name.clone();
|
||||
let request = match params.request {
|
||||
McpServerElicitationRequest::Form {
|
||||
meta,
|
||||
message,
|
||||
requested_schema,
|
||||
} => {
|
||||
let requested_schema = match serde_json::to_value(requested_schema) {
|
||||
Ok(requested_schema) => requested_schema,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to serialize MCP elicitation schema for server_name: {server_name}, request_id: {request_id}: {err:#}"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
codex_protocol::approvals::ElicitationRequest::Form {
|
||||
meta,
|
||||
message,
|
||||
requested_schema,
|
||||
}
|
||||
}
|
||||
McpServerElicitationRequest::Url {
|
||||
meta,
|
||||
message,
|
||||
url,
|
||||
elicitation_id,
|
||||
} => codex_protocol::approvals::ElicitationRequest::Url {
|
||||
meta,
|
||||
message,
|
||||
url,
|
||||
elicitation_id,
|
||||
},
|
||||
};
|
||||
|
||||
let (tx_response, rx_response) = oneshot::channel();
|
||||
let prev_entry = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.insert_pending_elicitation(
|
||||
server_name.clone(),
|
||||
request_id.clone(),
|
||||
tx_response,
|
||||
)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!(
|
||||
"Overwriting existing pending elicitation for server_name: {server_name}, request_id: {request_id}"
|
||||
);
|
||||
}
|
||||
let id = match request_id {
|
||||
rmcp::model::NumberOrString::String(value) => {
|
||||
codex_protocol::mcp::RequestId::String(value.to_string())
|
||||
}
|
||||
rmcp::model::NumberOrString::Number(value) => {
|
||||
codex_protocol::mcp::RequestId::Integer(value)
|
||||
}
|
||||
};
|
||||
let event = EventMsg::ElicitationRequest(ElicitationRequestEvent {
|
||||
turn_id: params.turn_id,
|
||||
server_name,
|
||||
id,
|
||||
request,
|
||||
});
|
||||
self.send_event(turn_context, event).await;
|
||||
rx_response.await.ok()
|
||||
}
|
||||
|
||||
pub async fn resolve_elicitation(
|
||||
&self,
|
||||
server_name: String,
|
||||
id: RequestId,
|
||||
response: ElicitationResponse,
|
||||
) -> anyhow::Result<()> {
|
||||
let entry = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.remove_pending_elicitation(&server_name, &id)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if let Some(tx_response) = entry {
|
||||
tx_response
|
||||
.send(response)
|
||||
.map_err(|e| anyhow::anyhow!("failed to send elicitation response: {e:?}"))?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.resolve_elicitation(server_name, id, response)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn list_resources(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> anyhow::Result<ListResourcesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_resources(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn list_resource_templates(
|
||||
&self,
|
||||
server: &str,
|
||||
params: Option<PaginatedRequestParams>,
|
||||
) -> anyhow::Result<ListResourceTemplatesResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.list_resource_templates(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn read_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
params: ReadResourceRequestParams,
|
||||
) -> anyhow::Result<ReadResourceResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.read_resource(server, params)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn call_tool(
|
||||
&self,
|
||||
server: &str,
|
||||
tool: &str,
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> anyhow::Result<CallToolResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.call_tool(server, tool, arguments, meta)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn resolve_mcp_tool_info(&self, tool_name: &ToolName) -> Option<ToolInfo> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.resolve_tool_info(tool_name)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn refresh_mcp_servers_inner(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
mcp_servers: HashMap<String, McpServerConfig>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
) {
|
||||
let auth = self.services.auth_manager.auth().await;
|
||||
let config = self.get_config().await;
|
||||
let mcp_config = config
|
||||
.to_mcp_config(self.services.plugins_manager.as_ref())
|
||||
.await;
|
||||
let tool_plugin_provenance = self
|
||||
.services
|
||||
.mcp_manager
|
||||
.tool_plugin_provenance(config.as_ref())
|
||||
.await;
|
||||
let mcp_servers = with_codex_apps_mcp(mcp_servers, auth.as_ref(), &mcp_config);
|
||||
let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await;
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
*guard = CancellationToken::new();
|
||||
}
|
||||
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
&turn_context.config.permissions.approval_policy,
|
||||
turn_context.sub_id.clone(),
|
||||
self.get_tx_event(),
|
||||
turn_context.sandbox_policy.get().clone(),
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth.as_ref()),
|
||||
tool_plugin_provenance,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
if guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*guard = cancel_token;
|
||||
}
|
||||
|
||||
let mut manager = self.services.mcp_connection_manager.write().await;
|
||||
*manager = refreshed_manager;
|
||||
}
|
||||
|
||||
pub(crate) async fn refresh_mcp_servers_if_requested(&self, turn_context: &TurnContext) {
|
||||
let refresh_config = { self.pending_mcp_server_refresh_config.lock().await.take() };
|
||||
let Some(refresh_config) = refresh_config else {
|
||||
return;
|
||||
};
|
||||
|
||||
let McpServerRefreshConfig {
|
||||
mcp_servers,
|
||||
mcp_oauth_credentials_store_mode,
|
||||
} = refresh_config;
|
||||
|
||||
let mcp_servers =
|
||||
match serde_json::from_value::<HashMap<String, McpServerConfig>>(mcp_servers) {
|
||||
Ok(servers) => servers,
|
||||
Err(err) => {
|
||||
warn!("failed to parse MCP server refresh config: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let store_mode = match serde_json::from_value::<OAuthCredentialsStoreMode>(
|
||||
mcp_oauth_credentials_store_mode,
|
||||
) {
|
||||
Ok(mode) => mode,
|
||||
Err(err) => {
|
||||
warn!("failed to parse MCP OAuth refresh config: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn refresh_mcp_servers_now(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
mcp_servers: HashMap<String, McpServerConfig>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
) {
|
||||
self.refresh_mcp_servers_inner(turn_context, mcp_servers, store_mode)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn mcp_startup_cancellation_token(&self) -> CancellationToken {
|
||||
self.services
|
||||
.mcp_startup_cancellation_token
|
||||
.lock()
|
||||
.await
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn cancel_mcp_startup(&self) {
|
||||
self.services
|
||||
.mcp_startup_cancellation_token
|
||||
.lock()
|
||||
.await
|
||||
.cancel();
|
||||
}
|
||||
}
|
||||
164
codex-rs/core/src/codex/review.rs
Normal file
164
codex-rs/core/src/codex/review.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
use super::turn_context::image_generation_tool_auth_allowed;
|
||||
use super::*;
|
||||
|
||||
/// Spawn a review thread using the given prompt.
|
||||
pub(super) async fn spawn_review_thread(
|
||||
sess: Arc<Session>,
|
||||
config: Arc<Config>,
|
||||
parent_turn_context: Arc<TurnContext>,
|
||||
sub_id: String,
|
||||
resolved: crate::review_prompts::ResolvedReviewRequest,
|
||||
) {
|
||||
let model = config
|
||||
.review_model
|
||||
.clone()
|
||||
.unwrap_or_else(|| parent_turn_context.model_info.slug.clone());
|
||||
let review_model_info = sess
|
||||
.services
|
||||
.models_manager
|
||||
.get_model_info(&model, &config.to_models_manager_config())
|
||||
.await;
|
||||
// For reviews, disable web_search and view_image regardless of global settings.
|
||||
let mut review_features = sess.features.clone();
|
||||
let _ = review_features.disable(Feature::WebSearchRequest);
|
||||
let _ = review_features.disable(Feature::WebSearchCached);
|
||||
let review_web_search_mode = WebSearchMode::Disabled;
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_info: &review_model_info,
|
||||
available_models: &sess
|
||||
.services
|
||||
.models_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
.await,
|
||||
features: &review_features,
|
||||
image_generation_tool_auth_allowed: image_generation_tool_auth_allowed(Some(
|
||||
sess.services.auth_manager.as_ref(),
|
||||
)),
|
||||
web_search_mode: Some(review_web_search_mode),
|
||||
session_source: parent_turn_context.session_source.clone(),
|
||||
sandbox_policy: parent_turn_context.sandbox_policy.get(),
|
||||
windows_sandbox_level: parent_turn_context.windows_sandbox_level,
|
||||
})
|
||||
.with_unified_exec_shell_mode_for_session(
|
||||
crate::tools::spec::tool_user_shell_type(sess.services.user_shell.as_ref()),
|
||||
sess.services.shell_zsh_path.as_ref(),
|
||||
sess.services.main_execve_wrapper_exe.as_ref(),
|
||||
)
|
||||
.with_web_search_config(/*web_search_config*/ None)
|
||||
.with_allow_login_shell(config.permissions.allow_login_shell)
|
||||
.with_has_environment(parent_turn_context.environment.is_some())
|
||||
.with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled)
|
||||
.with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone())
|
||||
.with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata)
|
||||
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
|
||||
&config.agent_roles,
|
||||
));
|
||||
|
||||
let review_prompt = resolved.prompt.clone();
|
||||
let provider = parent_turn_context.provider.clone();
|
||||
let auth_manager = parent_turn_context.auth_manager.clone();
|
||||
let model_info = review_model_info.clone();
|
||||
|
||||
// Build per‑turn client with the requested model/family.
|
||||
let mut per_turn_config = (*config).clone();
|
||||
per_turn_config.model = Some(model.clone());
|
||||
per_turn_config.features = review_features.clone();
|
||||
if let Err(err) = per_turn_config.web_search_mode.set(review_web_search_mode) {
|
||||
let fallback_value = per_turn_config.web_search_mode.value();
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
?review_web_search_mode,
|
||||
?fallback_value,
|
||||
"review web_search_mode is disallowed by requirements; keeping constrained value"
|
||||
);
|
||||
}
|
||||
|
||||
let session_telemetry = parent_turn_context
|
||||
.session_telemetry
|
||||
.clone()
|
||||
.with_model(model.as_str(), review_model_info.slug.as_str());
|
||||
let auth_manager_for_context = auth_manager.clone();
|
||||
let provider_for_context = provider.clone();
|
||||
let session_telemetry_for_context = session_telemetry.clone();
|
||||
let reasoning_effort = per_turn_config.model_reasoning_effort;
|
||||
let reasoning_summary = per_turn_config
|
||||
.model_reasoning_summary
|
||||
.unwrap_or(model_info.default_reasoning_summary);
|
||||
let session_source = parent_turn_context.session_source.clone();
|
||||
|
||||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let review_turn_id = sub_id.to_string();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
sess.conversation_id.to_string(),
|
||||
&session_source,
|
||||
review_turn_id.clone(),
|
||||
parent_turn_context.cwd.clone(),
|
||||
parent_turn_context.sandbox_policy.get(),
|
||||
parent_turn_context.windows_sandbox_level,
|
||||
));
|
||||
|
||||
let review_turn_context = TurnContext {
|
||||
sub_id: review_turn_id,
|
||||
trace_id: current_span_trace_id(),
|
||||
realtime_active: parent_turn_context.realtime_active,
|
||||
config: per_turn_config,
|
||||
auth_manager: auth_manager_for_context,
|
||||
model_info: model_info.clone(),
|
||||
session_telemetry: session_telemetry_for_context,
|
||||
provider: provider_for_context,
|
||||
reasoning_effort,
|
||||
reasoning_summary,
|
||||
session_source,
|
||||
environment: parent_turn_context.environment.clone(),
|
||||
tools_config,
|
||||
features: parent_turn_context.features.clone(),
|
||||
ghost_snapshot: parent_turn_context.ghost_snapshot.clone(),
|
||||
current_date: parent_turn_context.current_date.clone(),
|
||||
timezone: parent_turn_context.timezone.clone(),
|
||||
app_server_client_name: parent_turn_context.app_server_client_name.clone(),
|
||||
developer_instructions: None,
|
||||
user_instructions: None,
|
||||
compact_prompt: parent_turn_context.compact_prompt.clone(),
|
||||
collaboration_mode: parent_turn_context.collaboration_mode.clone(),
|
||||
personality: parent_turn_context.personality,
|
||||
approval_policy: parent_turn_context.approval_policy.clone(),
|
||||
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
|
||||
file_system_sandbox_policy: parent_turn_context.file_system_sandbox_policy.clone(),
|
||||
network_sandbox_policy: parent_turn_context.network_sandbox_policy,
|
||||
network: parent_turn_context.network.clone(),
|
||||
windows_sandbox_level: parent_turn_context.windows_sandbox_level,
|
||||
shell_environment_policy: parent_turn_context.shell_environment_policy.clone(),
|
||||
cwd: parent_turn_context.cwd.clone(),
|
||||
final_output_json_schema: None,
|
||||
codex_self_exe: parent_turn_context.codex_self_exe.clone(),
|
||||
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
|
||||
tool_call_gate: Arc::new(ReadinessFlag::new()),
|
||||
js_repl: Arc::clone(&sess.js_repl),
|
||||
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
|
||||
truncation_policy: model_info.truncation_policy.into(),
|
||||
turn_metadata_state,
|
||||
turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()),
|
||||
turn_timing_state: Arc::new(TurnTimingState::default()),
|
||||
};
|
||||
|
||||
// Seed the child task with the review prompt as the initial user message.
|
||||
let input: Vec<UserInput> = vec![UserInput::Text {
|
||||
text: review_prompt,
|
||||
// Review prompt is synthesized; no UI element ranges to preserve.
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
let tc = Arc::new(review_turn_context);
|
||||
tc.turn_metadata_state.spawn_git_enrichment_task();
|
||||
// TODO(ccunningham): Review turns currently rely on `spawn_task` for TurnComplete but do not
|
||||
// emit a parent TurnStarted. Consider giving review a full parent turn lifecycle
|
||||
// (TurnStarted + TurnComplete) for consistency with other standalone tasks.
|
||||
sess.spawn_task(tc.clone(), input, ReviewTask::new()).await;
|
||||
|
||||
// Announce entering review mode so UIs can switch modes.
|
||||
let review_request = ReviewRequest {
|
||||
target: resolved.target,
|
||||
user_facing_hint: Some(resolved.user_facing_hint),
|
||||
};
|
||||
sess.send_event(&tc, EventMsg::EnteredReviewMode(review_request))
|
||||
.await;
|
||||
}
|
||||
844
codex-rs/core/src/codex/session.rs
Normal file
844
codex-rs/core/src/codex/session.rs
Normal file
@@ -0,0 +1,844 @@
|
||||
use super::*;
|
||||
|
||||
/// Context for an initialized model agent
|
||||
///
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
pub(crate) struct Session {
|
||||
pub(crate) conversation_id: ThreadId,
|
||||
pub(super) tx_event: Sender<Event>,
|
||||
pub(super) agent_status: watch::Sender<AgentStatus>,
|
||||
pub(super) out_of_band_elicitation_paused: watch::Sender<bool>,
|
||||
pub(super) state: Mutex<SessionState>,
|
||||
/// Serializes rebuild/apply cycles for the running proxy; each cycle
|
||||
/// rebuilds from the current SessionState while holding this lock.
|
||||
pub(super) managed_network_proxy_refresh_lock: Mutex<()>,
|
||||
/// The set of enabled features should be invariant for the lifetime of the
|
||||
/// session.
|
||||
pub(super) features: ManagedFeatures,
|
||||
pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(crate) conversation: Arc<RealtimeConversationManager>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(super) mailbox: Mailbox,
|
||||
pub(super) mailbox_rx: Mutex<MailboxReceiver>,
|
||||
pub(super) idle_pending_input: Mutex<Vec<ResponseInputItem>>, // TODO (jif) merge with mailbox!
|
||||
pub(crate) guardian_review_session: GuardianReviewSessionManager,
|
||||
pub(crate) services: SessionServices,
|
||||
pub(super) js_repl: Arc<JsReplHandle>,
|
||||
pub(super) next_internal_sub_id: AtomicU64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SessionConfiguration {
|
||||
/// Provider identifier ("openai", "openrouter", ...).
|
||||
pub(super) provider: ModelProviderInfo,
|
||||
|
||||
pub(super) collaboration_mode: CollaborationMode,
|
||||
pub(super) model_reasoning_summary: Option<ReasoningSummaryConfig>,
|
||||
pub(super) service_tier: Option<ServiceTier>,
|
||||
|
||||
/// Developer instructions that supplement the base instructions.
|
||||
pub(super) developer_instructions: Option<String>,
|
||||
|
||||
/// Model instructions that are appended to the base instructions.
|
||||
pub(super) user_instructions: Option<String>,
|
||||
|
||||
/// Personality preference for the model.
|
||||
pub(super) personality: Option<Personality>,
|
||||
|
||||
/// Base instructions for the session.
|
||||
pub(super) base_instructions: String,
|
||||
|
||||
/// Compact prompt override.
|
||||
pub(super) compact_prompt: Option<String>,
|
||||
|
||||
/// When to escalate for approval for execution
|
||||
pub(super) approval_policy: Constrained<AskForApproval>,
|
||||
pub(super) approvals_reviewer: ApprovalsReviewer,
|
||||
/// How to sandbox commands executed in the system
|
||||
pub(super) sandbox_policy: Constrained<SandboxPolicy>,
|
||||
pub(super) file_system_sandbox_policy: FileSystemSandboxPolicy,
|
||||
pub(super) network_sandbox_policy: NetworkSandboxPolicy,
|
||||
pub(super) windows_sandbox_level: WindowsSandboxLevel,
|
||||
|
||||
/// Absolute working directory that should be treated as the *root* of the
|
||||
/// session. All relative paths supplied by the model as well as the
|
||||
/// execution sandbox are resolved against this directory **instead** of
|
||||
/// the process-wide current working directory.
|
||||
pub(super) cwd: AbsolutePathBuf,
|
||||
/// Directory containing all Codex state for this session.
|
||||
pub(super) codex_home: AbsolutePathBuf,
|
||||
/// Optional user-facing name for the thread, updated during the session.
|
||||
pub(super) thread_name: Option<String>,
|
||||
|
||||
// TODO(pakrym): Remove config from here
|
||||
pub(super) original_config_do_not_use: Arc<Config>,
|
||||
/// Optional service name tag for session metrics.
|
||||
pub(super) metrics_service_name: Option<String>,
|
||||
pub(super) app_server_client_name: Option<String>,
|
||||
pub(super) app_server_client_version: Option<String>,
|
||||
/// Source of the session (cli, vscode, exec, mcp, ...)
|
||||
pub(super) session_source: SessionSource,
|
||||
pub(super) dynamic_tools: Vec<DynamicToolSpec>,
|
||||
pub(super) persist_extended_history: bool,
|
||||
pub(super) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
pub(super) user_shell_override: Option<shell::Shell>,
|
||||
}
|
||||
|
||||
impl SessionConfiguration {
|
||||
pub(crate) fn codex_home(&self) -> &AbsolutePathBuf {
|
||||
&self.codex_home
|
||||
}
|
||||
|
||||
pub(super) fn thread_config_snapshot(&self) -> ThreadConfigSnapshot {
|
||||
ThreadConfigSnapshot {
|
||||
model: self.collaboration_mode.model().to_string(),
|
||||
model_provider_id: self.original_config_do_not_use.model_provider_id.clone(),
|
||||
service_tier: self.service_tier,
|
||||
approval_policy: self.approval_policy.value(),
|
||||
approvals_reviewer: self.approvals_reviewer,
|
||||
sandbox_policy: self.sandbox_policy.get().clone(),
|
||||
cwd: self.cwd.clone(),
|
||||
ephemeral: self.original_config_do_not_use.ephemeral,
|
||||
reasoning_effort: self.collaboration_mode.reasoning_effort(),
|
||||
personality: self.personality,
|
||||
session_source: self.session_source.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> ConstraintResult<Self> {
|
||||
let mut next_configuration = self.clone();
|
||||
let file_system_policy_matches_legacy = self.file_system_sandbox_policy
|
||||
== FileSystemSandboxPolicy::from_legacy_sandbox_policy(
|
||||
self.sandbox_policy.get(),
|
||||
&self.cwd,
|
||||
);
|
||||
if let Some(collaboration_mode) = updates.collaboration_mode.clone() {
|
||||
next_configuration.collaboration_mode = collaboration_mode;
|
||||
}
|
||||
if let Some(summary) = updates.reasoning_summary {
|
||||
next_configuration.model_reasoning_summary = Some(summary);
|
||||
}
|
||||
if let Some(service_tier) = updates.service_tier {
|
||||
next_configuration.service_tier = service_tier;
|
||||
}
|
||||
if let Some(personality) = updates.personality {
|
||||
next_configuration.personality = Some(personality);
|
||||
}
|
||||
if let Some(approval_policy) = updates.approval_policy {
|
||||
next_configuration.approval_policy.set(approval_policy)?;
|
||||
}
|
||||
if let Some(approvals_reviewer) = updates.approvals_reviewer {
|
||||
next_configuration.approvals_reviewer = approvals_reviewer;
|
||||
}
|
||||
let mut sandbox_policy_changed = false;
|
||||
if let Some(sandbox_policy) = updates.sandbox_policy.clone() {
|
||||
next_configuration.sandbox_policy.set(sandbox_policy)?;
|
||||
next_configuration.network_sandbox_policy =
|
||||
NetworkSandboxPolicy::from(next_configuration.sandbox_policy.get());
|
||||
sandbox_policy_changed = true;
|
||||
}
|
||||
if let Some(windows_sandbox_level) = updates.windows_sandbox_level {
|
||||
next_configuration.windows_sandbox_level = windows_sandbox_level;
|
||||
}
|
||||
|
||||
let absolute_cwd = updates
|
||||
.cwd
|
||||
.as_ref()
|
||||
.map(|cwd| {
|
||||
AbsolutePathBuf::relative_to_current_dir(normalize_for_native_workdir(
|
||||
cwd.as_path(),
|
||||
))
|
||||
.unwrap_or_else(|e| {
|
||||
warn!("failed to normalize update cwd: {cwd:?}: {e}");
|
||||
self.cwd.clone()
|
||||
})
|
||||
})
|
||||
.unwrap_or_else(|| self.cwd.clone());
|
||||
|
||||
let cwd_changed = absolute_cwd.as_path() != self.cwd.as_path();
|
||||
next_configuration.cwd = absolute_cwd;
|
||||
if sandbox_policy_changed {
|
||||
next_configuration.file_system_sandbox_policy =
|
||||
FileSystemSandboxPolicy::from_legacy_sandbox_policy_preserving_deny_entries(
|
||||
next_configuration.sandbox_policy.get(),
|
||||
&next_configuration.cwd,
|
||||
&self.file_system_sandbox_policy,
|
||||
);
|
||||
} else if cwd_changed && file_system_policy_matches_legacy {
|
||||
// Preserve richer split policies across cwd-only updates; only
|
||||
// rederive when the session is already using the legacy bridge.
|
||||
next_configuration.file_system_sandbox_policy =
|
||||
FileSystemSandboxPolicy::from_legacy_sandbox_policy(
|
||||
next_configuration.sandbox_policy.get(),
|
||||
&next_configuration.cwd,
|
||||
);
|
||||
}
|
||||
if let Some(app_server_client_name) = updates.app_server_client_name.clone() {
|
||||
next_configuration.app_server_client_name = Some(app_server_client_name);
|
||||
}
|
||||
if let Some(app_server_client_version) = updates.app_server_client_version.clone() {
|
||||
next_configuration.app_server_client_version = Some(app_server_client_version);
|
||||
}
|
||||
Ok(next_configuration)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct SessionSettingsUpdate {
|
||||
pub(crate) cwd: Option<PathBuf>,
|
||||
pub(crate) approval_policy: Option<AskForApproval>,
|
||||
pub(crate) approvals_reviewer: Option<ApprovalsReviewer>,
|
||||
pub(crate) sandbox_policy: Option<SandboxPolicy>,
|
||||
pub(crate) windows_sandbox_level: Option<WindowsSandboxLevel>,
|
||||
pub(crate) collaboration_mode: Option<CollaborationMode>,
|
||||
pub(crate) reasoning_summary: Option<ReasoningSummaryConfig>,
|
||||
pub(crate) service_tier: Option<Option<ServiceTier>>,
|
||||
pub(crate) final_output_json_schema: Option<Option<Value>>,
|
||||
pub(crate) personality: Option<Personality>,
|
||||
pub(crate) app_server_client_name: Option<String>,
|
||||
pub(crate) app_server_client_version: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) struct AppServerClientMetadata {
|
||||
pub(crate) client_name: Option<String>,
|
||||
pub(crate) client_version: Option<String>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
#[instrument(name = "session_init", level = "info", skip_all)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
config: Arc<Config>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
exec_policy: Arc<ExecPolicyManager>,
|
||||
tx_event: Sender<Event>,
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
plugins_manager: Arc<PluginsManager>,
|
||||
mcp_manager: Arc<McpManager>,
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
agent_control: AgentControl,
|
||||
environment: Option<Arc<Environment>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
session_configuration.collaboration_mode.model(),
|
||||
session_configuration.provider
|
||||
);
|
||||
let forked_from_id = initial_history.forked_from_id();
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ThreadId::default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source,
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
if session_configuration.persist_extended_history {
|
||||
EventPersistenceMode::Extended
|
||||
} else {
|
||||
EventPersistenceMode::Limited
|
||||
},
|
||||
),
|
||||
)
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => (
|
||||
resumed_history.conversation_id,
|
||||
RolloutRecorderParams::resume(
|
||||
resumed_history.rollout_path.clone(),
|
||||
if session_configuration.persist_extended_history {
|
||||
EventPersistenceMode::Extended
|
||||
} else {
|
||||
EventPersistenceMode::Limited
|
||||
},
|
||||
),
|
||||
),
|
||||
};
|
||||
let window_generation = match &initial_history {
|
||||
InitialHistory::Resumed(resumed_history) => u64::try_from(
|
||||
resumed_history
|
||||
.history
|
||||
.iter()
|
||||
.filter(|item| matches!(item, RolloutItem::Compacted(_)))
|
||||
.count(),
|
||||
)
|
||||
.unwrap_or(u64::MAX),
|
||||
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => 0,
|
||||
};
|
||||
let state_builder = match &initial_history {
|
||||
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
|
||||
resumed.history.as_slice(),
|
||||
resumed.rollout_path.as_path(),
|
||||
),
|
||||
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None,
|
||||
};
|
||||
|
||||
// Kick off independent async setup tasks in parallel to reduce startup latency.
|
||||
//
|
||||
// - initialize RolloutRecorder with new or resumed session info
|
||||
// - perform default shell discovery
|
||||
// - load history metadata (skipped for subagents)
|
||||
let rollout_fut = async {
|
||||
if config.ephemeral {
|
||||
Ok::<_, anyhow::Error>((None, None))
|
||||
} else {
|
||||
let state_db_ctx = state_db::init(&config).await;
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx))
|
||||
}
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.rollout",
|
||||
otel.name = "session_init.rollout",
|
||||
session_init.ephemeral = config.ephemeral,
|
||||
));
|
||||
|
||||
let is_subagent = matches!(
|
||||
session_configuration.session_source,
|
||||
SessionSource::SubAgent(_)
|
||||
);
|
||||
let history_meta_fut = async {
|
||||
if is_subagent {
|
||||
(0, 0)
|
||||
} else {
|
||||
crate::message_history::history_metadata(&config).await
|
||||
}
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.history_metadata",
|
||||
otel.name = "session_init.history_metadata",
|
||||
session_init.is_subagent = is_subagent,
|
||||
));
|
||||
let auth_manager_clone = Arc::clone(&auth_manager);
|
||||
let config_for_mcp = Arc::clone(&config);
|
||||
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
|
||||
let auth_and_mcp_fut = async move {
|
||||
let auth = auth_manager_clone.auth().await;
|
||||
let mcp_servers = mcp_manager_for_mcp
|
||||
.effective_servers(&config_for_mcp, auth.as_ref())
|
||||
.await;
|
||||
let auth_statuses = compute_auth_statuses(
|
||||
mcp_servers.iter(),
|
||||
config_for_mcp.mcp_oauth_credentials_store_mode,
|
||||
)
|
||||
.await;
|
||||
(auth, mcp_servers, auth_statuses)
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"session_init.auth_mcp",
|
||||
otel.name = "session_init.auth_mcp",
|
||||
));
|
||||
|
||||
// Join all independent futures.
|
||||
let (
|
||||
rollout_recorder_and_state_db,
|
||||
(history_log_id, history_entry_count),
|
||||
(auth, mcp_servers, auth_statuses),
|
||||
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
|
||||
|
||||
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
e
|
||||
})?;
|
||||
let rollout_path = rollout_recorder
|
||||
.as_ref()
|
||||
.map(|rec| rec.rollout_path().to_path_buf());
|
||||
|
||||
let mut post_session_configured_events = Vec::<Event>::new();
|
||||
|
||||
for usage in config.features.legacy_feature_usages() {
|
||||
post_session_configured_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent {
|
||||
summary: usage.summary.clone(),
|
||||
details: usage.details.clone(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
if crate::config::uses_deprecated_instructions_file(&config.config_layer_stack) {
|
||||
post_session_configured_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent {
|
||||
summary: "`experimental_instructions_file` is deprecated and ignored. Use `model_instructions_file` instead."
|
||||
.to_string(),
|
||||
details: Some(
|
||||
"Move the setting to `model_instructions_file` in config.toml (or under a profile) to load instructions from a file."
|
||||
.to_string(),
|
||||
),
|
||||
}),
|
||||
});
|
||||
}
|
||||
for message in &config.startup_warnings {
|
||||
post_session_configured_events.push(Event {
|
||||
id: "".to_owned(),
|
||||
msg: EventMsg::Warning(WarningEvent {
|
||||
message: message.clone(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
let config_path = config.codex_home.join(CONFIG_TOML_FILE);
|
||||
if let Some(event) = unstable_features_warning_event(
|
||||
config
|
||||
.config_layer_stack
|
||||
.effective_config()
|
||||
.get("features")
|
||||
.and_then(TomlValue::as_table),
|
||||
config.suppress_unstable_features_warning,
|
||||
&config.features,
|
||||
&config_path.display().to_string(),
|
||||
) {
|
||||
post_session_configured_events.push(event);
|
||||
}
|
||||
if config.permissions.approval_policy.value() == AskForApproval::OnFailure {
|
||||
post_session_configured_events.push(Event {
|
||||
id: "".to_owned(),
|
||||
msg: EventMsg::Warning(WarningEvent {
|
||||
message: "`on-failure` approval policy is deprecated and will be removed in a future release. Use `on-request` for interactive approvals or `never` for non-interactive runs.".to_string(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
let auth = auth.as_ref();
|
||||
let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from);
|
||||
let account_id = auth.and_then(CodexAuth::get_account_id);
|
||||
let account_email = auth.and_then(CodexAuth::get_account_email);
|
||||
let originator = originator().value;
|
||||
let terminal_type = user_agent();
|
||||
let session_model = session_configuration.collaboration_mode.model().to_string();
|
||||
let auth_env_telemetry = collect_auth_env_telemetry(
|
||||
&session_configuration.provider,
|
||||
auth_manager.codex_api_key_env_enabled(),
|
||||
);
|
||||
let mut session_telemetry = SessionTelemetry::new(
|
||||
conversation_id,
|
||||
session_model.as_str(),
|
||||
session_model.as_str(),
|
||||
account_id.clone(),
|
||||
account_email.clone(),
|
||||
auth_mode,
|
||||
originator.clone(),
|
||||
config.otel.log_user_prompt,
|
||||
terminal_type.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
)
|
||||
.with_auth_env(auth_env_telemetry.to_otel_metadata());
|
||||
if let Some(service_name) = session_configuration.metrics_service_name.as_deref() {
|
||||
session_telemetry = session_telemetry.with_metrics_service_name(service_name);
|
||||
}
|
||||
let network_proxy_audit_metadata = NetworkProxyAuditMetadata {
|
||||
conversation_id: Some(conversation_id.to_string()),
|
||||
app_version: Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
user_account_id: account_id,
|
||||
auth_mode: auth_mode.map(|mode| mode.to_string()),
|
||||
originator: Some(originator),
|
||||
user_email: account_email,
|
||||
terminal_type: Some(terminal_type),
|
||||
model: Some(session_model.clone()),
|
||||
slug: Some(session_model),
|
||||
};
|
||||
config.features.emit_metrics(&session_telemetry);
|
||||
session_telemetry.counter(
|
||||
THREAD_STARTED_METRIC,
|
||||
/*inc*/ 1,
|
||||
&[(
|
||||
"is_git",
|
||||
if get_git_repo_root(&session_configuration.cwd).is_some() {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
},
|
||||
)],
|
||||
);
|
||||
|
||||
session_telemetry.conversation_starts(
|
||||
config.model_provider.name.as_str(),
|
||||
session_configuration.collaboration_mode.reasoning_effort(),
|
||||
config
|
||||
.model_reasoning_summary
|
||||
.unwrap_or(ReasoningSummaryConfig::Auto),
|
||||
config.model_context_window,
|
||||
config.model_auto_compact_token_limit,
|
||||
config.permissions.approval_policy.value(),
|
||||
config.permissions.sandbox_policy.get().clone(),
|
||||
mcp_servers.keys().map(String::as_str).collect(),
|
||||
config.active_profile.clone(),
|
||||
);
|
||||
|
||||
let use_zsh_fork_shell = config.features.enabled(Feature::ShellZshFork);
|
||||
let mut default_shell = if let Some(user_shell_override) =
|
||||
session_configuration.user_shell_override.clone()
|
||||
{
|
||||
user_shell_override
|
||||
} else if use_zsh_fork_shell {
|
||||
let zsh_path = config.zsh_path.as_ref().ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"zsh fork feature enabled, but `zsh_path` is not configured; set `zsh_path` in config.toml"
|
||||
)
|
||||
})?;
|
||||
let zsh_path = zsh_path.to_path_buf();
|
||||
shell::get_shell(shell::ShellType::Zsh, Some(&zsh_path)).ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"zsh fork feature enabled, but zsh_path `{}` is not usable; set `zsh_path` to a valid zsh executable",
|
||||
zsh_path.display()
|
||||
)
|
||||
})?
|
||||
} else {
|
||||
shell::default_user_shell()
|
||||
};
|
||||
// Create the mutable state for the Session.
|
||||
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
|
||||
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
|
||||
let (tx, rx) = watch::channel(Some(snapshot));
|
||||
default_shell.shell_snapshot = rx;
|
||||
tx
|
||||
} else {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
conversation_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
session_telemetry.clone(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
let (tx, rx) = watch::channel(None);
|
||||
default_shell.shell_snapshot = rx;
|
||||
tx
|
||||
};
|
||||
let thread_name =
|
||||
thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id)
|
||||
.instrument(info_span!(
|
||||
"session_init.thread_name_lookup",
|
||||
otel.name = "session_init.thread_name_lookup",
|
||||
))
|
||||
.await;
|
||||
session_configuration.thread_name = thread_name.clone();
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
let managed_network_requirements_configured = config
|
||||
.config_layer_stack
|
||||
.requirements_toml()
|
||||
.network
|
||||
.is_some();
|
||||
let managed_network_requirements_enabled = config.managed_network_requirements_enabled();
|
||||
let network_approval = Arc::new(NetworkApprovalService::default());
|
||||
// The managed proxy can call back into core for allowlist-miss decisions.
|
||||
let network_policy_decider_session = if managed_network_requirements_configured {
|
||||
config
|
||||
.permissions
|
||||
.network
|
||||
.as_ref()
|
||||
.map(|_| Arc::new(RwLock::new(std::sync::Weak::<Session>::new())))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let blocked_request_observer = if managed_network_requirements_configured {
|
||||
config
|
||||
.permissions
|
||||
.network
|
||||
.as_ref()
|
||||
.map(|_| build_blocked_request_observer(Arc::clone(&network_approval)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let network_policy_decider =
|
||||
network_policy_decider_session
|
||||
.as_ref()
|
||||
.map(|network_policy_decider_session| {
|
||||
build_network_policy_decider(
|
||||
Arc::clone(&network_approval),
|
||||
Arc::clone(network_policy_decider_session),
|
||||
)
|
||||
});
|
||||
let (network_proxy, session_network_proxy) =
|
||||
if let Some(spec) = config.permissions.network.as_ref() {
|
||||
let current_exec_policy = exec_policy.current();
|
||||
let (network_proxy, session_network_proxy) = Self::start_managed_network_proxy(
|
||||
spec,
|
||||
current_exec_policy.as_ref(),
|
||||
config.permissions.sandbox_policy.get(),
|
||||
network_policy_decider.as_ref().map(Arc::clone),
|
||||
blocked_request_observer.as_ref().map(Arc::clone),
|
||||
managed_network_requirements_configured,
|
||||
network_proxy_audit_metadata,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.network_proxy",
|
||||
otel.name = "session_init.network_proxy",
|
||||
session_init.managed_network_requirements_enabled =
|
||||
managed_network_requirements_enabled,
|
||||
))
|
||||
.await?;
|
||||
(Some(network_proxy), Some(session_network_proxy))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let mut hook_shell_argv =
|
||||
default_shell.derive_exec_args("", /*use_login_shell*/ false);
|
||||
let hook_shell_program = hook_shell_argv.remove(0);
|
||||
let _ = hook_shell_argv.pop();
|
||||
let hooks = Hooks::new(HooksConfig {
|
||||
legacy_notify_argv: config.notify.clone(),
|
||||
feature_enabled: config.features.enabled(Feature::CodexHooks),
|
||||
config_layer_stack: Some(config.config_layer_stack.clone()),
|
||||
shell_program: Some(hook_shell_program),
|
||||
shell_args: hook_shell_argv,
|
||||
});
|
||||
for warning in hooks.startup_warnings() {
|
||||
post_session_configured_events.push(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::Warning(WarningEvent {
|
||||
message: warning.clone(),
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let analytics_events_client = analytics_events_client.unwrap_or_else(|| {
|
||||
AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
config.analytics_enabled,
|
||||
)
|
||||
});
|
||||
let services = SessionServices {
|
||||
// Initialize the MCP connection manager with an uninitialized
|
||||
// instance. It will be replaced with one created via
|
||||
// McpConnectionManager::new() once all its constructor args are
|
||||
// available. This also ensures `SessionConfigured` is emitted
|
||||
// before any MCP-related events. It is reasonable to consider
|
||||
// changing this to use Option or OnceCell, though the current
|
||||
// setup is straightforward enough and performs well.
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized(
|
||||
&config.permissions.approval_policy,
|
||||
&config.permissions.sandbox_policy,
|
||||
))),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
),
|
||||
shell_zsh_path: config.zsh_path.clone(),
|
||||
main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(),
|
||||
analytics_events_client,
|
||||
hooks,
|
||||
rollout: Mutex::new(rollout_recorder),
|
||||
user_shell: Arc::new(default_shell),
|
||||
agent_identity_manager: Arc::new(AgentIdentityManager::new(
|
||||
config.as_ref(),
|
||||
Arc::clone(&auth_manager),
|
||||
session_configuration.session_source.clone(),
|
||||
)),
|
||||
shell_snapshot_tx,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
session_telemetry,
|
||||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
guardian_rejections: Mutex::new(HashMap::new()),
|
||||
skills_manager,
|
||||
plugins_manager: Arc::clone(&plugins_manager),
|
||||
mcp_manager: Arc::clone(&mcp_manager),
|
||||
skills_watcher,
|
||||
agent_control,
|
||||
network_proxy,
|
||||
network_approval: Arc::clone(&network_approval),
|
||||
state_db: state_db_ctx.clone(),
|
||||
thread_store: LocalThreadStore::new(RolloutConfig::from_view(config.as_ref())),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
conversation_id,
|
||||
installation_id,
|
||||
session_configuration.provider.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
config.model_verbosity,
|
||||
config.features.enabled(Feature::EnableRequestCompression),
|
||||
config.features.enabled(Feature::RuntimeMetrics),
|
||||
Self::build_model_client_beta_features_header(config.as_ref()),
|
||||
),
|
||||
code_mode_service: crate::tools::code_mode::CodeModeService::new(
|
||||
config.js_repl_node_path.clone(),
|
||||
),
|
||||
environment,
|
||||
};
|
||||
services
|
||||
.model_client
|
||||
.set_window_generation(window_generation);
|
||||
let js_repl = Arc::new(JsReplHandle::with_node_path(
|
||||
config.js_repl_node_path.clone(),
|
||||
config.js_repl_node_module_dirs.clone(),
|
||||
));
|
||||
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
|
||||
watch::channel(false);
|
||||
|
||||
let (mailbox, mailbox_rx) = Mailbox::new();
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status,
|
||||
out_of_band_elicitation_paused,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Mutex::new(()),
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
conversation: Arc::new(RealtimeConversationManager::new()),
|
||||
active_turn: Mutex::new(None),
|
||||
mailbox,
|
||||
mailbox_rx: Mutex::new(mailbox_rx),
|
||||
idle_pending_input: Mutex::new(Vec::new()),
|
||||
guardian_review_session: GuardianReviewSessionManager::default(),
|
||||
services,
|
||||
js_repl,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
if let Some(network_policy_decider_session) = network_policy_decider_session {
|
||||
let mut guard = network_policy_decider_session.write().await;
|
||||
*guard = Arc::downgrade(&sess);
|
||||
}
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = initial_history.get_event_msgs();
|
||||
let events = std::iter::once(Event {
|
||||
id: INITIAL_SUBMIT_ID.to_owned(),
|
||||
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
|
||||
session_id: conversation_id,
|
||||
forked_from_id,
|
||||
thread_name: session_configuration.thread_name.clone(),
|
||||
model: session_configuration.collaboration_mode.model().to_string(),
|
||||
model_provider_id: config.model_provider_id.clone(),
|
||||
service_tier: session_configuration.service_tier,
|
||||
approval_policy: session_configuration.approval_policy.value(),
|
||||
approvals_reviewer: session_configuration.approvals_reviewer,
|
||||
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
|
||||
cwd: session_configuration.cwd.clone(),
|
||||
reasoning_effort: session_configuration.collaboration_mode.reasoning_effort(),
|
||||
history_log_id,
|
||||
history_entry_count,
|
||||
initial_messages,
|
||||
network_proxy: session_network_proxy.filter(|_| {
|
||||
Self::managed_network_proxy_active_for_sandbox_policy(
|
||||
session_configuration.sandbox_policy.get(),
|
||||
)
|
||||
}),
|
||||
rollout_path,
|
||||
}),
|
||||
})
|
||||
.chain(post_session_configured_events.into_iter());
|
||||
for event in events {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
// Start the watcher after SessionConfigured so it cannot emit earlier events.
|
||||
sess.start_skills_watcher_listener();
|
||||
sess.start_agent_identity_registration();
|
||||
let mut required_mcp_servers: Vec<String> = mcp_servers
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled && server.required)
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count();
|
||||
let required_mcp_server_count = required_mcp_servers.len();
|
||||
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
&session_configuration.approval_policy,
|
||||
INITIAL_SUBMIT_ID.to_owned(),
|
||||
tx_event.clone(),
|
||||
session_configuration.sandbox_policy.get().clone(),
|
||||
config.codex_home.to_path_buf(),
|
||||
codex_apps_tools_cache_key(auth),
|
||||
tool_plugin_provenance,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"session_init.mcp_manager_init",
|
||||
otel.name = "session_init.mcp_manager_init",
|
||||
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
*manager_guard = mcp_connection_manager;
|
||||
}
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
if !required_mcp_servers.is_empty() {
|
||||
let failures = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.required_startup_failures(&required_mcp_servers)
|
||||
.instrument(info_span!(
|
||||
"session_init.required_mcp_wait",
|
||||
otel.name = "session_init.required_mcp_wait",
|
||||
session_init.required_mcp_server_count = required_mcp_server_count,
|
||||
))
|
||||
.await;
|
||||
if !failures.is_empty() {
|
||||
let details = failures
|
||||
.iter()
|
||||
.map(|failure| format!("{}: {}", failure.server, failure.error))
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
return Err(anyhow::anyhow!(
|
||||
"required MCP servers failed to initialize: {details}"
|
||||
));
|
||||
}
|
||||
}
|
||||
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
|
||||
.await;
|
||||
let session_start_source = match &initial_history {
|
||||
InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume,
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
codex_hooks::SessionStartSource::Startup
|
||||
}
|
||||
InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear,
|
||||
};
|
||||
|
||||
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
|
||||
sess.record_initial_history(initial_history).await;
|
||||
{
|
||||
let mut state = sess.state.lock().await;
|
||||
state.set_pending_session_start_source(Some(session_start_source));
|
||||
}
|
||||
|
||||
memories::start_memories_startup_task(
|
||||
&sess,
|
||||
Arc::clone(&config),
|
||||
&session_configuration.session_source,
|
||||
);
|
||||
|
||||
Ok(sess)
|
||||
}
|
||||
}
|
||||
615
codex-rs/core/src/codex/turn_context.rs
Normal file
615
codex-rs/core/src/codex/turn_context.rs
Normal file
@@ -0,0 +1,615 @@
|
||||
use super::*;
|
||||
|
||||
pub(super) fn image_generation_tool_auth_allowed(auth_manager: Option<&AuthManager>) -> bool {
|
||||
matches!(
|
||||
auth_manager.and_then(AuthManager::auth_mode),
|
||||
Some(AuthMode::Chatgpt)
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct TurnSkillsContext {
|
||||
pub(crate) outcome: Arc<SkillLoadOutcome>,
|
||||
pub(crate) implicit_invocation_seen_skills: Arc<Mutex<HashSet<String>>>,
|
||||
}
|
||||
|
||||
impl TurnSkillsContext {
|
||||
pub(crate) fn new(outcome: Arc<SkillLoadOutcome>) -> Self {
|
||||
Self {
|
||||
outcome,
|
||||
implicit_invocation_seen_skills: Arc::new(Mutex::new(HashSet::new())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The context needed for a single turn of the thread.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TurnContext {
|
||||
pub(crate) sub_id: String,
|
||||
pub(crate) trace_id: Option<String>,
|
||||
pub(crate) realtime_active: bool,
|
||||
pub(crate) config: Arc<Config>,
|
||||
pub(crate) auth_manager: Option<Arc<AuthManager>>,
|
||||
pub(crate) model_info: ModelInfo,
|
||||
pub(crate) session_telemetry: SessionTelemetry,
|
||||
pub(crate) provider: ModelProviderInfo,
|
||||
pub(crate) reasoning_effort: Option<ReasoningEffortConfig>,
|
||||
pub(crate) reasoning_summary: ReasoningSummaryConfig,
|
||||
pub(crate) session_source: SessionSource,
|
||||
pub(crate) environment: Option<Arc<Environment>>,
|
||||
/// The session's absolute working directory. All relative paths provided
|
||||
/// by the model as well as sandbox policies are resolved against this path
|
||||
/// instead of `std::env::current_dir()`.
|
||||
pub(crate) cwd: AbsolutePathBuf,
|
||||
pub(crate) current_date: Option<String>,
|
||||
pub(crate) timezone: Option<String>,
|
||||
pub(crate) app_server_client_name: Option<String>,
|
||||
pub(crate) developer_instructions: Option<String>,
|
||||
pub(crate) compact_prompt: Option<String>,
|
||||
pub(crate) user_instructions: Option<String>,
|
||||
pub(crate) collaboration_mode: CollaborationMode,
|
||||
pub(crate) personality: Option<Personality>,
|
||||
pub(crate) approval_policy: Constrained<AskForApproval>,
|
||||
pub(crate) sandbox_policy: Constrained<SandboxPolicy>,
|
||||
pub(crate) file_system_sandbox_policy: FileSystemSandboxPolicy,
|
||||
pub(crate) network_sandbox_policy: NetworkSandboxPolicy,
|
||||
pub(crate) network: Option<NetworkProxy>,
|
||||
pub(crate) windows_sandbox_level: WindowsSandboxLevel,
|
||||
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
|
||||
pub(crate) tools_config: ToolsConfig,
|
||||
pub(crate) features: ManagedFeatures,
|
||||
pub(crate) ghost_snapshot: GhostSnapshotConfig,
|
||||
pub(crate) final_output_json_schema: Option<Value>,
|
||||
pub(crate) codex_self_exe: Option<PathBuf>,
|
||||
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
|
||||
pub(crate) truncation_policy: TruncationPolicy,
|
||||
pub(crate) js_repl: Arc<JsReplHandle>,
|
||||
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
|
||||
pub(crate) turn_metadata_state: Arc<TurnMetadataState>,
|
||||
pub(crate) turn_skills: TurnSkillsContext,
|
||||
pub(crate) turn_timing_state: Arc<TurnTimingState>,
|
||||
}
|
||||
impl TurnContext {
|
||||
pub(crate) fn model_context_window(&self) -> Option<i64> {
|
||||
let effective_context_window_percent = self.model_info.effective_context_window_percent;
|
||||
self.model_info.context_window.map(|context_window| {
|
||||
context_window.saturating_mul(effective_context_window_percent) / 100
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn apps_enabled(&self) -> bool {
|
||||
let is_chatgpt_auth = self
|
||||
.auth_manager
|
||||
.as_deref()
|
||||
.and_then(AuthManager::auth_cached)
|
||||
.as_ref()
|
||||
.is_some_and(CodexAuth::is_chatgpt_auth);
|
||||
self.features.apps_enabled_for_auth(is_chatgpt_auth)
|
||||
}
|
||||
|
||||
pub(crate) async fn with_model(&self, model: String, models_manager: &ModelsManager) -> Self {
|
||||
let mut config = (*self.config).clone();
|
||||
config.model = Some(model.clone());
|
||||
let model_info = models_manager
|
||||
.get_model_info(model.as_str(), &config.to_models_manager_config())
|
||||
.await;
|
||||
let truncation_policy = model_info.truncation_policy.into();
|
||||
let supported_reasoning_levels = model_info
|
||||
.supported_reasoning_levels
|
||||
.iter()
|
||||
.map(|preset| preset.effort)
|
||||
.collect::<Vec<_>>();
|
||||
let reasoning_effort = if let Some(current_reasoning_effort) = self.reasoning_effort {
|
||||
if supported_reasoning_levels.contains(¤t_reasoning_effort) {
|
||||
Some(current_reasoning_effort)
|
||||
} else {
|
||||
supported_reasoning_levels
|
||||
.get(supported_reasoning_levels.len().saturating_sub(1) / 2)
|
||||
.copied()
|
||||
.or(model_info.default_reasoning_level)
|
||||
}
|
||||
} else {
|
||||
supported_reasoning_levels
|
||||
.get(supported_reasoning_levels.len().saturating_sub(1) / 2)
|
||||
.copied()
|
||||
.or(model_info.default_reasoning_level)
|
||||
};
|
||||
config.model_reasoning_effort = reasoning_effort;
|
||||
|
||||
let collaboration_mode = self.collaboration_mode.with_updates(
|
||||
Some(model.clone()),
|
||||
Some(reasoning_effort),
|
||||
/*developer_instructions*/ None,
|
||||
);
|
||||
let features = self.features.clone();
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_info: &model_info,
|
||||
available_models: &models_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
.await,
|
||||
features: &features,
|
||||
image_generation_tool_auth_allowed: image_generation_tool_auth_allowed(
|
||||
self.auth_manager.as_deref(),
|
||||
),
|
||||
web_search_mode: self.tools_config.web_search_mode,
|
||||
session_source: self.session_source.clone(),
|
||||
sandbox_policy: self.sandbox_policy.get(),
|
||||
windows_sandbox_level: self.windows_sandbox_level,
|
||||
})
|
||||
.with_unified_exec_shell_mode(self.tools_config.unified_exec_shell_mode.clone())
|
||||
.with_web_search_config(self.tools_config.web_search_config.clone())
|
||||
.with_allow_login_shell(self.tools_config.allow_login_shell)
|
||||
.with_has_environment(self.tools_config.has_environment)
|
||||
.with_spawn_agent_usage_hint(config.multi_agent_v2.usage_hint_enabled)
|
||||
.with_spawn_agent_usage_hint_text(config.multi_agent_v2.usage_hint_text.clone())
|
||||
.with_hide_spawn_agent_metadata(config.multi_agent_v2.hide_spawn_agent_metadata)
|
||||
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
|
||||
&config.agent_roles,
|
||||
));
|
||||
|
||||
Self {
|
||||
sub_id: self.sub_id.clone(),
|
||||
trace_id: self.trace_id.clone(),
|
||||
realtime_active: self.realtime_active,
|
||||
config: Arc::new(config),
|
||||
auth_manager: self.auth_manager.clone(),
|
||||
model_info: model_info.clone(),
|
||||
session_telemetry: self
|
||||
.session_telemetry
|
||||
.clone()
|
||||
.with_model(model.as_str(), model_info.slug.as_str()),
|
||||
provider: self.provider.clone(),
|
||||
reasoning_effort,
|
||||
reasoning_summary: self.reasoning_summary,
|
||||
session_source: self.session_source.clone(),
|
||||
environment: self.environment.clone(),
|
||||
cwd: self.cwd.clone(),
|
||||
current_date: self.current_date.clone(),
|
||||
timezone: self.timezone.clone(),
|
||||
app_server_client_name: self.app_server_client_name.clone(),
|
||||
developer_instructions: self.developer_instructions.clone(),
|
||||
compact_prompt: self.compact_prompt.clone(),
|
||||
user_instructions: self.user_instructions.clone(),
|
||||
collaboration_mode,
|
||||
personality: self.personality,
|
||||
approval_policy: self.approval_policy.clone(),
|
||||
sandbox_policy: self.sandbox_policy.clone(),
|
||||
file_system_sandbox_policy: self.file_system_sandbox_policy.clone(),
|
||||
network_sandbox_policy: self.network_sandbox_policy,
|
||||
network: self.network.clone(),
|
||||
windows_sandbox_level: self.windows_sandbox_level,
|
||||
shell_environment_policy: self.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
features,
|
||||
ghost_snapshot: self.ghost_snapshot.clone(),
|
||||
final_output_json_schema: self.final_output_json_schema.clone(),
|
||||
codex_self_exe: self.codex_self_exe.clone(),
|
||||
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
|
||||
tool_call_gate: Arc::new(ReadinessFlag::new()),
|
||||
truncation_policy,
|
||||
js_repl: Arc::clone(&self.js_repl),
|
||||
dynamic_tools: self.dynamic_tools.clone(),
|
||||
turn_metadata_state: self.turn_metadata_state.clone(),
|
||||
turn_skills: self.turn_skills.clone(),
|
||||
turn_timing_state: Arc::clone(&self.turn_timing_state),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_path(&self, path: Option<String>) -> AbsolutePathBuf {
|
||||
path.as_ref()
|
||||
.map_or_else(|| self.cwd.clone(), |path| self.cwd.join(path))
|
||||
}
|
||||
|
||||
pub(crate) fn file_system_sandbox_context(
|
||||
&self,
|
||||
additional_permissions: Option<PermissionProfile>,
|
||||
) -> FileSystemSandboxContext {
|
||||
FileSystemSandboxContext {
|
||||
sandbox_policy: self.sandbox_policy.get().clone(),
|
||||
windows_sandbox_level: self.windows_sandbox_level,
|
||||
windows_sandbox_private_desktop: self
|
||||
.config
|
||||
.permissions
|
||||
.windows_sandbox_private_desktop,
|
||||
use_legacy_landlock: self.features.use_legacy_landlock(),
|
||||
additional_permissions,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn compact_prompt(&self) -> &str {
|
||||
self.compact_prompt
|
||||
.as_deref()
|
||||
.unwrap_or(compact::SUMMARIZATION_PROMPT)
|
||||
}
|
||||
|
||||
pub(crate) fn to_turn_context_item(&self) -> TurnContextItem {
|
||||
let legacy_file_system_sandbox_policy = FileSystemSandboxPolicy::from_legacy_sandbox_policy(
|
||||
self.sandbox_policy.get(),
|
||||
&self.cwd,
|
||||
);
|
||||
// Omit the derived split filesystem policy when it is equivalent to
|
||||
// the legacy sandbox policy. This keeps turn-context payloads stable
|
||||
// while both fields exist; once callers consume only the split policy,
|
||||
// this comparison and the legacy projection should go away.
|
||||
let file_system_sandbox_policy = (self.file_system_sandbox_policy
|
||||
!= legacy_file_system_sandbox_policy)
|
||||
.then(|| self.file_system_sandbox_policy.clone());
|
||||
|
||||
TurnContextItem {
|
||||
turn_id: Some(self.sub_id.clone()),
|
||||
trace_id: self.trace_id.clone(),
|
||||
cwd: self.cwd.to_path_buf(),
|
||||
current_date: self.current_date.clone(),
|
||||
timezone: self.timezone.clone(),
|
||||
approval_policy: self.approval_policy.value(),
|
||||
sandbox_policy: self.sandbox_policy.get().clone(),
|
||||
network: self.turn_context_network_item(),
|
||||
file_system_sandbox_policy,
|
||||
model: self.model_info.slug.clone(),
|
||||
personality: self.personality,
|
||||
collaboration_mode: Some(self.collaboration_mode.clone()),
|
||||
realtime_active: Some(self.realtime_active),
|
||||
effort: self.reasoning_effort,
|
||||
summary: self.reasoning_summary,
|
||||
user_instructions: self.user_instructions.clone(),
|
||||
developer_instructions: self.developer_instructions.clone(),
|
||||
final_output_json_schema: self.final_output_json_schema.clone(),
|
||||
truncation_policy: Some(self.truncation_policy),
|
||||
}
|
||||
}
|
||||
|
||||
fn turn_context_network_item(&self) -> Option<TurnContextNetworkItem> {
|
||||
let network = self
|
||||
.config
|
||||
.config_layer_stack
|
||||
.requirements()
|
||||
.network
|
||||
.as_ref()?;
|
||||
Some(TurnContextNetworkItem {
|
||||
allowed_domains: network
|
||||
.domains
|
||||
.as_ref()
|
||||
.and_then(codex_config::NetworkDomainPermissionsToml::allowed_domains)
|
||||
.unwrap_or_default(),
|
||||
denied_domains: network
|
||||
.domains
|
||||
.as_ref()
|
||||
.and_then(codex_config::NetworkDomainPermissionsToml::denied_domains)
|
||||
.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn local_time_context() -> (String, String) {
|
||||
match iana_time_zone::get_timezone() {
|
||||
Ok(timezone) => (Local::now().format("%Y-%m-%d").to_string(), timezone),
|
||||
Err(_) => (
|
||||
Utc::now().format("%Y-%m-%d").to_string(),
|
||||
"Etc/UTC".to_string(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
/// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it.
|
||||
pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config {
|
||||
// todo(aibrahim): store this state somewhere else so we don't need to mut config
|
||||
let config = session_configuration.original_config_do_not_use.clone();
|
||||
let mut per_turn_config = (*config).clone();
|
||||
per_turn_config.cwd = session_configuration.cwd.clone();
|
||||
per_turn_config.model_reasoning_effort =
|
||||
session_configuration.collaboration_mode.reasoning_effort();
|
||||
per_turn_config.model_reasoning_summary = session_configuration.model_reasoning_summary;
|
||||
per_turn_config.service_tier = session_configuration.service_tier;
|
||||
per_turn_config.personality = session_configuration.personality;
|
||||
per_turn_config.approvals_reviewer = session_configuration.approvals_reviewer;
|
||||
let resolved_web_search_mode = resolve_web_search_mode_for_turn(
|
||||
&per_turn_config.web_search_mode,
|
||||
session_configuration.sandbox_policy.get(),
|
||||
);
|
||||
if let Err(err) = per_turn_config
|
||||
.web_search_mode
|
||||
.set(resolved_web_search_mode)
|
||||
{
|
||||
let fallback_value = per_turn_config.web_search_mode.value();
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
?resolved_web_search_mode,
|
||||
?fallback_value,
|
||||
"resolved web_search_mode is disallowed by requirements; keeping constrained value"
|
||||
);
|
||||
}
|
||||
per_turn_config.features = config.features.clone();
|
||||
per_turn_config
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn make_turn_context(
|
||||
conversation_id: ThreadId,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
provider: ModelProviderInfo,
|
||||
session_configuration: &SessionConfiguration,
|
||||
user_shell: &shell::Shell,
|
||||
shell_zsh_path: Option<&PathBuf>,
|
||||
main_execve_wrapper_exe: Option<&PathBuf>,
|
||||
per_turn_config: Config,
|
||||
model_info: ModelInfo,
|
||||
models_manager: &ModelsManager,
|
||||
network: Option<NetworkProxy>,
|
||||
environment: Option<Arc<Environment>>,
|
||||
sub_id: String,
|
||||
js_repl: Arc<JsReplHandle>,
|
||||
skills_outcome: Arc<SkillLoadOutcome>,
|
||||
) -> TurnContext {
|
||||
let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort();
|
||||
let reasoning_summary = session_configuration
|
||||
.model_reasoning_summary
|
||||
.unwrap_or(model_info.default_reasoning_summary);
|
||||
let session_telemetry = session_telemetry.clone().with_model(
|
||||
session_configuration.collaboration_mode.model(),
|
||||
model_info.slug.as_str(),
|
||||
);
|
||||
let session_source = session_configuration.session_source.clone();
|
||||
let image_generation_tool_auth_allowed =
|
||||
image_generation_tool_auth_allowed(auth_manager.as_deref());
|
||||
let auth_manager_for_context = auth_manager;
|
||||
let provider_for_context = provider;
|
||||
let session_telemetry_for_context = session_telemetry;
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_info: &model_info,
|
||||
available_models: &models_manager.try_list_models().unwrap_or_default(),
|
||||
features: &per_turn_config.features,
|
||||
image_generation_tool_auth_allowed,
|
||||
web_search_mode: Some(per_turn_config.web_search_mode.value()),
|
||||
session_source: session_source.clone(),
|
||||
sandbox_policy: session_configuration.sandbox_policy.get(),
|
||||
windows_sandbox_level: session_configuration.windows_sandbox_level,
|
||||
})
|
||||
.with_unified_exec_shell_mode_for_session(
|
||||
crate::tools::spec::tool_user_shell_type(user_shell),
|
||||
shell_zsh_path,
|
||||
main_execve_wrapper_exe,
|
||||
)
|
||||
.with_web_search_config(per_turn_config.web_search_config.clone())
|
||||
.with_allow_login_shell(per_turn_config.permissions.allow_login_shell)
|
||||
.with_has_environment(environment.is_some())
|
||||
.with_spawn_agent_usage_hint(per_turn_config.multi_agent_v2.usage_hint_enabled)
|
||||
.with_spawn_agent_usage_hint_text(per_turn_config.multi_agent_v2.usage_hint_text.clone())
|
||||
.with_hide_spawn_agent_metadata(per_turn_config.multi_agent_v2.hide_spawn_agent_metadata)
|
||||
.with_agent_type_description(crate::agent::role::spawn_tool_spec::build(
|
||||
&per_turn_config.agent_roles,
|
||||
));
|
||||
|
||||
let cwd = session_configuration.cwd.clone();
|
||||
|
||||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
conversation_id.to_string(),
|
||||
&session_source,
|
||||
sub_id.clone(),
|
||||
cwd.clone(),
|
||||
session_configuration.sandbox_policy.get(),
|
||||
session_configuration.windows_sandbox_level,
|
||||
));
|
||||
let (current_date, timezone) = local_time_context();
|
||||
TurnContext {
|
||||
sub_id,
|
||||
trace_id: current_span_trace_id(),
|
||||
realtime_active: false,
|
||||
config: per_turn_config.clone(),
|
||||
auth_manager: auth_manager_for_context,
|
||||
model_info: model_info.clone(),
|
||||
session_telemetry: session_telemetry_for_context,
|
||||
provider: provider_for_context,
|
||||
reasoning_effort,
|
||||
reasoning_summary,
|
||||
session_source,
|
||||
environment,
|
||||
cwd,
|
||||
current_date: Some(current_date),
|
||||
timezone: Some(timezone),
|
||||
app_server_client_name: session_configuration.app_server_client_name.clone(),
|
||||
developer_instructions: session_configuration.developer_instructions.clone(),
|
||||
compact_prompt: session_configuration.compact_prompt.clone(),
|
||||
user_instructions: session_configuration.user_instructions.clone(),
|
||||
collaboration_mode: session_configuration.collaboration_mode.clone(),
|
||||
personality: session_configuration.personality,
|
||||
approval_policy: session_configuration.approval_policy.clone(),
|
||||
sandbox_policy: session_configuration.sandbox_policy.clone(),
|
||||
file_system_sandbox_policy: session_configuration.file_system_sandbox_policy.clone(),
|
||||
network_sandbox_policy: session_configuration.network_sandbox_policy,
|
||||
network,
|
||||
windows_sandbox_level: session_configuration.windows_sandbox_level,
|
||||
shell_environment_policy: per_turn_config.permissions.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
features: per_turn_config.features.clone(),
|
||||
ghost_snapshot: per_turn_config.ghost_snapshot.clone(),
|
||||
final_output_json_schema: None,
|
||||
codex_self_exe: per_turn_config.codex_self_exe.clone(),
|
||||
codex_linux_sandbox_exe: per_turn_config.codex_linux_sandbox_exe.clone(),
|
||||
tool_call_gate: Arc::new(ReadinessFlag::new()),
|
||||
truncation_policy: model_info.truncation_policy.into(),
|
||||
js_repl,
|
||||
dynamic_tools: session_configuration.dynamic_tools.clone(),
|
||||
turn_metadata_state,
|
||||
turn_skills: TurnSkillsContext::new(skills_outcome),
|
||||
turn_timing_state: Arc::new(TurnTimingState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn new_turn_with_sub_id(
|
||||
&self,
|
||||
sub_id: String,
|
||||
updates: SessionSettingsUpdate,
|
||||
) -> ConstraintResult<Arc<TurnContext>> {
|
||||
let (
|
||||
session_configuration,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
) = {
|
||||
let mut state = self.state.lock().await;
|
||||
match state.session_configuration.clone().apply(&updates) {
|
||||
Ok(next) => {
|
||||
let previous_cwd = state.session_configuration.cwd.clone();
|
||||
let sandbox_policy_changed =
|
||||
state.session_configuration.sandbox_policy != next.sandbox_policy;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
state.session_configuration = next.clone();
|
||||
(
|
||||
next,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
)
|
||||
}
|
||||
Err(err) => {
|
||||
drop(state);
|
||||
self.send_event_raw(Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: err.to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::BadRequest),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&session_configuration.cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
if sandbox_policy_changed {
|
||||
self.refresh_managed_network_proxy_for_current_sandbox_policy()
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(self
|
||||
.new_turn_from_configuration(
|
||||
sub_id,
|
||||
session_configuration,
|
||||
updates.final_output_json_schema,
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn new_turn_from_configuration(
|
||||
&self,
|
||||
sub_id: String,
|
||||
session_configuration: SessionConfiguration,
|
||||
final_output_json_schema: Option<Option<Value>>,
|
||||
) -> Arc<TurnContext> {
|
||||
let per_turn_config = Self::build_per_turn_config(&session_configuration);
|
||||
{
|
||||
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
|
||||
mcp_connection_manager.set_approval_policy(&session_configuration.approval_policy);
|
||||
mcp_connection_manager
|
||||
.set_sandbox_policy(per_turn_config.permissions.sandbox_policy.get());
|
||||
}
|
||||
|
||||
let model_info = self
|
||||
.services
|
||||
.models_manager
|
||||
.get_model_info(
|
||||
session_configuration.collaboration_mode.model(),
|
||||
&per_turn_config.to_models_manager_config(),
|
||||
)
|
||||
.await;
|
||||
let plugin_outcome = self
|
||||
.services
|
||||
.plugins_manager
|
||||
.plugins_for_config(&per_turn_config)
|
||||
.await;
|
||||
let effective_skill_roots = plugin_outcome.effective_skill_roots();
|
||||
let skills_input = skills_load_input_from_config(&per_turn_config, effective_skill_roots);
|
||||
let fs = self
|
||||
.services
|
||||
.environment
|
||||
.as_ref()
|
||||
.map(|environment| environment.get_filesystem());
|
||||
let skills_outcome = Arc::new(
|
||||
self.services
|
||||
.skills_manager
|
||||
.skills_for_config(&skills_input, fs)
|
||||
.await,
|
||||
);
|
||||
let mut turn_context: TurnContext = Self::make_turn_context(
|
||||
self.conversation_id,
|
||||
Some(Arc::clone(&self.services.auth_manager)),
|
||||
&self.services.session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
&session_configuration,
|
||||
self.services.user_shell.as_ref(),
|
||||
self.services.shell_zsh_path.as_ref(),
|
||||
self.services.main_execve_wrapper_exe.as_ref(),
|
||||
per_turn_config,
|
||||
model_info,
|
||||
&self.services.models_manager,
|
||||
self.services
|
||||
.network_proxy
|
||||
.as_ref()
|
||||
.and_then(|started_proxy| {
|
||||
Self::managed_network_proxy_active_for_sandbox_policy(
|
||||
session_configuration.sandbox_policy.get(),
|
||||
)
|
||||
.then(|| started_proxy.proxy())
|
||||
}),
|
||||
self.services.environment.clone(),
|
||||
sub_id,
|
||||
Arc::clone(&self.js_repl),
|
||||
skills_outcome,
|
||||
);
|
||||
turn_context.realtime_active = self.conversation.running_state().await.is_some();
|
||||
|
||||
if let Some(final_schema) = final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
}
|
||||
let turn_context = Arc::new(turn_context);
|
||||
turn_context.turn_metadata_state.spawn_git_enrichment_task();
|
||||
turn_context
|
||||
}
|
||||
|
||||
pub(crate) async fn maybe_emit_unknown_model_warning_for_turn(&self, tc: &TurnContext) {
|
||||
if tc.model_info.used_fallback_model_metadata {
|
||||
self.send_event(
|
||||
tc,
|
||||
EventMsg::Warning(WarningEvent {
|
||||
message: format!(
|
||||
"Model metadata for `{}` not found. Defaulting to fallback metadata; this can degrade performance and cause issues.",
|
||||
tc.model_info.slug
|
||||
),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn new_default_turn(&self) -> Arc<TurnContext> {
|
||||
self.new_default_turn_with_sub_id(self.next_internal_sub_id())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn new_default_turn_with_sub_id(&self, sub_id: String) -> Arc<TurnContext> {
|
||||
let session_configuration = {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.clone()
|
||||
};
|
||||
self.new_turn_from_configuration(
|
||||
sub_id,
|
||||
session_configuration,
|
||||
/*final_output_json_schema*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user