This commit is contained in:
jif-oai
2025-10-14 17:03:43 +01:00
parent 87362d6ebd
commit c0f8a49e3e
4 changed files with 578 additions and 222 deletions

View File

@@ -12,11 +12,8 @@ use anyhow::bail;
use codex_core::CodexAuth;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::CrossSessionSpawnParams;
use codex_core::cross_session::AssistantMessage;
use codex_core::cross_session::CrossSessionHub;
use codex_core::cross_session::PostUserTurnRequest;
use codex_core::cross_session::RoleOrId;
use codex_core::cross_session::SessionEventStream;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
@@ -28,7 +25,6 @@ use serde::de::Error as _;
use serde_json::Value;
use serde_json::json;
use tokio::signal;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::warn;
@@ -36,6 +32,7 @@ use crate::progress::ProgressReporter;
use crate::prompts;
use crate::run_store::RoleMetadata;
use crate::run_store::RunStore;
use crate::session;
use crate::signals::AggregatedVerifierVerdict;
use crate::signals::DirectiveResponse;
use crate::signals::VerifierDecision;
@@ -300,7 +297,8 @@ impl InftyOrchestrator {
let mut waiting_for_signal = false;
let mut pending_solver_turn_completion = false;
if let Some(objective) = &options.objective {
self.post_to_role(
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
objective.as_str(),
@@ -486,31 +484,39 @@ impl InftyOrchestrator {
objective: options.objective.as_deref(),
};
let request_text = serde_json::to_string_pretty(&request)?;
let handle = self
.post_to_role(
&sessions.run_id,
&sessions.director.role,
request_text,
Some(directive_response_schema()),
)
.await?;
let directive = self
.await_first_assistant_idle(&handle, options.director_timeout, Some("director"))
.await?;
let handle = session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.director.role,
request_text,
Some(directive_response_schema()),
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, "director"));
let directive = session::await_first_idle(
self.hub.as_ref(),
&handle,
options.director_timeout,
progress,
)
.await?;
let directive_payload: DirectiveResponse = parse_json_struct(&directive.message.message)
.context("director response was not valid directive JSON")?;
if let Some(progress) = self.progress.as_ref() {
progress.director_response(&directive_payload);
}
let directive_text = serde_json::to_string_pretty(&directive_payload)?;
let _ = self
.post_to_role(
&sessions.run_id,
&sessions.solver.role,
directive_text,
Some(solver_signal_schema()),
)
.await?;
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
directive_text,
Some(solver_signal_schema()),
)
.await?;
Ok(())
}
@@ -571,16 +577,15 @@ impl InftyOrchestrator {
}
async fn request_solver_signal(&self, run_id: &str, solver_role: &str) -> Result<()> {
let handle = self
.post_to_role(
run_id,
solver_role,
FINALIZATION_PROMPT,
Some(final_delivery_schema()),
)
.await?;
let _ = self
.await_first_assistant_idle(&handle, Duration::from_secs(5), None)
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
solver_role,
FINALIZATION_PROMPT,
Some(final_delivery_schema()),
)
.await?;
let _ = session::await_first_idle(self.hub.as_ref(), &handle, Duration::from_secs(5), None)
.await?;
Ok(())
}
@@ -606,17 +611,25 @@ impl InftyOrchestrator {
let request_text = serde_json::to_string_pretty(&request)?;
let mut collected = Vec::with_capacity(sessions.verifiers.len());
for verifier in &sessions.verifiers {
let handle = self
.post_to_role(
&sessions.run_id,
&verifier.role,
request_text.as_str(),
Some(verifier_verdict_schema()),
)
.await?;
let response = self
.await_first_assistant_idle(&handle, options.verifier_timeout, Some(&verifier.role))
.await?;
let handle = session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&verifier.role,
request_text.clone(),
Some(verifier_verdict_schema()),
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, verifier.role.as_str()));
let response = session::await_first_idle(
self.hub.as_ref(),
&handle,
options.verifier_timeout,
progress,
)
.await?;
let verdict: VerifierVerdict = parse_json_struct(&response.message.message)
.with_context(|| {
format!("verifier {} returned invalid verdict JSON", verifier.role)
@@ -642,14 +655,14 @@ impl InftyOrchestrator {
summary: &AggregatedVerifierVerdict,
) -> Result<()> {
let summary_text = serde_json::to_string_pretty(summary)?;
let _ = self
.post_to_role(
&sessions.run_id,
&sessions.solver.role,
summary_text,
Some(solver_signal_schema()),
)
.await?;
session::post_turn(
self.hub.as_ref(),
&sessions.run_id,
&sessions.solver.role,
summary_text,
Some(solver_signal_schema()),
)
.await?;
Ok(())
}
@@ -686,90 +699,11 @@ impl InftyOrchestrator {
}
}
pub async fn post_to_role(
pub fn stream_events(
&self,
run_id: &str,
role: &str,
text: impl Into<String>,
final_output_json_schema: Option<Value>,
) -> Result<codex_core::cross_session::TurnHandle> {
let handle = self
.hub
.post_user_turn(PostUserTurnRequest {
target: RoleOrId::RunRole {
run_id: run_id.to_string(),
role: role.to_string(),
},
text: text.into(),
final_output_json_schema,
})
.await?;
Ok(handle)
}
pub async fn await_first_assistant(
&self,
handle: &codex_core::cross_session::TurnHandle,
timeout: Duration,
) -> Result<AssistantMessage> {
let message = self.hub.await_first_assistant(handle, timeout).await?;
Ok(message)
}
/// Await the first assistant message for this turn, but only time out after a
/// period of inactivity. Any event activity for this submission id resets the timer.
pub async fn await_first_assistant_idle(
&self,
handle: &codex_core::cross_session::TurnHandle,
idle_timeout: Duration,
role_label: Option<&str>,
) -> Result<AssistantMessage> {
// Subscribe to the session event stream to observe activity for this turn.
let mut events = self
.hub
.stream_events(handle.conversation_id())
.map_err(|e| anyhow!(e))?;
// We still rely on the hub's oneshot to capture the first assistant message.
let wait_first = self.hub.await_first_assistant(handle, idle_timeout);
tokio::pin!(wait_first);
// Idle timer that resets on any event for this submission.
let idle = tokio::time::sleep(idle_timeout);
tokio::pin!(idle);
let sub_id = handle.submission_id().to_string();
loop {
tokio::select! {
res = &mut wait_first => {
return res.map_err(|e| anyhow!(e));
}
maybe_event = events.next() => {
let Some(ev) = maybe_event else {
// Event stream ended; if the assistant message has not arrived yet,
// treat as session closed.
bail!(codex_core::cross_session::CrossSessionError::SessionClosed);
};
// Reset idle timer only for events emitted for our submission id.
if ev.event.id == sub_id {
if let Some(progress) = self.progress.as_ref() && let Some(role) = role_label {
progress.role_event(role, &ev.event.msg);
}
// If the session emits an error for this submission, surface it immediately
// rather than waiting for the idle timeout.
if let EventMsg::Error(err) = &ev.event.msg {
bail!(anyhow!(err.message.clone()));
}
idle.as_mut().reset(Instant::now() + idle_timeout);
}
}
_ = &mut idle => {
// No activity for the idle window — return a timeout error.
bail!(codex_core::cross_session::CrossSessionError::AwaitTimeout(idle_timeout));
}
}
}
conversation_id: ConversationId,
) -> Result<SessionEventStream, codex_core::cross_session::CrossSessionError> {
self.hub.stream_events(conversation_id)
}
pub async fn call_role(
@@ -780,11 +714,16 @@ impl InftyOrchestrator {
timeout: Duration,
final_output_json_schema: Option<Value>,
) -> Result<AssistantMessage> {
let handle = self
.post_to_role(run_id, role, text, final_output_json_schema)
.await?;
self.await_first_assistant_idle(&handle, timeout, Some(role))
.await
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
role,
text,
final_output_json_schema,
)
.await?;
let progress = self.progress.as_deref().map(|reporter| (reporter, role));
session::await_first_idle(self.hub.as_ref(), &handle, timeout, progress).await
}
pub async fn relay_assistant_to_role(
@@ -795,23 +734,19 @@ impl InftyOrchestrator {
timeout: Duration,
final_output_json_schema: Option<Value>,
) -> Result<AssistantMessage> {
let handle = self
.post_to_role(
run_id,
target_role,
assistant.message.message.clone(),
final_output_json_schema,
)
.await?;
self.await_first_assistant_idle(&handle, timeout, Some(target_role))
.await
}
pub fn stream_events(
&self,
conversation_id: ConversationId,
) -> Result<SessionEventStream, codex_core::cross_session::CrossSessionError> {
self.hub.stream_events(conversation_id)
let handle = session::post_turn(
self.hub.as_ref(),
run_id,
target_role,
assistant.message.message.clone(),
final_output_json_schema,
)
.await?;
let progress = self
.progress
.as_deref()
.map(|reporter| (reporter, target_role));
session::await_first_idle(self.hub.as_ref(), &handle, timeout, progress).await
}
async fn spawn_and_register_role(
@@ -822,9 +757,15 @@ impl InftyOrchestrator {
store: &mut RunStore,
cleanup: &mut Vec<SessionCleanup>,
) -> Result<RoleSession> {
let session = self
.spawn_role_session(run_id, run_path, role_config.clone())
.await?;
let session = session::spawn_role(
Arc::clone(&self.hub),
&self.conversation_manager,
run_id,
run_path,
role_config.clone(),
prompts::ensure_instructions,
)
.await?;
cleanup.push(SessionCleanup::new(&session));
store.update_rollout_path(&session.role, session.rollout_path.clone())?;
if let Some(path) = role_config.config_path.clone() {
@@ -840,49 +781,6 @@ impl InftyOrchestrator {
role_config: &RoleConfig,
store: &mut RunStore,
cleanup: &mut Vec<SessionCleanup>,
) -> Result<RoleSession> {
let session = self
.resume_role_session(run_id, run_path, role_config, store)
.await?;
cleanup.push(SessionCleanup::new(&session));
store.update_rollout_path(&session.role, session.rollout_path.clone())?;
if let Some(path) = role_config.config_path.clone() {
store.set_role_config_path(&session.role, path)?;
}
Ok(session)
}
async fn spawn_role_session(
&self,
run_id: &str,
run_path: &Path,
role_config: RoleConfig,
) -> Result<RoleSession> {
let RoleConfig {
role, mut config, ..
} = role_config;
config.cwd = run_path.to_path_buf();
prompts::ensure_instructions(&role, &mut config);
let session = self
.conversation_manager
.new_conversation_with_cross_session(
config,
CrossSessionSpawnParams {
hub: Arc::clone(&self.hub),
run_id: Some(run_id.to_string()),
role: Some(role.clone()),
},
)
.await?;
Ok(RoleSession::from_new(role, session))
}
async fn resume_role_session(
&self,
run_id: &str,
run_path: &Path,
role_config: &RoleConfig,
store: &RunStore,
) -> Result<RoleSession> {
let metadata = store
.role_metadata(&role_config.role)
@@ -892,24 +790,22 @@ impl InftyOrchestrator {
.as_ref()
.ok_or_else(|| anyhow!("missing rollout path for role {}", role_config.role))?;
let mut config = role_config.config.clone();
config.cwd = run_path.to_path_buf();
prompts::ensure_instructions(&role_config.role, &mut config);
let session = self
.conversation_manager
.resume_conversation_with_cross_session(
config,
rollout_path.clone(),
CrossSessionSpawnParams {
hub: Arc::clone(&self.hub),
run_id: Some(run_id.to_string()),
role: Some(role_config.role.clone()),
},
)
.await?;
Ok(RoleSession::from_new(role_config.role.clone(), session))
let session = session::resume_role(
Arc::clone(&self.hub),
&self.conversation_manager,
run_id,
run_path,
role_config,
rollout_path,
prompts::ensure_instructions,
)
.await?;
cleanup.push(SessionCleanup::new(&session));
store.update_rollout_path(&session.role, session.rollout_path.clone())?;
if let Some(path) = role_config.config_path.clone() {
store.set_role_config_path(&session.role, path)?;
}
Ok(session)
}
}