mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
Compare commits
8 Commits
exec-run-a
...
owen/threa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d13b0ea8ad | ||
|
|
09251387e0 | ||
|
|
e471ebc5d2 | ||
|
|
375a5ef051 | ||
|
|
fdc69df454 | ||
|
|
01d7f8095b | ||
|
|
3ba702c5b6 | ||
|
|
6316e57497 |
@@ -2420,6 +2420,8 @@ pub struct ToolRequestUserInputQuestion {
|
||||
pub id: String,
|
||||
pub header: String,
|
||||
pub question: String,
|
||||
#[serde(default)]
|
||||
pub is_other: bool,
|
||||
pub options: Option<Vec<ToolRequestUserInputOption>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -278,6 +278,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
id: question.id,
|
||||
header: question.header,
|
||||
question: question.question,
|
||||
is_other: question.is_other,
|
||||
options: question.options.map(|options| {
|
||||
options
|
||||
.into_iter()
|
||||
|
||||
@@ -134,7 +134,7 @@ fn project_config_warning(config: &Config) -> Option<ConfigWarningNotification>
|
||||
.disabled_reason
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| "Config folder disabled.".to_string()),
|
||||
.unwrap_or_else(|| "config.toml is disabled.".to_string()),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -143,7 +143,11 @@ fn project_config_warning(config: &Config) -> Option<ConfigWarningNotification>
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut message = "The following config folders are disabled:\n".to_string();
|
||||
let mut message = concat!(
|
||||
"Project config.toml files are disabled in the following folders. ",
|
||||
"Settings in those files are ignored, but skills and exec policies still load.\n",
|
||||
)
|
||||
.to_string();
|
||||
for (index, (folder, reason)) in disabled_folders.iter().enumerate() {
|
||||
let display_index = index + 1;
|
||||
message.push_str(&format!(" {display_index}. {folder}\n"));
|
||||
|
||||
@@ -67,6 +67,7 @@ pub fn create_request_user_input_sse_response(call_id: &str) -> anyhow::Result<S
|
||||
"id": "confirm_path",
|
||||
"header": "Confirm",
|
||||
"question": "Proceed with the plan?",
|
||||
"isOther": false,
|
||||
"options": [{
|
||||
"label": "Yes (Recommended)",
|
||||
"description": "Continue the current plan."
|
||||
|
||||
@@ -2,7 +2,9 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
@@ -14,6 +16,7 @@ use codex_app_server_protocol::UserInput;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::fs::FileTimes;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
@@ -134,6 +137,57 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_does_not_change_updated_at_or_mtime() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let preview = "Saved user message";
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z";
|
||||
let conversation_id = create_fake_rollout_with_text_elements(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
preview,
|
||||
Vec::new(),
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;
|
||||
|
||||
let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: conversation_id,
|
||||
include_turns: false,
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
let expected_updated_at = chrono::DateTime::parse_from_rfc3339(expected_updated_at_rfc3339)?
|
||||
.with_timezone(&Utc)
|
||||
.timestamp();
|
||||
assert_eq!(thread.updated_at, expected_updated_at);
|
||||
|
||||
let after_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
|
||||
assert_eq!(after_modified, before_modified);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
@@ -157,3 +211,13 @@ stream_max_retries = 0
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> {
|
||||
let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc);
|
||||
let times = FileTimes::new().set_modified(parsed.into());
|
||||
std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(path)?
|
||||
.set_times(times)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -512,10 +512,10 @@ impl ProjectTrustContext {
|
||||
let user_config_file = self.user_config_file.as_path().display();
|
||||
match decision.trust_level {
|
||||
Some(TrustLevel::Untrusted) => Some(format!(
|
||||
"{trust_key} is marked as untrusted in {user_config_file}. Mark it trusted to enable project config folders."
|
||||
"{trust_key} is marked as untrusted in {user_config_file}. To load config.toml, mark it trusted."
|
||||
)),
|
||||
_ => Some(format!(
|
||||
"Add {trust_key} as a trusted project in {user_config_file}."
|
||||
"To load config.toml, add {trust_key} as a trusted project in {user_config_file}."
|
||||
)),
|
||||
}
|
||||
}
|
||||
@@ -526,21 +526,16 @@ fn project_layer_entry(
|
||||
dot_codex_folder: &AbsolutePathBuf,
|
||||
layer_dir: &AbsolutePathBuf,
|
||||
config: TomlValue,
|
||||
config_toml_exists: bool,
|
||||
) -> ConfigLayerEntry {
|
||||
match trust_context.disabled_reason_for_dir(layer_dir) {
|
||||
Some(reason) => ConfigLayerEntry::new_disabled(
|
||||
ConfigLayerSource::Project {
|
||||
dot_codex_folder: dot_codex_folder.clone(),
|
||||
},
|
||||
config,
|
||||
reason,
|
||||
),
|
||||
None => ConfigLayerEntry::new(
|
||||
ConfigLayerSource::Project {
|
||||
dot_codex_folder: dot_codex_folder.clone(),
|
||||
},
|
||||
config,
|
||||
),
|
||||
let source = ConfigLayerSource::Project {
|
||||
dot_codex_folder: dot_codex_folder.clone(),
|
||||
};
|
||||
|
||||
if config_toml_exists && let Some(reason) = trust_context.disabled_reason_for_dir(layer_dir) {
|
||||
ConfigLayerEntry::new_disabled(source, config, reason)
|
||||
} else {
|
||||
ConfigLayerEntry::new(source, config)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -715,13 +710,15 @@ async fn load_project_layers(
|
||||
&dot_codex_abs,
|
||||
&layer_dir,
|
||||
TomlValue::Table(toml::map::Map::new()),
|
||||
true,
|
||||
));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let config =
|
||||
resolve_relative_paths_in_config_toml(config, dot_codex_abs.as_path())?;
|
||||
let entry = project_layer_entry(trust_context, &dot_codex_abs, &layer_dir, config);
|
||||
let entry =
|
||||
project_layer_entry(trust_context, &dot_codex_abs, &layer_dir, config, true);
|
||||
layers.push(entry);
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -734,6 +731,7 @@ async fn load_project_layers(
|
||||
&dot_codex_abs,
|
||||
&layer_dir,
|
||||
TomlValue::Table(toml::map::Map::new()),
|
||||
false,
|
||||
));
|
||||
} else {
|
||||
let config_file_display = config_file.as_path().display();
|
||||
|
||||
@@ -464,8 +464,6 @@ mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
#[cfg(unix)]
|
||||
use std::process::Command;
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -562,27 +560,16 @@ mod tests {
|
||||
use tokio::time::sleep;
|
||||
|
||||
let dir = tempdir()?;
|
||||
let shell_path = dir.path().join("hanging-shell.sh");
|
||||
let pid_path = dir.path().join("pid");
|
||||
|
||||
let script = format!(
|
||||
"#!/bin/sh\n\
|
||||
echo $$ > {}\n\
|
||||
sleep 30\n",
|
||||
pid_path.display()
|
||||
);
|
||||
fs::write(&shell_path, script).await?;
|
||||
let mut permissions = std::fs::metadata(&shell_path)?.permissions();
|
||||
permissions.set_mode(0o755);
|
||||
std::fs::set_permissions(&shell_path, permissions)?;
|
||||
let script = format!("echo $$ > \"{}\"; sleep 30", pid_path.display());
|
||||
|
||||
let shell = Shell {
|
||||
shell_type: ShellType::Sh,
|
||||
shell_path,
|
||||
shell_path: PathBuf::from("/bin/sh"),
|
||||
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
|
||||
};
|
||||
|
||||
let err = run_script_with_timeout(&shell, "ignored", Duration::from_millis(500), true)
|
||||
let err = run_script_with_timeout(&shell, &script, Duration::from_secs(1), true)
|
||||
.await
|
||||
.expect_err("snapshot shell should time out");
|
||||
assert!(
|
||||
|
||||
@@ -41,7 +41,7 @@ pub(crate) use undo::UndoTask;
|
||||
pub(crate) use user_shell::UserShellCommandTask;
|
||||
|
||||
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
|
||||
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
|
||||
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn on purpose. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";
|
||||
|
||||
/// Thin wrapper that exposes the parts of [`Session`] task runners need.
|
||||
#[derive(Clone)]
|
||||
@@ -253,7 +253,7 @@ impl Session {
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: format!(
|
||||
"{TURN_ABORTED_OPEN_TAG}\n <turn_id>{sub_id}</turn_id>\n <reason>interrupted</reason>\n <guidance>{TURN_ABORTED_INTERRUPTED_GUIDANCE}</guidance>\n</turn_aborted>"
|
||||
"{TURN_ABORTED_OPEN_TAG}\n{TURN_ABORTED_INTERRUPTED_GUIDANCE}\n</turn_aborted>"
|
||||
),
|
||||
}],
|
||||
end_turn: None,
|
||||
|
||||
@@ -28,6 +28,8 @@ use serde::Serialize;
|
||||
|
||||
pub struct CollabHandler;
|
||||
|
||||
/// Minimum wait timeout to prevent tight polling loops from burning CPU.
|
||||
pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000;
|
||||
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
|
||||
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 300_000;
|
||||
|
||||
@@ -323,6 +325,8 @@ mod wait {
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// Validate timeout.
|
||||
// Very short timeouts encourage busy-polling loops in the orchestrator prompt and can
|
||||
// cause high CPU usage even with a single active worker, so clamp to a minimum.
|
||||
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
|
||||
let timeout_ms = match timeout_ms {
|
||||
ms if ms <= 0 => {
|
||||
@@ -330,7 +334,7 @@ mod wait {
|
||||
"timeout_ms must be greater than zero".to_owned(),
|
||||
));
|
||||
}
|
||||
ms => ms.min(MAX_WAIT_TIMEOUT_MS),
|
||||
ms => ms.clamp(MIN_WAIT_TIMEOUT_MS, MAX_WAIT_TIMEOUT_MS),
|
||||
};
|
||||
|
||||
session
|
||||
@@ -1012,7 +1016,7 @@ mod tests {
|
||||
"wait",
|
||||
function_payload(json!({
|
||||
"ids": [agent_id.to_string()],
|
||||
"timeout_ms": 10
|
||||
"timeout_ms": MIN_WAIT_TIMEOUT_MS
|
||||
})),
|
||||
);
|
||||
let output = CollabHandler
|
||||
@@ -1043,6 +1047,37 @@ mod tests {
|
||||
.expect("shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_clamps_short_timeouts_to_minimum() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
let config = turn.client.config().as_ref().clone();
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
let agent_id = thread.thread_id;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"wait",
|
||||
function_payload(json!({
|
||||
"ids": [agent_id.to_string()],
|
||||
"timeout_ms": 10
|
||||
})),
|
||||
);
|
||||
|
||||
let early = timeout(Duration::from_millis(50), CollabHandler.handle(invocation)).await;
|
||||
assert!(
|
||||
early.is_err(),
|
||||
"wait should not return before the minimum timeout clamp"
|
||||
);
|
||||
|
||||
let _ = thread
|
||||
.thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_returns_final_status_without_timeout() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::tools::handlers::apply_patch::create_apply_patch_freeform_tool;
|
||||
use crate::tools::handlers::apply_patch::create_apply_patch_json_tool;
|
||||
use crate::tools::handlers::collab::DEFAULT_WAIT_TIMEOUT_MS;
|
||||
use crate::tools::handlers::collab::MAX_WAIT_TIMEOUT_MS;
|
||||
use crate::tools::handlers::collab::MIN_WAIT_TIMEOUT_MS;
|
||||
use crate::tools::registry::ToolRegistryBuilder;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
@@ -517,7 +518,7 @@ fn create_wait_tool() -> ToolSpec {
|
||||
"timeout_ms".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(format!(
|
||||
"Optional timeout in milliseconds. Defaults to {DEFAULT_WAIT_TIMEOUT_MS} and max {MAX_WAIT_TIMEOUT_MS}."
|
||||
"Optional timeout in milliseconds. Defaults to {DEFAULT_WAIT_TIMEOUT_MS}, min {MIN_WAIT_TIMEOUT_MS}, and max {MAX_WAIT_TIMEOUT_MS}. Avoid tight polling loops; prefer longer waits (seconds to minutes)."
|
||||
)),
|
||||
},
|
||||
);
|
||||
@@ -555,7 +556,7 @@ fn create_request_user_input_tool() -> ToolSpec {
|
||||
|
||||
let options_schema = JsonSchema::Array {
|
||||
description: Some(
|
||||
"Optional 2-3 mutually exclusive choices. Put the recommended option first and suffix its label with \"(Recommended)\". Only include \"Other\" option if we want to include a free form option. If the question is free form in nature, please do not have any option."
|
||||
"Optional 2-3 mutually exclusive choices. Put the recommended option first and suffix its label with \"(Recommended)\". Do not include an \"Other\" option in this list; use isOther on the question to request a free form choice. If the question is free form in nature, please do not have any option."
|
||||
.to_string(),
|
||||
),
|
||||
items: Box::new(JsonSchema::Object {
|
||||
@@ -586,6 +587,15 @@ fn create_request_user_input_tool() -> ToolSpec {
|
||||
description: Some("Single-sentence prompt shown to the user.".to_string()),
|
||||
},
|
||||
);
|
||||
question_props.insert(
|
||||
"isOther".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"True when this question should include a free-form \"Other\" option. Otherwise false."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
question_props.insert("options".to_string(), options_schema);
|
||||
|
||||
let questions_schema = JsonSchema::Array {
|
||||
@@ -596,6 +606,7 @@ fn create_request_user_input_tool() -> ToolSpec {
|
||||
"id".to_string(),
|
||||
"header".to_string(),
|
||||
"question".to_string(),
|
||||
"isOther".to_string(),
|
||||
]),
|
||||
additional_properties: Some(false.into()),
|
||||
}),
|
||||
|
||||
@@ -52,6 +52,7 @@ You are Codex Orchestrator, based on GPT-5. You are running as an orchestration
|
||||
* Workers must not revert, overwrite, or conflict with others’ work.
|
||||
* By default, workers must not spawn sub-agents unless explicitly allowed.
|
||||
* When multiple workers are active, you may pass multiple IDs to `wait` to react to the first completion and keep the workflow event-driven and use a long timeout (e.g. 5 minutes).
|
||||
* Do not busy-poll `wait` with very short timeouts. Prefer waits measured in seconds (or minutes) so the system is idle while workers run.
|
||||
|
||||
## Collab tools
|
||||
|
||||
|
||||
@@ -2,6 +2,10 @@
|
||||
|
||||
You work in 2 phases and you should *chat your way* to a great plan before finalizing it.
|
||||
|
||||
While in **Plan Mode**, you must not perform any mutating or execution actions. Once you enter Plan Mode, you remain there until you are **explicitly instructed otherwise**. Plan Mode may continue across multiple user messages unless a developer message ends it.
|
||||
|
||||
User intent, tone, or imperative language does **not** trigger a mode change. If a user asks for execution while you are still in Plan Mode, you must treat that request as a prompt to **plan the execution**, not to carry it out.
|
||||
|
||||
PHASE 1 — Intent chat (what they actually want)
|
||||
- Keep asking until you can clearly state: goal + success criteria, audience, in/out of scope, constraints, current state, and the key preferences/tradeoffs.
|
||||
- Bias toward questions over guessing: if any high‑impact ambiguity remains, do NOT plan yet—ask.
|
||||
|
||||
@@ -94,6 +94,7 @@ async fn request_user_input_round_trip_resolves_pending() -> anyhow::Result<()>
|
||||
"id": "confirm_path",
|
||||
"header": "Confirm",
|
||||
"question": "Proceed with the plan?",
|
||||
"isOther": false,
|
||||
"options": [{
|
||||
"label": "Yes (Recommended)",
|
||||
"description": "Continue the current plan."
|
||||
@@ -213,6 +214,7 @@ where
|
||||
"id": "confirm_path",
|
||||
"header": "Confirm",
|
||||
"question": "Proceed with the plan?",
|
||||
"isOther": false,
|
||||
"options": [{
|
||||
"label": "Yes (Recommended)",
|
||||
"description": "Continue the current plan."
|
||||
|
||||
@@ -75,7 +75,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
|
||||
- `EventMsg`
|
||||
- `EventMsg::AgentMessage` – Messages from the `Model`
|
||||
- `EventMsg::ExecApprovalRequest` – Request approval from user to execute a command
|
||||
- `EventMsg::RequestUserInput` – Request user input for a tool call
|
||||
- `EventMsg::RequestUserInput` – Request user input for a tool call (questions can include options plus `isOther` to add a free-form choice)
|
||||
- `EventMsg::TurnComplete` – A turn completed successfully
|
||||
- `EventMsg::Error` – A turn stopped with an error
|
||||
- `EventMsg::Warning` – A non-fatal warning that the client should surface to the user
|
||||
|
||||
@@ -3,7 +3,16 @@ use codex_common::elapsed::format_elapsed;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_core::protocol::AgentStatus;
|
||||
use codex_core::protocol::BackgroundEventEvent;
|
||||
use codex_core::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_core::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_core::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_core::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_core::protocol::CollabCloseBeginEvent;
|
||||
use codex_core::protocol::CollabCloseEndEvent;
|
||||
use codex_core::protocol::CollabWaitingBeginEvent;
|
||||
use codex_core::protocol::CollabWaitingEndEvent;
|
||||
use codex_core::protocol::DeprecationNoticeEvent;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::Event;
|
||||
@@ -571,15 +580,161 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
EventMsg::ContextCompacted(_) => {
|
||||
ts_msg!(self, "context compacted");
|
||||
}
|
||||
EventMsg::CollabAgentSpawnBegin(_)
|
||||
| EventMsg::CollabAgentSpawnEnd(_)
|
||||
| EventMsg::CollabAgentInteractionBegin(_)
|
||||
| EventMsg::CollabAgentInteractionEnd(_)
|
||||
| EventMsg::CollabWaitingBegin(_)
|
||||
| EventMsg::CollabWaitingEnd(_)
|
||||
| EventMsg::CollabCloseBegin(_)
|
||||
| EventMsg::CollabCloseEnd(_) => {
|
||||
// TODO(jif) handle collab tools.
|
||||
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
prompt,
|
||||
}) => {
|
||||
ts_msg!(
|
||||
self,
|
||||
"{} {}",
|
||||
"collab".style(self.magenta),
|
||||
format_collab_invocation("spawn_agent", &call_id, Some(&prompt))
|
||||
.style(self.bold)
|
||||
);
|
||||
}
|
||||
EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
new_thread_id,
|
||||
prompt,
|
||||
status,
|
||||
}) => {
|
||||
let success = new_thread_id.is_some() && !is_collab_status_failure(&status);
|
||||
let title_style = if success { self.green } else { self.red };
|
||||
let title = format!(
|
||||
"{} {}:",
|
||||
format_collab_invocation("spawn_agent", &call_id, Some(&prompt)),
|
||||
format_collab_status(&status)
|
||||
);
|
||||
ts_msg!(self, "{}", title.style(title_style));
|
||||
if let Some(new_thread_id) = new_thread_id {
|
||||
eprintln!(" agent: {}", new_thread_id.to_string().style(self.dimmed));
|
||||
}
|
||||
}
|
||||
EventMsg::CollabAgentInteractionBegin(CollabAgentInteractionBeginEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
receiver_thread_id,
|
||||
prompt,
|
||||
}) => {
|
||||
ts_msg!(
|
||||
self,
|
||||
"{} {}",
|
||||
"collab".style(self.magenta),
|
||||
format_collab_invocation("send_input", &call_id, Some(&prompt))
|
||||
.style(self.bold)
|
||||
);
|
||||
eprintln!(
|
||||
" receiver: {}",
|
||||
receiver_thread_id.to_string().style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::CollabAgentInteractionEnd(CollabAgentInteractionEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
receiver_thread_id,
|
||||
prompt,
|
||||
status,
|
||||
}) => {
|
||||
let success = !is_collab_status_failure(&status);
|
||||
let title_style = if success { self.green } else { self.red };
|
||||
let title = format!(
|
||||
"{} {}:",
|
||||
format_collab_invocation("send_input", &call_id, Some(&prompt)),
|
||||
format_collab_status(&status)
|
||||
);
|
||||
ts_msg!(self, "{}", title.style(title_style));
|
||||
eprintln!(
|
||||
" receiver: {}",
|
||||
receiver_thread_id.to_string().style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::CollabWaitingBegin(CollabWaitingBeginEvent {
|
||||
sender_thread_id: _,
|
||||
receiver_thread_ids,
|
||||
call_id,
|
||||
}) => {
|
||||
ts_msg!(
|
||||
self,
|
||||
"{} {}",
|
||||
"collab".style(self.magenta),
|
||||
format_collab_invocation("wait", &call_id, None).style(self.bold)
|
||||
);
|
||||
eprintln!(
|
||||
" receivers: {}",
|
||||
format_receiver_list(&receiver_thread_ids).style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::CollabWaitingEnd(CollabWaitingEndEvent {
|
||||
sender_thread_id: _,
|
||||
call_id,
|
||||
statuses,
|
||||
}) => {
|
||||
if statuses.is_empty() {
|
||||
ts_msg!(
|
||||
self,
|
||||
"{} {}:",
|
||||
format_collab_invocation("wait", &call_id, None),
|
||||
"timed out".style(self.yellow)
|
||||
);
|
||||
return CodexStatus::Running;
|
||||
}
|
||||
let success = !statuses.values().any(is_collab_status_failure);
|
||||
let title_style = if success { self.green } else { self.red };
|
||||
let title = format!(
|
||||
"{} {} agents complete:",
|
||||
format_collab_invocation("wait", &call_id, None),
|
||||
statuses.len()
|
||||
);
|
||||
ts_msg!(self, "{}", title.style(title_style));
|
||||
let mut sorted = statuses
|
||||
.into_iter()
|
||||
.map(|(thread_id, status)| (thread_id.to_string(), status))
|
||||
.collect::<Vec<_>>();
|
||||
sorted.sort_by(|(left, _), (right, _)| left.cmp(right));
|
||||
for (thread_id, status) in sorted {
|
||||
eprintln!(
|
||||
" {} {}",
|
||||
thread_id.style(self.dimmed),
|
||||
format_collab_status(&status).style(style_for_agent_status(&status, self))
|
||||
);
|
||||
}
|
||||
}
|
||||
EventMsg::CollabCloseBegin(CollabCloseBeginEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
receiver_thread_id,
|
||||
}) => {
|
||||
ts_msg!(
|
||||
self,
|
||||
"{} {}",
|
||||
"collab".style(self.magenta),
|
||||
format_collab_invocation("close_agent", &call_id, None).style(self.bold)
|
||||
);
|
||||
eprintln!(
|
||||
" receiver: {}",
|
||||
receiver_thread_id.to_string().style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::CollabCloseEnd(CollabCloseEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: _,
|
||||
receiver_thread_id,
|
||||
status,
|
||||
}) => {
|
||||
let success = !is_collab_status_failure(&status);
|
||||
let title_style = if success { self.green } else { self.red };
|
||||
let title = format!(
|
||||
"{} {}:",
|
||||
format_collab_invocation("close_agent", &call_id, None),
|
||||
format_collab_status(&status)
|
||||
);
|
||||
ts_msg!(self, "{}", title.style(title_style));
|
||||
eprintln!(
|
||||
" receiver: {}",
|
||||
receiver_thread_id.to_string().style(self.dimmed)
|
||||
);
|
||||
}
|
||||
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
|
||||
EventMsg::WebSearchBegin(_)
|
||||
@@ -654,6 +809,78 @@ fn format_file_change(change: &FileChange) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn format_collab_invocation(tool: &str, call_id: &str, prompt: Option<&str>) -> String {
|
||||
let prompt = prompt
|
||||
.map(str::trim)
|
||||
.filter(|prompt| !prompt.is_empty())
|
||||
.map(|prompt| truncate_preview(prompt, 120));
|
||||
match prompt {
|
||||
Some(prompt) => format!("{tool}({call_id}, prompt=\"{prompt}\")"),
|
||||
None => format!("{tool}({call_id})"),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_collab_status(status: &AgentStatus) -> String {
|
||||
match status {
|
||||
AgentStatus::PendingInit => "pending init".to_string(),
|
||||
AgentStatus::Running => "running".to_string(),
|
||||
AgentStatus::Completed(Some(message)) => {
|
||||
let preview = truncate_preview(message.trim(), 120);
|
||||
if preview.is_empty() {
|
||||
"completed".to_string()
|
||||
} else {
|
||||
format!("completed: \"{preview}\"")
|
||||
}
|
||||
}
|
||||
AgentStatus::Completed(None) => "completed".to_string(),
|
||||
AgentStatus::Errored(message) => {
|
||||
let preview = truncate_preview(message.trim(), 120);
|
||||
if preview.is_empty() {
|
||||
"errored".to_string()
|
||||
} else {
|
||||
format!("errored: \"{preview}\"")
|
||||
}
|
||||
}
|
||||
AgentStatus::Shutdown => "shutdown".to_string(),
|
||||
AgentStatus::NotFound => "not found".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn style_for_agent_status(
|
||||
status: &AgentStatus,
|
||||
processor: &EventProcessorWithHumanOutput,
|
||||
) -> Style {
|
||||
match status {
|
||||
AgentStatus::PendingInit | AgentStatus::Shutdown => processor.dimmed,
|
||||
AgentStatus::Running => processor.cyan,
|
||||
AgentStatus::Completed(_) => processor.green,
|
||||
AgentStatus::Errored(_) | AgentStatus::NotFound => processor.red,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_collab_status_failure(status: &AgentStatus) -> bool {
|
||||
matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)
|
||||
}
|
||||
|
||||
fn format_receiver_list(ids: &[codex_protocol::ThreadId]) -> String {
|
||||
if ids.is_empty() {
|
||||
return "none".to_string();
|
||||
}
|
||||
ids.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
fn truncate_preview(text: &str, max_chars: usize) -> String {
|
||||
if text.chars().count() <= max_chars {
|
||||
return text.to_string();
|
||||
}
|
||||
|
||||
let preview = text.chars().take(max_chars).collect::<String>();
|
||||
format!("{preview}…")
|
||||
}
|
||||
|
||||
fn format_mcp_invocation(invocation: &McpInvocation) -> String {
|
||||
// Build fully-qualified tool name: server.tool
|
||||
let fq_tool_name = format!("{}.{}", invocation.server, invocation.tool);
|
||||
|
||||
@@ -6,6 +6,11 @@ use crate::event_processor::CodexStatus;
|
||||
use crate::event_processor::EventProcessor;
|
||||
use crate::event_processor::handle_last_message;
|
||||
use crate::exec_events::AgentMessageItem;
|
||||
use crate::exec_events::CollabAgentState;
|
||||
use crate::exec_events::CollabAgentStatus;
|
||||
use crate::exec_events::CollabTool;
|
||||
use crate::exec_events::CollabToolCallItem;
|
||||
use crate::exec_events::CollabToolCallStatus;
|
||||
use crate::exec_events::CommandExecutionItem;
|
||||
use crate::exec_events::CommandExecutionStatus;
|
||||
use crate::exec_events::ErrorItem;
|
||||
@@ -35,6 +40,15 @@ use crate::exec_events::Usage;
|
||||
use crate::exec_events::WebSearchItem;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol;
|
||||
use codex_core::protocol::AgentStatus as CoreAgentStatus;
|
||||
use codex_core::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_core::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_core::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_core::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_core::protocol::CollabCloseBeginEvent;
|
||||
use codex_core::protocol::CollabCloseEndEvent;
|
||||
use codex_core::protocol::CollabWaitingBeginEvent;
|
||||
use codex_core::protocol::CollabWaitingEndEvent;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
use serde_json::Value as JsonValue;
|
||||
@@ -51,6 +65,7 @@ pub struct EventProcessorWithJsonOutput {
|
||||
running_todo_list: Option<RunningTodoList>,
|
||||
last_total_token_usage: Option<codex_core::protocol::TokenUsage>,
|
||||
running_mcp_tool_calls: HashMap<String, RunningMcpToolCall>,
|
||||
running_collab_tool_calls: HashMap<String, RunningCollabToolCall>,
|
||||
last_critical_error: Option<ThreadErrorEvent>,
|
||||
}
|
||||
|
||||
@@ -75,6 +90,12 @@ struct RunningMcpToolCall {
|
||||
arguments: JsonValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct RunningCollabToolCall {
|
||||
tool: CollabTool,
|
||||
item_id: String,
|
||||
}
|
||||
|
||||
impl EventProcessorWithJsonOutput {
|
||||
pub fn new(last_message_path: Option<PathBuf>) -> Self {
|
||||
Self {
|
||||
@@ -85,6 +106,7 @@ impl EventProcessorWithJsonOutput {
|
||||
running_todo_list: None,
|
||||
last_total_token_usage: None,
|
||||
running_mcp_tool_calls: HashMap::new(),
|
||||
running_collab_tool_calls: HashMap::new(),
|
||||
last_critical_error: None,
|
||||
}
|
||||
}
|
||||
@@ -102,6 +124,18 @@ impl EventProcessorWithJsonOutput {
|
||||
}
|
||||
protocol::EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev),
|
||||
protocol::EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
|
||||
protocol::EventMsg::CollabAgentSpawnBegin(ev) => self.handle_collab_spawn_begin(ev),
|
||||
protocol::EventMsg::CollabAgentSpawnEnd(ev) => self.handle_collab_spawn_end(ev),
|
||||
protocol::EventMsg::CollabAgentInteractionBegin(ev) => {
|
||||
self.handle_collab_interaction_begin(ev)
|
||||
}
|
||||
protocol::EventMsg::CollabAgentInteractionEnd(ev) => {
|
||||
self.handle_collab_interaction_end(ev)
|
||||
}
|
||||
protocol::EventMsg::CollabWaitingBegin(ev) => self.handle_collab_wait_begin(ev),
|
||||
protocol::EventMsg::CollabWaitingEnd(ev) => self.handle_collab_wait_end(ev),
|
||||
protocol::EventMsg::CollabCloseBegin(ev) => self.handle_collab_close_begin(ev),
|
||||
protocol::EventMsg::CollabCloseEnd(ev) => self.handle_collab_close_end(ev),
|
||||
protocol::EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
|
||||
protocol::EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
|
||||
protocol::EventMsg::WebSearchBegin(_) => Vec::new(),
|
||||
@@ -341,6 +375,219 @@ impl EventProcessorWithJsonOutput {
|
||||
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
|
||||
}
|
||||
|
||||
fn handle_collab_spawn_begin(&mut self, ev: &CollabAgentSpawnBeginEvent) -> Vec<ThreadEvent> {
|
||||
self.start_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::SpawnAgent,
|
||||
ev.sender_thread_id.to_string(),
|
||||
Vec::new(),
|
||||
Some(ev.prompt.clone()),
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_spawn_end(&mut self, ev: &CollabAgentSpawnEndEvent) -> Vec<ThreadEvent> {
|
||||
let (receiver_thread_ids, agents_states) = match ev.new_thread_id {
|
||||
Some(id) => {
|
||||
let receiver_id = id.to_string();
|
||||
let agent_state = CollabAgentState::from(ev.status.clone());
|
||||
(
|
||||
vec![receiver_id.clone()],
|
||||
[(receiver_id, agent_state)].into_iter().collect(),
|
||||
)
|
||||
}
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
};
|
||||
let status = if ev.new_thread_id.is_some() && !is_collab_failure(&ev.status) {
|
||||
CollabToolCallStatus::Completed
|
||||
} else {
|
||||
CollabToolCallStatus::Failed
|
||||
};
|
||||
self.finish_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::SpawnAgent,
|
||||
ev.sender_thread_id.to_string(),
|
||||
receiver_thread_ids,
|
||||
Some(ev.prompt.clone()),
|
||||
agents_states,
|
||||
status,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_interaction_begin(
|
||||
&mut self,
|
||||
ev: &CollabAgentInteractionBeginEvent,
|
||||
) -> Vec<ThreadEvent> {
|
||||
self.start_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::SendInput,
|
||||
ev.sender_thread_id.to_string(),
|
||||
vec![ev.receiver_thread_id.to_string()],
|
||||
Some(ev.prompt.clone()),
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_interaction_end(
|
||||
&mut self,
|
||||
ev: &CollabAgentInteractionEndEvent,
|
||||
) -> Vec<ThreadEvent> {
|
||||
let receiver_id = ev.receiver_thread_id.to_string();
|
||||
let agent_state = CollabAgentState::from(ev.status.clone());
|
||||
let status = if is_collab_failure(&ev.status) {
|
||||
CollabToolCallStatus::Failed
|
||||
} else {
|
||||
CollabToolCallStatus::Completed
|
||||
};
|
||||
self.finish_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::SendInput,
|
||||
ev.sender_thread_id.to_string(),
|
||||
vec![receiver_id.clone()],
|
||||
Some(ev.prompt.clone()),
|
||||
[(receiver_id, agent_state)].into_iter().collect(),
|
||||
status,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_wait_begin(&mut self, ev: &CollabWaitingBeginEvent) -> Vec<ThreadEvent> {
|
||||
self.start_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::Wait,
|
||||
ev.sender_thread_id.to_string(),
|
||||
ev.receiver_thread_ids
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect(),
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_wait_end(&mut self, ev: &CollabWaitingEndEvent) -> Vec<ThreadEvent> {
|
||||
let status = if ev.statuses.values().any(is_collab_failure) {
|
||||
CollabToolCallStatus::Failed
|
||||
} else {
|
||||
CollabToolCallStatus::Completed
|
||||
};
|
||||
let mut receiver_thread_ids = ev
|
||||
.statuses
|
||||
.keys()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
receiver_thread_ids.sort();
|
||||
let agents_states = ev
|
||||
.statuses
|
||||
.iter()
|
||||
.map(|(thread_id, status)| {
|
||||
(
|
||||
thread_id.to_string(),
|
||||
CollabAgentState::from(status.clone()),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
self.finish_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::Wait,
|
||||
ev.sender_thread_id.to_string(),
|
||||
receiver_thread_ids,
|
||||
None,
|
||||
agents_states,
|
||||
status,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_close_begin(&mut self, ev: &CollabCloseBeginEvent) -> Vec<ThreadEvent> {
|
||||
self.start_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::CloseAgent,
|
||||
ev.sender_thread_id.to_string(),
|
||||
vec![ev.receiver_thread_id.to_string()],
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_collab_close_end(&mut self, ev: &CollabCloseEndEvent) -> Vec<ThreadEvent> {
|
||||
let receiver_id = ev.receiver_thread_id.to_string();
|
||||
let agent_state = CollabAgentState::from(ev.status.clone());
|
||||
let status = if is_collab_failure(&ev.status) {
|
||||
CollabToolCallStatus::Failed
|
||||
} else {
|
||||
CollabToolCallStatus::Completed
|
||||
};
|
||||
self.finish_collab_tool_call(
|
||||
&ev.call_id,
|
||||
CollabTool::CloseAgent,
|
||||
ev.sender_thread_id.to_string(),
|
||||
vec![receiver_id.clone()],
|
||||
None,
|
||||
[(receiver_id, agent_state)].into_iter().collect(),
|
||||
status,
|
||||
)
|
||||
}
|
||||
|
||||
fn start_collab_tool_call(
|
||||
&mut self,
|
||||
call_id: &str,
|
||||
tool: CollabTool,
|
||||
sender_thread_id: String,
|
||||
receiver_thread_ids: Vec<String>,
|
||||
prompt: Option<String>,
|
||||
) -> Vec<ThreadEvent> {
|
||||
let item_id = self.get_next_item_id();
|
||||
self.running_collab_tool_calls.insert(
|
||||
call_id.to_string(),
|
||||
RunningCollabToolCall {
|
||||
tool: tool.clone(),
|
||||
item_id: item_id.clone(),
|
||||
},
|
||||
);
|
||||
let item = ThreadItem {
|
||||
id: item_id,
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool,
|
||||
sender_thread_id,
|
||||
receiver_thread_ids,
|
||||
prompt,
|
||||
agents_states: HashMap::new(),
|
||||
status: CollabToolCallStatus::InProgress,
|
||||
}),
|
||||
};
|
||||
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn finish_collab_tool_call(
|
||||
&mut self,
|
||||
call_id: &str,
|
||||
tool: CollabTool,
|
||||
sender_thread_id: String,
|
||||
receiver_thread_ids: Vec<String>,
|
||||
prompt: Option<String>,
|
||||
agents_states: HashMap<String, CollabAgentState>,
|
||||
status: CollabToolCallStatus,
|
||||
) -> Vec<ThreadEvent> {
|
||||
let (tool, item_id) = match self.running_collab_tool_calls.remove(call_id) {
|
||||
Some(running) => (running.tool, running.item_id),
|
||||
None => {
|
||||
warn!(
|
||||
call_id,
|
||||
"Received collab tool end without begin; synthesizing new item"
|
||||
);
|
||||
(tool, self.get_next_item_id())
|
||||
}
|
||||
};
|
||||
let item = ThreadItem {
|
||||
id: item_id,
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool,
|
||||
sender_thread_id,
|
||||
receiver_thread_ids,
|
||||
prompt,
|
||||
agents_states,
|
||||
status,
|
||||
}),
|
||||
};
|
||||
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
|
||||
}
|
||||
|
||||
fn handle_patch_apply_begin(
|
||||
&mut self,
|
||||
ev: &protocol::PatchApplyBeginEvent,
|
||||
@@ -512,6 +759,44 @@ impl EventProcessorWithJsonOutput {
|
||||
}
|
||||
}
|
||||
|
||||
fn is_collab_failure(status: &CoreAgentStatus) -> bool {
|
||||
matches!(
|
||||
status,
|
||||
CoreAgentStatus::Errored(_) | CoreAgentStatus::NotFound
|
||||
)
|
||||
}
|
||||
|
||||
impl From<CoreAgentStatus> for CollabAgentState {
|
||||
fn from(value: CoreAgentStatus) -> Self {
|
||||
match value {
|
||||
CoreAgentStatus::PendingInit => Self {
|
||||
status: CollabAgentStatus::PendingInit,
|
||||
message: None,
|
||||
},
|
||||
CoreAgentStatus::Running => Self {
|
||||
status: CollabAgentStatus::Running,
|
||||
message: None,
|
||||
},
|
||||
CoreAgentStatus::Completed(message) => Self {
|
||||
status: CollabAgentStatus::Completed,
|
||||
message,
|
||||
},
|
||||
CoreAgentStatus::Errored(message) => Self {
|
||||
status: CollabAgentStatus::Errored,
|
||||
message: Some(message),
|
||||
},
|
||||
CoreAgentStatus::Shutdown => Self {
|
||||
status: CollabAgentStatus::Shutdown,
|
||||
message: None,
|
||||
},
|
||||
CoreAgentStatus::NotFound => Self {
|
||||
status: CollabAgentStatus::NotFound,
|
||||
message: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventProcessor for EventProcessorWithJsonOutput {
|
||||
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &protocol::SessionConfiguredEvent) {
|
||||
self.process_event(protocol::Event {
|
||||
|
||||
@@ -2,6 +2,7 @@ use mcp_types::ContentBlock as McpContentBlock;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::collections::HashMap;
|
||||
use ts_rs::TS;
|
||||
|
||||
/// Top-level JSONL events emitted by codex exec
|
||||
@@ -113,6 +114,9 @@ pub enum ThreadItemDetails {
|
||||
/// Represents a call to an MCP tool. The item starts when the invocation is
|
||||
/// dispatched and completes when the MCP server reports success or failure.
|
||||
McpToolCall(McpToolCallItem),
|
||||
/// Represents a call to a collab tool. The item starts when the collab tool is
|
||||
/// invoked and completes when the collab tool reports success or failure.
|
||||
CollabToolCall(CollabToolCallItem),
|
||||
/// Captures a web search request. It starts when the search is kicked off
|
||||
/// and completes when results are returned to the agent.
|
||||
WebSearch(WebSearchItem),
|
||||
@@ -198,6 +202,56 @@ pub enum McpToolCallStatus {
|
||||
Failed,
|
||||
}
|
||||
|
||||
/// The status of a collab tool call.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CollabToolCallStatus {
|
||||
#[default]
|
||||
InProgress,
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
/// Supported collab tools.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CollabTool {
|
||||
SpawnAgent,
|
||||
SendInput,
|
||||
Wait,
|
||||
CloseAgent,
|
||||
}
|
||||
|
||||
/// The status of a collab agent.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum CollabAgentStatus {
|
||||
PendingInit,
|
||||
Running,
|
||||
Completed,
|
||||
Errored,
|
||||
Shutdown,
|
||||
NotFound,
|
||||
}
|
||||
|
||||
/// Last known state of a collab agent.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
|
||||
pub struct CollabAgentState {
|
||||
pub status: CollabAgentStatus,
|
||||
pub message: Option<String>,
|
||||
}
|
||||
|
||||
/// A call to a collab tool.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct CollabToolCallItem {
|
||||
pub tool: CollabTool,
|
||||
pub sender_thread_id: String,
|
||||
pub receiver_thread_ids: Vec<String>,
|
||||
pub prompt: Option<String>,
|
||||
pub agents_states: HashMap<String, CollabAgentState>,
|
||||
pub status: CollabToolCallStatus,
|
||||
}
|
||||
|
||||
/// Result payload produced by an MCP tool invocation.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
|
||||
pub struct McpToolCallItemResult {
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::AgentReasoningEvent;
|
||||
use codex_core::protocol::AgentStatus;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_core::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_core::protocol::CollabWaitingEndEvent;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
@@ -19,6 +23,11 @@ use codex_core::protocol::WarningEvent;
|
||||
use codex_core::protocol::WebSearchEndEvent;
|
||||
use codex_exec::event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
|
||||
use codex_exec::exec_events::AgentMessageItem;
|
||||
use codex_exec::exec_events::CollabAgentState;
|
||||
use codex_exec::exec_events::CollabAgentStatus;
|
||||
use codex_exec::exec_events::CollabTool;
|
||||
use codex_exec::exec_events::CollabToolCallItem;
|
||||
use codex_exec::exec_events::CollabToolCallStatus;
|
||||
use codex_exec::exec_events::CommandExecutionItem;
|
||||
use codex_exec::exec_events::CommandExecutionStatus;
|
||||
use codex_exec::exec_events::ErrorItem;
|
||||
@@ -44,6 +53,7 @@ use codex_exec::exec_events::TurnFailedEvent;
|
||||
use codex_exec::exec_events::TurnStartedEvent;
|
||||
use codex_exec::exec_events::Usage;
|
||||
use codex_exec::exec_events::WebSearchItem;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::plan_tool::PlanItemArg;
|
||||
use codex_protocol::plan_tool::StepStatus;
|
||||
use codex_protocol::plan_tool::UpdatePlanArgs;
|
||||
@@ -444,6 +454,135 @@ fn mcp_tool_call_defaults_arguments_and_preserves_structured_content() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collab_spawn_begin_and_end_emit_item_events() {
|
||||
let mut ep = EventProcessorWithJsonOutput::new(None);
|
||||
let sender_thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
|
||||
let new_thread_id = ThreadId::from_string("9e107d9d-372b-4b8c-a2a4-1d9bb3fce0c1").unwrap();
|
||||
let prompt = "draft a plan".to_string();
|
||||
|
||||
let begin = event(
|
||||
"c1",
|
||||
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
|
||||
call_id: "call-10".to_string(),
|
||||
sender_thread_id,
|
||||
prompt: prompt.clone(),
|
||||
}),
|
||||
);
|
||||
let begin_events = ep.collect_thread_events(&begin);
|
||||
assert_eq!(
|
||||
begin_events,
|
||||
vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
||||
item: ThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: CollabTool::SpawnAgent,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some(prompt.clone()),
|
||||
agents_states: std::collections::HashMap::new(),
|
||||
status: CollabToolCallStatus::InProgress,
|
||||
}),
|
||||
},
|
||||
})]
|
||||
);
|
||||
|
||||
let end = event(
|
||||
"c2",
|
||||
EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent {
|
||||
call_id: "call-10".to_string(),
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(new_thread_id),
|
||||
prompt: prompt.clone(),
|
||||
status: AgentStatus::Running,
|
||||
}),
|
||||
);
|
||||
let end_events = ep.collect_thread_events(&end);
|
||||
assert_eq!(
|
||||
end_events,
|
||||
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: CollabTool::SpawnAgent,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids: vec![new_thread_id.to_string()],
|
||||
prompt: Some(prompt),
|
||||
agents_states: [(
|
||||
new_thread_id.to_string(),
|
||||
CollabAgentState {
|
||||
status: CollabAgentStatus::Running,
|
||||
message: None,
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
status: CollabToolCallStatus::Completed,
|
||||
}),
|
||||
},
|
||||
})]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collab_wait_end_without_begin_synthesizes_failed_item() {
|
||||
let mut ep = EventProcessorWithJsonOutput::new(None);
|
||||
let sender_thread_id = ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
|
||||
let running_thread_id = ThreadId::from_string("3f76d2a0-943e-4f43-8a38-b289c9c6c3d1").unwrap();
|
||||
let failed_thread_id = ThreadId::from_string("c1dfd96e-1f0c-4f26-9b4f-1aa02c2d3c4d").unwrap();
|
||||
let mut receiver_thread_ids = vec![running_thread_id.to_string(), failed_thread_id.to_string()];
|
||||
receiver_thread_ids.sort();
|
||||
let mut statuses = std::collections::HashMap::new();
|
||||
statuses.insert(
|
||||
running_thread_id,
|
||||
AgentStatus::Completed(Some("done".to_string())),
|
||||
);
|
||||
statuses.insert(failed_thread_id, AgentStatus::Errored("boom".to_string()));
|
||||
|
||||
let end = event(
|
||||
"c3",
|
||||
EventMsg::CollabWaitingEnd(CollabWaitingEndEvent {
|
||||
sender_thread_id,
|
||||
call_id: "call-11".to_string(),
|
||||
statuses: statuses.clone(),
|
||||
}),
|
||||
);
|
||||
let events = ep.collect_thread_events(&end);
|
||||
assert_eq!(
|
||||
events,
|
||||
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::CollabToolCall(CollabToolCallItem {
|
||||
tool: CollabTool::Wait,
|
||||
sender_thread_id: sender_thread_id.to_string(),
|
||||
receiver_thread_ids,
|
||||
prompt: None,
|
||||
agents_states: [
|
||||
(
|
||||
running_thread_id.to_string(),
|
||||
CollabAgentState {
|
||||
status: CollabAgentStatus::Completed,
|
||||
message: Some("done".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
failed_thread_id.to_string(),
|
||||
CollabAgentState {
|
||||
status: CollabAgentStatus::Errored,
|
||||
message: Some("boom".to_string()),
|
||||
},
|
||||
),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
status: CollabToolCallStatus::Failed,
|
||||
}),
|
||||
},
|
||||
})]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
|
||||
let mut ep = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
@@ -16,6 +16,10 @@ pub struct RequestUserInputQuestion {
|
||||
pub id: String,
|
||||
pub header: String,
|
||||
pub question: String,
|
||||
#[serde(rename = "isOther", default)]
|
||||
#[schemars(rename = "isOther")]
|
||||
#[ts(rename = "isOther")]
|
||||
pub is_other: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub options: Option<Vec<RequestUserInputQuestionOption>>,
|
||||
}
|
||||
|
||||
@@ -201,7 +201,7 @@ fn emit_project_config_warnings(app_event_tx: &AppEventSender, config: &Config)
|
||||
.disabled_reason
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| "Config folder disabled.".to_string()),
|
||||
.unwrap_or_else(|| "config.toml is disabled.".to_string()),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -209,7 +209,11 @@ fn emit_project_config_warnings(app_event_tx: &AppEventSender, config: &Config)
|
||||
return;
|
||||
}
|
||||
|
||||
let mut message = "The following config folders are disabled:\n".to_string();
|
||||
let mut message = concat!(
|
||||
"Project config.toml files are disabled in the following folders. ",
|
||||
"Settings in those files are ignored, but skills and exec policies still load.\n",
|
||||
)
|
||||
.to_string();
|
||||
for (index, (folder, reason)) in disabled_folders.iter().enumerate() {
|
||||
let display_index = index + 1;
|
||||
message.push_str(&format!(" {display_index}. {folder}\n"));
|
||||
|
||||
@@ -508,6 +508,7 @@ mod tests {
|
||||
id: id.to_string(),
|
||||
header: header.to_string(),
|
||||
question: "Choose an option.".to_string(),
|
||||
is_other: false,
|
||||
options: Some(vec![
|
||||
RequestUserInputQuestionOption {
|
||||
label: "Option 1".to_string(),
|
||||
@@ -530,6 +531,7 @@ mod tests {
|
||||
id: id.to_string(),
|
||||
header: header.to_string(),
|
||||
question: "Share details.".to_string(),
|
||||
is_other: false,
|
||||
options: None,
|
||||
}
|
||||
}
|
||||
@@ -696,6 +698,7 @@ mod tests {
|
||||
id: "q1".to_string(),
|
||||
header: "Next Step".to_string(),
|
||||
question: "What would you like to do next?".to_string(),
|
||||
is_other: false,
|
||||
options: Some(vec![
|
||||
RequestUserInputQuestionOption {
|
||||
label: "Discuss a code change (Recommended)".to_string(),
|
||||
|
||||
Reference in New Issue
Block a user