mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
V4
This commit is contained in:
@@ -2,6 +2,7 @@ use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -18,12 +19,19 @@ use codex_core::auth::read_codex_api_key_from_env;
|
||||
use codex_core::auth::read_openai_api_key_from_env;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_infty::AggregatedVerifierVerdict;
|
||||
use codex_infty::DirectiveResponse;
|
||||
use codex_infty::InftyOrchestrator;
|
||||
use codex_infty::ProgressReporter;
|
||||
use codex_infty::ResumeParams;
|
||||
use codex_infty::RoleConfig;
|
||||
use codex_infty::RunExecutionOptions;
|
||||
use codex_infty::RunParams;
|
||||
use codex_infty::RunStore;
|
||||
use codex_infty::VerifierDecision;
|
||||
use codex_infty::VerifierVerdict;
|
||||
use serde::Serialize;
|
||||
|
||||
const DEFAULT_TIMEOUT_SECS: u64 = 60;
|
||||
@@ -113,6 +121,119 @@ struct RunSummary {
|
||||
roles: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TerminalProgressReporter;
|
||||
|
||||
impl TerminalProgressReporter {
|
||||
fn decision_label(decision: VerifierDecision) -> &'static str {
|
||||
match decision {
|
||||
VerifierDecision::Pass => "pass",
|
||||
VerifierDecision::Fail => "fail",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProgressReporter for TerminalProgressReporter {
|
||||
fn objective_posted(&self, objective: &str) {
|
||||
println!("→ objective sent to solver: {objective}");
|
||||
}
|
||||
|
||||
fn waiting_for_solver(&self) {
|
||||
println!("Waiting for solver response...");
|
||||
}
|
||||
|
||||
fn solver_event(&self, event: &EventMsg) {
|
||||
match serde_json::to_string_pretty(event) {
|
||||
Ok(json) => {
|
||||
println!("[solver:event]\n{json}");
|
||||
}
|
||||
Err(err) => {
|
||||
println!("[solver:event] (failed to serialize: {err}) {event:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn solver_agent_message(&self, agent_msg: &AgentMessageEvent) {
|
||||
println!("[solver] {}", agent_msg.message);
|
||||
}
|
||||
|
||||
fn direction_request(&self, prompt: &str) {
|
||||
println!("→ solver requested direction: {prompt}");
|
||||
}
|
||||
|
||||
fn director_response(&self, directive: &DirectiveResponse) {
|
||||
match directive.rationale.as_deref() {
|
||||
Some(rationale) if !rationale.is_empty() => {
|
||||
println!(
|
||||
"[director] directive: {} (rationale: {})",
|
||||
directive.directive, rationale
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
println!("[director] directive: {}", directive.directive);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn verification_request(&self, claim_path: &str, notes: Option<&str>) {
|
||||
println!("→ solver requested verification for {claim_path}");
|
||||
if let Some(notes) = notes {
|
||||
if !notes.is_empty() {
|
||||
println!(" notes: {notes}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn verifier_verdict(&self, role: &str, verdict: &VerifierVerdict) {
|
||||
println!(
|
||||
"[{role}] verdict: {}",
|
||||
Self::decision_label(verdict.verdict)
|
||||
);
|
||||
if !verdict.reasons.is_empty() {
|
||||
println!(" reasons: {}", verdict.reasons.join("; "));
|
||||
}
|
||||
if !verdict.suggestions.is_empty() {
|
||||
println!(" suggestions: {}", verdict.suggestions.join("; "));
|
||||
}
|
||||
}
|
||||
|
||||
fn verification_summary(&self, summary: &AggregatedVerifierVerdict) {
|
||||
println!(
|
||||
"Verification summary: {}",
|
||||
Self::decision_label(summary.overall)
|
||||
);
|
||||
for report in &summary.verdicts {
|
||||
println!(
|
||||
" {} → {}",
|
||||
report.role,
|
||||
Self::decision_label(report.verdict)
|
||||
);
|
||||
if !report.reasons.is_empty() {
|
||||
println!(" reasons: {}", report.reasons.join("; "));
|
||||
}
|
||||
if !report.suggestions.is_empty() {
|
||||
println!(" suggestions: {}", report.suggestions.join("; "));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn final_delivery(&self, deliverable_path: &Path, summary: Option<&str>) {
|
||||
println!(
|
||||
"✓ solver reported final delivery at {}",
|
||||
deliverable_path.display()
|
||||
);
|
||||
if let Some(summary) = summary {
|
||||
if !summary.is_empty() {
|
||||
println!(" summary: {summary}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run_interrupted(&self) {
|
||||
println!("Run interrupted by Ctrl+C. Shutting down sessions…");
|
||||
}
|
||||
}
|
||||
|
||||
impl InftyCli {
|
||||
pub async fn run(self) -> Result<()> {
|
||||
let InftyCli {
|
||||
@@ -158,7 +279,8 @@ async fn run_create(
|
||||
bail!("run {run_id} already exists at {}", run_path.display());
|
||||
}
|
||||
|
||||
let orchestrator = InftyOrchestrator::with_runs_root(auth, runs_root.clone());
|
||||
let orchestrator = InftyOrchestrator::with_runs_root(auth, runs_root.clone())
|
||||
.with_progress(Arc::new(TerminalProgressReporter::default()));
|
||||
let run_params = RunParams {
|
||||
run_id: run_id.clone(),
|
||||
run_root: Some(run_path.clone()),
|
||||
@@ -174,6 +296,11 @@ async fn run_create(
|
||||
options.director_timeout = timeout;
|
||||
options.verifier_timeout = timeout;
|
||||
|
||||
println!("Starting run {run_id} at {}", run_path.display());
|
||||
println!(
|
||||
"Objective: {}",
|
||||
options.objective.as_deref().unwrap_or_default()
|
||||
);
|
||||
let outcome = orchestrator
|
||||
.execute_new_run(run_params, options)
|
||||
.await
|
||||
@@ -291,7 +418,8 @@ async fn run_drive(
|
||||
.map(|role| RoleConfig::new(role.role.clone(), config.clone()))
|
||||
.collect();
|
||||
|
||||
let orchestrator = InftyOrchestrator::with_runs_root(auth, runs_root);
|
||||
let orchestrator = InftyOrchestrator::with_runs_root(auth, runs_root)
|
||||
.with_progress(Arc::new(TerminalProgressReporter::default()));
|
||||
let sessions = orchestrator
|
||||
.resume_run(ResumeParams {
|
||||
run_path: run_path.clone(),
|
||||
|
||||
@@ -12,7 +12,7 @@ dirs = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
|
||||
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "signal"] }
|
||||
tokio-stream = { workspace = true }
|
||||
tracing = { workspace = true, features = ["log"] }
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_core::cross_session::CrossSessionHub;
|
||||
use codex_core::cross_session::PostUserTurnRequest;
|
||||
use codex_core::cross_session::RoleOrId;
|
||||
use codex_core::cross_session::SessionEventStream;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
@@ -33,6 +34,7 @@ use serde::de::DeserializeOwned;
|
||||
use serde::de::Error as _;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use tokio::signal;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -43,6 +45,20 @@ pub use run_store::RoleMetadata;
|
||||
pub use run_store::RunMetadata;
|
||||
pub use run_store::RunStore;
|
||||
|
||||
pub trait ProgressReporter: Send + Sync {
|
||||
fn objective_posted(&self, _objective: &str) {}
|
||||
fn waiting_for_solver(&self) {}
|
||||
fn solver_event(&self, _event: &EventMsg) {}
|
||||
fn solver_agent_message(&self, _message: &AgentMessageEvent) {}
|
||||
fn direction_request(&self, _prompt: &str) {}
|
||||
fn director_response(&self, _directive: &DirectiveResponse) {}
|
||||
fn verification_request(&self, _claim_path: &str, _notes: Option<&str>) {}
|
||||
fn verifier_verdict(&self, _role: &str, _verdict: &VerifierVerdict) {}
|
||||
fn verification_summary(&self, _summary: &AggregatedVerifierVerdict) {}
|
||||
fn final_delivery(&self, _deliverable_path: &Path, _summary: Option<&str>) {}
|
||||
fn run_interrupted(&self) {}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RoleConfig {
|
||||
pub role: String,
|
||||
@@ -139,15 +155,15 @@ enum SolverSignal {
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct DirectiveResponse {
|
||||
directive: String,
|
||||
pub struct DirectiveResponse {
|
||||
pub directive: String,
|
||||
#[serde(default)]
|
||||
rationale: Option<String>,
|
||||
pub rationale: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum VerifierDecision {
|
||||
pub enum VerifierDecision {
|
||||
Pass,
|
||||
Fail,
|
||||
}
|
||||
@@ -159,30 +175,30 @@ impl VerifierDecision {
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct VerifierVerdict {
|
||||
verdict: VerifierDecision,
|
||||
pub struct VerifierVerdict {
|
||||
pub verdict: VerifierDecision,
|
||||
#[serde(default)]
|
||||
reasons: Vec<String>,
|
||||
pub reasons: Vec<String>,
|
||||
#[serde(default)]
|
||||
suggestions: Vec<String>,
|
||||
pub suggestions: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct VerifierReport {
|
||||
role: String,
|
||||
verdict: VerifierDecision,
|
||||
pub struct VerifierReport {
|
||||
pub role: String,
|
||||
pub verdict: VerifierDecision,
|
||||
#[serde(default)]
|
||||
reasons: Vec<String>,
|
||||
pub reasons: Vec<String>,
|
||||
#[serde(default)]
|
||||
suggestions: Vec<String>,
|
||||
pub suggestions: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct AggregatedVerifierVerdict {
|
||||
pub struct AggregatedVerifierVerdict {
|
||||
#[serde(rename = "type")]
|
||||
kind: &'static str,
|
||||
overall: VerifierDecision,
|
||||
verdicts: Vec<VerifierReport>,
|
||||
pub kind: &'static str,
|
||||
pub overall: VerifierDecision,
|
||||
pub verdicts: Vec<VerifierReport>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -227,6 +243,7 @@ pub struct InftyOrchestrator {
|
||||
hub: Arc<CrossSessionHub>,
|
||||
conversation_manager: ConversationManager,
|
||||
runs_root: PathBuf,
|
||||
progress: Option<Arc<dyn ProgressReporter>>,
|
||||
}
|
||||
|
||||
impl InftyOrchestrator {
|
||||
@@ -240,6 +257,7 @@ impl InftyOrchestrator {
|
||||
hub: Arc::new(CrossSessionHub::new()),
|
||||
conversation_manager: ConversationManager::with_auth(auth),
|
||||
runs_root: runs_root.into(),
|
||||
progress: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -251,6 +269,11 @@ impl InftyOrchestrator {
|
||||
Arc::clone(&self.hub)
|
||||
}
|
||||
|
||||
/// Builder-style setter: installs `reporter` as this orchestrator's progress
/// sink, replacing any reporter set earlier, and returns the orchestrator.
pub fn with_progress(mut self, reporter: Arc<dyn ProgressReporter>) -> Self {
    // Any previously installed reporter is dropped right here.
    let _ = self.progress.replace(reporter);
    self
}
|
||||
|
||||
pub async fn execute_new_run(
|
||||
&self,
|
||||
params: RunParams,
|
||||
@@ -418,74 +441,120 @@ impl InftyOrchestrator {
|
||||
)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
}
|
||||
|
||||
while let Some(event) = solver_events.next().await {
|
||||
match event.event.msg {
|
||||
EventMsg::AgentMessage(agent_msg) => {
|
||||
if let Some(signal) = parse_solver_signal(&agent_msg.message) {
|
||||
match signal {
|
||||
SolverSignal::DirectionRequest { prompt } => {
|
||||
self.handle_direction_request(sessions, &prompt, options)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
}
|
||||
SolverSignal::VerificationRequest { claim_path, notes } => {
|
||||
let pass = self
|
||||
.handle_verification_request(
|
||||
sessions,
|
||||
&claim_path,
|
||||
notes.as_deref(),
|
||||
options,
|
||||
)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
if pass {
|
||||
self.post_to_role(
|
||||
&sessions.run_id,
|
||||
&sessions.solver.role,
|
||||
FINALIZATION_PROMPT.to_string(),
|
||||
Some(final_delivery_schema()),
|
||||
)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
}
|
||||
}
|
||||
SolverSignal::FinalDelivery {
|
||||
deliverable_path: candidate_path,
|
||||
summary,
|
||||
} => {
|
||||
sessions.store.touch()?;
|
||||
let deliverable_path = resolve_deliverable_path(
|
||||
sessions.store.path(),
|
||||
&candidate_path,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"invalid deliverable path reported by solver: {candidate_path}"
|
||||
)
|
||||
})?;
|
||||
return Ok(RunOutcome {
|
||||
run_id: sessions.run_id.clone(),
|
||||
deliverable_path,
|
||||
summary,
|
||||
raw_message: agent_msg.message,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::ShutdownComplete => {
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.objective_posted(objective);
|
||||
progress.waiting_for_solver();
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow!(
|
||||
"run {} ended before emitting final_delivery message",
|
||||
sessions.run_id
|
||||
))
|
||||
let mut task_complete_detected = false;
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe_event = solver_events.next() => {
|
||||
let Some(event) = maybe_event else {
|
||||
break;
|
||||
};
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.solver_event(&event.event.msg);
|
||||
}
|
||||
match &event.event.msg {
|
||||
EventMsg::AgentMessage(agent_msg) => {
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.solver_agent_message(agent_msg);
|
||||
}
|
||||
if let Some(signal) = parse_solver_signal(&agent_msg.message) {
|
||||
match signal {
|
||||
SolverSignal::DirectionRequest { prompt } => {
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.direction_request(&prompt);
|
||||
}
|
||||
self.handle_direction_request(sessions, &prompt, options)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
}
|
||||
SolverSignal::VerificationRequest { claim_path, notes } => {
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.verification_request(&claim_path, notes.as_deref());
|
||||
}
|
||||
let pass = self
|
||||
.handle_verification_request(
|
||||
sessions,
|
||||
&claim_path,
|
||||
notes.as_deref(),
|
||||
options,
|
||||
)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
if pass {
|
||||
self.post_to_role(
|
||||
&sessions.run_id,
|
||||
&sessions.solver.role,
|
||||
FINALIZATION_PROMPT.to_string(),
|
||||
Some(final_delivery_schema()),
|
||||
)
|
||||
.await?;
|
||||
sessions.store.touch()?;
|
||||
}
|
||||
}
|
||||
SolverSignal::FinalDelivery {
|
||||
deliverable_path: candidate_path,
|
||||
summary,
|
||||
} => {
|
||||
sessions.store.touch()?;
|
||||
let deliverable_path = resolve_deliverable_path(
|
||||
sessions.store.path(),
|
||||
&candidate_path,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"invalid deliverable path reported by solver: {candidate_path}"
|
||||
)
|
||||
})?;
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.final_delivery(&deliverable_path, summary.as_deref());
|
||||
}
|
||||
return Ok(RunOutcome {
|
||||
run_id: sessions.run_id.clone(),
|
||||
deliverable_path,
|
||||
summary,
|
||||
raw_message: agent_msg.message.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::TaskComplete { .. } => {
|
||||
task_complete_detected = true;
|
||||
}
|
||||
EventMsg::ShutdownComplete => {
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
result = signal::ctrl_c() => {
|
||||
if let Err(err) = result {
|
||||
return Err(anyhow!("failed to listen for Ctrl+C: {err}"));
|
||||
}
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.run_interrupted();
|
||||
}
|
||||
return Err(anyhow!("run {} interrupted by Ctrl+C", sessions.run_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if task_complete_detected {
|
||||
Err(anyhow!(
|
||||
"run {} ended without emitting a structured solver signal (direction_request / verification_request / final_delivery)",
|
||||
sessions.run_id
|
||||
))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"run {} ended before emitting final_delivery message",
|
||||
sessions.run_id
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_direction_request(
|
||||
@@ -512,6 +581,9 @@ impl InftyOrchestrator {
|
||||
.await?;
|
||||
let directive_payload: DirectiveResponse = parse_json_struct(&directive.message.message)
|
||||
.context("director response was not valid directive JSON")?;
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.director_response(&directive_payload);
|
||||
}
|
||||
let directive_text = serde_json::to_string_pretty(&directive_payload)?;
|
||||
let _ = self
|
||||
.post_to_role(
|
||||
@@ -533,6 +605,9 @@ impl InftyOrchestrator {
|
||||
) -> Result<bool> {
|
||||
if sessions.verifiers.is_empty() {
|
||||
let summary = aggregate_verdicts(Vec::new());
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.verification_summary(&summary);
|
||||
}
|
||||
let summary_text = serde_json::to_string_pretty(&summary)?;
|
||||
let _ = self
|
||||
.post_to_role(&sessions.run_id, &sessions.solver.role, summary_text, None)
|
||||
@@ -564,10 +639,16 @@ impl InftyOrchestrator {
|
||||
.with_context(|| {
|
||||
format!("verifier {} returned invalid verdict JSON", verifier.role)
|
||||
})?;
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.verifier_verdict(&verifier.role, &verdict);
|
||||
}
|
||||
collected.push((verifier.role.clone(), verdict));
|
||||
}
|
||||
|
||||
let summary = aggregate_verdicts(collected);
|
||||
if let Some(progress) = self.progress.as_ref() {
|
||||
progress.verification_summary(&summary);
|
||||
}
|
||||
let summary_text = serde_json::to_string_pretty(&summary)?;
|
||||
let _ = self
|
||||
.post_to_role(&sessions.run_id, &sessions.solver.role, summary_text, None)
|
||||
|
||||
Reference in New Issue
Block a user