mirror of
https://github.com/openai/codex.git
synced 2026-05-09 13:52:41 +00:00
Compare commits
5 Commits
xli-codex/
...
jif/fix-on
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
51062332c2 | ||
|
|
9e6edbb1bc | ||
|
|
0257728856 | ||
|
|
90cabecba9 | ||
|
|
1cd1fbd6b5 |
@@ -44,6 +44,7 @@ use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::ThreadRolledBackEvent;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::protocol::ViewImageToolCallEvent;
|
||||
@@ -920,9 +921,19 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
|
||||
let mark_completed = |status: &mut TurnStatus| {
|
||||
if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) {
|
||||
*status = TurnStatus::Completed;
|
||||
let mark_finished = |turn: &mut PendingTurn| match &payload.outcome {
|
||||
TurnOutcome::Succeeded { .. } => {
|
||||
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
|
||||
turn.status = TurnStatus::Completed;
|
||||
}
|
||||
}
|
||||
TurnOutcome::Failed { error } => {
|
||||
turn.status = TurnStatus::Failed;
|
||||
turn.error = Some(V2TurnError {
|
||||
message: error.message.clone(),
|
||||
codex_error_info: error.codex_error_info.clone().map(Into::into),
|
||||
additional_details: None,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -932,7 +943,7 @@ impl ThreadHistoryBuilder {
|
||||
.as_mut()
|
||||
.filter(|turn| turn.id == payload.turn_id)
|
||||
{
|
||||
mark_completed(&mut current_turn.status);
|
||||
mark_finished(current_turn);
|
||||
self.finish_current_turn();
|
||||
return;
|
||||
}
|
||||
@@ -942,13 +953,27 @@ impl ThreadHistoryBuilder {
|
||||
.iter_mut()
|
||||
.find(|turn| turn.id == payload.turn_id)
|
||||
{
|
||||
mark_completed(&mut turn.status);
|
||||
match &payload.outcome {
|
||||
TurnOutcome::Succeeded { .. } => {
|
||||
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
|
||||
turn.status = TurnStatus::Completed;
|
||||
}
|
||||
}
|
||||
TurnOutcome::Failed { error } => {
|
||||
turn.status = TurnStatus::Failed;
|
||||
turn.error = Some(V2TurnError {
|
||||
message: error.message.clone(),
|
||||
codex_error_info: error.codex_error_info.clone().map(Into::into),
|
||||
additional_details: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// If the completion event cannot be matched, apply it to the active turn.
|
||||
if let Some(current_turn) = self.current_turn.as_mut() {
|
||||
mark_completed(&mut current_turn.status);
|
||||
mark_finished(current_turn);
|
||||
self.finish_current_turn();
|
||||
}
|
||||
}
|
||||
@@ -1361,7 +1386,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -1432,7 +1459,9 @@ mod tests {
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-image".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
];
|
||||
|
||||
@@ -1747,7 +1776,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -2046,7 +2077,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
@@ -2080,7 +2113,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -2129,7 +2164,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
@@ -2163,7 +2200,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -2332,7 +2371,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
@@ -2347,7 +2388,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "still in b".into(),
|
||||
@@ -2356,7 +2399,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -2387,7 +2432,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-b".into(),
|
||||
@@ -2437,7 +2484,9 @@ mod tests {
|
||||
}),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-compact".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
];
|
||||
|
||||
@@ -2676,7 +2725,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message: "request-level failure".into(),
|
||||
@@ -2729,7 +2780,9 @@ mod tests {
|
||||
}),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
];
|
||||
|
||||
@@ -2777,7 +2830,9 @@ mod tests {
|
||||
RolloutItem::ResponseItem(hook_prompt),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
];
|
||||
|
||||
@@ -2822,7 +2877,9 @@ mod tests {
|
||||
}),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-a".into(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
];
|
||||
|
||||
|
||||
@@ -128,6 +128,7 @@ use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::TokenCountEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnDiffEvent;
|
||||
use codex_protocol::request_permissions::PermissionGrantScope as CorePermissionGrantScope;
|
||||
use codex_protocol::request_permissions::RequestPermissionProfile as CoreRequestPermissionProfile;
|
||||
@@ -297,11 +298,23 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
EventMsg::TurnComplete(_ev) => {
|
||||
// All per-thread requests are bound to a turn, so abort them.
|
||||
outgoing.abort_pending_server_requests().await;
|
||||
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
|
||||
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some()
|
||||
|| matches!(
|
||||
&_ev.outcome,
|
||||
codex_protocol::protocol::TurnOutcome::Failed { error }
|
||||
if error.affects_turn_status()
|
||||
);
|
||||
thread_watch_manager
|
||||
.note_turn_completed(&conversation_id.to_string(), turn_failed)
|
||||
.await;
|
||||
handle_turn_complete(conversation_id, event_turn_id, &outgoing, &thread_state).await;
|
||||
handle_turn_complete(
|
||||
conversation_id,
|
||||
event_turn_id,
|
||||
&_ev,
|
||||
&outgoing,
|
||||
&thread_state,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
EventMsg::SkillsUpdateAvailable => {
|
||||
if let ApiVersion::V2 = api_version {
|
||||
@@ -2100,6 +2113,7 @@ async fn find_and_remove_turn_summary(
|
||||
async fn handle_turn_complete(
|
||||
conversation_id: ThreadId,
|
||||
event_turn_id: String,
|
||||
event: &TurnCompleteEvent,
|
||||
outgoing: &ThreadScopedOutgoingMessageSender,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
) {
|
||||
@@ -2107,7 +2121,21 @@ async fn handle_turn_complete(
|
||||
|
||||
let (status, error) = match turn_summary.last_error {
|
||||
Some(error) => (TurnStatus::Failed, Some(error)),
|
||||
None => (TurnStatus::Completed, None),
|
||||
None => match &event.outcome {
|
||||
codex_protocol::protocol::TurnOutcome::Failed { error }
|
||||
if error.affects_turn_status() =>
|
||||
{
|
||||
(
|
||||
TurnStatus::Failed,
|
||||
Some(TurnError {
|
||||
message: error.message.clone(),
|
||||
codex_error_info: error.codex_error_info.clone().map(Into::into),
|
||||
additional_details: None,
|
||||
}),
|
||||
)
|
||||
}
|
||||
_ => (TurnStatus::Completed, None),
|
||||
},
|
||||
};
|
||||
|
||||
emit_turn_completed_with_status(conversation_id, event_turn_id, status, error, outgoing).await;
|
||||
|
||||
@@ -527,6 +527,37 @@ impl AgentControl {
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) async fn forward_child_final_status_to_parent(
|
||||
&self,
|
||||
session_source: &SessionSource,
|
||||
status: &AgentStatus,
|
||||
) {
|
||||
let Some(parent_thread_id) = thread_spawn_parent_thread_id(session_source) else {
|
||||
return;
|
||||
};
|
||||
let Some(child_agent_path) = session_source.get_agent_path() else {
|
||||
return;
|
||||
};
|
||||
let Some(parent_agent_path) = child_agent_path
|
||||
.as_str()
|
||||
.rsplit_once('/')
|
||||
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let message = format_subagent_notification_message(child_agent_path.as_str(), status);
|
||||
let communication = InterAgentCommunication::new(
|
||||
child_agent_path,
|
||||
parent_agent_path,
|
||||
Vec::new(),
|
||||
message,
|
||||
/*trigger_turn*/ false,
|
||||
);
|
||||
let _ = self
|
||||
.send_inter_agent_communication(parent_thread_id, communication)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Interrupt the current task for an existing agent thread.
|
||||
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
@@ -782,6 +813,11 @@ impl AgentControl {
|
||||
child_reference: String,
|
||||
child_agent_path: Option<AgentPath>,
|
||||
) {
|
||||
// Agent-path children now forward final status directly from `Session::send_event`,
|
||||
// which keeps reused spawned agents notifying their parent on every completed turn.
|
||||
if child_agent_path.is_some() {
|
||||
return;
|
||||
}
|
||||
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
})) = session_source
|
||||
|
||||
@@ -250,7 +250,9 @@ async fn on_event_updates_status_from_task_started() {
|
||||
async fn on_event_updates_status_from_task_complete() {
|
||||
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("done".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
},
|
||||
}));
|
||||
let expected = AgentStatus::Completed(Some("done".to_string()));
|
||||
assert_eq!(status, Some(expected));
|
||||
@@ -1133,7 +1135,9 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
|
||||
tester_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: tester_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -1180,35 +1184,46 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (_root_thread_id, root_thread) = harness.start_thread().await;
|
||||
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
|
||||
let (root_thread_id, root_thread) = harness.start_thread().await;
|
||||
let mut tester_config = harness.config.clone();
|
||||
let _ = tester_config.features.enable(Feature::MultiAgentV2);
|
||||
let tester_thread_id = harness
|
||||
.manager
|
||||
.start_thread(tester_config)
|
||||
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
|
||||
let tester_path = worker_path.join("tester").expect("tester path");
|
||||
let worker_thread_id = harness
|
||||
.control
|
||||
.spawn_agent(
|
||||
tester_config.clone(),
|
||||
text_input("hello worker"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: root_thread_id,
|
||||
depth: 1,
|
||||
agent_path: Some(worker_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.expect("tester thread should start")
|
||||
.thread_id;
|
||||
.expect("worker spawn should succeed");
|
||||
let tester_thread_id = harness
|
||||
.control
|
||||
.spawn_agent(
|
||||
tester_config,
|
||||
text_input("hello tester"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: worker_thread_id,
|
||||
depth: 2,
|
||||
agent_path: Some(tester_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.expect("tester spawn should succeed");
|
||||
let tester_thread = harness
|
||||
.manager
|
||||
.get_thread(tester_thread_id)
|
||||
.await
|
||||
.expect("tester thread should exist");
|
||||
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
|
||||
let tester_path = worker_path.join("tester").expect("tester path");
|
||||
harness.control.maybe_start_completion_watcher(
|
||||
tester_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: worker_thread_id,
|
||||
depth: 2,
|
||||
agent_path: Some(tester_path.clone()),
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
tester_path.to_string(),
|
||||
Some(tester_path.clone()),
|
||||
);
|
||||
let tester_turn = tester_thread.codex.session.new_default_turn().await;
|
||||
tester_thread
|
||||
.codex
|
||||
@@ -1217,11 +1232,12 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
|
||||
tester_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: tester_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_message = crate::session_prefix::format_subagent_notification_message(
|
||||
tester_path.as_str(),
|
||||
&AgentStatus::Completed(Some("done".to_string())),
|
||||
|
||||
@@ -1,19 +1,25 @@
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
|
||||
/// Derive the next agent status from a single emitted event.
|
||||
/// Returns `None` when the event does not affect status tracking.
|
||||
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
|
||||
match msg {
|
||||
EventMsg::TurnStarted(_) => Some(AgentStatus::Running),
|
||||
EventMsg::TurnComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
|
||||
EventMsg::TurnComplete(ev) => Some(match &ev.outcome {
|
||||
TurnOutcome::Succeeded { last_agent_message } => {
|
||||
AgentStatus::Completed(last_agent_message.clone())
|
||||
}
|
||||
TurnOutcome::Failed { error } => AgentStatus::Errored(error.message.clone()),
|
||||
}),
|
||||
EventMsg::TurnAborted(ev) => match ev.reason {
|
||||
codex_protocol::protocol::TurnAbortReason::Interrupted => {
|
||||
Some(AgentStatus::Interrupted)
|
||||
}
|
||||
_ => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
|
||||
},
|
||||
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
|
||||
EventMsg::Error(_) => None,
|
||||
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
|
||||
_ => None,
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::agent::AgentStatus;
|
||||
use crate::agent::Mailbox;
|
||||
use crate::agent::MailboxReceiver;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::apps::render_apps_section;
|
||||
use crate::auth_env_telemetry::collect_auth_env_telemetry;
|
||||
use crate::commit_attribution::commit_message_trailer_instruction;
|
||||
@@ -314,6 +315,7 @@ use crate::protocol::TokenCountEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::WarningEvent;
|
||||
use crate::resolve_skill_dependencies_for_turn;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
@@ -2641,6 +2643,15 @@ impl Session {
|
||||
.await;
|
||||
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
|
||||
.await;
|
||||
if let Some(status) = agent_status_from_event(&legacy_source)
|
||||
&& is_final(&status)
|
||||
&& turn_context.session_source.get_agent_path().is_some()
|
||||
{
|
||||
self.services
|
||||
.agent_control
|
||||
.forward_child_final_status_to_parent(&turn_context.session_source, &status)
|
||||
.await;
|
||||
}
|
||||
|
||||
let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
|
||||
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
|
||||
@@ -5586,9 +5597,11 @@ pub(crate) async fn run_turn(
|
||||
input: Vec<UserInput>,
|
||||
prewarmed_client_session: Option<ModelClientSession>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
if input.is_empty() && !sess.has_pending_input().await {
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
|
||||
let model_info = turn_context.model_info.clone();
|
||||
@@ -5602,7 +5615,9 @@ pub(crate) async fn run_turn(
|
||||
.is_err()
|
||||
{
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
|
||||
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
|
||||
@@ -5632,7 +5647,11 @@ pub(crate) async fn run_turn(
|
||||
.await
|
||||
{
|
||||
Ok(mcp_tools) => mcp_tools,
|
||||
Err(_) if turn_context.apps_enabled() => return None,
|
||||
Err(_) if turn_context.apps_enabled() => {
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
Err(_) => HashMap::new(),
|
||||
}
|
||||
} else {
|
||||
@@ -5730,7 +5749,9 @@ pub(crate) async fn run_turn(
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if run_pending_session_start_hooks(&sess, &turn_context).await {
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
let additional_contexts = if input.is_empty() {
|
||||
Vec::new()
|
||||
@@ -5750,7 +5771,9 @@ pub(crate) async fn run_turn(
|
||||
user_prompt_submit_outcome.additional_contexts,
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
@@ -5913,7 +5936,9 @@ pub(crate) async fn run_turn(
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -6025,15 +6050,12 @@ pub(crate) async fn run_turn(
|
||||
}
|
||||
}
|
||||
if let Some(message) = abort_message {
|
||||
sess.send_event(
|
||||
&turn_context,
|
||||
EventMsg::Error(ErrorEvent {
|
||||
return TurnOutcome::Failed {
|
||||
error: ErrorEvent {
|
||||
message,
|
||||
codex_error_info: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
},
|
||||
};
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -6051,25 +6073,23 @@ pub(crate) async fn run_turn(
|
||||
if state.history.replace_last_turn_images("Invalid image") {
|
||||
continue;
|
||||
}
|
||||
let event = EventMsg::Error(ErrorEvent {
|
||||
let error = ErrorEvent {
|
||||
message: "Invalid image in your last message. Please remove it and try again."
|
||||
.to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::BadRequest),
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
break;
|
||||
};
|
||||
return TurnOutcome::Failed { error };
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Turn error: {e:#}");
|
||||
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
// let the user continue the conversation
|
||||
break;
|
||||
return TurnOutcome::Failed {
|
||||
error: e.to_error_event(/*message_prefix*/ None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
last_agent_message
|
||||
TurnOutcome::Succeeded { last_agent_message }
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
|
||||
@@ -208,7 +208,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -232,7 +234,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: rolled_back_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
|
||||
@@ -298,7 +302,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -383,7 +389,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -406,7 +414,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: second_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -420,7 +430,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: standalone_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
|
||||
@@ -489,7 +501,9 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -505,7 +519,9 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: assistant_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
|
||||
@@ -569,7 +585,9 @@ async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: only_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
|
||||
@@ -615,7 +633,9 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: user_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
// Standalone task turn (no UserMessage) should not consume rollback skips.
|
||||
@@ -629,7 +649,9 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: standalone_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
|
||||
@@ -679,7 +701,9 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -831,7 +855,9 @@ async fn reconstruct_history_legacy_compaction_without_replacement_history_clear
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: current_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
];
|
||||
@@ -897,7 +923,9 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
];
|
||||
@@ -995,7 +1023,9 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -1096,7 +1126,9 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -1124,7 +1156,9 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: current_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
];
|
||||
@@ -1203,7 +1237,9 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
@@ -1348,7 +1384,9 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(
|
||||
codex_protocol::protocol::TurnCompleteEvent {
|
||||
turn_id: previous_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
},
|
||||
)),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
|
||||
@@ -35,6 +35,7 @@ use tracing::Span;
|
||||
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::CreditsSnapshot;
|
||||
use crate::protocol::ErrorEvent;
|
||||
use crate::protocol::InitialHistory;
|
||||
use crate::protocol::NetworkApprovalProtocol;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
@@ -45,6 +46,7 @@ use crate::protocol::TokenCountEvent;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
use crate::protocol::TurnCompleteEvent;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::TurnStartedEvent;
|
||||
use crate::protocol::UserMessageEvent;
|
||||
use crate::rollout::policy::EventPersistenceMode;
|
||||
@@ -1484,7 +1486,9 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
|
||||
RolloutItem::ResponseItem(turn_one_assistant.clone()),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
@@ -1506,7 +1510,9 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
|
||||
RolloutItem::ResponseItem(turn_two_assistant),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: rolled_back_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
])
|
||||
.await;
|
||||
@@ -1580,7 +1586,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
|
||||
RolloutItem::ResponseItem(assistant_message("turn 1 assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
@@ -1595,7 +1603,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
|
||||
}),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: compact_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
@@ -1619,7 +1629,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
|
||||
RolloutItem::ResponseItem(assistant_message("turn 2 assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: rolled_back_turn_id,
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
])
|
||||
.await;
|
||||
@@ -1662,7 +1674,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
|
||||
RolloutItem::ResponseItem(assistant_message("turn 1 assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
@@ -1682,7 +1696,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
|
||||
RolloutItem::ResponseItem(assistant_message("turn 2 assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-2".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(
|
||||
codex_protocol::protocol::TurnStartedEvent {
|
||||
@@ -1702,7 +1718,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
|
||||
RolloutItem::ResponseItem(assistant_message("turn 3 assistant")),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-3".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
})),
|
||||
])
|
||||
.await;
|
||||
@@ -3138,13 +3156,15 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
let mut trace = self
|
||||
.captured_trace
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*trace = current_span_w3c_trace_context();
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4380,6 +4400,9 @@ struct NeverEndingTask {
|
||||
listen_to_cancellation_token: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FailingTask;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionTask for NeverEndingTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
@@ -4396,10 +4419,12 @@ impl SessionTask for NeverEndingTask {
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
if self.listen_to_cancellation_token {
|
||||
cancellation_token.cancelled().await;
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
loop {
|
||||
sleep(Duration::from_secs(60)).await;
|
||||
@@ -4407,6 +4432,32 @@ impl SessionTask for NeverEndingTask {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionTask for FailingTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
TaskKind::Regular
|
||||
}
|
||||
|
||||
fn span_name(&self) -> &'static str {
|
||||
"session_task.failing"
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> TurnOutcome {
|
||||
TurnOutcome::Failed {
|
||||
error: ErrorEvent {
|
||||
message: "boom".to_string(),
|
||||
codex_error_info: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[test_log::test]
|
||||
async fn abort_regular_task_emits_turn_aborted_only() {
|
||||
@@ -4441,6 +4492,30 @@ async fn abort_regular_task_emits_turn_aborted_only() {
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[test_log::test]
|
||||
async fn failing_regular_task_emits_error_without_turn_complete() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
let input = vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
sess.spawn_task(Arc::clone(&tc), input, FailingTask).await;
|
||||
|
||||
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
||||
.await
|
||||
.expect("timeout waiting for event")
|
||||
.expect("event");
|
||||
match evt.msg {
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message,
|
||||
codex_error_info: None,
|
||||
}) => assert_eq!(message, "boom"),
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn abort_gracefully_emits_turn_aborted_only() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
@@ -4502,8 +4577,13 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
|
||||
.await
|
||||
.expect("inject pending input into active turn");
|
||||
|
||||
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
|
||||
.await;
|
||||
sess.on_task_finished(
|
||||
Arc::clone(&tc),
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let history = sess.clone_history().await;
|
||||
let expected = ResponseItem::Message {
|
||||
|
||||
@@ -18,6 +18,7 @@ use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -578,8 +579,6 @@ async fn wait_for_guardian_review(
|
||||
) -> (GuardianReviewSessionOutcome, bool) {
|
||||
let timeout = tokio::time::sleep_until(deadline);
|
||||
tokio::pin!(timeout);
|
||||
let mut last_error_message: Option<String> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut timeout => {
|
||||
@@ -600,21 +599,16 @@ async fn wait_for_guardian_review(
|
||||
match event {
|
||||
Ok(event) => match event.msg {
|
||||
EventMsg::TurnComplete(turn_complete) => {
|
||||
if turn_complete.last_agent_message.is_none()
|
||||
&& let Some(error_message) = last_error_message
|
||||
{
|
||||
return (
|
||||
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
|
||||
return match turn_complete.outcome {
|
||||
TurnOutcome::Succeeded { last_agent_message } => (
|
||||
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)),
|
||||
true,
|
||||
);
|
||||
}
|
||||
return (
|
||||
GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)),
|
||||
true,
|
||||
);
|
||||
}
|
||||
EventMsg::Error(error) => {
|
||||
last_error_message = Some(error.message);
|
||||
),
|
||||
TurnOutcome::Failed { error } => (
|
||||
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error.message))),
|
||||
true,
|
||||
),
|
||||
};
|
||||
}
|
||||
EventMsg::TurnAborted(_) => {
|
||||
return (GuardianReviewSessionOutcome::Aborted, true);
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::state::TaskKind;
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -27,7 +28,7 @@ impl SessionTask for CompactTask {
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
let session = session.clone_session();
|
||||
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
|
||||
let _ = session.services.session_telemetry.counter(
|
||||
@@ -44,6 +45,8 @@ impl SessionTask for CompactTask {
|
||||
);
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input).await
|
||||
};
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::codex::TurnContext;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::WarningEvent;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
@@ -42,7 +43,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
tokio::task::spawn(async move {
|
||||
let token = self.token;
|
||||
let warnings_enabled = !ctx.ghost_snapshot.disable_warnings;
|
||||
@@ -156,7 +157,9 @@ impl SessionTask for GhostSnapshotTask {
|
||||
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),
|
||||
}
|
||||
});
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnAbortReason;
|
||||
use crate::protocol::TurnAbortedEvent;
|
||||
use crate::protocol::TurnCompleteEvent;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
@@ -140,15 +141,15 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
|
||||
/// `ctx`, returning an optional final agent message when finished. The
|
||||
/// provided `cancellation_token` is cancelled when the session requests an
|
||||
/// abort; implementers should watch for it and terminate quickly once it
|
||||
/// fires. Returning [`Some`] yields a final message that
|
||||
/// [`Session::on_task_finished`] will emit to the client.
|
||||
/// fires. The returned [`TurnOutcome`] determines the single terminal
|
||||
/// event emitted by [`Session::on_task_finished`].
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String>;
|
||||
) -> TurnOutcome;
|
||||
|
||||
/// Gives the task a chance to perform cleanup after an abort.
|
||||
///
|
||||
@@ -209,7 +210,7 @@ impl Session {
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let ctx_for_finish = Arc::clone(&ctx);
|
||||
let last_agent_message = task_for_run
|
||||
let completion = task_for_run
|
||||
.run(
|
||||
Arc::clone(&session_ctx),
|
||||
ctx,
|
||||
@@ -221,7 +222,7 @@ impl Session {
|
||||
sess.flush_rollout().await;
|
||||
if !task_cancellation_token.is_cancelled() {
|
||||
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
|
||||
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
|
||||
sess.on_task_finished(Arc::clone(&ctx_for_finish), completion)
|
||||
.await;
|
||||
}
|
||||
done_clone.notify_waiters();
|
||||
@@ -314,7 +315,7 @@ impl Session {
|
||||
pub async fn on_task_finished(
|
||||
self: &Arc<Self>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
last_agent_message: Option<String>,
|
||||
outcome: TurnOutcome,
|
||||
) {
|
||||
turn_context
|
||||
.turn_metadata_state
|
||||
@@ -433,7 +434,7 @@ impl Session {
|
||||
}
|
||||
let event = EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
last_agent_message,
|
||||
outcome,
|
||||
});
|
||||
self.send_event(turn_context.as_ref(), event).await;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::run_turn;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::TurnStartedEvent;
|
||||
use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
|
||||
use crate::state::TaskKind;
|
||||
@@ -41,7 +42,7 @@ impl SessionTask for RegularTask {
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
let sess = session.clone_session();
|
||||
let run_turn_span = trace_span!("run_turn");
|
||||
// Regular turns emit `TurnStarted` inline so first-turn lifecycle does
|
||||
@@ -57,7 +58,11 @@ impl SessionTask for RegularTask {
|
||||
.consume_startup_prewarm_for_regular_turn(&cancellation_token)
|
||||
.await
|
||||
{
|
||||
SessionStartupPrewarmResolution::Cancelled => return None,
|
||||
SessionStartupPrewarmResolution::Cancelled => {
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
SessionStartupPrewarmResolution::Unavailable { .. } => None,
|
||||
SessionStartupPrewarmResolution::Ready(prewarmed_client_session) => {
|
||||
Some(*prewarmed_client_session)
|
||||
|
||||
@@ -15,6 +15,7 @@ use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
use codex_utils_template::Template;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -64,7 +65,7 @@ impl SessionTask for ReviewTask {
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
let _ = session.session.services.session_telemetry.counter(
|
||||
"codex.task.review",
|
||||
/*inc*/ 1,
|
||||
@@ -86,7 +87,9 @@ impl SessionTask for ReviewTask {
|
||||
if !cancellation_token.is_cancelled() {
|
||||
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
|
||||
}
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
@@ -166,12 +169,18 @@ async fn process_review_events(
|
||||
| EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. })
|
||||
| EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {}
|
||||
EventMsg::TurnComplete(task_complete) => {
|
||||
// Parse review output from the last agent message (if present).
|
||||
let out = task_complete
|
||||
.last_agent_message
|
||||
.as_deref()
|
||||
.map(parse_review_output_event);
|
||||
return out;
|
||||
return match task_complete.outcome {
|
||||
TurnOutcome::Succeeded { last_agent_message } => {
|
||||
last_agent_message.as_deref().map(parse_review_output_event)
|
||||
}
|
||||
TurnOutcome::Failed { error } => {
|
||||
session
|
||||
.clone_session()
|
||||
.send_event(ctx.as_ref(), EventMsg::Error(error))
|
||||
.await;
|
||||
None
|
||||
}
|
||||
};
|
||||
}
|
||||
EventMsg::TurnAborted(_) => {
|
||||
// Cancellation or abort: consumer will finalize with None.
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use crate::codex::TurnContext;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::UndoCompletedEvent;
|
||||
use crate::protocol::UndoStartedEvent;
|
||||
use crate::state::TaskKind;
|
||||
@@ -41,7 +42,7 @@ impl SessionTask for UndoTask {
|
||||
ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
let _ = session.session.services.session_telemetry.counter(
|
||||
"codex.task.undo",
|
||||
/*inc*/ 1,
|
||||
@@ -65,7 +66,9 @@ impl SessionTask for UndoTask {
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
}
|
||||
|
||||
let history = sess.clone_history().await;
|
||||
@@ -90,7 +93,9 @@ impl SessionTask for UndoTask {
|
||||
completed.message = Some("No ghost snapshot available to undo.".to_string());
|
||||
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
|
||||
.await;
|
||||
return None;
|
||||
return TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
};
|
||||
};
|
||||
|
||||
let commit_id = ghost_commit.id().to_string();
|
||||
@@ -126,6 +131,8 @@ impl SessionTask for UndoTask {
|
||||
|
||||
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
|
||||
.await;
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::protocol::ExecCommandEndEvent;
|
||||
use crate::protocol::ExecCommandSource;
|
||||
use crate::protocol::ExecCommandStatus;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::protocol::TurnStartedEvent;
|
||||
use crate::sandboxing::ExecRequest;
|
||||
use crate::state::TaskKind;
|
||||
@@ -78,7 +79,7 @@ impl SessionTask for UserShellCommandTask {
|
||||
turn_context: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
execute_user_shell_command(
|
||||
session.clone_session(),
|
||||
turn_context,
|
||||
@@ -87,7 +88,9 @@ impl SessionTask for UserShellCommandTask {
|
||||
UserShellCommandMode::StandaloneTurn,
|
||||
)
|
||||
.await;
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::config::types::ShellEnvironmentPolicy;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::protocol::AgentStatus;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::ErrorEvent;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::FileSystemSandboxPolicy;
|
||||
use crate::protocol::NetworkSandboxPolicy;
|
||||
@@ -17,6 +18,7 @@ use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::SessionSource;
|
||||
use crate::protocol::SubAgentSource;
|
||||
use crate::protocol::TurnCompleteEvent;
|
||||
use crate::protocol::TurnOutcome;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
use crate::tasks::SessionTaskContext;
|
||||
@@ -112,6 +114,9 @@ fn history_contains_inter_agent_communication(
|
||||
#[derive(Clone, Copy)]
|
||||
struct NeverEndingTask;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct FailingTask;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionTask for NeverEndingTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
@@ -128,9 +133,37 @@ impl SessionTask for NeverEndingTask {
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
) -> TurnOutcome {
|
||||
cancellation_token.cancelled().await;
|
||||
None
|
||||
TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SessionTask for FailingTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
TaskKind::Regular
|
||||
}
|
||||
|
||||
fn span_name(&self) -> &'static str {
|
||||
"session_task.multi_agent_failing"
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> TurnOutcome {
|
||||
TurnOutcome::Failed {
|
||||
error: ErrorEvent {
|
||||
message: "boom".to_string(),
|
||||
codex_error_info: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -601,7 +634,9 @@ async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_messa
|
||||
child_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: child_turn.sub_id.clone(),
|
||||
last_agent_message: Some("done".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
@@ -1119,6 +1154,277 @@ async fn multi_agent_v2_assign_task_interrupts_busy_child_without_losing_message
|
||||
.expect("shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_assign_task_completion_notifies_parent_after_reuse() {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let root = manager
|
||||
.start_thread((*turn.config).clone())
|
||||
.await
|
||||
.expect("root thread should start");
|
||||
session.services.agent_control = manager.agent_control();
|
||||
session.conversation_id = root.thread_id;
|
||||
let mut config = turn.config.as_ref().clone();
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
turn.config = Arc::new(config);
|
||||
let session = Arc::new(session);
|
||||
let turn = Arc::new(turn);
|
||||
|
||||
SpawnAgentHandlerV2
|
||||
.handle(invocation(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
"spawn_agent",
|
||||
function_payload(json!({
|
||||
"message": "boot worker",
|
||||
"task_name": "worker"
|
||||
})),
|
||||
))
|
||||
.await
|
||||
.expect("spawn worker");
|
||||
let agent_id = session
|
||||
.services
|
||||
.agent_control
|
||||
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
|
||||
.await
|
||||
.expect("worker should resolve");
|
||||
let thread = manager
|
||||
.get_thread(agent_id)
|
||||
.await
|
||||
.expect("worker thread should exist");
|
||||
let worker_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
|
||||
let first_turn = thread.codex.session.new_default_turn().await;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
first_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn.sub_id.clone(),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done once".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
AssignTaskHandlerV2
|
||||
.handle(invocation(
|
||||
session,
|
||||
turn,
|
||||
"assign_task",
|
||||
function_payload(json!({
|
||||
"target": agent_id.to_string(),
|
||||
"items": [{"type": "text", "text": "continue"}]
|
||||
})),
|
||||
))
|
||||
.await
|
||||
.expect("assign_task should succeed");
|
||||
|
||||
let second_turn = thread.codex.session.new_default_turn().await;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
second_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: second_turn.sub_id.clone(),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done twice".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_first = (
|
||||
root.thread_id,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
worker_path.clone(),
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
worker_path.as_str(),
|
||||
&AgentStatus::Completed(Some("done once".to_string())),
|
||||
),
|
||||
/*trigger_turn*/ false,
|
||||
),
|
||||
},
|
||||
);
|
||||
let expected_second = (
|
||||
root.thread_id,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
worker_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
"/root/worker",
|
||||
&AgentStatus::Completed(Some("done twice".to_string())),
|
||||
),
|
||||
/*trigger_turn*/ false,
|
||||
),
|
||||
},
|
||||
);
|
||||
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let captured = manager.captured_ops();
|
||||
if captured.contains(&expected_first) && captured.contains(&expected_second) {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("reused worker should notify parent on both completed turns");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_failed_turn_forwards_only_error_status_after_reuse() {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let root = manager
|
||||
.start_thread((*turn.config).clone())
|
||||
.await
|
||||
.expect("root thread should start");
|
||||
session.services.agent_control = manager.agent_control();
|
||||
session.conversation_id = root.thread_id;
|
||||
let mut config = turn.config.as_ref().clone();
|
||||
let _ = config.features.enable(Feature::MultiAgentV2);
|
||||
turn.config = Arc::new(config);
|
||||
let session = Arc::new(session);
|
||||
let turn = Arc::new(turn);
|
||||
|
||||
SpawnAgentHandlerV2
|
||||
.handle(invocation(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
"spawn_agent",
|
||||
function_payload(json!({
|
||||
"message": "boot worker",
|
||||
"task_name": "worker"
|
||||
})),
|
||||
))
|
||||
.await
|
||||
.expect("spawn worker");
|
||||
let agent_id = session
|
||||
.services
|
||||
.agent_control
|
||||
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
|
||||
.await
|
||||
.expect("worker should resolve");
|
||||
let thread = manager
|
||||
.get_thread(agent_id)
|
||||
.await
|
||||
.expect("worker thread should exist");
|
||||
let worker_path = AgentPath::try_from("/root/worker").expect("agent path");
|
||||
|
||||
let first_turn = thread.codex.session.new_default_turn().await;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.send_event(
|
||||
first_turn.as_ref(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: first_turn.sub_id.clone(),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("done once".to_string()),
|
||||
},
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
AssignTaskHandlerV2
|
||||
.handle(invocation(
|
||||
session,
|
||||
turn,
|
||||
"assign_task",
|
||||
function_payload(json!({
|
||||
"target": agent_id.to_string(),
|
||||
"items": [{"type": "text", "text": "continue"}]
|
||||
})),
|
||||
))
|
||||
.await
|
||||
.expect("assign_task should succeed");
|
||||
|
||||
let second_turn = thread.codex.session.new_default_turn().await;
|
||||
thread
|
||||
.codex
|
||||
.session
|
||||
.spawn_task(
|
||||
Arc::clone(&second_turn),
|
||||
vec![UserInput::Text {
|
||||
text: "continue".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
FailingTask,
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected_first = (
|
||||
root.thread_id,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
worker_path.clone(),
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
worker_path.as_str(),
|
||||
&AgentStatus::Completed(Some("done once".to_string())),
|
||||
),
|
||||
/*trigger_turn*/ false,
|
||||
),
|
||||
},
|
||||
);
|
||||
let expected_error = (
|
||||
root.thread_id,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
worker_path.clone(),
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
worker_path.as_str(),
|
||||
&AgentStatus::Errored("boom".to_string()),
|
||||
),
|
||||
/*trigger_turn*/ false,
|
||||
),
|
||||
},
|
||||
);
|
||||
let unexpected_completed = (
|
||||
root.thread_id,
|
||||
Op::InterAgentCommunication {
|
||||
communication: InterAgentCommunication::new(
|
||||
worker_path,
|
||||
AgentPath::root(),
|
||||
Vec::new(),
|
||||
crate::session_prefix::format_subagent_notification_message(
|
||||
"/root/worker",
|
||||
&AgentStatus::Completed(None),
|
||||
),
|
||||
/*trigger_turn*/ false,
|
||||
),
|
||||
},
|
||||
);
|
||||
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let captured = manager.captured_ops();
|
||||
if captured.contains(&expected_first)
|
||||
&& captured.contains(&expected_error)
|
||||
&& !captured.contains(&unexpected_completed)
|
||||
{
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("failed reused worker turn should only notify parent with the errored status");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() {
|
||||
let (mut session, mut turn) = make_session_and_context().await;
|
||||
|
||||
@@ -134,7 +134,7 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
|
||||
assert_eq!(assistant_message.message, "Completed first turn");
|
||||
assert_eq!(completed.turn_id, started.turn_id);
|
||||
assert_eq!(
|
||||
completed.last_agent_message.as_deref(),
|
||||
completed.last_agent_message().as_deref(),
|
||||
Some("Completed first turn")
|
||||
);
|
||||
}
|
||||
@@ -224,7 +224,7 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()>
|
||||
assert_eq!(assistant_message.message, "Completed reasoning turn");
|
||||
assert_eq!(completed.turn_id, started.turn_id);
|
||||
assert_eq!(
|
||||
completed.last_agent_message.as_deref(),
|
||||
completed.last_agent_message().as_deref(),
|
||||
Some("Completed reasoning turn")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use rmcp::model::CallToolResult;
|
||||
use rmcp::model::Content;
|
||||
@@ -291,16 +292,15 @@ async fn run_codex_tool_session_inner(
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
last_agent_message, ..
|
||||
}) => {
|
||||
let text = match last_agent_message {
|
||||
Some(msg) => msg,
|
||||
None => "".to_string(),
|
||||
EventMsg::TurnComplete(TurnCompleteEvent { outcome, .. }) => {
|
||||
let (text, is_error) = match outcome {
|
||||
TurnOutcome::Succeeded { last_agent_message } => {
|
||||
(last_agent_message.unwrap_or_default(), None)
|
||||
}
|
||||
TurnOutcome::Failed { error } => (error.message, Some(true)),
|
||||
};
|
||||
let result = create_call_tool_result_with_thread_id(
|
||||
thread_id, text, /*is_error*/ None,
|
||||
);
|
||||
let result =
|
||||
create_call_tool_result_with_thread_id(thread_id, text, is_error);
|
||||
outgoing.send_response(request_id.clone(), result).await;
|
||||
// unregister the id so we don't keep it in the map
|
||||
running_requests_id_to_codex_uuid
|
||||
|
||||
@@ -1858,10 +1858,18 @@ pub struct ModelRerouteEvent {
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct ContextCompactedEvent;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
#[serde(untagged)]
|
||||
pub enum TurnOutcome {
|
||||
Succeeded { last_agent_message: Option<String> },
|
||||
Failed { error: ErrorEvent },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct TurnCompleteEvent {
|
||||
pub turn_id: String,
|
||||
pub last_agent_message: Option<String>,
|
||||
#[serde(flatten)]
|
||||
pub outcome: TurnOutcome,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
|
||||
@@ -103,6 +103,8 @@ use codex_protocol::protocol::TurnAbortedEvent;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnOutcome;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
#[cfg(test)]
|
||||
use std::time::Duration;
|
||||
@@ -731,7 +733,7 @@ fn turn_snapshot_events(
|
||||
///
|
||||
/// - `Completed` → `TurnComplete`
|
||||
/// - `Interrupted` → `TurnAborted { reason: Interrupted }`
|
||||
/// - `Failed` → `Error` (if present) then `TurnComplete`
|
||||
/// - `Failed` → `TurnComplete` with failed outcome
|
||||
/// - `InProgress` → no events (the turn is still running)
|
||||
#[cfg(test)]
|
||||
fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_failed_error: bool) {
|
||||
@@ -740,7 +742,9 @@ fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_fai
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.id.clone(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
TurnStatus::Interrupted => events.push(Event {
|
||||
@@ -751,23 +755,26 @@ fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_fai
|
||||
}),
|
||||
}),
|
||||
TurnStatus::Failed => {
|
||||
if include_failed_error && let Some(error) = &turn.error {
|
||||
events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: error.message.clone(),
|
||||
codex_error_info: error
|
||||
.codex_error_info
|
||||
.clone()
|
||||
.and_then(app_server_codex_error_info_to_core),
|
||||
}),
|
||||
});
|
||||
}
|
||||
let _ = include_failed_error;
|
||||
events.push(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.id.clone(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Failed {
|
||||
error: ErrorEvent {
|
||||
message: turn
|
||||
.error
|
||||
.as_ref()
|
||||
.map(|error| error.message.clone())
|
||||
.unwrap_or_default(),
|
||||
codex_error_info: turn.error.as_ref().and_then(|error| {
|
||||
error
|
||||
.codex_error_info
|
||||
.clone()
|
||||
.and_then(app_server_codex_error_info_to_core)
|
||||
}),
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
}
|
||||
@@ -1120,7 +1127,7 @@ mod tests {
|
||||
panic!("expected turn complete event");
|
||||
};
|
||||
assert_eq!(completed.turn_id, turn_id);
|
||||
assert_eq!(completed.last_agent_message, None);
|
||||
assert_eq!(completed.last_agent_message(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1366,7 +1373,7 @@ mod tests {
|
||||
panic!("expected turn complete event");
|
||||
};
|
||||
assert_eq!(completed.turn_id, turn_id);
|
||||
assert_eq!(completed.last_agent_message, None);
|
||||
assert_eq!(completed.last_agent_message(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -194,8 +194,6 @@ use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::TurnDiffEvent;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::UndoCompletedEvent;
|
||||
@@ -6927,11 +6925,16 @@ impl ChatWidget {
|
||||
self.on_task_started();
|
||||
}
|
||||
}
|
||||
EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
last_agent_message, ..
|
||||
}) => {
|
||||
self.on_task_complete(last_agent_message, from_replay);
|
||||
}
|
||||
EventMsg::TurnComplete(codex_protocol::protocol::TurnCompleteEvent {
|
||||
outcome, ..
|
||||
}) => match outcome {
|
||||
codex_protocol::protocol::TurnOutcome::Succeeded { last_agent_message } => {
|
||||
self.on_task_complete(last_agent_message, from_replay);
|
||||
}
|
||||
codex_protocol::protocol::TurnOutcome::Failed { error } => {
|
||||
self.on_error(error.message);
|
||||
}
|
||||
},
|
||||
EventMsg::TokenCount(ev) => {
|
||||
self.set_token_info(ev.info);
|
||||
self.on_rate_limit_snapshot(ev.rate_limits);
|
||||
|
||||
@@ -1741,7 +1741,9 @@ async fn steer_rejection_queues_review_follow_up_before_existing_queued_messages
|
||||
id: "turn-complete".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -1760,7 +1762,9 @@ async fn steer_rejection_queues_review_follow_up_before_existing_queued_messages
|
||||
id: "turn-complete-2".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-2".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -3275,7 +3279,9 @@ async fn plan_implementation_popup_skips_replayed_turn_complete() {
|
||||
|
||||
chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
},
|
||||
})]);
|
||||
|
||||
let popup = render_bottom_popup(&chat, /*width*/ 80);
|
||||
@@ -3299,7 +3305,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
|
||||
|
||||
chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
},
|
||||
})]);
|
||||
let replay_popup = render_bottom_popup(&chat, /*width*/ 80);
|
||||
assert!(
|
||||
@@ -3311,7 +3319,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
|
||||
id: "live-turn-complete-1".to_string(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -3332,7 +3342,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
|
||||
id: "live-turn-complete-2".to_string(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Plan details".to_string()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
let duplicate_popup = render_bottom_popup(&chat, /*width*/ 80);
|
||||
@@ -6463,7 +6475,9 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Final response.".into()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Final response.".into()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -6505,7 +6519,9 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -6570,7 +6586,9 @@ async fn unified_exec_waiting_multiple_empty_snapshots() {
|
||||
id: "turn-wait-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -6648,7 +6666,9 @@ async fn unified_exec_non_empty_then_empty_snapshots() {
|
||||
id: "turn-wait-3".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -7150,7 +7170,9 @@ async fn slash_copy_state_tracks_turn_complete_final_reply() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Final reply **markdown**".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Final reply **markdown**".to_string()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -7180,7 +7202,9 @@ async fn slash_copy_state_tracks_plan_item_completion() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -7213,7 +7237,9 @@ async fn slash_copy_state_is_preserved_during_running_task() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Previous completed reply".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Previous completed reply".to_string()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
chat.on_task_started();
|
||||
@@ -7232,7 +7258,9 @@ async fn slash_copy_state_clears_on_thread_rollback() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: Some("Reply that will be rolled back".to_string()),
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: Some("Reply that will be rolled back".to_string()),
|
||||
},
|
||||
}),
|
||||
});
|
||||
chat.handle_codex_event(Event {
|
||||
@@ -7260,7 +7288,9 @@ async fn slash_copy_is_unavailable_when_legacy_agent_message_is_not_repeated_on_
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
@@ -7301,7 +7331,9 @@ async fn slash_copy_uses_agent_message_item_when_turn_complete_omits_final_text(
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
@@ -7346,7 +7378,9 @@ async fn slash_copy_does_not_return_stale_output_after_thread_rollback() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
let _ = drain_insert_history(&mut rx);
|
||||
@@ -11230,7 +11264,9 @@ async fn turn_complete_keeps_unified_exec_processes() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -13172,7 +13208,9 @@ async fn status_line_branch_refreshes_after_turn_complete() {
|
||||
id: "turn-1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -13474,7 +13512,9 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
|
||||
id: "s1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -13945,7 +13985,9 @@ printf 'fenced within fenced\n'
|
||||
id: "t1".into(),
|
||||
msg: EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: "turn-1".to_string(),
|
||||
last_agent_message: None,
|
||||
outcome: TurnOutcome::Succeeded {
|
||||
last_agent_message: None,
|
||||
},
|
||||
}),
|
||||
});
|
||||
for lines in drain_insert_history(&mut rx) {
|
||||
|
||||
Reference in New Issue
Block a user