Compare commits


1 Commit

Author: Joe Gershenson · SHA1: 71b734cff5 · Message: Add PR health regression review · Date: 2026-04-20 10:21:34 -07:00
2 changed files with 1008 additions and 0 deletions

.github/workflows/pr-health-review.yml (new file, 375 lines)

@@ -0,0 +1,375 @@
name: PR Health Review

on:
  pull_request:
    types:
      - opened
      - reopened
      - ready_for_review
      - synchronize

concurrency:
  group: ${{ github.workflow }}::${{ github.event.pull_request.number || github.run_id }}
  cancel-in-progress: true
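  # With cancel-in-progress, a new push to the same PR supersedes and cancels
  # any run still measuring the previous head.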

jobs:
  measure:
    name: Measure PR health signals
    if: >-
      github.event_name == 'pull_request' &&
      github.repository == 'openai/codex' &&
      github.event.pull_request.head.repo.full_name == github.repository &&
      !github.event.pull_request.draft
    runs-on: ubuntu-latest
    timeout-minutes: 45
    permissions:
      contents: read
    outputs:
      review_input_json: ${{ steps.prepare-review-input.outputs.review_input_json }}
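    # The compact comparison payload travels to the review job as a job output,
    # so the second job never needs to check out the repository.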
    steps:
      - name: Checkout base
        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
        with:
          ref: ${{ github.event.pull_request.base.sha }}
      - name: Keep base probe script
        run: |
          set -euo pipefail
          cp scripts/context_bloat_probe.py "$RUNNER_TEMP/context_bloat_probe.py"
      - name: Measure base
        run: |
          set -euo pipefail
          mkdir -p "$RUNNER_TEMP/pr-health-review"
          python3 "$RUNNER_TEMP/context_bloat_probe.py" \
            --repo "$GITHUB_WORKSPACE" \
            --work-dir "$RUNNER_TEMP/pr-health-review/base" \
            --output "$RUNNER_TEMP/pr-health-review/base.ndjson" \
            --summary-output "$RUNNER_TEMP/pr-health-review/base-summary.json" \
            --build-timeout-seconds 1800 \
            --run-timeout-seconds 120
      - name: Checkout head
        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
        with:
          ref: ${{ github.event.pull_request.head.sha }}
      - name: Measure head
        run: |
          set -euo pipefail
          python3 "$RUNNER_TEMP/context_bloat_probe.py" \
            --repo "$GITHUB_WORKSPACE" \
            --work-dir "$RUNNER_TEMP/pr-health-review/head" \
            --output "$RUNNER_TEMP/pr-health-review/head.ndjson" \
            --summary-output "$RUNNER_TEMP/pr-health-review/head-summary.json" \
            --build-timeout-seconds 1800 \
            --run-timeout-seconds 120
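      # Both measurements use the probe script copied from the base commit, so
      # base and head are scored by identical metric definitions even when the
      # PR modifies scripts/context_bloat_probe.py itself.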
      - id: prepare-review-input
        name: Prepare Codex review input
        env:
          BASE_SUMMARY: ${{ runner.temp }}/pr-health-review/base-summary.json
          HEAD_SUMMARY: ${{ runner.temp }}/pr-health-review/head-summary.json
          REVIEW_INPUT: ${{ runner.temp }}/pr-health-review/pr-health-review-input.json
          PR_NUMBER: ${{ github.event.pull_request.number }}
          PR_TITLE: ${{ github.event.pull_request.title }}
          PR_AUTHOR: ${{ github.event.pull_request.user.login }}
          BASE_SHA: ${{ github.event.pull_request.base.sha }}
          HEAD_SHA: ${{ github.event.pull_request.head.sha }}
        run: |
          set -euo pipefail
          python3 - <<'PY'
          import json
          import os

          metrics = [
              "request_body_bytes",
              "context_component_bytes",
              "instructions_bytes",
              "input_json_bytes",
              "tools_json_bytes",
              "developer_message_json_bytes",
              "user_message_json_bytes",
              "tool_count",
              "input_item_count",
          ]

          with open(os.environ["BASE_SUMMARY"], encoding="utf-8") as handle:
              base = json.load(handle)
          with open(os.environ["HEAD_SUMMARY"], encoding="utf-8") as handle:
              head = json.load(handle)

          def slim(summary):
              return {
                  "valid": summary.get("valid"),
                  "invalid_reasons": summary.get("invalid_reasons", []),
                  "build_elapsed_ms": summary.get("build_elapsed_ms"),
                  "scenarios": summary.get("scenarios", []),
                  "measurement_count": summary.get("measurement_count"),
                  "measurements": summary.get("measurements", []),
              }

          def row_key(row):
              return f"{row.get('scenario', '')}:{row.get('run_label', '')}"

          base_rows = {row_key(row): row for row in base.get("measurements", [])}
          head_rows = {row_key(row): row for row in head.get("measurements", [])}
          comparisons = []
          for key in sorted(set(base_rows) | set(head_rows)):
              base_row = base_rows.get(key, {})
              head_row = head_rows.get(key, {})
              scenario, _, run_label = key.partition(":")
              for metric in metrics:
                  base_value = base_row.get(metric)
                  head_value = head_row.get(metric)
                  if not isinstance(base_value, (int, float)) or not isinstance(head_value, (int, float)):
                      continue
                  delta = head_value - base_value
                  percent = None if base_value == 0 else (delta / base_value) * 100
                  comparisons.append(
                      {
                          "scenario": scenario,
                          "run_label": run_label,
                          "metric": metric,
                          "base": base_value,
                          "head": head_value,
                          "delta": delta,
                          "percent": percent,
                      }
                  )

          base_build_ms = base.get("build_elapsed_ms")
          head_build_ms = head.get("build_elapsed_ms")
          build_comparison = None
          if isinstance(base_build_ms, (int, float)) and isinstance(head_build_ms, (int, float)):
              delta = head_build_ms - base_build_ms
              percent = None if base_build_ms == 0 else (delta / base_build_ms) * 100
              build_comparison = {
                  "metric": "build_elapsed_ms",
                  "base": base_build_ms,
                  "head": head_build_ms,
                  "delta": delta,
                  "percent": percent,
              }

          payload = {
              "pull_request": {
                  "number": os.environ["PR_NUMBER"],
                  "title": os.environ["PR_TITLE"],
                  "author": os.environ["PR_AUTHOR"],
                  "base_sha": os.environ["BASE_SHA"],
                  "head_sha": os.environ["HEAD_SHA"],
              },
              "thresholds": {
                  "context_bytes_min_delta": 1,
                  "context_bytes_min_percent": 0,
                  "build_ms_min_percent": 3,
              },
              "base_summary": slim(base),
              "head_summary": slim(head),
              "comparisons": comparisons,
              "build_comparison": build_comparison,
          }

          encoded = json.dumps(payload, separators=(",", ":"), sort_keys=True)
          with open(os.environ["REVIEW_INPUT"], "w", encoding="utf-8") as handle:
              handle.write(json.dumps(payload, indent=2, sort_keys=True))
              handle.write("\n")
          with open(os.environ["GITHUB_OUTPUT"], "a", encoding="utf-8") as handle:
              handle.write("review_input_json<<EOF\n")
              handle.write(encoded)
              handle.write("\nEOF\n")
          PY
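      # review_input_json is multiline, hence the <<EOF heredoc form GitHub
      # Actions requires for multiline values written to $GITHUB_OUTPUT.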
      - name: Upload PR health artifacts
        if: ${{ always() }}
        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
        with:
          name: pr-health-review-pr-${{ github.event.pull_request.number }}
          path: |
            ${{ runner.temp }}/pr-health-review/*.json
            ${{ runner.temp }}/pr-health-review/*.ndjson

  review-and-comment:
    name: Ask Codex whether to comment
    needs: measure
    if: ${{ needs.measure.result == 'success' }}
    runs-on: ubuntu-latest
    timeout-minutes: 10
    permissions:
      contents: read
      issues: write
      pull-requests: write
    steps:
      - name: Write review input
        env:
          REVIEW_INPUT_JSON: ${{ needs.measure.outputs.review_input_json }}
        run: |
          set -euo pipefail
          printf '%s\n' "$REVIEW_INPUT_JSON" > pr-health-review-input.json
      - id: codex
        name: Review PR health deltas
        uses: openai/codex-action@0b91f4a2703c23df3102c3f0967d3c6db34eedef # v1
        with:
          openai-api-key: ${{ secrets.CODEX_OPENAI_API_KEY }}
          allow-users: "*"
          prompt: |
            You are an asynchronous PR health reviewer for the Codex repository.

            Read `pr-health-review-input.json`. It contains context-size probe summaries for the PR base and head, precomputed scenario metric comparisons, and a separate build-time comparison.

            Produce two independent evaluations: one for context size and one for build time.

            Use these rules:
            - For `context_evaluation`, set `should_comment: true` if any matching scenario/run has a positive increase in `request_body_bytes` or `context_component_bytes`.
            - For `context_evaluation`, use component fields (`instructions_bytes`, `input_json_bytes`, `tools_json_bytes`, `developer_message_json_bytes`, `user_message_json_bytes`) to explain where the context growth came from.
            - For `build_time_evaluation`, set `should_comment: true` if the separate `build_comparison` shows `build_elapsed_ms` increased by more than 3%.
            - Build time is a separate reason to comment, even if context size does not increase.
            - Ignore per-scenario command timing if it appears in the raw summaries; it is not part of either evaluation.
            - If either summary is invalid, do not claim a regression. Explain the measurement issue in both evaluation reasons and set both evaluation `should_comment` fields to false.

            Set top-level `should_comment` to true if either independent evaluation has `should_comment: true`.

            When top-level `should_comment` is true:
            - `comment_body` must be ready to post on the PR.
            - Mention `@codex-core-agent-team`.
            - If context grew, ask the PR author to explain in the PR body why the context growth is OK, or to reduce it.
            - If build time grew by more than 3%, ask the PR author to explain in the PR body why the build-time growth is OK, or to reduce it.
            - Include separate concise bullets or sections for context and build-time findings as applicable, with base, head, delta, and percent.
            - Keep the comment concise and factual.

            When top-level `should_comment` is false, return an empty string for `comment_body`.
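          # The schema below forces the action's final message into the
          # structured verdict that the github-script step parses as JSON.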
          output-schema: |
            {
              "type": "object",
              "properties": {
                "should_comment": { "type": "boolean" },
                "reason": { "type": "string" },
                "context_evaluation": {
                  "type": "object",
                  "properties": {
                    "should_comment": { "type": "boolean" },
                    "reason": { "type": "string" },
                    "regressions": {
                      "type": "array",
                      "items": {
                        "type": "object",
                        "properties": {
                          "scenario": { "type": "string" },
                          "run_label": { "type": "string" },
                          "metric": { "type": "string" },
                          "base": { "type": "number" },
                          "head": { "type": "number" },
                          "delta": { "type": "number" },
                          "percent": { "type": "number" },
                          "explanation": { "type": "string" }
                        },
                        "required": ["scenario", "run_label", "metric", "base", "head", "delta", "percent", "explanation"],
                        "additionalProperties": false
                      }
                    }
                  },
                  "required": ["should_comment", "reason", "regressions"],
                  "additionalProperties": false
                },
                "build_time_evaluation": {
                  "type": "object",
                  "properties": {
                    "should_comment": { "type": "boolean" },
                    "reason": { "type": "string" },
                    "regressions": {
                      "type": "array",
                      "items": {
                        "type": "object",
                        "properties": {
                          "metric": { "type": "string" },
                          "base": { "type": "number" },
                          "head": { "type": "number" },
                          "delta": { "type": "number" },
                          "percent": { "type": "number" },
                          "explanation": { "type": "string" }
                        },
                        "required": ["metric", "base", "head", "delta", "percent", "explanation"],
                        "additionalProperties": false
                      }
                    }
                  },
                  "required": ["should_comment", "reason", "regressions"],
                  "additionalProperties": false
                },
                "comment_body": { "type": "string" }
              },
              "required": ["should_comment", "reason", "context_evaluation", "build_time_evaluation", "comment_body"],
              "additionalProperties": false
            }
      - name: Comment on PR if needed
        uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8
        env:
          CODEX_OUTPUT: ${{ steps.codex.outputs.final-message }}
        with:
          github-token: ${{ github.token }}
          script: |
            const marker = '<!-- codex-pr-health-review -->';
            const raw = process.env.CODEX_OUTPUT ?? '';
            let parsed;
            try {
              parsed = JSON.parse(raw);
            } catch (error) {
              core.info(`Codex output was not valid JSON. Raw output: ${raw}`);
              core.info(`Parse error: ${error.message}`);
              return;
            }
            const issue_number = context.payload.pull_request.number;
            const comments = await github.paginate(github.rest.issues.listComments, {
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number,
              per_page: 100,
            });
            const existing = comments.find((comment) => comment.body?.includes(marker));
            if (parsed?.should_comment !== true) {
              core.info(`No PR health comment needed. Reason: ${parsed?.reason ?? ''}`);
              if (existing) {
                await github.rest.issues.deleteComment({
                  owner: context.repo.owner,
                  repo: context.repo.repo,
                  comment_id: existing.id,
                });
              }
              return;
            }
            let body = typeof parsed.comment_body === 'string' ? parsed.comment_body.trim() : '';
            if (!body) {
              core.info('Codex requested a comment but returned an empty comment_body.');
              return;
            }
            if (!body.includes('@codex-core-agent-team')) {
              body = `@codex-core-agent-team\n\n${body}`;
            }
            if (!/PR body|pull request body|explain/i.test(body)) {
              body += '\n\nPlease explain in the PR body why this regression is OK, or reduce it.';
            }
            body = `${marker}\n${body}`;
            if (existing) {
              await github.rest.issues.updateComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                comment_id: existing.id,
                body,
              });
              return;
            }
            await github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number,
              body,
            });

scripts/context_bloat_probe.py (new executable file, 633 lines)

@@ -0,0 +1,633 @@
#!/usr/bin/env python3
"""Measure Codex Responses request size for the current checkout.

This is intentionally a black-box harness: it builds the real `codex` CLI from
the current checkout, runs a few `codex exec` scenarios against a local mock
Responses API, and measures the request body the client would have sent to the
backend.

The NDJSON stream is detailed enough for debugging; `--summary-output` writes a
compact artifact that a later GitHub/codex-action reviewer can compare against
baseline data and use to explain whether a PR is causing context regressions.
"""

from __future__ import annotations

import argparse
import contextlib
import dataclasses
import datetime as dt
import http.server
import json
import os
import signal
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Any

HOST = "127.0.0.1"
DEFAULT_SCENARIOS = ("baseline", "resume", "project_instructions", "output_schema", "workspace_write")
SUMMARY_MEASUREMENT_FIELDS = (
    "scenario",
    "run_label",
    "model",
    "request_body_bytes",
    "context_component_bytes",
    "instructions_bytes",
    "input_json_bytes",
    "tools_json_bytes",
    "developer_message_json_bytes",
    "user_message_json_bytes",
    "tool_count",
    "input_item_count",
    "build_elapsed_ms",
    "command_elapsed_ms",
    "command_status",
    "shape_reasons",
)


@dataclasses.dataclass(frozen=True)
class CommandResult:
    returncode: int
    stdout: str
    stderr: str
    elapsed_ms: int
    timed_out: bool = False


class ResponsesHandler(http.server.BaseHTTPRequestHandler):
    server: Any

    def log_message(self, fmt: str, *args: object) -> None:
        return

    def do_GET(self) -> None:
        self._send_empty_json()

    def do_POST(self) -> None:
        length = int(self.headers.get("content-length", "0"))
        raw_body = self.rfile.read(length)
        path = self.path.split("?", 1)[0]
        if path.endswith("/responses"):
            self.server.captured.append(
                {
                    "path": path,
                    "headers": {key.lower(): value for key, value in self.headers.items()},
                    "raw_body": raw_body,
                }
            )
            self._send_sse()
            return
        self._send_empty_json()

    def _send_empty_json(self) -> None:
        body = b"{}"
        self.send_response(200)
        self.send_header("content-type", "application/json")
        self.send_header("content-length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_sse(self) -> None:
        body = b'event: response.completed\ndata: {"type":"response.completed","response":{"id":"resp"}}\n\n'
        self.send_response(200)
        self.send_header("content-type", "text/event-stream")
        self.send_header("cache-control", "no-cache")
        self.send_header("content-length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
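
# The mock's reply is just a terminal `response.completed` event: the harness
# only needs the captured request bodies, not a realistic model response.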


@contextlib.contextmanager
def mock_responses_server() -> Any:
    server = http.server.ThreadingHTTPServer((HOST, 0), ResponsesHandler)
    server.captured = []
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        yield server
    finally:
        server.shutdown()
        server.server_close()
        thread.join(timeout=5)
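
# Binding to port 0 lets the OS pick a free port; run_scenario reads the real
# port back from server.server_address when it builds the provider base URL.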


def run_command(
    args: list[str],
    *,
    cwd: Path,
    env: dict[str, str] | None = None,
    timeout_seconds: int,
) -> CommandResult:
    start = time.monotonic()
    proc = subprocess.Popen(
        args,
        cwd=cwd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        start_new_session=True,
    )
    timed_out = False
    try:
        stdout, stderr = proc.communicate(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        timed_out = True
        with contextlib.suppress(ProcessLookupError):
            os.killpg(proc.pid, signal.SIGTERM)
        try:
            stdout, stderr = proc.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            with contextlib.suppress(ProcessLookupError):
                os.killpg(proc.pid, signal.SIGKILL)
            stdout, stderr = proc.communicate()
    elapsed_ms = int((time.monotonic() - start) * 1000)
    return CommandResult(proc.returncode, stdout, stderr, elapsed_ms, timed_out)
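
# start_new_session=True places the child in its own process group (pgid == pid),
# so the killpg calls above also reach any grandchildren the CLI spawned.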


def tail(text: str, max_chars: int = 4000) -> str:
    if len(text) <= max_chars:
        return text
    return text[-max_chars:]


def compact_json(value: Any) -> bytes:
    return json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8")


def byte_len_text(value: str | None) -> int:
    return len((value or "").encode("utf-8"))


def request_metrics(request: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    reasons: list[str] = []
    body_json: Any = None
    raw_body = request["raw_body"]
    headers = request["headers"]
    encoding = headers.get("content-encoding")
    if encoding:
        reasons.append(f"unsupported content-encoding: {encoding}")
    else:
        try:
            body_json = json.loads(raw_body)
        except json.JSONDecodeError as err:
            reasons.append(f"request body is not JSON: {err}")
    metrics: dict[str, Any] = {
        "request_path": request["path"],
        "request_body_bytes": len(raw_body),
    }
    if not isinstance(body_json, dict):
        return metrics, reasons
    instructions = body_json.get("instructions")
    input_value = body_json.get("input")
    tools_value = body_json.get("tools")
    messages = input_value if isinstance(input_value, list) else []
    developer_messages = [item for item in messages if item.get("role") in ("developer", "system")]
    user_messages = [item for item in messages if item.get("role") == "user"]
    if "input" not in body_json:
        reasons.append("request JSON has no `input` field")
    if "model" not in body_json:
        reasons.append("request JSON has no `model` field")
    metrics.update(
        {
            "model": body_json.get("model"),
            "instructions_bytes": byte_len_text(instructions if isinstance(instructions, str) else None),
            "input_json_bytes": len(compact_json(input_value)) if input_value is not None else 0,
            "tools_json_bytes": len(compact_json(tools_value)) if tools_value is not None else 0,
            "context_component_bytes": (
                byte_len_text(instructions if isinstance(instructions, str) else None)
                + (len(compact_json(input_value)) if input_value is not None else 0)
                + (len(compact_json(tools_value)) if tools_value is not None else 0)
            ),
            "input_item_count": len(messages),
            "developer_message_count": len(developer_messages),
            "developer_message_json_bytes": len(compact_json(developer_messages)),
            "user_message_count": len(user_messages),
            "user_message_json_bytes": len(compact_json(user_messages)),
            "tool_count": len(tools_value) if isinstance(tools_value, list) else 0,
        }
    )
    return metrics, reasons
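
# Illustrative example: a captured body such as
#   {"model": "m", "instructions": "hi", "input": [], "tools": []}
# yields instructions_bytes == 2, input_json_bytes == 2, tools_json_bytes == 2,
# and context_component_bytes == 6 (the sum of the three components).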


def write_scenario_files(scenario: str, workspace: Path, home: Path) -> None:
    workspace.mkdir(parents=True, exist_ok=True)
    home.mkdir(parents=True, exist_ok=True)
    if scenario == "project_instructions":
        (workspace / "AGENTS.md").write_text(
            "\n".join(
                [
                    "# Project Instructions",
                    "",
                    "- Treat this workspace as a context-bloat measurement fixture.",
                    "- Prefer concise answers.",
                    "- Mention the fixture marker `context-bloat-project-doc` if asked about project policy.",
                    "- Do not run shell commands unless explicitly requested.",
                    "- Keep generated output deterministic for comparison.",
                ]
            )
            + "\n",
            encoding="utf-8",
        )
    if scenario == "output_schema":
        (workspace / "schema.json").write_text(
            json.dumps(
                {
                    "type": "object",
                    "additionalProperties": False,
                    "properties": {"answer": {"type": "string"}},
                    "required": ["answer"],
                },
                indent=2,
                sort_keys=True,
            )
            + "\n",
            encoding="utf-8",
        )


def codex_env(home: Path) -> dict[str, str]:
    env = os.environ.copy()
    env["CODEX_HOME"] = str(home)
    env["OPENAI_API_KEY"] = "dummy"
    env["NO_COLOR"] = "1"
    env.pop("CODEX_SANDBOX_NETWORK_DISABLED", None)
    env.pop("CODEX_SANDBOX", None)
    return env
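
# Each scenario run gets a fresh CODEX_HOME, so config and session state cannot
# leak between scenarios; the dummy API key exists only because the provider
# override declares env_key = "OPENAI_API_KEY"; the mock never checks it.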


def base_exec_args(server_url: str, workspace: Path, extra_config: list[str]) -> list[str]:
    provider_override = (
        f'model_providers.mock={{ name = "mock", base_url = "{server_url}/v1", '
        f'env_key = "OPENAI_API_KEY", wire_api = "responses" }}'
    )
    args = [
        "exec",
        "--skip-git-repo-check",
        "-c",
        provider_override,
        "-c",
        'model_provider="mock"',
        "-c",
        f'chatgpt_base_url="{server_url}/backend-api"',
    ]
    for config in extra_config:
        args.extend(["-c", config])
    args.extend(["-C", str(workspace)])
    return args
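
# Illustrative expansion (the port varies per run):
#   codex exec --skip-git-repo-check \
#       -c 'model_providers.mock={ name = "mock", base_url = "http://127.0.0.1:PORT/v1", env_key = "OPENAI_API_KEY", wire_api = "responses" }' \
#       -c 'model_provider="mock"' \
#       -c 'chatgpt_base_url="http://127.0.0.1:PORT/backend-api"' \
#       -C <workspace>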


def run_scenario(
    *,
    binary: Path,
    scenario: str,
    scenario_root: Path,
    run_timeout_seconds: int,
    extra_config: list[str],
) -> tuple[list[dict[str, Any]], list[str]]:
    workspace = scenario_root / "workspace"
    home = scenario_root / "home"
    write_scenario_files(scenario, workspace, home)
    env = codex_env(home)
    prompt = f"Reply with exactly `done` for scenario {scenario}."
    measurements: list[dict[str, Any]] = []
    reasons: list[str] = []
    with mock_responses_server() as server:
        actual_server_url = f"http://{HOST}:{server.server_address[1]}"
        common = base_exec_args(actual_server_url, workspace, extra_config)
        if scenario == "workspace_write":
            common.extend(["--sandbox", "workspace-write"])
        if scenario == "output_schema":
            common.extend(["--output-schema", str(workspace / "schema.json")])
        first = run_command(
            [str(binary), *common, prompt],
            cwd=workspace,
            env=env,
            timeout_seconds=run_timeout_seconds,
        )
        measurements.extend(
            collect_new_measurements(
                server,
                scenario=scenario,
                run_label="first_turn",
                command_result=first,
            )
        )
        if first.returncode != 0:
            reasons.append(f"{scenario} first turn failed: {tail(first.stderr or first.stdout)}")
            return measurements, reasons
        if scenario == "resume":
            resume_prompt = "Reply with exactly `done` for the resumed turn."
            resume_args = [*common, "resume", "--last", resume_prompt]
            second = run_command(
                [str(binary), *resume_args],
                cwd=workspace,
                env=env,
                timeout_seconds=run_timeout_seconds,
            )
            new_measurements = collect_new_measurements(
                server,
                scenario=scenario,
                run_label="second_turn",
                command_result=second,
            )
            measurements.extend(new_measurements)
            if second.returncode != 0:
                reasons.append(f"resume second turn failed: {tail(second.stderr or second.stdout)}")
            if not new_measurements:
                reasons.append("resume second turn did not capture a Responses request")
    if not measurements:
        reasons.append(f"{scenario} did not capture any Responses requests")
    return measurements, reasons
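
# For the resume scenario, comparing the second_turn row against first_turn
# surfaces how much conversation history the client re-sends on a resumed turn.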


def collect_new_measurements(
    server: Any,
    *,
    scenario: str,
    run_label: str,
    command_result: CommandResult,
) -> list[dict[str, Any]]:
    requests = list(server.captured)
    server.captured.clear()
    rows: list[dict[str, Any]] = []
    for index, request in enumerate(requests):
        metrics, shape_reasons = request_metrics(request)
        rows.append(
            {
                "record_type": "measurement",
                "scenario": scenario,
                "run_label": run_label,
                "request_index": index,
                "command_elapsed_ms": command_result.elapsed_ms,
                "command_status": command_result.returncode,
                "command_timed_out": command_result.timed_out,
                "shape_reasons": shape_reasons,
                **metrics,
            }
        )
    return rows


def clean_build_dir(target_dir: Path, *, clean: bool) -> dict[str, Any]:
    start = time.monotonic()
    existed = target_dir.exists()
    skipped_reason = None
    if clean and existed:
        shutil.rmtree(target_dir)
    elif not clean:
        skipped_reason = "custom --target-dir is not cleaned"
    target_dir.mkdir(parents=True, exist_ok=True)
    elapsed_ms = int((time.monotonic() - start) * 1000)
    return {
        "record_type": "cleanup",
        "path": str(target_dir),
        "removed": clean and existed,
        "skipped_reason": skipped_reason,
        "cleanup_elapsed_ms": elapsed_ms,
    }


def build_codex(
    repo: Path,
    *,
    target_dir: Path,
    timeout_seconds: int,
    locked: bool,
) -> tuple[Path | None, CommandResult]:
    manifest = repo / "codex-rs/Cargo.toml"
    args = [
        "cargo",
        "build",
        "--manifest-path",
        str(manifest),
        "-p",
        "codex-cli",
        "--bin",
        "codex",
    ]
    if locked:
        args.append("--locked")
    env = os.environ.copy()
    env["CARGO_TARGET_DIR"] = str(target_dir)
    result = run_command(args, cwd=repo / "codex-rs", env=env, timeout_seconds=timeout_seconds)
    binary = target_dir / "debug" / "codex"
    if result.returncode == 0 and binary.exists():
        return binary, result
    return None, result
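
# The probe times a debug-profile build (the binary lands in target/debug), and
# the default run cleans the target dir first, so build_elapsed_ms compares cold
# debug builds between base and head.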


def run_probe(
    *,
    repo: Path,
    work_dir: Path,
    target_dir: Path,
    clean_build: bool,
    scenarios: list[str],
    build_timeout_seconds: int,
    run_timeout_seconds: int,
    locked: bool,
    extra_config: list[str],
    emit: Any,
) -> dict[str, Any]:
    run_root = work_dir / "runs" / str(time.time_ns())
    cleanup_record = clean_build_dir(target_dir, clean=clean_build)
    emit(cleanup_record)
    binary, build = build_codex(repo, target_dir=target_dir, timeout_seconds=build_timeout_seconds, locked=locked)
    build_record = {
        "record_type": "build",
        "status": build.returncode,
        "build_elapsed_ms": build.elapsed_ms,
        "timed_out": build.timed_out,
        "stderr_tail": tail(build.stderr),
    }
    emit(build_record)
    if binary is None:
        summary = make_summary(
            repo=repo,
            cleanup_record=cleanup_record,
            build_record=build_record,
            scenarios=scenarios,
            measurements=[],
            invalid_reasons=[f"build failed: {tail(build.stderr or build.stdout)}"],
        )
        emit(probe_summary_record(summary))
        return summary
    all_measurements: list[dict[str, Any]] = []
    scenario_reasons: list[str] = []
    for scenario in scenarios:
        scenario_measurements, reasons = run_scenario(
            binary=binary,
            scenario=scenario,
            scenario_root=run_root / scenario,
            run_timeout_seconds=run_timeout_seconds,
            extra_config=extra_config,
        )
        for row in scenario_measurements:
            row["build_elapsed_ms"] = build.elapsed_ms
            emit(row)
        all_measurements.extend(scenario_measurements)
        scenario_reasons.extend(reasons)
    shape_reasons = [
        reason
        for row in all_measurements
        for reason in row.get("shape_reasons", [])
        if reason
    ]
    invalid_reasons = [*scenario_reasons, *shape_reasons]
    has_baseline = any(
        row.get("scenario") == "baseline"
        and row.get("request_path", "").endswith("/responses")
        and row.get("input_json_bytes", 0) > 0
        for row in all_measurements
    )
    if not has_baseline:
        invalid_reasons.append("no usable baseline /responses measurement")
    summary = make_summary(
        repo=repo,
        cleanup_record=cleanup_record,
        build_record=build_record,
        scenarios=scenarios,
        measurements=all_measurements,
        invalid_reasons=invalid_reasons,
    )
    emit(probe_summary_record(summary))
    return summary


def make_summary(
    *,
    repo: Path,
    cleanup_record: dict[str, Any],
    build_record: dict[str, Any],
    scenarios: list[str],
    measurements: list[dict[str, Any]],
    invalid_reasons: list[str],
) -> dict[str, Any]:
    return {
        "generated_at": dt.datetime.now(tz=dt.timezone.utc).isoformat(timespec="seconds"),
        "repo": str(repo),
        "valid": not invalid_reasons,
        "invalid_reasons": invalid_reasons,
        "cleanup": cleanup_record,
        "build": build_record,
        "build_elapsed_ms": build_record.get("build_elapsed_ms"),
        "scenarios": scenarios,
        "measurement_count": len(measurements),
        "measurements": [
            {field: row.get(field) for field in SUMMARY_MEASUREMENT_FIELDS if field in row}
            for row in measurements
        ],
    }


def probe_summary_record(summary: dict[str, Any]) -> dict[str, Any]:
    return {
        "record_type": "probe_summary",
        "valid": summary["valid"],
        "invalid_reasons": summary["invalid_reasons"],
        "build_elapsed_ms": summary["build_elapsed_ms"],
        "measurement_count": summary["measurement_count"],
    }


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--repo", type=Path, default=Path(__file__).resolve().parents[1])
    parser.add_argument("--work-dir", type=Path, default=Path("/tmp/codex-context-bloat-probe"))
    parser.add_argument(
        "--target-dir",
        type=Path,
        help="Shared Cargo target dir. Defaults under --work-dir. Custom target dirs are not cleaned.",
    )
    parser.add_argument("--scenario", action="append", choices=DEFAULT_SCENARIOS, help="Scenario to run. Repeatable.")
    parser.add_argument("--build-timeout-seconds", type=int, default=1800)
    parser.add_argument("--run-timeout-seconds", type=int, default=120)
    parser.add_argument("--cargo-locked", action="store_true", help="Pass --locked to cargo build.")
    parser.add_argument("--output", type=Path, help="Write NDJSON records to this file instead of stdout.")
    parser.add_argument("--summary-output", type=Path, help="Write a compact JSON summary for CI/Codex review.")
    parser.add_argument(
        "--fail-on-invalid",
        action="store_true",
        help="Exit non-zero if the probe cannot capture a valid baseline measurement.",
    )
    parser.add_argument(
        "-c",
        "--config",
        action="append",
        default=[],
        help="Extra Codex config override passed through to `codex exec -c`.",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    repo = args.repo.resolve()
    work_dir = args.work_dir.resolve()
    target_dir = (args.target_dir or work_dir / "target").resolve()
    clean_build = args.target_dir is None
    scenarios = args.scenario or list(DEFAULT_SCENARIOS)
    work_dir.mkdir(parents=True, exist_ok=True)
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
    out = args.output.open("w", encoding="utf-8") if args.output else sys.stdout

    def emit(record: dict[str, Any]) -> None:
        print(json.dumps(record, ensure_ascii=False, sort_keys=True), file=out, flush=True)

    try:
        emit(
            {
                "record_type": "probe_start",
                "scenarios": scenarios,
            }
        )
        summary = run_probe(
            repo=repo,
            work_dir=work_dir,
            target_dir=target_dir,
            clean_build=clean_build,
            scenarios=scenarios,
            build_timeout_seconds=args.build_timeout_seconds,
            run_timeout_seconds=args.run_timeout_seconds,
            locked=args.cargo_locked,
            extra_config=args.config,
            emit=emit,
        )
        if args.summary_output:
            args.summary_output.parent.mkdir(parents=True, exist_ok=True)
            args.summary_output.write_text(
                json.dumps(summary, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
                encoding="utf-8",
            )
        if args.fail_on_invalid and not summary["valid"]:
            return 1
        return 0
    finally:
        if args.output:
            out.close()


if __name__ == "__main__":
    raise SystemExit(main())