use crate::agent::AgentStatus;
use crate::config::ConstraintResult;
use crate::file_watcher::WatchRegistration;
use crate::goals::GoalRuntimeEvent;
use crate::session::Codex;
use crate::session::SessionSettingsUpdate;
use crate::session::SteerInputError;
use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::ActivePermissionProfile;
use codex_protocol::models::ContentItem;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use codex_thread_store::StoredThreadHistory;
use codex_thread_store::ThreadStoreError;
use codex_thread_store::ThreadStoreResult;
use codex_utils_absolute_path::AbsolutePathBuf;
use rmcp::model::ReadResourceRequestParams;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::watch;
use codex_rollout::state_db::StateDbHandle;

/// Configuration values reported for a thread by `CodexThread::config_snapshot`.
///
/// The struct is plain data (`Clone` + `Debug`); the only behavior attached to
/// it is [`ThreadConfigSnapshot::sandbox_policy`], which derives a
/// `SandboxPolicy` from `permission_profile` and `cwd`.
#[derive(Clone, Debug)]
pub struct ThreadConfigSnapshot {
    pub model: String,
    pub model_provider_id: String,
    pub service_tier: Option,
    pub approval_policy: AskForApproval,
    pub approvals_reviewer: ApprovalsReviewer,
    // Input to `sandbox_policy()` below.
    pub permission_profile: PermissionProfile,
    pub active_permission_profile: Option,
    // Passed (as a path) to the sandbox-policy compatibility helper.
    pub cwd: AbsolutePathBuf,
    pub ephemeral: bool,
    pub reasoning_effort: Option,
    pub personality: Option,
    pub session_source: SessionSource,
}

impl ThreadConfigSnapshot {
    /// Builds a `SandboxPolicy` for this snapshot.
    ///
    /// Nothing is cached: each call re-derives the file-system and network
    /// sandbox policies from `permission_profile` and hands them, together
    /// with the profile itself and `cwd`, to
    /// `codex_sandboxing::compatibility_sandbox_policy_for_permission_profile`.
    pub fn sandbox_policy(&self) -> SandboxPolicy {
        let file_system_sandbox_policy = self.permission_profile.file_system_sandbox_policy();
        codex_sandboxing::compatibility_sandbox_policy_for_permission_profile(
            &self.permission_profile,
            &file_system_sandbox_policy,
            self.permission_profile.network_sandbox_policy(),
            self.cwd.as_path(),
        )
    }
}

/// Turn context overrides that app-server validates before starting a turn.
///
/// Every field is optional and the type derives `Default`, so callers can set
/// only the overrides they care about. The struct is consumed by
/// `CodexThread::validate_turn_context_overrides`, which destructures every
/// field and folds them into a `SessionSettingsUpdate`.
#[derive(Clone, Default)]
pub struct CodexThreadTurnContextOverrides {
    pub cwd: Option,
    pub approval_policy: Option,
    pub approvals_reviewer: Option,
    pub sandbox_policy: Option,
    pub permission_profile: Option,
    pub active_permission_profile: Option,
    pub windows_sandbox_level: Option,
    // `model` and `effort` are only consulted when `collaboration_mode` is
    // `None`: validation then applies them to the session's current
    // collaboration mode via `with_updates`.
    pub model: Option,
    pub effort: Option>,
    // Forwarded as `reasoning_summary` in the settings update.
    pub summary: Option,
    pub service_tier: Option>,
    // When present, used as-is for validation (taking the place of the
    // session's current mode).
    pub collaboration_mode: Option,
    pub personality: Option,
}

/// Handle to a single Codex thread.
///
/// Wraps the underlying `Codex` session together with the metadata captured
/// when the thread was created.
pub struct CodexThread {
    // The wrapped session handle; almost every method delegates to it.
    pub(crate) codex: Codex,
    pub(crate) session_source: SessionSource,
    // Returned (cloned) by `session_configured()`.
    session_configured: SessionConfiguredEvent,
    // Returned (cloned) by `rollout_path()`.
    rollout_path: Option,
    // Count of outstanding out-of-band elicitations. Starts at zero; the
    // session's pause state is switched on when it leaves zero and off when it
    // returns to zero (see the increment/decrement methods).
    out_of_band_elicitation_count: Mutex,
    // Stored but never read in this file; presumably held only so the
    // registration lives as long as the thread — confirm against
    // `WatchRegistration`'s drop behavior.
    _watch_registration: WatchRegistration,
}

