This commit is contained in:
jif-oai
2025-10-14 11:46:27 +01:00
parent f72e9da7c5
commit cb99d71f57
7 changed files with 237 additions and 80 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -970,6 +970,7 @@ dependencies = [
"supports-color",
"tempfile",
"tokio",
"tracing",
]
[[package]]

View File

@@ -48,6 +48,7 @@ tokio = { workspace = true, features = [
codex-infty = { path = "../codex-infty" }
chrono = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tracing = "0.1.41"
[dev-dependencies]
assert_matches = { workspace = true }

View File

@@ -145,10 +145,10 @@ impl ProgressReporter for TerminalProgressReporter {
fn solver_event(&self, event: &EventMsg) {
match serde_json::to_string_pretty(event) {
Ok(json) => {
println!("[solver:event]\n{json}");
tracing::trace!("[solver:event]\n{json}");
}
Err(err) => {
println!("[solver:event] (failed to serialize: {err}) {event:?}");
tracing::warn!("[solver:event] (failed to serialize: {err}) {event:?}");
}
}
}

View File

@@ -23,8 +23,10 @@ use codex_core::cross_session::PostUserTurnRequest;
use codex_core::cross_session::RoleOrId;
use codex_core::cross_session::SessionEventStream;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::SessionConfiguredEvent;
use codex_protocol::ConversationId;
use dirs::home_dir;
@@ -67,7 +69,9 @@ pub struct RoleConfig {
}
impl RoleConfig {
pub fn new(role: impl Into<String>, config: Config) -> Self {
pub fn new(role: impl Into<String>, mut config: Config) -> Self {
config.sandbox_policy = SandboxPolicy::DangerFullAccess;
config.approval_policy = AskForApproval::Never;
Self {
role: role.into(),
config,
@@ -101,7 +105,7 @@ pub struct ResumeParams {
const DEFAULT_DIRECTOR_TIMEOUT: Duration = Duration::from_secs(120);
const DEFAULT_VERIFIER_TIMEOUT: Duration = Duration::from_secs(180);
const FINALIZATION_PROMPT: &str = "Create deliverable/: include compiled artifacts or scripts, usage docs, and tests. Write deliverable/README.md with overview, manifest (paths and sizes), verification steps, and limitations. Remove scratch files. Reply with JSON: {\"type\":\"final_delivery\",\"deliverable_path\":\"<path>\",\"summary\":\"<summary>\"}.";
const FINALIZATION_PROMPT: &str = "Create deliverable/: include compiled artifacts or scripts, usage docs, and tests. Write deliverable/summary.txt capturing the final answer, evidence, and follow-up steps. Also provide deliverable/README.md with overview, manifest (paths and sizes), verification steps, and limitations. Remove scratch files. Reply with JSON: {\"type\":\"final_delivery\",\"deliverable_path\":\"deliverable/summary.txt\",\"summary\":\"<answer plus supporting context>\"}.";
#[derive(Clone)]
pub struct RunExecutionOptions {
@@ -140,15 +144,17 @@ pub struct RoleSession {
enum SolverSignal {
DirectionRequest {
#[serde(default)]
prompt: String,
prompt: Option<String>,
},
VerificationRequest {
claim_path: String,
#[serde(default)]
claim_path: Option<String>,
#[serde(default)]
notes: Option<String>,
},
FinalDelivery {
deliverable_path: String,
#[serde(default)]
deliverable_path: Option<String>,
#[serde(default)]
summary: Option<String>,
},
@@ -431,28 +437,29 @@ impl InftyOrchestrator {
options: &RunExecutionOptions,
) -> Result<RunOutcome> {
let mut solver_events = self.stream_events(sessions.solver.conversation_id)?;
let mut waiting_for_signal = false;
let mut pending_solver_turn_completion = false;
if let Some(objective) = &options.objective {
self.post_to_role(
&sessions.run_id,
&sessions.solver.role,
objective.as_str(),
None,
Some(solver_signal_schema()),
)
.await?;
sessions.store.touch()?;
waiting_for_signal = true;
if let Some(progress) = self.progress.as_ref() {
progress.objective_posted(objective);
progress.waiting_for_solver();
}
}
let mut task_complete_detected = false;
loop {
'event_loop: loop {
tokio::select! {
maybe_event = solver_events.next() => {
let Some(event) = maybe_event else {
break;
break 'event_loop;
};
if let Some(progress) = self.progress.as_ref() {
progress.solver_event(&event.event.msg);
@@ -463,18 +470,53 @@ impl InftyOrchestrator {
progress.solver_agent_message(agent_msg);
}
if let Some(signal) = parse_solver_signal(&agent_msg.message) {
waiting_for_signal = false;
match signal {
SolverSignal::DirectionRequest { prompt } => {
let prompt = prompt
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.ok_or_else(|| {
anyhow!(
"solver direction_request missing prompt text"
)
})?;
if let Some(progress) = self.progress.as_ref() {
progress.direction_request(&prompt);
}
self.handle_direction_request(sessions, &prompt, options)
.await?;
self.handle_direction_request(
sessions,
&prompt,
options,
)
.await?;
sessions.store.touch()?;
pending_solver_turn_completion = true;
}
SolverSignal::VerificationRequest { claim_path, notes } => {
let claim_path = claim_path
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.ok_or_else(|| {
anyhow!(
"solver verification_request missing claim_path"
)
})?;
if let Some(progress) = self.progress.as_ref() {
progress.verification_request(&claim_path, notes.as_deref());
progress
.verification_request(&claim_path, notes.as_deref());
}
let pass = self
.handle_verification_request(
@@ -485,6 +527,7 @@ impl InftyOrchestrator {
)
.await?;
sessions.store.touch()?;
pending_solver_turn_completion = true;
if pass {
self.post_to_role(
&sessions.run_id,
@@ -494,6 +537,7 @@ impl InftyOrchestrator {
)
.await?;
sessions.store.touch()?;
pending_solver_turn_completion = true;
}
}
SolverSignal::FinalDelivery {
@@ -501,17 +545,91 @@ impl InftyOrchestrator {
summary,
} => {
sessions.store.touch()?;
let deliverable_path = resolve_deliverable_path(
sessions.store.path(),
&candidate_path,
)
.with_context(|| {
format!(
"invalid deliverable path reported by solver: {candidate_path}"
let summary_clone = summary.clone();
let mut deliverable_path = match candidate_path
.and_then(|p| {
let trimmed = p.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
{
Some(candidate) => resolve_deliverable_path(
sessions.store.path(),
&candidate,
)
})?;
.with_context(|| {
format!(
"invalid deliverable path reported by solver: {candidate}"
)
})?,
None => {
let fallback_dir =
sessions.store.path().join("deliverable");
fs::create_dir_all(&fallback_dir).with_context(|| {
format!(
"failed to create fallback deliverable directory at {}",
fallback_dir.display()
)
})?;
fallback_dir.join("summary.txt")
}
};
let content = summary_clone
.clone()
.or_else(|| Some(agent_msg.message.clone()))
.unwrap_or_default();
if deliverable_path.exists() {
if deliverable_path.is_dir() {
let fallback_file =
deliverable_path.join("summary.txt");
fs::create_dir_all(
fallback_file.parent().unwrap_or(&deliverable_path),
)
.with_context(|| {
format!(
"failed to prepare fallback deliverable directory at {}",
deliverable_path.display()
)
})?;
fs::write(&fallback_file, &content).with_context(
|| {
format!(
"failed to write fallback deliverable at {}",
fallback_file.display()
)
},
)?;
deliverable_path = fallback_file;
}
} else {
if let Some(parent) = deliverable_path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create deliverable directory at {}",
parent.display()
)
})?;
}
fs::write(&deliverable_path, &content).with_context(
|| {
format!(
"failed to write deliverable file at {}",
deliverable_path.display()
)
},
)?;
}
if let Some(progress) = self.progress.as_ref() {
progress.final_delivery(&deliverable_path, summary.as_deref());
progress.final_delivery(
&deliverable_path,
summary.as_deref(),
);
}
return Ok(RunOutcome {
run_id: sessions.run_id.clone(),
@@ -524,10 +642,21 @@ impl InftyOrchestrator {
}
}
EventMsg::TaskComplete { .. } => {
task_complete_detected = true;
if pending_solver_turn_completion {
pending_solver_turn_completion = false;
waiting_for_signal = true;
continue;
}
if waiting_for_signal {
return Err(anyhow!(
"run {} ended without emitting a structured solver signal (direction_request / verification_request / final_delivery)",
sessions.run_id
));
}
break 'event_loop;
}
EventMsg::ShutdownComplete => {
break;
break 'event_loop;
}
_ => {}
}
@@ -544,17 +673,10 @@ impl InftyOrchestrator {
}
}
if task_complete_detected {
Err(anyhow!(
"run {} ended without emitting a structured solver signal (direction_request / verification_request / final_delivery)",
sessions.run_id
))
} else {
Err(anyhow!(
"run {} ended before emitting final_delivery message",
sessions.run_id
))
}
Err(anyhow!(
"run {} ended before emitting final_delivery message",
sessions.run_id
))
}
async fn handle_direction_request(
@@ -573,7 +695,7 @@ impl InftyOrchestrator {
&sessions.run_id,
&sessions.director.role,
request_text,
Some(director_schema()),
None,
)
.await?;
let directive = self
@@ -590,7 +712,7 @@ impl InftyOrchestrator {
&sessions.run_id,
&sessions.solver.role,
directive_text,
None,
Some(solver_signal_schema()),
)
.await?;
Ok(())
@@ -610,7 +732,12 @@ impl InftyOrchestrator {
}
let summary_text = serde_json::to_string_pretty(&summary)?;
let _ = self
.post_to_role(&sessions.run_id, &sessions.solver.role, summary_text, None)
.post_to_role(
&sessions.run_id,
&sessions.solver.role,
summary_text,
Some(solver_signal_schema()),
)
.await?;
return Ok(true);
}
@@ -622,14 +749,13 @@ impl InftyOrchestrator {
};
let request_text = serde_json::to_string_pretty(&request)?;
let mut collected = Vec::with_capacity(sessions.verifiers.len());
let schema = verifier_schema();
for verifier in &sessions.verifiers {
let handle = self
.post_to_role(
&sessions.run_id,
&verifier.role,
request_text.as_str(),
Some(schema.clone()),
None,
)
.await?;
let response = self
@@ -651,7 +777,12 @@ impl InftyOrchestrator {
}
let summary_text = serde_json::to_string_pretty(&summary)?;
let _ = self
.post_to_role(&sessions.run_id, &sessions.solver.role, summary_text, None)
.post_to_role(
&sessions.run_id,
&sessions.solver.role,
summary_text,
Some(solver_signal_schema()),
)
.await?;
Ok(summary.overall.is_pass())
}
@@ -971,27 +1102,28 @@ fn aggregate_verdicts(items: Vec<(String, VerifierVerdict)>) -> AggregatedVerifi
}
}
fn director_schema() -> Value {
fn solver_signal_schema() -> Value {
json!({
"type": "object",
"required": ["directive"],
"properties": {
"directive": { "type": "string" },
"rationale": { "type": "string" }
},
"additionalProperties": false
})
}
fn verifier_schema() -> Value {
json!({
"type": "object",
"required": ["verdict"],
"properties": {
"verdict": { "type": "string", "enum": ["pass", "fail"] },
"reasons": { "type": "array", "items": { "type": "string" } },
"suggestions": { "type": "array", "items": { "type": "string" } }
"type": {
"type": "string",
"enum": ["direction_request", "verification_request", "final_delivery"]
},
"prompt": { "type": ["string", "null"] },
"claim_path": { "type": ["string", "null"] },
"notes": { "type": ["string", "null"] },
"deliverable_path": { "type": ["string", "null"] },
"summary": { "type": ["string", "null"] }
},
"required": [
"type",
"prompt",
"claim_path",
"notes",
"deliverable_path",
"summary"
],
"additionalProperties": false
})
}
@@ -1003,7 +1135,7 @@ fn final_delivery_schema() -> Value {
"properties": {
"type": { "const": "final_delivery" },
"deliverable_path": { "type": "string" },
"summary": { "type": "string" }
"summary": { "type": ["string", "null"] }
},
"additionalProperties": false
})

View File

@@ -15,23 +15,23 @@ Available Codex tools mirror standard Codex sessions (e.g. `shell`, `apply_patch
## Communication contract
The orchestrator routes your structured messages to the Director or Verifier roles. Respond with **JSON only**—no leading prose or trailing commentary. Wrap JSON in a fenced block only if the agent policy forces it.
- Every reply must populate the full schema, even when a field does not apply. Set unused string fields to `null`.
- Direction request (send to Director):
```json
{"type":"direction_request","prompt":"<concise question or decision>"}
{"type":"direction_request","prompt":"<concise question or decision>","claim_path":null,"notes":null,"deliverable_path":null,"summary":null}
```
- Verification request (send to Verifier):
```json
{"type":"verification_request","claim_path":"memory/claims/<file>.json","notes":"<optional context>"}
{"type":"verification_request","prompt":null,"claim_path":"memory/claims/<file>.json","notes":null,"deliverable_path":null,"summary":null}
```
- Final delivery (after receiving the finalization instruction):
```json
{"type":"final_delivery","deliverable_path":"deliverable","summary":"<one paragraph>"}
{"type":"final_delivery","prompt":null,"claim_path":null,"notes":null,"deliverable_path":"deliverable/summary.txt","summary":"<answer plus supporting context>"}
```
If you have nothing to add for `notes`, omit the field.
## Operating rhythm
- Never ask humans for approval to continue; the orchestrator supplies direction via the Director role.
- Create `deliverable/summary.txt` before every final delivery. Capture the final answer, how you reached it, and any follow-up instructions.
- Keep the run resilient to restarts: document intent, intermediate results, and follow-up tasks in `memory/`.
- Prefer concrete evidence (tests, diffs, logs). Link every claim to artifacts or durable notes so the Verifier can reproduce your reasoning.
- On failure feedback from a Verifier, update artifacts/notes/tests, then issue a new verification request referencing the superseding claim.

View File

@@ -130,7 +130,9 @@ async fn orchestrator_resumes_existing_run() -> anyhow::Result<()> {
responses::ev_completed("solver-resp-2"),
]),
];
responses::mount_sse_sequence(&server, bodies).await;
for body in bodies {
responses::mount_sse_once(&server, body).await;
}
let runs_root = TempDir::new()?;
let orchestrator =
@@ -150,9 +152,18 @@ async fn orchestrator_resumes_existing_run() -> anyhow::Result<()> {
})
.await?;
sessions.solver.conversation.submit(Op::Shutdown).await.ok();
sessions
.director
.conversation
.submit(Op::Shutdown)
.await
.ok();
drop(sessions);
let resume = orchestrator
.resume_run(ResumeParams {
run_path: sessions.store.path().to_path_buf(),
run_path: runs_root.path().join("runs").join(&run_id),
solver: RoleConfig::new("solver", solver_config),
director: RoleConfig::new("director", director_config),
verifiers: Vec::new(),
@@ -181,7 +192,7 @@ async fn execute_new_run_drives_to_completion() -> anyhow::Result<()> {
responses::ev_response_created("solver-resp-1"),
responses::ev_assistant_message(
"solver-msg-1",
r#"{"type":"direction_request","prompt":"Need directive"}"#,
r#"{"type":"direction_request","prompt":"Need directive","claim_path":null,"notes":null,"deliverable_path":null,"summary":null}"#,
),
responses::ev_completed("solver-resp-1"),
]),
@@ -197,7 +208,7 @@ async fn execute_new_run_drives_to_completion() -> anyhow::Result<()> {
responses::ev_response_created("solver-resp-2"),
responses::ev_assistant_message(
"solver-msg-2",
r#"{"type":"verification_request","claim_path":"memory/claims/attempt1.json"}"#,
r#"{"type":"verification_request","prompt":null,"claim_path":"memory/claims/attempt1.json","notes":null,"deliverable_path":null,"summary":null}"#,
),
responses::ev_completed("solver-resp-2"),
]),
@@ -213,7 +224,7 @@ async fn execute_new_run_drives_to_completion() -> anyhow::Result<()> {
responses::ev_response_created("solver-resp-3"),
responses::ev_assistant_message(
"solver-msg-3",
r#"{"type":"verification_request","claim_path":"memory/claims/attempt2.json"}"#,
r#"{"type":"verification_request","prompt":null,"claim_path":"memory/claims/attempt2.json","notes":null,"deliverable_path":null,"summary":null}"#,
),
responses::ev_completed("solver-resp-3"),
]),
@@ -229,12 +240,14 @@ async fn execute_new_run_drives_to_completion() -> anyhow::Result<()> {
responses::ev_response_created("solver-resp-4"),
responses::ev_assistant_message(
"solver-msg-4",
r#"{"type":"final_delivery","deliverable_path":"deliverable","summary":"done"}"#,
r#"{"type":"final_delivery","prompt":null,"claim_path":null,"notes":null,"deliverable_path":"deliverable","summary":"done"}"#,
),
responses::ev_completed("solver-resp-4"),
]),
];
responses::mount_sse_sequence(&server, bodies).await;
for body in bodies {
responses::mount_sse_once(&server, body).await;
}
let runs_root = TempDir::new()?;
let orchestrator =
@@ -267,8 +280,9 @@ async fn execute_new_run_drives_to_completion() -> anyhow::Result<()> {
assert_eq!(outcome.run_id, run_id);
assert_eq!(outcome.summary.as_deref(), Some("done"));
assert!(outcome.raw_message.contains("final_delivery"));
assert!(outcome.deliverable_path.starts_with(&run_root));
assert_eq!(outcome.deliverable_path, run_root.join("deliverable"));
let canonical_run_root = std::fs::canonicalize(&run_root)?;
let canonical_deliverable = std::fs::canonicalize(&outcome.deliverable_path)?;
assert!(canonical_deliverable.starts_with(&canonical_run_root));
Ok(())
}
@@ -292,7 +306,9 @@ async fn spawn_run_cleans_up_on_failure() -> anyhow::Result<()> {
responses::ev_completed("dup-resp"),
]),
];
responses::mount_sse_sequence(&server, bodies).await;
for body in bodies {
responses::mount_sse_once(&server, body).await;
}
let runs_root = TempDir::new()?;
let orchestrator =
@@ -325,7 +341,9 @@ async fn spawn_run_cleans_up_on_failure() -> anyhow::Result<()> {
responses::ev_completed("director-resp-2"),
]),
];
responses::mount_sse_sequence(&server, bodies).await;
for body in bodies {
responses::mount_sse_once(&server, body).await;
}
let sessions = orchestrator
.spawn_run(RunParams {

View File

@@ -987,11 +987,16 @@ impl Session {
}
async fn notify_stream_error(&self, sub_id: &str, message: impl Into<String>) {
let message = message.into();
warn!(
conversation_id = %self.conversation_id,
sub_id = %sub_id,
%message,
"stream error while streaming model response",
);
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::StreamError(StreamErrorEvent {
message: message.into(),
}),
msg: EventMsg::StreamError(StreamErrorEvent { message }),
};
self.send_event(event).await;
}