renaming: task to turn (#8963)

This commit is contained in:
jif-oai
2026-01-09 17:31:17 +00:00
committed by GitHub
parent ed64804cb5
commit 1aed01e99f
58 changed files with 362 additions and 349 deletions

View File

@@ -34,22 +34,7 @@ use crate::exec_events::TurnStartedEvent;
use crate::exec_events::Usage;
use crate::exec_events::WebSearchItem;
use codex_core::config::Config;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandBeginEvent;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::FileChange;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_core::protocol::TerminalInteractionEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_core::protocol;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
use serde_json::Value as JsonValue;
@@ -61,7 +46,7 @@ pub struct EventProcessorWithJsonOutput {
next_event_id: AtomicU64,
// Tracks running commands by call_id, including the associated item id.
running_commands: HashMap<String, RunningCommand>,
running_patch_applies: HashMap<String, PatchApplyBeginEvent>,
running_patch_applies: HashMap<String, protocol::PatchApplyBeginEvent>,
// Tracks the todo list for the current turn (at most one per turn).
running_todo_list: Option<RunningTodoList>,
last_total_token_usage: Option<codex_core::protocol::TokenUsage>,
@@ -104,39 +89,39 @@ impl EventProcessorWithJsonOutput {
}
}
pub fn collect_thread_events(&mut self, event: &Event) -> Vec<ThreadEvent> {
pub fn collect_thread_events(&mut self, event: &protocol::Event) -> Vec<ThreadEvent> {
match &event.msg {
EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev),
EventMsg::AgentMessage(ev) => self.handle_agent_message(ev),
EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev),
EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev),
EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev),
EventMsg::TerminalInteraction(ev) => self.handle_terminal_interaction(ev),
EventMsg::ExecCommandOutputDelta(ev) => {
protocol::EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev),
protocol::EventMsg::AgentMessage(ev) => self.handle_agent_message(ev),
protocol::EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev),
protocol::EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev),
protocol::EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev),
protocol::EventMsg::TerminalInteraction(ev) => self.handle_terminal_interaction(ev),
protocol::EventMsg::ExecCommandOutputDelta(ev) => {
self.handle_output_chunk(&ev.call_id, &ev.chunk)
}
EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev),
EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
EventMsg::WebSearchBegin(_) => Vec::new(),
EventMsg::WebSearchEnd(ev) => self.handle_web_search_end(ev),
EventMsg::TokenCount(ev) => {
protocol::EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev),
protocol::EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
protocol::EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
protocol::EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
protocol::EventMsg::WebSearchBegin(_) => Vec::new(),
protocol::EventMsg::WebSearchEnd(ev) => self.handle_web_search_end(ev),
protocol::EventMsg::TokenCount(ev) => {
if let Some(info) = &ev.info {
self.last_total_token_usage = Some(info.total_token_usage.clone());
}
Vec::new()
}
EventMsg::TaskStarted(ev) => self.handle_task_started(ev),
EventMsg::TaskComplete(_) => self.handle_task_complete(),
EventMsg::Error(ev) => {
protocol::EventMsg::TurnStarted(ev) => self.handle_task_started(ev),
protocol::EventMsg::TurnComplete(_) => self.handle_task_complete(),
protocol::EventMsg::Error(ev) => {
let error = ThreadErrorEvent {
message: ev.message.clone(),
};
self.last_critical_error = Some(error.clone());
vec![ThreadEvent::Error(error)]
}
EventMsg::Warning(ev) => {
protocol::EventMsg::Warning(ev) => {
let item = ThreadItem {
id: self.get_next_item_id(),
details: ThreadItemDetails::Error(ErrorItem {
@@ -145,7 +130,7 @@ impl EventProcessorWithJsonOutput {
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
EventMsg::StreamError(ev) => {
protocol::EventMsg::StreamError(ev) => {
let message = match &ev.additional_details {
Some(details) if !details.trim().is_empty() => {
format!("{} ({})", ev.message, details)
@@ -154,7 +139,7 @@ impl EventProcessorWithJsonOutput {
};
vec![ThreadEvent::Error(ThreadErrorEvent { message })]
}
EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev),
protocol::EventMsg::PlanUpdate(ev) => self.handle_plan_update(ev),
_ => Vec::new(),
}
}
@@ -167,13 +152,16 @@ impl EventProcessorWithJsonOutput {
)
}
fn handle_session_configured(&self, payload: &SessionConfiguredEvent) -> Vec<ThreadEvent> {
fn handle_session_configured(
&self,
payload: &protocol::SessionConfiguredEvent,
) -> Vec<ThreadEvent> {
vec![ThreadEvent::ThreadStarted(ThreadStartedEvent {
thread_id: payload.session_id.to_string(),
})]
}
fn handle_web_search_end(&self, ev: &WebSearchEndEvent) -> Vec<ThreadEvent> {
fn handle_web_search_end(&self, ev: &protocol::WebSearchEndEvent) -> Vec<ThreadEvent> {
let item = ThreadItem {
id: self.get_next_item_id(),
details: ThreadItemDetails::WebSearch(WebSearchItem {
@@ -189,12 +177,15 @@ impl EventProcessorWithJsonOutput {
vec![]
}
fn handle_terminal_interaction(&mut self, _ev: &TerminalInteractionEvent) -> Vec<ThreadEvent> {
fn handle_terminal_interaction(
&mut self,
_ev: &protocol::TerminalInteractionEvent,
) -> Vec<ThreadEvent> {
//TODO see how we want to process them
vec![]
}
fn handle_agent_message(&self, payload: &AgentMessageEvent) -> Vec<ThreadEvent> {
fn handle_agent_message(&self, payload: &protocol::AgentMessageEvent) -> Vec<ThreadEvent> {
let item = ThreadItem {
id: self.get_next_item_id(),
@@ -206,7 +197,7 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_reasoning_event(&self, ev: &AgentReasoningEvent) -> Vec<ThreadEvent> {
fn handle_reasoning_event(&self, ev: &protocol::AgentReasoningEvent) -> Vec<ThreadEvent> {
let item = ThreadItem {
id: self.get_next_item_id(),
@@ -217,7 +208,10 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec<ThreadEvent> {
fn handle_exec_command_begin(
&mut self,
ev: &protocol::ExecCommandBeginEvent,
) -> Vec<ThreadEvent> {
let item_id = self.get_next_item_id();
let command_string = match shlex::try_join(ev.command.iter().map(String::as_str)) {
@@ -253,7 +247,10 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
fn handle_mcp_tool_call_begin(&mut self, ev: &McpToolCallBeginEvent) -> Vec<ThreadEvent> {
fn handle_mcp_tool_call_begin(
&mut self,
ev: &protocol::McpToolCallBeginEvent,
) -> Vec<ThreadEvent> {
let item_id = self.get_next_item_id();
let server = ev.invocation.server.clone();
let tool = ev.invocation.tool.clone();
@@ -284,7 +281,7 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
fn handle_mcp_tool_call_end(&mut self, ev: &McpToolCallEndEvent) -> Vec<ThreadEvent> {
fn handle_mcp_tool_call_end(&mut self, ev: &protocol::McpToolCallEndEvent) -> Vec<ThreadEvent> {
let status = if ev.is_success() {
McpToolCallStatus::Completed
} else {
@@ -344,22 +341,25 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec<ThreadEvent> {
fn handle_patch_apply_begin(
&mut self,
ev: &protocol::PatchApplyBeginEvent,
) -> Vec<ThreadEvent> {
self.running_patch_applies
.insert(ev.call_id.clone(), ev.clone());
Vec::new()
}
fn map_change_kind(&self, kind: &FileChange) -> PatchChangeKind {
fn map_change_kind(&self, kind: &protocol::FileChange) -> PatchChangeKind {
match kind {
FileChange::Add { .. } => PatchChangeKind::Add,
FileChange::Delete { .. } => PatchChangeKind::Delete,
FileChange::Update { .. } => PatchChangeKind::Update,
protocol::FileChange::Add { .. } => PatchChangeKind::Add,
protocol::FileChange::Delete { .. } => PatchChangeKind::Delete,
protocol::FileChange::Update { .. } => PatchChangeKind::Update,
}
}
fn handle_patch_apply_end(&mut self, ev: &PatchApplyEndEvent) -> Vec<ThreadEvent> {
fn handle_patch_apply_end(&mut self, ev: &protocol::PatchApplyEndEvent) -> Vec<ThreadEvent> {
if let Some(running_patch_apply) = self.running_patch_applies.remove(&ev.call_id) {
let status = if ev.success {
PatchApplyStatus::Completed
@@ -388,7 +388,7 @@ impl EventProcessorWithJsonOutput {
Vec::new()
}
fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec<ThreadEvent> {
fn handle_exec_command_end(&mut self, ev: &protocol::ExecCommandEndEvent) -> Vec<ThreadEvent> {
let Some(RunningCommand {
command,
item_id,
@@ -459,7 +459,7 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
fn handle_task_started(&mut self, _: &TaskStartedEvent) -> Vec<ThreadEvent> {
fn handle_task_started(&mut self, _: &protocol::TurnStartedEvent) -> Vec<ThreadEvent> {
self.last_critical_error = None;
vec![ThreadEvent::TurnStarted(TurnStartedEvent {})]
}
@@ -513,15 +513,15 @@ impl EventProcessorWithJsonOutput {
}
impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &protocol::SessionConfiguredEvent) {
self.process_event(protocol::Event {
id: "".to_string(),
msg: EventMsg::SessionConfigured(ev.clone()),
msg: protocol::EventMsg::SessionConfigured(ev.clone()),
});
}
#[allow(clippy::print_stdout)]
fn process_event(&mut self, event: Event) -> CodexStatus {
fn process_event(&mut self, event: protocol::Event) -> CodexStatus {
let aggregated = self.collect_thread_events(&event);
for conv_event in aggregated {
match serde_json::to_string(&conv_event) {
@@ -534,9 +534,12 @@ impl EventProcessor for EventProcessorWithJsonOutput {
}
}
let Event { msg, .. } = event;
let protocol::Event { msg, .. } = event;
if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = msg {
if let protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
}) = msg
{
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}