mirror of
https://github.com/openai/codex.git
synced 2026-02-03 15:33:41 +00:00
Compare commits
4 Commits
queue/stee
...
pr9103
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
52168a18f1 | ||
|
|
9659583559 | ||
|
|
623707ab58 | ||
|
|
86f81ca010 |
@@ -9,6 +9,7 @@ use codex_protocol::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// Control-plane handle for multi-agent operations.
|
||||
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
|
||||
@@ -27,7 +28,6 @@ impl AgentControl {
|
||||
Self { manager }
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
/// Spawn a new agent thread and submit the initial prompt.
|
||||
///
|
||||
/// If `headless` is true, a background drain task is spawned to prevent unbounded event growth
|
||||
@@ -50,7 +50,6 @@ impl AgentControl {
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
/// Send a `user` prompt to an existing agent thread.
|
||||
pub(crate) async fn send_prompt(
|
||||
&self,
|
||||
@@ -69,7 +68,13 @@ impl AgentControl {
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
/// Submit a shutdown request to an existing agent thread.
|
||||
pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
state.send_op(agent_id, Op::Shutdown {}).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Will be used for collab tools.
|
||||
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
|
||||
pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus {
|
||||
let Ok(state) = self.upgrade() else {
|
||||
@@ -82,6 +87,16 @@ impl AgentControl {
|
||||
thread.agent_status().await
|
||||
}
|
||||
|
||||
/// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
|
||||
pub(crate) async fn subscribe_status(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
) -> CodexResult<watch::Receiver<AgentStatus>> {
|
||||
let state = self.upgrade()?;
|
||||
let thread = state.get_thread(agent_id).await?;
|
||||
Ok(thread.subscribe_status())
|
||||
}
|
||||
|
||||
fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
|
||||
self.manager
|
||||
.upgrade()
|
||||
@@ -114,13 +129,63 @@ fn spawn_headless_drain(thread: Arc<CodexThread>) {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::CodexAuth;
|
||||
use crate::ThreadManager;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::config::Config;
|
||||
use crate::config::ConfigBuilder;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn test_config() -> (TempDir, Config) {
|
||||
let home = TempDir::new().expect("create temp dir");
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(home.path().to_path_buf())
|
||||
.build()
|
||||
.await
|
||||
.expect("load default test config");
|
||||
(home, config)
|
||||
}
|
||||
|
||||
struct AgentControlHarness {
|
||||
_home: TempDir,
|
||||
config: Config,
|
||||
manager: ThreadManager,
|
||||
control: AgentControl,
|
||||
}
|
||||
|
||||
impl AgentControlHarness {
|
||||
async fn new() -> Self {
|
||||
let (home, config) = test_config().await;
|
||||
let manager = ThreadManager::with_models_provider_and_home(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
config.model_provider.clone(),
|
||||
config.codex_home.clone(),
|
||||
);
|
||||
let control = manager.agent_control();
|
||||
Self {
|
||||
_home: home,
|
||||
config,
|
||||
manager,
|
||||
control,
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
|
||||
let new_thread = self
|
||||
.manager
|
||||
.start_thread(self.config.clone())
|
||||
.await
|
||||
.expect("start thread");
|
||||
(new_thread.thread_id, new_thread.thread)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_prompt_errors_when_manager_dropped() {
|
||||
@@ -185,4 +250,135 @@ mod tests {
|
||||
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
|
||||
assert_eq!(status, Some(AgentStatus::Shutdown));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.spawn_agent(config, "hello".to_string(), false)
|
||||
.await
|
||||
.expect_err("spawn_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"unsupported operation: thread manager dropped"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_prompt_errors_when_thread_missing() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let err = harness
|
||||
.control
|
||||
.send_prompt(thread_id, "hello".to_string())
|
||||
.await
|
||||
.expect_err("send_prompt should fail for missing thread");
|
||||
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_status_returns_not_found_for_missing_thread() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let status = harness.control.get_status(ThreadId::new()).await;
|
||||
assert_eq!(status, AgentStatus::NotFound);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_status_returns_pending_init_for_new_thread() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (thread_id, _) = harness.start_thread().await;
|
||||
let status = harness.control.get_status(thread_id).await;
|
||||
assert_eq!(status, AgentStatus::PendingInit);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_status_errors_for_missing_thread() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let err = harness
|
||||
.control
|
||||
.subscribe_status(thread_id)
|
||||
.await
|
||||
.expect_err("subscribe_status should fail for missing thread");
|
||||
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_status_updates_on_shutdown() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (thread_id, thread) = harness.start_thread().await;
|
||||
let mut status_rx = harness
|
||||
.control
|
||||
.subscribe_status(thread_id)
|
||||
.await
|
||||
.expect("subscribe_status should succeed");
|
||||
assert_eq!(status_rx.borrow().clone(), AgentStatus::PendingInit);
|
||||
|
||||
let _ = thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
|
||||
let _ = status_rx.changed().await;
|
||||
assert_eq!(status_rx.borrow().clone(), AgentStatus::Shutdown);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_prompt_submits_user_message() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (thread_id, _thread) = harness.start_thread().await;
|
||||
|
||||
let submission_id = harness
|
||||
.control
|
||||
.send_prompt(thread_id, "hello from tests".to_string())
|
||||
.await
|
||||
.expect("send_prompt should succeed");
|
||||
assert!(!submission_id.is_empty());
|
||||
let expected = (
|
||||
thread_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "hello from tests".to_string(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
);
|
||||
let captured = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.find(|entry| *entry == expected);
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_creates_thread_and_sends_prompt() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string(), false)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
.manager
|
||||
.get_thread(thread_id)
|
||||
.await
|
||||
.expect("thread should be registered");
|
||||
let expected = (
|
||||
thread_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "spawned".to_string(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
);
|
||||
let captured = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.find(|entry| *entry == expected);
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,3 +13,7 @@ pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_final(status: &AgentStatus) -> bool {
|
||||
!matches!(status, AgentStatus::PendingInit | AgentStatus::Running)
|
||||
}
|
||||
|
||||
@@ -164,6 +164,7 @@ use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// The high-level interface to the Codex system.
|
||||
/// It operates as a queue pair where you send submissions and receive events.
|
||||
@@ -172,7 +173,7 @@ pub struct Codex {
|
||||
pub(crate) tx_sub: Sender<Submission>,
|
||||
pub(crate) rx_event: Receiver<Event>,
|
||||
// Last known status of the agent.
|
||||
pub(crate) agent_status: Arc<RwLock<AgentStatus>>,
|
||||
pub(crate) agent_status: watch::Receiver<AgentStatus>,
|
||||
}
|
||||
|
||||
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
|
||||
@@ -275,7 +276,7 @@ impl Codex {
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
let session_source_clone = session_configuration.session_source.clone();
|
||||
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
|
||||
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
@@ -284,7 +285,7 @@ impl Codex {
|
||||
models_manager.clone(),
|
||||
exec_policy,
|
||||
tx_event.clone(),
|
||||
Arc::clone(&agent_status),
|
||||
agent_status_tx.clone(),
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
skills_manager,
|
||||
@@ -303,7 +304,7 @@ impl Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
rx_event,
|
||||
agent_status,
|
||||
agent_status: agent_status_rx,
|
||||
};
|
||||
|
||||
#[allow(deprecated)]
|
||||
@@ -345,8 +346,7 @@ impl Codex {
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_status(&self) -> AgentStatus {
|
||||
let status = self.agent_status.read().await;
|
||||
status.clone()
|
||||
self.agent_status.borrow().clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,7 +356,7 @@ impl Codex {
|
||||
pub(crate) struct Session {
|
||||
conversation_id: ThreadId,
|
||||
tx_event: Sender<Event>,
|
||||
agent_status: Arc<RwLock<AgentStatus>>,
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
state: Mutex<SessionState>,
|
||||
/// The set of enabled features should be invariant for the lifetime of the
|
||||
/// session.
|
||||
@@ -557,7 +557,7 @@ impl Session {
|
||||
models_manager: Arc<ModelsManager>,
|
||||
exec_policy: ExecPolicyManager,
|
||||
tx_event: Sender<Event>,
|
||||
agent_status: Arc<RwLock<AgentStatus>>,
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
@@ -703,7 +703,7 @@ impl Session {
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status: Arc::clone(&agent_status),
|
||||
agent_status,
|
||||
state: Mutex::new(state),
|
||||
features: config.features.clone(),
|
||||
active_turn: Mutex::new(None),
|
||||
@@ -1026,8 +1026,7 @@ impl Session {
|
||||
pub(crate) async fn send_event_raw(&self, event: Event) {
|
||||
// Record the last known agent status.
|
||||
if let Some(status) = agent_status_from_event(&event.msg) {
|
||||
let mut guard = self.agent_status.write().await;
|
||||
*guard = status;
|
||||
self.agent_status.send_replace(status);
|
||||
}
|
||||
// Persist the event into rollout (recorder filters as needed)
|
||||
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
|
||||
@@ -1045,8 +1044,7 @@ impl Session {
|
||||
pub(crate) async fn send_event_raw_flushed(&self, event: Event) {
|
||||
// Record the last known agent status.
|
||||
if let Some(status) = agent_status_from_event(&event.msg) {
|
||||
let mut guard = self.agent_status.write().await;
|
||||
*guard = status;
|
||||
self.agent_status.send_replace(status);
|
||||
}
|
||||
self.persist_rollout_items(&[RolloutItem::EventMsg(event.msg.clone())])
|
||||
.await;
|
||||
@@ -3494,7 +3492,7 @@ mod tests {
|
||||
));
|
||||
let agent_control = AgentControl::default();
|
||||
let exec_policy = ExecPolicyManager::default();
|
||||
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
|
||||
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
@@ -3557,7 +3555,7 @@ mod tests {
|
||||
let session = Session {
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: Arc::clone(&agent_status),
|
||||
agent_status: agent_status_tx,
|
||||
state: Mutex::new(state),
|
||||
features: config.features.clone(),
|
||||
active_turn: Mutex::new(None),
|
||||
@@ -3588,7 +3586,7 @@ mod tests {
|
||||
));
|
||||
let agent_control = AgentControl::default();
|
||||
let exec_policy = ExecPolicyManager::default();
|
||||
let agent_status = Arc::new(RwLock::new(AgentStatus::PendingInit));
|
||||
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
@@ -3651,7 +3649,7 @@ mod tests {
|
||||
let session = Arc::new(Session {
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: Arc::clone(&agent_status),
|
||||
agent_status: agent_status_tx,
|
||||
state: Mutex::new(state),
|
||||
features: config.features.clone(),
|
||||
active_turn: Mutex::new(None),
|
||||
|
||||
@@ -87,7 +87,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub: tx_ops,
|
||||
rx_event: rx_sub,
|
||||
agent_status: Arc::clone(&codex.agent_status),
|
||||
agent_status: codex.agent_status.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ pub(crate) async fn run_codex_thread_one_shot(
|
||||
// Bridge events so we can observe completion and shut down automatically.
|
||||
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let ops_tx = io.tx_sub.clone();
|
||||
let agent_status = Arc::clone(&io.agent_status);
|
||||
let agent_status = io.agent_status.clone();
|
||||
let io_for_bridge = io;
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = io_for_bridge.next_event().await {
|
||||
@@ -363,20 +363,23 @@ mod tests {
|
||||
use super::*;
|
||||
use async_channel::bounded;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::watch;
|
||||
|
||||
#[tokio::test]
|
||||
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
|
||||
let (tx_events, rx_events) = bounded(1);
|
||||
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
||||
let codex = Arc::new(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
rx_event: rx_events,
|
||||
agent_status: Default::default(),
|
||||
agent_status,
|
||||
});
|
||||
|
||||
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::protocol::Event;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::Submission;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::watch;
|
||||
|
||||
pub struct CodexThread {
|
||||
codex: Codex,
|
||||
@@ -38,6 +39,10 @@ impl CodexThread {
|
||||
self.codex.agent_status().await
|
||||
}
|
||||
|
||||
pub(crate) fn subscribe_status(&self) -> watch::Receiver<AgentStatus> {
|
||||
self.codex.agent_status.clone()
|
||||
}
|
||||
|
||||
pub fn rollout_path(&self) -> PathBuf {
|
||||
self.rollout_path.clone()
|
||||
}
|
||||
|
||||
@@ -1,23 +1,28 @@
|
||||
use crate::bash::parse_shell_lc_plain_commands;
|
||||
use crate::command_safety::windows_safe_commands::is_safe_command_windows;
|
||||
use crate::command_safety::windows_safe_commands::is_powershell_executable;
|
||||
use crate::command_safety::windows_safe_commands::is_safe_powershell_invocation;
|
||||
|
||||
/// `command` must be the _exact_ proposed list of arguments that will be sent
|
||||
/// to `execvp(3)`. That is:
|
||||
/// - If the command is to be run via a shell, then the first argument MUST be
|
||||
/// the shell executable (e.g. "bash", "zsh", "pwsh", "powershell").
|
||||
/// - The first argument will be resolved against the `PATH` environment
|
||||
/// variable, so it can be either a bare command name (e.g. "ls") or a full
|
||||
/// path.
|
||||
/// - If the first argument is NOT a shell executable, then the command MUST be
|
||||
/// discoverable in the system `PATH` and MUST NOT rely on shell features
|
||||
/// (e.g. shell built-ins, operators, or syntax).
|
||||
pub fn is_known_safe_command(command: &[String]) -> bool {
|
||||
let command: Vec<String> = command
|
||||
.iter()
|
||||
.map(|s| {
|
||||
if s == "zsh" {
|
||||
"bash".to_string()
|
||||
} else {
|
||||
s.clone()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if is_safe_command_windows(&command) {
|
||||
return true;
|
||||
if command.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
if is_safe_to_call_with_exec(&command) {
|
||||
let exe = &command[0];
|
||||
if is_powershell_executable(exe) {
|
||||
return is_safe_powershell_invocation(command);
|
||||
}
|
||||
|
||||
if is_safe_to_call_with_exec(command) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -27,7 +32,7 @@ pub fn is_known_safe_command(command: &[String]) -> bool {
|
||||
// introduce side effects ( "&&", "||", ";", and "|" ). If every
|
||||
// individual command in the script is itself a known‑safe command, then
|
||||
// the composite expression is considered safe.
|
||||
if let Some(all_commands) = parse_shell_lc_plain_commands(&command)
|
||||
if let Some(all_commands) = parse_shell_lc_plain_commands(command)
|
||||
&& !all_commands.is_empty()
|
||||
&& all_commands
|
||||
.iter()
|
||||
@@ -422,4 +427,33 @@ mod tests {
|
||||
"> redirection should be rejected"
|
||||
);
|
||||
}
|
||||
|
||||
/// This test only works on Windows because it requires access to PowerShell
|
||||
/// to parse the argument to `pwsh -Command`.
|
||||
#[test]
|
||||
fn ensure_safe_unix_command_that_is_unsafe_powershell_command_is_rejected() {
|
||||
// `brew install powershell` to run this test on a Mac!
|
||||
if which::which("pwsh").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
assert!(
|
||||
is_known_safe_command(&vec_str(&["echo", "hi", "@(calc)"])),
|
||||
"Running echo with execvp is safe because there is no shell interpretation."
|
||||
);
|
||||
assert!(
|
||||
!is_known_safe_command(&vec_str(&["bash", "-lc", "echo hi @(calc)"])),
|
||||
"This command should not get marked as safe because the Bash script has a parse error."
|
||||
);
|
||||
assert!(
|
||||
is_known_safe_command(&vec_str(&["pwsh", "-Command", "echo hi"])),
|
||||
"Our logic should recognize that `echo hi` is safe to run under PowerShell."
|
||||
);
|
||||
assert!(
|
||||
!is_known_safe_command(&vec_str(&["pwsh", "-Command", "echo hi @(calc)"])),
|
||||
r#"Even though `echo hi @(calc)` would not do anything malicious
|
||||
when run under Bash (because it would be rejected with a parse error), it would run
|
||||
calc.exec when run under PowerShell and therefore should be rejected."#
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,11 @@ const POWERSHELL_PARSER_SCRIPT: &str = include_str!("powershell_parser.ps1");
|
||||
|
||||
/// On Windows, we conservatively allow only clearly read-only PowerShell invocations
|
||||
/// that match a small safelist. Anything else (including direct CMD commands) is unsafe.
|
||||
pub fn is_safe_command_windows(command: &[String]) -> bool {
|
||||
///
|
||||
/// `command` must be the _exact_ proposed list of arguments that will be sent
|
||||
/// to `execvp(3)`, so `command[0]` must satisfy [`is_powershell_executable`]
|
||||
/// for this function to return true.
|
||||
pub fn is_safe_powershell_invocation(command: &[String]) -> bool {
|
||||
if let Some(commands) = try_parse_powershell_command_sequence(command) {
|
||||
commands
|
||||
.iter()
|
||||
@@ -108,7 +112,7 @@ fn parse_powershell_script(executable: &str, script: &str) -> Option<Vec<Vec<Str
|
||||
}
|
||||
|
||||
/// Returns true when the executable name is one of the supported PowerShell binaries.
|
||||
fn is_powershell_executable(exe: &str) -> bool {
|
||||
pub fn is_powershell_executable(exe: &str) -> bool {
|
||||
let executable_name = Path::new(exe)
|
||||
.file_name()
|
||||
.and_then(|osstr| osstr.to_str())
|
||||
@@ -357,21 +361,21 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn recognizes_safe_powershell_wrappers() {
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-NoLogo",
|
||||
"-Command",
|
||||
"Get-ChildItem -Path .",
|
||||
])));
|
||||
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-Command",
|
||||
"git status",
|
||||
])));
|
||||
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"Get-Content",
|
||||
"Cargo.toml",
|
||||
@@ -379,7 +383,7 @@ mod tests {
|
||||
|
||||
// pwsh parity
|
||||
if let Some(pwsh) = try_find_pwsh_executable_blocking() {
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.as_path().to_str().unwrap().into(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
@@ -396,7 +400,7 @@ mod tests {
|
||||
}
|
||||
|
||||
if let Some(pwsh) = try_find_pwsh_executable_blocking() {
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.as_path().to_str().unwrap().into(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
@@ -404,7 +408,7 @@ mod tests {
|
||||
]));
|
||||
}
|
||||
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content Cargo.toml",
|
||||
@@ -418,7 +422,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let pwsh: String = pwsh.as_path().to_str().unwrap().into();
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.clone(),
|
||||
"-NoLogo".to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
@@ -427,7 +431,7 @@ mod tests {
|
||||
.to_string()
|
||||
]));
|
||||
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.clone(),
|
||||
"-NoLogo".to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
@@ -435,7 +439,7 @@ mod tests {
|
||||
"Get-Content foo.rs | Select-Object -Skip 200".to_string()
|
||||
]));
|
||||
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.clone(),
|
||||
"-NoLogo".to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
@@ -443,19 +447,19 @@ mod tests {
|
||||
"git -c core.pager=cat show HEAD:foo.rs".to_string()
|
||||
]));
|
||||
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.clone(),
|
||||
"-Command".to_string(),
|
||||
"-git cat-file -p HEAD:foo.rs".to_string()
|
||||
]));
|
||||
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh.clone(),
|
||||
"-Command".to_string(),
|
||||
"(Get-Content foo.rs -Raw)".to_string()
|
||||
]));
|
||||
|
||||
assert!(is_safe_command_windows(&[
|
||||
assert!(is_safe_powershell_invocation(&[
|
||||
pwsh,
|
||||
"-Command".to_string(),
|
||||
"Get-Item foo.rs | Select-Object Length".to_string()
|
||||
@@ -464,97 +468,97 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn rejects_powershell_commands_with_side_effects() {
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-NoLogo",
|
||||
"-Command",
|
||||
"Remove-Item foo.txt",
|
||||
])));
|
||||
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-Command",
|
||||
"rg --pre cat",
|
||||
])));
|
||||
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Set-Content foo.txt 'hello'",
|
||||
])));
|
||||
|
||||
// Redirections are blocked
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"echo hi > out.txt",
|
||||
])));
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content x | Out-File y",
|
||||
])));
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Write-Output foo 2> err.txt",
|
||||
])));
|
||||
|
||||
// Call operator is blocked
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"& Remove-Item foo",
|
||||
])));
|
||||
|
||||
// Chained safe + unsafe must fail
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-ChildItem; Remove-Item foo",
|
||||
])));
|
||||
// Nested unsafe cmdlet inside safe command must fail
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Write-Output (Set-Content foo6.txt 'abc')",
|
||||
])));
|
||||
// Additional nested unsafe cmdlet examples must fail
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Write-Host (Remove-Item foo.txt)",
|
||||
])));
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content (New-Item bar.txt)",
|
||||
])));
|
||||
|
||||
// Unsafe @ expansion.
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"ls @(calc.exe)"
|
||||
])));
|
||||
|
||||
// Unsupported constructs that the AST parser refuses (no fallback to manual splitting).
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"ls && pwd"
|
||||
])));
|
||||
|
||||
// Sub-expressions are rejected even if they contain otherwise safe commands.
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Write-Output $(Get-Content foo)"
|
||||
])));
|
||||
|
||||
// Empty words from the parser (e.g. '') are rejected.
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"''"
|
||||
@@ -563,13 +567,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn accepts_constant_expression_arguments() {
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content 'foo bar'"
|
||||
])));
|
||||
|
||||
assert!(is_safe_command_windows(&vec_str(&[
|
||||
assert!(is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content \"foo bar\""
|
||||
@@ -578,13 +582,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn rejects_dynamic_arguments() {
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Get-Content $foo"
|
||||
])));
|
||||
|
||||
assert!(!is_safe_command_windows(&vec_str(&[
|
||||
assert!(!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-Command",
|
||||
"Write-Output \"foo $bar\""
|
||||
@@ -599,7 +603,7 @@ mod tests {
|
||||
|
||||
let chain = "pwd && ls";
|
||||
assert!(
|
||||
!is_safe_command_windows(&vec_str(&[
|
||||
!is_safe_powershell_invocation(&vec_str(&[
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-Command",
|
||||
@@ -610,7 +614,7 @@ mod tests {
|
||||
|
||||
if let Some(pwsh) = try_find_pwsh_executable_blocking() {
|
||||
assert!(
|
||||
is_safe_command_windows(&[
|
||||
is_safe_powershell_invocation(&[
|
||||
pwsh.as_path().to_str().unwrap().into(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
|
||||
@@ -56,6 +56,10 @@ pub(crate) struct ThreadManagerState {
|
||||
models_manager: Arc<ModelsManager>,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
session_source: SessionSource,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
#[allow(dead_code)]
|
||||
// Captures submitted ops for testing purpose.
|
||||
ops_log: Arc<std::sync::Mutex<Vec<(ThreadId, Op)>>>,
|
||||
}
|
||||
|
||||
impl ThreadManager {
|
||||
@@ -74,6 +78,8 @@ impl ThreadManager {
|
||||
skills_manager: Arc::new(SkillsManager::new(codex_home)),
|
||||
auth_manager,
|
||||
session_source,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
|
||||
}),
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
_test_codex_home_guard: None,
|
||||
@@ -111,6 +117,8 @@ impl ThreadManager {
|
||||
skills_manager: Arc::new(SkillsManager::new(codex_home)),
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
ops_log: Arc::new(std::sync::Mutex::new(Vec::new())),
|
||||
}),
|
||||
_test_codex_home_guard: None,
|
||||
}
|
||||
@@ -202,9 +210,19 @@ impl ThreadManager {
|
||||
.await
|
||||
}
|
||||
|
||||
fn agent_control(&self) -> AgentControl {
|
||||
pub(crate) fn agent_control(&self) -> AgentControl {
|
||||
AgentControl::new(Arc::downgrade(&self.state))
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
|
||||
self.state
|
||||
.ops_log
|
||||
.lock()
|
||||
.map(|log| log.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
impl ThreadManagerState {
|
||||
@@ -217,7 +235,14 @@ impl ThreadManagerState {
|
||||
}
|
||||
|
||||
pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult<String> {
|
||||
self.get_thread(thread_id).await?.submit(op).await
|
||||
let thread = self.get_thread(thread_id).await?;
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
{
|
||||
if let Ok(mut log) = self.ops_log.lock() {
|
||||
log.push((thread_id, op.clone()));
|
||||
}
|
||||
}
|
||||
thread.submit(op).await
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Used by upcoming multi-agent tooling.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
@@ -11,29 +12,13 @@ use crate::tools::registry::ToolKind;
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ThreadId;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
pub struct CollabHandler;
|
||||
|
||||
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
|
||||
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 300_000;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SpawnAgentArgs {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SendInputArgs {
|
||||
id: String,
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct WaitArgs {
|
||||
id: String,
|
||||
timeout_ms: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CloseAgentArgs {
|
||||
id: String,
|
||||
@@ -68,10 +53,10 @@ impl ToolHandler for CollabHandler {
|
||||
};
|
||||
|
||||
match tool_name.as_str() {
|
||||
"spawn_agent" => handle_spawn_agent(session, turn, arguments).await,
|
||||
"send_input" => handle_send_input(session, arguments).await,
|
||||
"wait" => handle_wait(arguments).await,
|
||||
"close_agent" => handle_close_agent(arguments).await,
|
||||
"spawn_agent" => spawn::handle(session, turn, arguments).await,
|
||||
"send_input" => send_input::handle(session, arguments).await,
|
||||
"wait" => wait::handle(session, arguments).await,
|
||||
"close_agent" => close_agent::handle(session, arguments).await,
|
||||
other => Err(FunctionCallError::RespondToModel(format!(
|
||||
"unsupported collab tool {other}"
|
||||
))),
|
||||
@@ -79,84 +64,233 @@ impl ToolHandler for CollabHandler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_spawn_agent(
|
||||
session: std::sync::Arc<crate::codex::Session>,
|
||||
turn: std::sync::Arc<TurnContext>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
|
||||
if args.message.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string(),
|
||||
));
|
||||
}
|
||||
let config = build_agent_spawn_config(turn.as_ref())?;
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(config, args.message, true)
|
||||
.await
|
||||
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
|
||||
mod spawn {
|
||||
use super::*;
|
||||
use crate::codex::Session;
|
||||
use std::sync::Arc;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content: format!("agent_id: {result}"),
|
||||
success: Some(true),
|
||||
content_items: None,
|
||||
})
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SpawnAgentArgs {
|
||||
message: String,
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: SpawnAgentArgs = parse_arguments(&arguments)?;
|
||||
if args.message.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string(),
|
||||
));
|
||||
}
|
||||
let config = build_agent_spawn_config(turn.as_ref())?;
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(config, args.message, true)
|
||||
.await
|
||||
.map_err(|err| FunctionCallError::Fatal(err.to_string()))?;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content: format!("agent_id: {result}"),
|
||||
success: Some(true),
|
||||
content_items: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_send_input(
|
||||
session: std::sync::Arc<crate::codex::Session>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: SendInputArgs = parse_arguments(&arguments)?;
|
||||
let agent_id = agent_id(&args.id)?;
|
||||
if args.message.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string(),
|
||||
));
|
||||
mod send_input {
|
||||
use super::*;
|
||||
use crate::codex::Session;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SendInputArgs {
|
||||
id: String,
|
||||
message: String,
|
||||
}
|
||||
let content = session
|
||||
.services
|
||||
.agent_control
|
||||
.send_prompt(agent_id, args.message)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: SendInputArgs = parse_arguments(&arguments)?;
|
||||
let agent_id = agent_id(&args.id)?;
|
||||
if args.message.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string(),
|
||||
));
|
||||
}
|
||||
let content = session
|
||||
.services
|
||||
.agent_control
|
||||
.send_prompt(agent_id, args.message)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
|
||||
}
|
||||
err => FunctionCallError::Fatal(err.to_string()),
|
||||
})?;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
success: Some(true),
|
||||
content_items: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
mod wait {
|
||||
use super::*;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::codex::Session;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::timeout_at;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct WaitArgs {
|
||||
id: String,
|
||||
timeout_ms: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct WaitResult {
|
||||
status: AgentStatus,
|
||||
timed_out: bool,
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: WaitArgs = parse_arguments(&arguments)?;
|
||||
let agent_id = agent_id(&args.id)?;
|
||||
|
||||
// Validate timeout.
|
||||
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
|
||||
let timeout_ms = match timeout_ms {
|
||||
ms if ms <= 0 => {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"timeout_ms must be greater than zero".to_owned(),
|
||||
));
|
||||
}
|
||||
err => FunctionCallError::Fatal(err.to_string()),
|
||||
ms => ms.min(MAX_WAIT_TIMEOUT_MS),
|
||||
};
|
||||
|
||||
let mut status_rx = session
|
||||
.services
|
||||
.agent_control
|
||||
.subscribe_status(agent_id)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
|
||||
}
|
||||
err => FunctionCallError::Fatal(err.to_string()),
|
||||
})?;
|
||||
|
||||
// Get last known status.
|
||||
let mut status = status_rx.borrow_and_update().clone();
|
||||
let deadline = Instant::now() + Duration::from_millis(timeout_ms as u64);
|
||||
|
||||
let timed_out = loop {
|
||||
if is_final(&status) {
|
||||
break false;
|
||||
}
|
||||
|
||||
match timeout_at(deadline, status_rx.changed()).await {
|
||||
Ok(Ok(())) => status = status_rx.borrow().clone(),
|
||||
Ok(Err(_)) => {
|
||||
let last_status = session.services.agent_control.get_status(agent_id).await;
|
||||
if last_status != AgentStatus::NotFound {
|
||||
// On-purpose we keep the last known status if the agent gets dropped. This
|
||||
// event is not supposed to happen.
|
||||
status = last_status;
|
||||
}
|
||||
break false;
|
||||
}
|
||||
Err(_) => break true,
|
||||
}
|
||||
};
|
||||
|
||||
if matches!(status, AgentStatus::NotFound) {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"agent with id {agent_id} not found"
|
||||
)));
|
||||
}
|
||||
|
||||
let result = WaitResult { status, timed_out };
|
||||
|
||||
let content = serde_json::to_string(&result).map_err(|err| {
|
||||
FunctionCallError::Fatal(format!("failed to serialize wait result: {err}"))
|
||||
})?;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
success: Some(true),
|
||||
content_items: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_wait(arguments: String) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: WaitArgs = parse_arguments(&arguments)?;
|
||||
let _agent_id = agent_id(&args.id)?;
|
||||
|
||||
let timeout_ms = args.timeout_ms.unwrap_or(DEFAULT_WAIT_TIMEOUT_MS);
|
||||
if timeout_ms <= 0 {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"timeout_ms must be greater than zero".to_string(),
|
||||
));
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
success: Some(!result.timed_out),
|
||||
content_items: None,
|
||||
})
|
||||
}
|
||||
let _timeout_ms = timeout_ms.min(MAX_WAIT_TIMEOUT_MS);
|
||||
// TODO(jif): implement agent wait once lifecycle tracking is wired up.
|
||||
Err(FunctionCallError::Fatal("wait not implemented".to_string()))
|
||||
}
|
||||
|
||||
async fn handle_close_agent(arguments: String) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: CloseAgentArgs = parse_arguments(&arguments)?;
|
||||
let _agent_id = agent_id(&args.id)?;
|
||||
// TODO(jif): implement agent shutdown and return the final status.
|
||||
Err(FunctionCallError::Fatal(
|
||||
"close_agent not implemented".to_string(),
|
||||
))
|
||||
pub mod close_agent {
|
||||
use super::*;
|
||||
use crate::codex::Session;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub(super) struct CloseAgentResult {
|
||||
pub(super) status: AgentStatus,
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: CloseAgentArgs = parse_arguments(&arguments)?;
|
||||
let agent_id = agent_id(&args.id)?;
|
||||
let mut status_rx = session
|
||||
.services
|
||||
.agent_control
|
||||
.subscribe_status(agent_id)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
|
||||
}
|
||||
err => FunctionCallError::Fatal(err.to_string()),
|
||||
})?;
|
||||
let status = status_rx.borrow_and_update().clone();
|
||||
|
||||
if !matches!(status, AgentStatus::Shutdown) {
|
||||
let _ = session
|
||||
.services
|
||||
.agent_control
|
||||
.shutdown_agent(agent_id)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
FunctionCallError::RespondToModel(format!("agent with id {id} not found"))
|
||||
}
|
||||
err => FunctionCallError::Fatal(err.to_string()),
|
||||
})?;
|
||||
}
|
||||
|
||||
let content = serde_json::to_string(&CloseAgentResult { status }).map_err(|err| {
|
||||
FunctionCallError::Fatal(format!("failed to serialize close_agent result: {err}"))
|
||||
})?;
|
||||
|
||||
Ok(ToolOutput::Function {
|
||||
content,
|
||||
success: Some(true),
|
||||
content_items: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn agent_id(id: &str) -> Result<ThreadId, FunctionCallError> {
|
||||
@@ -192,3 +326,384 @@ fn build_agent_spawn_config(turn: &TurnContext) -> Result<Config, FunctionCallEr
|
||||
})?;
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::CodexAuth;
|
||||
use crate::ThreadManager;
|
||||
use crate::built_in_model_providers;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::config::types::ShellEnvironmentPolicy;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_protocol::ThreadId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::timeout;
|
||||
|
||||
fn invocation(
|
||||
session: Arc<crate::codex::Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tool_name: &str,
|
||||
payload: ToolPayload,
|
||||
) -> ToolInvocation {
|
||||
ToolInvocation {
|
||||
session,
|
||||
turn,
|
||||
tracker: Arc::new(Mutex::new(TurnDiffTracker::default())),
|
||||
call_id: "call-1".to_string(),
|
||||
tool_name: tool_name.to_string(),
|
||||
payload,
|
||||
}
|
||||
}
|
||||
|
||||
fn function_payload(args: serde_json::Value) -> ToolPayload {
|
||||
ToolPayload::Function {
|
||||
arguments: args.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_manager() -> ThreadManager {
|
||||
ThreadManager::with_models_provider(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
built_in_model_providers()["openai"].clone(),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handler_rejects_non_function_payloads() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"spawn_agent",
|
||||
ToolPayload::Custom {
|
||||
input: "hello".to_string(),
|
||||
},
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("payload should be rejected");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel(
|
||||
"collab handler received unsupported payload".to_string()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handler_rejects_unknown_tool() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"unknown_tool",
|
||||
function_payload(json!({})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("tool should be rejected");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel("unsupported collab tool unknown_tool".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_rejects_empty_message() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"spawn_agent",
|
||||
function_payload(json!({"message": " "})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("empty message should be rejected");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_errors_when_manager_dropped() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"spawn_agent",
|
||||
function_payload(json!({"message": "hello"})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("spawn should fail without a manager");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::Fatal("unsupported operation: thread manager dropped".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_rejects_empty_message() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"send_input",
|
||||
function_payload(json!({"id": ThreadId::new().to_string(), "message": ""})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("empty message should be rejected");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel(
|
||||
"Empty message can't be send to an agent".to_string()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_rejects_invalid_id() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"send_input",
|
||||
function_payload(json!({"id": "not-a-uuid", "message": "hi"})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("invalid id should be rejected");
|
||||
};
|
||||
let FunctionCallError::RespondToModel(msg) = err else {
|
||||
panic!("expected respond-to-model error");
|
||||
};
|
||||
assert!(msg.starts_with("invalid agent id not-a-uuid:"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_reports_missing_agent() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
let agent_id = ThreadId::new();
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"send_input",
|
||||
function_payload(json!({"id": agent_id.to_string(), "message": "hi"})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("missing agent should be reported");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel(format!("agent with id {agent_id} not found"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_rejects_non_positive_timeout() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"wait",
|
||||
function_payload(json!({"id": ThreadId::new().to_string(), "timeout_ms": 0})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("non-positive timeout should be rejected");
|
||||
};
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel("timeout_ms must be greater than zero".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_rejects_invalid_id() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"wait",
|
||||
function_payload(json!({"id": "invalid"})),
|
||||
);
|
||||
let Err(err) = CollabHandler.handle(invocation).await else {
|
||||
panic!("invalid id should be rejected");
|
||||
};
|
||||
let FunctionCallError::RespondToModel(msg) = err else {
|
||||
panic!("expected respond-to-model error");
|
||||
};
|
||||
assert!(msg.starts_with("invalid agent id invalid:"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_times_out_when_status_is_not_final() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
let config = turn.client.config().as_ref().clone();
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
let agent_id = thread.thread_id;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"wait",
|
||||
function_payload(json!({"id": agent_id.to_string(), "timeout_ms": 10})),
|
||||
);
|
||||
let output = CollabHandler
|
||||
.handle(invocation)
|
||||
.await
|
||||
.expect("wait should succeed");
|
||||
let ToolOutput::Function {
|
||||
content, success, ..
|
||||
} = output
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
assert_eq!(content, r#"{"status":"pending_init","timed_out":true}"#);
|
||||
assert_eq!(success, Some(false));
|
||||
|
||||
let _ = thread
|
||||
.thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_returns_final_status_without_timeout() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
let config = turn.client.config().as_ref().clone();
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
let agent_id = thread.thread_id;
|
||||
let mut status_rx = manager
|
||||
.agent_control()
|
||||
.subscribe_status(agent_id)
|
||||
.await
|
||||
.expect("subscribe should succeed");
|
||||
|
||||
let _ = thread
|
||||
.thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
let _ = timeout(Duration::from_secs(1), status_rx.changed())
|
||||
.await
|
||||
.expect("shutdown status should arrive");
|
||||
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"wait",
|
||||
function_payload(json!({"id": agent_id.to_string(), "timeout_ms": 1000})),
|
||||
);
|
||||
let output = CollabHandler
|
||||
.handle(invocation)
|
||||
.await
|
||||
.expect("wait should succeed");
|
||||
let ToolOutput::Function {
|
||||
content, success, ..
|
||||
} = output
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
assert_eq!(content, r#"{"status":"shutdown","timed_out":false}"#);
|
||||
assert_eq!(success, Some(true));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_agent_submits_shutdown_and_returns_status() {
|
||||
let (mut session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
session.services.agent_control = manager.agent_control();
|
||||
let config = turn.client.config().as_ref().clone();
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
let agent_id = thread.thread_id;
|
||||
let status_before = manager.agent_control().get_status(agent_id).await;
|
||||
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
"close_agent",
|
||||
function_payload(json!({"id": agent_id.to_string()})),
|
||||
);
|
||||
let output = CollabHandler
|
||||
.handle(invocation)
|
||||
.await
|
||||
.expect("close_agent should succeed");
|
||||
let ToolOutput::Function {
|
||||
content, success, ..
|
||||
} = output
|
||||
else {
|
||||
panic!("expected function output");
|
||||
};
|
||||
let result: close_agent::CloseAgentResult =
|
||||
serde_json::from_str(&content).expect("close_agent result should be json");
|
||||
assert_eq!(result.status, status_before);
|
||||
assert_eq!(success, Some(true));
|
||||
|
||||
let ops = manager.captured_ops();
|
||||
let submitted_shutdown = ops
|
||||
.iter()
|
||||
.any(|(id, op)| *id == agent_id && matches!(op, Op::Shutdown));
|
||||
assert_eq!(submitted_shutdown, true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_agent_spawn_config_uses_turn_context_values() {
|
||||
let (_session, mut turn) = make_session_and_context().await;
|
||||
turn.developer_instructions = Some("dev".to_string());
|
||||
turn.base_instructions = Some("base".to_string());
|
||||
turn.compact_prompt = Some("compact".to_string());
|
||||
turn.user_instructions = Some("user".to_string());
|
||||
turn.shell_environment_policy = ShellEnvironmentPolicy {
|
||||
use_profile: true,
|
||||
..ShellEnvironmentPolicy::default()
|
||||
};
|
||||
let temp_dir = tempfile::tempdir().expect("temp dir");
|
||||
turn.cwd = temp_dir.path().to_path_buf();
|
||||
turn.codex_linux_sandbox_exe = Some(PathBuf::from("/bin/echo"));
|
||||
turn.approval_policy = AskForApproval::Never;
|
||||
turn.sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
|
||||
let config = build_agent_spawn_config(&turn).expect("spawn config");
|
||||
let mut expected = (*turn.client.config()).clone();
|
||||
expected.model = Some(turn.client.get_model());
|
||||
expected.model_provider = turn.client.get_provider();
|
||||
expected.model_reasoning_effort = turn.client.get_reasoning_effort();
|
||||
expected.model_reasoning_summary = turn.client.get_reasoning_summary();
|
||||
expected.developer_instructions = turn.developer_instructions.clone();
|
||||
expected.base_instructions = turn.base_instructions.clone();
|
||||
expected.compact_prompt = turn.compact_prompt.clone();
|
||||
expected.user_instructions = turn.user_instructions.clone();
|
||||
expected.shell_environment_policy = turn.shell_environment_policy.clone();
|
||||
expected.codex_linux_sandbox_exe = turn.codex_linux_sandbox_exe.clone();
|
||||
expected.cwd = turn.cwd.clone();
|
||||
expected
|
||||
.approval_policy
|
||||
.set(turn.approval_policy)
|
||||
.expect("approval policy set");
|
||||
expected
|
||||
.sandbox_policy
|
||||
.set(turn.sandbox_policy)
|
||||
.expect("sandbox policy set");
|
||||
assert_eq!(config, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,183 +0,0 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::test_codex::TestCodexHarness;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
enum MidTurnOp {
|
||||
UserInput,
|
||||
UserTurn,
|
||||
}
|
||||
|
||||
fn message_contains_text(item: &Value, text: &str) -> bool {
|
||||
item.get("type").and_then(Value::as_str) == Some("message")
|
||||
&& item.get("role").and_then(Value::as_str) == Some("user")
|
||||
&& item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.map(|content| {
|
||||
content.iter().any(|span| {
|
||||
span.get("type").and_then(Value::as_str) == Some("input_text")
|
||||
&& span.get("text").and_then(Value::as_str) == Some(text)
|
||||
})
|
||||
})
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
async fn run_mid_turn_injection_test(mid_turn_op: MidTurnOp) -> Result<()> {
|
||||
let harness = TestCodexHarness::new().await?;
|
||||
let test = harness.test();
|
||||
let codex = test.codex.clone();
|
||||
let session_model = test.session_configured.model.clone();
|
||||
let cwd = test.cwd_path().to_path_buf();
|
||||
|
||||
let call_id = "shell-mid-turn";
|
||||
let first_message = "first message";
|
||||
let mid_turn_message = "mid-turn message";
|
||||
let workdir = cwd.to_string_lossy().to_string();
|
||||
|
||||
let args = json!({
|
||||
"command": ["bash", "-lc", "sleep 2; echo finished"],
|
||||
"workdir": workdir,
|
||||
"timeout_ms": 10_000,
|
||||
});
|
||||
|
||||
let first_response = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "shell", &serde_json::to_string(&args)?),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
let second_response = sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", "follow up"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
|
||||
mount_sse_once(harness.server(), first_response).await;
|
||||
let request_log = mount_sse_once(harness.server(), second_response).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: first_message.to_string(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.clone(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model.clone(),
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&codex, |event| match event {
|
||||
EventMsg::ExecCommandBegin(ev) if ev.call_id == call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
match mid_turn_op {
|
||||
MidTurnOp::UserInput => {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: mid_turn_message.to_string(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
MidTurnOp::UserTurn => {
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: mid_turn_message.to_string(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.clone(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
let end_event = wait_for_event_match(&codex, |event| match event {
|
||||
EventMsg::ExecCommandEnd(ev) if ev.call_id == call_id => Some(ev.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(end_event.exit_code, 0);
|
||||
assert!(
|
||||
end_event.stdout.contains("finished"),
|
||||
"expected stdout to include finished: {}",
|
||||
end_event.stdout
|
||||
);
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let request = request_log.single_request();
|
||||
let user_messages = request.message_input_texts("user");
|
||||
assert_eq!(
|
||||
user_messages,
|
||||
vec![first_message.to_string(), mid_turn_message.to_string()]
|
||||
);
|
||||
|
||||
let input = request.input();
|
||||
let tool_index = input
|
||||
.iter()
|
||||
.position(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
|
||||
})
|
||||
.expect("expected function_call_output in request");
|
||||
let mid_turn_index = input
|
||||
.iter()
|
||||
.position(|item| message_contains_text(item, mid_turn_message))
|
||||
.expect("expected mid-turn user message in request");
|
||||
assert!(
|
||||
tool_index < mid_turn_index,
|
||||
"expected tool output before mid-turn input"
|
||||
);
|
||||
|
||||
let tool_output = request
|
||||
.function_call_output_text(call_id)
|
||||
.expect("expected function_call_output output text");
|
||||
assert!(
|
||||
tool_output.contains("finished"),
|
||||
"expected tool output to include finished: {tool_output}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mid_turn_input_inserts_user_input_after_tool_output() -> Result<()> {
|
||||
run_mid_turn_injection_test(MidTurnOp::UserInput).await
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mid_turn_input_inserts_user_turn_after_tool_output() -> Result<()> {
|
||||
run_mid_turn_injection_test(MidTurnOp::UserTurn).await
|
||||
}
|
||||
@@ -699,7 +699,7 @@ pub enum AgentStatus {
|
||||
Completed(Option<String>),
|
||||
/// Agent encountered an error.
|
||||
Errored(String),
|
||||
/// Agent has been shutdowned.
|
||||
/// Agent has been shutdown.
|
||||
Shutdown,
|
||||
/// Agent is not found.
|
||||
NotFound,
|
||||
|
||||
@@ -80,7 +80,6 @@ const LARGE_PASTE_CHAR_THRESHOLD: usize = 1000;
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum InputResult {
|
||||
Submitted(String),
|
||||
Queued(String),
|
||||
Command(SlashCommand),
|
||||
CommandWithArgs(SlashCommand, String),
|
||||
None,
|
||||
@@ -102,12 +101,6 @@ enum PromptSelectionAction {
|
||||
Submit { text: String },
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
enum SubmitMode {
|
||||
Submit,
|
||||
Queue,
|
||||
}
|
||||
|
||||
pub(crate) struct ChatComposer {
|
||||
textarea: TextArea,
|
||||
textarea_state: RefCell<TextAreaState>,
|
||||
@@ -1205,169 +1198,159 @@ impl ChatComposer {
|
||||
}
|
||||
self.handle_input_basic(key_event)
|
||||
}
|
||||
KeyEvent {
|
||||
code: KeyCode::Char('k'),
|
||||
modifiers: KeyModifiers::CONTROL,
|
||||
kind: KeyEventKind::Press,
|
||||
..
|
||||
} => self.handle_submit(SubmitMode::Queue),
|
||||
KeyEvent {
|
||||
code: KeyCode::Enter,
|
||||
modifiers: KeyModifiers::NONE,
|
||||
..
|
||||
} => self.handle_submit(SubmitMode::Submit),
|
||||
input => self.handle_input_basic(input),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_submit(&mut self, mode: SubmitMode) -> (InputResult, bool) {
|
||||
// If the first line is a bare built-in slash command (no args),
|
||||
// dispatch it even when the slash popup isn't visible. This preserves
|
||||
// the workflow: type a prefix ("/di"), press Tab to complete to
|
||||
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
|
||||
// the '/name' token and our caret-based heuristic hides the popup,
|
||||
// but Enter should still dispatch the command rather than submit
|
||||
// literal text.
|
||||
let first_line = self.textarea.text().lines().next().unwrap_or("");
|
||||
if let Some((name, rest)) = parse_slash_name(first_line)
|
||||
&& rest.is_empty()
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active() || *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.find(|(n, _)| *n == name)
|
||||
{
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
}
|
||||
// If we're in a paste-like burst capture, treat Enter as part of the burst
|
||||
// and accumulate it rather than submitting or inserting immediately.
|
||||
// Do not treat Enter as paste inside a slash-command context.
|
||||
let in_slash_context = matches!(self.active_popup, ActivePopup::Command(_))
|
||||
|| self
|
||||
.textarea
|
||||
.text()
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.starts_with('/');
|
||||
if self.paste_burst.is_active() && !in_slash_context {
|
||||
let now = Instant::now();
|
||||
if self.paste_burst.append_newline_if_active(now) {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
// If we have pending placeholder pastes, replace them in the textarea text
|
||||
// and continue to the normal submission flow to handle slash commands.
|
||||
if !self.pending_pastes.is_empty() {
|
||||
let mut text = self.textarea.text().to_string();
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
} => {
|
||||
// If the first line is a bare built-in slash command (no args),
|
||||
// dispatch it even when the slash popup isn't visible. This preserves
|
||||
// the workflow: type a prefix ("/di"), press Tab to complete to
|
||||
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
|
||||
// the '/name' token and our caret-based heuristic hides the popup,
|
||||
// but Enter should still dispatch the command rather than submit
|
||||
// literal text.
|
||||
let first_line = self.textarea.text().lines().next().unwrap_or("");
|
||||
if let Some((name, rest)) = parse_slash_name(first_line)
|
||||
&& rest.is_empty()
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active()
|
||||
|| *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.find(|(n, _)| *n == name)
|
||||
{
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
}
|
||||
// If we're in a paste-like burst capture, treat Enter as part of the burst
|
||||
// and accumulate it rather than submitting or inserting immediately.
|
||||
// Do not treat Enter as paste inside a slash-command context.
|
||||
let in_slash_context = matches!(self.active_popup, ActivePopup::Command(_))
|
||||
|| self
|
||||
.textarea
|
||||
.text()
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.starts_with('/');
|
||||
if self.paste_burst.is_active() && !in_slash_context {
|
||||
let now = Instant::now();
|
||||
if self.paste_burst.append_newline_if_active(now) {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
// If we have pending placeholder pastes, replace them in the textarea text
|
||||
// and continue to the normal submission flow to handle slash commands.
|
||||
if !self.pending_pastes.is_empty() {
|
||||
let mut text = self.textarea.text().to_string();
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.textarea.set_text(&text);
|
||||
self.pending_pastes.clear();
|
||||
}
|
||||
}
|
||||
self.textarea.set_text(&text);
|
||||
self.pending_pastes.clear();
|
||||
}
|
||||
|
||||
// During a paste-like burst, treat Enter as a newline instead of submit.
|
||||
let now = Instant::now();
|
||||
if self
|
||||
.paste_burst
|
||||
.newline_should_insert_instead_of_submit(now)
|
||||
&& !in_slash_context
|
||||
{
|
||||
self.textarea.insert_str("\n");
|
||||
self.paste_burst.extend_window(now);
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
let mut text = self.textarea.text().to_string();
|
||||
let original_input = text.clone();
|
||||
let input_starts_with_space = original_input.starts_with(' ');
|
||||
self.textarea.set_text("");
|
||||
|
||||
// Replace all pending pastes in the text
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.pending_pastes.clear();
|
||||
|
||||
// If there is neither text nor attachments, suppress submission entirely.
|
||||
let has_attachments = !self.attached_images.is_empty();
|
||||
text = text.trim().to_string();
|
||||
if let Some((name, _rest)) = parse_slash_name(&text) {
|
||||
let treat_as_plain_text = input_starts_with_space || name.contains('/');
|
||||
if !treat_as_plain_text {
|
||||
let is_builtin = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active() || *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.any(|(command_name, _)| command_name == name);
|
||||
let prompt_prefix = format!("{PROMPTS_CMD_PREFIX}:");
|
||||
let is_known_prompt = name
|
||||
.strip_prefix(&prompt_prefix)
|
||||
.map(|prompt_name| {
|
||||
self.custom_prompts
|
||||
.iter()
|
||||
.any(|prompt| prompt.name == prompt_name)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if !is_builtin && !is_known_prompt {
|
||||
let message = format!(
|
||||
r#"Unrecognized command '/{name}'. Type "/" for a list of supported commands."#
|
||||
);
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_info_event(message, None),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
// During a paste-like burst, treat Enter as a newline instead of submit.
|
||||
let now = Instant::now();
|
||||
if self
|
||||
.paste_burst
|
||||
.newline_should_insert_instead_of_submit(now)
|
||||
&& !in_slash_context
|
||||
{
|
||||
self.textarea.insert_str("\n");
|
||||
self.paste_burst.extend_window(now);
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut text = self.textarea.text().to_string();
|
||||
let original_input = text.clone();
|
||||
let input_starts_with_space = original_input.starts_with(' ');
|
||||
self.textarea.set_text("");
|
||||
|
||||
if !input_starts_with_space
|
||||
&& let Some((name, rest)) = parse_slash_name(&text)
|
||||
&& !rest.is_empty()
|
||||
&& !name.contains('/')
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.find(|(command_name, _)| *command_name == name)
|
||||
&& cmd == SlashCommand::Review
|
||||
{
|
||||
return (InputResult::CommandWithArgs(cmd, rest.to_string()), true);
|
||||
}
|
||||
// Replace all pending pastes in the text
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.pending_pastes.clear();
|
||||
|
||||
let expanded_prompt = match expand_custom_prompt(&text, &self.custom_prompts) {
|
||||
Ok(expanded) => expanded,
|
||||
Err(err) => {
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_error_event(err.user_message()),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
// If there is neither text nor attachments, suppress submission entirely.
|
||||
let has_attachments = !self.attached_images.is_empty();
|
||||
text = text.trim().to_string();
|
||||
if let Some((name, _rest)) = parse_slash_name(&text) {
|
||||
let treat_as_plain_text = input_starts_with_space || name.contains('/');
|
||||
if !treat_as_plain_text {
|
||||
let is_builtin = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active()
|
||||
|| *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.any(|(command_name, _)| command_name == name);
|
||||
let prompt_prefix = format!("{PROMPTS_CMD_PREFIX}:");
|
||||
let is_known_prompt = name
|
||||
.strip_prefix(&prompt_prefix)
|
||||
.map(|prompt_name| {
|
||||
self.custom_prompts
|
||||
.iter()
|
||||
.any(|prompt| prompt.name == prompt_name)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if !is_builtin && !is_known_prompt {
|
||||
let message = format!(
|
||||
r#"Unrecognized command '/{name}'. Type "/" for a list of supported commands."#
|
||||
);
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_info_event(message, None),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !input_starts_with_space
|
||||
&& let Some((name, rest)) = parse_slash_name(&text)
|
||||
&& !rest.is_empty()
|
||||
&& !name.contains('/')
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.find(|(command_name, _)| *command_name == name)
|
||||
&& cmd == SlashCommand::Review
|
||||
{
|
||||
return (InputResult::CommandWithArgs(cmd, rest.to_string()), true);
|
||||
}
|
||||
|
||||
let expanded_prompt = match expand_custom_prompt(&text, &self.custom_prompts) {
|
||||
Ok(expanded) => expanded,
|
||||
Err(err) => {
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_error_event(err.user_message()),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
};
|
||||
if let Some(expanded) = expanded_prompt {
|
||||
text = expanded;
|
||||
}
|
||||
if text.is_empty() && !has_attachments {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
if !text.is_empty() {
|
||||
self.history.record_local_submission(&text);
|
||||
}
|
||||
// Do not clear attached_images here; ChatWidget drains them via take_recent_submission_images().
|
||||
(InputResult::Submitted(text), true)
|
||||
}
|
||||
};
|
||||
if let Some(expanded) = expanded_prompt {
|
||||
text = expanded;
|
||||
input => self.handle_input_basic(input),
|
||||
}
|
||||
if text.is_empty() && !has_attachments {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
if !text.is_empty() {
|
||||
self.history.record_local_submission(&text);
|
||||
}
|
||||
// Do not clear attached_images here; ChatWidget drains them via take_recent_submission_images().
|
||||
let result = match mode {
|
||||
SubmitMode::Submit => InputResult::Submitted(text),
|
||||
SubmitMode::Queue => InputResult::Queued(text),
|
||||
};
|
||||
(result, true)
|
||||
}
|
||||
|
||||
fn handle_paste_burst_flush(&mut self, now: Instant) -> bool {
|
||||
|
||||
@@ -266,7 +266,6 @@ enum ShortcutId {
|
||||
Commands,
|
||||
ShellCommands,
|
||||
InsertNewline,
|
||||
QueueMessage,
|
||||
FilePaths,
|
||||
PasteImage,
|
||||
ExternalEditor,
|
||||
@@ -373,15 +372,6 @@ const SHORTCUTS: &[ShortcutDescriptor] = &[
|
||||
prefix: "",
|
||||
label: " for newline",
|
||||
},
|
||||
ShortcutDescriptor {
|
||||
id: ShortcutId::QueueMessage,
|
||||
bindings: &[ShortcutBinding {
|
||||
key: key_hint::ctrl(KeyCode::Char('k')),
|
||||
condition: DisplayCondition::Always,
|
||||
}],
|
||||
prefix: "",
|
||||
label: " to queue message",
|
||||
},
|
||||
ShortcutDescriptor {
|
||||
id: ShortcutId::FilePaths,
|
||||
bindings: &[ShortcutBinding {
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
---
|
||||
source: tui/src/bottom_pane/chat_composer.rs
|
||||
assertion_line: 2127
|
||||
expression: terminal.backend()
|
||||
---
|
||||
" "
|
||||
"› Ask Codex to do anything "
|
||||
" "
|
||||
" "
|
||||
" "
|
||||
" "
|
||||
" "
|
||||
" "
|
||||
" / for commands ! for shell commands "
|
||||
" shift + enter for newline ctrl + enter to send immediately "
|
||||
" @ for file paths ctrl + v to paste images "
|
||||
" ctrl + g to edit in external editor esc again to edit previous message "
|
||||
" ctrl + c to exit "
|
||||
" ctrl + t to view transcript "
|
||||
@@ -1,11 +0,0 @@
|
||||
---
|
||||
source: tui/src/bottom_pane/footer.rs
|
||||
assertion_line: 468
|
||||
expression: terminal.backend()
|
||||
---
|
||||
" / for commands ! for shell commands "
|
||||
" shift + enter for newline ctrl + enter to send immediately "
|
||||
" @ for file paths ctrl + v to paste images "
|
||||
" ctrl + g to edit in external editor esc again to edit previous message "
|
||||
" ctrl + c to exit "
|
||||
" ctrl + t to view transcript "
|
||||
@@ -1630,29 +1630,25 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
_ => match self.bottom_pane.handle_key_event(key_event) {
|
||||
InputResult::Submitted(text) => {
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.submit_user_message(user_message);
|
||||
_ => {
|
||||
match self.bottom_pane.handle_key_event(key_event) {
|
||||
InputResult::Submitted(text) => {
|
||||
// If a task is running, queue the user input to be sent after the turn completes.
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.queue_user_message(user_message);
|
||||
}
|
||||
InputResult::Command(cmd) => {
|
||||
self.dispatch_command(cmd);
|
||||
}
|
||||
InputResult::CommandWithArgs(cmd, args) => {
|
||||
self.dispatch_command_with_args(cmd, args);
|
||||
}
|
||||
InputResult::None => {}
|
||||
}
|
||||
InputResult::Queued(text) => {
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.queue_user_message(user_message);
|
||||
}
|
||||
InputResult::Command(cmd) => {
|
||||
self.dispatch_command(cmd);
|
||||
}
|
||||
InputResult::CommandWithArgs(cmd, args) => {
|
||||
self.dispatch_command_with_args(cmd, args);
|
||||
}
|
||||
InputResult::None => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1057,7 +1057,7 @@ async fn enqueueing_history_prompt_multiple_times_is_stable() {
|
||||
assert_eq!(chat.bottom_pane.composer_text(), "repeat me");
|
||||
|
||||
// Queue the prompt while the task is running.
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('k'), KeyModifiers::CONTROL));
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
}
|
||||
|
||||
assert_eq!(chat.queued_user_messages.len(), 3);
|
||||
@@ -1079,7 +1079,7 @@ async fn streaming_final_answer_keeps_task_running_state() {
|
||||
|
||||
chat.bottom_pane
|
||||
.set_composer_text("queued submission".to_string());
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('k'), KeyModifiers::CONTROL));
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
|
||||
assert_eq!(chat.queued_user_messages.len(), 1);
|
||||
assert_eq!(
|
||||
|
||||
@@ -54,9 +54,7 @@ impl ComposerInput {
|
||||
/// Feed a key event into the composer and return a high-level action.
|
||||
pub fn input(&mut self, key: KeyEvent) -> ComposerAction {
|
||||
let action = match self.inner.handle_key_event(key).0 {
|
||||
InputResult::Submitted(text) | InputResult::Queued(text) => {
|
||||
ComposerAction::Submitted(text)
|
||||
}
|
||||
InputResult::Submitted(text) => ComposerAction::Submitted(text),
|
||||
_ => ComposerAction::None,
|
||||
};
|
||||
self.drain_app_events();
|
||||
|
||||
@@ -83,7 +83,6 @@ const LARGE_PASTE_CHAR_THRESHOLD: usize = 1000;
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum InputResult {
|
||||
Submitted(String),
|
||||
Queued(String),
|
||||
Command(SlashCommand),
|
||||
CommandWithArgs(SlashCommand, String),
|
||||
None,
|
||||
@@ -105,12 +104,6 @@ enum PromptSelectionAction {
|
||||
Submit { text: String },
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
enum SubmitMode {
|
||||
Submit,
|
||||
Queue,
|
||||
}
|
||||
|
||||
pub(crate) struct ChatComposer {
|
||||
textarea: TextArea,
|
||||
textarea_state: RefCell<TextAreaState>,
|
||||
@@ -1139,169 +1132,159 @@ impl ChatComposer {
|
||||
}
|
||||
self.handle_input_basic(key_event)
|
||||
}
|
||||
KeyEvent {
|
||||
code: KeyCode::Char('k'),
|
||||
modifiers: KeyModifiers::CONTROL,
|
||||
kind: KeyEventKind::Press,
|
||||
..
|
||||
} => self.handle_submit(SubmitMode::Queue),
|
||||
KeyEvent {
|
||||
code: KeyCode::Enter,
|
||||
modifiers: KeyModifiers::NONE,
|
||||
..
|
||||
} => self.handle_submit(SubmitMode::Submit),
|
||||
input => self.handle_input_basic(input),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_submit(&mut self, mode: SubmitMode) -> (InputResult, bool) {
|
||||
// If the first line is a bare built-in slash command (no args),
|
||||
// dispatch it even when the slash popup isn't visible. This preserves
|
||||
// the workflow: type a prefix ("/di"), press Tab to complete to
|
||||
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
|
||||
// the '/name' token and our caret-based heuristic hides the popup,
|
||||
// but Enter should still dispatch the command rather than submit
|
||||
// literal text.
|
||||
let first_line = self.textarea.text().lines().next().unwrap_or("");
|
||||
if let Some((name, rest)) = parse_slash_name(first_line)
|
||||
&& rest.is_empty()
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active() || *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.find(|(n, _)| *n == name)
|
||||
{
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
}
|
||||
// If we're in a paste-like burst capture, treat Enter as part of the burst
|
||||
// and accumulate it rather than submitting or inserting immediately.
|
||||
// Do not treat Enter as paste inside a slash-command context.
|
||||
let in_slash_context = matches!(self.active_popup, ActivePopup::Command(_))
|
||||
|| self
|
||||
.textarea
|
||||
.text()
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.starts_with('/');
|
||||
if self.paste_burst.is_active() && !in_slash_context {
|
||||
let now = Instant::now();
|
||||
if self.paste_burst.append_newline_if_active(now) {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
// If we have pending placeholder pastes, replace them in the textarea text
|
||||
// and continue to the normal submission flow to handle slash commands.
|
||||
if !self.pending_pastes.is_empty() {
|
||||
let mut text = self.textarea.text().to_string();
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
} => {
|
||||
// If the first line is a bare built-in slash command (no args),
|
||||
// dispatch it even when the slash popup isn't visible. This preserves
|
||||
// the workflow: type a prefix ("/di"), press Tab to complete to
|
||||
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
|
||||
// the '/name' token and our caret-based heuristic hides the popup,
|
||||
// but Enter should still dispatch the command rather than submit
|
||||
// literal text.
|
||||
let first_line = self.textarea.text().lines().next().unwrap_or("");
|
||||
if let Some((name, rest)) = parse_slash_name(first_line)
|
||||
&& rest.is_empty()
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active()
|
||||
|| *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.find(|(n, _)| *n == name)
|
||||
{
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
}
|
||||
// If we're in a paste-like burst capture, treat Enter as part of the burst
|
||||
// and accumulate it rather than submitting or inserting immediately.
|
||||
// Do not treat Enter as paste inside a slash-command context.
|
||||
let in_slash_context = matches!(self.active_popup, ActivePopup::Command(_))
|
||||
|| self
|
||||
.textarea
|
||||
.text()
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.starts_with('/');
|
||||
if self.paste_burst.is_active() && !in_slash_context {
|
||||
let now = Instant::now();
|
||||
if self.paste_burst.append_newline_if_active(now) {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
// If we have pending placeholder pastes, replace them in the textarea text
|
||||
// and continue to the normal submission flow to handle slash commands.
|
||||
if !self.pending_pastes.is_empty() {
|
||||
let mut text = self.textarea.text().to_string();
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.textarea.set_text(&text);
|
||||
self.pending_pastes.clear();
|
||||
}
|
||||
}
|
||||
self.textarea.set_text(&text);
|
||||
self.pending_pastes.clear();
|
||||
}
|
||||
|
||||
// During a paste-like burst, treat Enter as a newline instead of submit.
|
||||
let now = Instant::now();
|
||||
if self
|
||||
.paste_burst
|
||||
.newline_should_insert_instead_of_submit(now)
|
||||
&& !in_slash_context
|
||||
{
|
||||
self.textarea.insert_str("\n");
|
||||
self.paste_burst.extend_window(now);
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
let mut text = self.textarea.text().to_string();
|
||||
let original_input = text.clone();
|
||||
let input_starts_with_space = original_input.starts_with(' ');
|
||||
self.textarea.set_text("");
|
||||
|
||||
// Replace all pending pastes in the text
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.pending_pastes.clear();
|
||||
|
||||
// If there is neither text nor attachments, suppress submission entirely.
|
||||
let has_attachments = !self.attached_images.is_empty();
|
||||
text = text.trim().to_string();
|
||||
if let Some((name, _rest)) = parse_slash_name(&text) {
|
||||
let treat_as_plain_text = input_starts_with_space || name.contains('/');
|
||||
if !treat_as_plain_text {
|
||||
let is_builtin = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active() || *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.any(|(command_name, _)| command_name == name);
|
||||
let prompt_prefix = format!("{PROMPTS_CMD_PREFIX}:");
|
||||
let is_known_prompt = name
|
||||
.strip_prefix(&prompt_prefix)
|
||||
.map(|prompt_name| {
|
||||
self.custom_prompts
|
||||
.iter()
|
||||
.any(|prompt| prompt.name == prompt_name)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if !is_builtin && !is_known_prompt {
|
||||
let message = format!(
|
||||
r#"Unrecognized command '/{name}'. Type "/" for a list of supported commands."#
|
||||
);
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_info_event(message, None),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
// During a paste-like burst, treat Enter as a newline instead of submit.
|
||||
let now = Instant::now();
|
||||
if self
|
||||
.paste_burst
|
||||
.newline_should_insert_instead_of_submit(now)
|
||||
&& !in_slash_context
|
||||
{
|
||||
self.textarea.insert_str("\n");
|
||||
self.paste_burst.extend_window(now);
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut text = self.textarea.text().to_string();
|
||||
let original_input = text.clone();
|
||||
let input_starts_with_space = original_input.starts_with(' ');
|
||||
self.textarea.set_text("");
|
||||
|
||||
if !input_starts_with_space
|
||||
&& let Some((name, rest)) = parse_slash_name(&text)
|
||||
&& !rest.is_empty()
|
||||
&& !name.contains('/')
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.find(|(command_name, _)| *command_name == name)
|
||||
&& cmd == SlashCommand::Review
|
||||
{
|
||||
return (InputResult::CommandWithArgs(cmd, rest.to_string()), true);
|
||||
}
|
||||
// Replace all pending pastes in the text
|
||||
for (placeholder, actual) in &self.pending_pastes {
|
||||
if text.contains(placeholder) {
|
||||
text = text.replace(placeholder, actual);
|
||||
}
|
||||
}
|
||||
self.pending_pastes.clear();
|
||||
|
||||
let expanded_prompt = match expand_custom_prompt(&text, &self.custom_prompts) {
|
||||
Ok(expanded) => expanded,
|
||||
Err(err) => {
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_error_event(err.user_message()),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
// If there is neither text nor attachments, suppress submission entirely.
|
||||
let has_attachments = !self.attached_images.is_empty();
|
||||
text = text.trim().to_string();
|
||||
if let Some((name, _rest)) = parse_slash_name(&text) {
|
||||
let treat_as_plain_text = input_starts_with_space || name.contains('/');
|
||||
if !treat_as_plain_text {
|
||||
let is_builtin = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.filter(|(_, cmd)| {
|
||||
windows_degraded_sandbox_active()
|
||||
|| *cmd != SlashCommand::ElevateSandbox
|
||||
})
|
||||
.any(|(command_name, _)| command_name == name);
|
||||
let prompt_prefix = format!("{PROMPTS_CMD_PREFIX}:");
|
||||
let is_known_prompt = name
|
||||
.strip_prefix(&prompt_prefix)
|
||||
.map(|prompt_name| {
|
||||
self.custom_prompts
|
||||
.iter()
|
||||
.any(|prompt| prompt.name == prompt_name)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if !is_builtin && !is_known_prompt {
|
||||
let message = format!(
|
||||
r#"Unrecognized command '/{name}'. Type "/" for a list of supported commands."#
|
||||
);
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_info_event(message, None),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !input_starts_with_space
|
||||
&& let Some((name, rest)) = parse_slash_name(&text)
|
||||
&& !rest.is_empty()
|
||||
&& !name.contains('/')
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.find(|(command_name, _)| *command_name == name)
|
||||
&& cmd == SlashCommand::Review
|
||||
{
|
||||
return (InputResult::CommandWithArgs(cmd, rest.to_string()), true);
|
||||
}
|
||||
|
||||
let expanded_prompt = match expand_custom_prompt(&text, &self.custom_prompts) {
|
||||
Ok(expanded) => expanded,
|
||||
Err(err) => {
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_error_event(err.user_message()),
|
||||
)));
|
||||
self.textarea.set_text(&original_input);
|
||||
self.textarea.set_cursor(original_input.len());
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
};
|
||||
if let Some(expanded) = expanded_prompt {
|
||||
text = expanded;
|
||||
}
|
||||
if text.is_empty() && !has_attachments {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
if !text.is_empty() {
|
||||
self.history.record_local_submission(&text);
|
||||
}
|
||||
// Do not clear attached_images here; ChatWidget drains them via take_recent_submission_images().
|
||||
(InputResult::Submitted(text), true)
|
||||
}
|
||||
};
|
||||
if let Some(expanded) = expanded_prompt {
|
||||
text = expanded;
|
||||
input => self.handle_input_basic(input),
|
||||
}
|
||||
if text.is_empty() && !has_attachments {
|
||||
return (InputResult::None, true);
|
||||
}
|
||||
if !text.is_empty() {
|
||||
self.history.record_local_submission(&text);
|
||||
}
|
||||
// Do not clear attached_images here; ChatWidget drains them via take_recent_submission_images().
|
||||
let result = match mode {
|
||||
SubmitMode::Submit => InputResult::Submitted(text),
|
||||
SubmitMode::Queue => InputResult::Queued(text),
|
||||
};
|
||||
(result, true)
|
||||
}
|
||||
|
||||
fn handle_paste_burst_flush(&mut self, now: Instant) -> bool {
|
||||
|
||||
@@ -307,7 +307,6 @@ enum ShortcutId {
|
||||
Commands,
|
||||
ShellCommands,
|
||||
InsertNewline,
|
||||
QueueMessage,
|
||||
FilePaths,
|
||||
PasteImage,
|
||||
EditPrevious,
|
||||
@@ -413,15 +412,6 @@ const SHORTCUTS: &[ShortcutDescriptor] = &[
|
||||
prefix: "",
|
||||
label: " for newline",
|
||||
},
|
||||
ShortcutDescriptor {
|
||||
id: ShortcutId::QueueMessage,
|
||||
bindings: &[ShortcutBinding {
|
||||
key: key_hint::ctrl(KeyCode::Char('k')),
|
||||
condition: DisplayCondition::Always,
|
||||
}],
|
||||
prefix: "",
|
||||
label: " to queue message",
|
||||
},
|
||||
ShortcutDescriptor {
|
||||
id: ShortcutId::FilePaths,
|
||||
bindings: &[ShortcutBinding {
|
||||
|
||||
@@ -1489,29 +1489,25 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
}
|
||||
_ => match self.bottom_pane.handle_key_event(key_event) {
|
||||
InputResult::Submitted(text) => {
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.submit_user_message(user_message);
|
||||
_ => {
|
||||
match self.bottom_pane.handle_key_event(key_event) {
|
||||
InputResult::Submitted(text) => {
|
||||
// If a task is running, queue the user input to be sent after the turn completes.
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.queue_user_message(user_message);
|
||||
}
|
||||
InputResult::Command(cmd) => {
|
||||
self.dispatch_command(cmd);
|
||||
}
|
||||
InputResult::CommandWithArgs(cmd, args) => {
|
||||
self.dispatch_command_with_args(cmd, args);
|
||||
}
|
||||
InputResult::None => {}
|
||||
}
|
||||
InputResult::Queued(text) => {
|
||||
let user_message = UserMessage {
|
||||
text,
|
||||
image_paths: self.bottom_pane.take_recent_submission_images(),
|
||||
};
|
||||
self.queue_user_message(user_message);
|
||||
}
|
||||
InputResult::Command(cmd) => {
|
||||
self.dispatch_command(cmd);
|
||||
}
|
||||
InputResult::CommandWithArgs(cmd, args) => {
|
||||
self.dispatch_command_with_args(cmd, args);
|
||||
}
|
||||
InputResult::None => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1008,7 +1008,7 @@ async fn enqueueing_history_prompt_multiple_times_is_stable() {
|
||||
assert_eq!(chat.bottom_pane.composer_text(), "repeat me");
|
||||
|
||||
// Queue the prompt while the task is running.
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('k'), KeyModifiers::CONTROL));
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
}
|
||||
|
||||
assert_eq!(chat.queued_user_messages.len(), 3);
|
||||
@@ -1030,7 +1030,7 @@ async fn streaming_final_answer_keeps_task_running_state() {
|
||||
|
||||
chat.bottom_pane
|
||||
.set_composer_text("queued submission".to_string());
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Char('k'), KeyModifiers::CONTROL));
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
|
||||
assert_eq!(chat.queued_user_messages.len(), 1);
|
||||
assert_eq!(
|
||||
|
||||
@@ -54,9 +54,7 @@ impl ComposerInput {
|
||||
/// Feed a key event into the composer and return a high-level action.
|
||||
pub fn input(&mut self, key: KeyEvent) -> ComposerAction {
|
||||
let action = match self.inner.handle_key_event(key).0 {
|
||||
InputResult::Submitted(text) | InputResult::Queued(text) => {
|
||||
ComposerAction::Submitted(text)
|
||||
}
|
||||
InputResult::Submitted(text) => ComposerAction::Submitted(text),
|
||||
_ => ComposerAction::None,
|
||||
};
|
||||
self.drain_app_events();
|
||||
|
||||
Reference in New Issue
Block a user