Files
codex/codex-rs/core/src/session/session.rs
Michael Bolin 18a26d7bbc app-server: accept permission profile overrides (#18279)
## Why

`PermissionProfile` is becoming the canonical permissions shape shared
by core and app-server. After app-server responses expose the active
profile, clients need to be able to send that same shape back when
starting, resuming, forking, or overriding a turn instead of translating
through the legacy `sandbox`/`sandboxPolicy` shorthands.

This still needs to preserve the existing requirements/platform
enforcement model. A profile-shaped request can be downgraded or
rejected by constraints, but the server should keep the user's
elevated-access intent for project trust decisions. Turn-level profile
overrides also need to retain existing read protections, including
deny-read entries and bounded glob-scan metadata, so a permission
override cannot accidentally drop configured protections such as
`**/*.env = deny`.

## What changed

- Adds optional `permissionProfile` request fields to `thread/start`,
`thread/resume`, `thread/fork`, and `turn/start`.
- Rejects ambiguous requests that specify both `permissionProfile` and
the legacy `sandbox`/`sandboxPolicy` fields, including running-thread
resume requests.
- Converts profile-shaped overrides into core runtime filesystem/network
permissions while continuing to derive the constrained legacy sandbox
projection used by existing execution paths.
- Preserves project-trust intent for profile overrides that are
equivalent to workspace-write or full-access sandbox requests.
- Preserves existing deny-read entries and `globScanMaxDepth` when
applying turn-level `permissionProfile` overrides.
- Updates app-server docs plus generated JSON/TypeScript schema fixtures
and regression coverage.

## Verification

- `cargo test -p codex-app-server-protocol schema_fixtures`
- `cargo test -p codex-core
session_configuration_apply_permission_profile_preserves_existing_deny_read_entries`







---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/18279).
* #18288
* #18287
* #18286
* #18285
* #18284
* #18283
* #18282
* #18281
* #18280
* __->__ #18279
2026-04-22 13:34:33 -07:00

