use crate::agent::AgentStatus; use crate::codex::Codex; use crate::codex::SteerInputError; use crate::config::ConstraintResult; use crate::file_watcher::WatchRegistration; use codex_features::Feature; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::config_types::Personality; use codex_protocol::config_types::ServiceTier; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::Event; use codex_protocol::protocol::Op; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::Submission; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; use std::collections::HashMap; use std::path::PathBuf; use tokio::sync::Mutex; use tokio::sync::watch; use codex_rollout::state_db::StateDbHandle; #[derive(Clone, Debug)] pub struct ThreadConfigSnapshot { pub model: String, pub model_provider_id: String, pub service_tier: Option, pub approval_policy: AskForApproval, pub approvals_reviewer: ApprovalsReviewer, pub sandbox_policy: SandboxPolicy, pub cwd: PathBuf, pub ephemeral: bool, pub reasoning_effort: Option, pub personality: Option, pub session_source: SessionSource, } pub struct CodexThread { pub(crate) codex: Codex, rollout_path: Option, out_of_band_elicitation_count: Mutex, _watch_registration: WatchRegistration, } /// Conduit for the bidirectional stream of messages that compose a thread /// (formerly called a conversation) in Codex. impl CodexThread { pub(crate) fn new( codex: Codex, rollout_path: Option, watch_registration: WatchRegistration, ) -> Self { Self { codex, rollout_path, out_of_band_elicitation_count: Mutex::new(0), _watch_registration: watch_registration, } } pub async fn submit(&self, op: Op) -> CodexResult { self.codex.submit(op).await } pub async fn shutdown_and_wait(&self) -> CodexResult<()> { self.codex.shutdown_and_wait().await } #[doc(hidden)] pub async fn ensure_rollout_materialized(&self) { self.codex.session.ensure_rollout_materialized().await; } #[doc(hidden)] pub async fn flush_rollout(&self) { self.codex.session.flush_rollout().await; } pub async fn submit_with_trace( &self, op: Op, trace: Option, ) -> CodexResult { self.codex.submit_with_trace(op, trace).await } pub async fn steer_input( &self, input: Vec, expected_turn_id: Option<&str>, ) -> Result { self.codex.steer_input(input, expected_turn_id).await } pub async fn set_app_server_client_info( &self, app_server_client_name: Option, app_server_client_version: Option, ) -> ConstraintResult<()> { self.codex .set_app_server_client_info(app_server_client_name, app_server_client_version) .await } /// Use sparingly: this is intended to be removed soon. pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { self.codex.submit_with_id(sub).await } pub async fn next_event(&self) -> CodexResult { self.codex.next_event().await } pub async fn agent_status(&self) -> AgentStatus { self.codex.agent_status().await } pub async fn dependency_env(&self) -> HashMap { self.codex.session.dependency_env().await } pub async fn set_dependency_env(&self, values: HashMap) { self.codex.session.set_dependency_env(values).await; } pub(crate) fn subscribe_status(&self) -> watch::Receiver { self.codex.agent_status.clone() } pub(crate) async fn total_token_usage(&self) -> Option { self.codex.session.total_token_usage().await } /// Records a user-role session-prefix message without creating a new user turn boundary. pub(crate) async fn inject_user_message_without_turn(&self, message: String) { let message = ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: message }], end_turn: None, phase: None, }; let pending_item = match pending_message_input_item(&message) { Ok(pending_item) => pending_item, Err(err) => { debug_assert!(false, "session-prefix message append should succeed: {err}"); return; } }; if self .codex .session .inject_response_items(vec![pending_item]) .await .is_err() { let turn_context = self.codex.session.new_default_turn().await; self.codex .session .record_conversation_items(turn_context.as_ref(), &[message]) .await; } } /// Append a prebuilt message to the thread history without treating it as a user turn. /// /// If the thread already has an active turn, the message is queued as pending input for that /// turn. Otherwise it is queued at session scope and a regular turn is started so the agent /// can consume that pending input through the normal turn pipeline. #[cfg(test)] pub(crate) async fn append_message(&self, message: ResponseItem) -> CodexResult { let submission_id = uuid::Uuid::new_v4().to_string(); let pending_item = pending_message_input_item(&message)?; if let Err(items) = self .codex .session .inject_response_items(vec![pending_item]) .await { self.codex .session .queue_response_items_for_next_turn(items) .await; self.codex.session.maybe_start_turn_for_pending_work().await; } Ok(submission_id) } pub fn rollout_path(&self) -> Option { self.rollout_path.clone() } pub fn state_db(&self) -> Option { self.codex.state_db() } pub async fn config_snapshot(&self) -> ThreadConfigSnapshot { self.codex.thread_config_snapshot().await } pub fn enabled(&self, feature: Feature) -> bool { self.codex.enabled(feature) } pub async fn increment_out_of_band_elicitation_count(&self) -> CodexResult { let mut guard = self.out_of_band_elicitation_count.lock().await; let was_zero = *guard == 0; *guard = guard.checked_add(1).ok_or_else(|| { CodexErr::Fatal("out-of-band elicitation count overflowed".to_string()) })?; if was_zero { self.codex .session .set_out_of_band_elicitation_pause_state(/*paused*/ true); } Ok(*guard) } pub async fn decrement_out_of_band_elicitation_count(&self) -> CodexResult { let mut guard = self.out_of_band_elicitation_count.lock().await; if *guard == 0 { return Err(CodexErr::InvalidRequest( "out-of-band elicitation count is already zero".to_string(), )); } *guard -= 1; let now_zero = *guard == 0; if now_zero { self.codex .session .set_out_of_band_elicitation_pause_state(/*paused*/ false); } Ok(*guard) } } fn pending_message_input_item(message: &ResponseItem) -> CodexResult { match message { ResponseItem::Message { role, content, .. } => Ok(ResponseInputItem::Message { role: role.clone(), content: content.clone(), }), _ => Err(CodexErr::InvalidRequest( "append_message only supports ResponseItem::Message".to_string(), )), } }