Compare commits

...

7 Commits

Author SHA1 Message Date
starr-openai
fae6cd5410 Serialize Windows Rust test harness in CI
Limit Windows CI to one Rust test thread per test binary so the split app-server shards do not multiply child-process-heavy test concurrency on the runner.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 13:33:51 -07:00
starr-openai
d43c72889c Cap macOS CI local test concurrency
macOS Bazel CI runs build actions remotely but test actions locally. After splitting app-server tests into separate binaries, cap local test-process fanout on macOS as well so server-spawning integration tests do not contend heavily on the runner.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 13:16:07 -07:00
starr-openai
9e2a286f24 Fix unsubscribe test lint failures
Remove stale imports and the unused response-count stabilization helper after switching the unsubscribe test to gated streaming SSE. Add the required `/*count*/` argument comment for the request-count literal.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 13:06:46 -07:00
starr-openai
e55a86e909 Gate thread unsubscribe test on streaming SSE
Replace shell-command blocking in the unsubscribe-during-turn test with a gated streaming SSE response. This keeps the turn in progress without filesystem polling, shell sleeps, or Windows path translation. Add a request-count notification helper to the streaming SSE test server so tests can wait for the model request deterministically.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 12:49:18 -07:00
starr-openai
179d7f8732 Make thread unsubscribe Windows test deterministic
Replace the one-second sleep in the unsubscribe-during-turn test with a sentinel-file command so the test unsubscribes while command execution is definitely still running. Also use Bazel's printed FAIL test.log path when tailing failed CI logs, which fixes Windows MSVC output-path mismatches.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 12:20:46 -07:00
starr-openai
cc9c43514f Cap Windows CI local test concurrency
Windows test actions run locally in CI, and several Rust integration test binaries spawn subprocesses or servers. Limit local test-process fanout so sharded tests contend less on the runner while preserving the broader Bazel job count for scheduling and cache/download work.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 12:01:31 -07:00
starr-openai
de0e824794 Split app-server integration tests into shards
Replace the single all.rs integration root with smaller roots grouped by area so Bazel and CI can schedule them as independent app-server test targets.

Co-authored-by: Codex <noreply@openai.com>
2026-04-15 11:44:15 -07:00
13 changed files with 194 additions and 163 deletions

View File

@@ -125,6 +125,15 @@ build:argument-comment-lint --@rules_rust//rust/toolchain/channel=nightly
common:ci-windows --config=ci-bazel
common:ci-windows --build_metadata=TAG_os=windows
common:ci-windows --repo_contents_cache=D:/a/.cache/bazel-repo-contents-cache
# Windows tests run locally, and several Rust integration test binaries spawn
# subprocesses/servers. Keep local test-process fanout lower than the overall
# Bazel job count so sharded tests do not contend as heavily on the runner.
common:ci-windows --local_test_jobs=2
# Also keep Rust's per-test-binary harness serial on Windows. The app-server
# shards spawn many child processes internally; splitting the old giant test into
# multiple Bazel targets otherwise multiplies both Bazel-level and harness-level
# concurrency on the constrained Windows runner.
common:ci-windows --test_env=RUST_TEST_THREADS=1
# We prefer to run the build actions entirely remotely so we can dial up the concurrency.
# We have platform-specific tests, so we want to execute the tests on all platforms using the strongest sandboxing available on each platform.
@@ -143,6 +152,7 @@ common:ci-macos --build_metadata=TAG_os=macos
common:ci-macos --config=remote
common:ci-macos --strategy=remote
common:ci-macos --strategy=TestRunner=darwin-sandbox,local
common:ci-macos --local_test_jobs=2
# Linux-only V8 CI config.
common:ci-v8 --config=ci

View File

@@ -94,6 +94,17 @@ print_bazel_test_log_tails() {
local rel_path="${target#//}"
rel_path="${rel_path/://}"
local test_log="${testlogs_dir}/${rel_path}/test.log"
local printed_test_log
printed_test_log="$(
grep -F "FAIL: ${target} " "$console_log" \
| sed -nE 's#.*\(see ([^)]+/test\.log)\).*#\1#p' \
| tr -d '\r' \
| head -n 1
)"
if [[ -n "$printed_test_log" ]]; then
test_log="$printed_test_log"
fi
echo "::group::Bazel test log tail for ${target}"
if [[ -f "$test_log" ]]; then