/// Conduit for the bidirectional stream of messages that compose a thread
/// (formerly called a conversation) in Codex.
impl CodexThread {
    /// Assembles a thread from its already-constructed parts.
    ///
    /// The out-of-band elicitation counter always starts at zero; every other
    /// field is stored exactly as passed in.
    pub(crate) fn new(
        codex: Codex,
        session_configured: SessionConfiguredEvent,
        rollout_path: Option,
        session_source: SessionSource,
        watch_registration: WatchRegistration,
    ) -> Self {
        Self {
            codex,
            session_source,
            session_configured,
            rollout_path,
            out_of_band_elicitation_count: Mutex::new(0),
            _watch_registration: watch_registration,
        }
    }

    /// Forwards `op` to the underlying `Codex::submit` and returns its result.
    pub async fn submit(&self, op: Op) -> CodexResult {
        self.codex.submit(op).await
    }

    /// Delegates to the underlying `Codex::shutdown_and_wait`.
    pub async fn shutdown_and_wait(&self) -> CodexResult<()> {
        self.codex.shutdown_and_wait().await
    }

    /// Wait until the underlying session loop has terminated.
    pub async fn wait_until_terminated(&self) {
        // Await a clone so the stored termination handle stays available to
        // other callers.
        self.codex.session_loop_termination.clone().await;
    }

