Compare commits

...

4 Commits

Author SHA1 Message Date
pakrym-oai
933d8dc3e8 tests: fix module ordering for output_last_message in test suite 2025-10-02 16:16:49 -07:00
pakrym-oai
0a2f502de9 Merge branch 'main' into pakrym/allow-output-last-message-to-print-to-stdout
# Conflicts:
#	codex-rs/exec/src/event_processor_with_human_output.rs
#	codex-rs/exec/src/event_processor_with_jsonl_output.rs
#	codex-rs/exec/tests/event_processor_with_json_output.rs
#	codex-rs/exec/tests/suite/mod.rs
2025-10-02 16:11:23 -07:00
pakrym-oai
411c8607d4 cli: add short option '-o' for --output-last-message argument 2025-10-01 22:15:57 -07:00
pakrym-oai
f8f66adeac Allow --output-last-message to print to stdout 2025-10-01 18:02:18 -07:00
9 changed files with 198 additions and 77 deletions

View File

@@ -72,8 +72,8 @@ pub struct Cli {
pub include_plan_tool: bool,
/// Specifies file where the last message from the agent should be written.
#[arg(long = "output-last-message")]
pub last_message_file: Option<PathBuf>,
#[arg(long = "output-last-message", short = 'o', num_args(0..=1))]
pub last_message_file: Option<Option<PathBuf>>,
/// Initial instructions for the agent. If not provided as an argument (or
/// if `-` is used), instructions are read from stdin.

View File

@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::PathBuf;
use codex_core::config::Config;
use codex_core::protocol::Event;
@@ -14,10 +14,11 @@ pub(crate) trait EventProcessor {
/// Print summary of effective configuration and user prompt.
fn print_config_summary(
&mut self,
config: &Config,
prompt: &str,
session_configured: &SessionConfiguredEvent,
);
_config: &Config,
_prompt: &str,
_session_configured: &SessionConfiguredEvent,
) {
}
/// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event) -> CodexStatus;
@@ -25,21 +26,24 @@ pub(crate) trait EventProcessor {
fn print_final_output(&mut self) {}
}
pub(crate) fn handle_last_message(last_agent_message: Option<&str>, output_file: &Path) {
pub(crate) fn handle_last_message(
last_agent_message: Option<String>,
output_file: Option<Option<PathBuf>>,
) {
let message = last_agent_message.unwrap_or_default();
write_last_message_file(message, Some(output_file));
if last_agent_message.is_none() {
eprintln!(
"Warning: no last agent message; wrote empty content to {}",
output_file.display()
);
if message.is_empty() && output_file.is_some() {
eprintln!("Warning: no last agent message; wrote empty content to {output_file:?}");
}
match output_file {
Some(Some(path)) => write_last_message_file(&message, &path),
Some(None) => println!("{message}"),
_ => (),
}
}
fn write_last_message_file(contents: &str, last_message_path: Option<&Path>) {
if let Some(path) = last_message_path
&& let Err(e) = std::fs::write(path, contents)
{
eprintln!("Failed to write last message file {path:?}: {e}");
fn write_last_message_file(contents: &str, last_message_path: &PathBuf) {
if let Err(e) = std::fs::write(last_message_path, contents) {
eprintln!("Failed to write last message file {last_message_path:?}: {e}");
}
}

View File

@@ -0,0 +1,18 @@
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
pub(crate) struct EventProcessorNoop {}
impl EventProcessor for EventProcessorNoop {
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
match msg {
EventMsg::ShutdownComplete => CodexStatus::Shutdown,
EventMsg::TaskComplete(_) => CodexStatus::InitiateShutdown,
_ => CodexStatus::Running,
}
}
}

View File

@@ -18,7 +18,6 @@ use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::WebSearchBeginEvent;
@@ -28,12 +27,10 @@ use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use codex_common::create_config_summary_entries;
/// This should be configurable. When used in CI, users may not want to impose
@@ -57,16 +54,11 @@ pub(crate) struct EventProcessorWithHumanOutput {
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
show_raw_agent_reasoning: bool,
last_message_path: Option<PathBuf>,
last_total_token_usage: Option<codex_core::protocol::TokenUsageInfo>,
}
impl EventProcessorWithHumanOutput {
pub(crate) fn create_with_ansi(
with_ansi: bool,
config: &Config,
last_message_path: Option<PathBuf>,
) -> Self {
pub(crate) fn create_with_ansi(with_ansi: bool, config: &Config) -> Self {
let call_id_to_patch = HashMap::new();
if with_ansi {
@@ -81,7 +73,6 @@ impl EventProcessorWithHumanOutput {
cyan: Style::new().cyan(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
last_message_path,
last_total_token_usage: None,
}
} else {
@@ -96,7 +87,6 @@ impl EventProcessorWithHumanOutput {
cyan: Style::new(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
last_message_path,
last_total_token_usage: None,
}
}
@@ -167,12 +157,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::TaskStarted(_) => {
// Ignore.
}
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}
return CodexStatus::InitiateShutdown;
}
EventMsg::TokenCount(ev) => {
self.last_total_token_usage = ev.info;
}
@@ -504,6 +488,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
},
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::TaskComplete(_) => return CodexStatus::InitiateShutdown,
EventMsg::ConversationPath(_) => {}
EventMsg::UserMessage(_) => {}
EventMsg::EnteredReviewMode(_) => {}

View File

@@ -1,10 +1,8 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use crate::exec_events::AgentMessageItem;
use crate::exec_events::CommandExecutionItem;
use crate::exec_events::CommandExecutionStatus;
@@ -45,14 +43,12 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_core::protocol::WebSearchEndEvent;
use tracing::error;
use tracing::warn;
pub struct EventProcessorWithJsonOutput {
last_message_path: Option<PathBuf>,
next_event_id: AtomicU64,
// Tracks running commands by call_id, including the associated item id.
running_commands: HashMap<String, RunningCommand>,
@@ -84,19 +80,6 @@ struct RunningMcpToolCall {
}
impl EventProcessorWithJsonOutput {
pub fn new(last_message_path: Option<PathBuf>) -> Self {
Self {
last_message_path,
next_event_id: AtomicU64::new(0),
running_commands: HashMap::new(),
running_patch_applies: HashMap::new(),
running_todo_list: None,
last_total_token_usage: None,
running_mcp_tool_calls: HashMap::new(),
last_critical_error: None,
}
}
pub fn collect_thread_events(&mut self, event: &Event) -> Vec<ThreadEvent> {
match &event.msg {
EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev),
@@ -420,6 +403,20 @@ impl EventProcessorWithJsonOutput {
}
}
impl Default for EventProcessorWithJsonOutput {
fn default() -> Self {
Self {
next_event_id: AtomicU64::new(0),
running_commands: HashMap::new(),
running_patch_applies: HashMap::new(),
running_todo_list: None,
last_total_token_usage: None,
running_mcp_tool_calls: HashMap::new(),
last_critical_error: None,
}
}
}
impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
@@ -443,10 +440,7 @@ impl EventProcessor for EventProcessorWithJsonOutput {
let Event { msg, .. } = event;
if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = msg {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}
if matches!(msg, EventMsg::TaskComplete(_)) {
CodexStatus::InitiateShutdown
} else {
CodexStatus::Running

View File

@@ -1,5 +1,6 @@
mod cli;
mod event_processor;
pub mod event_processor_noop;
mod event_processor_with_human_output;
pub mod event_processor_with_jsonl_output;
pub mod exec_events;
@@ -37,6 +38,8 @@ use tracing_subscriber::prelude::*;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use crate::event_processor_noop::EventProcessorNoop;
use codex_core::default_client::set_default_originator;
use codex_core::find_conversation_path_by_id_str;
@@ -211,14 +214,18 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
}
let mut event_processor: Box<dyn EventProcessor> = match json_mode {
true => Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())),
true => Box::new(EventProcessorWithJsonOutput::default()),
_ => Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi,
&config,
last_message_file.clone(),
)),
};
// Output the last message to stdout
if let Some(None) = last_message_file {
event_processor = Box::new(EventProcessorNoop {});
}
if oss {
codex_ollama::ensure_oss_ready(&config)
.await
@@ -349,10 +356,14 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
let mut last_message = None;
while let Some(event) = rx.recv().await {
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = &event.msg {
last_message = last_agent_message.clone();
}
let shutdown: CodexStatus = event_processor.process_event(event);
match shutdown {
CodexStatus::Running => continue,
@@ -369,6 +380,8 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
std::process::exit(1);
}
handle_last_message(last_message, last_message_file);
Ok(())
}

View File

@@ -51,7 +51,7 @@ fn event(id: &str, msg: EventMsg) -> Event {
#[test]
fn session_configured_produces_thread_started_event() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let session_id =
codex_protocol::ConversationId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")
.unwrap();
@@ -79,7 +79,7 @@ fn session_configured_produces_thread_started_event() {
#[test]
fn task_started_produces_turn_started_event() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let out = ep.collect_thread_events(&event(
"t1",
EventMsg::TaskStarted(codex_core::protocol::TaskStartedEvent {
@@ -92,7 +92,7 @@ fn task_started_produces_turn_started_event() {
#[test]
fn web_search_end_emits_item_completed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let query = "rust async await".to_string();
let out = ep.collect_thread_events(&event(
"w1",
@@ -119,7 +119,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// First plan update => item.started (todo_list)
let first = event(
@@ -236,7 +236,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
#[test]
fn mcp_tool_call_begin_and_end_emit_item_events() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let invocation = McpInvocation {
server: "server_a".to_string(),
tool: "tool_x".to_string(),
@@ -296,7 +296,7 @@ fn mcp_tool_call_begin_and_end_emit_item_events() {
#[test]
fn mcp_tool_call_failure_sets_failed_status() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let invocation = McpInvocation {
server: "server_b".to_string(),
tool: "tool_y".to_string(),
@@ -343,7 +343,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// First turn: start + complete
let start = event(
@@ -388,7 +388,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
#[test]
fn agent_reasoning_produces_item_completed_reasoning() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let ev = event(
"e1",
EventMsg::AgentReasoning(AgentReasoningEvent {
@@ -411,7 +411,7 @@ fn agent_reasoning_produces_item_completed_reasoning() {
#[test]
fn agent_message_produces_item_completed_agent_message() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let ev = event(
"e1",
EventMsg::AgentMessage(AgentMessageEvent {
@@ -434,7 +434,7 @@ fn agent_message_produces_item_completed_agent_message() {
#[test]
fn error_event_produces_error() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let out = ep.collect_thread_events(&event(
"e1",
EventMsg::Error(codex_core::protocol::ErrorEvent {
@@ -451,7 +451,7 @@ fn error_event_produces_error() {
#[test]
fn stream_error_event_produces_error() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let out = ep.collect_thread_events(&event(
"e1",
EventMsg::StreamError(codex_core::protocol::StreamErrorEvent {
@@ -468,7 +468,7 @@ fn stream_error_event_produces_error() {
#[test]
fn error_followed_by_task_complete_produces_turn_failed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let error_event = event(
"e1",
@@ -501,7 +501,7 @@ fn error_followed_by_task_complete_produces_turn_failed() {
#[test]
fn exec_command_end_success_produces_completed_command_item() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// Begin -> no output
let begin = event(
@@ -561,7 +561,7 @@ fn exec_command_end_success_produces_completed_command_item() {
#[test]
fn exec_command_end_failure_produces_failed_command_item() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// Begin -> no output
let begin = event(
@@ -620,7 +620,7 @@ fn exec_command_end_failure_produces_failed_command_item() {
#[test]
fn exec_command_end_without_begin_is_ignored() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// End event arrives without a prior Begin; should produce no thread events.
let end_only = event(
@@ -641,7 +641,7 @@ fn exec_command_end_without_begin_is_ignored() {
#[test]
fn patch_apply_success_produces_item_completed_patchapply() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// Prepare a patch with multiple kinds of changes
let mut changes = std::collections::HashMap::new();
@@ -723,7 +723,7 @@ fn patch_apply_success_produces_item_completed_patchapply() {
#[test]
fn patch_apply_failure_produces_item_completed_patchapply_failed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
let mut changes = std::collections::HashMap::new();
changes.insert(
@@ -777,7 +777,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
#[test]
fn task_complete_produces_turn_completed_with_usage() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = EventProcessorWithJsonOutput::default();
// First, feed a TokenCount event with known totals.
let usage = codex_core::protocol::TokenUsage {

View File

@@ -1,6 +1,7 @@
// Aggregates all former standalone integration tests as modules.
mod apply_patch;
mod auth_env;
mod output_last_message;
mod output_schema;
mod resume;
mod sandbox;

View File

@@ -0,0 +1,106 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]
use core_test_support::responses;
use core_test_support::test_codex_exec::test_codex_exec;
use wiremock::matchers::any;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_includes_output_last_message_in_request() -> anyhow::Result<()> {
let test = test_codex_exec();
let last_message_path = test.cwd_path().join("last_message.txt");
let server = responses::start_mock_server().await;
let body = responses::sse(vec![
serde_json::json!({
"type": "response.created",
"response": {"id": "resp1"}
}),
responses::ev_assistant_message("m1", "fixture hello"),
responses::ev_completed("resp1"),
]);
responses::mount_sse_once_match(&server, any(), body).await;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
// keep using -C in the test to exercise the flag as well
.arg("-C")
.arg(test.cwd_path())
.arg("--output-last-message")
.arg(&last_message_path)
.arg("tell me a joke")
.assert()
.success();
assert_eq!(
std::fs::read_to_string(&last_message_path)?,
"fixture hello"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_includes_output_last_message_in_request_json() -> anyhow::Result<()> {
let test = test_codex_exec();
let last_message_path = test.cwd_path().join("last_message.txt");
let server = responses::start_mock_server().await;
let body = responses::sse(vec![
serde_json::json!({
"type": "response.created",
"response": {"id": "resp1"}
}),
responses::ev_assistant_message("m1", "fixture hello"),
responses::ev_completed("resp1"),
]);
responses::mount_sse_once_match(&server, any(), body).await;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
// keep using -C in the test to exercise the flag as well
.arg("-C")
.arg(test.cwd_path())
.arg("--output-last-message")
.arg(&last_message_path)
.arg("--json")
.arg("tell me a joke")
.assert()
.success();
assert_eq!(
std::fs::read_to_string(&last_message_path)?,
"fixture hello"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_includes_output_last_message_in_request_stdout() -> anyhow::Result<()> {
let test = test_codex_exec();
let server = responses::start_mock_server().await;
let body = responses::sse(vec![
serde_json::json!({
"type": "response.created",
"response": {"id": "resp1"}
}),
responses::ev_assistant_message("m1", "fixture hello"),
responses::ev_completed("resp1"),
]);
responses::mount_sse_once_match(&server, any(), body).await;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
// keep using -C in the test to exercise the flag as well
.arg("-C")
.arg(test.cwd_path())
.arg("tell me a joke")
.arg("--output-last-message")
.assert()
.success()
.stdout("fixture hello\n");
Ok(())
}