View File

@@ -1,3 +0,0 @@
// Single integration test binary that aggregates all test modules.
// The submodules live in `tests/suite/`.
mod suite;

View File

@@ -0,0 +1,10 @@
// Integration tests for legacy/non-v2 app-server coverage.
//
// Each file in `tests/` becomes its own Bazel integration-test target, so keep
// this split in sync with the generated target names expected by CI.
#[path = "suite/auth.rs"]
mod auth;
#[path = "suite/conversation_summary.rs"]
mod conversation_summary;
#[path = "suite/fuzzy_file_search.rs"]
mod fuzzy_file_search;

View File

@@ -1,4 +0,0 @@
mod auth;
mod conversation_summary;
mod fuzzy_file_search;
mod v2;

View File

@@ -1,58 +0,0 @@
mod account;
mod analytics;
mod app_list;
mod client_metadata;
mod collaboration_mode_list;
#[cfg(unix)]
mod command_exec;
mod compaction;
mod config_rpc;
mod connection_handling_websocket;
#[cfg(unix)]
mod connection_handling_websocket_unix;
mod dynamic_tools;
mod experimental_api;
mod experimental_feature_list;
mod fs;
mod initialize;
mod marketplace_add;
mod mcp_resource;
mod mcp_server_elicitation;
mod mcp_server_status;
mod mcp_tool;
mod memory_reset;
mod model_list;
mod output_schema;
mod plan_item;
mod plugin_install;
mod plugin_list;
mod plugin_read;
mod plugin_uninstall;
mod rate_limits;
mod realtime_conversation;
mod request_permissions;
mod request_user_input;
mod review;
mod safety_check_downgrade;
mod skills_list;
mod thread_archive;
mod thread_fork;
mod thread_inject_items;
mod thread_list;
mod thread_loaded_list;
mod thread_memory_mode_set;
mod thread_metadata_update;
mod thread_name_websocket;
mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_shell_command;
mod thread_start;
mod thread_status;
mod thread_unarchive;
mod thread_unsubscribe;
mod turn_interrupt;
mod turn_start;
mod turn_start_zsh_fork;
mod turn_steer;
mod windows_sandbox_setup;

View File

