Files
codex/codex-rs/exec/src/event_processor_with_human_output.rs
Won Park ee1a20258a Enabling CWD Saving for Image-Gen (#13607)
Codex now saves the generated image on to your current working
directory.
2026-03-06 00:47:21 -08:00

1278 lines
47 KiB
Rust

use codex_core::config::Config;
use codex_core::web_search::web_search_detail;
use codex_protocol::items::TurnItem;
use codex_protocol::num_format::format_with_separators;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::BackgroundEventEvent;
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
use codex_protocol::protocol::CollabCloseBeginEvent;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::CollabWaitingBeginEvent;
use codex_protocol::protocol::CollabWaitingEndEvent;
use codex_protocol::protocol::DeprecationNoticeEvent;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandBeginEvent;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::McpInvocation;
use codex_protocol::protocol::McpToolCallBeginEvent;
use codex_protocol::protocol::McpToolCallEndEvent;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::PatchApplyEndEvent;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::StreamErrorEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnDiffEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::protocol::WebSearchEndEvent;
use codex_utils_elapsed::format_duration;
use codex_utils_elapsed::format_elapsed;
use owo_colors::OwoColorize;
use owo_colors::Style;
use serde::Deserialize;
use shlex::try_join;
use std::collections::HashMap;
use std::io::IsTerminal;
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_utils_sandbox_summary::create_config_summary_entries;
/// This should be configurable. When used in CI, users may not want to impose
/// a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
pub(crate) struct EventProcessorWithHumanOutput {
call_id_to_patch: HashMap<String, PatchApplyBegin>,
// To ensure that --color=never is respected, ANSI escapes _must_ be added
// using .style() with one of these fields. If you need a new style, add a
// new field here.
bold: Style,
italic: Style,
dimmed: Style,
magenta: Style,
red: Style,
green: Style,
cyan: Style,
yellow: Style,
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
show_raw_agent_reasoning: bool,
last_message_path: Option<PathBuf>,
last_total_token_usage: Option<codex_protocol::protocol::TokenUsageInfo>,
final_message: Option<String>,
last_proposed_plan: Option<String>,
progress_active: bool,
progress_last_len: usize,
use_ansi_cursor: bool,
progress_anchor: bool,
progress_done: bool,
}
impl EventProcessorWithHumanOutput {
pub(crate) fn create_with_ansi(
with_ansi: bool,
cursor_ansi: bool,
config: &Config,
last_message_path: Option<PathBuf>,
) -> Self {
let call_id_to_patch = HashMap::new();
if with_ansi {
Self {
call_id_to_patch,
bold: Style::new().bold(),
italic: Style::new().italic(),
dimmed: Style::new().dimmed(),
magenta: Style::new().magenta(),
red: Style::new().red(),
green: Style::new().green(),
cyan: Style::new().cyan(),
yellow: Style::new().yellow(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
last_message_path,
last_total_token_usage: None,
final_message: None,
last_proposed_plan: None,
progress_active: false,
progress_last_len: 0,
use_ansi_cursor: cursor_ansi,
progress_anchor: false,
progress_done: false,
}
} else {
Self {
call_id_to_patch,
bold: Style::new(),
italic: Style::new(),
dimmed: Style::new(),
magenta: Style::new(),
red: Style::new(),
green: Style::new(),
cyan: Style::new(),
yellow: Style::new(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
last_message_path,
last_total_token_usage: None,
final_message: None,
last_proposed_plan: None,
progress_active: false,
progress_last_len: 0,
use_ansi_cursor: cursor_ansi,
progress_anchor: false,
progress_done: false,
}
}
}
}
#[derive(Debug, Deserialize)]
struct AgentJobProgressMessage {
job_id: String,
total_items: usize,
pending_items: usize,
running_items: usize,
completed_items: usize,
failed_items: usize,
eta_seconds: Option<u64>,
}
struct PatchApplyBegin {
start_time: Instant,
auto_approved: bool,
}
/// Timestamped helper. The timestamp is styled with self.dimmed.
macro_rules! ts_msg {
($self:ident, $($arg:tt)*) => {{
eprintln!($($arg)*);
}};
}
impl EventProcessor for EventProcessorWithHumanOutput {
/// Print a concise summary of the effective configuration that will be used
/// for the session. This mirrors the information shown in the TUI welcome
/// screen.
fn print_config_summary(
&mut self,
config: &Config,
prompt: &str,
session_configured_event: &SessionConfiguredEvent,
) {
const VERSION: &str = env!("CARGO_PKG_VERSION");
ts_msg!(
self,
"OpenAI Codex v{} (research preview)\n--------",
VERSION
);
let mut entries =
create_config_summary_entries(config, session_configured_event.model.as_str());
entries.push((
"session id",
session_configured_event.session_id.to_string(),
));
for (key, value) in entries {
eprintln!("{} {}", format!("{key}:").style(self.bold), value);
}
eprintln!("--------");
// Echo the prompt that will be sent to the agent so it is visible in the
// transcript/logs before any events come in. Note the prompt may have been
// read from stdin, so it may not be visible in the terminal otherwise.
ts_msg!(self, "{}\n{}", "user".style(self.cyan), prompt);
}
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
if let EventMsg::BackgroundEvent(BackgroundEventEvent { message }) = &msg
&& let Some(update) = Self::parse_agent_job_progress(message)
{
self.render_agent_job_progress(update);
return CodexStatus::Running;
}
if self.progress_active && !Self::should_interrupt_progress(&msg) {
return CodexStatus::Running;
}
if !Self::is_silent_event(&msg) {
self.finish_progress_line();
}
match msg {
EventMsg::Error(ErrorEvent { message, .. }) => {
let prefix = "ERROR:".style(self.red);
ts_msg!(self, "{prefix} {message}");
}
EventMsg::Warning(WarningEvent { message }) => {
ts_msg!(
self,
"{} {message}",
"warning:".style(self.yellow).style(self.bold)
);
}
EventMsg::ModelReroute(_) => {}
EventMsg::DeprecationNotice(DeprecationNoticeEvent { summary, details }) => {
ts_msg!(
self,
"{} {summary}",
"deprecated:".style(self.magenta).style(self.bold)
);
if let Some(details) = details {
ts_msg!(self, " {}", details.style(self.dimmed));
}
}
EventMsg::McpStartupUpdate(update) => {
let status_text = match update.status {
codex_protocol::protocol::McpStartupStatus::Starting => "starting".to_string(),
codex_protocol::protocol::McpStartupStatus::Ready => "ready".to_string(),
codex_protocol::protocol::McpStartupStatus::Cancelled => {
"cancelled".to_string()
}
codex_protocol::protocol::McpStartupStatus::Failed { ref error } => {
format!("failed: {error}")
}
};
ts_msg!(
self,
"{} {} {}",
"mcp:".style(self.cyan),
update.server,
status_text
);
}
EventMsg::McpStartupComplete(summary) => {
let mut parts = Vec::new();
if !summary.ready.is_empty() {
parts.push(format!("ready: {}", summary.ready.join(", ")));
}
if !summary.failed.is_empty() {
let servers: Vec<_> = summary.failed.iter().map(|f| f.server.clone()).collect();
parts.push(format!("failed: {}", servers.join(", ")));
}
if !summary.cancelled.is_empty() {
parts.push(format!("cancelled: {}", summary.cancelled.join(", ")));
}
let joined = if parts.is_empty() {
"no servers".to_string()
} else {
parts.join("; ")
};
ts_msg!(self, "{} {}", "mcp startup:".style(self.cyan), joined);
}
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_msg!(self, "{}", message.style(self.dimmed));
}
EventMsg::StreamError(StreamErrorEvent {
message,
additional_details,
..
}) => {
let message = match additional_details {
Some(details) if !details.trim().is_empty() => format!("{message} ({details})"),
_ => message,
};
ts_msg!(self, "{}", message.style(self.dimmed));
}
EventMsg::TurnStarted(_) => {
// Ignore.
}
EventMsg::ElicitationRequest(ev) => {
ts_msg!(
self,
"{} {}",
"elicitation request".style(self.magenta),
ev.server_name.style(self.dimmed)
);
ts_msg!(
self,
"{}",
"auto-cancelling (not supported in exec mode)".style(self.dimmed)
);
}
EventMsg::TurnComplete(TurnCompleteEvent {
last_agent_message, ..
}) => {
let last_message = last_agent_message
.as_deref()
.or(self.last_proposed_plan.as_deref());
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_message, output_file);
}
self.final_message = last_agent_message.or_else(|| self.last_proposed_plan.clone());
return CodexStatus::InitiateShutdown;
}
EventMsg::TokenCount(ev) => {
self.last_total_token_usage = ev.info;
}
EventMsg::AgentReasoningSectionBreak(_) => {
if !self.show_agent_reasoning {
return CodexStatus::Running;
}
eprintln!();
}
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text }) => {
if self.show_raw_agent_reasoning {
ts_msg!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
text,
);
}
}
EventMsg::AgentMessage(AgentMessageEvent { message, .. }) => {
ts_msg!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
message,
);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::Plan(item),
..
}) => {
self.last_proposed_plan = Some(item.text);
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent { command, cwd, .. }) => {
eprint!(
"{}\n{} in {}",
"exec".style(self.italic).style(self.magenta),
escape_command(&command).style(self.bold),
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
aggregated_output,
duration,
exit_code,
..
}) => {
let duration = format!(" in {}", format_duration(duration));
let truncated_output = aggregated_output
.lines()
.take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL)
.collect::<Vec<_>>()
.join("\n");
match exit_code {
0 => {
let title = format!(" succeeded{duration}:");
ts_msg!(self, "{}", title.style(self.green));
}
_ => {
let title = format!(" exited {exit_code}{duration}:");
ts_msg!(self, "{}", title.style(self.red));
}
}
eprintln!("{}", truncated_output.style(self.dimmed));
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: _,
invocation,
}) => {
ts_msg!(
self,
"{} {}",
"tool".style(self.magenta),
format_mcp_invocation(&invocation).style(self.bold),
);
}
EventMsg::McpToolCallEnd(tool_call_end_event) => {
let is_success = tool_call_end_event.is_success();
let McpToolCallEndEvent {
call_id: _,
result,
invocation,
duration,
} = tool_call_end_event;
let duration = format!(" in {}", format_duration(duration));
let status_str = if is_success { "success" } else { "failed" };
let title_style = if is_success { self.green } else { self.red };
let title = format!(
"{} {status_str}{duration}:",
format_mcp_invocation(&invocation)
);
ts_msg!(self, "{}", title.style(title_style));
if let Ok(res) = result {
let val = serde_json::to_value(res)
.unwrap_or_else(|_| serde_json::Value::String("<result>".to_string()));
let pretty =
serde_json::to_string_pretty(&val).unwrap_or_else(|_| val.to_string());
for line in pretty.lines().take(MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL) {
eprintln!("{}", line.style(self.dimmed));
}
}
}
EventMsg::WebSearchBegin(_) => {
ts_msg!(self, "🌐 Searching the web...");
}
EventMsg::WebSearchEnd(WebSearchEndEvent {
call_id: _,
query,
action,
}) => {
let detail = web_search_detail(Some(&action), &query);
if detail.is_empty() {
ts_msg!(self, "🌐 Searched the web");
} else {
ts_msg!(self, "🌐 Searched: {detail}");
}
}
EventMsg::ImageGenerationBegin(generated) => {
ts_msg!(
self,
"{} {}",
"image generation started".style(self.magenta),
generated.call_id
);
}
EventMsg::ImageGenerationEnd(generated) => {
if !generated.result.is_empty()
&& !generated.result.starts_with("data:")
&& !generated.result.starts_with("http://")
&& !generated.result.starts_with("https://")
&& !generated.result.starts_with("file://")
{
ts_msg!(
self,
"{} {} {}",
"generated image".style(self.magenta),
generated.call_id,
generated.result.style(self.dimmed)
);
} else {
ts_msg!(
self,
"{} {}",
"generated image".style(self.magenta),
generated.call_id
);
}
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id,
auto_approved,
changes,
..
}) => {
// Store metadata so we can calculate duration later when we
// receive the corresponding PatchApplyEnd event.
self.call_id_to_patch.insert(
call_id,
PatchApplyBegin {
start_time: Instant::now(),
auto_approved,
},
);
ts_msg!(
self,
"{}",
"file update".style(self.magenta).style(self.italic),
);
// Pretty-print the patch summary with colored diff markers so
// it's easy to scan in the terminal output.
for (path, change) in changes.iter() {
match change {
FileChange::Add { content } => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
eprintln!("{}", header.style(self.magenta));
for line in content.lines() {
eprintln!("{}", line.style(self.green));
}
}
FileChange::Delete { content } => {
let header = format!(
"{} {}",
format_file_change(change),
path.to_string_lossy()
);
eprintln!("{}", header.style(self.magenta));
for line in content.lines() {
eprintln!("{}", line.style(self.red));
}
}
FileChange::Update {
unified_diff,
move_path,
} => {
let header = if let Some(dest) = move_path {
format!(
"{} {} -> {}",
format_file_change(change),
path.to_string_lossy(),
dest.to_string_lossy()
)
} else {
format!("{} {}", format_file_change(change), path.to_string_lossy())
};
eprintln!("{}", header.style(self.magenta));
// Colorize diff lines. We keep file header lines
// (--- / +++) without extra coloring so they are
// still readable.
for diff_line in unified_diff.lines() {
if diff_line.starts_with('+') && !diff_line.starts_with("+++") {
eprintln!("{}", diff_line.style(self.green));
} else if diff_line.starts_with('-')
&& !diff_line.starts_with("---")
{
eprintln!("{}", diff_line.style(self.red));
} else {
eprintln!("{diff_line}");
}
}
}
}
}
}
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id,
stdout,
stderr,
success,
..
}) => {
let patch_begin = self.call_id_to_patch.remove(&call_id);
// Compute duration and summary label similar to exec commands.
let (duration, label) = if let Some(PatchApplyBegin {
start_time,
auto_approved,
}) = patch_begin
{
(
format!(" in {}", format_elapsed(start_time)),
format!("apply_patch(auto_approved={auto_approved})"),
)
} else {
(String::new(), format!("apply_patch('{call_id}')"))
};
let (exit_code, output, title_style) = if success {
(0, stdout, self.green)
} else {
(1, stderr, self.red)
};
let title = format!("{label} exited {exit_code}{duration}:");
ts_msg!(self, "{}", title.style(title_style));
for line in output.lines() {
eprintln!("{}", line.style(self.dimmed));
}
}
EventMsg::TurnDiff(TurnDiffEvent { unified_diff }) => {
ts_msg!(
self,
"{}",
"file update:".style(self.magenta).style(self.italic)
);
eprintln!("{unified_diff}");
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
ts_msg!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
agent_reasoning_event.text,
);
}
}
EventMsg::SessionConfigured(session_configured_event) => {
let SessionConfiguredEvent {
session_id: conversation_id,
model,
..
} = session_configured_event;
ts_msg!(
self,
"{} {}",
"codex session".style(self.magenta).style(self.bold),
conversation_id.to_string().style(self.dimmed)
);
ts_msg!(self, "model: {}", model);
eprintln!();
}
EventMsg::PlanUpdate(plan_update_event) => {
let UpdatePlanArgs { explanation, plan } = plan_update_event;
// Header
ts_msg!(self, "{}", "Plan update".style(self.magenta));
// Optional explanation
if let Some(explanation) = explanation
&& !explanation.trim().is_empty()
{
ts_msg!(self, "{}", explanation.style(self.italic));
}
// Pretty-print the plan items with simple status markers.
for item in plan {
match item.status {
StepStatus::Completed => {
ts_msg!(self, " {} {}", "".style(self.green), item.step);
}
StepStatus::InProgress => {
ts_msg!(self, " {} {}", "".style(self.cyan), item.step);
}
StepStatus::Pending => {
ts_msg!(
self,
" {} {}",
"".style(self.dimmed),
item.step.style(self.dimmed)
);
}
}
}
}
EventMsg::ViewImageToolCall(view) => {
ts_msg!(
self,
"{} {}",
"viewed image".style(self.magenta),
view.path.display()
);
}
EventMsg::TurnAborted(abort_reason) => {
match abort_reason.reason {
TurnAbortReason::Interrupted => {
ts_msg!(self, "task interrupted");
}
TurnAbortReason::Replaced => {
ts_msg!(self, "task aborted: replaced by a new task");
}
TurnAbortReason::ReviewEnded => {
ts_msg!(self, "task aborted: review ended");
}
}
return CodexStatus::InitiateShutdown;
}
EventMsg::ContextCompacted(_) => {
ts_msg!(self, "context compacted");
}
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
call_id,
sender_thread_id: _,
prompt,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("spawn_agent", &call_id, Some(&prompt))
.style(self.bold)
);
}
EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
call_id,
sender_thread_id: _,
new_thread_id,
prompt,
status,
..
}) => {
let success = new_thread_id.is_some() && !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("spawn_agent", &call_id, Some(&prompt)),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
if let Some(new_thread_id) = new_thread_id {
eprintln!(" agent: {}", new_thread_id.to_string().style(self.dimmed));
}
}
EventMsg::CollabAgentInteractionBegin(CollabAgentInteractionBeginEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
prompt,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("send_input", &call_id, Some(&prompt))
.style(self.bold)
);
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabAgentInteractionEnd(CollabAgentInteractionEndEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
prompt,
status,
..
}) => {
let success = !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("send_input", &call_id, Some(&prompt)),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabWaitingBegin(CollabWaitingBeginEvent {
sender_thread_id: _,
receiver_thread_ids,
call_id,
..
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("wait", &call_id, None).style(self.bold)
);
eprintln!(
" receivers: {}",
format_receiver_list(&receiver_thread_ids).style(self.dimmed)
);
}
EventMsg::CollabWaitingEnd(CollabWaitingEndEvent {
sender_thread_id: _,
call_id,
statuses,
..
}) => {
if statuses.is_empty() {
ts_msg!(
self,
"{} {}:",
format_collab_invocation("wait", &call_id, None),
"timed out".style(self.yellow)
);
return CodexStatus::Running;
}
let success = !statuses.values().any(is_collab_status_failure);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {} agents complete:",
format_collab_invocation("wait", &call_id, None),
statuses.len()
);
ts_msg!(self, "{}", title.style(title_style));
let mut sorted = statuses
.into_iter()
.map(|(thread_id, status)| (thread_id.to_string(), status))
.collect::<Vec<_>>();
sorted.sort_by(|(left, _), (right, _)| left.cmp(right));
for (thread_id, status) in sorted {
eprintln!(
" {} {}",
thread_id.style(self.dimmed),
format_collab_status(&status).style(style_for_agent_status(&status, self))
);
}
}
EventMsg::CollabCloseBegin(CollabCloseBeginEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
}) => {
ts_msg!(
self,
"{} {}",
"collab".style(self.magenta),
format_collab_invocation("close_agent", &call_id, None).style(self.bold)
);
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::CollabCloseEnd(CollabCloseEndEvent {
call_id,
sender_thread_id: _,
receiver_thread_id,
status,
..
}) => {
let success = !is_collab_status_failure(&status);
let title_style = if success { self.green } else { self.red };
let title = format!(
"{} {}:",
format_collab_invocation("close_agent", &call_id, None),
format_collab_status(&status)
);
ts_msg!(self, "{}", title.style(title_style));
eprintln!(
" receiver: {}",
receiver_thread_id.to_string().style(self.dimmed)
);
}
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::ThreadNameUpdated(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::TerminalInteraction(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ListSkillsResponse(_)
| EventMsg::ListRemoteSkillsResponse(_)
| EventMsg::RemoteSkillDownloaded(_)
| EventMsg::RawResponseItem(_)
| EventMsg::UserMessage(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::UndoCompleted(_)
| EventMsg::UndoStarted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::RequestUserInput(_)
| EventMsg::CollabResumeBegin(_)
| EventMsg::CollabResumeEnd(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationRealtime(_)
| EventMsg::RealtimeConversationClosed(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::DynamicToolCallResponse(_) => {}
}
CodexStatus::Running
}
fn print_final_output(&mut self) {
self.finish_progress_line();
if let Some(usage_info) = &self.last_total_token_usage {
eprintln!(
"{}\n{}",
"tokens used".style(self.magenta).style(self.italic),
format_with_separators(usage_info.total_token_usage.blended_total())
);
}
// In interactive terminals we already emitted the final assistant
// message on stderr during event processing. Preserve stdout emission
// only for non-interactive use so pipes and scripts still receive the
// final message.
#[allow(clippy::print_stdout)]
if should_print_final_message_to_stdout(
self.final_message.as_deref(),
std::io::stdout().is_terminal(),
std::io::stderr().is_terminal(),
) && let Some(message) = &self.final_message
{
if message.ends_with('\n') {
print!("{message}");
} else {
println!("{message}");
}
}
}
}
impl EventProcessorWithHumanOutput {
fn parse_agent_job_progress(message: &str) -> Option<AgentJobProgressMessage> {
let payload = message.strip_prefix("agent_job_progress:")?;
serde_json::from_str::<AgentJobProgressMessage>(payload).ok()
}
fn is_silent_event(msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::ThreadNameUpdated(_)
| EventMsg::TokenCount(_)
| EventMsg::TurnStarted(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::TerminalInteraction(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ListSkillsResponse(_)
| EventMsg::ListRemoteSkillsResponse(_)
| EventMsg::RemoteSkillDownloaded(_)
| EventMsg::RawResponseItem(_)
| EventMsg::UserMessage(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::UndoCompleted(_)
| EventMsg::UndoStarted(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::RequestUserInput(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::DynamicToolCallResponse(_)
)
}
fn should_interrupt_progress(msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::Error(_)
| EventMsg::Warning(_)
| EventMsg::DeprecationNotice(_)
| EventMsg::StreamError(_)
| EventMsg::TurnComplete(_)
| EventMsg::ShutdownComplete
)
}
fn finish_progress_line(&mut self) {
if self.progress_active {
self.progress_active = false;
self.progress_last_len = 0;
self.progress_done = false;
if self.use_ansi_cursor {
if self.progress_anchor {
eprintln!("\u{1b}[1A\u{1b}[1G\u{1b}[2K");
} else {
eprintln!("\u{1b}[1G\u{1b}[2K");
}
} else {
eprintln!();
}
self.progress_anchor = false;
}
}
fn render_agent_job_progress(&mut self, update: AgentJobProgressMessage) {
let total = update.total_items.max(1);
let processed = update.completed_items + update.failed_items;
let percent = (processed as f64 / total as f64 * 100.0).round() as i64;
let job_label = update.job_id.chars().take(8).collect::<String>();
let eta = update
.eta_seconds
.map(|secs| format_duration(Duration::from_secs(secs)))
.unwrap_or_else(|| "--".to_string());
let columns = std::env::var("COLUMNS")
.ok()
.and_then(|value| value.parse::<usize>().ok())
.filter(|value| *value > 0);
let line = format_agent_job_progress_line(
columns,
job_label.as_str(),
AgentJobProgressStats {
processed,
total,
percent,
failed: update.failed_items,
running: update.running_items,
pending: update.pending_items,
},
eta.as_str(),
);
let done = processed >= update.total_items;
if !self.use_ansi_cursor {
eprintln!("{line}");
if done {
self.progress_active = false;
self.progress_last_len = 0;
}
return;
}
if done && self.progress_done {
return;
}
if !self.progress_active {
eprintln!();
self.progress_anchor = true;
self.progress_done = false;
}
let mut output = String::new();
if self.progress_anchor {
output.push_str("\u{1b}[1A\u{1b}[1G\u{1b}[2K");
} else {
output.push_str("\u{1b}[1G\u{1b}[2K");
}
output.push_str(&line);
if done {
output.push('\n');
eprint!("{output}");
self.progress_active = false;
self.progress_last_len = 0;
self.progress_anchor = false;
self.progress_done = true;
return;
}
eprint!("{output}");
let _ = std::io::stderr().flush();
self.progress_active = true;
self.progress_last_len = line.len();
}
}
fn should_print_final_message_to_stdout(
final_message: Option<&str>,
stdout_is_terminal: bool,
stderr_is_terminal: bool,
) -> bool {
final_message.is_some() && !(stdout_is_terminal && stderr_is_terminal)
}
struct AgentJobProgressStats {
processed: usize,
total: usize,
percent: i64,
failed: usize,
running: usize,
pending: usize,
}
fn format_agent_job_progress_line(
columns: Option<usize>,
job_label: &str,
stats: AgentJobProgressStats,
eta: &str,
) -> String {
let rest = format!(
"{processed}/{total} {percent}% f{failed} r{running} p{pending} eta {eta}",
processed = stats.processed,
total = stats.total,
percent = stats.percent,
failed = stats.failed,
running = stats.running,
pending = stats.pending
);
let prefix = format!("job {job_label}");
let base_len = prefix.len() + rest.len() + 4;
let mut bar_width = columns
.and_then(|columns| columns.checked_sub(base_len))
.filter(|available| *available > 0)
.unwrap_or(20usize);
let with_bar = |width: usize| {
let filled = ((stats.processed as f64 / stats.total as f64) * width as f64)
.round()
.clamp(0.0, width as f64) as usize;
let mut bar = "#".repeat(filled);
bar.push_str(&"-".repeat(width - filled));
format!("{prefix} [{bar}] {rest}")
};
let mut line = with_bar(bar_width);
if let Some(columns) = columns
&& line.len() > columns
{
let min_line = format!("{prefix} {rest}");
if min_line.len() > columns {
let mut truncated = min_line;
if columns > 2 && truncated.len() > columns {
truncated.truncate(columns - 2);
truncated.push_str("..");
}
return truncated;
}
let available = columns.saturating_sub(base_len);
if available == 0 {
return min_line;
}
bar_width = available.min(bar_width).max(1);
line = with_bar(bar_width);
}
line
}
fn escape_command(command: &[String]) -> String {
try_join(command.iter().map(String::as_str)).unwrap_or_else(|_| command.join(" "))
}
fn format_file_change(change: &FileChange) -> &'static str {
match change {
FileChange::Add { .. } => "A",
FileChange::Delete { .. } => "D",
FileChange::Update {
move_path: Some(_), ..
} => "R",
FileChange::Update {
move_path: None, ..
} => "M",
}
}
fn format_collab_invocation(tool: &str, call_id: &str, prompt: Option<&str>) -> String {
let prompt = prompt
.map(str::trim)
.filter(|prompt| !prompt.is_empty())
.map(|prompt| truncate_preview(prompt, 120));
match prompt {
Some(prompt) => format!("{tool}({call_id}, prompt=\"{prompt}\")"),
None => format!("{tool}({call_id})"),
}
}
fn format_collab_status(status: &AgentStatus) -> String {
match status {
AgentStatus::PendingInit => "pending init".to_string(),
AgentStatus::Running => "running".to_string(),
AgentStatus::Completed(Some(message)) => {
let preview = truncate_preview(message.trim(), 120);
if preview.is_empty() {
"completed".to_string()
} else {
format!("completed: \"{preview}\"")
}
}
AgentStatus::Completed(None) => "completed".to_string(),
AgentStatus::Errored(message) => {
let preview = truncate_preview(message.trim(), 120);
if preview.is_empty() {
"errored".to_string()
} else {
format!("errored: \"{preview}\"")
}
}
AgentStatus::Shutdown => "shutdown".to_string(),
AgentStatus::NotFound => "not found".to_string(),
}
}
fn style_for_agent_status(
status: &AgentStatus,
processor: &EventProcessorWithHumanOutput,
) -> Style {
match status {
AgentStatus::PendingInit | AgentStatus::Shutdown => processor.dimmed,
AgentStatus::Running => processor.cyan,
AgentStatus::Completed(_) => processor.green,
AgentStatus::Errored(_) | AgentStatus::NotFound => processor.red,
}
}
fn is_collab_status_failure(status: &AgentStatus) -> bool {
matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)
}
fn format_receiver_list(ids: &[codex_protocol::ThreadId]) -> String {
if ids.is_empty() {
return "none".to_string();
}
ids.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
}
fn truncate_preview(text: &str, max_chars: usize) -> String {
if text.chars().count() <= max_chars {
return text.to_string();
}
let preview = text.chars().take(max_chars).collect::<String>();
format!("{preview}")
}
fn format_mcp_invocation(invocation: &McpInvocation) -> String {
// Build fully-qualified tool name: server.tool
let fq_tool_name = format!("{}.{}", invocation.server, invocation.tool);
// Format arguments as compact JSON so they fit on one line.
let args_str = invocation
.arguments
.as_ref()
.map(|v: &serde_json::Value| serde_json::to_string(v).unwrap_or_else(|_| v.to_string()))
.unwrap_or_default();
if args_str.is_empty() {
format!("{fq_tool_name}()")
} else {
format!("{fq_tool_name}({args_str})")
}
}
#[cfg(test)]
mod tests {
use super::should_print_final_message_to_stdout;
use pretty_assertions::assert_eq;
#[test]
fn suppresses_final_stdout_message_when_both_streams_are_terminals() {
assert_eq!(
should_print_final_message_to_stdout(Some("hello"), true, true),
false
);
}
#[test]
fn prints_final_stdout_message_when_stdout_is_not_terminal() {
assert_eq!(
should_print_final_message_to_stdout(Some("hello"), false, true),
true
);
}
#[test]
fn prints_final_stdout_message_when_stderr_is_not_terminal() {
assert_eq!(
should_print_final_message_to_stdout(Some("hello"), true, false),
true
);
}
#[test]
fn does_not_print_when_message_is_missing() {
assert_eq!(
should_print_final_message_to_stdout(None, false, false),
false
);
}
}