Compare commits

...

3 Commits

Author SHA1 Message Date
Tom Wiltzius
0c1da3dc3c Refine turn timing with stream stats 2026-01-29 19:35:27 -08:00
Tom Wiltzius
93d7927cdf Include overall time in timing separator 2026-01-29 17:01:23 -08:00
Tom Wiltzius
d965615c56 Add turn timing stats to CLI output 2026-01-29 15:55:54 -08:00
14 changed files with 454 additions and 27 deletions

View File

@@ -239,6 +239,7 @@ mod tests {
async fn on_event_updates_status_from_task_complete() {
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: Some("done".to_string()),
timing: None,
}));
let expected = AgentStatus::Completed(Some("done".to_string()));
assert_eq!(status, Some(expected));

View File

@@ -59,6 +59,8 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::TurnTimingStats;
use codex_protocol::protocol::TurnTimingUpdateEvent;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationResponse;
@@ -201,6 +203,8 @@ use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::ContentItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::render_command_prefix_list;
@@ -461,6 +465,54 @@ pub(crate) struct Session {
next_internal_sub_id: AtomicU64,
}
/// Thread-safe accumulator for a single turn's timing statistics.
///
/// Counters are atomics so tool and inference code paths can record
/// durations concurrently without locking; all durations are stored as
/// nanosecond counts.
#[derive(Debug, Default)]
pub(crate) struct TurnTimingState {
    // Number of local tool invocations recorded this turn.
    tool_calls: AtomicU64,
    // Total wall-clock time spent in local tool calls, in nanoseconds.
    tool_duration_nanos: AtomicU64,
    // Number of inference (model) requests recorded this turn.
    inference_calls: AtomicU64,
    // Time spent waiting for responses to start producing content, in nanoseconds.
    response_wait_nanos: AtomicU64,
    // Time spent consuming response streams after first content, in nanoseconds.
    inference_stream_nanos: AtomicU64,
}
impl TurnTimingState {
pub(crate) fn record_tool_call(&self, duration: std::time::Duration) {
self.tool_calls.fetch_add(1, Ordering::Relaxed);
self.tool_duration_nanos
.fetch_add(duration_nanos(duration), Ordering::Relaxed);
}
pub(crate) fn record_inference_wait(&self, duration: std::time::Duration) {
self.inference_calls.fetch_add(1, Ordering::Relaxed);
self.response_wait_nanos
.fetch_add(duration_nanos(duration), Ordering::Relaxed);
}
pub(crate) fn record_inference_stream(&self, duration: std::time::Duration) {
self.inference_stream_nanos
.fetch_add(duration_nanos(duration), Ordering::Relaxed);
}
pub(crate) fn snapshot(&self) -> TurnTimingStats {
TurnTimingStats {
tool_calls: self.tool_calls.load(Ordering::Relaxed),
inference_calls: self.inference_calls.load(Ordering::Relaxed),
local_tool_duration: std::time::Duration::from_nanos(
self.tool_duration_nanos.load(Ordering::Relaxed),
),
response_wait_duration: std::time::Duration::from_nanos(
self.response_wait_nanos.load(Ordering::Relaxed),
),
inference_stream_duration: std::time::Duration::from_nanos(
self.inference_stream_nanos.load(Ordering::Relaxed),
),
}
}
}
/// Converts a `Duration` to whole nanoseconds, saturating at `u64::MAX` when
/// the 128-bit nanosecond count does not fit in 64 bits.
fn duration_nanos(duration: std::time::Duration) -> u64 {
    duration.as_nanos().try_into().unwrap_or(u64::MAX)
}
/// The context needed for a single turn of the thread.
#[derive(Debug)]
pub(crate) struct TurnContext {
@@ -485,6 +537,7 @@ pub(crate) struct TurnContext {
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
pub(crate) timing: TurnTimingState,
}
impl TurnContext {
@@ -499,6 +552,22 @@ impl TurnContext {
.as_deref()
.unwrap_or(compact::SUMMARIZATION_PROMPT)
}
/// Adds one local tool call with its duration to this turn's timing stats.
pub(crate) fn record_local_tool_call(&self, duration: std::time::Duration) {
    self.timing.record_tool_call(duration);
}
/// Adds one inference request and its response-wait duration to this turn's
/// timing stats.
pub(crate) fn record_inference_wait(&self, duration: std::time::Duration) {
    self.timing.record_inference_wait(duration);
}
/// Adds stream-consumption time to this turn's timing stats.
pub(crate) fn record_inference_stream(&self, duration: std::time::Duration) {
    self.timing.record_inference_stream(duration);
}
/// Returns a point-in-time copy of this turn's timing statistics.
pub(crate) fn timing_snapshot(&self) -> TurnTimingStats {
    self.timing.snapshot()
}
}
#[derive(Clone)]
@@ -672,6 +741,7 @@ impl Session {
tool_call_gate: Arc::new(ReadinessFlag::new()),
truncation_policy: model_info.truncation_policy.into(),
dynamic_tools: session_configuration.dynamic_tools.clone(),
timing: TurnTimingState::default(),
}
}
@@ -1369,6 +1439,14 @@ impl Session {
}
}
/// Emits a `TurnTimingUpdate` event carrying the current timing snapshot for
/// `turn_context`'s turn.
pub(crate) async fn emit_turn_timing_update(&self, turn_context: &TurnContext) {
    let stats = turn_context.timing_snapshot();
    let turn_id = turn_context.sub_id.clone();
    self.send_event(
        turn_context,
        EventMsg::TurnTimingUpdate(TurnTimingUpdateEvent { turn_id, stats }),
    )
    .await;
}
pub(crate) async fn send_event_raw(&self, event: Event) {
// Record the last known agent status.
if let Some(status) = agent_status_from_event(&event.msg) {
@@ -3081,6 +3159,7 @@ async fn spawn_review_thread(
tool_call_gate: Arc::new(ReadinessFlag::new()),
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
truncation_policy: model_info.truncation_policy.into(),
timing: TurnTimingState::default(),
};
// Seed the child task with the review prompt as the initial user message.
@@ -3633,6 +3712,71 @@ struct SamplingRequestResult {
last_agent_message: Option<String>,
}
/// Returns `true` once the stream has produced its first piece of real model
/// output; lifecycle/metadata events and empty deltas do not count.
fn response_stream_started(event: &ResponseEvent) -> bool {
    match event {
        // Content-bearing events: only count when the payload is non-empty.
        ResponseEvent::OutputTextDelta(text) => !text.is_empty(),
        ResponseEvent::ReasoningSummaryDelta { delta: text, .. } => !text.is_empty(),
        ResponseEvent::ReasoningContentDelta { delta: text, .. } => !text.is_empty(),
        ResponseEvent::OutputItemAdded(item) => response_item_has_content(item),
        ResponseEvent::OutputItemDone(item) => response_item_has_content(item),
        // Bookkeeping events never carry model output.
        ResponseEvent::Created
        | ResponseEvent::RateLimits(_)
        | ResponseEvent::ModelsEtag(_)
        | ResponseEvent::ServerReasoningIncluded(_)
        | ResponseEvent::ReasoningSummaryPartAdded { .. }
        | ResponseEvent::Completed { .. } => false,
    }
}
/// Reports whether a response item carries any non-empty payload.
fn response_item_has_content(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { content, .. } => content.iter().any(|part| {
            matches!(part, ContentItem::OutputText { text } if !text.is_empty())
        }),
        ResponseItem::Reasoning {
            summary,
            content,
            encrypted_content,
            ..
        } => {
            // Any non-empty summary text is enough on its own.
            if summary.iter().any(|part| {
                matches!(part, ReasoningItemReasoningSummary::SummaryText { text } if !text.is_empty())
            }) {
                return true;
            }
            let body_has_text = content.as_ref().is_some_and(|parts| {
                parts.iter().any(|part| {
                    matches!(
                        part,
                        ReasoningItemContent::ReasoningText { text }
                        | ReasoningItemContent::Text { text }
                        if !text.is_empty()
                    )
                })
            });
            body_has_text
                || encrypted_content
                    .as_ref()
                    .is_some_and(|text| !text.is_empty())
        }
        ResponseItem::FunctionCall { arguments, .. } => !arguments.is_empty(),
        ResponseItem::FunctionCallOutput { output, .. } => {
            let has_items = output
                .content_items
                .as_ref()
                .is_some_and(|items| !items.is_empty());
            !output.content.is_empty() || has_items
        }
        ResponseItem::CustomToolCall { input, .. } => !input.is_empty(),
        ResponseItem::CustomToolCallOutput { output, .. } => !output.is_empty(),
        // A web-search call only counts once it has an action attached.
        ResponseItem::WebSearchCall { action, .. } => action.is_some(),
        ResponseItem::Compaction { encrypted_content } => !encrypted_content.is_empty(),
        // These variants always represent real activity.
        ResponseItem::LocalShellCall { .. }
        | ResponseItem::GhostSnapshot { .. }
        | ResponseItem::Other => true,
    }
}
async fn drain_in_flight(
in_flight: &mut FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>>,
sess: Arc<Session>,
@@ -3699,11 +3843,27 @@ async fn try_run_sampling_request(
);
sess.persist_rollout_items(&[rollout_item]).await;
let mut stream = client_session
let response_wait_start = std::time::Instant::now();
let mut stream = match client_session
.stream(prompt)
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)
.await??;
.await
{
Ok(result) => match result {
Ok(stream) => stream,
Err(err) => {
turn_context.record_inference_wait(response_wait_start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
return Err(err);
}
},
Err(codex_async_utils::CancelErr::Cancelled) => {
turn_context.record_inference_wait(response_wait_start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
return Err(CodexErr::TurnAborted);
}
};
let tool_runtime = ToolCallRuntime::new(
Arc::clone(&router),
@@ -3718,6 +3878,9 @@ async fn try_run_sampling_request(
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
let receiving_span = trace_span!("receiving_stream");
let mut inference_wait_recorded = false;
let mut inference_stream_started_at: Option<std::time::Instant> = None;
let mut inference_stream_recorded = false;
let outcome: CodexResult<SamplingRequestResult> = loop {
let handle_responses = trace_span!(
parent: &receiving_span,
@@ -3747,6 +3910,15 @@ async fn try_run_sampling_request(
}
};
if !inference_wait_recorded && response_stream_started(&event) {
turn_context.record_inference_wait(response_wait_start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
inference_wait_recorded = true;
if inference_stream_started_at.is_none() {
inference_stream_started_at = Some(std::time::Instant::now());
}
}
sess.services
.otel_manager
.record_responses(&handle_responses, &event);
@@ -3804,6 +3976,13 @@ async fn try_run_sampling_request(
should_emit_turn_diff = true;
needs_follow_up |= sess.has_pending_input().await;
if !inference_stream_recorded {
if let Some(start) = inference_stream_started_at {
turn_context.record_inference_stream(start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
inference_stream_recorded = true;
}
}
break Ok(SamplingRequestResult {
needs_follow_up,
@@ -3877,6 +4056,17 @@ async fn try_run_sampling_request(
}
};
if !inference_wait_recorded {
turn_context.record_inference_wait(response_wait_start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
}
if !inference_stream_recorded {
if let Some(start) = inference_stream_started_at {
turn_context.record_inference_stream(start.elapsed());
sess.emit_turn_timing_update(&turn_context).await;
}
}
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if should_emit_turn_diff {

View File

@@ -63,14 +63,17 @@ pub(crate) async fn handle_mcp_tool_call(
if let Err(e) = &result {
tracing::warn!("MCP tool call error: {e:?}");
}
let duration = start.elapsed();
let tool_call_end_event = EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id: call_id.clone(),
invocation,
duration: start.elapsed(),
duration,
result: result.clone(),
});
notify_mcp_tool_call_event(sess, turn_context, tool_call_end_event.clone()).await;
turn_context.record_local_tool_call(duration);
sess.emit_turn_timing_update(turn_context).await;
let status = if result.is_ok() { "ok" } else { "error" };
turn_context

View File

@@ -52,6 +52,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::Warning(_)
| EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::TurnTimingUpdate(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)

View File

@@ -200,7 +200,10 @@ impl Session {
if should_close_processes {
self.close_unified_exec_processes().await;
}
let event = EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message });
let event = EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message,
timing: Some(turn_context.timing_snapshot()),
});
self.send_event(turn_context.as_ref(), event).await;
}

View File

@@ -273,8 +273,9 @@ impl ToolEmitter {
ctx: ToolEventCtx<'_>,
out: Result<ExecToolCallOutput, ToolError>,
) -> Result<String, FunctionCallError> {
let (event, result) = match out {
let (event, result, tool_duration) = match out {
Ok(output) => {
let duration = output.duration;
let content = self.format_exec_output_for_model(&output, ctx);
let exit_code = output.exit_code;
let event = ToolEventStage::Success(output);
@@ -283,20 +284,21 @@ impl ToolEmitter {
} else {
Err(FunctionCallError::RespondToModel(content))
};
(event, result)
(event, result, duration)
}
Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { output })))
| Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied { output }))) => {
let duration = output.duration;
let response = self.format_exec_output_for_model(&output, ctx);
let event = ToolEventStage::Failure(ToolEventFailure::Output(*output));
let result = Err(FunctionCallError::RespondToModel(response));
(event, result)
(event, result, duration)
}
Err(ToolError::Codex(err)) => {
let message = format!("execution error: {err:?}");
let event = ToolEventStage::Failure(ToolEventFailure::Message(message.clone()));
let result = Err(FunctionCallError::RespondToModel(message));
(event, result)
(event, result, Duration::ZERO)
}
Err(ToolError::Rejected(msg)) => {
// Normalize common rejection messages for exec tools so tests and
@@ -313,10 +315,12 @@ impl ToolEmitter {
};
let event = ToolEventStage::Failure(ToolEventFailure::Message(normalized.clone()));
let result = Err(FunctionCallError::RespondToModel(normalized));
(event, result)
(event, result, Duration::ZERO)
}
};
self.emit(ctx, event).await;
ctx.turn.record_local_tool_call(tool_duration);
ctx.session.emit_turn_timing_update(ctx.turn).await;
result
}
}

View File

@@ -30,6 +30,7 @@ use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnCompleteEvent;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::TurnTimingStats;
use codex_core::protocol::WarningEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_core::web_search::web_search_detail;
@@ -73,6 +74,11 @@ pub(crate) struct EventProcessorWithHumanOutput {
last_message_path: Option<PathBuf>,
last_total_token_usage: Option<codex_core::protocol::TokenUsageInfo>,
final_message: Option<String>,
last_turn_started_at: Option<Instant>,
last_turn_elapsed: Option<std::time::Duration>,
last_turn_timing: Option<TurnTimingStats>,
turn_start_output_tokens: Option<i64>,
last_turn_output_tokens: Option<i64>,
}
impl EventProcessorWithHumanOutput {
@@ -99,6 +105,11 @@ impl EventProcessorWithHumanOutput {
last_message_path,
last_total_token_usage: None,
final_message: None,
last_turn_started_at: None,
last_turn_elapsed: None,
last_turn_timing: None,
turn_start_output_tokens: None,
last_turn_output_tokens: None,
}
} else {
Self {
@@ -116,6 +127,11 @@ impl EventProcessorWithHumanOutput {
last_message_path,
last_total_token_usage: None,
final_message: None,
last_turn_started_at: None,
last_turn_elapsed: None,
last_turn_timing: None,
turn_start_output_tokens: None,
last_turn_output_tokens: None,
}
}
}
@@ -244,7 +260,12 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_msg!(self, "{}", message.style(self.dimmed));
}
EventMsg::TurnStarted(_) => {
// Ignore.
self.last_turn_started_at = Some(Instant::now());
self.turn_start_output_tokens = self
.last_total_token_usage
.as_ref()
.map(|info| info.total_token_usage.output_tokens);
self.last_turn_output_tokens = None;
}
EventMsg::ElicitationRequest(ev) => {
ts_msg!(
@@ -259,13 +280,28 @@ impl EventProcessor for EventProcessorWithHumanOutput {
"auto-cancelling (not supported in exec mode)".style(self.dimmed)
);
}
EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => {
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message,
timing,
}) => {
let last_message = last_agent_message.as_deref();
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_message, output_file);
}
self.final_message = last_agent_message;
self.last_turn_timing = timing;
self.last_turn_elapsed = self.last_turn_started_at.map(|start| start.elapsed());
self.last_turn_output_tokens = self
.last_total_token_usage
.as_ref()
.map(|info| {
let total = info.total_token_usage.output_tokens;
match self.turn_start_output_tokens {
Some(start) => total.saturating_sub(start),
None => total,
}
});
return CodexStatus::InitiateShutdown;
}
@@ -771,6 +807,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::TurnTimingUpdate(_)
| EventMsg::UndoCompleted(_)
| EventMsg::UndoStarted(_)
| EventMsg::ThreadRolledBack(_)
@@ -789,6 +826,58 @@ impl EventProcessor for EventProcessorWithHumanOutput {
);
}
// Print a one-line timing summary for the completed turn, e.g.
// "Worked for 12s • Local tools: 3 calls, 2.1s • Inference: 2 calls, 1.2s wait, 4.0s stream".
if let Some(timing) = &self.last_turn_timing {
    let tool_calls = timing.tool_calls;
    let tool_label = if tool_calls == 1 { "call" } else { "calls" };
    let tool_duration = format_duration(timing.local_tool_duration);
    let inference_calls = timing.inference_calls;
    let inference_label = if inference_calls == 1 {
        "call"
    } else {
        "calls"
    };
    let response_wait = format_duration(timing.response_wait_duration);
    let inference_stream = format_duration(timing.inference_stream_duration);
    let mut parts = Vec::new();
    if let Some(elapsed) = self.last_turn_elapsed {
        parts.push(format!("Worked for {}", format_duration(elapsed)));
    }
    parts.push(format!(
        "Local tools: {tool_calls} {tool_label}, {tool_duration}"
    ));
    parts.push(format!(
        "Inference: {inference_calls} {inference_label}, {response_wait} wait, {inference_stream} stream"
    ));
    // Join with a bullet separator. Joining with "" ran the segments together
    // ("Worked for 12sLocal tools: …"); " • " matches the TUI separator style.
    let summary = parts.join(" • ");
    eprintln!("{summary}");
}
// Print per-turn output-token stats, including the streaming rate when
// timing data is available.
if let Some(output_tokens) = self.last_turn_output_tokens {
    // Guard against negative deltas from resets in the underlying totals.
    let output_tokens = output_tokens.max(0);
    let mut detail = format!(
        "Output tokens: {}",
        format_with_separators(output_tokens)
    );
    if let Some(timing) = &self.last_turn_timing
        && output_tokens > 0
    {
        let stream = timing.inference_stream_duration;
        let per_token_ms = stream.as_secs_f64() * 1000.0 / output_tokens as f64;
        let per_token = if per_token_ms >= 1000.0 {
            format!("{:.2}s/token", per_token_ms / 1000.0)
        } else {
            format!("{per_token_ms:.2}ms/token")
        };
        // Separate the duration from the rate; previously they ran together
        // ("Stream: 4.2s12.00ms/token").
        detail = format!(
            "{detail} • Stream: {} ({per_token})",
            format_duration(stream)
        );
    }
    eprintln!("{detail}");
}
// If the user has not piped the final message to a file, they will see
// it twice: once written to stderr as part of the normal event
// processing, and once here on stdout. We print the token summary above