@@ -1,15 +1,9 @@
use anyhow::Context;
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadReadParams;
@@ -26,57 +20,15 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
async fn wait_for_responses_request_count_to_stabilize(
server: &wiremock::MockServer,
expected_count: usize,
settle_duration: std::time::Duration,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
let mut stable_since: Option<tokio::time::Instant> = None;
loop {
let requests = server
.received_requests()
.await
.context("failed to fetch received requests")?;
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
if responses_request_count == expected_count {
match stable_since {
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
return Ok::<(), anyhow::Error>(());
}
None => stable_since = Some(tokio::time::Instant::now()),
Some(_) => {}
}
} else {
stable_since = None;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -128,32 +80,24 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
#[tokio::test]
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 1".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "1".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let server = create_mock_responses_server_sequence_unchecked(vec![
create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?,
create_final_assistant_message_sse_response("Done")?,
])
let (release_response_tx, release_response_rx) = oneshot::channel();
let (server, mut completions) = start_streaming_sse_server(vec![vec![StreamingSseChunk {
gate: Some(release_response_rx),
body: responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]),
}]])
.await;
create_config_toml(&codex_home, &server.uri())?;
let response_completed = completions.remove(0);
create_config_toml(&codex_home, server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -180,9 +124,9 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
wait_for_command_execution_item_started(&mut mcp),
server.wait_for_request_count(/*count*/ 1),
)
.await??;
.await?;
let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
@@ -197,21 +141,16 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
let closed_while_command_running = timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
);
let closed_while_command_running = closed_while_command_running.await;
let _ = release_response_tx.send(());
assert!(closed_while_command_running.is_err());
wait_for_responses_request_count_to_stabilize(
&server,
/*expected_count*/ 2,
std::time::Duration::from_millis(200),
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, response_completed).await??;
server.shutdown().await;
Ok(())
}
@@ -350,19 +289,6 @@ async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Resul
Ok(())
}
async fn wait_for_command_execution_item_started(mcp: &mut McpProcess) -> Result<()> {
loop {
let started_notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started_params = started_notif.params.context("item/started params")?;
let started: ItemStartedNotification = serde_json::from_value(started_params)?;
if let ThreadItem::CommandExecution { .. } = started.item {
return Ok(());
}
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(

View File

@@ -0,0 +1,46 @@
// Core v2 app-server integration tests that do not depend on the thread/turn
// analytics or websocket helper modules.
#[path = "suite/v2/account.rs"]
mod account;
#[path = "suite/v2/app_list.rs"]
mod app_list;
#[path = "suite/v2/client_metadata.rs"]
mod client_metadata;
#[path = "suite/v2/collaboration_mode_list.rs"]
mod collaboration_mode_list;
#[path = "suite/v2/compaction.rs"]
mod compaction;
#[path = "suite/v2/config_rpc.rs"]
mod config_rpc;
#[path = "suite/v2/dynamic_tools.rs"]
mod dynamic_tools;
#[path = "suite/v2/experimental_api.rs"]
mod experimental_api;
#[path = "suite/v2/experimental_feature_list.rs"]
mod experimental_feature_list;
#[path = "suite/v2/fs.rs"]
mod fs;
#[path = "suite/v2/initialize.rs"]
mod initialize;
#[path = "suite/v2/memory_reset.rs"]
mod memory_reset;
#[path = "suite/v2/model_list.rs"]
mod model_list;
#[path = "suite/v2/output_schema.rs"]
mod output_schema;
#[path = "suite/v2/plan_item.rs"]
mod plan_item;
#[path = "suite/v2/rate_limits.rs"]
mod rate_limits;
#[path = "suite/v2/request_permissions.rs"]
mod request_permissions;
#[path = "suite/v2/request_user_input.rs"]
mod request_user_input;
#[path = "suite/v2/review.rs"]
mod review;
#[path = "suite/v2/safety_check_downgrade.rs"]
mod safety_check_downgrade;
#[path = "suite/v2/skills_list.rs"]
mod skills_list;
#[path = "suite/v2/windows_sandbox_setup.rs"]
mod windows_sandbox_setup;

View File

@@ -0,0 +1,19 @@
// v2 app-server plugin and MCP integration tests.
#[path = "suite/v2/marketplace_add.rs"]
mod marketplace_add;
#[path = "suite/v2/mcp_resource.rs"]
mod mcp_resource;
#[path = "suite/v2/mcp_server_elicitation.rs"]
mod mcp_server_elicitation;
#[path = "suite/v2/mcp_server_status.rs"]
mod mcp_server_status;
#[path = "suite/v2/mcp_tool.rs"]
mod mcp_tool;
#[path = "suite/v2/plugin_install.rs"]
mod plugin_install;
#[path = "suite/v2/plugin_list.rs"]
mod plugin_list;
#[path = "suite/v2/plugin_read.rs"]
mod plugin_read;
#[path = "suite/v2/plugin_uninstall.rs"]
mod plugin_uninstall;

View File

@@ -0,0 +1,4 @@
// v2 realtime integration tests, split out because they are comparatively
// large and expensive.
#[path = "suite/v2/realtime_conversation.rs"]
mod realtime_conversation;

View File

@@ -0,0 +1,42 @@
// v2 thread and turn integration tests. Keep `analytics` in this shard because
// several thread/turn modules import its helpers via `super::analytics`.
#[path = "suite/v2/analytics.rs"]
mod analytics;
#[path = "suite/v2/thread_archive.rs"]
mod thread_archive;
#[path = "suite/v2/thread_fork.rs"]
mod thread_fork;
#[path = "suite/v2/thread_inject_items.rs"]
mod thread_inject_items;
#[path = "suite/v2/thread_list.rs"]
mod thread_list;
#[path = "suite/v2/thread_loaded_list.rs"]
mod thread_loaded_list;
#[path = "suite/v2/thread_memory_mode_set.rs"]
mod thread_memory_mode_set;
#[path = "suite/v2/thread_metadata_update.rs"]
mod thread_metadata_update;
#[path = "suite/v2/thread_read.rs"]
mod thread_read;
#[path = "suite/v2/thread_resume.rs"]
mod thread_resume;
#[path = "suite/v2/thread_rollback.rs"]
mod thread_rollback;
#[path = "suite/v2/thread_shell_command.rs"]
mod thread_shell_command;
#[path = "suite/v2/thread_start.rs"]
mod thread_start;
#[path = "suite/v2/thread_status.rs"]
mod thread_status;
#[path = "suite/v2/thread_unarchive.rs"]
mod thread_unarchive;
#[path = "suite/v2/thread_unsubscribe.rs"]
mod thread_unsubscribe;
#[path = "suite/v2/turn_interrupt.rs"]
mod turn_interrupt;
#[path = "suite/v2/turn_start.rs"]
mod turn_start;
#[path = "suite/v2/turn_start_zsh_fork.rs"]
mod turn_start_zsh_fork;
#[path = "suite/v2/turn_steer.rs"]
mod turn_steer;

View File

@@ -0,0 +1,12 @@
// v2 websocket integration tests. Keep the websocket helper module in the same
// shard as modules that import it via `super::connection_handling_websocket`.
#[cfg(unix)]
#[path = "suite/v2/command_exec.rs"]
mod command_exec;
#[path = "suite/v2/connection_handling_websocket.rs"]
mod connection_handling_websocket;
#[cfg(unix)]
#[path = "suite/v2/connection_handling_websocket_unix.rs"]
mod connection_handling_websocket_unix;
#[path = "suite/v2/thread_name_websocket.rs"]
mod thread_name_websocket;

View File

@@ -7,6 +7,7 @@ use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::Notify;
use tokio::sync::oneshot;
/// Streaming SSE chunk payload gated by a per-chunk signal.
@@ -20,6 +21,7 @@ pub struct StreamingSseChunk {
pub struct StreamingSseServer {
uri: String,
requests: Arc<TokioMutex<Vec<Vec<u8>>>>,
request_notify: Arc<Notify>,
shutdown: oneshot::Sender<()>,
task: tokio::task::JoinHandle<()>,
}
@@ -33,6 +35,15 @@ impl StreamingSseServer {
self.requests.lock().await.clone()
}
pub async fn wait_for_request_count(&self, count: usize) {
loop {
if self.requests.lock().await.len() >= count {
return;
}
self.request_notify.notified().await;
}
}
pub async fn shutdown(self) {
let _ = self.shutdown.send(());
let _ = self.task.await;
@@ -67,7 +78,9 @@ pub async fn start_streaming_sse_server(
completions: VecDeque::from(completion_senders),
}));
let requests = Arc::new(TokioMutex::new(Vec::new()));
let request_notify = Arc::new(Notify::new());
let requests_for_task = Arc::clone(&requests);
let request_notify_for_task = Arc::clone(&request_notify);
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move {
@@ -78,6 +91,7 @@ pub async fn start_streaming_sse_server(
let (mut stream, _) = accept_res.expect("accept streaming SSE connection");
let state = Arc::clone(&state);
let requests = Arc::clone(&requests_for_task);
let request_notify = Arc::clone(&request_notify_for_task);
tokio::spawn(async move {
let (request, body_prefix) = read_http_request(&mut stream).await;
let Some((method, path)) = parse_request_line(&request) else {
@@ -113,6 +127,7 @@ pub async fn start_streaming_sse_server(
}
};
requests.lock().await.push(body);
request_notify.notify_one();
let Some((chunks, completion)) = take_next_stream(&state).await else {
let _ = write_http_response(&mut stream, /*status*/ 500, "no responses queued", "text/plain").await;
return;
@@ -149,6 +164,7 @@ pub async fn start_streaming_sse_server(
StreamingSseServer {
uri,
requests,
request_notify,
shutdown: shutdown_tx,
task,
},