Files
codex/codex-rs/core/src/session/mod.rs
jif-oai 569ff6a1c4 extension: move git attribution into an extension (#21738)
## Why

Git commit attribution is prompt policy, not session orchestration.
After #21737 adds the extension-registry seam, this moves that
prompt-only behavior out of `codex-core` so `Session` can consume
extension-contributed prompt fragments instead of owning a one-off
policy path itself.

Before this PR, `Session` injected the trailer instruction directly from
`codex-core` ([session
assembly](a57a747eb6/codex-rs/core/src/session/mod.rs (L2733-L2739)),
[helper
module](a57a747eb6/codex-rs/core/src/commit_attribution.rs (L1-L33))).
This branch moves that same responsibility into
[`codex-git-attribution`](b5029a6736/codex-rs/ext/git-attribution/src/lib.rs (L14-L100)).

## What changed

- Added the `codex-git-attribution` extension crate.
- Snapshot `CodexGitCommit` plus `commit_attribution` at thread start,
then contribute the developer-policy fragment through the extension
registry.
- Register the extension in app-server thread extensions.
- Remove the old `codex-core` helper module and direct `Session`
injection path.

This keeps the existing behavior intact: the prompt is only contributed
when `CodexGitCommit` is enabled, blank attribution still disables the
trailer, and the default remains `Codex <noreply@openai.com>`.

## Stack

- Stacked on #21737.
2026-05-11 12:53:15 +02:00

3379 lines
126 KiB
Rust

use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::Mailbox;
use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::attestation::AttestationProvider;
use crate::build_available_skills;
use crate::compact;
use crate::config::ManagedFeatures;
use crate::config::resolve_tool_suggest_config_from_layer_stack;
use crate::connectors;
use crate::context::ApprovedCommandPrefixSaved;
use crate::context::AppsInstructions;
use crate::context::AvailablePluginsInstructions;
use crate::context::AvailableSkillsInstructions;
use crate::context::CollaborationModeInstructions;
use crate::context::ContextualUserFragment;
use crate::context::NetworkRuleSaved;
use crate::context::PermissionsInstructions;
use crate::context::PersonalitySpecInstructions;
use crate::default_skill_metadata_budget;
use crate::environment_selection::ResolvedTurnEnvironments;
use crate::exec_policy::ExecPolicyManager;
use crate::parse_turn_item;
use crate::path_utils::normalize_for_native_workdir;
use crate::realtime_conversation::RealtimeConversationManager;
use crate::session_prefix::format_subagent_notification_message;
use crate::skills::SkillRenderSideEffects;
use crate::skills_load_input_from_config;
use crate::turn_metadata::TurnMetadataState;
use crate::turn_timing::now_unix_timestamp_ms;
use async_channel::Receiver;
use async_channel::Sender;
use chrono::Local;
use chrono::Utc;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::SubAgentThreadStartedInput;
use codex_app_server_protocol::McpServerElicitationRequest;
use codex_app_server_protocol::McpServerElicitationRequestParams;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::Environment;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::FileSystemSandboxContext;
use codex_extension_api::PromptSlot;
use codex_features::FEATURES;
use codex_features::Feature;
use codex_features::unstable_features_warning_event;
use codex_hooks::Hooks;
use codex_hooks::HooksConfig;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
use codex_login::default_client::originator;
use codex_mcp::McpConnectionManager;
use codex_mcp::McpRuntimeEnvironment;
use codex_mcp::ToolInfo;
use codex_mcp::codex_apps_tools_cache_key;
use codex_models_manager::manager::RefreshStrategy;
use codex_models_manager::manager::SharedModelsManager;
use codex_network_proxy::NetworkProxy;
use codex_network_proxy::NetworkProxyAuditMetadata;
use codex_network_proxy::normalize_host;
use codex_otel::current_span_trace_id;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::set_parent_from_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::ToolName;
use codex_protocol::account::PlanType as AccountPlanType;
use codex_protocol::approvals::ElicitationRequestEvent;
use codex_protocol::approvals::ExecPolicyAmendment;
use codex_protocol::approvals::NetworkPolicyAmendment;
use codex_protocol::approvals::NetworkPolicyRuleAction;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::ActivePermissionProfile;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::SandboxEnforcement;
use codex_protocol::models::format_allow_prefixes;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::HasLegacyEvent;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::RawResponseItemEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnContextNetworkItem;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::request_permissions::PermissionGrantScope;
use codex_protocol::request_permissions::RequestPermissionProfile;
use codex_protocol::request_permissions::RequestPermissionsArgs;
use codex_protocol::request_permissions::RequestPermissionsEvent;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationResponse;
use codex_rollout::state_db;
use codex_rollout_trace::AgentResultTracePayload;
use codex_rollout_trace::ThreadStartedTraceMetadata;
use codex_rollout_trace::ThreadTraceContext;
use codex_sandboxing::policy_transforms::intersect_permission_profiles;
use codex_shell_command::parse_command::parse_command;
use codex_terminal_detection::user_agent;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::LiveThread;
use codex_thread_store::LiveThreadInitGuard;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::ReadThreadParams;
use codex_thread_store::ResumeThreadParams;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use codex_utils_output_truncation::TruncationPolicy;
use futures::future::BoxFuture;
use futures::future::Shared;
use futures::prelude::*;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::PaginatedRequestParams;
use rmcp::model::ReadResourceRequestParams;
use rmcp::model::ReadResourceResult;
use rmcp::model::RequestId;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use toml::Value as TomlValue;
use tracing::Instrument;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::warn;
use uuid::Uuid;
use crate::client::ModelClient;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::compact::collect_user_messages;
use crate::config::Config;
use crate::config::Constrained;
use crate::config::ConstraintResult;
use crate::config::StartedNetworkProxy;
use crate::config::resolve_web_search_mode_for_turn;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::thread_rollout_truncation::initial_history_has_prior_user_turns;
use codex_config::CONFIG_TOML_FILE;
use codex_config::types::McpServerConfig;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::config_types::ShellEnvironmentPolicy;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
#[cfg(test)]
use codex_protocol::exec_output::StreamOutput;
mod config_lock;
mod handlers;
mod mcp;
mod multi_agents;
mod review;
mod rollout_reconstruction;
#[allow(clippy::module_inception)]
pub(crate) mod session;
pub(crate) mod turn;
pub(crate) mod turn_context;
use self::config_lock::export_config_lock_if_configured;
use self::config_lock::validate_config_lock_if_configured;
#[cfg(test)]
use self::handlers::submission_dispatch_span;
use self::handlers::submission_loop;
use self::review::spawn_review_thread;
use self::session::AppServerClientMetadata;
use self::session::Session;
use self::session::SessionConfiguration;
pub(crate) use self::session::SessionSettingsUpdate;
#[cfg(test)]
use self::turn::AssistantMessageStreamParsers;
#[cfg(test)]
use self::turn::collect_explicit_app_ids_from_skill_items;
#[cfg(test)]
use self::turn::filter_connectors_for_input;
use self::turn::realtime_text_for_event;
use self::turn_context::TurnContext;
use self::turn_context::TurnSkillsContext;
#[cfg(test)]
mod rollout_reconstruction_tests;
#[derive(Debug, PartialEq)]
pub enum SteerInputError {
NoActiveTurn(Vec<UserInput>),
ExpectedTurnMismatch { expected: String, actual: String },
ActiveTurnNotSteerable { turn_kind: NonSteerableTurnKind },
EmptyInput,
}
impl SteerInputError {
fn to_error_event(&self) -> ErrorEvent {
match self {
Self::NoActiveTurn(_) => ErrorEvent {
message: "no active turn to steer".to_string(),
codex_error_info: Some(CodexErrorInfo::BadRequest),
},
Self::ExpectedTurnMismatch { expected, actual } => ErrorEvent {
message: format!("expected active turn id `{expected}` but found `{actual}`"),
codex_error_info: Some(CodexErrorInfo::BadRequest),
},
Self::ActiveTurnNotSteerable { turn_kind } => {
let turn_kind_label = match turn_kind {
NonSteerableTurnKind::Review => "review",
NonSteerableTurnKind::Compact => "compact",
};
ErrorEvent {
message: format!("cannot steer a {turn_kind_label} turn"),
codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable {
turn_kind: *turn_kind,
}),
}
}
Self::EmptyInput => ErrorEvent {
message: "input must not be empty".to_string(),
codex_error_info: Some(CodexErrorInfo::BadRequest),
},
}
}
}
/// Notes from the previous real user turn.
///
/// Conceptually this is the same role that `previous_model` used to fill, but
/// it can carry other prior-turn settings that matter when constructing
/// sensible state-change diffs or full-context reinjection, such as model
/// switches or detecting a prior `realtime_active -> false` transition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PreviousTurnSettings {
pub(crate) model: String,
pub(crate) realtime_active: Option<bool>,
}
#[cfg(test)]
use crate::SkillLoadOutcome;
#[cfg(test)]
use crate::SkillMetadata;
use crate::SkillsManager;
use crate::agents_md::AgentsMdManager;
use crate::context::UserInstructions;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::guardian::GuardianReviewSessionManager;
use crate::mcp::McpManager;
use crate::network_policy_decision::execpolicy_network_rule_amendment;
use crate::rollout::map_session_init_error;
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
use crate::shell;
use crate::shell_snapshot::ShellSnapshot;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingRequestPermissions;
use crate::state::SessionServices;
use crate::state::SessionState;
#[cfg(test)]
use crate::stream_events_utils::HandleOutputCtx;
#[cfg(test)]
use crate::stream_events_utils::handle_output_item_done;
use crate::tasks::ReviewTask;
use crate::tools::network_approval::NetworkApprovalService;
use crate::tools::network_approval::build_blocked_request_observer;
use crate::tools::network_approval::build_network_policy_decider;
#[cfg(test)]
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::sandboxing::ApprovalStore;
use crate::turn_timing::TurnTimingState;
use crate::turn_timing::record_turn_ttfm_metric;
use crate::unified_exec::UnifiedExecProcessManager;
use crate::windows_sandbox::WindowsSandboxLevelExt;
use codex_core_plugins::PluginsManager;
use codex_git_utils::get_git_repo_root;
use codex_mcp::compute_auth_statuses;
use codex_mcp::effective_mcp_servers_from_configured;
use codex_mcp::host_owned_codex_apps_enabled;
use codex_otel::SessionTelemetry;
use codex_otel::THREAD_STARTED_METRIC;
use codex_otel::TelemetryAuthMode;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::DeprecationNoticeEvent;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::ModelRerouteEvent;
use codex_protocol::protocol::ModelRerouteReason;
use codex_protocol::protocol::ModelVerification;
use codex_protocol::protocol::ModelVerificationEvent;
use codex_protocol::protocol::NetworkApprovalContext;
use codex_protocol::protocol::NonSteerableTurnKind;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RequestUserInputEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionNetworkProxyRuntime;
use codex_protocol::protocol::StreamErrorEvent;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_tools::ToolEnvironmentMode;
use codex_tools::ToolsConfig;
use codex_tools::ToolsConfigParams;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_readiness::ReadinessFlag;
#[cfg(test)]
use codex_utils_stream_parser::ProposedPlanSegment;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
pub(crate) tx_sub: Sender<Submission>,
pub(crate) rx_event: Receiver<Event>,
// Last known status of the agent.
pub(crate) agent_status: watch::Receiver<AgentStatus>,
pub(crate) session: Arc<Session>,
// Shared future for the background submission loop completion so multiple
// callers can wait for shutdown.
pub(crate) session_loop_termination: SessionLoopTermination,
}
pub(crate) type SessionLoopTermination = Shared<BoxFuture<'static, ()>>;
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`] and
/// the unique session id.
pub struct CodexSpawnOk {
pub codex: Codex,
pub thread_id: ThreadId,
}
pub(crate) struct CodexSpawnArgs {
pub(crate) config: Config,
pub(crate) installation_id: String,
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) models_manager: SharedModelsManager,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
pub(crate) extensions: Arc<codex_extension_api::ExtensionRegistry<crate::config::Config>>,
pub(crate) conversation_history: InitialHistory,
pub(crate) session_source: SessionSource,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) agent_control: AgentControl,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
pub(crate) persist_extended_history: bool,
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
/// Parent rollout trace used only to derive fresh spawned child traces.
///
/// Root sessions and non-thread-spawn subagents pass a disabled context;
/// `Session::new` creates the root trace itself when rollout tracing is enabled.
pub(crate) parent_rollout_thread_trace: ThreadTraceContext,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) environment_selections: ResolvedTurnEnvironments,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
pub(crate) const SUBMISSION_CHANNEL_CAPACITY: usize = 512;
const CYBER_VERIFY_URL: &str = "https://chatgpt.com/cyber";
const CYBER_SAFETY_URL: &str = "https://developers.openai.com/codex/concepts/cyber-safety";
impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub(crate) async fn spawn(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
let parent_trace = match args.parent_trace {
Some(trace) => {
if codex_otel::context_from_w3c_trace_context(&trace).is_some() {
Some(trace)
} else {
warn!("ignoring invalid thread spawn trace carrier");
None
}
}
None => None,
};
let thread_spawn_span = info_span!("thread_spawn", otel.name = "thread_spawn");
if let Some(trace) = parent_trace.as_ref() {
let _ = set_parent_from_w3c_trace_context(&thread_spawn_span, trace);
}
Self::spawn_internal(CodexSpawnArgs {
parent_trace,
..args
})
.instrument(thread_spawn_span)
.await
}
async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
let CodexSpawnArgs {
mut config,
installation_id,
auth_manager,
models_manager,
environment_manager,
skills_manager,
plugins_manager,
mcp_manager,
extensions,
conversation_history,
session_source,
thread_source,
agent_control,
dynamic_tools,
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
parent_rollout_thread_trace,
parent_trace: _,
environment_selections,
analytics_events_client,
thread_store,
attestation_provider,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
let fs = environment_selections.primary_filesystem();
let plugins_input = config.plugins_config_input();
let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await;
let effective_skill_roots = plugin_outcome.effective_plugin_skill_roots();
let skills_input = skills_load_input_from_config(&config, effective_skill_roots);
let loaded_skills = skills_manager.skills_for_config(&skills_input, fs).await;
for err in &loaded_skills.errors {
error!(
"failed to load skill {}: {}",
err.path.display(),
err.message
);
}
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source
&& depth >= config.agent_max_depth
&& !config.features.enabled(Feature::MultiAgentV2)
{
let _ = config.features.disable(Feature::SpawnCsv);
let _ = config.features.disable(Feature::Collab);
}
let primary_environment = environment_selections.primary_environment();
let user_instructions = AgentsMdManager::new(&config)
.user_instructions(primary_environment.as_deref())
.await;
let exec_policy = if crate::guardian::is_guardian_reviewer_source(&session_source) {
// Guardian review should rely on the built-in shell safety checks,
// not on caller-provided exec-policy rules that could shape the
// reviewer or silently auto-approve commands.
Arc::new(ExecPolicyManager::default())
} else if let Some(exec_policy) = &inherited_exec_policy {
Arc::clone(exec_policy)
} else {
Arc::new(
ExecPolicyManager::load(&config.config_layer_stack)
.await
.map_err(|err| CodexErr::Fatal(format!("failed to load rules: {err}")))?,
)
};
let config = Arc::new(config);
let refresh_strategy = if session_source.is_non_root_agent() {
codex_models_manager::manager::RefreshStrategy::Offline
} else {
codex_models_manager::manager::RefreshStrategy::OnlineIfUncached
};
if config.model.is_none()
|| !matches!(
refresh_strategy,
codex_models_manager::manager::RefreshStrategy::Offline
)
{
let _ = models_manager.list_models(refresh_strategy).await;
}
let model = models_manager
.get_default_model(&config.model, refresh_strategy)
.await;
// Resolve base instructions for the session. Priority order:
// 1. config.base_instructions override
// 2. conversation history => session_meta.base_instructions
// 3. base_instructions for current model
let model_info = models_manager
.get_model_info(model.as_str(), &config.to_models_manager_config())
.await;
let base_instructions = config
.base_instructions
.clone()
.or_else(|| conversation_history.get_base_instructions().map(|s| s.text))
.unwrap_or_else(|| model_info.get_model_instructions(config.personality));
// Respect thread-start tools. When missing (resumed/forked threads), read from the db
// first, then fall back to rollout-file tools.
let persisted_tools = if dynamic_tools.is_empty() {
let thread_id = match &conversation_history {
InitialHistory::Resumed(resumed) => Some(resumed.conversation_id),
InitialHistory::Forked(_) => conversation_history.forked_from_id(),
InitialHistory::New | InitialHistory::Cleared => None,
};
match thread_id {
Some(thread_id) => {
let state_db_ctx = if config.ephemeral {
None
} else if let Some(local_store) =
thread_store.as_any().downcast_ref::<LocalThreadStore>()
{
local_store.state_db().await
} else {
None
};
state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn")
.await
}
None => None,
}
} else {
None
};
let dynamic_tools = if dynamic_tools.is_empty() {
persisted_tools
.or_else(|| conversation_history.get_dynamic_tools())
.unwrap_or_default()
} else {
dynamic_tools
};
// TODO (aibrahim): Consolidate config.model and config.model_reasoning_effort into config.collaboration_mode
// to avoid extracting these fields separately and constructing CollaborationMode here.
let collaboration_mode = CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model: model.clone(),
reasoning_effort: config.model_reasoning_effort,
developer_instructions: None,
},
};
let account_plan_type = auth_manager
.auth_cached()
.and_then(|auth| auth.account_plan_type());
let service_tier = get_service_tier(
config.service_tier.clone(),
config.notices.fast_default_opt_out.unwrap_or(false),
account_plan_type,
config.features.enabled(Feature::FastMode),
);
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
collaboration_mode,
model_reasoning_summary: config.model_reasoning_summary,
service_tier,
developer_instructions: config.developer_instructions.clone(),
user_instructions,
personality: config.personality,
base_instructions,
compact_prompt: config.compact_prompt.clone(),
approval_policy: config.permissions.approval_policy.clone(),
approvals_reviewer: config.approvals_reviewer,
permission_profile: config.permissions.permission_profile.clone(),
active_permission_profile: config.permissions.active_permission_profile(),
windows_sandbox_level: WindowsSandboxLevel::from_config(&config),
cwd: config.cwd.clone(),
codex_home: config.codex_home.clone(),
thread_name: None,
environments: environment_selections.to_selections(),
original_config_do_not_use: Arc::clone(&config),
metrics_service_name,
app_server_client_name: None,
app_server_client_version: None,
session_source,
thread_source,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
user_shell_override,
};
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let session = Session::new(
session_configuration,
config.clone(),
installation_id,
auth_manager.clone(),
models_manager.clone(),
exec_policy,
tx_event.clone(),
agent_status_tx.clone(),
conversation_history,
session_source_clone,
skills_manager,
plugins_manager,
mcp_manager.clone(),
extensions,
agent_control,
environment_manager,
analytics_events_client,
thread_store,
parent_rollout_thread_trace,
attestation_provider,
)
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
map_session_init_error(&e, &config.codex_home)
})?;
let thread_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
let session_for_loop = Arc::clone(&session);
let session_loop_handle = tokio::spawn(async move {
submission_loop(session_for_loop, config, rx_sub)
.instrument(info_span!("session_loop", thread_id = %thread_id))
.await;
});
let codex = Codex {
tx_sub,
rx_event,
agent_status: agent_status_rx,
session,
session_loop_termination: session_loop_termination_from_handle(session_loop_handle),
};
Ok(CodexSpawnOk { codex, thread_id })
}
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
self.submit_with_trace(op, /*trace*/ None).await
}
pub async fn submit_with_trace(
&self,
op: Op,
trace: Option<W3cTraceContext>,
) -> CodexResult<String> {
let id = Uuid::now_v7().to_string();
let sub = Submission {
id: id.clone(),
op,
trace,
};
self.submit_with_id(sub).await?;
Ok(id)
}
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
/// unique IDs for each submission.
pub async fn submit_with_id(&self, mut sub: Submission) -> CodexResult<()> {
if sub.trace.is_none() {
sub.trace = current_span_w3c_trace_context();
}
self.tx_sub
.send(sub)
.await
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(())
}
/// Persist a thread-level memory mode update for the active session.
///
/// This is a local-only operation that updates rollout metadata directly
/// and does not involve the model.
pub async fn set_thread_memory_mode(
&self,
mode: codex_protocol::protocol::ThreadMemoryMode,
) -> anyhow::Result<()> {
handlers::persist_thread_memory_mode_update(&self.session, mode).await
}
pub async fn shutdown_and_wait(&self) -> CodexResult<()> {
let session_loop_termination = self.session_loop_termination.clone();
match self.submit(Op::Shutdown).await {
Ok(_) => {}
Err(CodexErr::InternalAgentDied) => {}
Err(err) => return Err(err),
}
session_loop_termination.await;
Ok(())
}
pub async fn next_event(&self) -> CodexResult<Event> {
let event = self
.rx_event
.recv()
.await
.map_err(|_| CodexErr::InternalAgentDied)?;
Ok(event)
}
pub async fn steer_input(
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
self.session
.steer_input(input, expected_turn_id, responsesapi_client_metadata)
.await
}
pub(crate) async fn set_app_server_client_info(
&self,
app_server_client_name: Option<String>,
app_server_client_version: Option<String>,
mcp_elicitations_auto_deny: bool,
) -> ConstraintResult<()> {
self.session
.update_settings(SessionSettingsUpdate {
app_server_client_name,
app_server_client_version,
..Default::default()
})
.await?;
let mcp_connection_manager = self.session.services.mcp_connection_manager.read().await;
mcp_connection_manager.set_elicitations_auto_deny(mcp_elicitations_auto_deny);
Ok(())
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
self.agent_status.borrow().clone()
}
pub(crate) async fn thread_config_snapshot(&self) -> ThreadConfigSnapshot {
let state = self.session.state.lock().await;
state.session_configuration.thread_config_snapshot()
}
pub(crate) async fn thread_environment_selections(&self) -> Vec<TurnEnvironmentSelection> {
let state = self.session.state.lock().await;
state.session_configuration.environments.clone()
}
pub(crate) fn state_db(&self) -> Option<state_db::StateDbHandle> {
self.session.state_db()
}
pub(crate) fn enabled(&self, feature: Feature) -> bool {
self.session.enabled(feature)
}
}
fn get_service_tier(
configured_service_tier: Option<String>,
fast_default_opt_out: bool,
account_plan_type: Option<AccountPlanType>,
fast_mode_enabled: bool,
) -> Option<String> {
if configured_service_tier.is_some() || fast_default_opt_out || !fast_mode_enabled {
return configured_service_tier;
}
account_plan_type
.is_some_and(is_enterprise_default_service_tier_plan)
.then_some(ServiceTier::Fast.request_value().to_string())
}
fn is_enterprise_default_service_tier_plan(plan_type: AccountPlanType) -> bool {
plan_type == AccountPlanType::Enterprise
|| plan_type.is_business_like()
|| plan_type.is_team_like()
}
#[cfg(test)]
pub(crate) fn completed_session_loop_termination() -> SessionLoopTermination {
futures::future::ready(()).boxed().shared()
}
pub(crate) fn session_loop_termination_from_handle(
handle: JoinHandle<()>,
) -> SessionLoopTermination {
async move {
let _ = handle.await;
}
.boxed()
.shared()
}
async fn thread_title_from_thread_store(
live_thread: Option<&LiveThread>,
thread_store: &Arc<dyn ThreadStore>,
conversation_id: ThreadId,
) -> Option<String> {
let thread = match live_thread {
Some(live_thread) => {
live_thread
.read_thread(
/*include_archived*/ true, /*include_history*/ false,
)
.await
}
None => {
thread_store
.read_thread(ReadThreadParams {
thread_id: conversation_id,
include_archived: true,
include_history: false,
})
.await
}
}
.ok()?;
let title = thread.name.as_deref()?.trim();
(!title.is_empty() && thread.preview.trim() != title).then(|| title.to_string())
}
impl Session {
pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata {
let state = self.state.lock().await;
AppServerClientMetadata {
client_name: state.session_configuration.app_server_client_name.clone(),
client_version: state
.session_configuration
.app_server_client_version
.clone(),
}
}
pub(crate) async fn configured_multi_agent_v2_usage_hint_texts(&self) -> Vec<String> {
if !self.features.enabled(Feature::MultiAgentV2) {
return Vec::new();
}
let state = self.state.lock().await;
let config = &state.session_configuration.original_config_do_not_use;
[
config.multi_agent_v2.root_agent_usage_hint_text.clone(),
config.multi_agent_v2.subagent_usage_hint_text.clone(),
]
.into_iter()
.flatten()
.collect()
}
fn managed_network_proxy_active_for_permission_profile(
permission_profile: &PermissionProfile,
) -> bool {
!matches!(permission_profile, PermissionProfile::Disabled)
}
/// Builds the `x-codex-beta-features` header value for this session.
///
/// `ModelClient` is session-scoped and intentionally does not depend on the full `Config`, so
/// we precompute the comma-separated list of enabled experimental feature keys at session
/// creation time and thread it into the client.
fn build_model_client_beta_features_header(config: &Config) -> Option<String> {
let beta_features_header = FEATURES
.iter()
.filter_map(|spec| {
let advertise_in_model_client_header =
spec.stage.experimental_menu_description().is_some()
|| spec.id == Feature::RemoteCompactionV2;
if advertise_in_model_client_header && config.features.enabled(spec.id) {
Some(spec.key)
} else {
None
}
})
.collect::<Vec<_>>()
.join(",");
if beta_features_header.is_empty() {
None
} else {
Some(beta_features_header)
}
}
async fn start_managed_network_proxy(
spec: &crate::config::NetworkProxySpec,
exec_policy: &codex_execpolicy::Policy,
permission_profile: &PermissionProfile,
network_policy_decider: Option<Arc<dyn codex_network_proxy::NetworkPolicyDecider>>,
blocked_request_observer: Option<Arc<dyn codex_network_proxy::BlockedRequestObserver>>,
managed_network_requirements_enabled: bool,
audit_metadata: NetworkProxyAuditMetadata,
) -> anyhow::Result<(StartedNetworkProxy, SessionNetworkProxyRuntime)> {
let spec = spec
.with_exec_policy_network_rules(exec_policy)
.map_err(|err| {
tracing::warn!(
"failed to apply execpolicy network rules to managed proxy; continuing with configured network policy: {err}"
);
err
})
.unwrap_or_else(|_| spec.clone());
let network_proxy = spec
.start_proxy(
permission_profile,
network_policy_decider,
blocked_request_observer,
managed_network_requirements_enabled,
audit_metadata,
)
.await
.map_err(|err| anyhow::anyhow!("failed to start managed network proxy: {err}"))?;
let session_network_proxy = {
let proxy = network_proxy.proxy();
SessionNetworkProxyRuntime {
http_addr: proxy.http_addr().to_string(),
socks_addr: proxy.socks_addr().to_string(),
}
};
Ok((network_proxy, session_network_proxy))
}
async fn refresh_managed_network_proxy_for_current_permission_profile(&self) {
let Some(started_proxy) = self.services.network_proxy.as_ref() else {
return;
};
let Ok(_refresh_guard) = self.managed_network_proxy_refresh_lock.acquire().await else {
error!("managed network proxy refresh semaphore closed");
return;
};
let session_configuration = {
let state = self.state.lock().await;
state.session_configuration.clone()
};
let Some(spec) = session_configuration
.original_config_do_not_use
.permissions
.network
.as_ref()
else {
return;
};
let spec = match spec
.recompute_for_permission_profile(&session_configuration.permission_profile())
{
Ok(spec) => spec,
Err(err) => {
warn!("failed to rebuild managed network proxy policy for sandbox change: {err}");
return;
}
};
let current_exec_policy = self.services.exec_policy.current();
let spec = match spec.with_exec_policy_network_rules(current_exec_policy.as_ref()) {
Ok(spec) => spec,
Err(err) => {
warn!(
"failed to apply execpolicy network rules while refreshing managed network proxy: {err}"
);
spec
}
};
if let Err(err) = spec.apply_to_started_proxy(started_proxy).await {
warn!("failed to refresh managed network proxy for sandbox change: {err}");
}
}
#[cfg(test)]
pub(crate) async fn codex_home(&self) -> AbsolutePathBuf {
let state = self.state.lock().await;
state.session_configuration.codex_home().clone()
}
pub(crate) fn subscribe_out_of_band_elicitation_pause_state(&self) -> watch::Receiver<bool> {
self.out_of_band_elicitation_paused.subscribe()
}
pub(crate) fn set_out_of_band_elicitation_pause_state(&self, paused: bool) {
self.out_of_band_elicitation_paused.send_replace(paused);
}
pub(crate) fn get_tx_event(&self) -> Sender<Event> {
self.tx_event.clone()
}
pub(crate) fn state_db(&self) -> Option<state_db::StateDbHandle> {
self.services.state_db.clone()
}
pub(crate) fn live_thread_for_persistence(
&self,
operation: &str,
) -> anyhow::Result<&LiveThread> {
self.live_thread()
.ok_or_else(|| anyhow::anyhow!("Session persistence is disabled; cannot {operation}."))
}
pub(crate) fn live_thread(&self) -> Option<&LiveThread> {
self.services.live_thread.as_ref()
}
/// Flush rollout writes and return the final durability-barrier result.
pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> {
if let Some(live_thread) = self.live_thread() {
live_thread.flush().await.map_err(std::io::Error::other)
} else {
Ok(())
}
}
pub(crate) async fn try_ensure_rollout_materialized(&self) -> std::io::Result<()> {
if let Some(live_thread) = self.live_thread() {
live_thread.persist().await.map_err(std::io::Error::other)?;
}
Ok(())
}
pub(crate) async fn ensure_rollout_materialized(&self) {
if let Err(e) = self.try_ensure_rollout_materialized().await {
warn!("failed to materialize thread persistence: {e}");
}
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("auto-compact-{id}")
}
pub(crate) async fn route_realtime_text_input(self: &Arc<Self>, text: String) {
handlers::user_input_or_turn_inner(
self,
self.next_internal_sub_id(),
Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text,
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
},
/*mirror_user_text_to_realtime*/ None,
)
.await;
}
pub(crate) async fn get_total_token_usage(&self) -> i64 {
let state = self.state.lock().await;
state.get_total_token_usage(state.server_reasoning_included())
}
pub(crate) async fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
let state = self.state.lock().await;
state.history.get_total_token_usage_breakdown()
}
pub(crate) async fn total_token_usage(&self) -> Option<TokenUsage> {
let state = self.state.lock().await;
state.token_info().map(|info| info.total_token_usage)
}
/// Returns the complete token usage snapshot currently cached for this session.
///
/// Resume and fork reconstruction seed this state from the last persisted rollout
/// `TokenCount` event. Callers that need to replay restored usage to a client
/// should use this accessor instead of `total_token_usage`, because the app-server
/// notification includes both total and last-turn usage.
pub(crate) async fn token_usage_info(&self) -> Option<TokenUsageInfo> {
let state = self.state.lock().await;
state.token_info()
}
pub(crate) async fn get_estimated_token_count(
&self,
turn_context: &TurnContext,
) -> Option<i64> {
let state = self.state.lock().await;
state.history.estimate_token_count(turn_context)
}
pub(crate) async fn get_base_instructions(&self) -> BaseInstructions {
let state = self.state.lock().await;
BaseInstructions {
text: state.session_configuration.base_instructions.clone(),
}
}
// Merges connector IDs into the session-level explicit connector selection.
pub(crate) async fn merge_connector_selection(
&self,
connector_ids: HashSet<String>,
) -> HashSet<String> {
let mut state = self.state.lock().await;
state.merge_connector_selection(connector_ids)
}
// Returns the connector IDs currently selected for this session.
pub(crate) async fn get_connector_selection(&self) -> HashSet<String> {
let state = self.state.lock().await;
state.get_connector_selection()
}
// Clears connector IDs that were accumulated for explicit selection.
pub(crate) async fn clear_connector_selection(&self) {
let mut state = self.state.lock().await;
state.clear_connector_selection();
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
let is_subagent = {
let state = self.state.lock().await;
state
.session_configuration
.session_source
.is_non_root_agent()
};
let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history);
{
let mut state = self.state.lock().await;
state.set_next_turn_is_first(!has_prior_user_turns);
}
match conversation_history {
InitialHistory::New | InitialHistory::Cleared => {
// Defer initial context insertion until the first real turn starts so
// turn/start overrides can be merged before we write model-visible context.
self.set_previous_turn_settings(/*previous_turn_settings*/ None)
.await;
}
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
let previous_turn_settings = self
.apply_rollout_reconstruction(&turn_context, &rollout_items)
.await;
// If resuming, warn when the last recorded model differs from the current one.
let curr: &str = turn_context.model_info.slug.as_str();
if let Some(prev) = previous_turn_settings
.as_ref()
.map(|settings| settings.model.as_str())
.filter(|model| *model != curr)
{
warn!("resuming session with different model: previous={prev}, current={curr}");
self.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
message: format!(
"This session was recorded with model `{prev}` but is resuming with `{curr}`. \
Consider switching back to `{prev}` as it may affect Codex performance."
),
}),
)
.await;
}
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
// Defer seeding the session's initial context until the first turn starts so
// turn/start overrides can be merged before we write to the rollout.
if !is_subagent {
let _ = self.flush_rollout().await;
}
}
InitialHistory::Forked(rollout_items) => {
self.apply_rollout_reconstruction(&turn_context, &rollout_items)
.await;
// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}
// If persisting, persist all rollout items as-is (the store filters).
if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
// Forked threads should remain file-backed immediately after startup.
self.ensure_rollout_materialized().await;
// Flush after seeding history and any persisted rollout copy.
if !is_subagent {
let _ = self.flush_rollout().await;
}
}
}
}
async fn apply_rollout_reconstruction(
&self,
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> Option<PreviousTurnSettings> {
let reconstructed_rollout = self
.reconstruct_history_from_rollout(turn_context, rollout_items)
.await;
let previous_turn_settings = reconstructed_rollout.previous_turn_settings.clone();
self.replace_history(
reconstructed_rollout.history,
reconstructed_rollout.reference_context_item,
)
.await;
self.set_previous_turn_settings(previous_turn_settings.clone())
.await;
previous_turn_settings
}
fn last_token_info_from_rollout(rollout_items: &[RolloutItem]) -> Option<TokenUsageInfo> {
rollout_items.iter().rev().find_map(|item| match item {
RolloutItem::EventMsg(EventMsg::TokenCount(ev)) => ev.info.clone(),
_ => None,
})
}
async fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
let state = self.state.lock().await;
state.previous_turn_settings()
}
pub(crate) async fn set_previous_turn_settings(
&self,
previous_turn_settings: Option<PreviousTurnSettings>,
) {
let mut state = self.state.lock().await;
state.set_previous_turn_settings(previous_turn_settings);
}
fn maybe_refresh_shell_snapshot_for_cwd(
&self,
previous_cwd: &AbsolutePathBuf,
next_cwd: &AbsolutePathBuf,
codex_home: &AbsolutePathBuf,
session_source: &SessionSource,
) {
if previous_cwd == next_cwd {
return;
}
if !self.features.enabled(Feature::ShellSnapshot) {
return;
}
if matches!(
session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
return;
}
ShellSnapshot::refresh_snapshot(
codex_home.clone(),
self.conversation_id,
next_cwd.clone(),
self.services.user_shell.as_ref().clone(),
self.services.shell_snapshot_tx.clone(),
self.services.session_telemetry.clone(),
self.services.state_db.clone(),
);
}
pub(crate) async fn update_settings(
&self,
updates: SessionSettingsUpdate,
) -> ConstraintResult<()> {
let (previous_cwd, permission_profile_changed, next_cwd, codex_home, session_source) = {
let mut state = self.state.lock().await;
let updated = match state.session_configuration.apply(&updates) {
Ok(updated) => updated,
Err(err) => {
warn!("rejected session settings update: {err}");
return Err(err);
}
};
let previous_cwd = state.session_configuration.cwd.clone();
let previous_permission_profile = state.session_configuration.permission_profile();
let updated_permission_profile = updated.permission_profile();
let permission_profile_changed =
previous_permission_profile != updated_permission_profile;
let next_cwd = updated.cwd.clone();
let codex_home = updated.codex_home.clone();
let session_source = updated.session_source.clone();
state.session_configuration = updated;
(
previous_cwd,
permission_profile_changed,
next_cwd,
codex_home,
session_source,
)
};
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&next_cwd,
&codex_home,
&session_source,
);
if permission_profile_changed {
self.refresh_managed_network_proxy_for_current_permission_profile()
.await;
}
Ok(())
}
pub(crate) async fn validate_settings(
&self,
updates: &SessionSettingsUpdate,
) -> ConstraintResult<()> {
let state = self.state.lock().await;
state.session_configuration.apply(updates).map(|_| ())
}
pub(crate) async fn set_session_startup_prewarm(
&self,
startup_prewarm: SessionStartupPrewarmHandle,
) {
let mut state = self.state.lock().await;
state.set_session_startup_prewarm(startup_prewarm);
}
pub(crate) async fn take_session_startup_prewarm(&self) -> Option<SessionStartupPrewarmHandle> {
let mut state = self.state.lock().await;
state.take_session_startup_prewarm()
}
pub(crate) async fn get_config(&self) -> std::sync::Arc<Config> {
let state = self.state.lock().await;
state
.session_configuration
.original_config_do_not_use
.clone()
}
pub(crate) async fn provider(&self) -> ModelProviderInfo {
let state = self.state.lock().await;
state.session_configuration.provider.clone()
}
pub(crate) async fn refresh_runtime_config(&self, next_config: Config) {
// Refresh only the user layer from the incoming snapshot. Preserve thread-local
// layers such as request/session overrides that were present when this session
// was created.
let config = {
let mut state = self.state.lock().await;
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
config.config_layer_stack = config
.config_layer_stack
.with_user_layer_from(&next_config.config_layer_stack);
config.tool_suggest =
resolve_tool_suggest_config_from_layer_stack(&config.config_layer_stack);
let config = Arc::new(config);
state.session_configuration.original_config_do_not_use = Arc::clone(&config);
config
};
self.services.skills_manager.clear_cache();
self.services.plugins_manager.clear_cache();
let hooks = build_hooks_for_config(
config.as_ref(),
self.services.plugins_manager.as_ref(),
self.services.user_shell.as_ref(),
)
.await;
let state = self.state.lock().await;
// A newer refresh may have updated the config while this hook build was in flight.
// Only publish hooks derived from the current config snapshot.
if Arc::ptr_eq(
&state.session_configuration.original_config_do_not_use,
&config,
) {
self.services.hooks.store(Arc::new(hooks));
}
}
pub(crate) async fn reload_user_config_layer(&self) {
// Refresh layer-backed runtime state for an existing session, including enabled plugin,
// skill, and hook state. Derived config fields such as feature gates and legacy notify
// settings remain session-static.
//
// Prefer `refresh_runtime_config()` when the host can already provide a materialized
// config snapshot. This file-based path exists for legacy local reload flows.
let config_toml_path = {
let state = self.state.lock().await;
state
.session_configuration
.codex_home
.join(CONFIG_TOML_FILE)
};
let user_config = match std::fs::read_to_string(&config_toml_path) {
Ok(contents) => match toml::from_str::<toml::Value>(&contents) {
Ok(config) => config,
Err(err) => {
warn!("failed to parse user config while reloading layer: {err}");
return;
}
},
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
toml::Value::Table(Default::default())
}
Err(err) => {
warn!("failed to read user config while reloading layer: {err}");
return;
}
};
let next_config = {
let state = self.state.lock().await;
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
config.config_layer_stack = config
.config_layer_stack
.with_user_config(&config_toml_path, user_config);
config.tool_suggest =
resolve_tool_suggest_config_from_layer_stack(&config.config_layer_stack);
config
};
self.refresh_runtime_config(next_config).await;
}
async fn build_settings_update_items(
&self,
reference_context_item: Option<&TurnContextItem>,
current_context: &TurnContext,
) -> Vec<ResponseItem> {
// TODO: Make context updates a pure diff of persisted previous/current TurnContextItem
// state so replay/backtracking is deterministic. Runtime inputs that affect model-visible
// context (shell, exec policy, feature gates, previous-turn bridge) should be persisted
// state or explicit non-state replay events.
let previous_turn_settings = {
let state = self.state.lock().await;
state.previous_turn_settings()
};
let shell = self.user_shell();
let exec_policy = self.services.exec_policy.current();
crate::context_manager::updates::build_settings_update_items(
reference_context_item,
previous_turn_settings.as_ref(),
current_context,
shell.as_ref(),
exec_policy.as_ref(),
self.features.enabled(Feature::Personality),
)
}
/// Persist the event to rollout and send it to clients.
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
let legacy_source = msg.clone();
self.services
.rollout_thread_trace
.record_codex_turn_event(&turn_context.sub_id, &legacy_source);
self.services
.rollout_thread_trace
.record_tool_call_event(turn_context.sub_id.clone(), &legacy_source);
let event = Event {
id: turn_context.sub_id.clone(),
msg,
};
self.send_event_raw(event).await;
self.maybe_notify_parent_of_terminal_turn(turn_context, &legacy_source)
.await;
self.maybe_mirror_event_text_to_realtime(&legacy_source)
.await;
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
.await;
let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
let legacy_event = Event {
id: turn_context.sub_id.clone(),
msg: legacy,
};
self.send_event_raw(legacy_event).await;
}
}
/// Forwards terminal turn events from spawned MultiAgentV2 children to their direct parent.
async fn maybe_notify_parent_of_terminal_turn(
&self,
turn_context: &TurnContext,
msg: &EventMsg,
) {
if !self.enabled(Feature::MultiAgentV2) {
return;
}
if !matches!(msg, EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)) {
return;
}
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
agent_path: Some(child_agent_path),
..
}) = &turn_context.session_source
else {
return;
};
let Some(status) = agent_status_from_event(msg) else {
return;
};
if !is_final(&status) {
return;
}
self.forward_child_completion_to_parent(
turn_context,
*parent_thread_id,
child_agent_path,
status,
)
.await;
}
/// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent.
async fn forward_child_completion_to_parent(
&self,
turn_context: &TurnContext,
parent_thread_id: ThreadId,
child_agent_path: &codex_protocol::AgentPath,
status: AgentStatus,
) {
let Some(parent_agent_path) = child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| codex_protocol::AgentPath::try_from(parent).ok())
else {
return;
};
let message = format_subagent_notification_message(child_agent_path.as_str(), &status);
// `communication` owns the message. Keep a second copy only when the
// recorder will actually need it after parent delivery succeeds.
let trace_message = self
.services
.rollout_thread_trace
.is_enabled()
.then(|| message.clone());
let communication = InterAgentCommunication::new(
child_agent_path.clone(),
parent_agent_path,
Vec::new(),
message,
/*trigger_turn*/ false,
);
if let Err(err) = self
.services
.agent_control
.send_inter_agent_communication(parent_thread_id, communication)
.await
{
debug!("failed to notify parent thread {parent_thread_id}: {err}");
return;
}
if let Some(message) = trace_message {
self.services
.rollout_thread_trace
.record_agent_result_interaction(
turn_context.sub_id.as_str(),
parent_thread_id,
&AgentResultTracePayload {
child_agent_path: child_agent_path.as_str(),
message: &message,
status: &status,
},
);
}
}
async fn maybe_mirror_event_text_to_realtime(&self, msg: &EventMsg) {
let Some(text) = realtime_text_for_event(msg) else {
return;
};
if self.conversation.running_state().await.is_none()
|| self.conversation.active_handoff_id().await.is_none()
{
return;
}
if let Err(err) = self.conversation.handoff_out(text).await {
debug!("failed to mirror event text to realtime conversation: {err}");
}
}
async fn maybe_clear_realtime_handoff_for_event(&self, msg: &EventMsg) {
if !matches!(msg, EventMsg::TurnComplete(_)) {
return;
}
if let Err(err) = self.conversation.handoff_complete().await {
debug!("failed to finalize realtime handoff output: {err}");
}
self.conversation.clear_active_handoff().await;
}
pub(crate) async fn send_event_raw(&self, event: Event) {
// Persist the event into rollout storage (the store filters as needed).
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
self.persist_rollout_items(&rollout_items).await;
self.services
.rollout_thread_trace
.record_protocol_event(&event.msg);
self.deliver_event_raw(event).await;
}
async fn deliver_event_raw(&self, event: Event) {
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
self.agent_status.send_replace(status);
}
if let Err(e) = self.tx_event.send(event).await {
debug!("dropping event because channel is closed: {e}");
}
}
pub(crate) async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
self.send_event(
turn_context,
EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
started_at_ms: now_unix_timestamp_ms(),
}),
)
.await;
}
pub(crate) async fn emit_turn_item_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
) {
record_turn_ttfm_metric(turn_context, &item).await;
self.send_event(
turn_context,
EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item,
completed_at_ms: now_unix_timestamp_ms(),
}),
)
.await;
}
/// Adds an execpolicy amendment to both the in-memory and on-disk policies so future
/// commands can use the newly approved prefix.
pub(crate) async fn persist_execpolicy_amendment(
&self,
amendment: &ExecPolicyAmendment,
) -> Result<(), ExecPolicyUpdateError> {
let codex_home = self
.state
.lock()
.await
.session_configuration
.codex_home()
.clone();
self.services
.exec_policy
.append_amendment_and_update(&codex_home, amendment)
.await?;
Ok(())
}
pub(crate) async fn turn_context_for_sub_id(&self, sub_id: &str) -> Option<Arc<TurnContext>> {
let active = self.active_turn.lock().await;
active
.as_ref()
.and_then(|turn| turn.tasks.get(sub_id))
.map(|task| Arc::clone(&task.turn_context))
}
async fn active_turn_context_and_cancellation_token(
&self,
) -> Option<(Arc<TurnContext>, CancellationToken)> {
let active = self.active_turn.lock().await;
let (_, task) = active.as_ref()?.tasks.first()?;
Some((
Arc::clone(&task.turn_context),
task.cancellation_token.child_token(),
))
}
pub(crate) async fn record_execpolicy_amendment_message(
&self,
sub_id: &str,
amendment: &ExecPolicyAmendment,
) {
let Some(prefixes) = format_allow_prefixes(vec![amendment.command.clone()]) else {
warn!("execpolicy amendment for {sub_id} had no command prefix");
return;
};
let fragment = ApprovedCommandPrefixSaved::new(prefixes);
let text = fragment.render();
let message: ResponseItem = ContextualUserFragment::into(fragment);
if let Some(turn_context) = self.turn_context_for_sub_id(sub_id).await {
self.record_conversation_items(&turn_context, std::slice::from_ref(&message))
.await;
return;
}
if self
.inject_response_items(vec![ResponseInputItem::Message {
role: "developer".to_string(),
content: vec![ContentItem::InputText { text }],
phase: None,
}])
.await
.is_err()
{
warn!("no active turn found to record execpolicy amendment message for {sub_id}");
}
}
pub(crate) async fn persist_network_policy_amendment(
&self,
amendment: &NetworkPolicyAmendment,
network_approval_context: &NetworkApprovalContext,
) -> anyhow::Result<()> {
let _refresh_guard = self
.managed_network_proxy_refresh_lock
.acquire()
.await
.map_err(|_| anyhow::anyhow!("managed network proxy refresh semaphore closed"))?;
let host =
Self::validated_network_policy_amendment_host(amendment, network_approval_context)?;
let codex_home = self
.state
.lock()
.await
.session_configuration
.codex_home()
.clone();
let execpolicy_amendment =
execpolicy_network_rule_amendment(amendment, network_approval_context, &host);
if let Some(started_network_proxy) = self.services.network_proxy.as_ref() {
let proxy = started_network_proxy.proxy();
match amendment.action {
NetworkPolicyRuleAction::Allow => proxy
.add_allowed_domain(&host)
.await
.map_err(|err| anyhow::anyhow!("failed to update runtime allowlist: {err}"))?,
NetworkPolicyRuleAction::Deny => proxy
.add_denied_domain(&host)
.await
.map_err(|err| anyhow::anyhow!("failed to update runtime denylist: {err}"))?,
}
}
self.services
.exec_policy
.append_network_rule_and_update(
&codex_home,
&host,
execpolicy_amendment.protocol,
execpolicy_amendment.decision,
Some(execpolicy_amendment.justification),
)
.await
.map_err(|err| {
anyhow::anyhow!("failed to persist network policy amendment to execpolicy: {err}")
})?;
Ok(())
}
fn validated_network_policy_amendment_host(
amendment: &NetworkPolicyAmendment,
network_approval_context: &NetworkApprovalContext,
) -> anyhow::Result<String> {
let approved_host = normalize_host(&network_approval_context.host);
let amendment_host = normalize_host(&amendment.host);
if amendment_host != approved_host {
return Err(anyhow::anyhow!(
"network policy amendment host '{}' does not match approved host '{}'",
amendment.host,
network_approval_context.host
));
}
Ok(approved_host)
}
pub(crate) async fn record_network_policy_amendment_message(
&self,
sub_id: &str,
amendment: &NetworkPolicyAmendment,
) {
let fragment = NetworkRuleSaved::new(amendment);
let text = fragment.render();
let message: ResponseItem = ContextualUserFragment::into(fragment);
if let Some(turn_context) = self.turn_context_for_sub_id(sub_id).await {
self.record_conversation_items(&turn_context, std::slice::from_ref(&message))
.await;
return;
}
if self
.inject_response_items(vec![ResponseInputItem::Message {
role: "developer".to_string(),
content: vec![ContentItem::InputText { text }],
phase: None,
}])
.await
.is_err()
{
warn!("no active turn found to record network policy amendment message for {sub_id}");
}
}
/// Emit an exec approval request event and await the user's decision.
///
/// The request is keyed by `call_id` + `approval_id` so matching responses
/// are delivered to the correct in-flight turn. If the pending approval is
/// cleared before a response arrives, treat it as an abort so interrupted
/// turns do not continue on a synthetic denial.
///
/// Note that if `available_decisions` is `None`, then the other fields will
/// be used to derive the available decisions via
/// [ExecApprovalRequestEvent::default_available_decisions].
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn request_command_approval(
&self,
turn_context: &TurnContext,
call_id: String,
approval_id: Option<String>,
command: Vec<String>,
cwd: AbsolutePathBuf,
reason: Option<String>,
network_approval_context: Option<NetworkApprovalContext>,
proposed_execpolicy_amendment: Option<ExecPolicyAmendment>,
additional_permissions: Option<AdditionalPermissionProfile>,
available_decisions: Option<Vec<ReviewDecision>>,
) -> ReviewDecision {
// command-level approvals use `call_id`.
// `approval_id` is only present for subcommand callbacks (execve intercept)
let effective_approval_id = approval_id.clone().unwrap_or_else(|| call_id.clone());
// Add the tx_approve callback to the map before sending the request.
let (tx_approve, rx_approve) = oneshot::channel();
let prev_entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(effective_approval_id.clone(), tx_approve)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for call_id: {effective_approval_id}");
}
let parsed_cmd = parse_command(&command);
let proposed_network_policy_amendments = network_approval_context.as_ref().map(|context| {
vec![
NetworkPolicyAmendment {
host: context.host.clone(),
action: NetworkPolicyRuleAction::Allow,
},
NetworkPolicyAmendment {
host: context.host.clone(),
action: NetworkPolicyRuleAction::Deny,
},
]
});
let available_decisions = available_decisions.unwrap_or_else(|| {
ExecApprovalRequestEvent::default_available_decisions(
network_approval_context.as_ref(),
proposed_execpolicy_amendment.as_ref(),
proposed_network_policy_amendments.as_deref(),
additional_permissions.as_ref(),
)
});
let event = EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
approval_id,
turn_id: turn_context.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
command,
cwd,
reason,
network_approval_context,
proposed_execpolicy_amendment,
proposed_network_policy_amendments,
additional_permissions,
available_decisions: Some(available_decisions),
parsed_cmd,
});
self.send_event(turn_context, event).await;
rx_approve.await.unwrap_or(ReviewDecision::Abort)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn request_patch_approval(
&self,
turn_context: &TurnContext,
call_id: String,
changes: HashMap<PathBuf, FileChange>,
reason: Option<String>,
grant_root: Option<PathBuf>,
) -> oneshot::Receiver<ReviewDecision> {
// Add the tx_approve callback to the map before sending the request.
let (tx_approve, rx_approve) = oneshot::channel();
let approval_id = call_id.clone();
let prev_entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(approval_id.clone(), tx_approve)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for call_id: {approval_id}");
}
let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
turn_id: turn_context.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
changes,
reason,
grant_root,
});
self.send_event(turn_context, event).await;
rx_approve
}
pub async fn request_permissions(
self: &Arc<Self>,
turn_context: &Arc<TurnContext>,
call_id: String,
args: RequestPermissionsArgs,
cancellation_token: CancellationToken,
) -> Option<RequestPermissionsResponse> {
self.request_permissions_for_cwd(
turn_context,
call_id,
args,
turn_context.cwd.clone(),
cancellation_token,
)
.await
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub(crate) async fn request_permissions_for_cwd(
self: &Arc<Self>,
turn_context: &Arc<TurnContext>,
call_id: String,
args: RequestPermissionsArgs,
cwd: AbsolutePathBuf,
cancellation_token: CancellationToken,
) -> Option<RequestPermissionsResponse> {
match turn_context.as_ref().approval_policy.value() {
AskForApproval::Never => {
return Some(RequestPermissionsResponse {
permissions: RequestPermissionProfile::default(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
});
}
AskForApproval::Granular(granular_config)
if !granular_config.allows_request_permissions() =>
{
return Some(RequestPermissionsResponse {
permissions: RequestPermissionProfile::default(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
});
}
AskForApproval::OnFailure
| AskForApproval::OnRequest
| AskForApproval::UnlessTrusted
| AskForApproval::Granular(_) => {}
}
let requested_permissions = args.permissions;
if crate::guardian::routes_approval_to_guardian(turn_context.as_ref()) {
let originating_turn_state = {
let active = self.active_turn.lock().await;
active.as_ref().map(|active| Arc::clone(&active.turn_state))
};
let review_id = crate::guardian::new_guardian_review_id();
let session = Arc::clone(self);
let turn = Arc::clone(turn_context);
let request = crate::guardian::GuardianApprovalRequest::RequestPermissions {
id: call_id,
turn_id: turn_context.sub_id.clone(),
reason: args.reason,
permissions: requested_permissions.clone(),
};
let review_rx = crate::guardian::spawn_approval_request_review(
session,
turn,
review_id,
request,
/*retry_reason*/ None,
codex_analytics::GuardianApprovalRequestSource::MainTurn,
cancellation_token.clone(),
);
let decision = tokio::select! {
biased;
_ = cancellation_token.cancelled() => return None,
decision = review_rx => decision.unwrap_or(ReviewDecision::Denied),
};
let response = match decision {
ReviewDecision::Approved | ReviewDecision::ApprovedExecpolicyAmendment { .. } => {
RequestPermissionsResponse {
permissions: requested_permissions.clone(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
}
}
ReviewDecision::ApprovedForSession => RequestPermissionsResponse {
permissions: requested_permissions.clone(),
scope: PermissionGrantScope::Session,
strict_auto_review: false,
},
ReviewDecision::NetworkPolicyAmendment {
network_policy_amendment,
} => match network_policy_amendment.action {
NetworkPolicyRuleAction::Allow => RequestPermissionsResponse {
permissions: requested_permissions.clone(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
},
NetworkPolicyRuleAction::Deny => RequestPermissionsResponse {
permissions: RequestPermissionProfile::default(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
},
},
ReviewDecision::Abort | ReviewDecision::Denied | ReviewDecision::TimedOut => {
RequestPermissionsResponse {
permissions: RequestPermissionProfile::default(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
}
}
};
let response = Self::normalize_request_permissions_response(
requested_permissions,
response,
cwd.as_path(),
);
self.record_granted_request_permissions_for_turn(
&response,
originating_turn_state.as_ref(),
)
.await;
return Some(response);
}
let (tx_response, rx_response) = oneshot::channel();
let prev_entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_request_permissions(
call_id.clone(),
PendingRequestPermissions {
tx_response,
requested_permissions: requested_permissions.clone(),
cwd: cwd.clone(),
},
)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending request_permissions for call_id: {call_id}");
}
let event = EventMsg::RequestPermissions(RequestPermissionsEvent {
call_id: call_id.clone(),
turn_id: turn_context.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
reason: args.reason,
permissions: requested_permissions,
cwd: Some(cwd),
});
self.send_event(turn_context.as_ref(), event).await;
tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
let _ = ts.remove_pending_request_permissions(&call_id);
}
None
}
response = rx_response => response.ok(),
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn request_user_input(
&self,
turn_context: &TurnContext,
call_id: String,
args: RequestUserInputArgs,
) -> Option<RequestUserInputResponse> {
let sub_id = turn_context.sub_id.clone();
let (tx_response, rx_response) = oneshot::channel();
let event_id = sub_id.clone();
let prev_entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_user_input(sub_id, tx_response)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending user input for sub_id: {event_id}");
}
let event = EventMsg::RequestUserInput(RequestUserInputEvent {
call_id,
turn_id: turn_context.sub_id.clone(),
questions: args.questions,
});
self.send_event(turn_context, event).await;
rx_response.await.ok()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn notify_user_input_response(
&self,
sub_id: &str,
response: RequestUserInputResponse,
) {
let entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.remove_pending_user_input(sub_id)
}
None => None,
}
};
match entry {
Some(tx_response) => {
tx_response.send(response).ok();
}
None => {
warn!("No pending user input found for sub_id: {sub_id}");
}
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn notify_request_permissions_response(
&self,
call_id: &str,
response: RequestPermissionsResponse,
) {
let (entry, originating_turn_state) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
let entry = ts.remove_pending_request_permissions(call_id);
let originating_turn_state = entry.as_ref().map(|_| Arc::clone(&at.turn_state));
(entry, originating_turn_state)
}
None => (None, None),
}
};
match entry {
Some(entry) => {
let response = Self::normalize_request_permissions_response(
entry.requested_permissions,
response,
entry.cwd.as_path(),
);
self.record_granted_request_permissions_for_turn(
&response,
originating_turn_state.as_ref(),
)
.await;
entry.tx_response.send(response).ok();
}
None => {
warn!("No pending request_permissions found for call_id: {call_id}");
}
}
}
fn normalize_request_permissions_response(
requested_permissions: RequestPermissionProfile,
response: RequestPermissionsResponse,
cwd: &Path,
) -> RequestPermissionsResponse {
if response.strict_auto_review && matches!(response.scope, PermissionGrantScope::Session) {
return RequestPermissionsResponse {
permissions: RequestPermissionProfile::default(),
scope: PermissionGrantScope::Turn,
strict_auto_review: false,
};
}
if response.permissions.is_empty() {
return response;
}
RequestPermissionsResponse {
permissions: intersect_permission_profiles(
requested_permissions.into(),
response.permissions.into(),
cwd,
)
.into(),
scope: response.scope,
strict_auto_review: response.strict_auto_review,
}
}
async fn record_granted_request_permissions_for_turn(
&self,
response: &RequestPermissionsResponse,
originating_turn_state: Option<&Arc<Mutex<crate::state::TurnState>>>,
) {
if response.permissions.is_empty() {
return;
}
match response.scope {
PermissionGrantScope::Turn => {
if let Some(turn_state) = originating_turn_state {
let mut ts = turn_state.lock().await;
let permissions: AdditionalPermissionProfile =
response.permissions.clone().into();
ts.record_granted_permissions(permissions);
if response.strict_auto_review {
ts.enable_strict_auto_review();
}
}
}
PermissionGrantScope::Session => {
let mut state = self.state.lock().await;
state.record_granted_permissions(response.permissions.clone().into());
}
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn reads must stay consistent with the matching turn state"
)]
pub(crate) async fn granted_turn_permissions(&self) -> Option<AdditionalPermissionProfile> {
let active = self.active_turn.lock().await;
let active = active.as_ref()?;
let ts = active.turn_state.lock().await;
ts.granted_permissions()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn reads must stay consistent with the matching turn state"
)]
pub(crate) async fn strict_auto_review_enabled_for_turn(&self) -> bool {
let active = self.active_turn.lock().await;
let Some(active) = active.as_ref() else {
return false;
};
let ts = active.turn_state.lock().await;
ts.strict_auto_review_enabled()
}
pub(crate) async fn granted_session_permissions(&self) -> Option<AdditionalPermissionProfile> {
let state = self.state.lock().await;
state.granted_permissions()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn notify_dynamic_tool_response(&self, call_id: &str, response: DynamicToolResponse) {
let entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.remove_pending_dynamic_tool(call_id)
}
None => None,
}
};
match entry {
Some(tx_response) => {
tx_response.send(response).ok();
}
None => {
warn!("No pending dynamic tool call found for call_id: {call_id}");
}
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn notify_approval(&self, approval_id: &str, decision: ReviewDecision) {
let entry = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.remove_pending_approval(approval_id)
}
None => None,
}
};
match entry {
Some(tx_approve) => {
tx_approve.send(decision).ok();
}
None => {
warn!("No pending approval found for call_id: {approval_id}");
}
}
}
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
pub(crate) async fn record_conversation_items(
&self,
turn_context: &TurnContext,
items: &[ResponseItem],
) {
self.record_into_history(items, turn_context).await;
self.persist_rollout_response_items(items).await;
self.send_raw_response_items(turn_context, items).await;
}
/// Append ResponseItems to the in-memory conversation history only.
pub(crate) async fn record_into_history(
&self,
items: &[ResponseItem],
turn_context: &TurnContext,
) {
let mut state = self.state.lock().await;
state.record_items(items.iter(), turn_context.truncation_policy);
}
pub(crate) async fn record_model_warning(&self, message: impl Into<String>, ctx: &TurnContext) {
self.services
.session_telemetry
.counter("codex.model_warning", /*inc*/ 1, &[]);
let item = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!("Warning: {}", message.into()),
}],
phase: None,
};
self.record_conversation_items(ctx, &[item]).await;
}
async fn maybe_warn_on_server_model_mismatch(
self: &Arc<Self>,
turn_context: &Arc<TurnContext>,
server_model: String,
) -> bool {
let requested_model = turn_context.model_info.slug.clone();
let server_model_normalized = server_model.to_ascii_lowercase();
let requested_model_normalized = requested_model.to_ascii_lowercase();
if server_model_normalized == requested_model_normalized {
info!("server reported model {server_model} (matches requested model)");
return false;
}
warn!("server reported model {server_model} while requested model was {requested_model}");
let warning_message = format!(
"Your account was flagged for potentially high-risk cyber activity and this request was routed to gpt-5.2 as a fallback. To regain access to gpt-5.3-codex, apply for trusted access: {CYBER_VERIFY_URL} or learn more: {CYBER_SAFETY_URL}"
);
self.send_event(
turn_context,
EventMsg::ModelReroute(ModelRerouteEvent {
from_model: requested_model.clone(),
to_model: server_model.clone(),
reason: ModelRerouteReason::HighRiskCyberActivity,
}),
)
.await;
self.send_event(
turn_context,
EventMsg::Warning(WarningEvent {
message: warning_message.clone(),
}),
)
.await;
self.record_model_warning(warning_message, turn_context)
.await;
true
}
pub(crate) async fn emit_model_verification(
self: &Arc<Self>,
turn_context: &Arc<TurnContext>,
verifications: Vec<ModelVerification>,
) {
self.send_event(
turn_context,
EventMsg::ModelVerification(ModelVerificationEvent { verifications }),
)
.await;
}
pub(crate) async fn replace_history(
&self,
items: Vec<ResponseItem>,
reference_context_item: Option<TurnContextItem>,
) {
let mut state = self.state.lock().await;
state.replace_history(items, reference_context_item);
}
pub(crate) async fn replace_compacted_history(
&self,
items: Vec<ResponseItem>,
reference_context_item: Option<TurnContextItem>,
compacted_item: CompactedItem,
) {
self.replace_history(items, reference_context_item.clone())
.await;
self.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
if let Some(turn_context_item) = reference_context_item {
self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item)])
.await;
}
self.services.model_client.advance_window_generation();
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
self.persist_rollout_items(&rollout_items).await;
}
pub fn enabled(&self, feature: Feature) -> bool {
self.features.enabled(feature)
}
pub(crate) fn features(&self) -> ManagedFeatures {
self.features.clone()
}
pub(crate) async fn collaboration_mode(&self) -> CollaborationMode {
let state = self.state.lock().await;
state.session_configuration.collaboration_mode.clone()
}
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(
turn_context,
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
)
.await;
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP app context rendering reads through the session-owned manager guard"
)]
pub(crate) async fn build_initial_context(
&self,
turn_context: &TurnContext,
) -> Vec<ResponseItem> {
let mut developer_sections = Vec::<String>::with_capacity(8);
let mut contextual_user_sections = Vec::<String>::with_capacity(2);
let mut separate_developer_sections = Vec::<String>::new();
let (
reference_context_item,
previous_turn_settings,
collaboration_mode,
base_instructions,
session_source,
) = {
let state = self.state.lock().await;
(
state.reference_context_item(),
state.previous_turn_settings(),
state.session_configuration.collaboration_mode.clone(),
state.session_configuration.base_instructions.clone(),
state.session_configuration.session_source.clone(),
)
};
if let Some(model_switch_message) =
crate::context_manager::updates::build_model_instructions_update_item(
previous_turn_settings.as_ref(),
turn_context,
)
{
developer_sections.push(model_switch_message);
}
if turn_context.config.include_permissions_instructions {
developer_sections.push(
PermissionsInstructions::from_permission_profile(
&turn_context.permission_profile,
turn_context.approval_policy.value(),
turn_context.config.approvals_reviewer,
self.services.exec_policy.current().as_ref(),
&turn_context.cwd,
turn_context
.features
.enabled(Feature::ExecPermissionApprovals),
turn_context
.features
.enabled(Feature::RequestPermissionsTool),
)
.render(),
);
}
let separate_guardian_developer_message =
crate::guardian::is_guardian_reviewer_source(&session_source);
// Keep the guardian policy prompt out of the aggregated developer bundle so it
// stays isolated as its own top-level developer message for guardian subagents.
if !separate_guardian_developer_message
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
&& !developer_instructions.is_empty()
{
developer_sections.push(developer_instructions.to_string());
}
// Add developer instructions for memories.
if turn_context.features.enabled(Feature::MemoryTool)
&& turn_context.config.memories.use_memories
&& let Some(memory_prompt) =
build_memory_tool_developer_instructions(&turn_context.config.codex_home).await
{
developer_sections.push(memory_prompt);
}
// Add developer instructions from collaboration_mode if they exist and are non-empty
if let Some(collab_instructions) =
CollaborationModeInstructions::from_collaboration_mode(&collaboration_mode)
{
developer_sections.push(collab_instructions.render());
}
if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item(
reference_context_item.as_ref(),
previous_turn_settings.as_ref(),
turn_context,
) {
developer_sections.push(realtime_update);
}
if self.features.enabled(Feature::Personality)
&& let Some(personality) = turn_context.personality
{
let model_info = turn_context.model_info.clone();
let has_baked_personality = model_info.supports_personality()
&& base_instructions == model_info.get_model_instructions(Some(personality));
if !has_baked_personality
&& let Some(personality_message) =
crate::context_manager::updates::personality_message_for(
&model_info,
personality,
)
{
developer_sections
.push(PersonalitySpecInstructions::new(personality_message).render());
}
}
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
let mcp_connection_manager = self.services.mcp_connection_manager.read().await;
let accessible_and_enabled_connectors =
connectors::list_accessible_and_enabled_connectors_from_manager(
&mcp_connection_manager,
&turn_context.config,
)
.await;
if let Some(apps_instructions) =
AppsInstructions::from_connectors(&accessible_and_enabled_connectors)
{
developer_sections.push(apps_instructions.render());
}
}
if turn_context.config.include_skill_instructions {
let available_skills = build_available_skills(
&turn_context.turn_skills.outcome,
default_skill_metadata_budget(turn_context.model_info.context_window),
SkillRenderSideEffects::ThreadStart {
session_telemetry: &self.services.session_telemetry,
},
);
if let Some(available_skills) = available_skills {
let warning_message = available_skills.warning_message.clone();
let skills_instructions = AvailableSkillsInstructions::from(available_skills);
if let Some(warning_message) = warning_message {
self.send_event_raw(Event {
id: String::new(),
msg: EventMsg::Warning(WarningEvent {
message: warning_message,
}),
})
.await;
}
developer_sections.push(skills_instructions.render());
}
}
let loaded_plugins = self
.services
.plugins_manager
.plugins_for_config(&turn_context.config.plugins_config_input())
.await;
if let Some(plugin_instructions) =
AvailablePluginsInstructions::from_plugins(loaded_plugins.capability_summaries())
{
developer_sections.push(plugin_instructions.render());
}
for contributor in self.services.extensions.context_contributors() {
for fragment in contributor.contribute(
&self.services.session_extension_data,
&self.services.thread_extension_data,
) {
match fragment.slot() {
PromptSlot::DeveloperPolicy | PromptSlot::DeveloperCapabilities => {
developer_sections.push(fragment.text().to_string());
}
PromptSlot::ContextualUser => {
contextual_user_sections.push(fragment.text().to_string());
}
PromptSlot::SeparateDeveloper => {
separate_developer_sections.push(fragment.text().to_string());
}
}
}
}
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
contextual_user_sections.push(
UserInstructions {
text: user_instructions.to_string(),
directory: turn_context.cwd.to_string_lossy().into_owned(),
}
.render(),
);
}
if turn_context.config.include_environment_context {
let shell = self.user_shell();
let subagents = self
.services
.agent_control
.format_environment_context_subagents(self.conversation_id)
.await;
contextual_user_sections.push(
crate::context::EnvironmentContext::from_turn_context(turn_context, shell.as_ref())
.with_subagents(subagents)
.render(),
);
}
let multi_agent_v2_usage_hint_text =
multi_agents::usage_hint_text(turn_context, &session_source);
let mut items = Vec::with_capacity(4);
if let Some(developer_message) =
crate::context_manager::updates::build_developer_update_item(developer_sections)
{
items.push(developer_message);
}
for section in separate_developer_sections {
if let Some(developer_message) =
crate::context_manager::updates::build_developer_update_item(vec![section])
{
items.push(developer_message);
}
}
if let Some(usage_hint_text) = multi_agent_v2_usage_hint_text
&& let Some(usage_hint_message) =
crate::context_manager::updates::build_developer_update_item(vec![
usage_hint_text.to_string(),
])
{
items.push(usage_hint_message);
}
if let Some(contextual_user_message) =
crate::context_manager::updates::build_contextual_user_message(contextual_user_sections)
{
items.push(contextual_user_message);
}
// Emit the guardian policy prompt as a separate developer item so the guardian
// subagent sees a distinct, easy-to-audit instruction block.
if separate_guardian_developer_message
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
&& !developer_instructions.is_empty()
&& let Some(guardian_developer_message) =
crate::context_manager::updates::build_developer_update_item(vec![
developer_instructions.to_string(),
])
{
items.push(guardian_developer_message);
}
items
}
pub(crate) async fn persist_rollout_items(&self, items: &[RolloutItem]) {
if let Some(live_thread) = self.live_thread()
&& let Err(e) = live_thread.append_items(items).await
{
error!("failed to record rollout items: {e:#}");
}
}
pub(crate) async fn clone_history(&self) -> ContextManager {
let state = self.state.lock().await;
state.clone_history()
}
pub(crate) async fn reference_context_item(&self) -> Option<TurnContextItem> {
let state = self.state.lock().await;
state.reference_context_item()
}
/// Persist the latest turn context snapshot for the first real user turn and for
/// steady-state turns that emit model-visible context updates.
///
/// When the reference snapshot is missing, this injects full initial context. Otherwise, it
/// emits only settings diff items.
///
/// If full context is injected and a model switch occurred, this prepends the
/// `<model_switch>` developer message so model-specific instructions are not lost.
///
/// This is the normal runtime path that establishes a new `reference_context_item`.
/// Mid-turn compaction is the other path that can re-establish that baseline when it
/// reinjects full initial context into replacement history. Other non-regular tasks
/// intentionally do not update the baseline.
pub(crate) async fn record_context_updates_and_set_reference_context_item(
&self,
turn_context: &TurnContext,
) {
let reference_context_item = {
let state = self.state.lock().await;
state.reference_context_item()
};
let should_inject_full_context = reference_context_item.is_none();
let context_items = if should_inject_full_context {
self.build_initial_context(turn_context).await
} else {
// Steady-state path: append only context diffs to minimize token overhead.
self.build_settings_update_items(reference_context_item.as_ref(), turn_context)
.await
};
let turn_context_item = turn_context.to_turn_context_item();
if !context_items.is_empty() {
self.record_conversation_items(turn_context, &context_items)
.await;
}
// Persist one `TurnContextItem` per real user turn so resume/lazy replay can recover the
// latest durable baseline even when this turn emitted no model-visible context diffs.
self.persist_rollout_items(&[RolloutItem::TurnContext(turn_context_item.clone())])
.await;
// Advance the in-memory diff baseline even when this turn emitted no model-visible
// context items. This keeps later runtime diffing aligned with the current turn state.
let mut state = self.state.lock().await;
state.set_reference_context_item(Some(turn_context_item));
}
pub(crate) async fn update_token_usage_info(
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
) {
if let Some(token_usage) = token_usage {
let mut state = self.state.lock().await;
state.update_token_info_from_usage(token_usage, turn_context.model_context_window());
}
self.send_token_count_event(turn_context).await;
}
pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) {
let history = self.clone_history().await;
let base_instructions = self.get_base_instructions().await;
let Some(estimated_total_tokens) =
history.estimate_token_count_with_base_instructions(&base_instructions)
else {
return;
};
{
let mut state = self.state.lock().await;
let mut info = state.token_info().unwrap_or(TokenUsageInfo {
total_token_usage: TokenUsage::default(),
last_token_usage: TokenUsage::default(),
model_context_window: None,
});
info.last_token_usage = TokenUsage {
input_tokens: 0,
cached_input_tokens: 0,
output_tokens: 0,
reasoning_output_tokens: 0,
total_tokens: estimated_total_tokens.max(0),
};
if let Some(model_context_window) = turn_context.model_context_window() {
info.model_context_window = Some(model_context_window);
}
state.set_token_info(Some(info));
}
self.send_token_count_event(turn_context).await;
}
pub(crate) async fn update_rate_limits(
&self,
turn_context: &TurnContext,
new_rate_limits: RateLimitSnapshot,
) {
{
let mut state = self.state.lock().await;
state.set_rate_limits(new_rate_limits);
}
self.send_token_count_event(turn_context).await;
}
pub(crate) async fn mcp_dependency_prompted(&self) -> HashSet<String> {
let state = self.state.lock().await;
state.mcp_dependency_prompted()
}
pub(crate) async fn record_mcp_dependency_prompted<I>(&self, names: I)
where
I: IntoIterator<Item = String>,
{
let mut state = self.state.lock().await;
state.record_mcp_dependency_prompted(names);
}
pub async fn dependency_env(&self) -> HashMap<String, String> {
let state = self.state.lock().await;
state.dependency_env()
}
pub async fn set_dependency_env(&self, values: HashMap<String, String>) {
let mut state = self.state.lock().await;
state.set_dependency_env(values);
}
pub(crate) async fn set_server_reasoning_included(&self, included: bool) {
let mut state = self.state.lock().await;
state.set_server_reasoning_included(included);
}
async fn send_token_count_event(&self, turn_context: &TurnContext) {
let (info, rate_limits) = {
let state = self.state.lock().await;
state.token_info_and_rate_limits()
};
let event = EventMsg::TokenCount(TokenCountEvent { info, rate_limits });
self.send_event(turn_context, event).await;
}
pub(crate) async fn set_total_tokens_full(&self, turn_context: &TurnContext) {
if let Some(context_window) = turn_context.model_context_window() {
let mut state = self.state.lock().await;
state.set_token_usage_full(context_window);
}
self.send_token_count_event(turn_context).await;
}
pub(crate) async fn record_response_item_and_emit_turn_item(
&self,
turn_context: &TurnContext,
response_item: ResponseItem,
) {
// Add to conversation history and persist response item to rollout.
self.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
.await;
// Derive a turn item and emit lifecycle events if applicable.
if let Some(item) = parse_turn_item(&response_item) {
self.emit_turn_item_started(turn_context, &item).await;
self.emit_turn_item_completed(turn_context, item).await;
}
}
pub(crate) async fn record_user_prompt_and_emit_turn_item(
&self,
turn_context: &TurnContext,
input: &[UserInput],
response_item: ResponseItem,
) {
// Persist the user message to history, but emit the turn item from `UserInput` so
// UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry
// those spans, and `record_response_item_and_emit_turn_item` would drop them.
self.record_conversation_items(turn_context, std::slice::from_ref(&response_item))
.await;
let turn_item = TurnItem::UserMessage(UserMessageItem::new(input));
self.emit_turn_item_started(turn_context, &turn_item).await;
self.emit_turn_item_completed(turn_context, turn_item).await;
self.ensure_rollout_materialized().await;
}
pub(crate) async fn notify_stream_error(
&self,
turn_context: &TurnContext,
message: impl Into<String>,
codex_error: CodexErr,
) {
let additional_details = codex_error.to_string();
let codex_error_info = CodexErrorInfo::ResponseStreamDisconnected {
http_status_code: codex_error.http_status_code_value(),
};
let event = EventMsg::StreamError(StreamErrorEvent {
message: message.into(),
codex_error_info: Some(codex_error_info),
additional_details: Some(additional_details),
});
self.send_event(turn_context, event).await;
}
/// Inject additional user input into the currently active turn.
///
/// Returns the active turn id when accepted.
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn steer_input(
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
responsesapi_client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
if input.is_empty() {
return Err(SteerInputError::EmptyInput);
}
let mut active = self.active_turn.lock().await;
let Some(active_turn) = active.as_mut() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let Some((active_turn_id, _)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
{
return Err(SteerInputError::ExpectedTurnMismatch {
expected: expected_turn_id.to_string(),
actual: active_turn_id.clone(),
});
}
match active_turn.tasks.first().map(|(_, task)| task.kind) {
Some(crate::state::TaskKind::Regular) => {}
Some(crate::state::TaskKind::Review) => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Review,
});
}
Some(crate::state::TaskKind::Compact) => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Compact,
});
}
None => return Err(SteerInputError::NoActiveTurn(input)),
}
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata
&& let Some((_, active_task)) = active_turn.tasks.first()
{
active_task
.turn_context
.turn_metadata_state
.set_responsesapi_client_metadata(responsesapi_client_metadata);
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
}
/// Returns the input if there was no task running to inject into.
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn inject_response_items(
&self,
input: Vec<ResponseInputItem>,
) -> Result<(), Vec<ResponseInputItem>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
for item in input {
ts.push_pending_input(item);
}
Ok(())
}
None => Err(input),
}
}
pub(crate) async fn defer_mailbox_delivery_to_next_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
let mut turn_state = turn_state.lock().await;
if turn_state.has_pending_input() {
return;
}
turn_state.set_mailbox_delivery_phase(MailboxDeliveryPhase::NextTurn);
}
pub(crate) async fn accept_mailbox_delivery_for_current_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
turn_state
.lock()
.await
.set_mailbox_delivery_phase(MailboxDeliveryPhase::CurrentTurn);
}
pub(crate) async fn record_memory_citation_for_turn(&self, sub_id: &str) {
let turn_state = self.turn_state_for_sub_id(sub_id).await;
let Some(turn_state) = turn_state else {
return;
};
turn_state.lock().await.has_memory_citation = true;
}
async fn turn_state_for_sub_id(
&self,
sub_id: &str,
) -> Option<Arc<tokio::sync::Mutex<crate::state::TurnState>>> {
let active = self.active_turn.lock().await;
active.as_ref().and_then(|active_turn| {
active_turn
.tasks
.contains_key(sub_id)
.then(|| Arc::clone(&active_turn.turn_state))
})
}
pub(crate) fn subscribe_mailbox_seq(&self) -> watch::Receiver<u64> {
self.mailbox.subscribe()
}
pub(crate) fn enqueue_mailbox_communication(&self, communication: InterAgentCommunication) {
self.mailbox.send(communication);
}
pub(crate) async fn has_trigger_turn_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending_trigger_turn()
}
pub(crate) async fn has_pending_mailbox_items(&self) -> bool {
self.mailbox_rx.lock().await.has_pending()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input(input);
Ok(())
}
None => Err(()),
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
(
ts.take_pending_input(),
ts.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (Vec::new(), true),
}
};
if !accepts_mailbox_delivery {
return pending_input;
}
let mailbox_items = {
let mut mailbox_rx = self.mailbox_rx.lock().await;
mailbox_rx
.drain()
.into_iter()
.map(|mail| mail.to_response_input_item())
.collect::<Vec<_>>()
};
if pending_input.is_empty() {
mailbox_items
} else if mailbox_items.is_empty() {
pending_input
} else {
let mut pending_input = pending_input;
pending_input.extend(mailbox_items);
pending_input
}
}
/// Queue response items to be injected into the next active turn created for this session.
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
if items.is_empty() {
return;
}
let mut idle_pending_input = self.idle_pending_input.lock().await;
idle_pending_input.extend(items);
}
pub(crate) async fn take_queued_response_items_for_next_turn(&self) -> Vec<ResponseInputItem> {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}
#[expect(
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state reads must remain atomic"
)]
pub async fn has_pending_input(&self) -> bool {
let (has_turn_pending_input, accepts_mailbox_delivery) = {
let active = self.active_turn.lock().await;
match active.as_ref() {
Some(at) => {
let ts = at.turn_state.lock().await;
(
ts.has_pending_input(),
ts.accepts_mailbox_delivery_for_current_turn(),
)
}
None => (false, true),
}
};
if has_turn_pending_input {
return true;
}
if !accepts_mailbox_delivery {
return false;
}
self.has_pending_mailbox_items().await
}
pub async fn interrupt_task(self: &Arc<Self>) {
info!("interrupt received: abort current task, if any");
let had_active_turn = self.active_turn.lock().await.is_some();
// Even without an active task, interrupt handling pauses any active goal.
self.abort_all_tasks(TurnAbortReason::Interrupted).await;
if !had_active_turn {
self.cancel_mcp_startup().await;
}
}
pub(crate) fn hooks(&self) -> Arc<Hooks> {
self.services.hooks.load_full()
}
pub(crate) fn user_shell(&self) -> Arc<shell::Shell> {
Arc::clone(&self.services.user_shell)
}
pub(crate) async fn current_rollout_path(&self) -> anyhow::Result<Option<PathBuf>> {
let Some(live_thread) = self.live_thread() else {
return Ok(None);
};
live_thread.local_rollout_path().await.map_err(Into::into)
}
pub(crate) async fn hook_transcript_path(&self) -> Option<PathBuf> {
self.ensure_rollout_materialized().await;
match self.current_rollout_path().await {
Ok(path) => path,
Err(err) => {
warn!("{err}");
None
}
}
}
pub(crate) async fn take_pending_session_start_source(
&self,
) -> Option<codex_hooks::SessionStartSource> {
let mut state = self.state.lock().await;
state.take_pending_session_start_source()
}
fn show_raw_agent_reasoning(&self) -> bool {
self.services.show_raw_agent_reasoning
}
}
pub(crate) fn emit_subagent_session_started(
analytics_events_client: &AnalyticsEventsClient,
client_metadata: AppServerClientMetadata,
thread_id: ThreadId,
parent_thread_id: Option<ThreadId>,
thread_config: ThreadConfigSnapshot,
subagent_source: SubAgentSource,
) {
let AppServerClientMetadata {
client_name,
client_version,
} = client_metadata;
let (Some(client_name), Some(client_version)) = (client_name, client_version) else {
tracing::warn!("skipping subagent thread analytics: missing inherited client metadata");
return;
};
let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
analytics_events_client.track_subagent_thread_started(SubAgentThreadStartedInput {
thread_id: thread_id.to_string(),
parent_thread_id: parent_thread_id.map(|thread_id| thread_id.to_string()),
product_client_id: client_name.clone(),
client_name,
client_version,
model: thread_config.model,
ephemeral: thread_config.ephemeral,
subagent_source,
created_at,
});
}
use codex_memories_read::build_memory_tool_developer_instructions;
/// Builds the hook engine for one config snapshot, including any enabled plugin hooks.
async fn build_hooks_for_config(
config: &Config,
plugins_manager: &PluginsManager,
user_shell: &crate::shell::Shell,
) -> Hooks {
let mut hook_shell_argv = user_shell.derive_exec_args("", /*use_login_shell*/ false);
let hook_shell_program = hook_shell_argv.remove(0);
let _ = hook_shell_argv.pop();
let plugin_hooks_enabled = config.features.enabled(Feature::PluginHooks);
let (plugin_hook_sources, plugin_hook_load_warnings) = if plugin_hooks_enabled {
let plugins_input = config.plugins_config_input();
let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await;
(
plugin_outcome.effective_plugin_hook_sources(),
plugin_outcome.effective_plugin_hook_warnings(),
)
} else {
(Vec::new(), Vec::new())
};
Hooks::new(HooksConfig {
legacy_notify_argv: config.notify.clone(),
feature_enabled: config.features.enabled(Feature::CodexHooks),
config_layer_stack: Some(config.config_layer_stack.clone()),
plugin_hook_sources,
plugin_hook_load_warnings,
shell_program: Some(hook_shell_program),
shell_args: hook_shell_argv,
})
}
#[cfg(test)]
pub(crate) mod tests;