View File

@@ -852,6 +852,7 @@ impl EventProcessor for EventProcessorWithJsonOutput {
match msg {
protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
..
}) => {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);

View File

@@ -306,6 +306,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
"p3",
EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
);
let out_complete = ep.collect_thread_events(&complete);
@@ -679,6 +680,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
"t2",
EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
);
let _ = ep.collect_thread_events(&complete);
@@ -831,6 +833,7 @@ fn error_followed_by_task_complete_produces_turn_failed() {
"e2",
EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
);
assert_eq!(
@@ -1278,6 +1281,7 @@ fn task_complete_produces_turn_completed_with_usage() {
"e2",
EventMsg::TurnComplete(codex_core::protocol::TurnCompleteEvent {
last_agent_message: Some("done".to_string()),
timing: None,
}),
);
let out = ep.collect_thread_events(&complete_event);

View File

@@ -291,7 +291,9 @@ async fn run_codex_tool_session_inner(
.await;
continue;
}
EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => {
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message, ..
}) => {
let text = match last_agent_message {
Some(msg) => msg,
None => "".to_string(),
@@ -324,6 +326,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::TurnStarted(_)
| EventMsg::TokenCount(_)
| EventMsg::TurnTimingUpdate(_)
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningSectionBreak(_)
| EventMsg::McpToolCallBegin(_)

View File

@@ -704,6 +704,9 @@ pub enum EventMsg {
#[serde(rename = "task_complete", alias = "turn_complete")]
TurnComplete(TurnCompleteEvent),
/// Snapshot of timing statistics for the active turn.
TurnTimingUpdate(TurnTimingUpdateEvent),
/// Usage update for the current session, including totals and last turn.
/// Optional means unknown — UIs should not display when `None`.
TokenCount(TokenCountEvent),
@@ -1088,6 +1091,8 @@ pub struct ContextCompactedEvent;
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct TurnCompleteEvent {
pub last_agent_message: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timing: Option<TurnTimingStats>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
@@ -1096,6 +1101,25 @@ pub struct TurnStartedEvent {
pub model_context_window: Option<i64>,
}
/// Aggregate timing statistics for a single turn.
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, JsonSchema, TS)]
pub struct TurnTimingStats {
    /// Number of local tool calls executed during the turn.
    pub tool_calls: u64,
    /// Number of inference (model) requests made during the turn.
    pub inference_calls: u64,
    /// Total wall-clock time spent in local tool calls.
    #[ts(type = "string")]
    pub local_tool_duration: Duration,
    /// Total time spent waiting for responses to begin streaming.
    #[ts(type = "string")]
    pub response_wait_duration: Duration,
    /// Total time spent consuming response streams.
    /// `#[serde(default)]` lets older serialized events without this field
    /// still deserialize.
    #[serde(default)]
    #[ts(type = "string")]
    pub inference_stream_duration: Duration,
}
/// Event payload announcing a fresh timing snapshot for an in-flight turn.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct TurnTimingUpdateEvent {
    /// Identifier of the turn these statistics belong to.
    pub turn_id: String,
    /// Cumulative timing statistics for the turn so far.
    pub stats: TurnTimingStats,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, JsonSchema, TS)]
