mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
## Summary
- Stream proposed plans in Plan Mode using `<proposed_plan>` tags parsed in core, emitting plan deltas plus a plan `ThreadItem`, while stripping the tags from normal assistant output.
- Persist plan items and rebuild them on resume so proposed plans show up in thread history.
- Wire plan items/deltas through app-server protocol v2 and render a dedicated proposed-plan view in the TUI, including the “Implement this plan?” prompt only when a plan item is present.

## Changes

### Core (`codex-rs/core`)
- Added a generic, line-based tag parser that buffers each line until it can disprove a tag prefix; it auto-closes on `finish()` for unterminated tags. (`codex-rs/core/src/tagged_block_parser.rs`)
- Refactored proposed-plan parsing to wrap the generic parser. (`codex-rs/core/src/proposed_plan_parser.rs`)
- In Plan Mode, stream assistant deltas as:
  - **Normal text** → `AgentMessageContentDelta`
  - **Plan text** → `PlanDelta` + `TurnItem::Plan` start/completion (`codex-rs/core/src/codex.rs`)
- The final plan item content is derived from the completed assistant message (authoritative), not necessarily the concatenated deltas.
- Strip `<proposed_plan>` blocks from assistant text in Plan Mode so tags don’t appear in normal messages. (`codex-rs/core/src/stream_events_utils.rs`)
- Persist `ItemCompleted` events only for plan items, for rollout replay. (`codex-rs/core/src/rollout/policy.rs`)
- Guard the `update_plan` tool in Plan Mode with a clear error message. (`codex-rs/core/src/tools/handlers/plan.rs`)
- Updated the Plan Mode prompt to:
  - keep `<proposed_plan>` out of non-final reasoning/preambles
  - require exact tag formatting
  - allow only one `<proposed_plan>` block per turn (`codex-rs/core/templates/collaboration_mode/plan.md`)

### Protocol / App-server protocol
- Added `TurnItem::Plan` and `PlanDeltaEvent` to core protocol items. (`codex-rs/protocol/src/items.rs`, `codex-rs/protocol/src/protocol.rs`)
- Added v2 `ThreadItem::Plan` and `PlanDeltaNotification` with EXPERIMENTAL markers and a note that deltas may not match the final plan item. (`codex-rs/app-server-protocol/src/protocol/v2.rs`)
- Added a plan-delta route in the app-server protocol common mapping. (`codex-rs/app-server-protocol/src/protocol/common.rs`)
- Rebuild plan items from persisted `ItemCompleted` events on resume. (`codex-rs/app-server-protocol/src/protocol/thread_history.rs`)

### App-server
- Forward plan deltas to v2 clients and map core plan items to v2 plan items. (`codex-rs/app-server/src/bespoke_event_handling.rs`, `codex-rs/app-server/src/codex_message_processor.rs`)
- Added v2 plan item tests. (`codex-rs/app-server/tests/suite/v2/plan_item.rs`)

### TUI
- Added a dedicated proposed-plan history cell with a special background and padding, and moved “• Proposed Plan” outside the highlighted block. (`codex-rs/tui/src/history_cell.rs`, `codex-rs/tui/src/style.rs`)
- Only show “Implement this plan?” when a plan item exists. (`codex-rs/tui/src/chatwidget.rs`, `codex-rs/tui/src/chatwidget/tests.rs`)

<img width="831" height="847" alt="Screenshot 2026-01-29 at 7 06 24 PM" src="https://github.com/user-attachments/assets/69794c8c-f96b-4d36-92ef-c1f5c3a8f286" />

### Docs / Misc
- Updated protocol docs to mention plan deltas. (`codex-rs/docs/protocol_v1.md`)
- Minor plumbing updates in exec/debug clients to tolerate plan deltas. (`codex-rs/debug-client/src/reader.rs`, `codex-rs/exec/...`)

## Tests
- Added core integration tests:
  - Plan Mode strips the plan from agent messages.
  - A missing `</proposed_plan>` closes at end of message. (`codex-rs/core/tests/suite/items.rs`)
- Added unit tests for the generic tag parser (prefix buffering, non-tag lines, auto-close). (`codex-rs/core/src/tagged_block_parser.rs`)
- Existing app-server plan item tests in v2. (`codex-rs/app-server/tests/suite/v2/plan_item.rs`)

## Notes / Behavior
- Plan output no longer appears in standard assistant text in Plan Mode; it streams via `PlanDelta` and completes as a `TurnItem::Plan`.
- The final plan item content is authoritative and may diverge from streamed deltas (documented as experimental).
- Reasoning summaries are not filtered; the prompt instructs the model not to include `<proposed_plan>` outside the final plan message.

## Codex Author
`codex fork 019bec2d-b09d-7450-b292-d7bcdddcdbfb`
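The line-based tag parsing described above can be sketched roughly as follows. This is a simplified, hypothetical model: the names `TaggedBlockParser`, `Chunk`, and `push_line` are illustrative only, and it classifies whole lines, omitting the prefix-buffering the real parser does to handle partial-line streaming deltas.

```rust
/// A parsed chunk of assistant output: either normal text or a line that
/// belongs inside a tagged block such as `<proposed_plan>…</proposed_plan>`.
#[derive(Debug, PartialEq)]
enum Chunk {
    Text(String),
    Tagged(String),
}

struct TaggedBlockParser {
    open_tag: String,
    close_tag: String,
    in_block: bool,
    out: Vec<Chunk>,
}

impl TaggedBlockParser {
    fn new(tag: &str) -> Self {
        Self {
            open_tag: format!("<{tag}>"),
            close_tag: format!("</{tag}>"),
            in_block: false,
            out: Vec::new(),
        }
    }

    /// Feed one complete line. A tag is only recognized when the whole line
    /// (after trimming) is exactly the open or close tag; tag lines themselves
    /// are swallowed, everything else is routed by the current mode.
    fn push_line(&mut self, line: &str) {
        let trimmed = line.trim();
        if !self.in_block && trimmed == self.open_tag {
            self.in_block = true;
        } else if self.in_block && trimmed == self.close_tag {
            self.in_block = false;
        } else if self.in_block {
            self.out.push(Chunk::Tagged(line.to_string()));
        } else {
            self.out.push(Chunk::Text(line.to_string()));
        }
    }

    /// Auto-close at end of message: an unterminated block simply ends here,
    /// so its lines have already been emitted as `Tagged` chunks.
    fn finish(self) -> Vec<Chunk> {
        self.out
    }
}

fn main() {
    let mut p = TaggedBlockParser::new("proposed_plan");
    for line in ["intro", "<proposed_plan>", "step 1"] {
        p.push_line(line);
    }
    // Missing `</proposed_plan>`: finish() closes the block implicitly.
    let chunks = p.finish();
    assert_eq!(chunks[0], Chunk::Text("intro".into()));
    assert_eq!(chunks[1], Chunk::Tagged("step 1".into()));
    println!("{chunks:?}");
}
```

In this model, `Text` chunks would feed `AgentMessageContentDelta` and `Tagged` chunks would feed `PlanDelta`, which is why the tags never reach normal assistant output.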
522 lines
18 KiB
Rust
use crate::agent::AgentStatus;
use crate::agent::guards::Guards;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;

/// Control-plane handle for multi-agent operations.
/// `AgentControl` is held by each session (via `SessionServices`). It provides the capability to
/// spawn new agents and the inter-agent communication layer.
/// An `AgentControl` instance is shared per "user session", which means the same `AgentControl`
/// is used for every sub-agent spawned by Codex. By doing so, we make sure the guards are
/// scoped to a user session.
#[derive(Clone, Default)]
pub(crate) struct AgentControl {
    /// Weak handle back to the global thread registry/state.
    /// This is `Weak` to avoid reference cycles and shadow persistence of the form
    /// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
    manager: Weak<ThreadManagerState>,
    state: Arc<Guards>,
}

impl AgentControl {
    /// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
    pub(crate) fn new(manager: Weak<ThreadManagerState>) -> Self {
        Self {
            manager,
            ..Default::default()
        }
    }

    /// Spawn a new agent thread and submit the initial prompt.
    pub(crate) async fn spawn_agent(
        &self,
        config: crate::config::Config,
        prompt: String,
        session_source: Option<codex_protocol::protocol::SessionSource>,
    ) -> CodexResult<ThreadId> {
        let state = self.upgrade()?;
        let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;

        // The same `AgentControl` is sent to spawn the thread.
        let new_thread = match session_source {
            Some(session_source) => {
                state
                    .spawn_new_thread_with_source(config, self.clone(), session_source)
                    .await?
            }
            None => state.spawn_new_thread(config, self.clone()).await?,
        };
        reservation.commit(new_thread.thread_id);

        // Notify a new thread has been created. This notification will be processed by clients
        // to subscribe or drain this newly created thread.
        // TODO(jif) add helper for drain
        state.notify_thread_created(new_thread.thread_id);

        self.send_prompt(new_thread.thread_id, prompt).await?;

        Ok(new_thread.thread_id)
    }

    /// Send a `user` prompt to an existing agent thread.
    pub(crate) async fn send_prompt(
        &self,
        agent_id: ThreadId,
        prompt: String,
    ) -> CodexResult<String> {
        let state = self.upgrade()?;
        let result = state
            .send_op(
                agent_id,
                Op::UserInput {
                    items: vec![UserInput::Text {
                        text: prompt,
                        // Agent control prompts are plain text with no UI text elements.
                        text_elements: Vec::new(),
                    }],
                    final_output_json_schema: None,
                },
            )
            .await;
        if matches!(result, Err(CodexErr::InternalAgentDied)) {
            let _ = state.remove_thread(&agent_id).await;
            self.state.release_spawned_thread(agent_id);
        }
        result
    }

    /// Interrupt the current task for an existing agent thread.
    pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
        let state = self.upgrade()?;
        state.send_op(agent_id, Op::Interrupt).await
    }

    /// Submit a shutdown request to an existing agent thread.
    pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
        let state = self.upgrade()?;
        let result = state.send_op(agent_id, Op::Shutdown {}).await;
        let _ = state.remove_thread(&agent_id).await;
        self.state.release_spawned_thread(agent_id);
        result
    }

    /// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
    pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus {
        let Ok(state) = self.upgrade() else {
            // No agent available if upgrade fails.
            return AgentStatus::NotFound;
        };
        let Ok(thread) = state.get_thread(agent_id).await else {
            return AgentStatus::NotFound;
        };
        thread.agent_status().await
    }

    /// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
    pub(crate) async fn subscribe_status(
        &self,
        agent_id: ThreadId,
    ) -> CodexResult<watch::Receiver<AgentStatus>> {
        let state = self.upgrade()?;
        let thread = state.get_thread(agent_id).await?;
        Ok(thread.subscribe_status())
    }

    fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
        self.manager
            .upgrade()
            .ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodexAuth;
    use crate::CodexThread;
    use crate::ThreadManager;
    use crate::agent::agent_status_from_event;
    use crate::config::Config;
    use crate::config::ConfigBuilder;
    use assert_matches::assert_matches;
    use codex_protocol::config_types::ModeKind;
    use codex_protocol::protocol::ErrorEvent;
    use codex_protocol::protocol::EventMsg;
    use codex_protocol::protocol::TurnAbortReason;
    use codex_protocol::protocol::TurnAbortedEvent;
    use codex_protocol::protocol::TurnCompleteEvent;
    use codex_protocol::protocol::TurnStartedEvent;
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;
    use toml::Value as TomlValue;

    async fn test_config_with_cli_overrides(
        cli_overrides: Vec<(String, TomlValue)>,
    ) -> (TempDir, Config) {
        let home = TempDir::new().expect("create temp dir");
        let config = ConfigBuilder::default()
            .codex_home(home.path().to_path_buf())
            .cli_overrides(cli_overrides)
            .build()
            .await
            .expect("load default test config");
        (home, config)
    }

    async fn test_config() -> (TempDir, Config) {
        test_config_with_cli_overrides(Vec::new()).await
    }

    struct AgentControlHarness {
        _home: TempDir,
        config: Config,
        manager: ThreadManager,
        control: AgentControl,
    }

    impl AgentControlHarness {
        async fn new() -> Self {
            let (home, config) = test_config().await;
            let manager = ThreadManager::with_models_provider_and_home(
                CodexAuth::from_api_key("dummy"),
                config.model_provider.clone(),
                config.codex_home.clone(),
            );
            let control = manager.agent_control();
            Self {
                _home: home,
                config,
                manager,
                control,
            }
        }

        async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
            let new_thread = self
                .manager
                .start_thread(self.config.clone())
                .await
                .expect("start thread");
            (new_thread.thread_id, new_thread.thread)
        }
    }

    #[tokio::test]
    async fn send_prompt_errors_when_manager_dropped() {
        let control = AgentControl::default();
        let err = control
            .send_prompt(ThreadId::new(), "hello".to_string())
            .await
            .expect_err("send_prompt should fail without a manager");
        assert_eq!(
            err.to_string(),
            "unsupported operation: thread manager dropped"
        );
    }

    #[tokio::test]
    async fn get_status_returns_not_found_without_manager() {
        let control = AgentControl::default();
        let got = control.get_status(ThreadId::new()).await;
        assert_eq!(got, AgentStatus::NotFound);
    }

    #[tokio::test]
    async fn on_event_updates_status_from_task_started() {
        let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
            model_context_window: None,
            collaboration_mode_kind: ModeKind::Custom,
        }));
        assert_eq!(status, Some(AgentStatus::Running));
    }

    #[tokio::test]
    async fn on_event_updates_status_from_task_complete() {
        let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
            last_agent_message: Some("done".to_string()),
        }));
        let expected = AgentStatus::Completed(Some("done".to_string()));
        assert_eq!(status, Some(expected));
    }

    #[tokio::test]
    async fn on_event_updates_status_from_error() {
        let status = agent_status_from_event(&EventMsg::Error(ErrorEvent {
            message: "boom".to_string(),
            codex_error_info: None,
        }));

        let expected = AgentStatus::Errored("boom".to_string());
        assert_eq!(status, Some(expected));
    }

    #[tokio::test]
    async fn on_event_updates_status_from_turn_aborted() {
        let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent {
            reason: TurnAbortReason::Interrupted,
        }));

        let expected = AgentStatus::Errored("Interrupted".to_string());
        assert_eq!(status, Some(expected));
    }

    #[tokio::test]
    async fn on_event_updates_status_from_shutdown_complete() {
        let status = agent_status_from_event(&EventMsg::ShutdownComplete);
        assert_eq!(status, Some(AgentStatus::Shutdown));
    }

    #[tokio::test]
    async fn spawn_agent_errors_when_manager_dropped() {
        let control = AgentControl::default();
        let (_home, config) = test_config().await;
        let err = control
            .spawn_agent(config, "hello".to_string(), None)
            .await
            .expect_err("spawn_agent should fail without a manager");
        assert_eq!(
            err.to_string(),
            "unsupported operation: thread manager dropped"
        );
    }

    #[tokio::test]
    async fn send_prompt_errors_when_thread_missing() {
        let harness = AgentControlHarness::new().await;
        let thread_id = ThreadId::new();
        let err = harness
            .control
            .send_prompt(thread_id, "hello".to_string())
            .await
            .expect_err("send_prompt should fail for missing thread");
        assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
    }

    #[tokio::test]
    async fn get_status_returns_not_found_for_missing_thread() {
        let harness = AgentControlHarness::new().await;
        let status = harness.control.get_status(ThreadId::new()).await;
        assert_eq!(status, AgentStatus::NotFound);
    }

    #[tokio::test]
    async fn get_status_returns_pending_init_for_new_thread() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, _) = harness.start_thread().await;
        let status = harness.control.get_status(thread_id).await;
        assert_eq!(status, AgentStatus::PendingInit);
    }

    #[tokio::test]
    async fn subscribe_status_errors_for_missing_thread() {
        let harness = AgentControlHarness::new().await;
        let thread_id = ThreadId::new();
        let err = harness
            .control
            .subscribe_status(thread_id)
            .await
            .expect_err("subscribe_status should fail for missing thread");
        assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
    }

    #[tokio::test]
    async fn subscribe_status_updates_on_shutdown() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, thread) = harness.start_thread().await;
        let mut status_rx = harness
            .control
            .subscribe_status(thread_id)
            .await
            .expect("subscribe_status should succeed");
        assert_eq!(status_rx.borrow().clone(), AgentStatus::PendingInit);

        let _ = thread
            .submit(Op::Shutdown {})
            .await
            .expect("shutdown should submit");

        let _ = status_rx.changed().await;
        assert_eq!(status_rx.borrow().clone(), AgentStatus::Shutdown);
    }

    #[tokio::test]
    async fn send_prompt_submits_user_message() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, _thread) = harness.start_thread().await;

        let submission_id = harness
            .control
            .send_prompt(thread_id, "hello from tests".to_string())
            .await
            .expect("send_prompt should succeed");
        assert!(!submission_id.is_empty());
        let expected = (
            thread_id,
            Op::UserInput {
                items: vec![UserInput::Text {
                    text: "hello from tests".to_string(),
                    text_elements: Vec::new(),
                }],
                final_output_json_schema: None,
            },
        );
        let captured = harness
            .manager
            .captured_ops()
            .into_iter()
            .find(|entry| *entry == expected);
        assert_eq!(captured, Some(expected));
    }

    #[tokio::test]
    async fn spawn_agent_creates_thread_and_sends_prompt() {
        let harness = AgentControlHarness::new().await;
        let thread_id = harness
            .control
            .spawn_agent(harness.config.clone(), "spawned".to_string(), None)
            .await
            .expect("spawn_agent should succeed");
        let _thread = harness
            .manager
            .get_thread(thread_id)
            .await
            .expect("thread should be registered");
        let expected = (
            thread_id,
            Op::UserInput {
                items: vec![UserInput::Text {
                    text: "spawned".to_string(),
                    text_elements: Vec::new(),
                }],
                final_output_json_schema: None,
            },
        );
        let captured = harness
            .manager
            .captured_ops()
            .into_iter()
            .find(|entry| *entry == expected);
        assert_eq!(captured, Some(expected));
    }

    #[tokio::test]
    async fn spawn_agent_respects_max_threads_limit() {
        let max_threads = 1usize;
        let (_home, config) = test_config_with_cli_overrides(vec![(
            "agents.max_threads".to_string(),
            TomlValue::Integer(max_threads as i64),
        )])
        .await;
        let manager = ThreadManager::with_models_provider_and_home(
            CodexAuth::from_api_key("dummy"),
            config.model_provider.clone(),
            config.codex_home.clone(),
        );
        let control = manager.agent_control();

        let _ = manager
            .start_thread(config.clone())
            .await
            .expect("start thread");

        let first_agent_id = control
            .spawn_agent(config.clone(), "hello".to_string(), None)
            .await
            .expect("spawn_agent should succeed");

        let err = control
            .spawn_agent(config, "hello again".to_string(), None)
            .await
            .expect_err("spawn_agent should respect max threads");
        let CodexErr::AgentLimitReached {
            max_threads: seen_max_threads,
        } = err
        else {
            panic!("expected CodexErr::AgentLimitReached");
        };
        assert_eq!(seen_max_threads, max_threads);

        let _ = control
            .shutdown_agent(first_agent_id)
            .await
            .expect("shutdown agent");
    }

    #[tokio::test]
    async fn spawn_agent_releases_slot_after_shutdown() {
        let max_threads = 1usize;
        let (_home, config) = test_config_with_cli_overrides(vec![(
            "agents.max_threads".to_string(),
            TomlValue::Integer(max_threads as i64),
        )])
        .await;
        let manager = ThreadManager::with_models_provider_and_home(
            CodexAuth::from_api_key("dummy"),
            config.model_provider.clone(),
            config.codex_home.clone(),
        );
        let control = manager.agent_control();

        let first_agent_id = control
            .spawn_agent(config.clone(), "hello".to_string(), None)
            .await
            .expect("spawn_agent should succeed");
        let _ = control
            .shutdown_agent(first_agent_id)
            .await
            .expect("shutdown agent");

        let second_agent_id = control
            .spawn_agent(config.clone(), "hello again".to_string(), None)
            .await
            .expect("spawn_agent should succeed after shutdown");
        let _ = control
            .shutdown_agent(second_agent_id)
            .await
            .expect("shutdown agent");
    }

    #[tokio::test]
    async fn spawn_agent_limit_shared_across_clones() {
        let max_threads = 1usize;
        let (_home, config) = test_config_with_cli_overrides(vec![(
            "agents.max_threads".to_string(),
            TomlValue::Integer(max_threads as i64),
        )])
        .await;
        let manager = ThreadManager::with_models_provider_and_home(
            CodexAuth::from_api_key("dummy"),
            config.model_provider.clone(),
            config.codex_home.clone(),
        );
        let control = manager.agent_control();
        let cloned = control.clone();

        let first_agent_id = cloned
            .spawn_agent(config.clone(), "hello".to_string(), None)
            .await
            .expect("spawn_agent should succeed");

        let err = control
            .spawn_agent(config, "hello again".to_string(), None)
            .await
            .expect_err("spawn_agent should respect shared guard");
        let CodexErr::AgentLimitReached { max_threads } = err else {
            panic!("expected CodexErr::AgentLimitReached");
        };
        assert_eq!(max_threads, 1);

        let _ = control
            .shutdown_agent(first_agent_id)
            .await
            .expect("shutdown agent");
    }
}