Compare commits

...

15 Commits

Author SHA1 Message Date
starr-openai
5815dd6a4b Give Windows arm64 tests enough CI time
Let the Windows arm64 test matrix use a longer timeout after CI showed the lane spending most of the default 45 minutes compiling before nextest could finish.

Also pin nextest through taiki-e/install-action's supported tool version syntax so the requested version is not ignored.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
starr-openai
296fa6df0c Serialize Windows process-heavy nextest cases
Windows rust-ci-full repeatedly times out in subprocess-heavy tests even when the global nextest thread count is capped. Isolate the recurring Windows-only families with nextest overrides so the rest of the suite can keep normal parallelism.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
starr-openai
64c684bd57 Add Windows nextest thread override for rust-ci-full
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
starr-openai
ce5d84e43a Make pending sideband close test deterministic
Replace the realtime websocket accept-delay race with an explicit test-server gate so close is issued while the sideband connection is pending, then prove the closed conversation does not emit stale events or send sideband websocket requests.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:35 -07:00
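The gate described above can be sketched with std primitives. This is an illustrative stand-in for the `tokio::sync::Notify` pair the test server uses; the `Gate` type and its method names are invented for the sketch:

```rust
use std::sync::{Condvar, Mutex};

// Minimal gate: the accept loop parks in `wait_released` until the test
// calls `release`, so the close can be issued while the sideband
// connection is still pending.
pub struct Gate {
    released: Mutex<bool>,
    cv: Condvar,
}

impl Gate {
    pub fn new() -> Self {
        Gate { released: Mutex::new(false), cv: Condvar::new() }
    }

    // Signal the gate; any current or future waiter proceeds.
    pub fn release(&self) {
        *self.released.lock().unwrap() = true;
        self.cv.notify_all();
    }

    // Block until the gate has been released.
    pub fn wait_released(&self) {
        let mut released = self.released.lock().unwrap();
        while !*released {
            released = self.cv.wait(released).unwrap();
        }
    }
}
```

Unlike a fixed accept delay, this makes the ordering explicit: the handshake cannot complete until the test has already issued the close.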
starr-openai
926b8d77cd Tolerate transient Windows metadata denial in memory startup test
Keep polling when Windows temporarily denies metadata reads while the phase 2 memory workspace is being cleaned up, so the test still verifies the file is removed and the baseline becomes clean.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:09 -07:00
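The tolerance step can be isolated as a small classifier. A minimal sketch, using a sync function standing in for the async `tokio::fs::try_exists` poll; `exists_or_retry` is an illustrative name:

```rust
use std::io;

// Treat a transient PermissionDenied from a metadata read as "still
// exists, poll again"; propagate every other error unchanged.
fn exists_or_retry(res: io::Result<bool>) -> io::Result<bool> {
    match res {
        Ok(exists) => Ok(exists),
        // Windows can transiently deny metadata reads while another task
        // is removing or resetting files in this workspace.
        Err(err) if err.kind() == io::ErrorKind::PermissionDenied => Ok(true),
        Err(err) => Err(err),
    }
}
```

The poll loop then only exits successfully once a read genuinely reports the file as gone.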
starr-openai
7cd5127421 Wait for agent shutdown before resume tests reopen IDs
Subscribe to agent status before the tests' shutdown and close operations, then wait for the Shutdown status before resuming the same thread IDs. This removes the Windows live-writer race exposed by the full nextest run.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:09 -07:00
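The subscribe-then-wait pattern can be sketched with a std channel standing in for the tokio watch subscription; `AgentStatus` here is a reduced illustrative enum, not the real protocol type:

```rust
use std::sync::mpsc::Receiver;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum AgentStatus {
    Running,
    Shutdown,
}

// Drain status updates until Shutdown is observed. Because the receiver is
// subscribed *before* shutdown is issued, no transition can be missed, so
// resume cannot race a still-live rollout writer.
fn wait_for_shutdown(status_rx: &Receiver<AgentStatus>) -> Result<(), &'static str> {
    for status in status_rx.iter() {
        if status == AgentStatus::Shutdown {
            return Ok(());
        }
    }
    Err("status stream ended before Shutdown")
}
```

The real helper adds a timeout around the wait; the ordering guarantee is the same.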
starr-openai
6a2ce743f1 Make Windows realtime shell test use successful cmd echo
Use a Windows command form that exits successfully in constrained CI shells and trim the expected newline in the delegated realtime shell-tool assertion.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
32deb67fc6 Harden Windows realtime and agent resume tests
Avoid PowerShell command forms that depend on method invocation for the delegated realtime shell-tool test, and wait for a shutdown status before resuming the same subagent thread in the nickname/role restore test.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
59d9e96d66 Use PowerShell literal output in sandbox tests
The legacy sandbox runs PowerShell in constrained language mode, so method calls fail and module-backed cmdlets may not autoload. Use literal string expressions for the PowerShell I/O smoke tests so they exercise process output without depending on cmdlets or method invocation.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
097e3ef949 Avoid PowerShell module autoload in sandbox tests
Windows arm64 can launch pwsh in the legacy sandbox while still failing Write-Output because Microsoft.PowerShell.Utility cannot autoload. Use Console output in the legacy PowerShell smoke tests so they continue to verify sandbox process I/O without depending on module autoload.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
starr-openai
f3afa1132d Fix rollout cwd fixture import
Import the Windows-aware test_path_buf helper from core_test_support where it is defined.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
starr-openai
a666109389 Make rollout cwd fixtures drive-stable on Windows
Dev Drive setup can put temporary Codex homes on D:, which exposed test fixtures that wrote root-relative '/' rollout cwd values while assertions expected the Windows-aware C:\ root helper. Use the same test_path_buf helper when creating and expecting fake rollout cwd values so the tests remain independent of the process temp drive.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
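A drive-stable helper in the spirit of `test_path_buf` might look like the following. This is a hypothetical sketch of the behavior the commit relies on (the real helper lives in `core_test_support`); `fixture_path` is an invented name:

```rust
use std::path::PathBuf;

// Pin POSIX-style fixture paths to a fixed C:\ root on Windows so
// assertions do not depend on the process temp drive (e.g. a Dev Drive
// mounted on D:). On other platforms the path passes through unchanged.
fn fixture_path(path: &str) -> PathBuf {
    if cfg!(windows) {
        PathBuf::from(format!("C:{}", path.replace('/', "\\")))
    } else {
        PathBuf::from(path)
    }
}
```

Using the same helper when writing and when asserting fake rollout cwd values keeps both sides drive-agnostic.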
starr-openai
16648c8d1c Make realtime sideband failure test deterministic
Use the existing mock server as the sideband failure endpoint instead of relying on an OS-level connection refusal from 127.0.0.1:1. Disable retries in this failure-path test so Windows CI does not spend the default retry budget before emitting the expected error/close events.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:06 -07:00
starr-openai
7d2c8dbec4 Fix agent job worker assignment race
Claim job items before spawning workers and allow reports to complete unassigned running items, so fast workers cannot lose stop=true reports before the parent records their thread id.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:06 -07:00
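The claim-before-spawn idea can be sketched with an atomic compare-exchange standing in for the SQL `mark_agent_job_item_running` claim; `JobItem` and `try_claim` are illustrative names:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Claim an item before spawning its worker: exactly one caller wins the
// compare-exchange, so a fast worker's stop=true report can never land on
// an item the parent has not yet recorded as running.
struct JobItem {
    claimed: AtomicBool,
}

impl JobItem {
    fn new() -> Self {
        JobItem { claimed: AtomicBool::new(false) }
    }

    // Returns true only for the first successful claim.
    fn try_claim(&self) -> bool {
        self.claimed
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}
```

Workers are only spawned for items whose claim succeeded, which is why the report path can then safely accept results for running-but-unassigned items.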
starr-openai
bfe33e5a7a Make agent job stop cancellation atomic
A worker stop request used to record the item result and job cancellation in separate updates, so the job runner could observe the item completion first and continue spawning pending work. Commit both state updates together and prevent completion from overwriting a final cancellation.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:05 -07:00
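The atomicity argument can be sketched with a `Mutex` standing in for the sqlx transaction; `JobState` and `report_result_and_cancel` are invented for the sketch:

```rust
use std::sync::Mutex;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum JobStatus {
    Running,
    Cancelled,
}

struct JobState {
    item_completed: bool,
    status: JobStatus,
}

// Commit the item result and the job cancellation under one lock: no
// observer can see the item completed while the job still reads Running,
// and a later completion cannot overwrite the final Cancelled status.
fn report_result_and_cancel(state: &Mutex<JobState>) {
    let mut guard = state.lock().unwrap();
    guard.item_completed = true;
    if guard.status == JobStatus::Running {
        guard.status = JobStatus::Cancelled;
    }
}
```

In the real change the two updates share a SQL transaction, and `mark_agent_job_completed` additionally guards on `status = Running` so a cancelled job stays cancelled.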
16 changed files with 449 additions and 80 deletions

View File

@@ -1,10 +1,21 @@
name: rust-ci-full
run-name: >-
rust-ci-full${{
github.event_name == 'workflow_dispatch' &&
format(' windows-nextest-{0}', inputs.windows_nextest_threads) ||
''
}}
on:
push:
branches:
- main
- "**full-ci**"
workflow_dispatch:
inputs:
windows_nextest_threads:
description: "Optional nextest --test-threads override for Windows test jobs"
required: false
type: string
# CI builds in debug (dev) for faster signal.
@@ -510,10 +521,10 @@ jobs:
tests:
name: Tests — ${{ matrix.runner }} - ${{ matrix.target }}${{ matrix.remote_env == 'true' && ' (remote)' || '' }}
runs-on: ${{ matrix.runs_on || matrix.runner }}
-# Perhaps we can bring this back down to 30m once we finish the cutover
-# from tui_app_server/ to tui/. Incidentally, windows-arm64 was the main
-# offender for exceeding the timeout.
-timeout-minutes: 45
+# Perhaps we can bring this back down once we finish the cutover from
+# tui_app_server/ to tui/. Incidentally, windows-arm64 was the main offender
+# for exceeding the timeout.
+timeout-minutes: ${{ matrix.timeout_minutes || 45 }}
defaults:
run:
working-directory: codex-rs
@@ -524,6 +535,7 @@ jobs:
USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }}
CARGO_INCREMENTAL: "0"
SCCACHE_CACHE_SIZE: 10G
WINDOWS_NEXTEST_THREADS: ${{ github.event_name == 'workflow_dispatch' && inputs.windows_nextest_threads || '' }}
strategy:
fail-fast: false
@@ -554,6 +566,7 @@ jobs:
- runner: windows-arm64
target: aarch64-pc-windows-msvc
profile: dev
timeout_minutes: 75
runs_on:
group: codex-runners
labels: codex-windows-arm64
@@ -640,8 +653,7 @@ jobs:
- uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2.62.49
with:
-tool: nextest
-version: 0.9.103
+tool: nextest@0.9.103
- name: Enable unprivileged user namespaces (Linux)
if: runner.os == 'Linux'
@@ -666,7 +678,19 @@ jobs:
- name: tests
id: test
-run: cargo nextest run --no-fail-fast --target ${{ matrix.target }} --cargo-profile ci-test --timings
+shell: bash
+run: |
+set -euo pipefail
+nextest_args=(
+--no-fail-fast
+--target "${{ matrix.target }}"
+--cargo-profile ci-test
+--timings
+)
+if [[ "${{ runner.os }}" == "Windows" && -n "${WINDOWS_NEXTEST_THREADS}" ]]; then
+nextest_args+=(--test-threads "${WINDOWS_NEXTEST_THREADS}")
+fi
+cargo nextest run "${nextest_args[@]}"
env:
RUST_BACKTRACE: 1
RUST_MIN_STACK: "8388608" # 8 MiB

View File

@@ -14,6 +14,9 @@ max-threads = 1
[test-groups.windows_sandbox_legacy_sessions]
max-threads = 1
[test-groups.windows_process_heavy]
max-threads = 1
[[profile.default.overrides]]
# Do not add new tests here
filter = 'test(rmcp_client) | test(humanlike_typing_1000_chars_appears_live_no_placeholder)'
@@ -27,6 +30,41 @@ slow-timeout = { period = "30s", terminate-after = 2 }
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
test-group = 'app_server_protocol_codegen'
[[profile.default.overrides]]
# These Windows CI tests launch full Codex/app-server process trees. They have
# repeatedly timed out when nextest schedules them alongside similar tests.
platform = 'cfg(windows)'
filter = 'package(codex-core) & kind(test) & (test(cli_stream) | test(realtime_conversation))'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# The exec resume tests spawn the CLI and touch shared session state on disk.
platform = 'cfg(windows)'
filter = 'package(codex-exec) & kind(test) & test(exec_resume)'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# Keep the specific app-server subprocess-heavy cases isolated on Windows. This
# must stay before the broader codex-app-server override below.
platform = 'cfg(windows)'
filter = 'package(codex-app-server) & kind(test) & (test(thread_fork_can_exclude_turns_and_skip_restored_token_usage) | test(turn_start_resolves_sticky_thread_environments_and_turn_overrides) | test(message_processor_tracing_tests))'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# These tests create restricted-token Windows child processes and private
# desktops. Running them alone avoids contention with other subprocess tests.
platform = 'cfg(windows)'
filter = 'package(codex-windows-sandbox) & kind(test) & test(legacy_)'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# These integration tests spawn a fresh app-server subprocess per case.
# Keep the library unit tests parallel.

View File

@@ -8,6 +8,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use core_test_support::test_path_buf;
use serde_json::json;
use std::fs;
use std::fs::FileTimes;
@@ -134,7 +135,7 @@ pub fn create_fake_rollout_with_source(
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
-cwd: PathBuf::from("/"),
+cwd: test_path_buf("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source,
@@ -218,7 +219,7 @@ pub fn create_fake_rollout_with_text_elements(
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
-cwd: PathBuf::from("/"),
+cwd: test_path_buf("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,

View File

@@ -31,6 +31,7 @@ use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::test_path_buf;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
@@ -56,7 +57,7 @@ fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSum
timestamp: Some(CREATED_AT_RFC3339.to_string()),
updated_at: Some(UPDATED_AT_RFC3339.to_string()),
model_provider: MODEL_PROVIDER.to_string(),
-cwd: PathBuf::from("/"),
+cwd: test_path_buf("/"),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
git_info: None,

View File

@@ -426,6 +426,8 @@ fn realtime_sideband_connection(
WebSocketConnectionConfig {
requests: realtime_server_events,
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}
@@ -1044,6 +1046,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
})]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: false,
}])
@@ -1836,7 +1840,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
};
assert_eq!(id.as_str(), "shell_call");
assert_eq!(status, CommandExecutionStatus::Completed);
-assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
+assert_eq!(
+aggregated_output.as_deref().map(str::trim),
+Some("realtime-tool-ok")
+);
// Phase 3: verify the shell output reached Responses and the final delegated answer returned
// to realtime as a single function-call-output item.
@@ -2154,10 +2161,10 @@ fn realtime_tool_ok_command() -> Vec<String> {
#[cfg(windows)]
{
vec![
"powershell.exe".to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"[Console]::Write('realtime-tool-ok')".to_string(),
"cmd.exe".to_string(),
"/D".to_string(),
"/C".to_string(),
"echo realtime-tool-ok".to_string(),
]
}

View File

@@ -239,6 +239,65 @@ async fn wait_for_live_thread_spawn_children(
.expect("expected persisted child tree");
}
async fn wait_for_agent_shutdown(
thread_id: ThreadId,
mut status_rx: tokio::sync::watch::Receiver<AgentStatus>,
) {
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
return;
}
timeout(Duration::from_secs(5), async {
loop {
status_rx
.changed()
.await
.unwrap_or_else(|_| panic!("thread {thread_id} status should reach shutdown"));
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
break;
}
}
})
.await
.unwrap_or_else(|_| panic!("thread {thread_id} should shut down before resume"));
}
async fn shutdown_live_agent_and_wait(control: &AgentControl, thread_id: ThreadId) {
let status_rx = control
.subscribe_status(thread_id)
.await
.expect("status subscription should succeed before shutdown");
let _ = control
.shutdown_live_agent(thread_id)
.await
.expect("thread shutdown should submit");
wait_for_agent_shutdown(thread_id, status_rx).await;
}
async fn close_agent_and_wait(
control: &AgentControl,
agent_id: ThreadId,
shutdown_ids: &[ThreadId],
) {
let mut status_rxs = Vec::with_capacity(shutdown_ids.len());
for thread_id in shutdown_ids {
status_rxs.push((
*thread_id,
control
.subscribe_status(*thread_id)
.await
.expect("status subscription should succeed before close"),
));
}
let _ = control
.close_agent(agent_id)
.await
.expect("agent close should succeed");
for (thread_id, status_rx) in status_rxs {
wait_for_agent_shutdown(thread_id, status_rx).await;
}
}
#[tokio::test]
async fn send_input_errors_when_manager_dropped() {
let control = AgentControl::default();
@@ -1626,11 +1685,9 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
.await
.expect("child thread metadata should be persisted to sqlite before shutdown");
-let _ = harness
-.control
-.shutdown_live_agent(child_thread_id)
-.await
-.expect("child shutdown should submit");
-drop(status_rx);
+shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
drop(child_thread);
let resumed_thread_id = harness
.control
@@ -1699,11 +1756,8 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
.await
.expect("child thread should exist");
persist_thread_for_tree_resume(&child_thread, "persist before archiving").await;
-let _ = harness
-.control
-.shutdown_live_agent(child_thread_id)
-.await
-.expect("child shutdown should succeed");
+shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
drop(child_thread);
let store = LocalThreadStore::new(
LocalThreadStoreConfig::from_config(&harness.config),
harness.state_db.clone(),
@@ -1993,11 +2047,12 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
let _ = harness
.control
@@ -2085,16 +2140,14 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
-let _ = harness
-.control
-.shutdown_live_agent(parent_thread_id)
-.await
-.expect("parent shutdown should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
+shutdown_live_agent_and_wait(&harness.control, parent_thread_id).await;
drop(parent_thread);
let resumed_parent_thread_id = harness
.control
@@ -2180,11 +2233,12 @@ async fn resume_closed_child_reopens_open_descendants() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
let resumed_child_thread_id = harness
.control

View File

@@ -196,6 +196,12 @@ async fn run_agent_job_loop(
)
.await?;
for item in pending_items {
let claimed = db
.mark_agent_job_item_running(job_id.as_str(), item.item_id.as_str())
.await?;
if !claimed {
continue;
}
let prompt = build_worker_prompt(&job, &item)?;
let items = vec![UserInput::Text {
text: prompt,
@@ -240,7 +246,7 @@ async fn run_agent_job_loop(
}
};
let assigned = db
-.mark_agent_job_item_running_with_thread(
+.set_agent_job_item_thread(
job_id.as_str(),
item.item_id.as_str(),
thread_id.to_string().as_str(),

View File

@@ -61,27 +61,31 @@ pub async fn handle(
}
let db = required_state_db(&session)?;
let reporting_thread_id = session.conversation_id.to_string();
-let accepted = db
-.report_agent_job_item_result(
+let accepted = if args.stop.unwrap_or(false) {
+db.report_agent_job_item_result_and_cancel_job(
+args.job_id.as_str(),
+args.item_id.as_str(),
+reporting_thread_id.as_str(),
+&args.result,
+"cancelled by worker request",
+)
+.await
+} else {
+db.report_agent_job_item_result(
args.job_id.as_str(),
args.item_id.as_str(),
reporting_thread_id.as_str(),
&args.result,
)
.await
-.map_err(|err| {
-let job_id = args.job_id.as_str();
-let item_id = args.item_id.as_str();
-FunctionCallError::RespondToModel(format!(
-"failed to record agent job result for {job_id} / {item_id}: {err}"
-))
-})?;
-if accepted && args.stop.unwrap_or(false) {
-let message = "cancelled by worker request";
-let _ = db
-.mark_agent_job_cancelled(args.job_id.as_str(), message)
-.await;
-}
+}
+.map_err(|err| {
+let job_id = args.job_id.as_str();
+let item_id = args.item_id.as_str();
+FunctionCallError::RespondToModel(format!(
+"failed to record agent job result for {job_id} / {item_id}: {err}"
+))
+})?;
let content =
serde_json::to_string(&ReportAgentJobResultToolResult { accepted }).map_err(|err| {
FunctionCallError::Fatal(format!(

View File

@@ -446,6 +446,11 @@ impl WebSocketHandshake {
pub struct WebSocketConnectionConfig {
pub requests: Vec<Vec<Value>>,
pub response_headers: Vec<(String, String)>,
/// Optional notification fired after the TCP connection is accepted and before the websocket
/// handshake is accepted.
pub accept_started: Option<Arc<Notify>>,
/// Optional gate that blocks websocket handshake acceptance until the notifier is signalled.
pub accept_release: Option<Arc<Notify>>,
/// Optional delay inserted before accepting the websocket handshake.
///
/// Tests use this to force websocket setup into an in-flight state so first-turn warmup paths
@@ -1254,6 +1259,8 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
.map(|requests| WebSocketConnectionConfig {
requests,
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
})
@@ -1298,12 +1305,27 @@ pub async fn start_websocket_server_with_headers(
continue;
};
+if let Some(accept_started) = &connection.accept_started {
+accept_started.notify_one();
+}
+if let Some(accept_release) = &connection.accept_release {
+tokio::select! {
+_ = accept_release.notified() => {}
+_ = &mut shutdown_rx => return,
+}
+}
if let Some(delay) = connection.accept_delay {
-tokio::time::sleep(delay).await;
+tokio::select! {
+_ = tokio::time::sleep(delay) => {}
+_ = &mut shutdown_rx => return,
+}
}
let response_headers = connection.response_headers.clone();
let handshake_log = Arc::clone(&handshakes);
+let pending_handshake = Arc::new(Mutex::new(None));
+let callback_handshake = Arc::clone(&pending_handshake);
let callback = move |req: &Request, mut response: Response| {
let headers = req
.headers()
@@ -1315,7 +1337,7 @@ pub async fn start_websocket_server_with_headers(
.map(|value| (name.as_str().to_string(), value.to_string()))
})
.collect();
-handshake_log.lock().unwrap().push(WebSocketHandshake {
+*callback_handshake.lock().unwrap() = Some(WebSocketHandshake {
uri: req.uri().to_string(),
headers,
});
@@ -1344,6 +1366,10 @@ pub async fn start_websocket_server_with_headers(
Err(_) => continue,
};
if let Some(handshake) = pending_handshake.lock().unwrap().take() {
handshakes.lock().unwrap().push(handshake);
}
let connection_index = {
let mut log = requests.lock().unwrap();
log.push(Vec::new());

View File

@@ -121,6 +121,8 @@ async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() ->
],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
// Delay handshake so turn processing must tolerate websocket startup latency.
accept_delay: Some(Duration::from_millis(150)),
close_after_requests: true,

View File

@@ -941,6 +941,8 @@ async fn responses_websocket_emits_reasoning_included_event() {
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}])
@@ -1015,6 +1017,8 @@ async fn responses_websocket_emits_rate_limit_events() {
("X-Models-Etag".to_string(), "etag-123".to_string()),
("X-Reasoning-Included".to_string(), "true".to_string()),
],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}])
@@ -1751,6 +1755,8 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake(
})],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: false,
}])

View File

@@ -48,6 +48,7 @@ use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio::time::timeout;
use wiremock::Match;
@@ -492,6 +493,8 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
vec![],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: Some(sideband_accept_delay),
close_after_requests: false,
}])
@@ -659,10 +662,14 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
)
.mount(&server)
.await;
+let accept_started = Arc::new(Notify::new());
+let accept_release = Arc::new(Notify::new());
let realtime_server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![]],
response_headers: Vec::new(),
-accept_delay: Some(Duration::from_millis(500)),
+accept_started: Some(Arc::clone(&accept_started)),
+accept_release: Some(Arc::clone(&accept_release)),
+accept_delay: None,
close_after_requests: false,
}])
.await;
@@ -699,6 +706,9 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
realtime_server.handshakes().is_empty(),
"sideband websocket should still be pending when SDP is emitted"
);
timeout(Duration::from_secs(5), accept_started.notified())
.await
.context("sideband websocket should connect before close")?;
test.codex.submit(Op::RealtimeConversationClose).await?;
let closed = wait_for_event_match(&test.codex, |msg| match msg {
@@ -726,9 +736,17 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
"pending sideband task leaked after close: {:?}",
stale_event.ok()
);
+accept_release.notify_one();
+let stale_request = timeout(Duration::from_millis(250), async {
+realtime_server
+.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
+.await
+})
+.await;
assert!(
-realtime_server.handshakes().is_empty(),
-"pending sideband task should abort before websocket handshake completes"
+stale_request.is_err(),
+"pending sideband task sent websocket request after close: {:?}",
+stale_request.ok().map(|request| request.body_json())
);
realtime_server.shutdown().await;
@@ -749,11 +767,13 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res
)
.mount(&server)
.await;
-let mut builder = test_codex().with_config(|config| {
+let realtime_base_url = server.uri();
+let mut builder = test_codex().with_config(move |config| {
config.experimental_realtime_ws_backend_prompt = Some("backend prompt".to_string());
config.experimental_realtime_ws_model = Some("realtime-test-model".to_string());
config.experimental_realtime_ws_startup_context = Some(String::new());
-config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
+config.experimental_realtime_ws_base_url = Some(realtime_base_url);
+config.model_provider.request_max_retries = Some(0);
config.realtime.version = RealtimeWsVersion::V1;
});
let test = builder.build(&server).await?;

View File

@@ -102,6 +102,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-1"),
]],
response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},
@@ -112,6 +114,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-2"),
]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},
@@ -122,6 +126,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-3"),
]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},

View File

@@ -372,7 +372,16 @@ async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
async fn wait_for_file_removed(path: &Path) -> anyhow::Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
-if !tokio::fs::try_exists(path).await? {
+let exists = match tokio::fs::try_exists(path).await {
+Ok(exists) => exists,
+Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
+// Windows can transiently deny metadata reads while another task
+// is removing or resetting files in this workspace.
+true
+}
+Err(err) => return Err(err.into()),
+};
+if !exists {
return Ok(());
}
assert!(

View File

@@ -227,22 +227,23 @@ WHERE id = ?
Ok(())
}
-pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<()> {
+pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
-sqlx::query(
+let result = sqlx::query(
r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = NULL
-WHERE id = ?
+WHERE id = ? AND status = ?
"#,
)
.bind(AgentJobStatus::Completed.as_str())
.bind(now)
.bind(now)
.bind(job_id)
+.bind(AgentJobStatus::Running.as_str())
.execute(self.pool.as_ref())
.await?;
-Ok(())
+Ok(result.rows_affected() > 0)
}
pub async fn mark_agent_job_failed(
@@ -428,9 +429,46 @@ WHERE job_id = ? AND item_id = ? AND status = ?
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
) -> anyhow::Result<bool> {
self.report_agent_job_item_result_inner(
job_id,
item_id,
reporting_thread_id,
result_json,
/*cancel_job_reason*/ None,
)
.await
}
pub async fn report_agent_job_item_result_and_cancel_job(
&self,
job_id: &str,
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
cancel_job_reason: &str,
) -> anyhow::Result<bool> {
self.report_agent_job_item_result_inner(
job_id,
item_id,
reporting_thread_id,
result_json,
Some(cancel_job_reason),
)
.await
}
async fn report_agent_job_item_result_inner(
&self,
job_id: &str,
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
cancel_job_reason: Option<&str>,
) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
let serialized = serde_json::to_string(result_json)?;
let mut tx = self.pool.begin().await?;
let result = sqlx::query(
r#"
UPDATE agent_job_items
@@ -446,7 +484,7 @@ WHERE
job_id = ?
AND item_id = ?
AND status = ?
-AND assigned_thread_id = ?
+AND (assigned_thread_id = ? OR assigned_thread_id IS NULL)
"#,
)
.bind(AgentJobItemStatus::Completed.as_str())
@@ -458,9 +496,29 @@ WHERE
.bind(item_id)
.bind(AgentJobItemStatus::Running.as_str())
.bind(reporting_thread_id)
-.execute(self.pool.as_ref())
+.execute(&mut *tx)
.await?;
-Ok(result.rows_affected() > 0)
+let accepted = result.rows_affected() > 0;
+if accepted && let Some(reason) = cancel_job_reason {
+sqlx::query(
+r#"
+UPDATE agent_jobs
+SET status = ?, updated_at = ?, completed_at = ?, last_error = ?
+WHERE id = ? AND status IN (?, ?)
+"#,
+)
+.bind(AgentJobStatus::Cancelled.as_str())
+.bind(now)
+.bind(now)
+.bind(reason)
+.bind(job_id)
+.bind(AgentJobStatus::Pending.as_str())
+.bind(AgentJobStatus::Running.as_str())
+.execute(&mut *tx)
+.await?;
+}
+tx.commit().await?;
+Ok(accepted)
}
pub async fn mark_agent_job_item_completed(
@@ -652,6 +710,113 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_can_cancel_job_atomically() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;
let accepted = runtime
.report_agent_job_item_result_and_cancel_job(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"ok": true}),
"cancelled by worker request",
)
.await?;
assert!(accepted);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
assert_eq!(
job.last_error,
Some("cancelled by worker request".to_string())
);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Completed);
assert_eq!(item.result_json, Some(json!({"ok": true})));
assert_eq!(item.assigned_thread_id, None);
let completed = runtime.mark_agent_job_completed(job_id.as_str()).await?;
assert!(!completed);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_accepts_unassigned_running_item() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let job_id = "job-1".to_string();
let item_id = "item-1".to_string();
let thread_id = "thread-1".to_string();
runtime
.create_agent_job(
&AgentJobCreateParams {
id: job_id.clone(),
name: "test-job".to_string(),
instruction: "Return a result".to_string(),
auto_export: true,
max_runtime_seconds: None,
output_schema_json: None,
input_headers: vec!["path".to_string()],
input_csv_path: "/tmp/in.csv".to_string(),
output_csv_path: "/tmp/out.csv".to_string(),
},
&[AgentJobItemCreateParams {
item_id: item_id.clone(),
row_index: 0,
source_id: None,
row_json: json!({"path":"file-1"}),
}],
)
.await?;
runtime.mark_agent_job_running(job_id.as_str()).await?;
let marked_running = runtime
.mark_agent_job_item_running(job_id.as_str(), item_id.as_str())
.await?;
assert!(marked_running);
let accepted = runtime
.report_agent_job_item_result_and_cancel_job(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"ok": true}),
"cancelled by worker request",
)
.await?;
assert!(accepted);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Completed);
assert_eq!(item.result_json, Some(json!({"ok": true})));
assert_eq!(item.assigned_thread_id, None);
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();

View File

@@ -195,7 +195,7 @@ fn legacy_non_tty_powershell_emits_output() {
pwsh.display().to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"Write-Output LEGACY-NONTTY-DIRECT".to_string(),
"'LEGACY-NONTTY-DIRECT'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -378,7 +378,7 @@ fn legacy_capture_powershell_emits_output() {
pwsh.display().to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"Write-Output LEGACY-CAPTURE-DIRECT".to_string(),
"'LEGACY-CAPTURE-DIRECT'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -419,7 +419,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
"-NoProfile".to_string(),
"-NoExit".to_string(),
"-Command".to_string(),
"$PID; Write-Output ready".to_string(),
"$PID; 'ready'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -434,7 +434,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
let writer = spawned.session.writer_sender();
writer
.send(b"Write-Output second\n".to_vec())
.send(b"'second'\n".to_vec())
.await
.expect("send second command");
writer