pub struct TokenUsage {
#[ts(type = "number")]

View File

@@ -81,6 +81,8 @@ use codex_core::protocol::TokenUsageInfo;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnCompleteEvent;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::TurnTimingStats;
use codex_core::protocol::TurnTimingUpdateEvent;
use codex_core::protocol::UndoCompletedEvent;
use codex_core::protocol::UndoStartedEvent;
use codex_core::protocol::UserMessageEvent;
@@ -526,6 +528,10 @@ pub(crate) struct ChatWidget {
// This lets the separator show per-chunk work time (since the previous separator) rather than
// the total task-running time reported by the status indicator.
last_separator_elapsed_secs: Option<u64>,
// Latest timing snapshot for the active turn.
current_turn_timing: Option<TurnTimingStats>,
// Timing snapshot captured at the last emitted final-message separator.
last_separator_timing: Option<TurnTimingStats>,
last_rendered_width: std::cell::Cell<Option<usize>>,
// Feedback sink for /feedback
@@ -912,6 +918,8 @@ impl ChatWidget {
fn on_task_started(&mut self) {
self.agent_turn_running = true;
self.saw_plan_update_this_turn = false;
self.current_turn_timing = None;
self.last_separator_timing = Some(TurnTimingStats::default());
self.bottom_pane.clear_quit_shortcut_hint();
self.quit_shortcut_expires_at = None;
self.quit_shortcut_key = None;
@@ -924,12 +932,20 @@ impl ChatWidget {
self.request_redraw();
}
fn on_task_complete(&mut self, last_agent_message: Option<String>, from_replay: bool) {
fn on_task_complete(
&mut self,
last_agent_message: Option<String>,
timing: Option<TurnTimingStats>,
from_replay: bool,
) {
// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();
self.flush_unified_exec_wait_streak();
// Mark task stopped and request redraw now that all content is in history.
self.agent_turn_running = false;
if let Some(timing) = timing {
self.current_turn_timing = Some(timing);
}
self.update_task_running_state();
self.running_commands.clear();
self.suppressed_exec_calls.clear();
@@ -951,6 +967,10 @@ impl ChatWidget {
self.maybe_show_pending_rate_limit_prompt();
}
/// Stores the latest timing snapshot reported for the active turn.
fn on_turn_timing_update(&mut self, event: TurnTimingUpdateEvent) {
    self.current_turn_timing = Some(event.stats);
}
fn maybe_prompt_plan_implementation(&mut self, last_agent_message: Option<&str>) {
if !self.collaboration_modes_enabled() {
return;
@@ -1715,7 +1735,11 @@ impl ChatWidget {
.status_widget()
.map(super::status_indicator_widget::StatusIndicatorWidget::elapsed_seconds)
.map(|current| self.worked_elapsed_from(current));
self.add_to_history(history_cell::FinalMessageSeparator::new(elapsed_seconds));
let timing = self.worked_timing_from();
self.add_to_history(history_cell::FinalMessageSeparator::new(
elapsed_seconds,
timing,
));
self.needs_final_message_separator = false;
self.had_work_activity = false;
} else if self.needs_final_message_separator {
@@ -1745,6 +1769,28 @@ impl ChatWidget {
elapsed
}
/// Computes the timing delta since the last separator and advances the
/// baseline to the current snapshot.
///
/// Returns `None` when no timing data has arrived for the active turn; in
/// that case the baseline is left untouched.
fn worked_timing_from(&mut self) -> Option<TurnTimingStats> {
    let latest = self.current_turn_timing.clone()?;
    // Swap the new snapshot in as the baseline while keeping the old one for
    // the delta computation. A missing baseline counts as all zeros.
    let prior = self
        .last_separator_timing
        .replace(latest.clone())
        .unwrap_or_default();
    Some(TurnTimingStats {
        tool_calls: latest.tool_calls.saturating_sub(prior.tool_calls),
        inference_calls: latest.inference_calls.saturating_sub(prior.inference_calls),
        local_tool_duration: latest
            .local_tool_duration
            .saturating_sub(prior.local_tool_duration),
        response_wait_duration: latest
            .response_wait_duration
            .saturating_sub(prior.response_wait_duration),
        inference_stream_duration: latest
            .inference_stream_duration
            .saturating_sub(prior.inference_stream_duration),
    })
}
pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) {
let running = self.running_commands.remove(&ev.call_id);
if self.suppressed_exec_calls.remove(&ev.call_id) {
@@ -2095,6 +2141,8 @@ impl ChatWidget {
had_work_activity: false,
saw_plan_update_this_turn: false,
last_separator_elapsed_secs: None,
current_turn_timing: None,
last_separator_timing: None,
last_rendered_width: std::cell::Cell::new(None),
feedback,
current_rollout_path: None,
@@ -2233,6 +2281,8 @@ impl ChatWidget {
needs_final_message_separator: false,
had_work_activity: false,
last_separator_elapsed_secs: None,
current_turn_timing: None,
last_separator_timing: None,
last_rendered_width: std::cell::Cell::new(None),
feedback,
current_rollout_path: None,
@@ -2360,6 +2410,8 @@ impl ChatWidget {
had_work_activity: false,
saw_plan_update_this_turn: false,
last_separator_elapsed_secs: None,
current_turn_timing: None,
last_separator_timing: None,
last_rendered_width: std::cell::Cell::new(None),
feedback,
current_rollout_path: None,
@@ -3077,9 +3129,11 @@ impl ChatWidget {
}
EventMsg::AgentReasoningSectionBreak(_) => self.on_reasoning_section_break(),
EventMsg::TurnStarted(_) => self.on_task_started(),
EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => {
self.on_task_complete(last_agent_message, from_replay)
}
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message,
timing,
}) => self.on_task_complete(last_agent_message, timing, from_replay),
EventMsg::TurnTimingUpdate(ev) => self.on_turn_timing_update(ev),
EventMsg::TokenCount(ev) => {
self.set_token_info(ev.info);
self.on_rate_limit_snapshot(ev.rate_limits);

View File

@@ -835,6 +835,8 @@ async fn make_chatwidget_manual(
had_work_activity: false,
saw_plan_update_this_turn: false,
last_separator_elapsed_secs: None,
current_turn_timing: None,
last_separator_timing: None,
last_rendered_width: std::cell::Cell::new(None),
feedback: codex_feedback::CodexFeedback::new(),
current_rollout_path: None,
@@ -1240,6 +1242,7 @@ async fn plan_implementation_popup_skips_replayed_turn_complete() {
chat.replay_initial_messages(vec![EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: Some("Plan details".to_string()),
timing: None,
})]);
let popup = render_bottom_popup(&chat, 80);
@@ -1260,7 +1263,7 @@ async fn plan_implementation_popup_skips_when_messages_queued() {
chat.bottom_pane.set_task_running(true);
chat.queue_user_message("Queued message".into());
chat.on_task_complete(Some("Plan details".to_string()), false);
chat.on_task_complete(Some("Plan details".to_string()), None, false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1286,7 +1289,7 @@ async fn plan_implementation_popup_shows_on_plan_update_without_message() {
status: StepStatus::Pending,
}],
});
chat.on_task_complete(None, false);
chat.on_task_complete(None, None, false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1315,7 +1318,7 @@ async fn plan_implementation_popup_skips_when_rate_limit_prompt_pending() {
}],
});
chat.on_rate_limit_snapshot(Some(snapshot(92.0)));
chat.on_task_complete(None, false);
chat.on_task_complete(None, None, false);
let popup = render_bottom_popup(&chat, 80);
assert!(
@@ -1911,7 +1914,7 @@ async fn unified_exec_end_after_task_complete_is_suppressed() {
);
drain_insert_history(&mut rx);
chat.on_task_complete(None, false);
chat.on_task_complete(None, None, false);
end_exec(&mut chat, begin, "", "", 0);
let cells = drain_insert_history(&mut rx);
@@ -1925,7 +1928,7 @@ async fn unified_exec_end_after_task_complete_is_suppressed() {
async fn unified_exec_interaction_after_task_complete_is_suppressed() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.on_task_started();
chat.on_task_complete(None, false);
chat.on_task_complete(None, None, false);
chat.handle_codex_event(Event {
id: "call-1".to_string(),
@@ -1966,6 +1969,7 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: Some("Final response.".into()),
timing: None,
}),
});
@@ -2005,6 +2009,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
@@ -2055,6 +2060,7 @@ async fn unified_exec_waiting_multiple_empty_snapshots() {
id: "turn-wait-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
@@ -2106,6 +2112,7 @@ async fn unified_exec_non_empty_then_empty_snapshots() {
id: "turn-wait-3".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
@@ -3768,6 +3775,7 @@ async fn turn_complete_clears_unified_exec_processes() {
id: "turn-1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
@@ -4554,6 +4562,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
id: "s1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
@@ -4842,6 +4851,7 @@ printf 'fenced within fenced\n'
id: "t1".into(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message: None,
timing: None,
}),
});
for lines in drain_insert_history(&mut rx) {

View File

@@ -36,6 +36,7 @@ use crate::wrapping::RtOptions;
use crate::wrapping::word_wrap_line;
use crate::wrapping::word_wrap_lines;
use base64::Engine;
use codex_common::elapsed::format_duration;
use codex_common::format_env_display::format_env_display;
use codex_core::config::Config;
use codex_core::config::types::McpServerTransportConfig;
@@ -43,6 +44,7 @@ use codex_core::protocol::FileChange;
use codex_core::protocol::McpAuthStatus;
use codex_core::protocol::McpInvocation;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TurnTimingStats;
use codex_core::web_search::web_search_detail;
use codex_protocol::models::WebSearchAction;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
@@ -1877,19 +1879,57 @@ pub(crate) fn new_reasoning_summary_block(full_reasoning_buffer: String) -> Box<
/// divider.
pub struct FinalMessageSeparator {
    // Seconds of work shown as "Worked for …"; None omits the label.
    elapsed_seconds: Option<u64>,
    // Per-chunk timing stats rendered alongside the elapsed time, when known.
    timing: Option<TurnTimingStats>,
}
impl FinalMessageSeparator {
/// Creates a separator; `elapsed_seconds` typically comes from the status indicator timer.
pub(crate) fn new(elapsed_seconds: Option<u64>) -> Self {
Self { elapsed_seconds }
pub(crate) fn new(elapsed_seconds: Option<u64>, timing: Option<TurnTimingStats>) -> Self {
Self {
elapsed_seconds,
timing,
}
}
}
impl HistoryCell for FinalMessageSeparator {
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
let elapsed_seconds = self
if let Some(timing) = &self.timing {
let tool_calls = timing.tool_calls;
let tool_label = if tool_calls == 1 { "call" } else { "calls" };
let tool_duration = format_duration(timing.local_tool_duration);
let inference_calls = timing.inference_calls;
let inference_label = if inference_calls == 1 {
"call"
} else {
"calls"
};
let response_wait = format_duration(timing.response_wait_duration);
let inference_stream = format_duration(timing.inference_stream_duration);
let summary = if let Some(elapsed_seconds) = self
.elapsed_seconds
.map(super::status_indicator_widget::fmt_elapsed_compact)
{
format!(
"─ Worked for {elapsed_seconds} • Local tools: {tool_calls} {tool_label}, {tool_duration} • Inference: {inference_calls} {inference_label}, {response_wait} wait, {inference_stream} stream ─"
)
} else {
format!(
"─ Local tools: {tool_calls} {tool_label}, {tool_duration} • Inference: {inference_calls} {inference_label}, {response_wait} wait, {inference_stream} stream ─"
)
};
let summary_width = summary.width();
vec![
Line::from_iter([
summary,
"".repeat((width as usize).saturating_sub(summary_width)),
])
.dim(),
]
} else if let Some(elapsed_seconds) = self
.elapsed_seconds
.map(super::status_indicator_widget::fmt_elapsed_compact);
if let Some(elapsed_seconds) = elapsed_seconds {
.map(super::status_indicator_widget::fmt_elapsed_compact)
{
let worked_for = format!("─ Worked for {elapsed_seconds}");
let worked_for_width = worked_for.width();
vec![