This commit is contained in:
jif-oai
2025-10-16 10:27:49 +01:00
parent b45c204109
commit c0baaa171b
11 changed files with 1008 additions and 537 deletions

View File

@@ -1,9 +1,7 @@
use std::any::type_name;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
@@ -14,33 +12,29 @@ use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::cross_session::AssistantMessage;
use codex_core::cross_session::CrossSessionHub;
use codex_core::cross_session::SessionEventStream;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::ConversationId;
use serde::Deserialize;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::de::Error as _;
use serde_json::Value;
use serde_json::json;
use tokio::signal;
use tokio_stream::StreamExt;
use tracing::warn;
use crate::progress::ProgressReporter;
use crate::prompts;
use crate::roles::Role;
use crate::roles::director::DirectionRequestPayload;
use crate::roles::director::DirectorRole;
use crate::roles::solver::SolverRequest;
use crate::roles::solver::SolverRole;
use crate::roles::solver::SolverSignal;
use crate::roles::solver::parse_solver_signal;
use crate::roles::verifier::VerificationRequestPayload;
use crate::roles::verifier_pool::VerifierPool;
use crate::run_store::RoleMetadata;
use crate::run_store::RunStore;
use crate::session;
use crate::signals::AggregatedVerifierVerdict;
use crate::signals::DirectiveResponse;
use crate::signals::VerifierDecision;
use crate::signals::VerifierReport;
use crate::signals::VerifierVerdict;
use crate::types::FINALIZATION_PROMPT;
use crate::types::RoleConfig;
use crate::types::RoleSession;
use crate::types::RunExecutionOptions;
@@ -48,47 +42,6 @@ use crate::types::RunOutcome;
use crate::types::RunParams;
use crate::types::RunSessions;
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum SolverSignal {
DirectionRequest {
#[serde(default)]
prompt: Option<String>,
},
VerificationRequest {
#[serde(default)]
claim_path: Option<String>,
#[serde(default)]
notes: Option<String>,
},
FinalDelivery {
#[serde(default)]
deliverable_path: Option<String>,
#[serde(default)]
summary: Option<String>,
},
}
#[derive(Serialize)]
struct DirectionRequestPayload<'a> {
#[serde(rename = "type")]
kind: &'static str,
prompt: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
objective: Option<&'a str>,
}
#[derive(Serialize)]
struct VerificationRequestPayload<'a> {
#[serde(rename = "type")]
kind: &'static str,
claim_path: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
notes: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
objective: Option<&'a str>,
}
struct SessionCleanup {
conversation_id: ConversationId,
conversation: Arc<CodexConversation>,
@@ -227,18 +180,34 @@ impl InftyOrchestrator {
sessions: &mut RunSessions,
options: &RunExecutionOptions,
) -> Result<RunOutcome> {
let mut solver_events = self.stream_events(sessions.solver.conversation_id)?;
let solver_role = SolverRole::new(
Arc::clone(&self.hub),
sessions.run_id.clone(),
sessions.solver.role.clone(),
sessions.solver.conversation_id,
self.progress.clone(),
);
let director_role = DirectorRole::new(
Arc::clone(&self.hub),
sessions.run_id.clone(),
sessions.director.role.clone(),
options.director_timeout,
self.progress.clone(),
);
let mut verifier_pool = VerifierPool::from_sessions(
Arc::clone(&self.hub),
sessions,
options.verifier_timeout,
self.progress.clone(),
);
let mut solver_events = solver_role.stream_events()?;
let mut waiting_for_signal = false;
let mut pending_solver_turn_completion = false;
if let Some(objective) = &options.objective {
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
objective.as_str(),
Some(solver_signal_schema()),
)
.await?;
solver_role
.post(objective.as_str(), Some(SolverRole::solver_signal_schema()))
.await?;
sessions.store.touch()?;
waiting_for_signal = true;
if let Some(progress) = self.progress.as_ref() {
@@ -268,47 +237,29 @@ impl InftyOrchestrator {
waiting_for_signal = false;
match signal {
SolverSignal::DirectionRequest { prompt } => {
let prompt = prompt
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.ok_or_else(|| {
anyhow!(
"solver direction_request missing prompt text"
)
})?;
let prompt = crate::utils::required_trimmed(
prompt,
"solver direction_request missing prompt text",
)?;
if let Some(progress) = self.progress.as_ref() {
progress.direction_request(&prompt);
}
self.handle_direction_request(
sessions,
&prompt,
options,
)
.await?;
self
.handle_direction_request(
&prompt,
options,
&director_role,
&solver_role,
)
.await?;
sessions.store.touch()?;
pending_solver_turn_completion = true;
}
SolverSignal::VerificationRequest { claim_path, notes } => {
let claim_path = claim_path
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.ok_or_else(|| {
anyhow!(
"solver verification_request missing claim_path"
)
})?;
let claim_path = crate::utils::required_trimmed(
claim_path,
"solver verification_request missing claim_path",
)?;
if let Some(progress) = self.progress.as_ref() {
progress.verification_request(
&claim_path,
@@ -318,9 +269,11 @@ impl InftyOrchestrator {
let verified = self
.handle_verification_request(
sessions,
&mut verifier_pool,
&claim_path,
notes.as_deref(),
options,
&solver_role,
)
.await?;
sessions.store.touch()?;
@@ -332,35 +285,18 @@ impl InftyOrchestrator {
deliverable_path,
summary,
} => {
let deliverable_path = deliverable_path
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.ok_or_else(|| {
anyhow!(
"solver final_delivery missing deliverable_path"
)
})?;
let deliverable_path = crate::utils::required_trimmed(
deliverable_path,
"solver final_delivery missing deliverable_path",
)?;
if deliverable_path.is_empty() {
bail!("solver final_delivery provided empty path");
}
let resolved = resolve_deliverable_path(
let resolved = crate::utils::resolve_deliverable_path(
sessions.store.path(),
&deliverable_path,
)?;
let summary_clean = summary.and_then(|s| {
let trimmed = s.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
});
let summary_clean = crate::utils::trim_to_non_empty(summary);
let summary_ref = summary_clean.as_deref();
if let Some(progress) = self.progress.as_ref() {
progress.final_delivery(&resolved, summary_ref);
@@ -368,9 +304,11 @@ impl InftyOrchestrator {
let verified = self
.run_final_verification(
sessions,
&mut verifier_pool,
&resolved,
summary_ref,
options,
&solver_role,
)
.await?;
if !verified {
@@ -391,8 +329,7 @@ impl InftyOrchestrator {
EventMsg::TaskComplete(..) => {
if waiting_for_signal {
// The solver completed its turn without issuing a signal; ask for one now.
self.request_solver_signal(&sessions.run_id, &sessions.solver.role)
.await?;
solver_role.request_finalization_signal().await?;
} else if pending_solver_turn_completion {
// We handled a signal earlier in the loop; this completion corresponds to it.
pending_solver_turn_completion = false;
@@ -405,8 +342,7 @@ impl InftyOrchestrator {
if let Some(progress) = self.progress.as_ref() {
progress.run_interrupted();
}
let cleanup = collect_session_cleanup(sessions);
self.shutdown_sessions(cleanup).await;
// Cleanup is handled by the caller (drive_run) to avoid double-shutdown
bail!("run interrupted by Ctrl+C");
}
}
@@ -420,80 +356,62 @@ impl InftyOrchestrator {
async fn handle_direction_request(
&self,
sessions: &RunSessions,
prompt: &str,
options: &RunExecutionOptions,
director_role: &DirectorRole,
solver_role: &SolverRole,
) -> Result<()> {
let request = DirectionRequestPayload {
kind: "direction_request",
prompt,
objective: options.objective.as_deref(),
};
let request_text = serde_json::to_string_pretty(&request)?;
let handle = session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.director.role,
request_text,
Some(directive_response_schema()),
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, "director"));
let directive = session::await_first_idle(
self.hub.as_ref(),
&handle,
options.director_timeout,
progress,
)
.await?;
let directive_payload: DirectiveResponse = parse_json_struct(&directive.message.message)
let request = DirectionRequestPayload::new(prompt, options.objective.as_deref());
let directive_payload = director_role
.call(&request)
.await
.context("director response was not valid directive JSON")?;
if let Some(progress) = self.progress.as_ref() {
progress.director_response(&directive_payload);
}
let directive_text = serde_json::to_string_pretty(&directive_payload)?;
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
directive_text,
Some(solver_signal_schema()),
)
.await?;
let req = SolverRequest::from(directive_payload);
solver_role.call(&req).await?;
Ok(())
}
async fn handle_verification_request(
&self,
sessions: &mut RunSessions,
verifier_pool: &mut VerifierPool,
claim_path: &str,
notes: Option<&str>,
options: &RunExecutionOptions,
solver_role: &SolverRole,
) -> Result<bool> {
let objective = options
.objective
.as_deref()
.map(str::trim)
.filter(|objective| !objective.is_empty());
let objective = crate::utils::objective_as_str(options);
let summary = self
.collect_verification_summary(sessions, claim_path, notes, objective, options)
.await?;
let request = VerificationRequestPayload::new(claim_path, notes, objective);
if verifier_pool.is_empty() {
return Ok(true);
}
let round = verifier_pool.collect_round(&request).await?;
for role in &round.passing_roles {
if let Err(err) = self.replace_verifier_session(sessions, role).await {
warn!(role = %role, ?err, "failed to replace verifier session; keeping existing");
} else {
verifier_pool.replace_role(role);
}
}
let summary = round.summary;
self.emit_verification_summary(&summary);
self.post_verification_summary_to_solver(sessions, &summary)
.await?;
let req = SolverRequest::from(&summary);
solver_role.call(&req).await?;
Ok(summary.overall.is_pass())
}
async fn run_final_verification(
&self,
sessions: &mut RunSessions,
verifier_pool: &mut VerifierPool,
deliverable_path: &Path,
summary: Option<&str>,
options: &RunExecutionOptions,
solver_role: &SolverRole,
) -> Result<bool> {
let relative = deliverable_path
.strip_prefix(sessions.store.path())
@@ -501,112 +419,25 @@ impl InftyOrchestrator {
.and_then(|p| p.to_str().map(|s| s.to_string()));
let claim_path = relative.unwrap_or_else(|| deliverable_path.display().to_string());
let objective = options
.objective
.as_deref()
.map(str::trim)
.filter(|objective| !objective.is_empty());
let objective = crate::utils::objective_as_str(options);
let summary_result = self
.collect_verification_summary(
sessions,
claim_path.as_str(),
summary,
objective,
options,
)
.await?;
self.emit_verification_summary(&summary_result);
self.post_verification_summary_to_solver(sessions, &summary_result)
.await?;
Ok(summary_result.overall.is_pass())
}
async fn request_solver_signal(&self, run_id: &str, solver_role: &str) -> Result<()> {
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
solver_role,
FINALIZATION_PROMPT,
Some(final_delivery_schema()),
)
.await?;
let _ = session::await_first_idle(self.hub.as_ref(), &handle, Duration::from_secs(5), None)
.await?;
Ok(())
}
async fn collect_verification_summary(
&self,
sessions: &mut RunSessions,
claim_path: &str,
notes: Option<&str>,
objective: Option<&str>,
options: &RunExecutionOptions,
) -> Result<AggregatedVerifierVerdict> {
if sessions.verifiers.is_empty() {
return Ok(aggregate_verdicts(Vec::new()));
let request = VerificationRequestPayload::new(claim_path.as_str(), summary, objective);
if verifier_pool.is_empty() {
return Ok(true);
}
let request = VerificationRequestPayload {
kind: "verification_request",
claim_path,
notes,
objective,
};
let request_text = serde_json::to_string_pretty(&request)?;
let mut results: Vec<(String, VerifierVerdict)> =
Vec::with_capacity(sessions.verifiers.len());
for verifier in &sessions.verifiers {
let handle = session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&verifier.role,
request_text.clone(),
Some(verifier_verdict_schema()),
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, verifier.role.as_str()));
let response = session::await_first_idle(
self.hub.as_ref(),
&handle,
options.verifier_timeout,
progress,
)
.await?;
let verdict: VerifierVerdict = parse_json_struct(&response.message.message)
.with_context(|| {
format!("verifier {} returned invalid verdict JSON", verifier.role)
})?;
if let Some(progress) = self.progress.as_ref() {
progress.verifier_verdict(&verifier.role, &verdict);
}
results.push((verifier.role.clone(), verdict));
}
// Replace any verifier that passed with a fresh session; keep failures.
// Build a set of roles to replace to avoid borrowing issues while mutating.
let to_replace: Vec<String> = results
.iter()
.filter_map(|(role, verdict)| {
if verdict.verdict.is_pass() {
Some(role.clone())
} else {
None
}
})
.collect();
for role in to_replace {
if let Err(err) = self.replace_verifier_session(sessions, &role).await {
let round = verifier_pool.collect_round(&request).await?;
for role in &round.passing_roles {
if let Err(err) = self.replace_verifier_session(sessions, role).await {
warn!(role = %role, ?err, "failed to replace verifier session; keeping existing");
} else {
verifier_pool.replace_role(role);
}
}
// Aggregate directly from the collected results
Ok(aggregate_verdicts(results))
let summary_result = round.summary;
self.emit_verification_summary(&summary_result);
let req = SolverRequest::from(&summary_result);
solver_role.call(&req).await?;
Ok(summary_result.overall.is_pass())
}
async fn replace_verifier_session(&self, sessions: &mut RunSessions, role: &str) -> Result<()> {
@@ -656,23 +487,6 @@ impl InftyOrchestrator {
}
}
async fn post_verification_summary_to_solver(
&self,
sessions: &RunSessions,
summary: &AggregatedVerifierVerdict,
) -> Result<()> {
let summary_text = serde_json::to_string_pretty(summary)?;
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
summary_text,
Some(solver_signal_schema()),
)
.await?;
Ok(())
}
async fn cleanup_failed_spawn(&self, sessions: Vec<SessionCleanup>, run_path: &Path) {
self.shutdown_sessions(sessions).await;
if run_path.exists()
@@ -704,56 +518,6 @@ impl InftyOrchestrator {
}
}
pub fn stream_events(
&self,
conversation_id: ConversationId,
) -> Result<SessionEventStream, codex_core::cross_session::CrossSessionError> {
self.hub.stream_events(conversation_id)
}
pub async fn call_role(
&self,
run_id: &str,
role: &str,
text: impl Into<String>,
timeout: Duration,
final_output_json_schema: Option<Value>,
) -> Result<AssistantMessage> {
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
role,
text,
final_output_json_schema,
)
.await?;
let progress = self.progress.as_deref().map(|reporter| (reporter, role));
session::await_first_idle(self.hub.as_ref(), &handle, timeout, progress).await
}
pub async fn relay_assistant_to_role(
&self,
run_id: &str,
target_role: &str,
assistant: &AssistantMessage,
timeout: Duration,
final_output_json_schema: Option<Value>,
) -> Result<AssistantMessage> {
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
target_role,
assistant.message.message.clone(),
final_output_json_schema,
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, target_role));
session::await_first_idle(self.hub.as_ref(), &handle, timeout, progress).await
}
async fn spawn_and_register_role(
&self,
run_id: &str,
@@ -791,8 +555,15 @@ impl InftyOrchestrator {
claim_path: &str,
options: &RunExecutionOptions,
) -> Result<AggregatedVerifierVerdict> {
self.collect_verification_summary(sessions, claim_path, None, None, options)
.await
let pool = VerifierPool::from_sessions(
Arc::clone(&self.hub),
sessions,
options.verifier_timeout,
self.progress.clone(),
);
let req = VerificationRequestPayload::new(claim_path, None, None);
let round = pool.collect_round(&req).await?;
Ok(round.summary)
}
}
@@ -833,168 +604,3 @@ fn collect_session_cleanup(sessions: &RunSessions) -> Vec<SessionCleanup> {
cleanup.extend(sessions.verifiers.iter().map(SessionCleanup::new));
cleanup
}
fn parse_solver_signal(message: &str) -> Option<SolverSignal> {
let trimmed = message.trim();
if trimmed.is_empty() {
return None;
}
serde_json::from_str(trimmed)
.or_else(|_| {
strip_json_code_fence(trimmed)
.map(|inner| serde_json::from_str(inner.trim()))
.unwrap_or_else(|| Err(serde_json::Error::custom("invalid payload")))
})
.ok()
}
fn strip_json_code_fence(text: &str) -> Option<&str> {
let trimmed = text.trim();
if let Some(rest) = trimmed.strip_prefix("```json") {
return rest.strip_suffix("```").map(str::trim);
}
if let Some(rest) = trimmed.strip_prefix("```JSON") {
return rest.strip_suffix("```").map(str::trim);
}
if let Some(rest) = trimmed.strip_prefix("```") {
return rest.strip_suffix("```").map(str::trim);
}
None
}
fn parse_json_struct<T>(message: &str) -> Result<T>
where
T: DeserializeOwned,
{
let trimmed = message.trim();
if trimmed.is_empty() {
return Err(anyhow!("message was empty"));
}
serde_json::from_str(trimmed)
.or_else(|err| {
strip_json_code_fence(trimmed)
.map(|inner| serde_json::from_str(inner))
.unwrap_or_else(|| Err(err))
})
.map_err(|err| anyhow!(err))
.with_context(|| format!("failed to parse message as {}", type_name::<T>()))
}
fn aggregate_verdicts(items: Vec<(String, VerifierVerdict)>) -> AggregatedVerifierVerdict {
let mut overall = VerifierDecision::Pass;
let mut verdicts = Vec::with_capacity(items.len());
for (role, verdict) in items {
if !verdict.verdict.is_pass() {
overall = VerifierDecision::Fail;
}
verdicts.push(VerifierReport {
role,
verdict: verdict.verdict,
reasons: verdict.reasons,
suggestions: verdict.suggestions,
});
}
AggregatedVerifierVerdict {
kind: "verification_feedback",
overall,
verdicts,
}
}
fn solver_signal_schema() -> Value {
json!({
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["direction_request", "verification_request", "final_delivery"]
},
"prompt": { "type": ["string", "null"] },
"claim_path": { "type": ["string", "null"] },
"notes": { "type": ["string", "null"] },
"deliverable_path": { "type": ["string", "null"] },
"summary": { "type": ["string", "null"] }
},
"required": [
"type",
"prompt",
"claim_path",
"notes",
"deliverable_path",
"summary"
],
"additionalProperties": false
})
}
fn final_delivery_schema() -> Value {
json!({
"type": "object",
"required": ["type", "deliverable_path", "summary"],
"properties": {
"type": { "const": "final_delivery" },
"deliverable_path": { "type": "string" },
"summary": { "type": ["string", "null"] }
},
"additionalProperties": false
})
}
fn directive_response_schema() -> Value {
json!({
"type": "object",
"required": ["directive", "rationale"],
"properties": {
"directive": { "type": "string" },
"rationale": { "type": ["string", "null"] }
},
"additionalProperties": false
})
}
fn verifier_verdict_schema() -> Value {
json!({
"type": "object",
"required": ["verdict", "reasons", "suggestions"],
"properties": {
"verdict": { "type": "string", "enum": ["pass", "fail"] },
"reasons": { "type": "array", "items": { "type": "string" } },
"suggestions": { "type": "array", "items": { "type": "string" } }
},
"additionalProperties": false
})
}
fn resolve_deliverable_path(base: &Path, candidate: &str) -> Result<PathBuf> {
let base_abs = base
.canonicalize()
.with_context(|| format!("failed to canonicalize run store {}", base.display()))?;
let candidate_path = Path::new(candidate);
let joined = if candidate_path.is_absolute() {
candidate_path.to_path_buf()
} else {
base_abs.join(candidate_path)
};
let resolved = joined.canonicalize().with_context(|| {
format!(
"failed to canonicalize deliverable path {}",
joined.display()
)
})?;
if !resolved.starts_with(&base_abs) {
bail!(
"deliverable path {} escapes run store {}",
resolved.display(),
base_abs.display()
);
}
Ok(resolved)
}