Compare commits

...

4 Commits

Author SHA1 Message Date
YOUR NAME
65b9c3136c fix: satisfy argument comment lint in followup_task test 2026-04-02 16:49:08 -07:00
YOUR NAME
9628140d13 Merge remote-tracking branch 'origin/main' into codex/15868-run-20260326-100615-e2e-pr
# Conflicts:
#	codex-rs/core/src/tools/handlers/multi_agents_tests.rs
2026-04-02 15:16:15 -07:00
YOUR NAME
ce8df5c520 Stabilize multi-agent assign_task test 2026-03-26 13:26:26 -07:00
YOUR NAME
e58d6c9c9c fix(windows): kill timed-out shell command descendants 2026-03-26 10:30:56 -07:00
5 changed files with 124 additions and 16 deletions

View File

@@ -4142,6 +4142,18 @@ impl Session {
}
}
#[cfg(test)]
pub(crate) async fn pending_input_snapshot(&self) -> Vec<ResponseInputItem> {
let active = self.active_turn.lock().await;
match active.as_ref() {
Some(at) => {
let ts = at.turn_state.lock().await;
ts.pending_input_snapshot()
}
None => Vec::new(),
}
}
/// Queue response items to be injected into the next active turn created for this session.
#[cfg(test)]
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
@@ -4157,6 +4169,13 @@ impl Session {
std::mem::take(&mut *self.idle_pending_input.lock().await)
}
#[cfg(test)]
pub(crate) async fn queued_response_items_for_next_turn_snapshot(
&self,
) -> Vec<ResponseInputItem> {
self.idle_pending_input.lock().await.clone()
}
pub(crate) async fn has_queued_response_items_for_next_turn(&self) -> bool {
!self.idle_pending_input.lock().await.is_empty()
}

View File

@@ -220,6 +220,11 @@ impl TurnState {
}
}
#[cfg(test)]
pub(crate) fn pending_input_snapshot(&self) -> Vec<ResponseInputItem> {
self.pending_input.clone()
}
pub(crate) fn has_pending_input(&self) -> bool {
!self.pending_input.is_empty()
}

View File

@@ -112,6 +112,18 @@ fn history_contains_inter_agent_communication(
})
}
fn response_input_items_contain_inter_agent_communication(
input_items: &[ResponseInputItem],
expected: &InterAgentCommunication,
) -> bool {
let response_items: Vec<ResponseItem> = input_items
.iter()
.cloned()
.map(ResponseItem::from)
.collect();
history_contains_inter_agent_communication(&response_items, expected)
}
#[derive(Clone, Copy)]
struct NeverEndingTask;
@@ -1232,6 +1244,13 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
.iter()
.filter_map(|(id, op)| (*id == agent_id).then_some(op))
.collect();
let expected_communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
);
assert!(ops_for_agent.iter().any(|op| matches!(op, Op::Interrupt)));
assert!(ops_for_agent.iter().any(|op| {
matches!(
@@ -1241,9 +1260,13 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
&& communication.recipient.as_str() == "/root/worker"
&& communication.other_recipients.is_empty()
&& communication.content == "continue"
&& communication.trigger_turn
)
}));
// `followup_task` wakes the interrupted child immediately, so the redirected
// envelope may still be buffered in turn state before it is materialized
// into history on slower runners.
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
@@ -1253,16 +1276,22 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
.await
.raw_items()
.to_vec();
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
),
);
let pending_input_items = thread.codex.session.pending_input_snapshot().await;
let queued_input_items = thread
.codex
.session
.queued_response_items_for_next_turn_snapshot()
.await;
let saw_envelope =
history_contains_inter_agent_communication(&history_items, &expected_communication)
|| response_input_items_contain_inter_agent_communication(
&pending_input_items,
&expected_communication,
)
|| response_input_items_contain_inter_agent_communication(
&queued_input_items,
&expected_communication,
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,

View File

@@ -24,6 +24,7 @@ winapi = { version = "0.3.9", features = [
"handleapi",
"minwinbase",
"processthreadsapi",
"sysinfoapi",
"synchapi",
"winbase",
"wincon",

View File

@@ -15,9 +15,30 @@
//!
//! On non-Unix platforms these helpers are no-ops.
#[cfg(windows)]
use std::ffi::OsString;
use std::io;
#[cfg(windows)]
use std::os::windows::ffi::OsStringExt;
#[cfg(windows)]
use std::path::PathBuf;
use tokio::process::Child;
#[cfg(windows)]
use winapi::um::sysinfoapi::GetSystemDirectoryW;
#[cfg(windows)]
fn trusted_taskkill_path() -> PathBuf {
let mut buffer = [0_u16; 260];
let len = unsafe { GetSystemDirectoryW(buffer.as_mut_ptr(), buffer.len() as u32) };
if len > 0 && (len as usize) < buffer.len() {
let mut path = PathBuf::from(OsString::from_wide(&buffer[..len as usize]));
path.push("taskkill.exe");
return path;
}
PathBuf::from(r"C:\Windows\System32\taskkill.exe")
}
#[cfg(target_os = "linux")]
/// Ensure the child receives SIGTERM when the original parent dies.
@@ -111,8 +132,25 @@ pub fn kill_process_group_by_pid(pid: u32) -> io::Result<()> {
Ok(())
}
#[cfg(not(unix))]
/// No-op on non-Unix platforms.
#[cfg(windows)]
/// Best-effort termination of a process tree rooted at `pid`.
///
/// Uses `taskkill /PID <pid> /T /F` so descendants are terminated as well.
pub fn kill_process_group_by_pid(pid: u32) -> io::Result<()> {
use std::process::Command;
let command_result = Command::new(trusted_taskkill_path())
.args(["/PID", &pid.to_string(), "/T", "/F"])
.output();
// Best-effort cleanup path: timeout/interrupt handling should continue to
// the direct-child kill path even if taskkill launch or execution fails.
let _ = command_result;
Ok(())
}
#[cfg(not(any(unix, windows)))]
/// No-op on non-Unix platforms without a process-group primitive.
pub fn kill_process_group_by_pid(_pid: u32) -> io::Result<()> {
Ok(())
}
@@ -161,8 +199,14 @@ pub fn kill_process_group(process_group_id: u32) -> io::Result<()> {
Ok(())
}
#[cfg(not(unix))]
/// No-op on non-Unix platforms.
#[cfg(windows)]
/// Best-effort termination for a process-tree root id.
pub fn kill_process_group(process_group_id: u32) -> io::Result<()> {
kill_process_group_by_pid(process_group_id)
}
#[cfg(not(any(unix, windows)))]
/// No-op on non-Unix platforms without a process-group primitive.
pub fn kill_process_group(_process_group_id: u32) -> io::Result<()> {
Ok(())
}
@@ -177,8 +221,18 @@ pub fn kill_child_process_group(child: &mut Child) -> io::Result<()> {
Ok(())
}
#[cfg(not(unix))]
/// No-op on non-Unix platforms.
#[cfg(windows)]
/// Kill the Windows process tree rooted at a tokio child pid (best-effort).
pub fn kill_child_process_group(child: &mut Child) -> io::Result<()> {
if let Some(pid) = child.id() {
return kill_process_group_by_pid(pid);
}
Ok(())
}
#[cfg(not(any(unix, windows)))]
/// No-op on non-Unix platforms without a process-group primitive.
pub fn kill_child_process_group(_child: &mut Child) -> io::Result<()> {
Ok(())
}