diff --git a/codex-rs/codex-infty/src/orchestrator.rs b/codex-rs/codex-infty/src/orchestrator.rs index bf747fe96f..a1079464b7 100644 --- a/codex-rs/codex-infty/src/orchestrator.rs +++ b/codex-rs/codex-infty/src/orchestrator.rs @@ -10,8 +10,6 @@ use anyhow::bail; use codex_core::CodexAuth; use codex_core::CodexConversation; use codex_core::ConversationManager; -use codex_core::config::Config; -use codex_core::config::ConfigOverrides; use codex_core::cross_session::CrossSessionHub; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; @@ -212,17 +210,8 @@ impl InftyOrchestrator { let mut solver_events = solver_role.stream_events()?; let mut state = LoopState::default(); - if let Some(objective) = &options.objective { - solver_role - .post(objective.as_str(), Some(SolverRole::solver_signal_schema())) - .await?; - sessions.store.touch()?; - state.waiting_for_signal = true; - if let Some(p) = self.progress_ref() { - p.objective_posted(objective); - p.waiting_for_solver(); - } - } + self.maybe_post_objective(&solver_role, sessions, &mut state, options) + .await?; let ctrl_c = signal::ctrl_c(); tokio::pin!(ctrl_c); @@ -349,6 +338,29 @@ impl InftyOrchestrator { )) } + async fn maybe_post_objective( + &self, + solver: &crate::roles::solver::SolverRole, + sessions: &mut RunSessions, + state: &mut LoopState, + options: &RunExecutionOptions, + ) -> Result<()> { + if let Some(objective) = options.objective.as_deref() + && !objective.trim().is_empty() + { + solver + .post(objective, Some(SolverRole::solver_signal_schema())) + .await?; + sessions.store.touch()?; + state.waiting_for_signal = true; + if let Some(p) = self.progress_ref() { + p.objective_posted(objective); + p.waiting_for_solver(); + } + } + Ok(()) + } + async fn handle_direction_request( &self, prompt: &str, @@ -385,13 +397,9 @@ impl InftyOrchestrator { return Ok(true); } let round = verifier_pool.collect_round(&request).await?; - for role in &round.passing_roles { - if let Err(err) = self.replace_verifier_session(sessions, role).await { - warn!(role = %role, ?err, "failed to replace verifier session; keeping existing"); - } else { - verifier_pool.replace_role(role); - } - } + verifier_pool + .rotate_passing(sessions, &self.conversation_manager, &round.passing_roles) + .await?; let summary = round.summary; self.emit_verification_summary(&summary); let req = SolverRequest::from(&summary); @@ -421,13 +429,9 @@ impl InftyOrchestrator { return Ok(true); } let round = verifier_pool.collect_round(&request).await?; - for role in &round.passing_roles { - if let Err(err) = self.replace_verifier_session(sessions, role).await { - warn!(role = %role, ?err, "failed to replace verifier session; keeping existing"); - } else { - verifier_pool.replace_role(role); - } - } + verifier_pool + .rotate_passing(sessions, &self.conversation_manager, &round.passing_roles) + .await?; let summary_result = round.summary; self.emit_verification_summary(&summary_result); let req = SolverRequest::from(&summary_result); @@ -435,47 +439,6 @@ impl InftyOrchestrator { Ok(summary_result.overall.is_pass()) } - async fn replace_verifier_session(&self, sessions: &mut RunSessions, role: &str) -> Result<()> { - // Find the existing verifier session index by role - let idx = sessions - .verifiers - .iter() - .position(|s| s.role == role) - .ok_or_else(|| anyhow!(format!("verifier role {role} not found")))?; - - // Shut down the old session and unregister it from the hub - let old = &sessions.verifiers[idx]; - // best-effort shutdown; ignore errors but proceed to unregister - let _ = old.conversation.submit(Op::Shutdown).await; - let _ = self - .conversation_manager - .remove_conversation(&old.conversation_id) - .await; - - // Prepare a fresh Config using current user defaults, then apply our autonomous policies - let config = Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default()) - .await - .context("failed to load Codex config for verifier respawn")?; - // RoleConfig::new applies sandbox + approval; mimic that here via the constructor - let role_config = crate::types::RoleConfig::new(role.to_string(), config); - - // Spawn a new verifier session and register it - let mut dummy = Vec::new(); - let run_path = sessions.store.path().to_path_buf(); - let new_session = self - .spawn_and_register_role( - &sessions.run_id, - &run_path, - &role_config, - &mut sessions.store, - &mut dummy, - ) - .await?; - - sessions.verifiers[idx] = new_session; - Ok(()) - } - fn emit_verification_summary(&self, summary: &AggregatedVerifierVerdict) { if let Some(progress) = self.progress.as_ref() { progress.verification_summary(summary); diff --git a/codex-rs/codex-infty/src/roles/verifier_pool.rs b/codex-rs/codex-infty/src/roles/verifier_pool.rs index 6ea783abcd..9a63584d84 100644 --- a/codex-rs/codex-infty/src/roles/verifier_pool.rs +++ b/codex-rs/codex-infty/src/roles/verifier_pool.rs @@ -3,15 +3,22 @@ use std::time::Duration; use anyhow::Context as _; use anyhow::Result; +use codex_core::ConversationManager; +use codex_core::config::Config; +use codex_core::config::ConfigOverrides; use codex_core::cross_session::CrossSessionHub; +use codex_core::protocol::Op; use crate::progress::ProgressReporter; +use crate::prompts; use crate::roles::Role; use crate::roles::verifier::VerificationRequestPayload; use crate::roles::verifier::VerifierRole; use crate::roles::verifier::aggregate_verdicts; +use crate::session; use crate::signals::AggregatedVerifierVerdict; use crate::signals::VerifierVerdict; +use crate::types::RoleConfig; use crate::types::RunSessions; pub struct VerificationRound { @@ -106,4 +113,44 @@ impl VerifierPool { ); } } + + pub async fn rotate_passing( + &mut self, + sessions: &mut RunSessions, + manager: &ConversationManager, + passing_roles: &[String], + ) -> Result<()> { + for role in passing_roles { + // find existing index + let Some(idx) = sessions.verifiers.iter().position(|s| &s.role == role) else { + continue; + }; + let old = &sessions.verifiers[idx]; + // best-effort shutdown and unregister + let _ = old.conversation.submit(Op::Shutdown).await; + let _ = manager.remove_conversation(&old.conversation_id).await; + + // load fresh config and spawn a new session + let config = Config::load_with_cli_overrides(Vec::new(), ConfigOverrides::default()) + .await + .context("failed to load Codex config for verifier respawn")?; + let role_config = RoleConfig::new(role.to_string(), config); + let run_path = sessions.store.path(); + let session = session::spawn_role( + Arc::clone(&self.hub), + manager, + &self.run_id, + run_path, + role_config, + prompts::ensure_instructions, + ) + .await?; + sessions + .store + .update_rollout_path(&session.role, session.rollout_path.clone())?; + sessions.verifiers[idx] = session; + self.replace_role(role); + } + Ok(()) + } } diff --git a/codex-rs/codex-infty/src/utils.rs b/codex-rs/codex-infty/src/utils.rs index 44f41821b0..44e24ad631 100644 --- a/codex-rs/codex-infty/src/utils.rs +++ b/codex-rs/codex-infty/src/utils.rs @@ -58,3 +58,28 @@ pub fn objective_as_str(options: &crate::types::RunExecutionOptions) -> Option<& .map(str::trim) .filter(|s| !s.is_empty()) } + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn resolve_deliverable_within_base() { + let tmp = TempDir::new().unwrap(); + let base = tmp.path(); + std::fs::create_dir_all(base.join("deliverable")).unwrap(); + std::fs::write(base.join("deliverable").join("a.txt"), "ok").unwrap(); + let resolved = resolve_deliverable_path(base, "deliverable/a.txt").unwrap(); + assert!(resolved.starts_with(base)); + } + + #[test] + fn resolve_deliverable_rejects_escape() { + let tmp = TempDir::new().unwrap(); + let base = tmp.path(); + let err = resolve_deliverable_path(base, "../outside.txt").unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("escapes run store")); + } +}