Compare commits

...

5 Commits

Author SHA1 Message Date
jif-oai
51062332c2 fmt 2026-03-30 19:20:58 +02:00
jif-oai
9e6edbb1bc simplify further 2026-03-30 19:20:22 +02:00
jif-oai
0257728856 other approoach 2026-03-30 19:09:53 +02:00
jif-oai
90cabecba9 fmt 2026-03-30 18:56:25 +02:00
jif-oai
1cd1fbd6b5 feat: make task return a more complex type 2026-03-30 18:50:26 +02:00
23 changed files with 883 additions and 211 deletions

View File

@@ -44,6 +44,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnOutcome;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::protocol::ViewImageToolCallEvent;
@@ -920,9 +921,19 @@ impl ThreadHistoryBuilder {
}
fn handle_turn_complete(&mut self, payload: &TurnCompleteEvent) {
let mark_completed = |status: &mut TurnStatus| {
if matches!(*status, TurnStatus::Completed | TurnStatus::InProgress) {
*status = TurnStatus::Completed;
let mark_finished = |turn: &mut PendingTurn| match &payload.outcome {
TurnOutcome::Succeeded { .. } => {
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
turn.status = TurnStatus::Completed;
}
}
TurnOutcome::Failed { error } => {
turn.status = TurnStatus::Failed;
turn.error = Some(V2TurnError {
message: error.message.clone(),
codex_error_info: error.codex_error_info.clone().map(Into::into),
additional_details: None,
});
}
};
@@ -932,7 +943,7 @@ impl ThreadHistoryBuilder {
.as_mut()
.filter(|turn| turn.id == payload.turn_id)
{
mark_completed(&mut current_turn.status);
mark_finished(current_turn);
self.finish_current_turn();
return;
}
@@ -942,13 +953,27 @@ impl ThreadHistoryBuilder {
.iter_mut()
.find(|turn| turn.id == payload.turn_id)
{
mark_completed(&mut turn.status);
match &payload.outcome {
TurnOutcome::Succeeded { .. } => {
if matches!(turn.status, TurnStatus::Completed | TurnStatus::InProgress) {
turn.status = TurnStatus::Completed;
}
}
TurnOutcome::Failed { error } => {
turn.status = TurnStatus::Failed;
turn.error = Some(V2TurnError {
message: error.message.clone(),
codex_error_info: error.codex_error_info.clone().map(Into::into),
additional_details: None,
});
}
}
return;
}
// If the completion event cannot be matched, apply it to the active turn.
if let Some(current_turn) = self.current_turn.as_mut() {
mark_completed(&mut current_turn.status);
mark_finished(current_turn);
self.finish_current_turn();
}
}
@@ -1361,7 +1386,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -1432,7 +1459,9 @@ mod tests {
})),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-image".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
];
@@ -1747,7 +1776,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -2046,7 +2077,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
@@ -2080,7 +2113,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -2129,7 +2164,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
@@ -2163,7 +2200,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -2332,7 +2371,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
@@ -2347,7 +2388,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "still in b".into(),
@@ -2356,7 +2399,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-b".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -2387,7 +2432,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-b".into(),
@@ -2437,7 +2484,9 @@ mod tests {
}),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-compact".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
];
@@ -2676,7 +2725,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
EventMsg::Error(ErrorEvent {
message: "request-level failure".into(),
@@ -2729,7 +2780,9 @@ mod tests {
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
];
@@ -2777,7 +2830,9 @@ mod tests {
RolloutItem::ResponseItem(hook_prompt),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
];
@@ -2822,7 +2877,9 @@ mod tests {
}),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-a".into(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
];

View File