930 lines
41 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use super::*;
use crate::config::ConstraintError;
use tokio::sync::Semaphore;
/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
pub(crate) conversation_id: ThreadId,
pub(super) tx_event: Sender<Event>,
pub(super) agent_status: watch::Sender<AgentStatus>,
pub(super) out_of_band_elicitation_paused: watch::Sender<bool>,
pub(super) state: Mutex<SessionState>,
/// Serializes rebuild/apply cycles for the running proxy; each cycle
/// rebuilds from the current SessionState while holding this lock.
pub(super) managed_network_proxy_refresh_lock: Semaphore,
/// The set of enabled features should be invariant for the lifetime of the
/// session.
pub(super) features: ManagedFeatures,
pub(super) pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(super) mailbox: Mailbox,
pub(super) mailbox_rx: Mutex<MailboxReceiver>,
pub(super) idle_pending_input: Mutex<Vec<ResponseInputItem>>, // TODO (jif) merge with mailbox!
pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices,
pub(super) js_repl: Arc<JsReplHandle>,
pub(super) next_internal_sub_id: AtomicU64,
}
#[derive(Clone)]
pub(crate) struct SessionConfiguration {
/// Provider identifier ("openai", "openrouter", ...).
pub(super) provider: ModelProviderInfo,
pub(super) collaboration_mode: CollaborationMode,
pub(super) model_reasoning_summary: Option<ReasoningSummaryConfig>,
pub(super) service_tier: Option<ServiceTier>,
/// Developer instructions that supplement the base instructions.
pub(super) developer_instructions: Option<String>,
/// Model instructions that are appended to the base instructions.
pub(super) user_instructions: Option<String>,
/// Personality preference for the model.
pub(super) personality: Option<Personality>,
/// Base instructions for the session.
pub(super) base_instructions: String,
/// Compact prompt override.
pub(super) compact_prompt: Option<String>,
/// When to escalate for approval for execution
pub(super) approval_policy: Constrained<AskForApproval>,
pub(super) approvals_reviewer: ApprovalsReviewer,
/// How to sandbox commands executed in the system
pub(super) sandbox_policy: Constrained<SandboxPolicy>,
pub(super) file_system_sandbox_policy: FileSystemSandboxPolicy,
pub(super) network_sandbox_policy: NetworkSandboxPolicy,
pub(super) windows_sandbox_level: WindowsSandboxLevel,
/// Absolute working directory that should be treated as the *root* of the
/// session. All relative paths supplied by the model as well as the
/// execution sandbox are resolved against this directory **instead** of
/// the process-wide current working directory.
pub(super) cwd: AbsolutePathBuf,
/// Directory containing all Codex state for this session.
pub(super) codex_home: AbsolutePathBuf,
/// Optional user-facing name for the thread, updated during the session.
pub(super) thread_name: Option<String>,
// TODO(pakrym): Remove config from here
pub(super) original_config_do_not_use: Arc<Config>,
/// Optional service name tag for session metrics.
pub(super) metrics_service_name: Option<String>,
pub(super) app_server_client_name: Option<String>,
pub(super) app_server_client_version: Option<String>,
/// Source of the session (cli, vscode, exec, mcp, ...)
pub(super) session_source: SessionSource,
pub(super) dynamic_tools: Vec<DynamicToolSpec>,
pub(super) persist_extended_history: bool,
pub(super) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(super) user_shell_override: Option<shell::Shell>,
}
impl SessionConfiguration {
pub(crate) fn codex_home(&self) -> &AbsolutePathBuf {
&self.codex_home
}
pub(super) fn permission_profile(&self) -> PermissionProfile {
PermissionProfile::from_runtime_permissions(
&self.file_system_sandbox_policy,
self.network_sandbox_policy,
)
}
pub(super) fn thread_config_snapshot(&self) -> ThreadConfigSnapshot {
ThreadConfigSnapshot {
model: self.collaboration_mode.model().to_string(),
model_provider_id: self.original_config_do_not_use.model_provider_id.clone(),
service_tier: self.service_tier,
approval_policy: self.approval_policy.value(),
approvals_reviewer: self.approvals_reviewer,
sandbox_policy: self.sandbox_policy.get().clone(),
permission_profile: self.permission_profile(),
cwd: self.cwd.clone(),
ephemeral: self.original_config_do_not_use.ephemeral,
reasoning_effort: self.collaboration_mode.reasoning_effort(),
personality: self.personality,
session_source: self.session_source.clone(),
}
}
pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> ConstraintResult<Self> {
let mut next_configuration = self.clone();
let file_system_policy_matches_legacy = self.file_system_sandbox_policy
== FileSystemSandboxPolicy::from_legacy_sandbox_policy(
self.sandbox_policy.get(),
&self.cwd,
);
if let Some(collaboration_mode) = updates.collaboration_mode.clone() {
next_configuration.collaboration_mode = collaboration_mode;
}
if let Some(summary) = updates.reasoning_summary {
next_configuration.model_reasoning_summary = Some(summary);
}
if let Some(service_tier) = updates.service_tier {
next_configuration.service_tier = service_tier;
}
if let Some(personality) = updates.personality {
next_configuration.personality = Some(personality);
}
if let Some(approval_policy) = updates.approval_policy {
next_configuration.approval_policy.set(approval_policy)?;
}
if let Some(approvals_reviewer) = updates.approvals_reviewer {
next_configuration.approvals_reviewer = approvals_reviewer;
}
if let Some(windows_sandbox_level) = updates.windows_sandbox_level {
next_configuration.windows_sandbox_level = windows_sandbox_level;
}
let absolute_cwd = updates
.cwd
.as_ref()
.map(|cwd| {
AbsolutePathBuf::relative_to_current_dir(normalize_for_native_workdir(
cwd.as_path(),
))
.unwrap_or_else(|e| {
warn!("failed to normalize update cwd: {cwd:?}: {e}");
self.cwd.clone()
})
})
.unwrap_or_else(|| self.cwd.clone());
let cwd_changed = absolute_cwd.as_path() != self.cwd.as_path();
next_configuration.cwd = absolute_cwd;
if let Some(permission_profile) = updates.permission_profile.clone() {
let sandbox_policy = permission_profile
.to_legacy_sandbox_policy(&next_configuration.cwd)
.map_err(|err| ConstraintError::InvalidValue {
field_name: "permission_profile",
candidate: format!("{permission_profile:?}"),
allowed: format!(
"permission profiles that can be represented by the active sandbox constraints: {err}"
),
requirement_source: codex_config::RequirementSource::Unknown,
})?;
next_configuration.sandbox_policy.set(sandbox_policy)?;
let (mut file_system_sandbox_policy, network_sandbox_policy) =
permission_profile.to_runtime_permissions();
if file_system_sandbox_policy.glob_scan_max_depth.is_none() {
file_system_sandbox_policy.glob_scan_max_depth =
self.file_system_sandbox_policy.glob_scan_max_depth;
}
for deny_entry in self
.file_system_sandbox_policy
.entries
.iter()
.filter(|entry| {
entry.access == codex_protocol::permissions::FileSystemAccessMode::None
})
{
if !file_system_sandbox_policy
.entries
.iter()
.any(|entry| entry == deny_entry)
{
file_system_sandbox_policy.entries.push(deny_entry.clone());
}
}
next_configuration.file_system_sandbox_policy = file_system_sandbox_policy;
next_configuration.network_sandbox_policy = network_sandbox_policy;
} else if let Some(sandbox_policy) = updates.sandbox_policy.clone() {
next_configuration.sandbox_policy.set(sandbox_policy)?;
next_configuration.file_system_sandbox_policy =
FileSystemSandboxPolicy::from_legacy_sandbox_policy_preserving_deny_entries(
next_configuration.sandbox_policy.get(),
&next_configuration.cwd,
&self.file_system_sandbox_policy,
);
next_configuration.network_sandbox_policy =
NetworkSandboxPolicy::from(next_configuration.sandbox_policy.get());
} else if cwd_changed && file_system_policy_matches_legacy {
// Preserve richer split policies across cwd-only updates; only
// rederive when the session is already using the legacy bridge.
next_configuration.file_system_sandbox_policy =
FileSystemSandboxPolicy::from_legacy_sandbox_policy(
next_configuration.sandbox_policy.get(),
&next_configuration.cwd,
);
}
if let Some(app_server_client_name) = updates.app_server_client_name.clone() {
next_configuration.app_server_client_name = Some(app_server_client_name);
}
if let Some(app_server_client_version) = updates.app_server_client_version.clone() {
next_configuration.app_server_client_version = Some(app_server_client_version);
}
Ok(next_configuration)
}
}
#[derive(Default, Clone)]
pub(crate) struct SessionSettingsUpdate {
pub(crate) cwd: Option<PathBuf>,
pub(crate) approval_policy: Option<AskForApproval>,
pub(crate) approvals_reviewer: Option<ApprovalsReviewer>,
pub(crate) sandbox_policy: Option<SandboxPolicy>,
pub(crate) permission_profile: Option<PermissionProfile>,
pub(crate) windows_sandbox_level: Option<WindowsSandboxLevel>,
pub(crate) collaboration_mode: Option<CollaborationMode>,
pub(crate) reasoning_summary: Option<ReasoningSummaryConfig>,
pub(crate) service_tier: Option<Option<ServiceTier>>,
pub(crate) final_output_json_schema: Option<Option<Value>>,
pub(crate) personality: Option<Personality>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) app_server_client_version: Option<String>,
}
pub(crate) struct AppServerClientMetadata {
pub(crate) client_name: Option<String>,
pub(crate) client_version: Option<String>,
}
impl Session {
#[instrument(name = "session_init", level = "info", skip_all)]
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
reason = "session initialization must serialize access through session-owned manager guards"
)]
pub(crate) async fn new(
mut session_configuration: SessionConfiguration,
config: Arc<Config>,
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
exec_policy: Arc<ExecPolicyManager>,
tx_event: Sender<Event>,
agent_status: watch::Sender<AgentStatus>,
initial_history: InitialHistory,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
inherited_rollout_trace: RolloutTraceRecorder,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
session_configuration.collaboration_mode.model(),
session_configuration.provider
);
let forked_from_id = initial_history.forked_from_id();
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => {
let conversation_id = ThreadId::default();
(
conversation_id,
RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source,
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
if session_configuration.persist_extended_history {
EventPersistenceMode::Extended
} else {
EventPersistenceMode::Limited
},
),
)
}
InitialHistory::Resumed(resumed_history) => (
resumed_history.conversation_id,
RolloutRecorderParams::resume(
resumed_history.rollout_path.clone(),
if session_configuration.persist_extended_history {
EventPersistenceMode::Extended
} else {
EventPersistenceMode::Limited
},
),
),
};
let window_generation = match &initial_history {
InitialHistory::Resumed(resumed_history) => u64::try_from(
resumed_history
.history
.iter()
.filter(|item| matches!(item, RolloutItem::Compacted(_)))
.count(),
)
.unwrap_or(u64::MAX),
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => 0,
};
let state_builder = match &initial_history {
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
resumed.history.as_slice(),
resumed.rollout_path.as_path(),
),
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None,
};
// Kick off independent async setup tasks in parallel to reduce startup latency.
//
// - initialize RolloutRecorder with new or resumed session info
// - perform default shell discovery
// - load history metadata (skipped for subagents)
let rollout_fut = async {
if config.ephemeral {
Ok::<_, anyhow::Error>((None, None))
} else {
let state_db_ctx = state_db::init(&config).await;
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx))
}
}
.instrument(info_span!(
"session_init.rollout",
otel.name = "session_init.rollout",
session_init.ephemeral = config.ephemeral,
));
let is_subagent = matches!(
session_configuration.session_source,
SessionSource::SubAgent(_)
);
let history_meta_fut = async {
if is_subagent {
(0, 0)
} else {
crate::message_history::history_metadata(&config).await
}
}
.instrument(info_span!(
"session_init.history_metadata",
otel.name = "session_init.history_metadata",
session_init.is_subagent = is_subagent,
));
let auth_manager_clone = Arc::clone(&auth_manager);
let config_for_mcp = Arc::clone(&config);
let mcp_manager_for_mcp = Arc::clone(&mcp_manager);
let auth_and_mcp_fut = async move {
let auth = auth_manager_clone.auth().await;
let mcp_servers = mcp_manager_for_mcp
.effective_servers(&config_for_mcp, auth.as_ref())
.await;
let auth_statuses = compute_auth_statuses(
mcp_servers.iter(),
config_for_mcp.mcp_oauth_credentials_store_mode,
)
.await;
(auth, mcp_servers, auth_statuses)
}
.instrument(info_span!(
"session_init.auth_mcp",
otel.name = "session_init.auth_mcp",
));
// Join all independent futures.
let (
rollout_recorder_and_state_db,
(history_log_id, history_entry_count),
(auth, mcp_servers, auth_statuses),
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
e
})?;
let rollout_path = rollout_recorder
.as_ref()
.map(|rec| rec.rollout_path().to_path_buf());
let trace_agent_path = session_configuration
.session_source
.get_agent_path()
.unwrap_or_else(codex_protocol::AgentPath::root);
let trace_task_name =
(!trace_agent_path.is_root()).then(|| trace_agent_path.name().to_string());
let trace_metadata = ThreadStartedTraceMetadata {
thread_id: conversation_id.to_string(),
agent_path: trace_agent_path.to_string(),
task_name: trace_task_name,
nickname: session_configuration.session_source.get_nickname(),
agent_role: session_configuration.session_source.get_agent_role(),
session_source: session_configuration.session_source.clone(),
cwd: session_configuration.cwd.to_path_buf(),
rollout_path: rollout_path.clone(),
model: session_configuration.collaboration_mode.model().to_string(),
provider_name: config.model_provider_id.clone(),
approval_policy: session_configuration.approval_policy.value().to_string(),
sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()),
};
let rollout_trace = if matches!(
session_configuration.session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
// Spawned child threads are part of their root rollout tree. If
// the parent had no trace recorder, do not create an orphan child
// bundle that looks like an independent rollout.
inherited_rollout_trace
} else {
RolloutTraceRecorder::create_root_or_disabled(conversation_id)
};
rollout_trace.record_thread_started(trace_metadata);
let mut post_session_configured_events = Vec::<Event>::new();
for usage in config.features.legacy_feature_usages() {
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent {
summary: usage.summary.clone(),
details: usage.details.clone(),
}),
});
}
if crate::config::uses_deprecated_instructions_file(&config.config_layer_stack) {
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent {
summary: "`experimental_instructions_file` is deprecated and ignored. Use `model_instructions_file` instead."
.to_string(),
details: Some(
"Move the setting to `model_instructions_file` in config.toml (or under a profile) to load instructions from a file."
.to_string(),
),
}),
});
}
for message in &config.startup_warnings {
post_session_configured_events.push(Event {
id: "".to_owned(),
msg: EventMsg::Warning(WarningEvent {
message: message.clone(),
}),
});
}
let config_path = config.codex_home.join(CONFIG_TOML_FILE);
if let Some(event) = unstable_features_warning_event(
config
.config_layer_stack
.effective_config()
.get("features")
.and_then(TomlValue::as_table),
config.suppress_unstable_features_warning,
&config.features,
&config_path.display().to_string(),
) {
post_session_configured_events.push(event);
}
if config.permissions.approval_policy.value() == AskForApproval::OnFailure {
post_session_configured_events.push(Event {
id: "".to_owned(),
msg: EventMsg::Warning(WarningEvent {
message: "`on-failure` approval policy is deprecated and will be removed in a future release. Use `on-request` for interactive approvals or `never` for non-interactive runs.".to_string(),
}),
});
}
let auth = auth.as_ref();
let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from);
let account_id = auth.and_then(CodexAuth::get_account_id);
let account_email = auth.and_then(CodexAuth::get_account_email);
let originator = originator().value;
let terminal_type = user_agent();
let session_model = session_configuration.collaboration_mode.model().to_string();
let auth_env_telemetry = collect_auth_env_telemetry(
&session_configuration.provider,
auth_manager.codex_api_key_env_enabled(),
);
let mut session_telemetry = SessionTelemetry::new(
conversation_id,
session_model.as_str(),
session_model.as_str(),
account_id.clone(),
account_email.clone(),
auth_mode,
originator.clone(),
config.otel.log_user_prompt,
terminal_type.clone(),
session_configuration.session_source.clone(),
)
.with_auth_env(auth_env_telemetry.to_otel_metadata());
if let Some(service_name) = session_configuration.metrics_service_name.as_deref() {
session_telemetry = session_telemetry.with_metrics_service_name(service_name);
}
let network_proxy_audit_metadata = NetworkProxyAuditMetadata {
conversation_id: Some(conversation_id.to_string()),
app_version: Some(env!("CARGO_PKG_VERSION").to_string()),
user_account_id: account_id,
auth_mode: auth_mode.map(|mode| mode.to_string()),
originator: Some(originator),
user_email: account_email,
terminal_type: Some(terminal_type),
model: Some(session_model.clone()),
slug: Some(session_model),
};
config.features.emit_metrics(&session_telemetry);
session_telemetry.counter(
THREAD_STARTED_METRIC,
/*inc*/ 1,
&[(
"is_git",
if get_git_repo_root(&session_configuration.cwd).is_some() {
"true"
} else {
"false"
},
)],
);
session_telemetry.conversation_starts(
config.model_provider.name.as_str(),
session_configuration.collaboration_mode.reasoning_effort(),
config
.model_reasoning_summary
.unwrap_or(ReasoningSummaryConfig::Auto),
config.model_context_window,
config.model_auto_compact_token_limit,
config.permissions.approval_policy.value(),
config.permissions.sandbox_policy.get().clone(),
mcp_servers.keys().map(String::as_str).collect(),
config.active_profile.clone(),
);
let use_zsh_fork_shell = config.features.enabled(Feature::ShellZshFork);
let mut default_shell = if let Some(user_shell_override) =
session_configuration.user_shell_override.clone()
{
user_shell_override
} else if use_zsh_fork_shell {
let zsh_path = config.zsh_path.as_ref().ok_or_else(|| {
anyhow::anyhow!(
"zsh fork feature enabled, but `zsh_path` is not configured; set `zsh_path` in config.toml"
)
})?;
let zsh_path = zsh_path.to_path_buf();
shell::get_shell(shell::ShellType::Zsh, Some(&zsh_path)).ok_or_else(|| {
anyhow::anyhow!(
"zsh fork feature enabled, but zsh_path `{}` is not usable; set `zsh_path` to a valid zsh executable",
zsh_path.display()
)
})?
} else {
shell::default_user_shell()
};
// Create the mutable state for the Session.
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
let (tx, rx) = watch::channel(Some(snapshot));
default_shell.shell_snapshot = rx;
tx
} else {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
conversation_id,
session_configuration.cwd.clone(),
&mut default_shell,
session_telemetry.clone(),
)
}
} else {
let (tx, rx) = watch::channel(None);
default_shell.shell_snapshot = rx;
tx
};
let thread_name =
thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id)
.instrument(info_span!(
"session_init.thread_name_lookup",
otel.name = "session_init.thread_name_lookup",
))
.await;
session_configuration.thread_name = thread_name.clone();
let state = SessionState::new(session_configuration.clone());
let managed_network_requirements_configured = config
.config_layer_stack
.requirements_toml()
.network
.is_some();
let managed_network_requirements_enabled = config.managed_network_requirements_enabled();
let network_approval = Arc::new(NetworkApprovalService::default());
// The managed proxy can call back into core for allowlist-miss decisions.
let network_policy_decider_session = if managed_network_requirements_configured {
config
.permissions
.network
.as_ref()
.map(|_| Arc::new(RwLock::new(std::sync::Weak::<Session>::new())))
} else {
None
};
let blocked_request_observer = if managed_network_requirements_configured {
config
.permissions
.network
.as_ref()
.map(|_| build_blocked_request_observer(Arc::clone(&network_approval)))
} else {
None
};
let network_policy_decider =
network_policy_decider_session
.as_ref()
.map(|network_policy_decider_session| {
build_network_policy_decider(
Arc::clone(&network_approval),
Arc::clone(network_policy_decider_session),
)
});
let (network_proxy, session_network_proxy) =
if let Some(spec) = config.permissions.network.as_ref() {
let current_exec_policy = exec_policy.current();
let (network_proxy, session_network_proxy) = Self::start_managed_network_proxy(
spec,
current_exec_policy.as_ref(),
config.permissions.sandbox_policy.get(),
network_policy_decider.as_ref().map(Arc::clone),
blocked_request_observer.as_ref().map(Arc::clone),
managed_network_requirements_configured,
network_proxy_audit_metadata,
)
.instrument(info_span!(
"session_init.network_proxy",
otel.name = "session_init.network_proxy",
session_init.managed_network_requirements_enabled =
managed_network_requirements_enabled,
))
.await?;
(Some(network_proxy), Some(session_network_proxy))
} else {
(None, None)
};
let mut hook_shell_argv =
default_shell.derive_exec_args("", /*use_login_shell*/ false);
let hook_shell_program = hook_shell_argv.remove(0);
let _ = hook_shell_argv.pop();
let hooks = Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),
feature_enabled: config.features.enabled(Feature::CodexHooks),
config_layer_stack: Some(config.config_layer_stack.clone()),
shell_program: Some(hook_shell_program),
shell_args: hook_shell_argv,
});
for warning in hooks.startup_warnings() {
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::Warning(WarningEvent {
message: warning.clone(),
}),
});
}
let installation_id = resolve_installation_id(&config.codex_home).await?;
let analytics_events_client = analytics_events_client.unwrap_or_else(|| {
AnalyticsEventsClient::new(
Arc::clone(&auth_manager),
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
)
});
let services = SessionServices {
// Initialize the MCP connection manager with an uninitialized
// instance. It will be replaced with one created via
// McpConnectionManager::new() once all its constructor args are
// available. This also ensures `SessionConfigured` is emitted
// before any MCP-related events. It is reasonable to consider
// changing this to use Option or OnceCell, though the current
// setup is straightforward enough and performs well.
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized(
&config.permissions.approval_policy,
&config.permissions.sandbox_policy,
))),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
unified_exec_manager: UnifiedExecProcessManager::new(
config.background_terminal_max_timeout,
),
shell_zsh_path: config.zsh_path.clone(),
main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(),
analytics_events_client,
hooks,
rollout: Mutex::new(rollout_recorder),
rollout_trace,
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
session_telemetry,
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
guardian_rejections: Mutex::new(HashMap::new()),
guardian_rejection_circuit_breaker: Mutex::new(Default::default()),
runtime_handle: tokio::runtime::Handle::current(),
skills_manager,
plugins_manager: Arc::clone(&plugins_manager),
mcp_manager: Arc::clone(&mcp_manager),
skills_watcher,
agent_control,
network_proxy,
network_approval: Arc::clone(&network_approval),
state_db: state_db_ctx.clone(),
thread_store: LocalThreadStore::new(RolloutConfig::from_view(config.as_ref())),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
conversation_id,
installation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(
config.js_repl_node_path.clone(),
),
environment_manager,
};
services
.model_client
.set_window_generation(window_generation);
let js_repl = Arc::new(JsReplHandle::with_node_path(
config.js_repl_node_path.clone(),
config.js_repl_node_module_dirs.clone(),
));
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
watch::channel(false);
let (mailbox, mailbox_rx) = Mailbox::new();
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
agent_status,
out_of_band_elicitation_paused,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
mailbox,
mailbox_rx: Mutex::new(mailbox_rx),
idle_pending_input: Mutex::new(Vec::new()),
guardian_review_session: GuardianReviewSessionManager::default(),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
});
if let Some(network_policy_decider_session) = network_policy_decider_session {
let mut guard = network_policy_decider_session.write().await;
*guard = Arc::downgrade(&sess);
}
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = initial_history.get_event_msgs();
let events = std::iter::once(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
forked_from_id,
thread_name: session_configuration.thread_name.clone(),
model: session_configuration.collaboration_mode.model().to_string(),
model_provider_id: config.model_provider_id.clone(),
service_tier: session_configuration.service_tier,
approval_policy: session_configuration.approval_policy.value(),
approvals_reviewer: session_configuration.approvals_reviewer,
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
cwd: session_configuration.cwd.clone(),
reasoning_effort: session_configuration.collaboration_mode.reasoning_effort(),
history_log_id,
history_entry_count,
initial_messages,
network_proxy: session_network_proxy.filter(|_| {
Self::managed_network_proxy_active_for_sandbox_policy(
session_configuration.sandbox_policy.get(),
)
}),
rollout_path,
}),
})
.chain(post_session_configured_events.into_iter());
for event in events {
sess.send_event_raw(event).await;
}
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_skills_watcher_listener();
let mut required_mcp_servers: Vec<String> = mcp_servers
.iter()
.filter(|(_, server)| server.enabled && server.required)
.map(|(name, _)| name.clone())
.collect();
required_mcp_servers.sort();
let enabled_mcp_server_count = mcp_servers.values().filter(|server| server.enabled).count();
let required_mcp_server_count = required_mcp_servers.len();
let tool_plugin_provenance = mcp_manager.tool_plugin_provenance(config.as_ref()).await;
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
cancel_guard.cancel();
*cancel_guard = CancellationToken::new();
}
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
&mcp_servers,
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
&session_configuration.approval_policy,
INITIAL_SUBMIT_ID.to_owned(),
tx_event.clone(),
session_configuration.sandbox_policy.get().clone(),
McpRuntimeEnvironment::new(
sess.services
.environment_manager
.default_environment()
.unwrap_or_else(|| sess.services.environment_manager.local_environment()),
session_configuration.cwd.to_path_buf(),
),
config.codex_home.to_path_buf(),
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
)
.instrument(info_span!(
"session_init.mcp_manager_init",
otel.name = "session_init.mcp_manager_init",
session_init.enabled_mcp_server_count = enabled_mcp_server_count,
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
{
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
*manager_guard = mcp_connection_manager;
}
{
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
if cancel_guard.is_cancelled() {
cancel_token.cancel();
}
*cancel_guard = cancel_token;
}
if !required_mcp_servers.is_empty() {
let failures = sess
.services
.mcp_connection_manager
.read()
.await
.required_startup_failures(&required_mcp_servers)
.instrument(info_span!(
"session_init.required_mcp_wait",
otel.name = "session_init.required_mcp_wait",
session_init.required_mcp_server_count = required_mcp_server_count,
))
.await;
if !failures.is_empty() {
let details = failures
.iter()
.map(|failure| format!("{}: {}", failure.server, failure.error))
.collect::<Vec<_>>()
.join("; ");
return Err(anyhow::anyhow!(
"required MCP servers failed to initialize: {details}"
));
}
}
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
.await;
let session_start_source = match &initial_history {
InitialHistory::Resumed(_) => codex_hooks::SessionStartSource::Resume,
InitialHistory::New | InitialHistory::Forked(_) => {
codex_hooks::SessionStartSource::Startup
}
InitialHistory::Cleared => codex_hooks::SessionStartSource::Clear,
};
// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
sess.record_initial_history(initial_history).await;
{
let mut state = sess.state.lock().await;
state.set_pending_session_start_source(Some(session_start_source));
}
memories::start_memories_startup_task(
&sess,
Arc::clone(&config),
&session_configuration.session_source,
);
Ok(sess)
}
}