mirror of
https://github.com/openai/codex.git
synced 2026-02-04 16:03:46 +00:00
Compare commits
12 Commits
codex-work
...
dev/zhao/u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b610498550 | ||
|
|
ee32bdb773 | ||
|
|
00cb2e20b4 | ||
|
|
c23b8912bd | ||
|
|
1771759de9 | ||
|
|
459c53c45a | ||
|
|
f0eb141e02 | ||
|
|
4ab82bb579 | ||
|
|
0d3afbd499 | ||
|
|
29038df468 | ||
|
|
6e8e5ebdf4 | ||
|
|
3b345e0cb4 |
@@ -15,6 +15,13 @@ use uuid::Uuid;
|
||||
|
||||
use crate::user_instructions::UserInstructions;
|
||||
|
||||
fn is_user_shell_command_prefix(text: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let lowered = trimmed.to_ascii_lowercase();
|
||||
lowered.starts_with("<user_shell_command>")
|
||||
|| lowered.starts_with("<user_shell_command_output>")
|
||||
}
|
||||
|
||||
fn is_session_prefix(text: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let lowered = trimmed.to_ascii_lowercase();
|
||||
@@ -31,7 +38,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
|
||||
for content_item in message.iter() {
|
||||
match content_item {
|
||||
ContentItem::InputText { text } => {
|
||||
if is_session_prefix(text) {
|
||||
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
|
||||
return None;
|
||||
}
|
||||
content.push(UserInput::Text { text: text.clone() });
|
||||
@@ -42,7 +49,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
|
||||
});
|
||||
}
|
||||
ContentItem::OutputText { text } => {
|
||||
if is_session_prefix(text) {
|
||||
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
|
||||
return None;
|
||||
}
|
||||
warn!("Output text in user message: {}", text);
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::io;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ExitStatus;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -23,6 +24,7 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::UserCommandOutputDeltaEvent;
|
||||
use crate::sandboxing::CommandSpec;
|
||||
use crate::sandboxing::ExecEnv;
|
||||
use crate::sandboxing::SandboxManager;
|
||||
@@ -84,6 +86,43 @@ pub struct StdoutStream {
|
||||
pub tx_event: Sender<Event>,
|
||||
}
|
||||
|
||||
type DeltaEventFn = dyn Fn(&str, ExecOutputStream, Vec<u8>) -> EventMsg + Send + Sync;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeltaEventBuilder {
|
||||
inner: Arc<DeltaEventFn>,
|
||||
}
|
||||
|
||||
impl DeltaEventBuilder {
|
||||
pub fn exec_command() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(|call_id, stream, chunk| {
|
||||
EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
|
||||
call_id: call_id.to_string(),
|
||||
stream,
|
||||
chunk,
|
||||
})
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn user_command() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(|call_id, stream, chunk| {
|
||||
EventMsg::UserCommandOutputDelta(UserCommandOutputDeltaEvent {
|
||||
call_id: call_id.to_string(),
|
||||
stream,
|
||||
chunk,
|
||||
})
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(&self, call_id: &str, stream: ExecOutputStream, chunk: Vec<u8>) -> EventMsg {
|
||||
(self.inner)(call_id, stream, chunk)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process_exec_tool_call(
|
||||
params: ExecParams,
|
||||
sandbox_type: SandboxType,
|
||||
@@ -138,6 +177,7 @@ pub(crate) async fn execute_exec_env(
|
||||
env: ExecEnv,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
delta_event_builder: Option<DeltaEventBuilder>,
|
||||
) -> Result<ExecToolCallOutput> {
|
||||
let ExecEnv {
|
||||
command,
|
||||
@@ -161,7 +201,15 @@ pub(crate) async fn execute_exec_env(
|
||||
};
|
||||
|
||||
let start = Instant::now();
|
||||
let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream).await;
|
||||
let delta_event_builder = delta_event_builder.unwrap_or_else(DeltaEventBuilder::exec_command);
|
||||
let raw_output_result = exec(
|
||||
params,
|
||||
sandbox,
|
||||
sandbox_policy,
|
||||
stdout_stream,
|
||||
delta_event_builder.clone(),
|
||||
)
|
||||
.await;
|
||||
let duration = start.elapsed();
|
||||
finalize_exec_result(raw_output_result, sandbox, duration)
|
||||
}
|
||||
@@ -434,6 +482,7 @@ async fn exec(
|
||||
sandbox: SandboxType,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
delta_event_builder: DeltaEventBuilder,
|
||||
) -> Result<RawExecToolCallOutput> {
|
||||
#[cfg(target_os = "windows")]
|
||||
if sandbox == SandboxType::WindowsRestrictedToken {
|
||||
@@ -465,7 +514,7 @@ async fn exec(
|
||||
env,
|
||||
)
|
||||
.await?;
|
||||
consume_truncated_output(child, timeout, stdout_stream).await
|
||||
consume_truncated_output(child, timeout, stdout_stream, delta_event_builder).await
|
||||
}
|
||||
|
||||
/// Consumes the output of a child process, truncating it so it is suitable for
|
||||
@@ -474,6 +523,7 @@ async fn consume_truncated_output(
|
||||
mut child: Child,
|
||||
timeout: Duration,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
delta_event_builder: DeltaEventBuilder,
|
||||
) -> Result<RawExecToolCallOutput> {
|
||||
// Both stdout and stderr were configured with `Stdio::piped()`
|
||||
// above, therefore `take()` should normally return `Some`. If it doesn't
|
||||
@@ -497,12 +547,14 @@ async fn consume_truncated_output(
|
||||
stdout_stream.clone(),
|
||||
false,
|
||||
Some(agg_tx.clone()),
|
||||
delta_event_builder.clone(),
|
||||
));
|
||||
let stderr_handle = tokio::spawn(read_capped(
|
||||
BufReader::new(stderr_reader),
|
||||
stdout_stream.clone(),
|
||||
true,
|
||||
Some(agg_tx.clone()),
|
||||
delta_event_builder.clone(),
|
||||
));
|
||||
|
||||
let (exit_status, timed_out) = tokio::select! {
|
||||
@@ -554,6 +606,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
|
||||
stream: Option<StdoutStream>,
|
||||
is_stderr: bool,
|
||||
aggregate_tx: Option<Sender<Vec<u8>>>,
|
||||
delta_event_builder: DeltaEventBuilder,
|
||||
) -> io::Result<StreamOutput<Vec<u8>>> {
|
||||
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
|
||||
let mut tmp = [0u8; READ_CHUNK_SIZE];
|
||||
@@ -571,15 +624,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
|
||||
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
|
||||
{
|
||||
let chunk = tmp[..n].to_vec();
|
||||
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
|
||||
call_id: stream.call_id.clone(),
|
||||
stream: if is_stderr {
|
||||
let msg = delta_event_builder.build(
|
||||
&stream.call_id,
|
||||
if is_stderr {
|
||||
ExecOutputStream::Stderr
|
||||
} else {
|
||||
ExecOutputStream::Stdout
|
||||
},
|
||||
chunk,
|
||||
});
|
||||
);
|
||||
let event = Event {
|
||||
id: stream.sub_id.clone(),
|
||||
msg,
|
||||
|
||||
@@ -41,6 +41,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::UserCommandBegin(_)
|
||||
| EventMsg::UserCommandOutputDelta(_)
|
||||
| EventMsg::UserCommandEnd(_)
|
||||
| EventMsg::EnteredReviewMode(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
| EventMsg::UndoCompleted(_)
|
||||
|
||||
@@ -165,5 +165,5 @@ pub async fn execute_env(
|
||||
policy: &SandboxPolicy,
|
||||
stdout_stream: Option<StdoutStream>,
|
||||
) -> crate::error::Result<ExecToolCallOutput> {
|
||||
execute_exec_env(env.clone(), policy, stdout_stream).await
|
||||
execute_exec_env(env.clone(), policy, stdout_stream, None).await
|
||||
}
|
||||
|
||||
@@ -1,28 +1,36 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::models::ShellToolCallParams;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::DeltaEventBuilder;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StdoutStream;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec::execute_exec_env;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::parse_command::parse_command;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::TaskStartedEvent;
|
||||
use crate::protocol::UserCommandBeginEvent;
|
||||
use crate::protocol::UserCommandEndEvent;
|
||||
use crate::sandboxing::ExecEnv;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
use crate::tools::router::ToolCall;
|
||||
use crate::tools::router::ToolRouter;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use crate::tools::format_exec_output_for_model;
|
||||
use crate::tools::format_exec_output_str;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
|
||||
const USER_SHELL_TOOL_NAME: &str = "local_shell";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct UserShellCommandTask {
|
||||
command: String,
|
||||
@@ -78,34 +86,150 @@ impl SessionTask for UserShellCommandTask {
|
||||
}
|
||||
};
|
||||
|
||||
let params = ShellToolCallParams {
|
||||
fn build_user_message(text: String) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text }],
|
||||
}
|
||||
}
|
||||
|
||||
let call_id = Uuid::new_v4().to_string();
|
||||
let raw_command = self.command.clone();
|
||||
let command_text = format!("<user_shell_command>\n{raw_command}\n</user_shell_command>");
|
||||
let command_items = [build_user_message(command_text)];
|
||||
session
|
||||
.record_conversation_items(turn_context.as_ref(), &command_items)
|
||||
.await;
|
||||
|
||||
let parsed_cmd = parse_command(&shell_invocation);
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::UserCommandBegin(UserCommandBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
command: shell_invocation.clone(),
|
||||
cwd: turn_context.cwd.clone(),
|
||||
parsed_cmd,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let exec_env = ExecEnv {
|
||||
command: shell_invocation,
|
||||
workdir: None,
|
||||
cwd: turn_context.cwd.clone(),
|
||||
env: create_env(&turn_context.shell_environment_policy),
|
||||
timeout_ms: None,
|
||||
sandbox: SandboxType::None,
|
||||
with_escalated_permissions: None,
|
||||
justification: None,
|
||||
arg0: None,
|
||||
};
|
||||
|
||||
let tool_call = ToolCall {
|
||||
tool_name: USER_SHELL_TOOL_NAME.to_string(),
|
||||
call_id: Uuid::new_v4().to_string(),
|
||||
payload: ToolPayload::LocalShell { params },
|
||||
};
|
||||
let stdout_stream = Some(StdoutStream {
|
||||
sub_id: turn_context.sub_id.clone(),
|
||||
call_id: call_id.clone(),
|
||||
tx_event: session.get_tx_event(),
|
||||
});
|
||||
|
||||
let router = Arc::new(ToolRouter::from_config(&turn_context.tools_config, None));
|
||||
let tracker = Arc::new(Mutex::new(TurnDiffTracker::new()));
|
||||
let runtime = ToolCallRuntime::new(
|
||||
Arc::clone(&router),
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&tracker),
|
||||
let sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
let exec_future = execute_exec_env(
|
||||
exec_env,
|
||||
&sandbox_policy,
|
||||
stdout_stream,
|
||||
Some(DeltaEventBuilder::user_command()),
|
||||
);
|
||||
tokio::pin!(exec_future);
|
||||
|
||||
if let Err(err) = runtime
|
||||
.handle_tool_call(tool_call, cancellation_token)
|
||||
.await
|
||||
{
|
||||
error!("user shell command failed: {err:?}");
|
||||
let exec_result = tokio::select! {
|
||||
res = &mut exec_future => Some(res),
|
||||
_ = cancellation_token.cancelled() => None,
|
||||
};
|
||||
|
||||
match exec_result {
|
||||
None => {
|
||||
let aborted_message = "command aborted by user".to_string();
|
||||
let aborted_text = format!(
|
||||
"<user_shell_command_output>\n{aborted_message}\n</user_shell_command_output>"
|
||||
);
|
||||
let output_items = [build_user_message(aborted_text)];
|
||||
session
|
||||
.record_conversation_items(turn_context.as_ref(), &output_items)
|
||||
.await;
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::UserCommandEnd(UserCommandEndEvent {
|
||||
call_id,
|
||||
stdout: String::new(),
|
||||
stderr: aborted_message.clone(),
|
||||
aggregated_output: aborted_message.clone(),
|
||||
exit_code: -1,
|
||||
duration: Duration::ZERO,
|
||||
formatted_output: aborted_message,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Some(Ok(output)) => {
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::UserCommandEnd(UserCommandEndEvent {
|
||||
call_id: call_id.clone(),
|
||||
stdout: output.stdout.text.clone(),
|
||||
stderr: output.stderr.text.clone(),
|
||||
aggregated_output: output.aggregated_output.text.clone(),
|
||||
exit_code: output.exit_code,
|
||||
duration: output.duration,
|
||||
formatted_output: format_exec_output_str(&output),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let output_payload = format_exec_output_for_model(&output);
|
||||
let output_text = format!(
|
||||
"<user_shell_command_output>\n{output_payload}\n</user_shell_command_output>"
|
||||
);
|
||||
let output_items = [build_user_message(output_text)];
|
||||
session
|
||||
.record_conversation_items(turn_context.as_ref(), &output_items)
|
||||
.await;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!("user shell command failed: {err:?}");
|
||||
let message = format!("execution error: {err:?}");
|
||||
let exec_output = ExecToolCallOutput {
|
||||
exit_code: -1,
|
||||
stdout: StreamOutput::new(String::new()),
|
||||
stderr: StreamOutput::new(message.clone()),
|
||||
aggregated_output: StreamOutput::new(message.clone()),
|
||||
duration: Duration::ZERO,
|
||||
timed_out: false,
|
||||
};
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::UserCommandEnd(UserCommandEndEvent {
|
||||
call_id,
|
||||
stdout: exec_output.stdout.text.clone(),
|
||||
stderr: exec_output.stderr.text.clone(),
|
||||
aggregated_output: exec_output.aggregated_output.text.clone(),
|
||||
exit_code: exec_output.exit_code,
|
||||
duration: exec_output.duration,
|
||||
formatted_output: format_exec_output_str(&exec_output),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
let output_payload = format_exec_output_for_model(&exec_output);
|
||||
let output_text = format!(
|
||||
"<user_shell_command_output>\n{output_payload}\n</user_shell_command_output>"
|
||||
);
|
||||
let output_items = [build_user_message(output_text)];
|
||||
session
|
||||
.record_conversation_items(turn_context.as_ref(), &output_items)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecCommandEndEvent;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::TurnAbortReason;
|
||||
use codex_core::protocol::UserCommandEndEvent;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::wait_for_event;
|
||||
use std::path::PathBuf;
|
||||
@@ -63,8 +63,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
|
||||
.submit(Op::RunUserShellCommand { command: list_cmd })
|
||||
.await
|
||||
.unwrap();
|
||||
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await;
|
||||
let EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await;
|
||||
let EventMsg::UserCommandEnd(UserCommandEndEvent {
|
||||
stdout, exit_code, ..
|
||||
}) = msg
|
||||
else {
|
||||
@@ -84,8 +84,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
|
||||
.submit(Op::RunUserShellCommand { command: cat_cmd })
|
||||
.await
|
||||
.unwrap();
|
||||
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await;
|
||||
let EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await;
|
||||
let EventMsg::UserCommandEnd(UserCommandEndEvent {
|
||||
mut stdout,
|
||||
exit_code,
|
||||
..
|
||||
@@ -128,7 +128,7 @@ async fn user_shell_cmd_can_be_interrupted() {
|
||||
.unwrap();
|
||||
|
||||
// Wait until it has started (ExecCommandBegin), then interrupt.
|
||||
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandBegin(_))).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
// Expect a TurnAborted(Interrupted) notification.
|
||||
|
||||
@@ -522,6 +522,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
| EventMsg::McpListToolsResponse(_)
|
||||
| EventMsg::ListCustomPromptsResponse(_)
|
||||
| EventMsg::RawResponseItem(_)
|
||||
| EventMsg::UserCommandBegin(_)
|
||||
| EventMsg::UserCommandOutputDelta(_)
|
||||
| EventMsg::UserCommandEnd(_)
|
||||
| EventMsg::UserMessage(_)
|
||||
| EventMsg::EnteredReviewMode(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
|
||||
@@ -274,6 +274,9 @@ async fn run_codex_tool_session_inner(
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::ExecCommandEnd(_)
|
||||
| EventMsg::UserCommandBegin(_)
|
||||
| EventMsg::UserCommandOutputDelta(_)
|
||||
| EventMsg::UserCommandEnd(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::StreamError(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
|
||||
@@ -493,6 +493,15 @@ pub enum EventMsg {
|
||||
|
||||
ExecCommandEnd(ExecCommandEndEvent),
|
||||
|
||||
/// Notification that the user initiated a shell command.
|
||||
UserCommandBegin(UserCommandBeginEvent),
|
||||
|
||||
/// Incremental chunk of output from a running user command.
|
||||
UserCommandOutputDelta(UserCommandOutputDeltaEvent),
|
||||
|
||||
/// Completion notification for a user shell command.
|
||||
UserCommandEnd(UserCommandEndEvent),
|
||||
|
||||
/// Notification that the agent attached a local image via the view_image tool.
|
||||
ViewImageToolCall(ViewImageToolCallEvent),
|
||||
|
||||
@@ -1267,6 +1276,51 @@ pub struct ExecCommandOutputDeltaEvent {
|
||||
pub chunk: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct UserCommandBeginEvent {
|
||||
/// Identifier so this can be paired with the UserCommandEnd event.
|
||||
pub call_id: String,
|
||||
/// The command to be executed.
|
||||
pub command: Vec<String>,
|
||||
/// The command's working directory.
|
||||
pub cwd: PathBuf,
|
||||
pub parsed_cmd: Vec<ParsedCommand>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct UserCommandEndEvent {
|
||||
/// Identifier for the UserCommandBegin that finished.
|
||||
pub call_id: String,
|
||||
/// Captured stdout.
|
||||
pub stdout: String,
|
||||
/// Captured stderr.
|
||||
pub stderr: String,
|
||||
/// Captured aggregated output.
|
||||
#[serde(default)]
|
||||
pub aggregated_output: String,
|
||||
/// The command's exit code.
|
||||
pub exit_code: i32,
|
||||
/// The duration of the command execution.
|
||||
#[ts(type = "string")]
|
||||
pub duration: Duration,
|
||||
/// Formatted output from the command, as seen by the model.
|
||||
pub formatted_output: String,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
|
||||
pub struct UserCommandOutputDeltaEvent {
|
||||
/// Identifier for the UserCommandBegin that produced this chunk.
|
||||
pub call_id: String,
|
||||
/// Which stream produced this chunk.
|
||||
pub stream: ExecOutputStream,
|
||||
/// Raw bytes from the stream (may not be valid UTF-8).
|
||||
#[serde_as(as = "serde_with::base64::Base64")]
|
||||
#[schemars(with = "String")]
|
||||
#[ts(type = "string")]
|
||||
pub chunk: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct BackgroundEventEvent {
|
||||
pub message: String,
|
||||
|
||||
@@ -40,6 +40,8 @@ use codex_core::protocol::TurnAbortReason;
|
||||
use codex_core::protocol::TurnDiffEvent;
|
||||
use codex_core::protocol::UndoCompletedEvent;
|
||||
use codex_core::protocol::UndoStartedEvent;
|
||||
use codex_core::protocol::UserCommandBeginEvent;
|
||||
use codex_core::protocol::UserCommandEndEvent;
|
||||
use codex_core::protocol::UserMessageEvent;
|
||||
use codex_core::protocol::ViewImageToolCallEvent;
|
||||
use codex_core::protocol::WarningEvent;
|
||||
@@ -627,6 +629,30 @@ impl ChatWidget {
|
||||
self.defer_or_handle(|q| q.push_exec_end(ev), |s| s.handle_exec_end_now(ev2));
|
||||
}
|
||||
|
||||
fn on_user_command_begin(&mut self, ev: UserCommandBeginEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
let ev2 = ev.clone();
|
||||
self.defer_or_handle(
|
||||
|q| q.push_user_command_begin(ev),
|
||||
|s| s.handle_user_command_begin_now(ev2),
|
||||
);
|
||||
}
|
||||
|
||||
fn on_user_command_output_delta(
|
||||
&mut self,
|
||||
_ev: codex_core::protocol::UserCommandOutputDeltaEvent,
|
||||
) {
|
||||
// TODO: Handle streaming exec output if/when implemented
|
||||
}
|
||||
|
||||
fn on_user_command_end(&mut self, ev: UserCommandEndEvent) {
|
||||
let ev2 = ev.clone();
|
||||
self.defer_or_handle(
|
||||
|q| q.push_user_command_end(ev),
|
||||
|s| s.handle_user_command_end_now(ev2),
|
||||
);
|
||||
}
|
||||
|
||||
fn on_mcp_tool_call_begin(&mut self, ev: McpToolCallBeginEvent) {
|
||||
let ev2 = ev.clone();
|
||||
self.defer_or_handle(|q| q.push_mcp_begin(ev), |s| s.handle_mcp_begin_now(ev2));
|
||||
@@ -785,11 +811,23 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) {
|
||||
let running = self.running_commands.remove(&ev.call_id);
|
||||
fn handle_command_end_internal(
|
||||
&mut self,
|
||||
call_id: String,
|
||||
aggregated_output: String,
|
||||
formatted_output: String,
|
||||
exit_code: i32,
|
||||
duration: std::time::Duration,
|
||||
default_is_user_shell_command: bool,
|
||||
) {
|
||||
let running = self.running_commands.remove(&call_id);
|
||||
let (command, parsed, is_user_shell_command) = match running {
|
||||
Some(rc) => (rc.command, rc.parsed_cmd, rc.is_user_shell_command),
|
||||
None => (vec![ev.call_id.clone()], Vec::new(), false),
|
||||
None => (
|
||||
vec![call_id.clone()],
|
||||
Vec::new(),
|
||||
default_is_user_shell_command,
|
||||
),
|
||||
};
|
||||
|
||||
let needs_new = self
|
||||
@@ -800,7 +838,7 @@ impl ChatWidget {
|
||||
if needs_new {
|
||||
self.flush_active_cell();
|
||||
self.active_cell = Some(Box::new(new_active_exec_command(
|
||||
ev.call_id.clone(),
|
||||
call_id.clone(),
|
||||
command,
|
||||
parsed,
|
||||
is_user_shell_command,
|
||||
@@ -813,13 +851,13 @@ impl ChatWidget {
|
||||
.and_then(|c| c.as_any_mut().downcast_mut::<ExecCell>())
|
||||
{
|
||||
cell.complete_call(
|
||||
&ev.call_id,
|
||||
&call_id,
|
||||
CommandOutput {
|
||||
exit_code: ev.exit_code,
|
||||
formatted_output: ev.formatted_output.clone(),
|
||||
aggregated_output: ev.aggregated_output.clone(),
|
||||
exit_code,
|
||||
formatted_output,
|
||||
aggregated_output,
|
||||
},
|
||||
ev.duration,
|
||||
duration,
|
||||
);
|
||||
if cell.should_flush() {
|
||||
self.flush_active_cell();
|
||||
@@ -827,6 +865,44 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) {
|
||||
let ExecCommandEndEvent {
|
||||
call_id,
|
||||
aggregated_output,
|
||||
formatted_output,
|
||||
exit_code,
|
||||
duration,
|
||||
..
|
||||
} = ev;
|
||||
self.handle_command_end_internal(
|
||||
call_id,
|
||||
aggregated_output,
|
||||
formatted_output,
|
||||
exit_code,
|
||||
duration,
|
||||
false,
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_user_command_end_now(&mut self, ev: UserCommandEndEvent) {
|
||||
let UserCommandEndEvent {
|
||||
call_id,
|
||||
aggregated_output,
|
||||
formatted_output,
|
||||
exit_code,
|
||||
duration,
|
||||
..
|
||||
} = ev;
|
||||
self.handle_command_end_internal(
|
||||
call_id,
|
||||
aggregated_output,
|
||||
formatted_output,
|
||||
exit_code,
|
||||
duration,
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_patch_apply_end_now(
|
||||
&mut self,
|
||||
event: codex_core::protocol::PatchApplyEndEvent,
|
||||
@@ -875,14 +951,19 @@ impl ChatWidget {
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
|
||||
// Ensure the status indicator is visible while the command runs.
|
||||
fn handle_command_begin_internal(
|
||||
&mut self,
|
||||
call_id: String,
|
||||
command: Vec<String>,
|
||||
parsed_cmd: Vec<ParsedCommand>,
|
||||
is_user_shell_command: bool,
|
||||
) {
|
||||
self.running_commands.insert(
|
||||
ev.call_id.clone(),
|
||||
call_id.clone(),
|
||||
RunningCommand {
|
||||
command: ev.command.clone(),
|
||||
parsed_cmd: ev.parsed_cmd.clone(),
|
||||
is_user_shell_command: ev.is_user_shell_command,
|
||||
command: command.clone(),
|
||||
parsed_cmd: parsed_cmd.clone(),
|
||||
is_user_shell_command,
|
||||
},
|
||||
);
|
||||
if let Some(cell) = self
|
||||
@@ -890,10 +971,10 @@ impl ChatWidget {
|
||||
.as_mut()
|
||||
.and_then(|c| c.as_any_mut().downcast_mut::<ExecCell>())
|
||||
&& let Some(new_exec) = cell.with_added_call(
|
||||
ev.call_id.clone(),
|
||||
ev.command.clone(),
|
||||
ev.parsed_cmd.clone(),
|
||||
ev.is_user_shell_command,
|
||||
call_id.clone(),
|
||||
command.clone(),
|
||||
parsed_cmd.clone(),
|
||||
is_user_shell_command,
|
||||
)
|
||||
{
|
||||
*cell = new_exec;
|
||||
@@ -901,16 +982,37 @@ impl ChatWidget {
|
||||
self.flush_active_cell();
|
||||
|
||||
self.active_cell = Some(Box::new(new_active_exec_command(
|
||||
ev.call_id.clone(),
|
||||
ev.command.clone(),
|
||||
ev.parsed_cmd,
|
||||
ev.is_user_shell_command,
|
||||
call_id,
|
||||
command,
|
||||
parsed_cmd,
|
||||
is_user_shell_command,
|
||||
)));
|
||||
}
|
||||
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
|
||||
let ExecCommandBeginEvent {
|
||||
call_id,
|
||||
command,
|
||||
parsed_cmd,
|
||||
is_user_shell_command,
|
||||
..
|
||||
} = ev;
|
||||
self.handle_command_begin_internal(call_id, command, parsed_cmd, is_user_shell_command);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_user_command_begin_now(&mut self, ev: UserCommandBeginEvent) {
|
||||
let UserCommandBeginEvent {
|
||||
call_id,
|
||||
command,
|
||||
parsed_cmd,
|
||||
..
|
||||
} = ev;
|
||||
self.handle_command_begin_internal(call_id, command, parsed_cmd, true);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
|
||||
self.flush_answer_stream_with_separator();
|
||||
self.flush_active_cell();
|
||||
@@ -1453,7 +1555,8 @@ impl ChatWidget {
|
||||
match msg {
|
||||
EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_) => {}
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::UserCommandOutputDelta(_) => {}
|
||||
_ => {
|
||||
tracing::trace!("handle_codex_event: {:?}", msg);
|
||||
}
|
||||
@@ -1506,9 +1609,12 @@ impl ChatWidget {
|
||||
}
|
||||
EventMsg::ExecCommandBegin(ev) => self.on_exec_command_begin(ev),
|
||||
EventMsg::ExecCommandOutputDelta(delta) => self.on_exec_command_output_delta(delta),
|
||||
EventMsg::UserCommandBegin(ev) => self.on_user_command_begin(ev),
|
||||
EventMsg::UserCommandOutputDelta(delta) => self.on_user_command_output_delta(delta),
|
||||
EventMsg::PatchApplyBegin(ev) => self.on_patch_apply_begin(ev),
|
||||
EventMsg::PatchApplyEnd(ev) => self.on_patch_apply_end(ev),
|
||||
EventMsg::ExecCommandEnd(ev) => self.on_exec_command_end(ev),
|
||||
EventMsg::UserCommandEnd(ev) => self.on_user_command_end(ev),
|
||||
EventMsg::ViewImageToolCall(ev) => self.on_view_image_tool_call(ev),
|
||||
EventMsg::McpToolCallBegin(ev) => self.on_mcp_tool_call_begin(ev),
|
||||
EventMsg::McpToolCallEnd(ev) => self.on_mcp_tool_call_end(ev),
|
||||
|
||||
@@ -7,6 +7,8 @@ use codex_core::protocol::ExecCommandEndEvent;
|
||||
use codex_core::protocol::McpToolCallBeginEvent;
|
||||
use codex_core::protocol::McpToolCallEndEvent;
|
||||
use codex_core::protocol::PatchApplyEndEvent;
|
||||
use codex_core::protocol::UserCommandBeginEvent;
|
||||
use codex_core::protocol::UserCommandEndEvent;
|
||||
|
||||
use super::ChatWidget;
|
||||
|
||||
@@ -16,6 +18,8 @@ pub(crate) enum QueuedInterrupt {
|
||||
ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent),
|
||||
ExecBegin(ExecCommandBeginEvent),
|
||||
ExecEnd(ExecCommandEndEvent),
|
||||
UserCommandBegin(UserCommandBeginEvent),
|
||||
UserCommandEnd(UserCommandEndEvent),
|
||||
McpBegin(McpToolCallBeginEvent),
|
||||
McpEnd(McpToolCallEndEvent),
|
||||
PatchEnd(PatchApplyEndEvent),
|
||||
@@ -59,6 +63,14 @@ impl InterruptManager {
|
||||
self.queue.push_back(QueuedInterrupt::ExecEnd(ev));
|
||||
}
|
||||
|
||||
pub(crate) fn push_user_command_begin(&mut self, ev: UserCommandBeginEvent) {
|
||||
self.queue.push_back(QueuedInterrupt::UserCommandBegin(ev));
|
||||
}
|
||||
|
||||
pub(crate) fn push_user_command_end(&mut self, ev: UserCommandEndEvent) {
|
||||
self.queue.push_back(QueuedInterrupt::UserCommandEnd(ev));
|
||||
}
|
||||
|
||||
pub(crate) fn push_mcp_begin(&mut self, ev: McpToolCallBeginEvent) {
|
||||
self.queue.push_back(QueuedInterrupt::McpBegin(ev));
|
||||
}
|
||||
@@ -80,6 +92,8 @@ impl InterruptManager {
|
||||
}
|
||||
QueuedInterrupt::ExecBegin(ev) => chat.handle_exec_begin_now(ev),
|
||||
QueuedInterrupt::ExecEnd(ev) => chat.handle_exec_end_now(ev),
|
||||
QueuedInterrupt::UserCommandBegin(ev) => chat.handle_user_command_begin_now(ev),
|
||||
QueuedInterrupt::UserCommandEnd(ev) => chat.handle_user_command_end_now(ev),
|
||||
QueuedInterrupt::McpBegin(ev) => chat.handle_mcp_begin_now(ev),
|
||||
QueuedInterrupt::McpEnd(ev) => chat.handle_mcp_end_now(ev),
|
||||
QueuedInterrupt::PatchEnd(ev) => chat.handle_patch_apply_end_now(ev),
|
||||
|
||||
@@ -345,7 +345,13 @@ impl ExecCell {
|
||||
Some(false) => "•".red().bold(),
|
||||
None => spinner(call.start_time),
|
||||
};
|
||||
let title = if self.is_active() { "Running" } else { "Ran" };
|
||||
let title = if self.is_active() {
|
||||
"Running"
|
||||
} else if call.is_user_shell_command {
|
||||
"You Ran:"
|
||||
} else {
|
||||
"Ran:"
|
||||
};
|
||||
|
||||
let mut header_line =
|
||||
Line::from(vec![bullet.clone(), " ".into(), title.bold(), " ".into()]);
|
||||
|
||||
Reference in New Issue
Block a user