mirror of
https://github.com/openai/codex.git
synced 2026-05-02 10:26:45 +00:00
## Summary Fixes #19022. `codex exec --json` currently emits `turn.completed.usage` with input, cached input, and output token counts, but drops the reasoning-token split that Codex already receives through thread token usage updates. Programmatic consumers that rely on the JSON stream, especially ephemeral runs that do not write rollout files, need this field to accurately display reasoning-model usage. This PR adds `reasoning_output_tokens` to the public exec JSON `Usage` payload and maps it from the existing `ThreadTokenUsageUpdated` total token usage data. ## Verification - Added coverage to `event_processor_with_json_output::token_usage_update_is_emitted_on_turn_completion` so `turn.completed.usage.reasoning_output_tokens` is asserted. - Updated SDK expectations for `run()` and `runStreamed()` so TypeScript consumers see the new usage field. - Ran `cargo test -p codex-exec`. - Ran `pnpm --filter ./sdk/typescript run build`. - Ran `pnpm --filter ./sdk/typescript run lint`. - Ran `pnpm --filter ./sdk/typescript exec jest --runInBand --testTimeout=30000`.
1614 lines
58 KiB
Rust
1614 lines
58 KiB
Rust
use codex_app_server_protocol::CollabAgentState as ApiCollabAgentState;
|
|
use codex_app_server_protocol::CollabAgentStatus as ApiCollabAgentStatus;
|
|
use codex_app_server_protocol::CollabAgentTool;
|
|
use codex_app_server_protocol::CollabAgentToolCallStatus as ApiCollabAgentToolCallStatus;
|
|
use codex_app_server_protocol::CommandAction;
|
|
use codex_app_server_protocol::CommandExecutionSource;
|
|
use codex_app_server_protocol::CommandExecutionStatus as ApiCommandExecutionStatus;
|
|
use codex_app_server_protocol::ErrorNotification;
|
|
use codex_app_server_protocol::FileUpdateChange as ApiFileUpdateChange;
|
|
use codex_app_server_protocol::ItemCompletedNotification;
|
|
use codex_app_server_protocol::ItemStartedNotification;
|
|
use codex_app_server_protocol::McpToolCallError;
|
|
use codex_app_server_protocol::McpToolCallResult;
|
|
use codex_app_server_protocol::McpToolCallStatus as ApiMcpToolCallStatus;
|
|
use codex_app_server_protocol::PatchApplyStatus as ApiPatchApplyStatus;
|
|
use codex_app_server_protocol::PatchChangeKind as ApiPatchChangeKind;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ThreadItem;
|
|
use codex_app_server_protocol::ThreadTokenUsage;
|
|
use codex_app_server_protocol::TokenUsageBreakdown;
|
|
use codex_app_server_protocol::Turn;
|
|
use codex_app_server_protocol::TurnCompletedNotification;
|
|
use codex_app_server_protocol::TurnError;
|
|
use codex_app_server_protocol::TurnPlanStep;
|
|
use codex_app_server_protocol::TurnPlanStepStatus;
|
|
use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
|
use codex_app_server_protocol::TurnStartedNotification;
|
|
use codex_app_server_protocol::TurnStatus;
|
|
use codex_app_server_protocol::WebSearchAction as ApiWebSearchAction;
|
|
use codex_protocol::ThreadId;
|
|
use codex_protocol::models::WebSearchAction;
|
|
use codex_protocol::protocol::AskForApproval;
|
|
use codex_protocol::protocol::SandboxPolicy;
|
|
use codex_protocol::protocol::SessionConfiguredEvent;
|
|
use codex_utils_absolute_path::test_support::PathBufExt;
|
|
use codex_utils_absolute_path::test_support::test_path_buf;
|
|
use pretty_assertions::assert_eq;
|
|
use serde_json::json;
|
|
|
|
use codex_exec::AgentMessageItem;
|
|
use codex_exec::CodexStatus;
|
|
use codex_exec::CollabAgentState;
|
|
use codex_exec::CollabAgentStatus;
|
|
use codex_exec::CollabTool;
|
|
use codex_exec::CollabToolCallItem;
|
|
use codex_exec::CollabToolCallStatus;
|
|
use codex_exec::CollectedThreadEvents;
|
|
use codex_exec::CommandExecutionItem;
|
|
use codex_exec::CommandExecutionStatus;
|
|
use codex_exec::ErrorItem;
|
|
use codex_exec::EventProcessorWithJsonOutput;
|
|
use codex_exec::ExecThreadItem;
|
|
use codex_exec::FileChangeItem;
|
|
use codex_exec::FileUpdateChange as ExecFileUpdateChange;
|
|
use codex_exec::ItemCompletedEvent;
|
|
use codex_exec::ItemStartedEvent;
|
|
use codex_exec::ItemUpdatedEvent;
|
|
use codex_exec::McpToolCallItem;
|
|
use codex_exec::McpToolCallItemError;
|
|
use codex_exec::McpToolCallItemResult;
|
|
use codex_exec::McpToolCallStatus;
|
|
use codex_exec::PatchApplyStatus;
|
|
use codex_exec::PatchChangeKind;
|
|
use codex_exec::ReasoningItem;
|
|
use codex_exec::ThreadErrorEvent;
|
|
use codex_exec::ThreadEvent;
|
|
use codex_exec::ThreadItemDetails;
|
|
use codex_exec::ThreadStartedEvent;
|
|
use codex_exec::TodoItem;
|
|
use codex_exec::TodoListItem;
|
|
use codex_exec::TurnCompletedEvent;
|
|
use codex_exec::TurnFailedEvent;
|
|
use codex_exec::TurnStartedEvent;
|
|
use codex_exec::Usage;
|
|
use codex_exec::WebSearchItem;
|
|
|
|
#[test]
|
|
fn map_todo_items_preserves_text_and_completion_state() {
|
|
let items = EventProcessorWithJsonOutput::map_todo_items(&[
|
|
TurnPlanStep {
|
|
step: "inspect bootstrap".to_string(),
|
|
status: TurnPlanStepStatus::InProgress,
|
|
},
|
|
TurnPlanStep {
|
|
step: "drop legacy notifications".to_string(),
|
|
status: TurnPlanStepStatus::Completed,
|
|
},
|
|
]);
|
|
|
|
assert_eq!(
|
|
items,
|
|
vec![
|
|
TodoItem {
|
|
text: "inspect bootstrap".to_string(),
|
|
completed: false,
|
|
},
|
|
TodoItem {
|
|
text: "drop legacy notifications".to_string(),
|
|
completed: true,
|
|
},
|
|
]
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn session_configured_produces_thread_started_event() {
|
|
let session_configured = SessionConfiguredEvent {
|
|
session_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")
|
|
.expect("thread id should parse"),
|
|
forked_from_id: None,
|
|
thread_name: None,
|
|
model: "codex-mini-latest".to_string(),
|
|
model_provider_id: "test-provider".to_string(),
|
|
service_tier: None,
|
|
approval_policy: AskForApproval::Never,
|
|
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer::User,
|
|
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
|
permission_profile: None,
|
|
cwd: test_path_buf("/tmp/project").abs(),
|
|
reasoning_effort: None,
|
|
history_log_id: 0,
|
|
history_entry_count: 0,
|
|
initial_messages: None,
|
|
network_proxy: None,
|
|
rollout_path: None,
|
|
};
|
|
|
|
assert_eq!(
|
|
EventProcessorWithJsonOutput::thread_started_event(&session_configured),
|
|
ThreadEvent::ThreadStarted(ThreadStartedEvent {
|
|
thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
|
|
})
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn turn_started_emits_turn_started_event() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected =
|
|
processor.collect_thread_events(ServerNotification::TurnStarted(TurnStartedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::InProgress,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
}));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnStarted(TurnStartedEvent {})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn command_execution_started_and_completed_translate_to_thread_events() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
let command_item = ThreadItem::CommandExecution {
|
|
id: "cmd-1".to_string(),
|
|
command: "ls".to_string(),
|
|
cwd: test_path_buf("/tmp/project").abs(),
|
|
process_id: Some("123".to_string()),
|
|
source: CommandExecutionSource::UserShell,
|
|
status: ApiCommandExecutionStatus::InProgress,
|
|
command_actions: Vec::<CommandAction>::new(),
|
|
aggregated_output: None,
|
|
exit_code: None,
|
|
duration_ms: None,
|
|
};
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: command_item,
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
|
command: "ls".to_string(),
|
|
aggregated_output: String::new(),
|
|
exit_code: None,
|
|
status: CommandExecutionStatus::InProgress,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::CommandExecution {
|
|
id: "cmd-1".to_string(),
|
|
command: "ls".to_string(),
|
|
cwd: test_path_buf("/tmp/project").abs(),
|
|
process_id: Some("123".to_string()),
|
|
source: CommandExecutionSource::UserShell,
|
|
status: ApiCommandExecutionStatus::Completed,
|
|
command_actions: Vec::<CommandAction>::new(),
|
|
aggregated_output: Some("a.txt\n".to_string()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(3),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
|
command: "ls".to_string(),
|
|
aggregated_output: "a.txt\n".to_string(),
|
|
exit_code: Some(0),
|
|
status: CommandExecutionStatus::Completed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn empty_reasoning_items_are_ignored() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::Reasoning {
|
|
id: "reasoning-1".to_string(),
|
|
summary: Vec::new(),
|
|
content: vec!["raw reasoning".to_string()],
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: Vec::new(),
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn unsupported_items_do_not_consume_synthetic_ids() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let ignored = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::Plan {
|
|
id: "plan-1".to_string(),
|
|
text: "ignored plan".to_string(),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
ignored,
|
|
CollectedThreadEvents {
|
|
events: Vec::new(),
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "message-1".to_string(),
|
|
text: "hello".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::AgentMessage(AgentMessageItem {
|
|
text: "hello".to_string(),
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reasoning_items_emit_summary_not_raw_content() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::Reasoning {
|
|
id: "reasoning-1".to_string(),
|
|
summary: vec!["safe summary".to_string()],
|
|
content: vec!["raw reasoning".to_string()],
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::Reasoning(ReasoningItem {
|
|
text: "safe summary".to_string(),
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn web_search_completion_preserves_query_and_action() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::WebSearch {
|
|
id: "search-1".to_string(),
|
|
query: "rust async await".to_string(),
|
|
action: Some(ApiWebSearchAction::Search {
|
|
query: Some("rust async await".to_string()),
|
|
queries: None,
|
|
}),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::WebSearch(WebSearchItem {
|
|
id: "search-1".to_string(),
|
|
query: "rust async await".to_string(),
|
|
action: WebSearchAction::Search {
|
|
query: Some("rust async await".to_string()),
|
|
queries: None,
|
|
},
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn web_search_start_and_completion_reuse_item_id() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::WebSearch {
|
|
id: "search-1".to_string(),
|
|
query: String::new(),
|
|
action: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::WebSearch {
|
|
id: "search-1".to_string(),
|
|
query: "rust async await".to_string(),
|
|
action: Some(ApiWebSearchAction::Search {
|
|
query: Some("rust async await".to_string()),
|
|
queries: None,
|
|
}),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::WebSearch(WebSearchItem {
|
|
id: "search-1".to_string(),
|
|
query: String::new(),
|
|
action: WebSearchAction::Other,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::WebSearch(WebSearchItem {
|
|
id: "search-1".to_string(),
|
|
query: "rust async await".to_string(),
|
|
action: WebSearchAction::Search {
|
|
query: Some("rust async await".to_string()),
|
|
queries: None,
|
|
},
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn mcp_tool_call_begin_and_end_emit_item_events() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::McpToolCall {
|
|
id: "mcp-1".to_string(),
|
|
server: "server_a".to_string(),
|
|
tool: "tool_x".to_string(),
|
|
status: ApiMcpToolCallStatus::InProgress,
|
|
arguments: json!({ "key": "value" }),
|
|
mcp_app_resource_uri: None,
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::McpToolCall {
|
|
id: "mcp-1".to_string(),
|
|
server: "server_a".to_string(),
|
|
tool: "tool_x".to_string(),
|
|
status: ApiMcpToolCallStatus::Completed,
|
|
arguments: json!({ "key": "value" }),
|
|
mcp_app_resource_uri: None,
|
|
result: Some(Box::new(McpToolCallResult {
|
|
content: Vec::new(),
|
|
structured_content: None,
|
|
meta: None,
|
|
})),
|
|
error: None,
|
|
duration_ms: Some(1_000),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::McpToolCall(McpToolCallItem {
|
|
server: "server_a".to_string(),
|
|
tool: "tool_x".to_string(),
|
|
arguments: json!({ "key": "value" }),
|
|
result: None,
|
|
error: None,
|
|
status: McpToolCallStatus::InProgress,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::McpToolCall(McpToolCallItem {
|
|
server: "server_a".to_string(),
|
|
tool: "tool_x".to_string(),
|
|
arguments: json!({ "key": "value" }),
|
|
result: Some(McpToolCallItemResult {
|
|
content: Vec::new(),
|
|
structured_content: None,
|
|
}),
|
|
error: None,
|
|
status: McpToolCallStatus::Completed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn mcp_tool_call_failure_sets_failed_status() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::McpToolCall {
|
|
id: "mcp-2".to_string(),
|
|
server: "server_b".to_string(),
|
|
tool: "tool_y".to_string(),
|
|
status: ApiMcpToolCallStatus::Failed,
|
|
arguments: json!({ "param": 42 }),
|
|
mcp_app_resource_uri: None,
|
|
result: None,
|
|
error: Some(McpToolCallError {
|
|
message: "tool exploded".to_string(),
|
|
}),
|
|
duration_ms: Some(5),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::McpToolCall(McpToolCallItem {
|
|
server: "server_b".to_string(),
|
|
tool: "tool_y".to_string(),
|
|
arguments: json!({ "param": 42 }),
|
|
result: None,
|
|
error: Some(McpToolCallItemError {
|
|
message: "tool exploded".to_string(),
|
|
}),
|
|
status: McpToolCallStatus::Failed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn mcp_tool_call_defaults_arguments_and_preserves_structured_content() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::McpToolCall {
|
|
id: "mcp-3".to_string(),
|
|
server: "server_c".to_string(),
|
|
tool: "tool_z".to_string(),
|
|
status: ApiMcpToolCallStatus::InProgress,
|
|
arguments: serde_json::Value::Null,
|
|
mcp_app_resource_uri: None,
|
|
result: None,
|
|
error: None,
|
|
duration_ms: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::McpToolCall {
|
|
id: "mcp-3".to_string(),
|
|
server: "server_c".to_string(),
|
|
tool: "tool_z".to_string(),
|
|
status: ApiMcpToolCallStatus::Completed,
|
|
arguments: serde_json::Value::Null,
|
|
mcp_app_resource_uri: None,
|
|
result: Some(Box::new(McpToolCallResult {
|
|
content: vec![json!({
|
|
"type": "text",
|
|
"text": "done",
|
|
})],
|
|
structured_content: Some(json!({ "status": "ok" })),
|
|
meta: None,
|
|
})),
|
|
error: None,
|
|
duration_ms: Some(10),
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::McpToolCall(McpToolCallItem {
|
|
server: "server_c".to_string(),
|
|
tool: "tool_z".to_string(),
|
|
arguments: serde_json::Value::Null,
|
|
result: None,
|
|
error: None,
|
|
status: McpToolCallStatus::InProgress,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::McpToolCall(McpToolCallItem {
|
|
server: "server_c".to_string(),
|
|
tool: "tool_z".to_string(),
|
|
arguments: serde_json::Value::Null,
|
|
result: Some(McpToolCallItemResult {
|
|
content: vec![json!({
|
|
"type": "text",
|
|
"text": "done",
|
|
})],
|
|
structured_content: Some(json!({ "status": "ok" })),
|
|
}),
|
|
error: None,
|
|
status: McpToolCallStatus::Completed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn collab_spawn_begin_and_end_emit_item_events() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::CollabAgentToolCall {
|
|
id: "collab-1".to_string(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status: ApiCollabAgentToolCallStatus::InProgress,
|
|
sender_thread_id: "thread-parent".to_string(),
|
|
receiver_thread_ids: Vec::new(),
|
|
prompt: Some("draft a plan".to_string()),
|
|
model: Some("gpt-5".to_string()),
|
|
reasoning_effort: None,
|
|
agents_states: std::collections::HashMap::new(),
|
|
},
|
|
thread_id: "thread-parent".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::CollabAgentToolCall {
|
|
id: "collab-1".to_string(),
|
|
tool: CollabAgentTool::SpawnAgent,
|
|
status: ApiCollabAgentToolCallStatus::Completed,
|
|
sender_thread_id: "thread-parent".to_string(),
|
|
receiver_thread_ids: vec!["thread-child".to_string()],
|
|
prompt: Some("draft a plan".to_string()),
|
|
model: Some("gpt-5".to_string()),
|
|
reasoning_effort: None,
|
|
agents_states: std::collections::HashMap::from([(
|
|
"thread-child".to_string(),
|
|
ApiCollabAgentState {
|
|
status: ApiCollabAgentStatus::Running,
|
|
message: None,
|
|
},
|
|
)]),
|
|
},
|
|
thread_id: "thread-parent".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
|
tool: CollabTool::SpawnAgent,
|
|
sender_thread_id: "thread-parent".to_string(),
|
|
receiver_thread_ids: Vec::new(),
|
|
prompt: Some("draft a plan".to_string()),
|
|
agents_states: std::collections::HashMap::new(),
|
|
status: CollabToolCallStatus::InProgress,
|
|
},),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
|
tool: CollabTool::SpawnAgent,
|
|
sender_thread_id: "thread-parent".to_string(),
|
|
receiver_thread_ids: vec!["thread-child".to_string()],
|
|
prompt: Some("draft a plan".to_string()),
|
|
agents_states: std::collections::HashMap::from([(
|
|
"thread-child".to_string(),
|
|
CollabAgentState {
|
|
status: CollabAgentStatus::Running,
|
|
message: None,
|
|
},
|
|
)]),
|
|
status: CollabToolCallStatus::Completed,
|
|
},),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn file_change_completion_maps_change_kinds() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::FileChange {
|
|
id: "patch-1".to_string(),
|
|
changes: vec![
|
|
ApiFileUpdateChange {
|
|
path: "a/added.txt".to_string(),
|
|
kind: ApiPatchChangeKind::Add,
|
|
diff: String::new(),
|
|
},
|
|
ApiFileUpdateChange {
|
|
path: "b/deleted.txt".to_string(),
|
|
kind: ApiPatchChangeKind::Delete,
|
|
diff: String::new(),
|
|
},
|
|
ApiFileUpdateChange {
|
|
path: "c/modified.txt".to_string(),
|
|
kind: ApiPatchChangeKind::Update { move_path: None },
|
|
diff: "@@ -1 +1 @@".to_string(),
|
|
},
|
|
],
|
|
status: ApiPatchApplyStatus::Completed,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::FileChange(FileChangeItem {
|
|
changes: vec![
|
|
ExecFileUpdateChange {
|
|
path: "a/added.txt".to_string(),
|
|
kind: PatchChangeKind::Add,
|
|
},
|
|
ExecFileUpdateChange {
|
|
path: "b/deleted.txt".to_string(),
|
|
kind: PatchChangeKind::Delete,
|
|
},
|
|
ExecFileUpdateChange {
|
|
path: "c/modified.txt".to_string(),
|
|
kind: PatchChangeKind::Update,
|
|
},
|
|
],
|
|
status: PatchApplyStatus::Completed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn file_change_declined_maps_to_failed_status() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::FileChange {
|
|
id: "patch-2".to_string(),
|
|
changes: vec![ApiFileUpdateChange {
|
|
path: "file.txt".to_string(),
|
|
kind: ApiPatchChangeKind::Update { move_path: None },
|
|
diff: "@@ -1 +1 @@".to_string(),
|
|
}],
|
|
status: ApiPatchApplyStatus::Declined,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::FileChange(FileChangeItem {
|
|
changes: vec![ExecFileUpdateChange {
|
|
path: "file.txt".to_string(),
|
|
kind: PatchChangeKind::Update,
|
|
}],
|
|
status: PatchApplyStatus::Failed,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn agent_message_item_updates_final_message() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "msg-1".to_string(),
|
|
text: "hello".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::AgentMessage(AgentMessageItem {
|
|
text: "hello".to_string(),
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
assert_eq!(processor.final_message(), Some("hello"));
|
|
}
|
|
|
|
#[test]
|
|
fn agent_message_item_started_is_ignored() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "msg-1".to_string(),
|
|
text: "hello".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: Vec::new(),
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn reasoning_item_completed_uses_synthetic_id() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::Reasoning {
|
|
id: "rs-1".to_string(),
|
|
summary: vec!["thinking...".to_string()],
|
|
content: vec!["raw".to_string()],
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::Reasoning(ReasoningItem {
|
|
text: "thinking...".to_string(),
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn warning_event_produces_error_item() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_warning(
|
|
"Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(),
|
|
);
|
|
|
|
assert_eq!(
|
|
collected,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::Error(ErrorItem {
|
|
message: "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(),
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn plan_update_emits_started_then_updated_then_completed() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started = processor.collect_thread_events(ServerNotification::TurnPlanUpdated(
|
|
TurnPlanUpdatedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
explanation: None,
|
|
plan: vec![
|
|
TurnPlanStep {
|
|
step: "step one".to_string(),
|
|
status: TurnPlanStepStatus::Pending,
|
|
},
|
|
TurnPlanStep {
|
|
step: "step two".to_string(),
|
|
status: TurnPlanStepStatus::InProgress,
|
|
},
|
|
],
|
|
},
|
|
));
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::TodoList(TodoListItem {
|
|
items: vec![
|
|
TodoItem {
|
|
text: "step one".to_string(),
|
|
completed: false,
|
|
},
|
|
TodoItem {
|
|
text: "step two".to_string(),
|
|
completed: false,
|
|
},
|
|
],
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let updated = processor.collect_thread_events(ServerNotification::TurnPlanUpdated(
|
|
TurnPlanUpdatedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
explanation: None,
|
|
plan: vec![
|
|
TurnPlanStep {
|
|
step: "step one".to_string(),
|
|
status: TurnPlanStepStatus::Completed,
|
|
},
|
|
TurnPlanStep {
|
|
step: "step two".to_string(),
|
|
status: TurnPlanStepStatus::InProgress,
|
|
},
|
|
],
|
|
},
|
|
));
|
|
assert_eq!(
|
|
updated,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemUpdated(ItemUpdatedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::TodoList(TodoListItem {
|
|
items: vec![
|
|
TodoItem {
|
|
text: "step one".to_string(),
|
|
completed: true,
|
|
},
|
|
TodoItem {
|
|
text: "step two".to_string(),
|
|
completed: false,
|
|
},
|
|
],
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![
|
|
ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::TodoList(TodoListItem {
|
|
items: vec![
|
|
TodoItem {
|
|
text: "step one".to_string(),
|
|
completed: true,
|
|
},
|
|
TodoItem {
|
|
text: "step two".to_string(),
|
|
completed: false,
|
|
},
|
|
],
|
|
}),
|
|
},
|
|
}),
|
|
ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
}),
|
|
],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn plan_update_after_completion_starts_new_todo_list_with_new_id() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let _ = processor.collect_thread_events(ServerNotification::TurnPlanUpdated(
|
|
TurnPlanUpdatedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
explanation: None,
|
|
plan: vec![TurnPlanStep {
|
|
step: "only".to_string(),
|
|
status: TurnPlanStepStatus::Pending,
|
|
}],
|
|
},
|
|
));
|
|
let _ = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
let restarted = processor.collect_thread_events(ServerNotification::TurnPlanUpdated(
|
|
TurnPlanUpdatedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-2".to_string(),
|
|
explanation: None,
|
|
plan: vec![TurnPlanStep {
|
|
step: "again".to_string(),
|
|
status: TurnPlanStepStatus::Pending,
|
|
}],
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
restarted,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_1".to_string(),
|
|
details: ThreadItemDetails::TodoList(TodoListItem {
|
|
items: vec![TodoItem {
|
|
text: "again".to_string(),
|
|
completed: false,
|
|
}],
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn token_usage_update_is_emitted_on_turn_completion() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let usage_update =
|
|
processor.collect_thread_events(ServerNotification::ThreadTokenUsageUpdated(
|
|
codex_app_server_protocol::ThreadTokenUsageUpdatedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
token_usage: ThreadTokenUsage {
|
|
total: TokenUsageBreakdown {
|
|
total_tokens: 42,
|
|
input_tokens: 10,
|
|
cached_input_tokens: 3,
|
|
output_tokens: 29,
|
|
reasoning_output_tokens: 7,
|
|
},
|
|
last: TokenUsageBreakdown {
|
|
total_tokens: 42,
|
|
input_tokens: 10,
|
|
cached_input_tokens: 3,
|
|
output_tokens: 29,
|
|
reasoning_output_tokens: 7,
|
|
},
|
|
model_context_window: Some(128_000),
|
|
},
|
|
},
|
|
));
|
|
assert_eq!(
|
|
usage_update,
|
|
CollectedThreadEvents {
|
|
events: Vec::new(),
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage {
|
|
input_tokens: 10,
|
|
cached_input_tokens: 3,
|
|
output_tokens: 29,
|
|
reasoning_output_tokens: 7,
|
|
},
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn turn_completion_recovers_final_message_from_turn_items() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: vec![ThreadItem::AgentMessage {
|
|
id: "msg-1".to_string(),
|
|
text: "final answer".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}],
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
assert_eq!(processor.final_message(), Some("final answer"));
|
|
}
|
|
|
|
#[test]
|
|
fn turn_completion_reconciles_started_items_from_turn_items() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let started =
|
|
processor.collect_thread_events(ServerNotification::ItemStarted(ItemStartedNotification {
|
|
item: ThreadItem::CommandExecution {
|
|
id: "cmd-1".to_string(),
|
|
command: "ls".to_string(),
|
|
cwd: test_path_buf("/tmp/project").abs(),
|
|
process_id: Some("123".to_string()),
|
|
source: CommandExecutionSource::UserShell,
|
|
status: ApiCommandExecutionStatus::InProgress,
|
|
command_actions: Vec::<CommandAction>::new(),
|
|
aggregated_output: None,
|
|
exit_code: None,
|
|
duration_ms: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
assert_eq!(
|
|
started,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
|
command: "ls".to_string(),
|
|
aggregated_output: String::new(),
|
|
exit_code: None,
|
|
status: CommandExecutionStatus::InProgress,
|
|
}),
|
|
},
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: vec![ThreadItem::CommandExecution {
|
|
id: "cmd-1".to_string(),
|
|
command: "ls".to_string(),
|
|
cwd: test_path_buf("/tmp/project").abs(),
|
|
process_id: Some("123".to_string()),
|
|
source: CommandExecutionSource::UserShell,
|
|
status: ApiCommandExecutionStatus::Completed,
|
|
command_actions: Vec::<CommandAction>::new(),
|
|
aggregated_output: Some("a.txt\n".to_string()),
|
|
exit_code: Some(0),
|
|
duration_ms: Some(3),
|
|
}],
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![
|
|
ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
|
item: ExecThreadItem {
|
|
id: "item_0".to_string(),
|
|
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
|
command: "ls".to_string(),
|
|
aggregated_output: "a.txt\n".to_string(),
|
|
exit_code: Some(0),
|
|
status: CommandExecutionStatus::Completed,
|
|
}),
|
|
},
|
|
}),
|
|
ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
}),
|
|
],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn turn_completion_overwrites_stale_final_message_from_turn_items() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
let _ = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "msg-stale".to_string(),
|
|
text: "stale answer".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: vec![ThreadItem::AgentMessage {
|
|
id: "msg-1".to_string(),
|
|
text: "final answer".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
}],
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
assert_eq!(processor.final_message(), Some("final answer"));
|
|
}
|
|
|
|
#[test]
|
|
fn turn_completion_preserves_streamed_final_message_when_turn_items_are_empty() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
let _ = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "msg-streamed".to_string(),
|
|
text: "streamed answer".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
assert_eq!(processor.final_message(), Some("streamed answer"));
|
|
}
|
|
|
|
#[test]
|
|
fn failed_turn_clears_stale_final_message() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ItemCompleted(
|
|
ItemCompletedNotification {
|
|
item: ThreadItem::AgentMessage {
|
|
id: "msg-1".to_string(),
|
|
text: "partial answer".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
},
|
|
));
|
|
|
|
assert_eq!(collected.status, CodexStatus::Running);
|
|
assert_eq!(processor.final_message(), Some("partial answer"));
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Failed,
|
|
error: Some(TurnError {
|
|
message: "turn failed".to_string(),
|
|
additional_details: None,
|
|
codex_error_info: None,
|
|
}),
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(collected.status, CodexStatus::InitiateShutdown);
|
|
assert_eq!(processor.final_message(), None);
|
|
}
|
|
|
|
#[test]
|
|
fn turn_completion_falls_back_to_final_plan_text() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let completed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: vec![ThreadItem::Plan {
|
|
id: "plan-1".to_string(),
|
|
text: "ship the typed adapter".to_string(),
|
|
}],
|
|
status: TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
|
|
assert_eq!(
|
|
completed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
|
usage: Usage::default(),
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
assert_eq!(processor.final_message(), Some("ship the typed adapter"));
|
|
}
|
|
|
|
#[test]
|
|
fn turn_failure_prefers_structured_error_message() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let error = processor.collect_thread_events(ServerNotification::Error(ErrorNotification {
|
|
error: TurnError {
|
|
message: "backend failed".to_string(),
|
|
codex_error_info: None,
|
|
additional_details: Some("request id abc".to_string()),
|
|
},
|
|
will_retry: false,
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
}));
|
|
assert_eq!(
|
|
error,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::Error(ThreadErrorEvent {
|
|
message: "backend failed (request id abc)".to_string(),
|
|
})],
|
|
status: CodexStatus::Running,
|
|
}
|
|
);
|
|
|
|
let failed = processor.collect_thread_events(ServerNotification::TurnCompleted(
|
|
TurnCompletedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn: Turn {
|
|
id: "turn-1".to_string(),
|
|
items: Vec::new(),
|
|
status: TurnStatus::Failed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
},
|
|
},
|
|
));
|
|
assert_eq!(
|
|
failed,
|
|
CollectedThreadEvents {
|
|
events: vec![ThreadEvent::TurnFailed(TurnFailedEvent {
|
|
error: ThreadErrorEvent {
|
|
message: "backend failed (request id abc)".to_string(),
|
|
},
|
|
})],
|
|
status: CodexStatus::InitiateShutdown,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn model_reroute_surfaces_as_error_item() {
|
|
let mut processor = EventProcessorWithJsonOutput::new(/*last_message_path*/ None);
|
|
|
|
let collected = processor.collect_thread_events(ServerNotification::ModelRerouted(
|
|
codex_app_server_protocol::ModelReroutedNotification {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
from_model: "gpt-5".to_string(),
|
|
to_model: "gpt-5-mini".to_string(),
|
|
reason: codex_app_server_protocol::ModelRerouteReason::HighRiskCyberActivity,
|
|
},
|
|
));
|
|
|
|
assert_eq!(collected.status, CodexStatus::Running);
|
|
assert_eq!(collected.events.len(), 1);
|
|
let ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) = &collected.events[0] else {
|
|
panic!("expected ItemCompleted");
|
|
};
|
|
assert_eq!(item.id, "item_0");
|
|
assert_eq!(
|
|
item.details,
|
|
ThreadItemDetails::Error(ErrorItem {
|
|
message: "model rerouted: gpt-5 -> gpt-5-mini (HighRiskCyberActivity)".to_string(),
|
|
})
|
|
);
|
|
}
|