mirror of
https://github.com/openai/codex.git
synced 2026-03-05 06:03:20 +00:00
Compare commits
11 Commits
main
...
codex/flak
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df4c918bd2 | ||
|
|
7d7b67192f | ||
|
|
0d246928c2 | ||
|
|
c6f5627349 | ||
|
|
8385a285a2 | ||
|
|
206cfb561f | ||
|
|
bf3c63d160 | ||
|
|
b3f11d668a | ||
|
|
39c4f2b06d | ||
|
|
3a932261cf | ||
|
|
42a6b98598 |
@@ -4,7 +4,7 @@ use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
@@ -59,6 +59,36 @@ use uuid::Uuid;
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
|
||||
|
||||
async fn wait_for_responses_request_count(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
anyhow::bail!("wiremock did not record requests");
|
||||
};
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
if responses_request_count == expected_count {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -870,7 +900,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -985,6 +1015,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1007,7 +1038,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
create_apply_patch_sse_response(patch, "patch-call")?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
let mut primary = McpProcess::new(&codex_home).await?;
|
||||
@@ -1150,6 +1181,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -168,6 +168,18 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
|
||||
};
|
||||
assert_eq!(payload.thread_id, thread_id);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.context("failed to fetch received requests")?;
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
1,
|
||||
"unsubscribe should not trigger a follow-up responses request"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
use tokio::sync::watch;
|
||||
use tracing::debug;
|
||||
|
||||
const AGENT_NAMES: &str = include_str!("agent_names.txt");
|
||||
const FORKED_SPAWN_AGENT_OUTPUT_MESSAGE: &str = "You are the newly spawned agent. The prior conversation history was forked from your parent agent. Treat the next user message as your new task, and use the forked history only as background context.";
|
||||
@@ -212,8 +213,9 @@ impl AgentControl {
|
||||
// TODO(jif) add helper for drain
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
|
||||
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
|
||||
.await;
|
||||
self.send_input(new_thread.thread_id, items).await?;
|
||||
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
|
||||
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
@@ -288,7 +290,8 @@ impl AgentControl {
|
||||
// Resumed threads are re-registered in-memory and need the same listener
|
||||
// attachment path as freshly spawned threads.
|
||||
state.notify_thread_created(resumed_thread.thread_id);
|
||||
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
|
||||
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
|
||||
.await;
|
||||
|
||||
Ok(resumed_thread.thread_id)
|
||||
}
|
||||
@@ -418,7 +421,7 @@ impl AgentControl {
|
||||
///
|
||||
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
|
||||
/// can receive completion notifications.
|
||||
fn maybe_start_completion_watcher(
|
||||
async fn maybe_start_completion_watcher(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
session_source: Option<SessionSource>,
|
||||
@@ -429,13 +432,20 @@ impl AgentControl {
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let status_rx = self.subscribe_status(child_thread_id).await.ok();
|
||||
let control = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let status = match control.subscribe_status(child_thread_id).await {
|
||||
Ok(mut status_rx) => {
|
||||
let status = match status_rx {
|
||||
Some(mut status_rx) => {
|
||||
debug!(%child_thread_id, "subagent completion watcher attached");
|
||||
let mut status = status_rx.borrow().clone();
|
||||
while !is_final(&status) {
|
||||
if status_rx.changed().await.is_err() {
|
||||
debug!(
|
||||
%child_thread_id,
|
||||
"subagent completion watcher lost status stream; reading latest status"
|
||||
);
|
||||
status = control.get_status(child_thread_id).await;
|
||||
break;
|
||||
}
|
||||
@@ -443,9 +453,20 @@ impl AgentControl {
|
||||
}
|
||||
status
|
||||
}
|
||||
Err(_) => control.get_status(child_thread_id).await,
|
||||
None => {
|
||||
debug!(
|
||||
%child_thread_id,
|
||||
"subagent completion watcher could not subscribe; reading latest status"
|
||||
);
|
||||
control.get_status(child_thread_id).await
|
||||
}
|
||||
};
|
||||
if !is_final(&status) {
|
||||
debug!(
|
||||
%child_thread_id,
|
||||
?status,
|
||||
"subagent completion watcher exiting before final status"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1331,15 +1352,18 @@ mod tests {
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
let child_thread_id = ThreadId::new();
|
||||
|
||||
harness.control.maybe_start_completion_watcher(
|
||||
child_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
);
|
||||
harness
|
||||
.control
|
||||
.maybe_start_completion_watcher(
|
||||
child_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
|
||||
|
||||
@@ -6583,6 +6583,16 @@ async fn try_run_sampling_request(
|
||||
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
|
||||
|
||||
if cancellation_token.is_cancelled()
|
||||
&& outcome.as_ref().is_ok_and(|result| result.needs_follow_up)
|
||||
{
|
||||
debug!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
"turn cancelled after tool drain; suppressing follow-up request"
|
||||
);
|
||||
return Err(CodexErr::TurnAborted);
|
||||
}
|
||||
|
||||
if should_emit_turn_diff {
|
||||
let unified_diff = {
|
||||
let mut tracker = turn_diff_tracker.lock().await;
|
||||
|
||||
@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::ActiveTurn;
|
||||
pub(crate) use turn::RunningTask;
|
||||
pub(crate) use turn::TaskKind;
|
||||
pub(crate) use turn::TurnState;
|
||||
|
||||
@@ -149,11 +149,3 @@ impl TurnState {
|
||||
!self.pending_input.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
/// Clear any pending approvals and input buffered for the current turn.
|
||||
pub(crate) async fn clear_pending(&self) {
|
||||
let mut ts = self.turn_state.lock().await;
|
||||
ts.clear_pending();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,12 +31,14 @@ use crate::protocol::TurnCompleteEvent;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use crate::state::TurnState;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::features::Feature;
|
||||
pub(crate) use compact::CompactTask;
|
||||
@@ -192,7 +194,16 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
for task in self.take_all_running_tasks().await {
|
||||
let (tasks, turn_state) = self.take_all_running_tasks().await;
|
||||
for task in &tasks {
|
||||
task.cancellation_token.cancel();
|
||||
}
|
||||
if let Some(turn_state) = turn_state {
|
||||
// Drop pending approvals only after all running tasks observe cancellation, so
|
||||
// interrupted approval waits resolve as aborts instead of synthetic denials.
|
||||
turn_state.lock().await.clear_pending();
|
||||
}
|
||||
for task in tasks {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
}
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
@@ -328,15 +339,14 @@ impl Session {
|
||||
*active = Some(turn);
|
||||
}
|
||||
|
||||
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
|
||||
async fn take_all_running_tasks(&self) -> (Vec<RunningTask>, Option<Arc<Mutex<TurnState>>>) {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
at.clear_pending().await;
|
||||
|
||||
at.drain_tasks()
|
||||
let turn_state = Arc::clone(&at.turn_state);
|
||||
(at.drain_tasks(), Some(turn_state))
|
||||
}
|
||||
None => Vec::new(),
|
||||
None => (Vec::new(), None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,10 +359,6 @@ impl Session {
|
||||
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
|
||||
task.cancellation_token.cancel();
|
||||
task.turn_context
|
||||
|
||||
@@ -239,3 +239,59 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
|
||||
"expected <turn_aborted> marker in follow-up request"
|
||||
);
|
||||
}
|
||||
|
||||
/// Interrupting a turn while a tool-produced follow-up is pending must not
|
||||
/// start another model request before the session reports TurnAborted.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_does_not_issue_follow_up_request() {
|
||||
let command = "sleep 60";
|
||||
let call_id = "call-no-follow-up";
|
||||
|
||||
let args = json!({
|
||||
"command": command,
|
||||
"timeout_ms": 60_000
|
||||
})
|
||||
.to_string();
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-no-follow-up"),
|
||||
ev_function_call(call_id, "shell_command", &args),
|
||||
ev_completed("resp-no-follow-up"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(&server, first_body).await;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.1")
|
||||
.build(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start interrupt follow-up guard".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
|
||||
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
1,
|
||||
"interrupt should not issue a follow-up responses request"
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user