mirror of
https://github.com/openai/codex.git
synced 2026-04-26 15:45:02 +00:00
app-server: deflake running thread resume tests (#13047)
## Why

CI has been intermittently failing in `suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch` because these running-thread resume tests treated `turn/started` as proof that the thread was already active. That signal is too early for this path.

`turn/started` is emitted optimistically from `turn_start` (`codex-rs/app-server/src/codex_message_processor.rs`, L5757-L5767 at commit 1103d0037e). In `single_client_mode`, the listener skips `current_turn_history` tracking in `codex_message_processor.rs` (`codex-rs/app-server/src/codex_message_processor.rs`, L6461-L6465 at commit 1103d0037e), so running-thread resume still depends on `ThreadWatchManager` observing the core `TurnStarted` event in `bespoke_event_handling.rs` (`codex-rs/app-server/src/bespoke_event_handling.rs`, L152-L156 at commit 1103d0037e). If `thread/resume` lands in that window, the thread can still look `Idle` and the assertion flakes.

## What

- Add a helper in `codex-rs/app-server/tests/suite/v2/thread_resume.rs` that waits for `thread/status/changed` to report `Active` for the target thread.
- Use that public v2 notification as the synchronization barrier in the four running-thread resume tests instead of relying on `turn/started`.

## Follow-up

This PR keeps the fix at the test layer so we can remove the flake without changing server behavior. A broader runtime fix should still be considered separately, for example:

- make `turn/start` eagerly transition the thread to `Active` so `turn/started` and `thread/status/changed` are coherent
- or revisit the `single_client_mode` guard that skips current-turn tracking for running-thread resume

## Testing

- `cargo test -p codex-app-server thread_resume -- --nocapture`
- `for i in $(seq 1 10); do cargo test -p codex-app-server 'suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch' -- --exact --nocapture; done`
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
@@ -6,6 +7,7 @@ use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
@@ -15,6 +17,7 @@ use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadStatusChangedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -277,7 +280,7 @@ async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
|
||||
.await??;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
wait_for_thread_status_active(&mut primary, &thread.id),
|
||||
)
|
||||
.await??;
|
||||
|
||||
@@ -384,7 +387,7 @@ async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
|
||||
to_response::<TurnStartResponse>(running_turn_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
wait_for_thread_status_active(&mut primary, &thread_id),
|
||||
)
|
||||
.await??;
|
||||
|
||||
@@ -500,7 +503,7 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
|
||||
to_response::<TurnStartResponse>(running_turn_resp)?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
wait_for_thread_status_active(&mut primary, &thread_id),
|
||||
)
|
||||
.await??;
|
||||
|
||||
@@ -603,7 +606,7 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
|
||||
.await??;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/started"),
|
||||
wait_for_thread_status_active(&mut primary, &thread.id),
|
||||
)
|
||||
.await??;
|
||||
|
||||
@@ -1103,6 +1106,30 @@ required = true
|
||||
)
|
||||
}
|
||||
|
||||
async fn wait_for_thread_status_active(
|
||||
mcp: &mut McpProcess,
|
||||
thread_id: &str,
|
||||
) -> Result<ThreadStatusChangedNotification> {
|
||||
loop {
|
||||
let status_changed_notif: JSONRPCNotification = mcp
|
||||
.read_stream_until_notification_message("thread/status/changed")
|
||||
.await?;
|
||||
let status_changed_params = status_changed_notif
|
||||
.params
|
||||
.context("thread/status/changed params must be present")?;
|
||||
let status_changed: ThreadStatusChangedNotification =
|
||||
serde_json::from_value(status_changed_params)?;
|
||||
if status_changed.thread_id == thread_id
|
||||
&& status_changed.status
|
||||
== (ThreadStatus::Active {
|
||||
active_flags: Vec::new(),
|
||||
})
|
||||
{
|
||||
return Ok(status_changed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> {
|
||||
let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc);
|
||||
|
||||
Reference in New Issue
Block a user