Compare commits


19 Commits

Author SHA1 Message Date
starr-openai
755d128add Shard Windows arm64 nextest runs
Add a dynamic rust-ci-full test matrix so workflow_dispatch or shard-specific full-ci branch names can split the Windows arm64 nextest lane across 2 or 4 hosts while leaving the default push behavior unchanged.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 17:11:16 -07:00
starr-openai
cd8ea2f36b Keep sccache stats alive through CI jobs
Disable the sccache daemon idle timeout in rust-ci-full so long test phases can still report the compile-cache stats collected during the build phase.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:59:45 -07:00
starr-openai
fcb1fb8ec6 Re-enable Windows sccache in Rust CI
Let Windows rust-ci-full jobs use sccache again, store the fallback cache on the configured work drive, and set Cargo's rustc wrapper to an absolute sccache path so Windows subprocesses resolve it consistently.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:40 -07:00
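The "absolute sccache path" part of this change hinges on converting the POSIX-style path that bash's `command -v` returns into a Windows path that native subprocesses can resolve (the workflow uses `cygpath -w` for this). A rough, hypothetical stand-in for that conversion:

```python
def to_windows_path(posix_path: str) -> str:
    """Rough stand-in for `cygpath -w`: map /c/foo/bar.exe to C:\\foo\\bar.exe
    so a RUSTC_WRAPPER value set from bash also resolves for native Windows
    subprocesses that do not understand POSIX-style paths."""
    parts = posix_path.strip("/").split("/")
    drive = parts[0].upper() + ":"
    return "\\".join([drive, *parts[1:]])
```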
starr-openai
077a3970d7 Use Dev Drive for Windows CI
Configure Windows Rust CI jobs and the shared Bazel CI setup to put temp, repository-cache, and output-root paths on the runner's fast work drive when available. Fall back to C: if no secondary drive or Dev Drive provisioning path is available.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:40 -07:00
starr-openai
5815dd6a4b Give Windows arm64 tests enough CI time
Let the Windows arm64 test matrix use a longer timeout after CI showed the lane spending most of the default 45 minutes compiling before nextest could finish.

Also pin nextest through taiki-e/install-action's supported tool version syntax so the requested version is not ignored.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
starr-openai
296fa6df0c Serialize Windows process-heavy nextest cases
Windows rust-ci-full repeatedly times out in subprocess-heavy tests even when the global nextest thread count is capped. Isolate the recurring Windows-only families with nextest overrides so the rest of the suite can keep normal parallelism.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
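The effect of the override (`threads-required = "num-test-threads"` in the nextest config later in this diff) is that each isolated test claims every worker thread and so runs alone, while the remaining tests keep normal parallelism. A toy model of that scheduling — not nextest's actual scheduler — under the assumption of fixed-size batches:

```python
def schedule(tests: list[tuple[str, bool]], threads: int) -> list[list[str]]:
    """Toy model of the override: tests flagged process-heavy claim every
    worker thread and run alone, while the rest fill batches of up to
    `threads` tests that run concurrently."""
    batches: list[list[str]] = []
    batch: list[str] = []
    for name, heavy in tests:
        if heavy:
            if batch:
                batches.append(batch)
                batch = []
            batches.append([name])  # runs with all threads to itself
        else:
            batch.append(name)
            if len(batch) == threads:
                batches.append(batch)
                batch = []
    if batch:
        batches.append(batch)
    return batches
```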
starr-openai
64c684bd57 Add Windows nextest thread override for rust-ci-full
Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:39 -07:00
starr-openai
ce5d84e43a Make pending sideband close test deterministic
Replace the realtime websocket accept-delay race with an explicit test-server gate so close is issued while the sideband connection is pending, then prove the closed conversation does not emit stale events or send sideband websocket requests.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:20:35 -07:00
starr-openai
926b8d77cd Tolerate transient Windows metadata denial in memory startup test
Keep polling when Windows temporarily denies metadata reads while the phase 2 memory workspace is being cleaned up, so the test still verifies the file is removed and the baseline becomes clean.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:09 -07:00
starr-openai
7cd5127421 Wait for agent shutdown before resume tests reopen IDs
Subscribe to agent status before the tests' shutdown and close operations, then wait for the Shutdown status before resuming the same thread IDs. This removes the Windows live-writer race exposed by the full nextest run.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:09 -07:00
starr-openai
6a2ce743f1 Make Windows realtime shell test use successful cmd echo
Use a Windows command form that exits successfully in constrained CI shells and trim the expected newline in the delegated realtime shell-tool assertion.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
32deb67fc6 Harden Windows realtime and agent resume tests
Avoid PowerShell command forms that depend on method invocation for the delegated realtime shell-tool test, and wait for a shutdown status before resuming the same subagent thread in the nickname/role restore test.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
59d9e96d66 Use PowerShell literal output in sandbox tests
The legacy sandbox runs PowerShell in constrained language mode, so method calls fail and module-backed cmdlets may not autoload. Use literal string expressions for the PowerShell I/O smoke tests so they exercise process output without depending on cmdlets or method invocation.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00
starr-openai
097e3ef949 Avoid PowerShell module autoload in sandbox tests
Windows arm64 can launch pwsh in the legacy sandbox while still failing Write-Output because Microsoft.PowerShell.Utility cannot autoload. Use Console output in the legacy PowerShell smoke tests so they continue to verify sandbox process I/O without depending on module autoload.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
starr-openai
f3afa1132d Fix rollout cwd fixture import
Import the Windows-aware test_path_buf helper from core_test_support where it is defined.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
starr-openai
a666109389 Make rollout cwd fixtures drive-stable on Windows
Dev Drive setup can put temporary Codex homes on D:, which exposed test fixtures that wrote root-relative '/' rollout cwd values while assertions expected the Windows-aware C:\ root helper. Use the same test_path_buf helper when creating and expecting fake rollout cwd values so the tests remain independent of the process temp drive.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:07 -07:00
starr-openai
16648c8d1c Make realtime sideband failure test deterministic
Use the existing mock server as the sideband failure endpoint instead of relying on an OS-level connection refusal from 127.0.0.1:1. Disable retries in this failure-path test so Windows CI does not spend the default retry budget before emitting the expected error/close events.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:06 -07:00
starr-openai
7d2c8dbec4 Fix agent job worker assignment race
Claim job items before spawning workers and allow reports to complete unassigned running items, so fast workers cannot lose stop=true reports before the parent records their thread id.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:06 -07:00
starr-openai
bfe33e5a7a Make agent job stop cancellation atomic
A worker stop request used to record the item result and job cancellation in separate updates, so the job runner could observe the item completion first and continue spawning pending work. Commit both state updates together and prevent completion from overwriting a final cancellation.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:05 -07:00
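The fix described above — committing the item result and the job-level cancellation in one state update — can be sketched with a single lock guarding both fields. This is a hypothetical model of the pattern, not the repository's actual job-runner code:

```python
import threading

class JobState:
    """Sketch of the atomic stop: the item result and the job-level
    cancellation commit under one lock, so no observer can see the item
    completed while the cancellation is still pending."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.items: dict[str, str] = {}
        self.cancelled = False

    def report(self, item: str, result: str, stop: bool) -> None:
        # Both updates happen inside one critical section.
        with self._lock:
            self.items[item] = result
            if stop:
                self.cancelled = True

    def final_status(self) -> str:
        # A final cancellation is never overwritten by a later completion.
        with self._lock:
            return "cancelled" if self.cancelled else "completed"
```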
18 changed files with 720 additions and 136 deletions


@@ -35,6 +35,11 @@ runs:
- name: Set up Bazel
uses: bazel-contrib/setup-bazel@c5acdfb288317d0b5c0bbd7a396a3dc868bb0f86 # 0.19.0
- name: Configure Dev Drive (Windows)
if: runner.os == 'Windows'
shell: pwsh
run: ./.github/scripts/setup-dev-drive.ps1
- name: Configure Bazel repository cache
id: configure_bazel_repository_cache
shell: pwsh
@@ -42,7 +47,12 @@ runs:
# Keep the repository cache under HOME on all runners. Windows `D:\a`
# cache paths match `.bazelrc`, but `actions/cache/restore` currently
# returns HTTP 400 for that path in the Windows clippy job.
$repositoryCachePath = Join-Path $HOME '.cache/bazel-repo-cache'
$cacheRoot = if ($env:RUNNER_OS -eq 'Windows' -and $env:DEV_DRIVE) {
$env:DEV_DRIVE
} else {
$HOME
}
$repositoryCachePath = Join-Path $cacheRoot '.cache/bazel-repo-cache'
"repository-cache-path=$repositoryCachePath" | Out-File -FilePath $env:GITHUB_OUTPUT -Encoding utf8 -Append
"BAZEL_REPOSITORY_CACHE=$repositoryCachePath" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
@@ -50,11 +60,10 @@ runs:
if: runner.os == 'Windows'
shell: pwsh
run: |
# Use the shortest available drive to reduce argv/path length issues,
# but avoid the drive root because some Windows test launchers mis-handle
# MANIFEST paths there.
$hasDDrive = Test-Path 'D:\'
$bazelOutputUserRoot = if ($hasDDrive) { 'D:\b' } else { 'C:\b' }
# Keep Bazel on the fast Windows work drive, but avoid the drive root
# because some Windows test launchers mis-handle MANIFEST paths there.
$driveRoot = if ($env:DEV_DRIVE) { $env:DEV_DRIVE } elseif (Test-Path 'D:\') { 'D:' } else { 'C:' }
$bazelOutputUserRoot = Join-Path $driveRoot 'b'
$repoContentsCache = Join-Path $env:RUNNER_TEMP "bazel-repo-contents-cache-$env:GITHUB_RUN_ID-$env:GITHUB_JOB"
"BAZEL_OUTPUT_USER_ROOT=$bazelOutputUserRoot" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
"BAZEL_REPO_CONTENTS_CACHE=$repoContentsCache" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append

.github/scripts/setup-dev-drive.ps1 (new file, 62 lines)

@@ -0,0 +1,62 @@
# Configure a fast drive for Windows CI jobs.
#
# GitHub-hosted Windows runners do not always expose a secondary D: volume. When
# they do not, try to create a Dev Drive VHD and fall back to C: if the runner
# image does not allow that provisioning path.
function Use-FallbackDrive {
param([string]$Reason)
Write-Warning "$Reason Falling back to C:"
return "C:"
}
function Invoke-BestEffort {
param([scriptblock]$Script, [string]$Description)
try {
& $Script
} catch {
Write-Warning "$Description failed: $($_.Exception.Message)"
}
}
if (Test-Path "D:\") {
Write-Output "Using existing drive at D:"
$Drive = "D:"
} else {
try {
$VhdPath = Join-Path $env:RUNNER_TEMP "codex-dev-drive.vhdx"
$SizeBytes = 64GB
if (Test-Path $VhdPath) {
Remove-Item -Path $VhdPath -Force
}
New-VHD -Path $VhdPath -SizeBytes $SizeBytes -Dynamic -ErrorAction Stop | Out-Null
$Mounted = Mount-VHD -Path $VhdPath -Passthru -ErrorAction Stop
$Disk = $Mounted | Get-Disk -ErrorAction Stop
$Disk | Initialize-Disk -PartitionStyle GPT -ErrorAction Stop
$Partition = $Disk | New-Partition -AssignDriveLetter -UseMaximumSize -ErrorAction Stop
$Volume = $Partition | Format-Volume -FileSystem ReFS -NewFileSystemLabel "CodexDevDrive" -DevDrive -Confirm:$false -Force -ErrorAction Stop
$Drive = "$($Volume.DriveLetter):"
Invoke-BestEffort { fsutil devdrv trust $Drive } "Trusting Dev Drive $Drive"
Invoke-BestEffort { fsutil devdrv enable /disallowAv } "Disabling AV filter attachment for Dev Drives"
Invoke-BestEffort { fsutil devdrv query $Drive } "Querying Dev Drive $Drive"
Write-Output "Using Dev Drive at $Drive"
} catch {
$Drive = Use-FallbackDrive "Failed to create Dev Drive: $($_.Exception.Message)"
}
}
$Tmp = "$Drive\codex-tmp"
New-Item -Path $Tmp -ItemType Directory -Force | Out-Null
@(
"DEV_DRIVE=$Drive"
"TMP=$Tmp"
"TEMP=$Tmp"
) | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append


@@ -1,10 +1,32 @@
name: rust-ci-full
run-name: >-
rust-ci-full${{
github.event_name == 'workflow_dispatch' &&
format(' windows-nextest-{0} arm64-shards-{1}', inputs.windows_nextest_threads || 'default', inputs.windows_arm64_partitions || '1') ||
contains(github.ref_name, 'arm64-shards-4') && ' arm64-shards-4' ||
contains(github.ref_name, 'arm64-shards-2') && ' arm64-shards-2' ||
''
}}
on:
push:
branches:
- main
- "**full-ci**"
workflow_dispatch:
inputs:
windows_nextest_threads:
description: "Optional nextest --test-threads override for Windows test jobs"
required: false
type: string
windows_arm64_partitions:
description: "Number of Windows arm64 nextest partitions"
required: false
default: "1"
type: choice
options:
- "1"
- "2"
- "4"
# CI builds in debug (dev) for faster signal.
@@ -147,9 +169,10 @@ jobs:
# Speed up repeated builds across CI runs by caching compiled objects, except on
# arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce
# mixed-architecture archives under sccache.
USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }}
USE_SCCACHE: ${{ (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin') && 'false' || 'true' }}
CARGO_INCREMENTAL: "0"
SCCACHE_CACHE_SIZE: 10G
SCCACHE_IDLE_TIMEOUT: "0"
# In rust-ci, representative release-profile checks use thin LTO for faster feedback.
CARGO_PROFILE_RELEASE_LTO: ${{ matrix.profile == 'release' && 'thin' || 'fat' }}
@@ -234,6 +257,10 @@ jobs:
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Configure Dev Drive (Windows)
if: ${{ runner.os == 'Windows' }}
shell: pwsh
run: ../.github/scripts/setup-dev-drive.ps1
- name: Install Linux build dependencies
if: ${{ runner.os == 'Linux' }}
shell: bash
@@ -294,10 +321,10 @@ jobs:
# Install and restore sccache cache
- name: Install sccache
if: ${{ env.USE_SCCACHE == 'true' }}
uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2.62.49
uses: taiki-e/install-action@cca35edeb1d01366c2843b68fc3ca441446d73d3 # v2.77.1
with:
tool: sccache
version: 0.7.5
tool: sccache@0.14.0
fallback: none
- name: Configure sccache backend
if: ${{ env.USE_SCCACHE == 'true' }}
@@ -309,32 +336,52 @@ jobs:
echo "Using sccache GitHub backend"
else
echo "SCCACHE_GHA_ENABLED=false" >> "$GITHUB_ENV"
echo "SCCACHE_DIR=${{ github.workspace }}/.sccache" >> "$GITHUB_ENV"
if [[ -n "${DEV_DRIVE:-}" ]]; then
echo "SCCACHE_DIR=${DEV_DRIVE}\\.sccache" >> "$GITHUB_ENV"
else
echo "SCCACHE_DIR=${{ github.workspace }}/.sccache" >> "$GITHUB_ENV"
fi
echo "Using sccache local disk + actions/cache fallback"
fi
- name: Enable sccache wrapper
if: ${{ env.USE_SCCACHE == 'true' }}
shell: bash
run: echo "RUSTC_WRAPPER=sccache" >> "$GITHUB_ENV"
run: |
set -euo pipefail
wrapper="$(command -v sccache)"
if [[ "${RUNNER_OS}" == "Windows" ]] && command -v cygpath >/dev/null 2>&1; then
wrapper="$(cygpath -w "${wrapper}")"
fi
echo "RUSTC_WRAPPER=${wrapper}" >> "$GITHUB_ENV"
echo "CARGO_BUILD_RUSTC_WRAPPER=${wrapper}" >> "$GITHUB_ENV"
- name: Restore sccache cache (fallback)
if: ${{ env.USE_SCCACHE == 'true' && env.SCCACHE_GHA_ENABLED != 'true' }}
id: cache_sccache_restore
uses: actions/cache/restore@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ${{ github.workspace }}/.sccache/
path: ${{ env.SCCACHE_DIR }}
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
restore-keys: |
sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-
sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-
- name: Start sccache server (Windows)
if: ${{ env.USE_SCCACHE == 'true' && runner.os == 'Windows' }}
shell: bash
run: |
set -euo pipefail
sccache --start-server
sccache --show-stats
- if: ${{ matrix.target == 'x86_64-unknown-linux-musl' || matrix.target == 'aarch64-unknown-linux-musl'}}
name: Disable sccache wrapper (musl)
shell: bash
run: |
set -euo pipefail
echo "RUSTC_WRAPPER=" >> "$GITHUB_ENV"
echo "CARGO_BUILD_RUSTC_WRAPPER=" >> "$GITHUB_ENV"
echo "RUSTC_WORKSPACE_WRAPPER=" >> "$GITHUB_ENV"
- if: ${{ matrix.target == 'x86_64-unknown-linux-musl' || matrix.target == 'aarch64-unknown-linux-musl'}}
@@ -478,7 +525,7 @@ jobs:
continue-on-error: true
uses: actions/cache/save@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ${{ github.workspace }}/.sccache/
path: ${{ env.SCCACHE_DIR }}
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
- name: sccache stats
@@ -507,13 +554,119 @@ jobs:
/var/cache/apt
key: apt-${{ matrix.runner }}-${{ matrix.target }}-v1
test_matrix:
name: Select test matrix
runs-on: ubuntu-24.04
outputs:
matrix: ${{ steps.matrix.outputs.matrix }}
env:
WINDOWS_ARM64_PARTITIONS: >-
${{
github.event_name == 'workflow_dispatch' && inputs.windows_arm64_partitions ||
contains(github.ref_name, 'arm64-shards-4') && '4' ||
contains(github.ref_name, 'arm64-shards-2') && '2' ||
'1'
}}
steps:
- name: Build test matrix
id: matrix
shell: bash
run: |
set -euo pipefail
case "${WINDOWS_ARM64_PARTITIONS}" in
1|2|4) ;;
*)
echo "Unsupported WINDOWS_ARM64_PARTITIONS=${WINDOWS_ARM64_PARTITIONS}" >&2
exit 1
;;
esac
python3 - <<'PY' >> "$GITHUB_OUTPUT"
import json
import os
windows_arm64_partitions = int(os.environ["WINDOWS_ARM64_PARTITIONS"])
def row(runner, target, profile, *, timeout_minutes=None, remote_env=None, runs_on=None, partition_index=None):
partition_suffix = ""
partition_name = ""
nextest_partition = ""
if partition_index is not None and windows_arm64_partitions > 1:
partition_suffix = f"-part-{partition_index}-of-{windows_arm64_partitions}"
partition_name = f" (part {partition_index}/{windows_arm64_partitions})"
nextest_partition = f"hash:{partition_index}/{windows_arm64_partitions}"
entry = {
"runner": runner,
"target": target,
"profile": profile,
"partition_suffix": partition_suffix,
"partition_name": partition_name,
"nextest_partition": nextest_partition,
}
if timeout_minutes is not None:
entry["timeout_minutes"] = timeout_minutes
if remote_env is not None:
entry["remote_env"] = remote_env
if runs_on is not None:
entry["runs_on"] = runs_on
return entry
codex_runners = {
"linux-x64": {"group": "codex-runners", "labels": "codex-linux-x64"},
"linux-arm64": {"group": "codex-runners", "labels": "codex-linux-arm64"},
"windows-x64": {"group": "codex-runners", "labels": "codex-windows-x64"},
"windows-arm64": {"group": "codex-runners", "labels": "codex-windows-arm64"},
}
include = [
row("macos-15-xlarge", "aarch64-apple-darwin", "dev"),
row(
"ubuntu-24.04",
"x86_64-unknown-linux-gnu",
"dev",
remote_env="true",
runs_on=codex_runners["linux-x64"],
),
row(
"ubuntu-24.04-arm",
"aarch64-unknown-linux-gnu",
"dev",
runs_on=codex_runners["linux-arm64"],
),
row(
"windows-x64",
"x86_64-pc-windows-msvc",
"dev",
runs_on=codex_runners["windows-x64"],
),
]
for partition_index in range(1, windows_arm64_partitions + 1):
include.append(
row(
"windows-arm64",
"aarch64-pc-windows-msvc",
"dev",
timeout_minutes=75,
runs_on=codex_runners["windows-arm64"],
partition_index=partition_index,
)
)
print("matrix<<JSON")
print(json.dumps({"include": include}, separators=(",", ":")))
print("JSON")
PY
tests:
name: Tests — ${{ matrix.runner }} - ${{ matrix.target }}${{ matrix.remote_env == 'true' && ' (remote)' || '' }}
name: Tests — ${{ matrix.runner }} - ${{ matrix.target }}${{ matrix.remote_env == 'true' && ' (remote)' || '' }}${{ matrix.partition_name }}
needs: test_matrix
runs-on: ${{ matrix.runs_on || matrix.runner }}
# Perhaps we can bring this back down to 30m once we finish the cutover
# from tui_app_server/ to tui/. Incidentally, windows-arm64 was the main
# offender for exceeding the timeout.
timeout-minutes: 45
# Perhaps we can bring this back down once we finish the cutover from
# tui_app_server/ to tui/. Incidentally, windows-arm64 was the main offender
# for exceeding the timeout.
timeout-minutes: ${{ matrix.timeout_minutes || 45 }}
defaults:
run:
working-directory: codex-rs
@@ -521,45 +674,22 @@ jobs:
# Speed up repeated builds across CI runs by caching compiled objects, except on
# arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce
# mixed-architecture archives under sccache.
USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }}
USE_SCCACHE: ${{ (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin') && 'false' || 'true' }}
CARGO_INCREMENTAL: "0"
SCCACHE_CACHE_SIZE: 10G
SCCACHE_IDLE_TIMEOUT: "0"
WINDOWS_NEXTEST_THREADS: ${{ github.event_name == 'workflow_dispatch' && inputs.windows_nextest_threads || '' }}
strategy:
fail-fast: false
matrix:
include:
- runner: macos-15-xlarge
target: aarch64-apple-darwin
profile: dev
- runner: ubuntu-24.04
target: x86_64-unknown-linux-gnu
profile: dev
remote_env: "true"
runs_on:
group: codex-runners
labels: codex-linux-x64
- runner: ubuntu-24.04-arm
target: aarch64-unknown-linux-gnu
profile: dev
runs_on:
group: codex-runners
labels: codex-linux-arm64
- runner: windows-x64
target: x86_64-pc-windows-msvc
profile: dev
runs_on:
group: codex-runners
labels: codex-windows-x64
- runner: windows-arm64
target: aarch64-pc-windows-msvc
profile: dev
runs_on:
group: codex-runners
labels: codex-windows-arm64
matrix: ${{ fromJSON(needs.test_matrix.outputs.matrix) }}
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Configure Dev Drive (Windows)
if: ${{ runner.os == 'Windows' }}
shell: pwsh
run: ../.github/scripts/setup-dev-drive.ps1
- name: Install Linux build dependencies
if: ${{ runner.os == 'Linux' }}
shell: bash
@@ -603,10 +733,10 @@ jobs:
- name: Install sccache
if: ${{ env.USE_SCCACHE == 'true' }}
uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2.62.49
uses: taiki-e/install-action@cca35edeb1d01366c2843b68fc3ca441446d73d3 # v2.77.1
with:
tool: sccache
version: 0.7.5
tool: sccache@0.14.0
fallback: none
- name: Configure sccache backend
if: ${{ env.USE_SCCACHE == 'true' }}
@@ -618,30 +748,49 @@ jobs:
echo "Using sccache GitHub backend"
else
echo "SCCACHE_GHA_ENABLED=false" >> "$GITHUB_ENV"
echo "SCCACHE_DIR=${{ github.workspace }}/.sccache" >> "$GITHUB_ENV"
if [[ -n "${DEV_DRIVE:-}" ]]; then
echo "SCCACHE_DIR=${DEV_DRIVE}\\.sccache" >> "$GITHUB_ENV"
else
echo "SCCACHE_DIR=${{ github.workspace }}/.sccache" >> "$GITHUB_ENV"
fi
echo "Using sccache local disk + actions/cache fallback"
fi
- name: Enable sccache wrapper
if: ${{ env.USE_SCCACHE == 'true' }}
shell: bash
run: echo "RUSTC_WRAPPER=sccache" >> "$GITHUB_ENV"
run: |
set -euo pipefail
wrapper="$(command -v sccache)"
if [[ "${RUNNER_OS}" == "Windows" ]] && command -v cygpath >/dev/null 2>&1; then
wrapper="$(cygpath -w "${wrapper}")"
fi
echo "RUSTC_WRAPPER=${wrapper}" >> "$GITHUB_ENV"
echo "CARGO_BUILD_RUSTC_WRAPPER=${wrapper}" >> "$GITHUB_ENV"
- name: Restore sccache cache (fallback)
if: ${{ env.USE_SCCACHE == 'true' && env.SCCACHE_GHA_ENABLED != 'true' }}
id: cache_sccache_restore
uses: actions/cache/restore@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ${{ github.workspace }}/.sccache/
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
path: ${{ env.SCCACHE_DIR }}
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}${{ matrix.partition_suffix }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
restore-keys: |
sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}${{ matrix.partition_suffix }}-${{ steps.lockhash.outputs.hash }}-
sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-
sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-
- name: Start sccache server (Windows)
if: ${{ env.USE_SCCACHE == 'true' && runner.os == 'Windows' }}
shell: bash
run: |
set -euo pipefail
sccache --start-server
sccache --show-stats
- uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2.62.49
with:
tool: nextest
version: 0.9.103
tool: nextest@0.9.103
- name: Enable unprivileged user namespaces (Linux)
if: runner.os == 'Linux'
@@ -666,17 +815,33 @@ jobs:
- name: tests
id: test
run: cargo nextest run --no-fail-fast --target ${{ matrix.target }} --cargo-profile ci-test --timings
shell: bash
run: |
set -euo pipefail
nextest_args=(
--no-fail-fast
--target "${{ matrix.target }}"
--cargo-profile ci-test
--timings
)
if [[ "${{ runner.os }}" == "Windows" && -n "${WINDOWS_NEXTEST_THREADS}" ]]; then
nextest_args+=(--test-threads "${WINDOWS_NEXTEST_THREADS}")
fi
if [[ -n "${NEXTEST_PARTITION}" ]]; then
nextest_args+=(--partition "${NEXTEST_PARTITION}")
fi
cargo nextest run "${nextest_args[@]}"
env:
RUST_BACKTRACE: 1
RUST_MIN_STACK: "8388608" # 8 MiB
NEXTEST_STATUS_LEVEL: leak
NEXTEST_PARTITION: ${{ matrix.nextest_partition }}
- name: Upload Cargo timings (nextest)
if: always()
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: cargo-timings-rust-ci-nextest-${{ matrix.target }}-${{ matrix.profile }}
name: cargo-timings-rust-ci-nextest-${{ matrix.target }}-${{ matrix.profile }}${{ matrix.partition_suffix }}
path: codex-rs/target/**/cargo-timings/cargo-timing.html
if-no-files-found: warn
@@ -697,8 +862,8 @@ jobs:
continue-on-error: true
uses: actions/cache/save@668228422ae6a00e4ad889ee87cd7109ec5666a7 # v5.0.4
with:
path: ${{ github.workspace }}/.sccache/
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
path: ${{ env.SCCACHE_DIR }}
key: sccache-${{ matrix.runner }}-${{ matrix.target }}-${{ matrix.profile }}${{ matrix.partition_suffix }}-${{ steps.lockhash.outputs.hash }}-${{ github.run_id }}
- name: sccache stats
if: always() && env.USE_SCCACHE == 'true'
@@ -743,6 +908,7 @@ jobs:
argument_comment_lint_package,
argument_comment_lint_prebuilt,
lint_build,
test_matrix,
tests,
]
if: always()
@@ -756,12 +922,14 @@ jobs:
echo "general: ${{ needs.general.result }}"
echo "shear : ${{ needs.cargo_shear.result }}"
echo "lint : ${{ needs.lint_build.result }}"
echo "matrix : ${{ needs.test_matrix.result }}"
echo "tests : ${{ needs.tests.result }}"
[[ '${{ needs.argument_comment_lint_package.result }}' == 'success' ]] || { echo 'argument_comment_lint_package failed'; exit 1; }
[[ '${{ needs.argument_comment_lint_prebuilt.result }}' == 'success' ]] || { echo 'argument_comment_lint_prebuilt failed'; exit 1; }
[[ '${{ needs.general.result }}' == 'success' ]] || { echo 'general failed'; exit 1; }
[[ '${{ needs.cargo_shear.result }}' == 'success' ]] || { echo 'cargo_shear failed'; exit 1; }
[[ '${{ needs.lint_build.result }}' == 'success' ]] || { echo 'lint_build failed'; exit 1; }
[[ '${{ needs.test_matrix.result }}' == 'success' ]] || { echo 'test_matrix failed'; exit 1; }
[[ '${{ needs.tests.result }}' == 'success' ]] || { echo 'tests failed'; exit 1; }
- name: sccache summary note


@@ -14,6 +14,9 @@ max-threads = 1
[test-groups.windows_sandbox_legacy_sessions]
max-threads = 1
[test-groups.windows_process_heavy]
max-threads = 1
[[profile.default.overrides]]
# Do not add new tests here
filter = 'test(rmcp_client) | test(humanlike_typing_1000_chars_appears_live_no_placeholder)'
@@ -27,6 +30,41 @@ slow-timeout = { period = "30s", terminate-after = 2 }
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
test-group = 'app_server_protocol_codegen'
[[profile.default.overrides]]
# These Windows CI tests launch full Codex/app-server process trees. They have
# repeatedly timed out when nextest schedules them alongside similar tests.
platform = 'cfg(windows)'
filter = 'package(codex-core) & kind(test) & (test(cli_stream) | test(realtime_conversation))'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# The exec resume tests spawn the CLI and touch shared session state on disk.
platform = 'cfg(windows)'
filter = 'package(codex-exec) & kind(test) & test(exec_resume)'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# Keep the specific app-server subprocess-heavy cases isolated on Windows. This
# must stay before the broader codex-app-server override below.
platform = 'cfg(windows)'
filter = 'package(codex-app-server) & kind(test) & (test(thread_fork_can_exclude_turns_and_skip_restored_token_usage) | test(turn_start_resolves_sticky_thread_environments_and_turn_overrides) | test(message_processor_tracing_tests))'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# These tests create restricted-token Windows child processes and private
# desktops. Running them alone avoids contention with other subprocess tests.
platform = 'cfg(windows)'
filter = 'package(codex-windows-sandbox) & kind(test) & test(legacy_)'
test-group = 'windows_process_heavy'
threads-required = "num-test-threads"
slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
# These integration tests spawn a fresh app-server subprocess per case.
# Keep the library unit tests parallel.


@@ -8,6 +8,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use core_test_support::test_path_buf;
use serde_json::json;
use std::fs;
use std::fs::FileTimes;
@@ -134,7 +135,7 @@ pub fn create_fake_rollout_with_source(
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
cwd: test_path_buf("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source,
@@ -218,7 +219,7 @@ pub fn create_fake_rollout_with_text_elements(
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
cwd: test_path_buf("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,


@@ -31,6 +31,7 @@ use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
use codex_thread_store::ThreadStore;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::test_path_buf;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
@@ -56,7 +57,7 @@ fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSum
timestamp: Some(CREATED_AT_RFC3339.to_string()),
updated_at: Some(UPDATED_AT_RFC3339.to_string()),
model_provider: MODEL_PROVIDER.to_string(),
cwd: PathBuf::from("/"),
cwd: test_path_buf("/"),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
git_info: None,


@@ -426,6 +426,8 @@ fn realtime_sideband_connection(
WebSocketConnectionConfig {
requests: realtime_server_events,
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}
@@ -1044,6 +1046,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
})]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: false,
}])
@@ -1836,7 +1840,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
};
assert_eq!(id.as_str(), "shell_call");
assert_eq!(status, CommandExecutionStatus::Completed);
assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
assert_eq!(
aggregated_output.as_deref().map(str::trim),
Some("realtime-tool-ok")
);
// Phase 3: verify the shell output reached Responses and the final delegated answer returned
// to realtime as a single function-call-output item.
@@ -2154,10 +2161,10 @@ fn realtime_tool_ok_command() -> Vec<String> {
#[cfg(windows)]
{
vec![
"powershell.exe".to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"[Console]::Write('realtime-tool-ok')".to_string(),
"cmd.exe".to_string(),
"/D".to_string(),
"/C".to_string(),
"echo realtime-tool-ok".to_string(),
]
}


@@ -239,6 +239,65 @@ async fn wait_for_live_thread_spawn_children(
.expect("expected persisted child tree");
}
async fn wait_for_agent_shutdown(
thread_id: ThreadId,
mut status_rx: tokio::sync::watch::Receiver<AgentStatus>,
) {
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
return;
}
timeout(Duration::from_secs(5), async {
loop {
status_rx
.changed()
.await
.unwrap_or_else(|_| panic!("thread {thread_id} status should reach shutdown"));
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
break;
}
}
})
.await
.unwrap_or_else(|_| panic!("thread {thread_id} should shut down before resume"));
}
async fn shutdown_live_agent_and_wait(control: &AgentControl, thread_id: ThreadId) {
let status_rx = control
.subscribe_status(thread_id)
.await
.expect("status subscription should succeed before shutdown");
let _ = control
.shutdown_live_agent(thread_id)
.await
.expect("thread shutdown should submit");
wait_for_agent_shutdown(thread_id, status_rx).await;
}
async fn close_agent_and_wait(
control: &AgentControl,
agent_id: ThreadId,
shutdown_ids: &[ThreadId],
) {
let mut status_rxs = Vec::with_capacity(shutdown_ids.len());
for thread_id in shutdown_ids {
status_rxs.push((
*thread_id,
control
.subscribe_status(*thread_id)
.await
.expect("status subscription should succeed before close"),
));
}
let _ = control
.close_agent(agent_id)
.await
.expect("agent close should succeed");
for (thread_id, status_rx) in status_rxs {
wait_for_agent_shutdown(thread_id, status_rx).await;
}
}
#[tokio::test]
async fn send_input_errors_when_manager_dropped() {
let control = AgentControl::default();
@@ -1626,11 +1685,9 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
.await
.expect("child thread metadata should be persisted to sqlite before shutdown");
-let _ = harness
-.control
-.shutdown_live_agent(child_thread_id)
-.await
-.expect("child shutdown should submit");
-drop(status_rx);
+shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
drop(child_thread);
let resumed_thread_id = harness
.control
@@ -1699,11 +1756,8 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
.await
.expect("child thread should exist");
persist_thread_for_tree_resume(&child_thread, "persist before archiving").await;
-let _ = harness
-.control
-.shutdown_live_agent(child_thread_id)
-.await
-.expect("child shutdown should succeed");
+shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
drop(child_thread);
let store = LocalThreadStore::new(
LocalThreadStoreConfig::from_config(&harness.config),
harness.state_db.clone(),
@@ -1993,11 +2047,12 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
let _ = harness
.control
@@ -2085,16 +2140,14 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
-let _ = harness
-.control
-.shutdown_live_agent(parent_thread_id)
-.await
-.expect("parent shutdown should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
+shutdown_live_agent_and_wait(&harness.control, parent_thread_id).await;
drop(parent_thread);
let resumed_parent_thread_id = harness
.control
@@ -2180,11 +2233,12 @@ async fn resume_closed_child_reopens_open_descendants() {
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
-let _ = harness
-.control
-.close_agent(child_thread_id)
-.await
-.expect("child close should succeed");
+close_agent_and_wait(
+&harness.control,
+child_thread_id,
+&[child_thread_id, grandchild_thread_id],
+)
+.await;
let resumed_child_thread_id = harness
.control

View File

@@ -196,6 +196,12 @@ async fn run_agent_job_loop(
)
.await?;
for item in pending_items {
let claimed = db
.mark_agent_job_item_running(job_id.as_str(), item.item_id.as_str())
.await?;
if !claimed {
continue;
}
let prompt = build_worker_prompt(&job, &item)?;
let items = vec![UserInput::Text {
text: prompt,
@@ -240,7 +246,7 @@ async fn run_agent_job_loop(
}
};
let assigned = db
-.mark_agent_job_item_running_with_thread(
+.set_agent_job_item_thread(
job_id.as_str(),
item.item_id.as_str(),
thread_id.to_string().as_str(),

View File

@@ -61,27 +61,31 @@ pub async fn handle(
}
let db = required_state_db(&session)?;
let reporting_thread_id = session.conversation_id.to_string();
-let accepted = db
-.report_agent_job_item_result(
-args.job_id.as_str(),
-args.item_id.as_str(),
-reporting_thread_id.as_str(),
-&args.result,
-)
-.await
+let accepted = if args.stop.unwrap_or(false) {
+db.report_agent_job_item_result_and_cancel_job(
+args.job_id.as_str(),
+args.item_id.as_str(),
+reporting_thread_id.as_str(),
+&args.result,
+"cancelled by worker request",
+)
+.await
+} else {
+db.report_agent_job_item_result(
+args.job_id.as_str(),
+args.item_id.as_str(),
+reporting_thread_id.as_str(),
+&args.result,
+)
+.await
+}
.map_err(|err| {
let job_id = args.job_id.as_str();
let item_id = args.item_id.as_str();
FunctionCallError::RespondToModel(format!(
"failed to record agent job result for {job_id} / {item_id}: {err}"
))
})?;
-if accepted && args.stop.unwrap_or(false) {
-let message = "cancelled by worker request";
-let _ = db
-.mark_agent_job_cancelled(args.job_id.as_str(), message)
-.await;
-}
let content =
serde_json::to_string(&ReportAgentJobResultToolResult { accepted }).map_err(|err| {
FunctionCallError::Fatal(format!(

View File

@@ -446,6 +446,11 @@ impl WebSocketHandshake {
pub struct WebSocketConnectionConfig {
pub requests: Vec<Vec<Value>>,
pub response_headers: Vec<(String, String)>,
/// Optional notification fired after the TCP connection is accepted and before the websocket
/// handshake is accepted.
pub accept_started: Option<Arc<Notify>>,
/// Optional gate that blocks websocket handshake acceptance until the notifier is signalled.
pub accept_release: Option<Arc<Notify>>,
/// Optional delay inserted before accepting the websocket handshake.
///
/// Tests use this to force websocket setup into an in-flight state so first-turn warmup paths
@@ -1254,6 +1259,8 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
.map(|requests| WebSocketConnectionConfig {
requests,
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
})
@@ -1298,12 +1305,27 @@ pub async fn start_websocket_server_with_headers(
continue;
};
if let Some(accept_started) = &connection.accept_started {
accept_started.notify_one();
}
if let Some(accept_release) = &connection.accept_release {
tokio::select! {
_ = accept_release.notified() => {}
_ = &mut shutdown_rx => return,
}
}
if let Some(delay) = connection.accept_delay {
-tokio::time::sleep(delay).await;
+tokio::select! {
+_ = tokio::time::sleep(delay) => {}
+_ = &mut shutdown_rx => return,
+}
}
let response_headers = connection.response_headers.clone();
let handshake_log = Arc::clone(&handshakes);
let pending_handshake = Arc::new(Mutex::new(None));
let callback_handshake = Arc::clone(&pending_handshake);
let callback = move |req: &Request, mut response: Response| {
let headers = req
.headers()
@@ -1315,7 +1337,7 @@ pub async fn start_websocket_server_with_headers(
.map(|value| (name.as_str().to_string(), value.to_string()))
})
.collect();
-handshake_log.lock().unwrap().push(WebSocketHandshake {
+*callback_handshake.lock().unwrap() = Some(WebSocketHandshake {
uri: req.uri().to_string(),
headers,
});
@@ -1344,6 +1366,10 @@ pub async fn start_websocket_server_with_headers(
Err(_) => continue,
};
if let Some(handshake) = pending_handshake.lock().unwrap().take() {
handshakes.lock().unwrap().push(handshake);
}
let connection_index = {
let mut log = requests.lock().unwrap();
log.push(Vec::new());

View File

@@ -121,6 +121,8 @@ async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() ->
],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
// Delay handshake so turn processing must tolerate websocket startup latency.
accept_delay: Some(Duration::from_millis(150)),
close_after_requests: true,

View File

@@ -941,6 +941,8 @@ async fn responses_websocket_emits_reasoning_included_event() {
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}])
@@ -1015,6 +1017,8 @@ async fn responses_websocket_emits_rate_limit_events() {
("X-Models-Etag".to_string(), "etag-123".to_string()),
("X-Reasoning-Included".to_string(), "true".to_string()),
],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
}])
@@ -1751,6 +1755,8 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake(
})],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: false,
}])

View File

@@ -48,6 +48,7 @@ use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::sync::oneshot;
use tokio::time::timeout;
use wiremock::Match;
@@ -492,6 +493,8 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
vec![],
],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: Some(sideband_accept_delay),
close_after_requests: false,
}])
@@ -659,10 +662,14 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
)
.mount(&server)
.await;
let accept_started = Arc::new(Notify::new());
let accept_release = Arc::new(Notify::new());
let realtime_server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![vec![]],
response_headers: Vec::new(),
-accept_delay: Some(Duration::from_millis(500)),
+accept_started: Some(Arc::clone(&accept_started)),
+accept_release: Some(Arc::clone(&accept_release)),
+accept_delay: None,
close_after_requests: false,
}])
.await;
@@ -699,6 +706,9 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
realtime_server.handshakes().is_empty(),
"sideband websocket should still be pending when SDP is emitted"
);
timeout(Duration::from_secs(5), accept_started.notified())
.await
.context("sideband websocket should connect before close")?;
test.codex.submit(Op::RealtimeConversationClose).await?;
let closed = wait_for_event_match(&test.codex, |msg| match msg {
@@ -726,9 +736,17 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
"pending sideband task leaked after close: {:?}",
stale_event.ok()
);
accept_release.notify_one();
let stale_request = timeout(Duration::from_millis(250), async {
realtime_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
.await
})
.await;
assert!(
-realtime_server.handshakes().is_empty(),
-"pending sideband task should abort before websocket handshake completes"
+stale_request.is_err(),
+"pending sideband task sent websocket request after close: {:?}",
+stale_request.ok().map(|request| request.body_json())
);
realtime_server.shutdown().await;
@@ -749,11 +767,13 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res
)
.mount(&server)
.await;
-let mut builder = test_codex().with_config(|config| {
+let realtime_base_url = server.uri();
+let mut builder = test_codex().with_config(move |config| {
config.experimental_realtime_ws_backend_prompt = Some("backend prompt".to_string());
config.experimental_realtime_ws_model = Some("realtime-test-model".to_string());
config.experimental_realtime_ws_startup_context = Some(String::new());
config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.model_provider.request_max_retries = Some(0);
config.realtime.version = RealtimeWsVersion::V1;
});
let test = builder.build(&server).await?;

View File

@@ -102,6 +102,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-1"),
]],
response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},
@@ -112,6 +114,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-2"),
]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},
@@ -122,6 +126,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
ev_completed("resp-3"),
]],
response_headers: Vec::new(),
accept_started: None,
accept_release: None,
accept_delay: None,
close_after_requests: true,
},

View File

@@ -372,7 +372,16 @@ async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
async fn wait_for_file_removed(path: &Path) -> anyhow::Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
-if !tokio::fs::try_exists(path).await? {
+let exists = match tokio::fs::try_exists(path).await {
+Ok(exists) => exists,
+Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
+// Windows can transiently deny metadata reads while another task
+// is removing or resetting files in this workspace.
+true
+}
+Err(err) => return Err(err.into()),
+};
+if !exists {
return Ok(());
}
assert!(

View File

@@ -227,22 +227,23 @@ WHERE id = ?
Ok(())
}
-pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<()> {
+pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
-sqlx::query(
+let result = sqlx::query(
r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = NULL
-WHERE id = ?
+WHERE id = ? AND status = ?
"#,
)
.bind(AgentJobStatus::Completed.as_str())
.bind(now)
.bind(now)
.bind(job_id)
.bind(AgentJobStatus::Running.as_str())
.execute(self.pool.as_ref())
.await?;
-Ok(())
+Ok(result.rows_affected() > 0)
}
pub async fn mark_agent_job_failed(
@@ -428,9 +429,46 @@ WHERE job_id = ? AND item_id = ? AND status = ?
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
) -> anyhow::Result<bool> {
self.report_agent_job_item_result_inner(
job_id,
item_id,
reporting_thread_id,
result_json,
/*cancel_job_reason*/ None,
)
.await
}
pub async fn report_agent_job_item_result_and_cancel_job(
&self,
job_id: &str,
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
cancel_job_reason: &str,
) -> anyhow::Result<bool> {
self.report_agent_job_item_result_inner(
job_id,
item_id,
reporting_thread_id,
result_json,
Some(cancel_job_reason),
)
.await
}
async fn report_agent_job_item_result_inner(
&self,
job_id: &str,
item_id: &str,
reporting_thread_id: &str,
result_json: &Value,
cancel_job_reason: Option<&str>,
) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
let serialized = serde_json::to_string(result_json)?;
let mut tx = self.pool.begin().await?;
let result = sqlx::query(
r#"
UPDATE agent_job_items
@@ -446,7 +484,7 @@ WHERE
job_id = ?
AND item_id = ?
AND status = ?
-AND assigned_thread_id = ?
+AND (assigned_thread_id = ? OR assigned_thread_id IS NULL)
"#,
)
.bind(AgentJobItemStatus::Completed.as_str())
@@ -458,9 +496,29 @@ WHERE
.bind(item_id)
.bind(AgentJobItemStatus::Running.as_str())
.bind(reporting_thread_id)
-.execute(self.pool.as_ref())
+.execute(&mut *tx)
.await?;
-Ok(result.rows_affected() > 0)
+let accepted = result.rows_affected() > 0;
if accepted && let Some(reason) = cancel_job_reason {
sqlx::query(
r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = ?
WHERE id = ? AND status IN (?, ?)
"#,
)
.bind(AgentJobStatus::Cancelled.as_str())
.bind(now)
.bind(now)
.bind(reason)
.bind(job_id)
.bind(AgentJobStatus::Pending.as_str())
.bind(AgentJobStatus::Running.as_str())
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(accepted)
}
pub async fn mark_agent_job_item_completed(
@@ -652,6 +710,113 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_can_cancel_job_atomically() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;
let accepted = runtime
.report_agent_job_item_result_and_cancel_job(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"ok": true}),
"cancelled by worker request",
)
.await?;
assert!(accepted);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
assert_eq!(
job.last_error,
Some("cancelled by worker request".to_string())
);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Completed);
assert_eq!(item.result_json, Some(json!({"ok": true})));
assert_eq!(item.assigned_thread_id, None);
let completed = runtime.mark_agent_job_completed(job_id.as_str()).await?;
assert!(!completed);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_accepts_unassigned_running_item() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
let job_id = "job-1".to_string();
let item_id = "item-1".to_string();
let thread_id = "thread-1".to_string();
runtime
.create_agent_job(
&AgentJobCreateParams {
id: job_id.clone(),
name: "test-job".to_string(),
instruction: "Return a result".to_string(),
auto_export: true,
max_runtime_seconds: None,
output_schema_json: None,
input_headers: vec!["path".to_string()],
input_csv_path: "/tmp/in.csv".to_string(),
output_csv_path: "/tmp/out.csv".to_string(),
},
&[AgentJobItemCreateParams {
item_id: item_id.clone(),
row_index: 0,
source_id: None,
row_json: json!({"path":"file-1"}),
}],
)
.await?;
runtime.mark_agent_job_running(job_id.as_str()).await?;
let marked_running = runtime
.mark_agent_job_item_running(job_id.as_str(), item_id.as_str())
.await?;
assert!(marked_running);
let accepted = runtime
.report_agent_job_item_result_and_cancel_job(
job_id.as_str(),
item_id.as_str(),
thread_id.as_str(),
&json!({"ok": true}),
"cancelled by worker request",
)
.await?;
assert!(accepted);
let job = runtime
.get_agent_job(job_id.as_str())
.await?
.expect("job should exist");
assert_eq!(job.status, AgentJobStatus::Cancelled);
let item = runtime
.get_agent_job_item(job_id.as_str(), item_id.as_str())
.await?
.expect("job item should exist");
assert_eq!(item.status, AgentJobItemStatus::Completed);
assert_eq!(item.result_json, Some(json!({"ok": true})));
assert_eq!(item.assigned_thread_id, None);
Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();

View File

@@ -195,7 +195,7 @@ fn legacy_non_tty_powershell_emits_output() {
pwsh.display().to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"Write-Output LEGACY-NONTTY-DIRECT".to_string(),
"'LEGACY-NONTTY-DIRECT'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -378,7 +378,7 @@ fn legacy_capture_powershell_emits_output() {
pwsh.display().to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
"Write-Output LEGACY-CAPTURE-DIRECT".to_string(),
"'LEGACY-CAPTURE-DIRECT'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -419,7 +419,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
"-NoProfile".to_string(),
"-NoExit".to_string(),
"-Command".to_string(),
"$PID; Write-Output ready".to_string(),
"$PID; 'ready'".to_string(),
],
cwd.as_path(),
HashMap::new(),
@@ -434,7 +434,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
let writer = spawned.session.writer_sender();
writer
.send(b"Write-Output second\n".to_vec())
.send(b"'second'\n".to_vec())
.await
.expect("send second command");
writer