@@ -128,6 +128,7 @@ use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnDiffEvent;
use codex_protocol::request_permissions::PermissionGrantScope as CorePermissionGrantScope;
use codex_protocol::request_permissions::RequestPermissionProfile as CoreRequestPermissionProfile;
@@ -297,11 +298,23 @@ pub(crate) async fn apply_bespoke_event_handling(
EventMsg::TurnComplete(_ev) => {
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some()
|| matches!(
&_ev.outcome,
codex_protocol::protocol::TurnOutcome::Failed { error }
if error.affects_turn_status()
);
thread_watch_manager
.note_turn_completed(&conversation_id.to_string(), turn_failed)
.await;
handle_turn_complete(conversation_id, event_turn_id, &outgoing, &thread_state).await;
handle_turn_complete(
conversation_id,
event_turn_id,
&_ev,
&outgoing,
&thread_state,
)
.await;
}
EventMsg::SkillsUpdateAvailable => {
if let ApiVersion::V2 = api_version {
@@ -2100,6 +2113,7 @@ async fn find_and_remove_turn_summary(
async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
event: &TurnCompleteEvent,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -2107,7 +2121,21 @@ async fn handle_turn_complete(
let (status, error) = match turn_summary.last_error {
Some(error) => (TurnStatus::Failed, Some(error)),
None => (TurnStatus::Completed, None),
None => match &event.outcome {
codex_protocol::protocol::TurnOutcome::Failed { error }
if error.affects_turn_status() =>
{
(
TurnStatus::Failed,
Some(TurnError {
message: error.message.clone(),
codex_error_info: error.codex_error_info.clone().map(Into::into),
additional_details: None,
}),
)
}
_ => (TurnStatus::Completed, None),
},
};
emit_turn_completed_with_status(conversation_id, event_turn_id, status, error, outgoing).await;

View File

@@ -527,6 +527,37 @@ impl AgentControl {
result
}
pub(crate) async fn forward_child_final_status_to_parent(
&self,
session_source: &SessionSource,
status: &AgentStatus,
) {
let Some(parent_thread_id) = thread_spawn_parent_thread_id(session_source) else {
return;
};
let Some(child_agent_path) = session_source.get_agent_path() else {
return;
};
let Some(parent_agent_path) = child_agent_path
.as_str()
.rsplit_once('/')
.and_then(|(parent, _)| AgentPath::try_from(parent).ok())
else {
return;
};
let message = format_subagent_notification_message(child_agent_path.as_str(), status);
let communication = InterAgentCommunication::new(
child_agent_path,
parent_agent_path,
Vec::new(),
message,
/*trigger_turn*/ false,
);
let _ = self
.send_inter_agent_communication(parent_thread_id, communication)
.await;
}
/// Interrupt the current task for an existing agent thread.
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
@@ -782,6 +813,11 @@ impl AgentControl {
child_reference: String,
child_agent_path: Option<AgentPath>,
) {
// Agent-path children now forward final status directly from `Session::send_event`,
// which keeps reused spawned agents notifying their parent on every completed turn.
if child_agent_path.is_some() {
return;
}
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source

View File

@@ -250,7 +250,9 @@ async fn on_event_updates_status_from_task_started() {
async fn on_event_updates_status_from_task_complete() {
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("done".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done".to_string()),
},
}));
let expected = AgentStatus::Completed(Some("done".to_string()));
assert_eq!(status, Some(expected));
@@ -1133,7 +1135,9 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done".to_string()),
},
}),
)
.await;
@@ -1180,35 +1184,46 @@ async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
#[tokio::test]
async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
let harness = AgentControlHarness::new().await;
let (_root_thread_id, root_thread) = harness.start_thread().await;
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
let (root_thread_id, root_thread) = harness.start_thread().await;
let mut tester_config = harness.config.clone();
let _ = tester_config.features.enable(Feature::MultiAgentV2);
let tester_thread_id = harness
.manager
.start_thread(tester_config)
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
let worker_thread_id = harness
.control
.spawn_agent(
tester_config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester thread should start")
.thread_id;
.expect("worker spawn should succeed");
let tester_thread_id = harness
.control
.spawn_agent(
tester_config,
text_input("hello tester"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester spawn should succeed");
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
harness.control.maybe_start_completion_watcher(
tester_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
tester_path.to_string(),
Some(tester_path.clone()),
);
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
@@ -1217,11 +1232,12 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done".to_string()),
},
}),
)
.await;
let expected_message = crate::session_prefix::format_subagent_notification_message(
tester_path.as_str(),
&AgentStatus::Completed(Some("done".to_string())),

View File

@@ -1,19 +1,25 @@
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnOutcome;
/// Derive the next agent status from a single emitted event.
/// Returns `None` when the event does not affect status tracking.
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
match msg {
EventMsg::TurnStarted(_) => Some(AgentStatus::Running),
EventMsg::TurnComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
EventMsg::TurnComplete(ev) => Some(match &ev.outcome {
TurnOutcome::Succeeded { last_agent_message } => {
AgentStatus::Completed(last_agent_message.clone())
}
TurnOutcome::Failed { error } => AgentStatus::Errored(error.message.clone()),
}),
EventMsg::TurnAborted(ev) => match ev.reason {
codex_protocol::protocol::TurnAbortReason::Interrupted => {
Some(AgentStatus::Interrupted)
}
_ => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
},
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
EventMsg::Error(_) => None,
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
_ => None,
}

View File

@@ -14,6 +14,7 @@ use crate::agent::AgentStatus;
use crate::agent::Mailbox;
use crate::agent::MailboxReceiver;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::apps::render_apps_section;
use crate::auth_env_telemetry::collect_auth_env_telemetry;
use crate::commit_attribution::commit_message_trailer_instruction;
@@ -314,6 +315,7 @@ use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnDiffEvent;
use crate::protocol::TurnOutcome;
use crate::protocol::WarningEvent;
use crate::resolve_skill_dependencies_for_turn;
use crate::rollout::RolloutRecorder;
@@ -2641,6 +2643,15 @@ impl Session {
.await;
self.maybe_clear_realtime_handoff_for_event(&legacy_source)
.await;
if let Some(status) = agent_status_from_event(&legacy_source)
&& is_final(&status)
&& turn_context.session_source.get_agent_path().is_some()
{
self.services
.agent_control
.forward_child_final_status_to_parent(&turn_context.session_source, &status)
.await;
}
let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
@@ -5586,9 +5597,11 @@ pub(crate) async fn run_turn(
input: Vec<UserInput>,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
if input.is_empty() && !sess.has_pending_input().await {
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
let model_info = turn_context.model_info.clone();
@@ -5602,7 +5615,9 @@ pub(crate) async fn run_turn(
.is_err()
{
error!("Failed to run pre-sampling compact");
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
@@ -5632,7 +5647,11 @@ pub(crate) async fn run_turn(
.await
{
Ok(mcp_tools) => mcp_tools,
Err(_) if turn_context.apps_enabled() => return None,
Err(_) if turn_context.apps_enabled() => {
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
Err(_) => HashMap::new(),
}
} else {
@@ -5730,7 +5749,9 @@ pub(crate) async fn run_turn(
.collect::<Vec<_>>();
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
let additional_contexts = if input.is_empty() {
Vec::new()
@@ -5750,7 +5771,9 @@ pub(crate) async fn run_turn(
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
@@ -5913,7 +5936,9 @@ pub(crate) async fn run_turn(
.await
.is_err()
{
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
continue;
}
@@ -6025,15 +6050,12 @@ pub(crate) async fn run_turn(
}
}
if let Some(message) = abort_message {
sess.send_event(
&turn_context,
EventMsg::Error(ErrorEvent {
return TurnOutcome::Failed {
error: ErrorEvent {
message,
codex_error_info: None,
}),
)
.await;
return None;
},
};
}
break;
}
@@ -6051,25 +6073,23 @@ pub(crate) async fn run_turn(
if state.history.replace_last_turn_images("Invalid image") {
continue;
}
let event = EventMsg::Error(ErrorEvent {
let error = ErrorEvent {
message: "Invalid image in your last message. Please remove it and try again."
.to_string(),
codex_error_info: Some(CodexErrorInfo::BadRequest),
});
sess.send_event(&turn_context, event).await;
break;
};
return TurnOutcome::Failed { error };
}
Err(e) => {
info!("Turn error: {e:#}");
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
// let the user continue the conversation
break;
return TurnOutcome::Failed {
error: e.to_error_event(/*message_prefix*/ None),
};
}
}
}
last_agent_message
TurnOutcome::Succeeded { last_agent_message }
}
async fn run_pre_sampling_compact(

View File

@@ -208,7 +208,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -232,7 +234,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: rolled_back_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
@@ -298,7 +302,9 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -383,7 +389,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -406,7 +414,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: second_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -420,7 +430,9 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: standalone_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
@@ -489,7 +501,9 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -505,7 +519,9 @@ async fn reconstruct_history_rollback_counts_inter_agent_assistant_turns() {
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: assistant_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
@@ -569,7 +585,9 @@ async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: only_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
@@ -615,7 +633,9 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: user_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
// Standalone task turn (no UserMessage) should not consume rollback skips.
@@ -629,7 +649,9 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: standalone_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(
@@ -679,7 +701,9 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -831,7 +855,9 @@ async fn reconstruct_history_legacy_compaction_without_replacement_history_clear
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: current_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
];
@@ -897,7 +923,9 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
];
@@ -995,7 +1023,9 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -1096,7 +1126,9 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -1124,7 +1156,9 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: current_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
];
@@ -1203,7 +1237,9 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(
@@ -1348,7 +1384,9 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
RolloutItem::EventMsg(EventMsg::TurnComplete(
codex_protocol::protocol::TurnCompleteEvent {
turn_id: previous_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
},
)),
RolloutItem::EventMsg(EventMsg::TurnStarted(

View File

@@ -35,6 +35,7 @@ use tracing::Span;
use crate::protocol::CompactedItem;
use crate::protocol::CreditsSnapshot;
use crate::protocol::ErrorEvent;
use crate::protocol::InitialHistory;
use crate::protocol::NetworkApprovalProtocol;
use crate::protocol::RateLimitSnapshot;
@@ -45,6 +46,7 @@ use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnCompleteEvent;
use crate::protocol::TurnOutcome;
use crate::protocol::TurnStartedEvent;
use crate::protocol::UserMessageEvent;
use crate::rollout::policy::EventPersistenceMode;
@@ -1484,7 +1486,9 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
RolloutItem::ResponseItem(turn_one_assistant.clone()),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
@@ -1506,7 +1510,9 @@ async fn thread_rollback_recomputes_previous_turn_settings_and_reference_context
RolloutItem::ResponseItem(turn_two_assistant),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: rolled_back_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
])
.await;
@@ -1580,7 +1586,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
RolloutItem::ResponseItem(assistant_message("turn 1 assistant")),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
@@ -1595,7 +1603,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
}),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: compact_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
@@ -1619,7 +1629,9 @@ async fn thread_rollback_restores_cleared_reference_context_item_after_compactio
RolloutItem::ResponseItem(assistant_message("turn 2 assistant")),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: rolled_back_turn_id,
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
])
.await;
@@ -1662,7 +1674,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
RolloutItem::ResponseItem(assistant_message("turn 1 assistant")),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
@@ -1682,7 +1696,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
RolloutItem::ResponseItem(assistant_message("turn 2 assistant")),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-2".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
RolloutItem::EventMsg(EventMsg::TurnStarted(
codex_protocol::protocol::TurnStartedEvent {
@@ -1702,7 +1718,9 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
RolloutItem::ResponseItem(assistant_message("turn 3 assistant")),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-3".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
})),
])
.await;
@@ -3138,13 +3156,15 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
let mut trace = self
.captured_trace
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*trace = current_span_w3c_trace_context();
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}
@@ -4380,6 +4400,9 @@ struct NeverEndingTask {
listen_to_cancellation_token: bool,
}
#[derive(Clone, Copy)]
struct FailingTask;
#[async_trait::async_trait]
impl SessionTask for NeverEndingTask {
fn kind(&self) -> TaskKind {
@@ -4396,10 +4419,12 @@ impl SessionTask for NeverEndingTask {
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
if self.listen_to_cancellation_token {
cancellation_token.cancelled().await;
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
loop {
sleep(Duration::from_secs(60)).await;
@@ -4407,6 +4432,32 @@ impl SessionTask for NeverEndingTask {
}
}
#[async_trait::async_trait]
impl SessionTask for FailingTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
fn span_name(&self) -> &'static str {
"session_task.failing"
}
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> TurnOutcome {
TurnOutcome::Failed {
error: ErrorEvent {
message: "boom".to_string(),
codex_error_info: None,
},
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[test_log::test]
async fn abort_regular_task_emits_turn_aborted_only() {
@@ -4441,6 +4492,30 @@ async fn abort_regular_task_emits_turn_aborted_only() {
assert!(rx.try_recv().is_err());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[test_log::test]
async fn failing_regular_task_emits_error_without_turn_complete() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(Arc::clone(&tc), input, FailingTask).await;
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for event")
.expect("event");
match evt.msg {
EventMsg::Error(ErrorEvent {
message,
codex_error_info: None,
}) => assert_eq!(message, "boom"),
other => panic!("unexpected event: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn abort_gracefully_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
@@ -4502,8 +4577,13 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
.await
.expect("inject pending input into active turn");
sess.on_task_finished(Arc::clone(&tc), /*last_agent_message*/ None)
.await;
sess.on_task_finished(
Arc::clone(&tc),
TurnOutcome::Succeeded {
last_agent_message: None,
},
)
.await;
let history = sess.clone_history().await;
let expected = ResponseItem::Message {

View File

@@ -18,6 +18,7 @@ use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnOutcome;
use codex_protocol::user_input::UserInput;
use serde_json::Value;
use tokio::sync::Mutex;
@@ -578,8 +579,6 @@ async fn wait_for_guardian_review(
) -> (GuardianReviewSessionOutcome, bool) {
let timeout = tokio::time::sleep_until(deadline);
tokio::pin!(timeout);
let mut last_error_message: Option<String> = None;
loop {
tokio::select! {
_ = &mut timeout => {
@@ -600,21 +599,16 @@ async fn wait_for_guardian_review(
match event {
Ok(event) => match event.msg {
EventMsg::TurnComplete(turn_complete) => {
if turn_complete.last_agent_message.is_none()
&& let Some(error_message) = last_error_message
{
return (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error_message))),
return match turn_complete.outcome {
TurnOutcome::Succeeded { last_agent_message } => (
GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)),
true,
);
}
return (
GuardianReviewSessionOutcome::Completed(Ok(turn_complete.last_agent_message)),
true,
);
}
EventMsg::Error(error) => {
last_error_message = Some(error.message);
),
TurnOutcome::Failed { error } => (
GuardianReviewSessionOutcome::Completed(Err(anyhow!(error.message))),
true,
),
};
}
EventMsg::TurnAborted(_) => {
return (GuardianReviewSessionOutcome::Aborted, true);

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::protocol::TurnOutcome;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
@@ -27,7 +28,7 @@ impl SessionTask for CompactTask {
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
let session = session.clone_session();
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = session.services.session_telemetry.counter(
@@ -44,6 +45,8 @@ impl SessionTask for CompactTask {
);
crate::compact::run_compact_task(session.clone(), ctx, input).await
};
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}

View File

@@ -1,5 +1,6 @@
use crate::codex::TurnContext;
use crate::protocol::EventMsg;
use crate::protocol::TurnOutcome;
use crate::protocol::WarningEvent;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
@@ -42,7 +43,7 @@ impl SessionTask for GhostSnapshotTask {
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
tokio::task::spawn(async move {
let token = self.token;
let warnings_enabled = !ctx.ghost_snapshot.disable_warnings;
@@ -156,7 +157,9 @@ impl SessionTask for GhostSnapshotTask {
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),
}
});
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}

View File

@@ -33,6 +33,7 @@ use crate::protocol::EventMsg;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
use crate::protocol::TurnOutcome;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
@@ -140,15 +141,15 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// `ctx`, returning an optional final agent message when finished. The
/// provided `cancellation_token` is cancelled when the session requests an
/// abort; implementers should watch for it and terminate quickly once it
/// fires. Returning [`Some`] yields a final message that
/// [`Session::on_task_finished`] will emit to the client.
/// fires. The returned [`TurnOutcome`] determines the single terminal
/// event emitted by [`Session::on_task_finished`].
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String>;
) -> TurnOutcome;
/// Gives the task a chance to perform cleanup after an abort.
///
@@ -209,7 +210,7 @@ impl Session {
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
let completion = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
@@ -221,7 +222,7 @@ impl Session {
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
sess.on_task_finished(Arc::clone(&ctx_for_finish), completion)
.await;
}
done_clone.notify_waiters();
@@ -314,7 +315,7 @@ impl Session {
pub async fn on_task_finished(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
outcome: TurnOutcome,
) {
turn_context
.turn_metadata_state
@@ -433,7 +434,7 @@ impl Session {
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,
outcome,
});
self.send_event(turn_context.as_ref(), event).await;
}

View File

@@ -6,6 +6,7 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::run_turn;
use crate::protocol::EventMsg;
use crate::protocol::TurnOutcome;
use crate::protocol::TurnStartedEvent;
use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
use crate::state::TaskKind;
@@ -41,7 +42,7 @@ impl SessionTask for RegularTask {
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
let sess = session.clone_session();
let run_turn_span = trace_span!("run_turn");
// Regular turns emit `TurnStarted` inline so first-turn lifecycle does
@@ -57,7 +58,11 @@ impl SessionTask for RegularTask {
.consume_startup_prewarm_for_regular_turn(&cancellation_token)
.await
{
SessionStartupPrewarmResolution::Cancelled => return None,
SessionStartupPrewarmResolution::Cancelled => {
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
SessionStartupPrewarmResolution::Unavailable { .. } => None,
SessionStartupPrewarmResolution::Ready(prewarmed_client_session) => {
Some(*prewarmed_client_session)

View File

@@ -15,6 +15,7 @@ use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnOutcome;
use codex_utils_template::Template;
use tokio_util::sync::CancellationToken;
@@ -64,7 +65,7 @@ impl SessionTask for ReviewTask {
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
let _ = session.session.services.session_telemetry.counter(
"codex.task.review",
/*inc*/ 1,
@@ -86,7 +87,9 @@ impl SessionTask for ReviewTask {
if !cancellation_token.is_cancelled() {
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
}
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
@@ -166,12 +169,18 @@ async fn process_review_events(
| EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { .. })
| EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent { .. }) => {}
EventMsg::TurnComplete(task_complete) => {
// Parse review output from the last agent message (if present).
let out = task_complete
.last_agent_message
.as_deref()
.map(parse_review_output_event);
return out;
return match task_complete.outcome {
TurnOutcome::Succeeded { last_agent_message } => {
last_agent_message.as_deref().map(parse_review_output_event)
}
TurnOutcome::Failed { error } => {
session
.clone_session()
.send_event(ctx.as_ref(), EventMsg::Error(error))
.await;
None
}
};
}
EventMsg::TurnAborted(_) => {
// Cancellation or abort: consumer will finalize with None.

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use crate::codex::TurnContext;
use crate::protocol::EventMsg;
use crate::protocol::TurnOutcome;
use crate::protocol::UndoCompletedEvent;
use crate::protocol::UndoStartedEvent;
use crate::state::TaskKind;
@@ -41,7 +42,7 @@ impl SessionTask for UndoTask {
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
let _ = session.session.services.session_telemetry.counter(
"codex.task.undo",
/*inc*/ 1,
@@ -65,7 +66,9 @@ impl SessionTask for UndoTask {
}),
)
.await;
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
}
let history = sess.clone_history().await;
@@ -90,7 +93,9 @@ impl SessionTask for UndoTask {
completed.message = Some("No ghost snapshot available to undo.".to_string());
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
return None;
return TurnOutcome::Succeeded {
last_agent_message: None,
};
};
let commit_id = ghost_commit.id().to_string();
@@ -126,6 +131,8 @@ impl SessionTask for UndoTask {
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}

View File

@@ -23,6 +23,7 @@ use crate::protocol::ExecCommandEndEvent;
use crate::protocol::ExecCommandSource;
use crate::protocol::ExecCommandStatus;
use crate::protocol::SandboxPolicy;
use crate::protocol::TurnOutcome;
use crate::protocol::TurnStartedEvent;
use crate::sandboxing::ExecRequest;
use crate::state::TaskKind;
@@ -78,7 +79,7 @@ impl SessionTask for UserShellCommandTask {
turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
execute_user_shell_command(
session.clone_session(),
turn_context,
@@ -87,7 +88,9 @@ impl SessionTask for UserShellCommandTask {
UserShellCommandMode::StandaloneTurn,
)
.await;
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}

View File

@@ -9,6 +9,7 @@ use crate::config::types::ShellEnvironmentPolicy;
use crate::function_tool::FunctionCallError;
use crate::protocol::AgentStatus;
use crate::protocol::AskForApproval;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::FileSystemSandboxPolicy;
use crate::protocol::NetworkSandboxPolicy;
@@ -17,6 +18,7 @@ use crate::protocol::SandboxPolicy;
use crate::protocol::SessionSource;
use crate::protocol::SubAgentSource;
use crate::protocol::TurnCompleteEvent;
use crate::protocol::TurnOutcome;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -112,6 +114,9 @@ fn history_contains_inter_agent_communication(
#[derive(Clone, Copy)]
struct NeverEndingTask;
#[derive(Clone, Copy)]
struct FailingTask;
#[async_trait::async_trait]
impl SessionTask for NeverEndingTask {
fn kind(&self) -> TaskKind {
@@ -128,9 +133,37 @@ impl SessionTask for NeverEndingTask {
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
) -> TurnOutcome {
cancellation_token.cancelled().await;
None
TurnOutcome::Succeeded {
last_agent_message: None,
}
}
}
#[async_trait::async_trait]
impl SessionTask for FailingTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
fn span_name(&self) -> &'static str {
"session_task.multi_agent_failing"
}
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> TurnOutcome {
TurnOutcome::Failed {
error: ErrorEvent {
message: "boom".to_string(),
codex_error_info: None,
},
}
}
}
@@ -601,7 +634,9 @@ async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_messa
child_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: child_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done".to_string()),
},
}),
)
.await;
@@ -1119,6 +1154,277 @@ async fn multi_agent_v2_assign_task_interrupts_busy_child_without_losing_message
.expect("shutdown should submit");
}
#[tokio::test]
async fn multi_agent_v2_assign_task_completion_notifies_parent_after_reuse() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = turn.config.as_ref().clone();
let _ = config.features.enable(Feature::MultiAgentV2);
turn.config = Arc::new(config);
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"task_name": "worker"
})),
))
.await
.expect("spawn worker");
let agent_id = session
.services
.agent_control
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");
let worker_path = AgentPath::try_from("/root/worker").expect("agent path");
let first_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
first_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn.sub_id.clone(),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done once".to_string()),
},
}),
)
.await;
AssignTaskHandlerV2
.handle(invocation(
session,
turn,
"assign_task",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}]
})),
))
.await
.expect("assign_task should succeed");
let second_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
second_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: second_turn.sub_id.clone(),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done twice".to_string()),
},
}),
)
.await;
let expected_first = (
root.thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
worker_path.clone(),
AgentPath::root(),
Vec::new(),
crate::session_prefix::format_subagent_notification_message(
worker_path.as_str(),
&AgentStatus::Completed(Some("done once".to_string())),
),
/*trigger_turn*/ false,
),
},
);
let expected_second = (
root.thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
worker_path,
AgentPath::root(),
Vec::new(),
crate::session_prefix::format_subagent_notification_message(
"/root/worker",
&AgentStatus::Completed(Some("done twice".to_string())),
),
/*trigger_turn*/ false,
),
},
);
timeout(Duration::from_secs(5), async {
loop {
let captured = manager.captured_ops();
if captured.contains(&expected_first) && captured.contains(&expected_second) {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("reused worker should notify parent on both completed turns");
}
#[tokio::test]
async fn multi_agent_v2_failed_turn_forwards_only_error_status_after_reuse() {
let (mut session, mut turn) = make_session_and_context().await;
let manager = thread_manager();
let root = manager
.start_thread((*turn.config).clone())
.await
.expect("root thread should start");
session.services.agent_control = manager.agent_control();
session.conversation_id = root.thread_id;
let mut config = turn.config.as_ref().clone();
let _ = config.features.enable(Feature::MultiAgentV2);
turn.config = Arc::new(config);
let session = Arc::new(session);
let turn = Arc::new(turn);
SpawnAgentHandlerV2
.handle(invocation(
session.clone(),
turn.clone(),
"spawn_agent",
function_payload(json!({
"message": "boot worker",
"task_name": "worker"
})),
))
.await
.expect("spawn worker");
let agent_id = session
.services
.agent_control
.resolve_agent_reference(session.conversation_id, &turn.session_source, "worker")
.await
.expect("worker should resolve");
let thread = manager
.get_thread(agent_id)
.await
.expect("worker thread should exist");
let worker_path = AgentPath::try_from("/root/worker").expect("agent path");
let first_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.send_event(
first_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: first_turn.sub_id.clone(),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("done once".to_string()),
},
}),
)
.await;
AssignTaskHandlerV2
.handle(invocation(
session,
turn,
"assign_task",
function_payload(json!({
"target": agent_id.to_string(),
"items": [{"type": "text", "text": "continue"}]
})),
))
.await
.expect("assign_task should succeed");
let second_turn = thread.codex.session.new_default_turn().await;
thread
.codex
.session
.spawn_task(
Arc::clone(&second_turn),
vec![UserInput::Text {
text: "continue".to_string(),
text_elements: Vec::new(),
}],
FailingTask,
)
.await;
let expected_first = (
root.thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
worker_path.clone(),
AgentPath::root(),
Vec::new(),
crate::session_prefix::format_subagent_notification_message(
worker_path.as_str(),
&AgentStatus::Completed(Some("done once".to_string())),
),
/*trigger_turn*/ false,
),
},
);
let expected_error = (
root.thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
worker_path.clone(),
AgentPath::root(),
Vec::new(),
crate::session_prefix::format_subagent_notification_message(
worker_path.as_str(),
&AgentStatus::Errored("boom".to_string()),
),
/*trigger_turn*/ false,
),
},
);
let unexpected_completed = (
root.thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
worker_path,
AgentPath::root(),
Vec::new(),
crate::session_prefix::format_subagent_notification_message(
"/root/worker",
&AgentStatus::Completed(None),
),
/*trigger_turn*/ false,
),
},
);
timeout(Duration::from_secs(5), async {
loop {
let captured = manager.captured_ops();
if captured.contains(&expected_first)
&& captured.contains(&expected_error)
&& !captured.contains(&unexpected_completed)
{
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("failed reused worker turn should only notify parent with the errored status");
}
#[tokio::test]
async fn multi_agent_v2_spawn_includes_agent_id_key_when_named() {
let (mut session, mut turn) = make_session_and_context().await;

View File

@@ -134,7 +134,7 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
assert_eq!(assistant_message.message, "Completed first turn");
assert_eq!(completed.turn_id, started.turn_id);
assert_eq!(
completed.last_agent_message.as_deref(),
completed.last_agent_message().as_deref(),
Some("Completed first turn")
);
}
@@ -224,7 +224,7 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()>
assert_eq!(assistant_message.message, "Completed reasoning turn");
assert_eq!(completed.turn_id, started.turn_id);
assert_eq!(
completed.last_agent_message.as_deref(),
completed.last_agent_message().as_deref(),
Some("Completed reasoning turn")
);
}

View File

@@ -22,6 +22,7 @@ use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnOutcome;
use codex_protocol::user_input::UserInput;
use rmcp::model::CallToolResult;
use rmcp::model::Content;
@@ -291,16 +292,15 @@ async fn run_codex_tool_session_inner(
.await;
continue;
}
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message, ..
}) => {
let text = match last_agent_message {
Some(msg) => msg,
None => "".to_string(),
EventMsg::TurnComplete(TurnCompleteEvent { outcome, .. }) => {
let (text, is_error) = match outcome {
TurnOutcome::Succeeded { last_agent_message } => {
(last_agent_message.unwrap_or_default(), None)
}
TurnOutcome::Failed { error } => (error.message, Some(true)),
};
let result = create_call_tool_result_with_thread_id(
thread_id, text, /*is_error*/ None,
);
let result =
create_call_tool_result_with_thread_id(thread_id, text, is_error);
outgoing.send_response(request_id.clone(), result).await;
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid

View File

@@ -1858,10 +1858,18 @@ pub struct ModelRerouteEvent {
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ContextCompactedEvent;
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
#[serde(untagged)]
pub enum TurnOutcome {
Succeeded { last_agent_message: Option<String> },
Failed { error: ErrorEvent },
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct TurnCompleteEvent {
pub turn_id: String,
pub last_agent_message: Option<String>,
#[serde(flatten)]
pub outcome: TurnOutcome,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]

View File

@@ -103,6 +103,8 @@ use codex_protocol::protocol::TurnAbortedEvent;
#[cfg(test)]
use codex_protocol::protocol::TurnCompleteEvent;
#[cfg(test)]
use codex_protocol::protocol::TurnOutcome;
#[cfg(test)]
use codex_protocol::protocol::TurnStartedEvent;
#[cfg(test)]
use std::time::Duration;
@@ -731,7 +733,7 @@ fn turn_snapshot_events(
///
/// - `Completed` → `TurnComplete`
/// - `Interrupted` → `TurnAborted { reason: Interrupted }`
/// - `Failed` → `Error` (if present) then `TurnComplete`
/// - `Failed` → `TurnComplete` with failed outcome
/// - `InProgress` → no events (the turn is still running)
#[cfg(test)]
fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_failed_error: bool) {
@@ -740,7 +742,9 @@ fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_fai
id: String::new(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn.id.clone(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
}),
TurnStatus::Interrupted => events.push(Event {
@@ -751,23 +755,26 @@ fn append_terminal_turn_events(events: &mut Vec<Event>, turn: &Turn, include_fai
}),
}),
TurnStatus::Failed => {
if include_failed_error && let Some(error) = &turn.error {
events.push(Event {
id: String::new(),
msg: EventMsg::Error(ErrorEvent {
message: error.message.clone(),
codex_error_info: error
.codex_error_info
.clone()
.and_then(app_server_codex_error_info_to_core),
}),
});
}
let _ = include_failed_error;
events.push(Event {
id: String::new(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn.id.clone(),
last_agent_message: None,
outcome: TurnOutcome::Failed {
error: ErrorEvent {
message: turn
.error
.as_ref()
.map(|error| error.message.clone())
.unwrap_or_default(),
codex_error_info: turn.error.as_ref().and_then(|error| {
error
.codex_error_info
.clone()
.and_then(app_server_codex_error_info_to_core)
}),
},
},
}),
});
}
@@ -1120,7 +1127,7 @@ mod tests {
panic!("expected turn complete event");
};
assert_eq!(completed.turn_id, turn_id);
assert_eq!(completed.last_agent_message, None);
assert_eq!(completed.last_agent_message(), None);
}
#[test]
@@ -1366,7 +1373,7 @@ mod tests {
panic!("expected turn complete event");
};
assert_eq!(completed.turn_id, turn_id);
assert_eq!(completed.last_agent_message, None);
assert_eq!(completed.last_agent_message(), None);
}
#[test]

View File

@@ -194,8 +194,6 @@ use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnAbortReason;
#[cfg(test)]
use codex_protocol::protocol::TurnCompleteEvent;
#[cfg(test)]
use codex_protocol::protocol::TurnDiffEvent;
#[cfg(test)]
use codex_protocol::protocol::UndoCompletedEvent;
@@ -6927,11 +6925,16 @@ impl ChatWidget {
self.on_task_started();
}
}
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message, ..
}) => {
self.on_task_complete(last_agent_message, from_replay);
}
EventMsg::TurnComplete(codex_protocol::protocol::TurnCompleteEvent {
outcome, ..
}) => match outcome {
codex_protocol::protocol::TurnOutcome::Succeeded { last_agent_message } => {
self.on_task_complete(last_agent_message, from_replay);
}
codex_protocol::protocol::TurnOutcome::Failed { error } => {
self.on_error(error.message);
}
},
EventMsg::TokenCount(ev) => {
self.set_token_info(ev.info);
self.on_rate_limit_snapshot(ev.rate_limits);

View File

@@ -1741,7 +1741,9 @@ async fn steer_rejection_queues_review_follow_up_before_existing_queued_messages
id: "turn-complete".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -1760,7 +1762,9 @@ async fn steer_rejection_queues_review_follow_up_before_existing_queued_messages
id: "turn-complete-2".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-2".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -3275,7 +3279,9 @@ async fn plan_implementation_popup_skips_replayed_turn_complete() {
chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Plan details".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Plan details".to_string()),
},
})]);
let popup = render_bottom_popup(&chat, /*width*/ 80);
@@ -3299,7 +3305,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Plan details".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Plan details".to_string()),
},
})]);
let replay_popup = render_bottom_popup(&chat, /*width*/ 80);
assert!(
@@ -3311,7 +3319,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
id: "live-turn-complete-1".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Plan details".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Plan details".to_string()),
},
}),
});
@@ -3332,7 +3342,9 @@ async fn plan_implementation_popup_shows_once_when_replay_precedes_live_turn_com
id: "live-turn-complete-2".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Plan details".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Plan details".to_string()),
},
}),
});
let duplicate_popup = render_bottom_popup(&chat, /*width*/ 80);
@@ -6463,7 +6475,9 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Final response.".into()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Final response.".into()),
},
}),
});
@@ -6505,7 +6519,9 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -6570,7 +6586,9 @@ async fn unified_exec_waiting_multiple_empty_snapshots() {
id: "turn-wait-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -6648,7 +6666,9 @@ async fn unified_exec_non_empty_then_empty_snapshots() {
id: "turn-wait-3".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -7150,7 +7170,9 @@ async fn slash_copy_state_tracks_turn_complete_final_reply() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Final reply **markdown**".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Final reply **markdown**".to_string()),
},
}),
});
@@ -7180,7 +7202,9 @@ async fn slash_copy_state_tracks_plan_item_completion() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -7213,7 +7237,9 @@ async fn slash_copy_state_is_preserved_during_running_task() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Previous completed reply".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Previous completed reply".to_string()),
},
}),
});
chat.on_task_started();
@@ -7232,7 +7258,9 @@ async fn slash_copy_state_clears_on_thread_rollback() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("Reply that will be rolled back".to_string()),
outcome: TurnOutcome::Succeeded {
last_agent_message: Some("Reply that will be rolled back".to_string()),
},
}),
});
chat.handle_codex_event(Event {
@@ -7260,7 +7288,9 @@ async fn slash_copy_is_unavailable_when_legacy_agent_message_is_not_repeated_on_
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
let _ = drain_insert_history(&mut rx);
@@ -7301,7 +7331,9 @@ async fn slash_copy_uses_agent_message_item_when_turn_complete_omits_final_text(
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
let _ = drain_insert_history(&mut rx);
@@ -7346,7 +7378,9 @@ async fn slash_copy_does_not_return_stale_output_after_thread_rollback() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
let _ = drain_insert_history(&mut rx);
@@ -11230,7 +11264,9 @@ async fn turn_complete_keeps_unified_exec_processes() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -13172,7 +13208,9 @@ async fn status_line_branch_refreshes_after_turn_complete() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -13474,7 +13512,9 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
id: "s1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
@@ -13945,7 +13985,9 @@ printf 'fenced within fenced\n'
id: "t1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
outcome: TurnOutcome::Succeeded {
last_agent_message: None,
},
}),
});
for lines in drain_insert_history(&mut rx) {