This commit is contained in:
jif-oai
2025-10-14 14:03:15 +01:00
parent 9320565658
commit f073bc5ccf
4 changed files with 303 additions and 6 deletions

View File

@@ -28,6 +28,7 @@ use serde::de::Error as _;
use serde_json::Value;
use serde_json::json;
use tokio::signal;
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tracing::warn;
@@ -491,7 +492,7 @@ impl InftyOrchestrator {
)
.await?;
let directive = self
.await_first_assistant(&handle, options.director_timeout)
.await_first_assistant_idle(&handle, options.director_timeout, Some("director"))
.await?;
let directive_payload: DirectiveResponse = parse_json_struct(&directive.message.message)
.context("director response was not valid directive JSON")?;
@@ -576,7 +577,7 @@ impl InftyOrchestrator {
)
.await?;
let _ = self
.await_first_assistant(&handle, Duration::from_secs(5))
.await_first_assistant_idle(&handle, Duration::from_secs(5), None)
.await?;
Ok(())
}
@@ -611,7 +612,7 @@ impl InftyOrchestrator {
)
.await?;
let response = self
.await_first_assistant(&handle, options.verifier_timeout)
.await_first_assistant_idle(&handle, options.verifier_timeout, Some(&verifier.role))
.await?;
let verdict: VerifierVerdict = parse_json_struct(&response.message.message)
.with_context(|| {
@@ -712,6 +713,59 @@ impl InftyOrchestrator {
Ok(message)
}
/// Await the first assistant message for this turn, but only time out after a
/// period of inactivity. Any event activity for this submission id resets the timer.
pub async fn await_first_assistant_idle(
&self,
handle: &codex_core::cross_session::TurnHandle,
idle_timeout: Duration,
role_label: Option<&str>,
) -> Result<AssistantMessage> {
// Subscribe to the session event stream to observe activity for this turn.
let mut events = self
.hub
.stream_events(handle.conversation_id())
.map_err(|e| anyhow!(e))?;
// We still rely on the hub's oneshot to capture the first assistant message.
let wait_first = self.hub.await_first_assistant(handle, idle_timeout);
tokio::pin!(wait_first);
// Idle timer that resets on any event for this submission.
let idle = tokio::time::sleep(idle_timeout);
tokio::pin!(idle);
let sub_id = handle.submission_id().to_string();
loop {
tokio::select! {
res = &mut wait_first => {
return res.map_err(|e| anyhow!(e));
}
maybe_event = events.next() => {
let Some(ev) = maybe_event else {
// Event stream ended; if the assistant message has not arrived yet,
// treat as session closed.
bail!(codex_core::cross_session::CrossSessionError::SessionClosed);
};
// Reset idle timer only for events emitted for our submission id.
if ev.event.id == sub_id {
if let Some(progress) = self.progress.as_ref()
&& let Some(role) = role_label
{
progress.role_event(role, &ev.event.msg);
}
idle.as_mut().reset(Instant::now() + idle_timeout);
}
}
_ = &mut idle => {
// No activity for the idle window — return a timeout error.
bail!(codex_core::cross_session::CrossSessionError::AwaitTimeout(idle_timeout));
}
}
}
}
pub async fn call_role(
&self,
run_id: &str,
@@ -723,7 +777,8 @@ impl InftyOrchestrator {
let handle = self
.post_to_role(run_id, role, text, final_output_json_schema)
.await?;
self.await_first_assistant(&handle, timeout).await
self.await_first_assistant_idle(&handle, timeout, Some(role))
.await
}
pub async fn relay_assistant_to_role(
@@ -742,7 +797,8 @@ impl InftyOrchestrator {
final_output_json_schema,
)
.await?;
self.await_first_assistant(&handle, timeout).await
self.await_first_assistant_idle(&handle, timeout, Some(target_role))
.await
}
pub fn stream_events(