fix(guardian): fix ordering of guardian events (#16462)

Guardian events were emitted a bit out of order for CommandExecution
items. This would make it hard for the frontend to render a guardian
auto-review, which has this payload:
```
pub struct ItemGuardianApprovalReviewStartedNotification {
    pub thread_id: String,
    pub turn_id: String,
    pub target_item_id: String,
    pub review: GuardianApprovalReview,
    // FYI this is no longer a json blob
    pub action: Option<JsonValue>,
}
```

There is a `target_item_id` the auto-approval review is referring to,
but the actual item had not been emitted yet.

Before this PR:
- `item/autoApprovalReview/started`
- `item/autoApprovalReview/completed`, and if approved...
- `item/started`
- `item/completed`

After this PR:
- `item/started`
- `item/autoApprovalReview/started`
- `item/autoApprovalReview/completed`
- `item/completed`

This lines up much better with existing patterns (i.e. human review in
`Default mode`, where app-server would send a server request to prompt
for user approval after `item/started`), and makes it easier for clients
to render what guardian is actually reviewing.

We do this following a similar pattern as `FileChange` (aka apply patch)
items, where we create a FileChange item and emit `item/started` if we
see the apply patch approval request, before the actual apply patch call
runs.
This commit is contained in:
Owen Lin
2026-04-06 12:14:27 -07:00
committed by GitHub
parent 4eabc3dcb1
commit bd30bad96f
8 changed files with 1084 additions and 321 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -1492,6 +1492,7 @@ dependencies = [
"codex-experimental-api-macros",
"codex-git-utils",
"codex-protocol",
"codex-shell-command",
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"inventory",
@@ -1501,7 +1502,6 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"shlex",
"similar",
"strum_macros 0.28.0",
"tempfile",

View File

@@ -17,12 +17,12 @@ clap = { workspace = true, features = ["derive"] }
codex-experimental-api-macros = { workspace = true }
codex-git-utils = { workspace = true }
codex-protocol = { workspace = true }
codex-shell-command = { workspace = true }
codex-utils-absolute-path = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_with = { workspace = true }
shlex = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
rmcp = { workspace = true, default-features = false, features = [

View File

@@ -15,6 +15,7 @@ pub use export::generate_ts_with_options;
pub use export::generate_types;
pub use jsonrpc_lite::*;
pub use protocol::common::*;
pub use protocol::item_builders::*;
pub use protocol::thread_history::*;
pub use protocol::v1::ApplyPatchApprovalParams;
pub use protocol::v1::ApplyPatchApprovalResponse;

View File

@@ -0,0 +1,299 @@
//! Shared builders for synthetic [`ThreadItem`] values emitted by the app-server layer.
//!
//! These items do not come from first-class core `ItemStarted` / `ItemCompleted` events.
//! Instead, the app-server synthesizes them so clients can render a coherent lifecycle for
//! approvals and other pre-execution flows before the underlying tool has started or when the
//! tool never starts at all.
//!
//! Keeping these builders in one place is useful for two reasons:
//! - Live notifications and rebuilt `thread/read` history both need to construct the same
//! synthetic items, so sharing the logic avoids drift between those paths.
//! - The projection is presentation-specific. Core protocol events stay generic, while the
//! app-server protocol decides how to surface those events as `ThreadItem`s for clients.
use crate::protocol::common::ServerNotification;
use crate::protocol::v2::CommandAction;
use crate::protocol::v2::CommandExecutionSource;
use crate::protocol::v2::CommandExecutionStatus;
use crate::protocol::v2::FileUpdateChange;
use crate::protocol::v2::GuardianApprovalReview;
use crate::protocol::v2::GuardianApprovalReviewStatus;
use crate::protocol::v2::ItemGuardianApprovalReviewCompletedNotification;
use crate::protocol::v2::ItemGuardianApprovalReviewStartedNotification;
use crate::protocol::v2::PatchApplyStatus;
use crate::protocol::v2::PatchChangeKind;
use crate::protocol::v2::ThreadItem;
use codex_protocol::ThreadId;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::ExecCommandBeginEvent;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::GuardianAssessmentAction;
use codex_protocol::protocol::GuardianAssessmentEvent;
use codex_protocol::protocol::PatchApplyBeginEvent;
use codex_protocol::protocol::PatchApplyEndEvent;
use codex_shell_command::parse_command::parse_command;
use codex_shell_command::parse_command::shlex_join;
use std::collections::HashMap;
use std::path::PathBuf;
pub fn build_file_change_approval_request_item(
payload: &ApplyPatchApprovalRequestEvent,
) -> ThreadItem {
ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: PatchApplyStatus::InProgress,
}
}
pub fn build_file_change_begin_item(payload: &PatchApplyBeginEvent) -> ThreadItem {
ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: PatchApplyStatus::InProgress,
}
}
pub fn build_file_change_end_item(payload: &PatchApplyEndEvent) -> ThreadItem {
ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: (&payload.status).into(),
}
}
pub fn build_command_execution_approval_request_item(
payload: &ExecApprovalRequestEvent,
) -> ThreadItem {
ThreadItem::CommandExecution {
id: payload.call_id.clone(),
command: shlex_join(&payload.command),
cwd: payload.cwd.clone(),
process_id: None,
source: CommandExecutionSource::Agent,
status: CommandExecutionStatus::InProgress,
command_actions: payload
.parsed_cmd
.iter()
.cloned()
.map(CommandAction::from)
.collect(),
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
}
pub fn build_command_execution_begin_item(payload: &ExecCommandBeginEvent) -> ThreadItem {
ThreadItem::CommandExecution {
id: payload.call_id.clone(),
command: shlex_join(&payload.command),
cwd: payload.cwd.clone(),
process_id: payload.process_id.clone(),
source: payload.source.into(),
status: CommandExecutionStatus::InProgress,
command_actions: payload
.parsed_cmd
.iter()
.cloned()
.map(CommandAction::from)
.collect(),
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
}
pub fn build_command_execution_end_item(payload: &ExecCommandEndEvent) -> ThreadItem {
let aggregated_output = if payload.aggregated_output.is_empty() {
None
} else {
Some(payload.aggregated_output.clone())
};
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
ThreadItem::CommandExecution {
id: payload.call_id.clone(),
command: shlex_join(&payload.command),
cwd: payload.cwd.clone(),
process_id: payload.process_id.clone(),
source: payload.source.into(),
status: (&payload.status).into(),
command_actions: payload
.parsed_cmd
.iter()
.cloned()
.map(CommandAction::from)
.collect(),
aggregated_output,
exit_code: Some(payload.exit_code),
duration_ms: Some(duration_ms),
}
}
/// Build a guardian-derived [`ThreadItem`].
///
/// Currently this only synthesizes [`ThreadItem::CommandExecution`] for
/// [`GuardianAssessmentAction::Command`] and [`GuardianAssessmentAction::Execve`].
pub fn build_item_from_guardian_event(
assessment: &GuardianAssessmentEvent,
status: CommandExecutionStatus,
) -> Option<ThreadItem> {
match &assessment.action {
GuardianAssessmentAction::Command { command, cwd, .. } => {
let command = command.clone();
let command_actions = vec![CommandAction::Unknown {
command: command.clone(),
}];
Some(ThreadItem::CommandExecution {
id: assessment.id.clone(),
command,
cwd: cwd.clone(),
process_id: None,
source: CommandExecutionSource::Agent,
status,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
})
}
GuardianAssessmentAction::Execve {
program, argv, cwd, ..
} => {
let argv = if argv.is_empty() {
vec![program.clone()]
} else {
std::iter::once(program.clone())
.chain(argv.iter().skip(1).cloned())
.collect::<Vec<_>>()
};
let command = shlex_join(&argv);
let parsed_cmd = parse_command(&argv);
let command_actions = if parsed_cmd.is_empty() {
vec![CommandAction::Unknown {
command: command.clone(),
}]
} else {
parsed_cmd.into_iter().map(CommandAction::from).collect()
};
Some(ThreadItem::CommandExecution {
id: assessment.id.clone(),
command,
cwd: cwd.clone(),
process_id: None,
source: CommandExecutionSource::Agent,
status,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
})
}
GuardianAssessmentAction::ApplyPatch { .. }
| GuardianAssessmentAction::NetworkAccess { .. }
| GuardianAssessmentAction::McpToolCall { .. } => None,
}
}
pub fn guardian_auto_approval_review_notification(
conversation_id: &ThreadId,
event_turn_id: &str,
assessment: &GuardianAssessmentEvent,
) -> ServerNotification {
// TODO(ccunningham): Attach guardian review state to the reviewed tool
// item's lifecycle instead of sending standalone review notifications so
// the app-server API can persist and replay review state via `thread/read`.
let turn_id = if assessment.turn_id.is_empty() {
event_turn_id.to_string()
} else {
assessment.turn_id.clone()
};
let review = GuardianApprovalReview {
status: match assessment.status {
codex_protocol::protocol::GuardianAssessmentStatus::InProgress => {
GuardianApprovalReviewStatus::InProgress
}
codex_protocol::protocol::GuardianAssessmentStatus::Approved => {
GuardianApprovalReviewStatus::Approved
}
codex_protocol::protocol::GuardianAssessmentStatus::Denied => {
GuardianApprovalReviewStatus::Denied
}
codex_protocol::protocol::GuardianAssessmentStatus::Aborted => {
GuardianApprovalReviewStatus::Aborted
}
},
risk_score: assessment.risk_score,
risk_level: assessment.risk_level.map(Into::into),
rationale: assessment.rationale.clone(),
};
let action = assessment.action.clone().into();
match assessment.status {
codex_protocol::protocol::GuardianAssessmentStatus::InProgress => {
ServerNotification::ItemGuardianApprovalReviewStarted(
ItemGuardianApprovalReviewStartedNotification {
thread_id: conversation_id.to_string(),
turn_id,
target_item_id: assessment.id.clone(),
review,
action,
},
)
}
codex_protocol::protocol::GuardianAssessmentStatus::Approved
| codex_protocol::protocol::GuardianAssessmentStatus::Denied
| codex_protocol::protocol::GuardianAssessmentStatus::Aborted => {
ServerNotification::ItemGuardianApprovalReviewCompleted(
ItemGuardianApprovalReviewCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id,
target_item_id: assessment.id.clone(),
review,
action,
},
)
}
}
}
pub fn convert_patch_changes(changes: &HashMap<PathBuf, FileChange>) -> Vec<FileUpdateChange> {
let mut converted: Vec<FileUpdateChange> = changes
.iter()
.map(|(path, change)| FileUpdateChange {
path: path.to_string_lossy().into_owned(),
kind: map_patch_change_kind(change),
diff: format_file_change_diff(change),
})
.collect();
converted.sort_by(|a, b| a.path.cmp(&b.path));
converted
}
fn map_patch_change_kind(change: &FileChange) -> PatchChangeKind {
match change {
FileChange::Add { .. } => PatchChangeKind::Add,
FileChange::Delete { .. } => PatchChangeKind::Delete,
FileChange::Update { move_path, .. } => PatchChangeKind::Update {
move_path: move_path.clone(),
},
}
}
fn format_file_change_diff(change: &FileChange) -> String {
match change {
FileChange::Add { content } => content.clone(),
FileChange::Delete { content } => content.clone(),
FileChange::Update {
unified_diff,
move_path,
} => {
if let Some(path) = move_path {
format!("{unified_diff}\n\nMoved to: {}", path.display())
} else {
unified_diff.clone()
}
}
}
}

View File

@@ -2,6 +2,7 @@
// Exposes protocol pieces used by `lib.rs` via `pub use protocol::common::*;`.
pub mod common;
pub mod item_builders;
mod mappers;
mod serde_helpers;
pub mod thread_history;

View File

@@ -1,16 +1,18 @@
use crate::protocol::item_builders::build_command_execution_begin_item;
use crate::protocol::item_builders::build_command_execution_end_item;
use crate::protocol::item_builders::build_file_change_approval_request_item;
use crate::protocol::item_builders::build_file_change_begin_item;
use crate::protocol::item_builders::build_file_change_end_item;
use crate::protocol::item_builders::build_item_from_guardian_event;
use crate::protocol::v2::CollabAgentState;
use crate::protocol::v2::CollabAgentTool;
use crate::protocol::v2::CollabAgentToolCallStatus;
use crate::protocol::v2::CommandAction;
use crate::protocol::v2::CommandExecutionStatus;
use crate::protocol::v2::DynamicToolCallOutputContentItem;
use crate::protocol::v2::DynamicToolCallStatus;
use crate::protocol::v2::FileUpdateChange;
use crate::protocol::v2::McpToolCallError;
use crate::protocol::v2::McpToolCallResult;
use crate::protocol::v2::McpToolCallStatus;
use crate::protocol::v2::PatchApplyStatus;
use crate::protocol::v2::PatchChangeKind;
use crate::protocol::v2::ThreadItem;
use crate::protocol::v2::Turn;
use crate::protocol::v2::TurnError as V2TurnError;
@@ -31,6 +33,8 @@ use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandBeginEvent;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::GuardianAssessmentEvent;
use codex_protocol::protocol::GuardianAssessmentStatus;
use codex_protocol::protocol::ImageGenerationBeginEvent;
use codex_protocol::protocol::ImageGenerationEndEvent;
use codex_protocol::protocol::ItemCompletedEvent;
@@ -53,6 +57,14 @@ use std::collections::HashMap;
use tracing::warn;
use uuid::Uuid;
#[cfg(test)]
use crate::protocol::v2::CommandAction;
#[cfg(test)]
use crate::protocol::v2::FileUpdateChange;
#[cfg(test)]
use crate::protocol::v2::PatchApplyStatus;
#[cfg(test)]
use crate::protocol::v2::PatchChangeKind;
#[cfg(test)]
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
#[cfg(test)]
@@ -149,6 +161,7 @@ impl ThreadHistoryBuilder {
EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
EventMsg::GuardianAssessment(payload) => self.handle_guardian_assessment(payload),
EventMsg::ApplyPatchApprovalRequest(payload) => {
self.handle_apply_patch_approval_request(payload)
}
@@ -375,57 +388,12 @@ impl ThreadHistoryBuilder {
}
fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) {
let command = shlex::try_join(payload.command.iter().map(String::as_str))
.unwrap_or_else(|_| payload.command.join(" "));
let command_actions = payload
.parsed_cmd
.iter()
.cloned()
.map(CommandAction::from)
.collect();
let item = ThreadItem::CommandExecution {
id: payload.call_id.clone(),
command,
cwd: payload.cwd.clone(),
process_id: payload.process_id.clone(),
source: payload.source.into(),
status: CommandExecutionStatus::InProgress,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
};
let item = build_command_execution_begin_item(payload);
self.upsert_item_in_turn_id(&payload.turn_id, item);
}
fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) {
let status: CommandExecutionStatus = (&payload.status).into();
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
let aggregated_output = if payload.aggregated_output.is_empty() {
None
} else {
Some(payload.aggregated_output.clone())
};
let command = shlex::try_join(payload.command.iter().map(String::as_str))
.unwrap_or_else(|_| payload.command.join(" "));
let command_actions = payload
.parsed_cmd
.iter()
.cloned()
.map(CommandAction::from)
.collect();
let item = ThreadItem::CommandExecution {
id: payload.call_id.clone(),
command,
cwd: payload.cwd.clone(),
process_id: payload.process_id.clone(),
source: payload.source.into(),
status,
command_actions,
aggregated_output,
exit_code: Some(payload.exit_code),
duration_ms: Some(duration_ms),
};
let item = build_command_execution_end_item(payload);
// Command completions can arrive out of order. Unified exec may return
// while a PTY is still running, then emit ExecCommandEnd later from a
// background exit watcher when that process finally exits. By then, a
@@ -434,12 +402,26 @@ impl ThreadHistoryBuilder {
self.upsert_item_in_turn_id(&payload.turn_id, item);
}
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
let item = ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: PatchApplyStatus::InProgress,
fn handle_guardian_assessment(&mut self, payload: &GuardianAssessmentEvent) {
let status = match payload.status {
GuardianAssessmentStatus::InProgress => CommandExecutionStatus::InProgress,
GuardianAssessmentStatus::Denied | GuardianAssessmentStatus::Aborted => {
CommandExecutionStatus::Declined
}
GuardianAssessmentStatus::Approved => return,
};
let Some(item) = build_item_from_guardian_event(payload, status) else {
return;
};
if payload.turn_id.is_empty() {
self.upsert_item_in_current_turn(item);
} else {
self.upsert_item_in_turn_id(&payload.turn_id, item);
}
}
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
let item = build_file_change_approval_request_item(payload);
if payload.turn_id.is_empty() {
self.upsert_item_in_current_turn(item);
} else {
@@ -448,11 +430,7 @@ impl ThreadHistoryBuilder {
}
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
let item = ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status: PatchApplyStatus::InProgress,
};
let item = build_file_change_begin_item(payload);
if payload.turn_id.is_empty() {
self.upsert_item_in_current_turn(item);
} else {
@@ -461,12 +439,7 @@ impl ThreadHistoryBuilder {
}
fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) {
let status: PatchApplyStatus = (&payload.status).into();
let item = ThreadItem::FileChange {
id: payload.call_id.clone(),
changes: convert_patch_changes(&payload.changes),
status,
};
let item = build_file_change_end_item(payload);
if payload.turn_id.is_empty() {
self.upsert_item_in_current_turn(item);
} else {
@@ -1076,21 +1049,6 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
}
}
pub fn convert_patch_changes(
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
) -> Vec<FileUpdateChange> {
let mut converted: Vec<FileUpdateChange> = changes
.iter()
.map(|(path, change)| FileUpdateChange {
path: path.to_string_lossy().into_owned(),
kind: map_patch_change_kind(change),
diff: format_file_change_diff(change),
})
.collect();
converted.sort_by(|a, b| a.path.cmp(&b.path));
converted
}
fn convert_dynamic_tool_content_items(
items: &[codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem],
) -> Vec<DynamicToolCallOutputContentItem> {
@@ -1108,33 +1066,6 @@ fn convert_dynamic_tool_content_items(
.collect()
}
fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind {
match change {
codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add,
codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete,
codex_protocol::protocol::FileChange::Update { move_path, .. } => PatchChangeKind::Update {
move_path: move_path.clone(),
},
}
}
fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String {
match change {
codex_protocol::protocol::FileChange::Add { content } => content.clone(),
codex_protocol::protocol::FileChange::Delete { content } => content.clone(),
codex_protocol::protocol::FileChange::Update {
unified_diff,
move_path,
} => {
if let Some(path) = move_path {
format!("{unified_diff}\n\nMoved to: {}", path.display())
} else {
unified_diff.clone()
}
}
}
}
fn upsert_turn_item(items: &mut Vec<ThreadItem>, item: ThreadItem) {
if let Some(existing_item) = items
.iter_mut()
@@ -2030,6 +1961,136 @@ mod tests {
);
}
#[test]
fn reconstructs_declined_guardian_command_item() {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "review this command".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
id: "guardian-exec".into(),
turn_id: "turn-1".into(),
status: GuardianAssessmentStatus::InProgress,
risk_score: None,
risk_level: None,
rationale: None,
action: serde_json::from_value(serde_json::json!({
"type": "command",
"source": "shell",
"command": "rm -rf /tmp/guardian",
"cwd": "/tmp",
}))
.expect("guardian action"),
}),
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
id: "guardian-exec".into(),
turn_id: "turn-1".into(),
status: GuardianAssessmentStatus::Denied,
risk_score: Some(97),
risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High),
rationale: Some("Would delete user data.".into()),
action: serde_json::from_value(serde_json::json!({
"type": "command",
"source": "shell",
"command": "rm -rf /tmp/guardian",
"cwd": "/tmp",
}))
.expect("guardian action"),
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].items.len(), 2);
assert_eq!(
turns[0].items[1],
ThreadItem::CommandExecution {
id: "guardian-exec".into(),
command: "rm -rf /tmp/guardian".into(),
cwd: PathBuf::from("/tmp"),
process_id: None,
source: CommandExecutionSource::Agent,
status: CommandExecutionStatus::Declined,
command_actions: vec![CommandAction::Unknown {
command: "rm -rf /tmp/guardian".into(),
}],
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
);
}
#[test]
fn reconstructs_in_progress_guardian_execve_item() {
let events = vec![
EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".into(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "run a subcommand".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
}),
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
id: "guardian-execve".into(),
turn_id: "turn-1".into(),
status: GuardianAssessmentStatus::InProgress,
risk_score: None,
risk_level: None,
rationale: None,
action: serde_json::from_value(serde_json::json!({
"type": "execve",
"source": "shell",
"program": "/bin/rm",
"argv": ["/usr/bin/rm", "-f", "/tmp/file.sqlite"],
"cwd": "/tmp",
}))
.expect("guardian action"),
}),
];
let items = events
.into_iter()
.map(RolloutItem::EventMsg)
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].items.len(), 2);
assert_eq!(
turns[0].items[1],
ThreadItem::CommandExecution {
id: "guardian-execve".into(),
command: "/bin/rm -f /tmp/file.sqlite".into(),
cwd: PathBuf::from("/tmp"),
process_id: None,
source: CommandExecutionSource::Agent,
status: CommandExecutionStatus::InProgress,
command_actions: vec![CommandAction::Unknown {
command: "/bin/rm -f /tmp/file.sqlite".into(),
}],
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
);
}
#[test]
fn assigns_late_exec_completion_to_original_turn() {
let events = vec![

File diff suppressed because it is too large Load Diff

View File

@@ -16,6 +16,7 @@ use std::sync::Weak;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::error;
type PendingInterruptQueue = Vec<(
ConnectionRequestId,
@@ -116,6 +117,38 @@ impl ThreadState {
}
}
pub(crate) async fn resolve_server_request_on_thread_listener(
thread_state: &Arc<Mutex<ThreadState>>,
request_id: RequestId,
) {
let (completion_tx, completion_rx) = oneshot::channel();
let listener_command_tx = {
let state = thread_state.lock().await;
state.listener_command_tx()
};
let Some(listener_command_tx) = listener_command_tx else {
error!("failed to remove pending client request: thread listener is not running");
return;
};
if listener_command_tx
.send(ThreadListenerCommand::ResolveServerRequest {
request_id,
completion_tx,
})
.is_err()
{
error!(
"failed to remove pending client request: thread listener command channel is closed"
);
return;
}
if let Err(err) = completion_rx.await {
error!("failed to remove pending client request: {err}");
}
}
struct ThreadEntry {
state: Arc<Mutex<ThreadState>>,
connection_ids: HashSet<ConnectionId>,