diff --git a/codex-rs/.config/nextest.toml b/codex-rs/.config/nextest.toml index 1f18aebffa..3f045fbee7 100644 --- a/codex-rs/.config/nextest.toml +++ b/codex-rs/.config/nextest.toml @@ -14,6 +14,9 @@ max-threads = 1 [test-groups.core_remote_environment_integration] max-threads = 1 +[test-groups.core_thread_rollback_integration] +max-threads = 1 + [test-groups.windows_sandbox_legacy_sessions] max-threads = 1 @@ -49,6 +52,12 @@ test-group = 'core_apply_patch_cli_integration' filter = 'package(codex-core) & kind(test) & (test(remote_env) | test(view_image_routes_to_selected_remote_environment) | test(unified_exec_network_denial_emits_failed_background_end_event) | test(unified_exec_short_lived_network_denial_emits_failed_end_event))' test-group = 'core_remote_environment_integration' +[[profile.default.overrides]] +# These tests drive rollback through full mocked turns and can contend with each +# other while waiting for append-only history rewrites on saturated CI runners. +filter = 'package(codex-core) & kind(test) & (test(snapshot_rollback_followup_turn_trims_context_updates) | test(snapshot_rollback_past_compaction_replays_append_only_history) | test(thread_rollback_after_generated_image_drops_entire_image_turn_history))' +test-group = 'core_thread_rollback_integration' + [[profile.default.overrides]] # These tests create restricted-token Windows child processes and private desktops. # Serialize them to avoid exhausting Windows session/global desktop resources in CI. diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 706acde960..1786cdf982 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -693,6 +693,38 @@ impl TestCodex { .await } + pub async fn submit_turn_with_environments_no_wait( + &self, + prompt: &str, + environments: Option>, + ) -> Result<()> { + let (sandbox_policy, permission_profile) = + turn_permission_fields(PermissionProfile::Disabled, self.config.cwd.as_path()); + let session_model = self.session_configured.model.clone(); + self.codex + .submit(Op::UserTurn { + environments, + items: vec![UserInput::Text { + text: prompt.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: self.config.cwd.to_path_buf(), + approval_policy: AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy, + permission_profile, + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + Ok(()) + } + async fn submit_turn_with_permission_profile_context( &self, prompt: &str, diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 77489cd13a..b707e3e3ca 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -46,6 +46,7 @@ use wiremock::MockServer; const AFTER_SECOND_RESUME: &str = "AFTER_SECOND_RESUME"; const AFTER_ROLLBACK: &str = "AFTER_ROLLBACK"; +const THREAD_ROLLBACK_EVENT_TIMEOUT: Duration = Duration::from_secs(25); fn network_disabled() -> bool { std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() @@ -460,7 +461,7 @@ async fn snapshot_rollback_past_compaction_replays_append_only_history() -> Resu let rollback_event = wait_for_event_with_timeout( &base, |ev| matches!(ev, EventMsg::ThreadRolledBack(_)), - Duration::from_secs(20), + THREAD_ROLLBACK_EVENT_TIMEOUT, ) .await; let EventMsg::ThreadRolledBack(rollback_event) = rollback_event else { @@ -582,9 +583,11 @@ async fn snapshot_rollback_followup_turn_trims_context_updates() -> Result<()> { conversation .submit(Op::ThreadRollback { num_turns: 1 }) .await?; - let rollback_event = wait_for_event(&conversation, |ev| { - matches!(ev, EventMsg::ThreadRolledBack(_)) - }) + let rollback_event = wait_for_event_with_timeout( + &conversation, + |ev| matches!(ev, EventMsg::ThreadRolledBack(_)), + THREAD_ROLLBACK_EVENT_TIMEOUT, + ) .await; let EventMsg::ThreadRolledBack(rollback_event) = rollback_event else { panic!("expected thread rolled back event"); diff --git a/codex-rs/core/tests/suite/model_switching.rs b/codex-rs/core/tests/suite/model_switching.rs index d13e7c7289..305162e90c 100644 --- a/codex-rs/core/tests/suite/model_switching.rs +++ b/codex-rs/core/tests/suite/model_switching.rs @@ -41,6 +41,8 @@ use std::path::PathBuf; use tokio::time::Duration; use wiremock::MockServer; +const THREAD_ROLLBACK_EVENT_TIMEOUT: Duration = Duration::from_secs(25); + fn read_only_user_turn(test: &TestCodex, items: Vec, model: String) -> Op { let (sandbox_policy, permission_profile) = turn_permission_fields(PermissionProfile::read_only(), test.cwd_path()); @@ -799,7 +801,7 @@ async fn thread_rollback_after_generated_image_drops_entire_image_turn_history() wait_for_event_with_timeout( &test.codex, |ev| matches!(ev, EventMsg::ThreadRolledBack(_)), - Duration::from_secs(20), + THREAD_ROLLBACK_EVENT_TIMEOUT, ) .await; diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index 0bd449188c..cb8e0761ca 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -38,6 +38,23 @@ use std::process::Command; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tempfile::TempDir; +use tokio::time::Duration; + +async fn wait_for_function_call_output( + response_mock: &core_test_support::responses::ResponseMock, + call_id: &str, +) -> Result { + tokio::time::timeout(Duration::from_secs(25), async { + loop { + if let Some(output) = response_mock.function_call_output_text(call_id) { + return output; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await + .with_context(|| format!("timed out waiting for function_call_output for {call_id}")) +} async fn unified_exec_test(server: &wiremock::MockServer) -> Result { let mut builder = test_codex().with_config(|config| { config.use_experimental_unified_exec_tool = true; @@ -174,12 +191,10 @@ async fn exec_command_routing_output( ) .await; - test.submit_turn_with_environments("route exec command", environments) + test.submit_turn_with_environments_no_wait("route exec command", environments) .await?; - response_mock - .function_call_output_text(call_id) - .with_context(|| format!("missing function_call_output for {call_id}")) + wait_for_function_call_output(&response_mock, call_id).await } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/core/tests/suite/view_image.rs b/codex-rs/core/tests/suite/view_image.rs index cf06f94792..2b56369ef0 100644 --- a/codex-rs/core/tests/suite/view_image.rs +++ b/codex-rs/core/tests/suite/view_image.rs @@ -64,6 +64,26 @@ use wiremock::matchers::body_string_contains; const VIEW_IMAGE_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30); +async fn wait_for_function_call_output( + response_mock: &responses::ResponseMock, + call_id: &str, +) -> anyhow::Result { + tokio::time::timeout(Duration::from_secs(25), async { + loop { + if let Some(output) = response_mock + .requests() + .iter() + .find_map(|request| request.function_call_output(call_id).cloned()) + { + return output; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await + .with_context(|| format!("timed out waiting for function_call_output for {call_id}")) +} + fn disabled_user_turn(test: &TestCodex, items: Vec, model: String) -> Op { let (sandbox_policy, permission_profile) = turn_permission_fields(PermissionProfile::Disabled, test.config.cwd.as_path()); @@ -537,17 +557,13 @@ async fn view_image_routes_to_selected_remote_environment() -> anyhow::Result<() ) .await; - test.submit_turn_with_environments( + test.submit_turn_with_environments_no_wait( "route view image", Some(vec![local_selection, remote_selection]), ) .await?; - let output = response_mock - .last_request() - .context("missing request containing view_image output")? - .function_call_output(call_id) - .clone(); + let output = wait_for_function_call_output(&response_mock, call_id).await?; let output_items = output .get("output") .and_then(Value::as_array)