Compare commits

...

11 Commits

Author SHA1 Message Date
Ahmed Ibrahim
df4c918bd2 Restore async legacy notify hook 2026-03-04 21:55:12 -08:00
Ahmed Ibrahim
7d7b67192f Re-export turn state 2026-03-04 21:22:53 -08:00
Ahmed Ibrahim
0d246928c2 Remove unused thread resume import 2026-03-04 21:12:26 -08:00
Ahmed Ibrahim
c6f5627349 Cancel tasks before clearing pending approvals 2026-03-04 21:03:04 -08:00
Ahmed Ibrahim
8385a285a2 Handle missing wiremock request log 2026-03-04 20:53:54 -08:00
Ahmed Ibrahim
206cfb561f Wait for resumed approval follow-up requests 2026-03-04 20:48:32 -08:00
Ahmed Ibrahim
bf3c63d160 Remove invalid abort task assertion 2026-03-04 20:41:05 -08:00
Ahmed Ibrahim
b3f11d668a Suppress cancelled turn follow-up requests 2026-03-04 20:33:30 -08:00
Ahmed Ibrahim
39c4f2b06d Wait for legacy notify hook completion 2026-03-04 20:20:48 -08:00
Ahmed Ibrahim
3a932261cf Attach subagent completion watcher before input 2026-03-04 20:03:01 -08:00
Ahmed Ibrahim
42a6b98598 Start flaky test stabilization 2026-03-04 19:52:17 -08:00
8 changed files with 169 additions and 36 deletions

View File

@@ -4,7 +4,7 @@ use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::rollout_path;
use app_test_support::to_response;
@@ -59,6 +59,36 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -870,7 +900,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
)?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -985,6 +1015,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}
@@ -1007,7 +1038,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut primary = McpProcess::new(&codex_home).await?;
@@ -1150,6 +1181,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}

View File

@@ -168,6 +168,18 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
};
assert_eq!(payload.thread_id, thread_id);
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let requests = server
.received_requests()
.await
.context("failed to fetch received requests")?;
assert_eq!(
requests.len(),
1,
"unsubscribe should not trigger a follow-up responses request"
);
Ok(())
}

View File

@@ -25,6 +25,7 @@ use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
use tracing::debug;
const AGENT_NAMES: &str = include_str!("agent_names.txt");
const FORKED_SPAWN_AGENT_OUTPUT_MESSAGE: &str = "You are the newly spawned agent. The prior conversation history was forked from your parent agent. Treat the next user message as your new task, and use the forked history only as background context.";
@@ -212,8 +213,9 @@ impl AgentControl {
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
.await;
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
@@ -288,7 +290,8 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
.await;
Ok(resumed_thread.thread_id)
}
@@ -418,7 +421,7 @@ impl AgentControl {
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
async fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
@@ -429,13 +432,20 @@ impl AgentControl {
else {
return;
};
let status_rx = self.subscribe_status(child_thread_id).await.ok();
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let status = match status_rx {
Some(mut status_rx) => {
debug!(%child_thread_id, "subagent completion watcher attached");
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
debug!(
%child_thread_id,
"subagent completion watcher lost status stream; reading latest status"
);
status = control.get_status(child_thread_id).await;
break;
}
@@ -443,9 +453,20 @@ impl AgentControl {
}
status
}
Err(_) => control.get_status(child_thread_id).await,
None => {
debug!(
%child_thread_id,
"subagent completion watcher could not subscribe; reading latest status"
);
control.get_status(child_thread_id).await
}
};
if !is_final(&status) {
debug!(
%child_thread_id,
?status,
"subagent completion watcher exiting before final status"
);
return;
}
@@ -1331,15 +1352,18 @@ mod tests {
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
);
harness
.control
.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);

View File

@@ -6583,6 +6583,16 @@ async fn try_run_sampling_request(
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if cancellation_token.is_cancelled()
&& outcome.as_ref().is_ok_and(|result| result.needs_follow_up)
{
debug!(
turn_id = %turn_context.sub_id,
"turn cancelled after tool drain; suppressing follow-up request"
);
return Err(CodexErr::TurnAborted);
}
if should_emit_turn_diff {
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;

View File

@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn::TurnState;

View File

@@ -149,11 +149,3 @@ impl TurnState {
!self.pending_input.is_empty()
}
}
impl ActiveTurn {
    /// Discards the approvals and input still pending for the current turn.
    ///
    /// Takes the turn-state lock for the duration of the single
    /// `clear_pending` call on the guarded state.
    pub(crate) async fn clear_pending(&self) {
        self.turn_state.lock().await.clear_pending();
    }
}

View File

@@ -31,12 +31,14 @@ use crate::protocol::TurnCompleteEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use crate::state::TurnState;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use tokio::sync::Mutex;
use crate::features::Feature;
pub(crate) use compact::CompactTask;
@@ -192,7 +194,16 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
let (tasks, turn_state) = self.take_all_running_tasks().await;
for task in &tasks {
task.cancellation_token.cancel();
}
if let Some(turn_state) = turn_state {
// Drop pending approvals only after all running tasks observe cancellation, so
// interrupted approval waits resolve as aborts instead of synthetic denials.
turn_state.lock().await.clear_pending();
}
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if reason == TurnAbortReason::Interrupted {
@@ -328,15 +339,14 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
async fn take_all_running_tasks(&self) -> (Vec<RunningTask>, Option<Arc<Mutex<TurnState>>>) {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
at.drain_tasks()
let turn_state = Arc::clone(&at.turn_state);
(at.drain_tasks(), Some(turn_state))
}
None => Vec::new(),
None => (Vec::new(), None),
}
}
@@ -349,10 +359,6 @@ impl Session {
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
}
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
task.turn_context

View File

@@ -239,3 +239,59 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
"expected <turn_aborted> marker in follow-up request"
);
}
/// If a turn is interrupted while a tool call that would lead to a follow-up
/// is still running, the session must report TurnAborted without sending a
/// second model request.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_does_not_issue_follow_up_request() {
    // The mocked model response asks for a long-running shell command, so the
    // tool call is still in flight when the interrupt is submitted.
    let tool_args = json!({
        "command": "sleep 60",
        "timeout_ms": 60_000
    })
    .to_string();
    let sse_body = sse(vec![
        ev_response_created("resp-no-follow-up"),
        ev_function_call("call-no-follow-up", "shell_command", &tool_args),
        ev_completed("resp-no-follow-up"),
    ]);

    let server = start_mock_server().await;
    let response_mock = mount_sse_once(&server, sse_body).await;

    let fixture = test_codex()
        .with_model("gpt-5.1")
        .build(&server)
        .await
        .unwrap();
    let session = Arc::clone(&fixture.codex);

    let user_turn = Op::UserInput {
        items: vec![UserInput::Text {
            text: "start interrupt follow-up guard".into(),
            text_elements: Vec::new(),
        }],
        final_output_json_schema: None,
    };
    session.submit(user_turn).await.unwrap();

    // Wait for the command to start, pause briefly, then interrupt the turn.
    wait_for_event(&session, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
    tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
    session.submit(Op::Interrupt).await.unwrap();
    wait_for_event(&session, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;

    // Grace period: a stray follow-up request would show up in the mock's log
    // during this window.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let request_count = response_mock.requests().len();
    assert_eq!(
        request_count,
        1,
        "interrupt should not issue a follow-up responses request"
    );
}