    /// Applies `GoalRuntimeEvent::ThreadResumed` to the session's goal runtime.
    ///
    /// # Errors
    /// Propagates whatever error `goal_runtime_apply` returns.
    pub async fn apply_goal_resume_runtime_effects(&self) -> anyhow::Result<()> {
        self.codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ThreadResumed)
            .await
    }

    /// Applies `GoalRuntimeEvent::MaybeContinueIfIdle` to the session's goal runtime.
    ///
    /// # Errors
    /// Propagates whatever error `goal_runtime_apply` returns.
    pub async fn continue_active_goal_if_idle(&self) -> anyhow::Result<()> {
        self.codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)
            .await
    }

    /// Applies `GoalRuntimeEvent::ExternalMutationStarting` to the goal runtime.
    ///
    /// Best-effort: a failure is logged at `warn` level and not returned.
    pub async fn prepare_external_goal_mutation(&self) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalMutationStarting)
            .await
        {
            tracing::warn!("failed to prepare external goal mutation: {err}");
        }
    }

    /// Applies `GoalRuntimeEvent::ExternalSet` carrying `status` to the goal runtime.
    ///
    /// Best-effort: a failure is logged at `warn` level and not returned.
    pub async fn apply_external_goal_set(&self, status: codex_state::ThreadGoalStatus) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalSet { status })
            .await
        {
            tracing::warn!("failed to apply external goal status runtime effects: {err}");
        }
    }

    /// Applies `GoalRuntimeEvent::ExternalClear` to the goal runtime.
    ///
    /// Best-effort: a failure is logged at `warn` level and not returned.
    pub async fn apply_external_goal_clear(&self) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalClear)
            .await
        {
            tracing::warn!("failed to apply external goal clear runtime effects: {err}");
        }
    }

    /// Delegates to the session's `ensure_rollout_materialized`.
    #[doc(hidden)]
    pub async fn ensure_rollout_materialized(&self) {
        self.codex.session.ensure_rollout_materialized().await;
    }

    /// Delegates to the session's `flush_rollout`, returning its I/O result.
    #[doc(hidden)]
    pub async fn flush_rollout(&self) -> std::io::Result<()> {
        self.codex.session.flush_rollout().await
    }

    /// Like [`CodexThread::submit`], but also forwards the optional `trace`
    /// value to `Codex::submit_with_trace`.
    pub async fn submit_with_trace(
        &self,
        op: Op,
        trace: Option,
    ) -> CodexResult {
        self.codex.submit_with_trace(op, trace).await
    }

    /// Persist whether this thread is eligible for future memory generation.
    pub async fn set_thread_memory_mode(&self, mode: ThreadMemoryMode) -> anyhow::Result<()> {
        self.codex.set_thread_memory_mode(mode).await
    }

    /// Forwards `input`, `expected_turn_id` and the optional client metadata
    /// unchanged to `Codex::steer_input` and returns its result.
    pub async fn steer_input(
        &self,
        input: Vec,
        expected_turn_id: Option<&str>,
        responsesapi_client_metadata: Option>,
    ) -> Result {
        self.codex
            .steer_input(input, expected_turn_id, responsesapi_client_metadata)
            .await
    }

    /// Forwards the app-server client name/version to
    /// `Codex::set_app_server_client_info`.
    pub async fn set_app_server_client_info(
        &self,
        app_server_client_name: Option,
        app_server_client_version: Option,
    ) -> ConstraintResult<()> {
        self.codex
            .set_app_server_client_info(app_server_client_name, app_server_client_version)
            .await
    }

    /// Validate persistent turn context overrides without committing them.
    ///
    /// The overrides are folded into a `SessionSettingsUpdate` and passed to the
    /// session's `validate_settings`. Fields of the update that have no
    /// counterpart in `overrides` keep their `Default` value.
    pub async fn validate_turn_context_overrides(
        &self,
        overrides: CodexThreadTurnContextOverrides,
    ) -> ConstraintResult<()> {
        // Destructure exhaustively so that adding a field to the overrides
        // struct forces this function to be revisited.
        let CodexThreadTurnContextOverrides {
            cwd,
            approval_policy,
            approvals_reviewer,
            sandbox_policy,
            permission_profile,
            active_permission_profile,
            windows_sandbox_level,
            model,
            effort,
            summary,
            service_tier,
            collaboration_mode,
            personality,
        } = overrides;
        // An explicit collaboration mode is used as-is; in that case `model`
        // and `effort` are not consulted. Otherwise the session's current mode
        // is taken and updated with the requested model/effort.
        let collaboration_mode = if let Some(collaboration_mode) = collaboration_mode {
            collaboration_mode
        } else {
            self.codex
                .session
                .collaboration_mode()
                .await
                .with_updates(model, effort, /*developer_instructions*/ None)
        };
        let updates = SessionSettingsUpdate {
            cwd,
            approval_policy,
            approvals_reviewer,
            sandbox_policy,
            permission_profile,
            active_permission_profile,
            windows_sandbox_level,
            collaboration_mode: Some(collaboration_mode),
            reasoning_summary: summary,
            service_tier,
            personality,
            ..Default::default()
        };
        self.codex.session.validate_settings(&updates).await
    }

    /// Use sparingly: this is intended to be removed soon.
    pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
        self.codex.submit_with_id(sub).await
    }

    /// Delegates to `Codex::next_event` and returns its result.
    pub async fn next_event(&self) -> CodexResult {
        self.codex.next_event().await
    }

    /// Returns the status reported by `Codex::agent_status`.
    pub async fn agent_status(&self) -> AgentStatus {
        self.codex.agent_status().await
    }

    /// Returns a clone of the `watch` receiver stored in `codex.agent_status`.
    pub(crate) fn subscribe_status(&self) -> watch::Receiver {
        self.codex.agent_status.clone()
    }

    /// Returns the complete token usage snapshot currently cached for this thread.
    ///
    /// This accessor is intentionally narrower than direct session access: it lets
    /// app-server lifecycle paths replay restored usage after resume or fork without
    /// exposing broader session mutation authority. A caller that only reads
    /// `total_token_usage` would drop last-turn usage and make the v2
    /// `thread/tokenUsage/updated` payload incomplete.
    pub async fn token_usage_info(&self) -> Option {
        self.codex.session.token_usage_info().await
    }

    /// Records a user-role session-prefix message without creating a new user turn boundary.
    ///
    /// The text is wrapped in a `ResponseItem::Message` with role `"user"`. It
    /// is first offered to the session via `inject_response_items`; if that
    /// call returns an error, the message is recorded directly into the
    /// conversation against a freshly created default turn context.
    pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
        let message = ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText { text: message }],
            phase: None,
        };
        // The item built above is always a `Message`, so conversion is not
        // expected to fail: assert in debug builds, silently give up in release.
        let pending_item = match pending_message_input_item(&message) {
            Ok(pending_item) => pending_item,
            Err(err) => {
                debug_assert!(false, "session-prefix message append should succeed: {err}");
                return;
            }
        };
        if self
            .codex
            .session
            .inject_response_items(vec![pending_item])
            .await
            .is_err()
        {
            // Injection was rejected (presumably because no turn is active —
            // confirm against the session's `inject_response_items`), so
            // record the message straight into the conversation instead.
            let turn_context = self.codex.session.new_default_turn().await;
            self.codex
                .session
                .record_conversation_items(turn_context.as_ref(), &[message])
                .await;
        }
    }

    /// Append a prebuilt message to the thread history without treating it as a user turn.
    ///
    /// If the thread already has an active turn, the message is queued as pending input for that
    /// turn. Otherwise it is queued at session scope and a regular turn is started so the agent
    /// can consume that pending input through the normal turn pipeline.
    ///
    /// Compiled only in test builds. The returned id is a freshly generated
    /// UUID string; it is not passed to any of the session calls made here.
    ///
    /// # Errors
    /// Returns `CodexErr::InvalidRequest` when `message` is not a
    /// `ResponseItem::Message`.
    #[cfg(test)]
    pub(crate) async fn append_message(&self, message: ResponseItem) -> CodexResult {
        let submission_id = uuid::Uuid::new_v4().to_string();
        let pending_item = pending_message_input_item(&message)?;
        // On rejection the session hands the items back in `Err`, so they can
        // be re-queued for the next turn without cloning.
        if let Err(items) = self
            .codex
            .session
            .inject_response_items(vec![pending_item])
            .await
        {
            self.codex
                .session
                .queue_response_items_for_next_turn(items)
                .await;
            self.codex.session.maybe_start_turn_for_pending_work().await;
        }
        Ok(submission_id)
    }

    /// Append raw Responses API items to the thread's model-visible history.
    ///
    /// The items are recorded against a new default turn context. If the
    /// session has no reference context item yet, context updates are recorded
    /// and the reference item is set first. The rollout is flushed before
    /// returning.
    ///
    /// # Errors
    /// Returns `CodexErr::InvalidRequest` when `items` is empty, and propagates
    /// a failure from `flush_rollout` (converted via `?`).
    pub async fn inject_response_items(&self, items: Vec) -> CodexResult<()> {
        if items.is_empty() {
            return Err(CodexErr::InvalidRequest(
                "items must not be empty".to_string(),
            ));
        }
        let turn_context = self.codex.session.new_default_turn().await;
        if self.codex.session.reference_context_item().await.is_none() {
            self.codex
                .session
                .record_context_updates_and_set_reference_context_item(turn_context.as_ref())
                .await;
        }
        self.codex
            .session
            .record_conversation_items(turn_context.as_ref(), &items)
            .await;
        self.codex.session.flush_rollout().await?;
        Ok(())
    }

    /// Returns a clone of the rollout path supplied at construction, if any.
    pub fn rollout_path(&self) -> Option {
        self.rollout_path.clone()
    }

    /// Returns a clone of the `SessionConfiguredEvent` supplied at construction.
    pub(crate) fn session_configured(&self) -> SessionConfiguredEvent {
        self.session_configured.clone()
    }

    /// Reports whether the submission channel (`codex.tx_sub`) is still open.
    pub(crate) fn is_running(&self) -> bool {
        !self.codex.tx_sub.is_closed()
    }

    /// Returns the trunk rollout path reported by the session's guardian
    /// review session, if any.
    pub async fn guardian_trunk_rollout_path(&self) -> Option {
        self.codex
            .session
            .guardian_review_session
            .trunk_rollout_path()
            .await
    }

    /// Loads this thread's history through the session's live thread handle.
    ///
    /// `include_archived` is forwarded unchanged to the live thread's
    /// `load_history`.
    ///
    /// # Errors
    /// A failure to obtain the live thread is reported as
    /// `ThreadStoreError::Internal` carrying the original error's message;
    /// errors from the load itself are returned as-is.
    pub async fn load_history(
        &self,
        include_archived: bool,
    ) -> ThreadStoreResult {
        let live_thread = self
            .codex
            .session
            .live_thread_for_persistence("load history")
            .map_err(|err| ThreadStoreError::Internal {
                message: err.to_string(),
            })?;
        live_thread.load_history(include_archived).await
    }

    /// Returns whatever `Codex::state_db` returns.
    pub fn state_db(&self) -> Option {
        self.codex.state_db()
    }

    /// Returns the snapshot produced by `Codex::thread_config_snapshot`.
    pub async fn config_snapshot(&self) -> ThreadConfigSnapshot {
        self.codex.thread_config_snapshot().await
    }

    /// Returns the shared configuration handle from the session's `get_config`.
    pub async fn config(&self) -> Arc {
        self.codex.session.get_config().await
    }

    /// Reads the resource `uri` from the MCP server named `server`.
    ///
    /// The request is sent with `meta: None`, and the response is serialized
    /// with `serde_json::to_value` before being returned.
    ///
    /// # Errors
    /// Propagates both the read failure and any serialization failure.
    pub async fn read_mcp_resource(
        &self,
        server: &str,
        uri: &str,
    ) -> anyhow::Result {
        let result = self
            .codex
            .session
            .read_resource(
                server,
                ReadResourceRequestParams {
                    meta: None,
                    uri: uri.to_string(),
                },
            )
            .await?;
        Ok(serde_json::to_value(result)?)
    }

    /// Forwards the tool call unchanged to the session's `call_tool`.
    pub async fn call_mcp_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option,
        meta: Option,
    ) -> anyhow::Result {
        self.codex
            .session
            .call_tool(server, tool, arguments, meta)
            .await
    }

    /// Reports whether `feature` is enabled, as answered by `Codex::enabled`.
    pub fn enabled(&self, feature: Feature) -> bool {
        self.codex.enabled(feature)
    }

    /// Increments the out-of-band elicitation counter and returns the new value.
    ///
    /// When the counter leaves zero, the session's out-of-band elicitation
    /// pause state is set to `true`. The counter lock is held for the whole
    /// call, so the count and the pause flag change together with respect to
    /// other callers of the increment/decrement pair.
    ///
    /// # Errors
    /// Returns `CodexErr::Fatal` if the increment would overflow; the counter
    /// and the pause state are left untouched in that case.
    pub async fn increment_out_of_band_elicitation_count(&self) -> CodexResult {
        let mut guard = self.out_of_band_elicitation_count.lock().await;
        // Capture before mutating: only the 0 -> 1 transition pauses.
        let was_zero = *guard == 0;
        *guard = guard.checked_add(1).ok_or_else(|| {
            CodexErr::Fatal("out-of-band elicitation count overflowed".to_string())
        })?;
        if was_zero {
            self.codex
                .session
                .set_out_of_band_elicitation_pause_state(/*paused*/ true);
        }
        Ok(*guard)
    }

    /// Decrements the out-of-band elicitation counter and returns the new value.
    ///
    /// When the counter returns to zero, the session's out-of-band elicitation
    /// pause state is set to `false`.
    ///
    /// # Errors
    /// Returns `CodexErr::InvalidRequest` if the counter is already zero.
    pub async fn decrement_out_of_band_elicitation_count(&self) -> CodexResult {
        let mut guard = self.out_of_band_elicitation_count.lock().await;
        // Guard against underflow before subtracting.
        if *guard == 0 {
            return Err(CodexErr::InvalidRequest(
                "out-of-band elicitation count is already zero".to_string(),
            ));
        }
        *guard -= 1;
        let now_zero = *guard == 0;
        if now_zero {
            self.codex
                .session
                .set_out_of_band_elicitation_pause_state(/*paused*/ false);
        }
        Ok(*guard)
    }
}

/// Converts a `ResponseItem::Message` into the matching `ResponseInputItem::Message`.
///
/// `role`, `content` and `phase` are cloned; the remaining fields of the
/// message (matched by `..`, e.g. `id`) are dropped.
///
/// # Errors
/// Any other `ResponseItem` variant yields `CodexErr::InvalidRequest`. The
/// error text names `append_message`, although
/// `inject_user_message_without_turn` also goes through this helper.
fn pending_message_input_item(message: &ResponseItem) -> CodexResult {
    match message {
        ResponseItem::Message {
            role,
            content,
            phase,
            ..
        } => Ok(ResponseInputItem::Message {
            role: role.clone(),
            content: content.clone(),
            phase: phase.clone(),
        }),
        _ => Err(CodexErr::InvalidRequest(
            "append_message only supports ResponseItem::Message".to_string(),
        )),
    }
}