mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
[app-server] feat: add Declined status for command exec (#7101)
Add a `Declined` status for when we request an approval from the user and the user declines. This allows us to distinguish from commands that actually ran, but failed. This behaves similarly to apply_patch / FileChange, which does the same thing.
This commit is contained in:
@@ -859,6 +859,7 @@ pub enum CommandExecutionStatus {
|
||||
InProgress,
|
||||
Completed,
|
||||
Failed,
|
||||
Declined,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
||||
@@ -175,12 +175,20 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
});
|
||||
}
|
||||
ApiVersion::V2 => {
|
||||
let item_id = call_id.clone();
|
||||
let command_actions = parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(V2ParsedCommand::from)
|
||||
.collect::<Vec<_>>();
|
||||
let command_string = shlex_join(&command);
|
||||
|
||||
let params = CommandExecutionRequestApprovalParams {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id: turn_id.clone(),
|
||||
// Until we migrate the core to be aware of a first class CommandExecutionItem
|
||||
// and emit the corresponding EventMsg, we repurpose the call_id as the item_id.
|
||||
item_id: call_id.clone(),
|
||||
item_id: item_id.clone(),
|
||||
reason,
|
||||
risk: risk.map(V2SandboxCommandAssessment::from),
|
||||
};
|
||||
@@ -190,8 +198,17 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
))
|
||||
.await;
|
||||
tokio::spawn(async move {
|
||||
on_command_execution_request_approval_response(event_id, rx, conversation)
|
||||
.await;
|
||||
on_command_execution_request_approval_response(
|
||||
event_id,
|
||||
item_id,
|
||||
command_string,
|
||||
cwd,
|
||||
command_actions,
|
||||
rx,
|
||||
conversation,
|
||||
outgoing,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
},
|
||||
@@ -370,16 +387,21 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
.await;
|
||||
}
|
||||
EventMsg::ExecCommandBegin(exec_command_begin_event) => {
|
||||
let item_id = exec_command_begin_event.call_id.clone();
|
||||
let command_actions = exec_command_begin_event
|
||||
.parsed_cmd
|
||||
.into_iter()
|
||||
.map(V2ParsedCommand::from)
|
||||
.collect::<Vec<_>>();
|
||||
let command = shlex_join(&exec_command_begin_event.command);
|
||||
let cwd = exec_command_begin_event.cwd;
|
||||
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: exec_command_begin_event.call_id.clone(),
|
||||
command: shlex_join(&exec_command_begin_event.command),
|
||||
cwd: exec_command_begin_event.cwd,
|
||||
id: item_id,
|
||||
command,
|
||||
cwd,
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
command_actions: exec_command_begin_event
|
||||
.parsed_cmd
|
||||
.into_iter()
|
||||
.map(V2ParsedCommand::from)
|
||||
.collect(),
|
||||
command_actions,
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
@@ -417,6 +439,10 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
} else {
|
||||
CommandExecutionStatus::Failed
|
||||
};
|
||||
let command_actions = parsed_cmd
|
||||
.into_iter()
|
||||
.map(V2ParsedCommand::from)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let aggregated_output = if aggregated_output.is_empty() {
|
||||
None
|
||||
@@ -431,7 +457,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
command: shlex_join(&command),
|
||||
cwd,
|
||||
status,
|
||||
command_actions: parsed_cmd.into_iter().map(V2ParsedCommand::from).collect(),
|
||||
command_actions,
|
||||
aggregated_output,
|
||||
exit_code: Some(exit_code),
|
||||
duration_ms: Some(duration_ms),
|
||||
@@ -516,6 +542,30 @@ async fn complete_file_change_item(
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn complete_command_execution_item(
|
||||
item_id: String,
|
||||
command: String,
|
||||
cwd: PathBuf,
|
||||
command_actions: Vec<V2ParsedCommand>,
|
||||
status: CommandExecutionStatus,
|
||||
outgoing: &OutgoingMessageSender,
|
||||
) {
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: item_id,
|
||||
command,
|
||||
cwd,
|
||||
status,
|
||||
command_actions,
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
};
|
||||
let notification = ItemCompletedNotification { item };
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ItemCompleted(notification))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn find_and_remove_turn_summary(
|
||||
conversation_id: ConversationId,
|
||||
turn_summary_store: &TurnSummaryStore,
|
||||
@@ -765,42 +815,68 @@ async fn on_file_change_request_approval_response(
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn on_command_execution_request_approval_response(
|
||||
event_id: String,
|
||||
item_id: String,
|
||||
command: String,
|
||||
cwd: PathBuf,
|
||||
command_actions: Vec<V2ParsedCommand>,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
conversation: Arc<CodexConversation>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
let (decision, completion_status) = match response {
|
||||
Ok(value) => {
|
||||
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
|
||||
CommandExecutionRequestApprovalResponse {
|
||||
decision: ApprovalDecision::Decline,
|
||||
accept_settings: None,
|
||||
}
|
||||
});
|
||||
|
||||
let CommandExecutionRequestApprovalResponse {
|
||||
decision,
|
||||
accept_settings,
|
||||
} = response;
|
||||
|
||||
let (decision, completion_status) = match (decision, accept_settings) {
|
||||
(ApprovalDecision::Accept, Some(settings)) if settings.for_session => {
|
||||
(ReviewDecision::ApprovedForSession, None)
|
||||
}
|
||||
(ApprovalDecision::Accept, _) => (ReviewDecision::Approved, None),
|
||||
(ApprovalDecision::Decline, _) => (
|
||||
ReviewDecision::Denied,
|
||||
Some(CommandExecutionStatus::Declined),
|
||||
),
|
||||
(ApprovalDecision::Cancel, _) => (
|
||||
ReviewDecision::Abort,
|
||||
Some(CommandExecutionStatus::Declined),
|
||||
),
|
||||
};
|
||||
(decision, completion_status)
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
return;
|
||||
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
|
||||
}
|
||||
};
|
||||
|
||||
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
|
||||
CommandExecutionRequestApprovalResponse {
|
||||
decision: ApprovalDecision::Decline,
|
||||
accept_settings: None,
|
||||
}
|
||||
});
|
||||
if let Some(status) = completion_status {
|
||||
complete_command_execution_item(
|
||||
item_id.clone(),
|
||||
command.clone(),
|
||||
cwd.clone(),
|
||||
command_actions.clone(),
|
||||
status,
|
||||
outgoing.as_ref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let CommandExecutionRequestApprovalResponse {
|
||||
decision,
|
||||
accept_settings,
|
||||
} = response;
|
||||
|
||||
let decision = match (decision, accept_settings) {
|
||||
(ApprovalDecision::Accept, Some(settings)) if settings.for_session => {
|
||||
ReviewDecision::ApprovedForSession
|
||||
}
|
||||
(ApprovalDecision::Accept, _) => ReviewDecision::Approved,
|
||||
(ApprovalDecision::Decline, _) => ReviewDecision::Denied,
|
||||
(ApprovalDecision::Cancel, _) => ReviewDecision::Abort,
|
||||
};
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::ExecApproval {
|
||||
id: event_id,
|
||||
|
||||
@@ -8,6 +8,7 @@ use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::format_with_current_shell_display;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
@@ -329,6 +330,145 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_exec_approval_decline_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let codex_home = tmp.path().to_path_buf();
|
||||
let workspace = tmp.path().join("workspace");
|
||||
std::fs::create_dir(&workspace)?;
|
||||
|
||||
let responses = vec![
|
||||
create_shell_command_sse_response(
|
||||
vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
"print(42)".to_string(),
|
||||
],
|
||||
None,
|
||||
Some(5000),
|
||||
"call-decline",
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "run python".to_string(),
|
||||
}],
|
||||
cwd: Some(workspace.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let started_notif = mcp
|
||||
.read_stream_until_notification_message("item/started")
|
||||
.await?;
|
||||
let started: ItemStartedNotification =
|
||||
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
|
||||
if let ThreadItem::CommandExecution { .. } = started.item {
|
||||
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
let ThreadItem::CommandExecution { id, status, .. } = started_command_execution else {
|
||||
unreachable!("loop ensures we break on command execution items");
|
||||
};
|
||||
assert_eq!(id, "call-decline");
|
||||
assert_eq!(status, CommandExecutionStatus::InProgress);
|
||||
|
||||
let server_req = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_request_message(),
|
||||
)
|
||||
.await??;
|
||||
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else {
|
||||
panic!("expected CommandExecutionRequestApproval request")
|
||||
};
|
||||
assert_eq!(params.item_id, "call-decline");
|
||||
assert_eq!(params.thread_id, thread.id);
|
||||
assert_eq!(params.turn_id, turn.id);
|
||||
|
||||
mcp.send_response(
|
||||
request_id,
|
||||
serde_json::to_value(CommandExecutionRequestApprovalResponse {
|
||||
decision: ApprovalDecision::Decline,
|
||||
accept_settings: None,
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let completed_notif = mcp
|
||||
.read_stream_until_notification_message("item/completed")
|
||||
.await?;
|
||||
let completed: ItemCompletedNotification = serde_json::from_value(
|
||||
completed_notif
|
||||
.params
|
||||
.clone()
|
||||
.expect("item/completed params"),
|
||||
)?;
|
||||
if let ThreadItem::CommandExecution { .. } = completed.item {
|
||||
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
let ThreadItem::CommandExecution {
|
||||
id,
|
||||
status,
|
||||
exit_code,
|
||||
aggregated_output,
|
||||
..
|
||||
} = completed_command_execution
|
||||
else {
|
||||
unreachable!("loop ensures we break on command execution items");
|
||||
};
|
||||
assert_eq!(id, "call-decline");
|
||||
assert_eq!(status, CommandExecutionStatus::Declined);
|
||||
assert!(exit_code.is_none());
|
||||
assert!(aggregated_output.is_none());
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -144,6 +144,7 @@ pub enum CommandExecutionStatus {
|
||||
InProgress,
|
||||
Completed,
|
||||
Failed,
|
||||
Declined,
|
||||
}
|
||||
|
||||
/// A command executed by the agent.
|
||||
|
||||
Reference in New Issue
Block a user