Compare commits


1 Commit

Author SHA1 Message Date
pakrym-oai  e05f333e55  Python sdk  2025-09-30 14:35:22 -07:00
224 changed files with 4474 additions and 6306 deletions

View File

@@ -1,18 +0,0 @@
You are an assistant that triages new GitHub issues by identifying potential duplicates.
You will receive the following JSON files located in the current working directory:
- `codex-current-issue.json`: JSON object describing the newly created issue (fields: number, title, body).
- `codex-existing-issues.json`: JSON array of recent issues (each element includes number, title, body, createdAt).
Instructions:
- Load both files as JSON and review their contents carefully. The `codex-existing-issues.json` file is large; make sure you explore all of it.
- Compare the current issue against the existing issues to find up to five that appear to describe the same underlying problem or request.
- Only consider an issue a potential duplicate if there is a clear overlap in symptoms, feature requests, reproduction steps, or error messages.
- Prioritize newer issues when similarity is comparable.
- Ignore pull requests and issues whose similarity is tenuous.
- When unsure, prefer returning fewer matches.
Output requirements:
- Respond with a JSON array of issue numbers (integers), ordered from most likely duplicate to least.
- Include at most five numbers.
- If you find no plausible duplicates, respond with `[]`.
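For reference, a minimal Python sketch of the inputs this prompt describes and the shape of the expected reply (the issue numbers shown are hypothetical, not from the repository):

```python
import json

# Inputs described above, present in the current working directory.
with open("codex-current-issue.json", encoding="utf-8") as fh:
    current_issue = json.load(fh)    # {"number": ..., "title": ..., "body": ...}
with open("codex-existing-issues.json", encoding="utf-8") as fh:
    existing_issues = json.load(fh)  # [{"number": ..., "title": ..., "body": ..., "createdAt": ...}, ...]

# The final message must be a bare JSON array of issue numbers,
# ordered most-likely duplicate first, with at most five entries, e.g.:
#   [4821, 4710]      (hypothetical numbers)
# or, when nothing plausibly matches:
#   []
```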

View File

@@ -1,26 +0,0 @@
You are an assistant that reviews GitHub issues for the repository.
Your job is to choose the most appropriate existing labels for the issue described later in this prompt.
Follow these rules:
- Only pick labels out of the list below.
- Prefer a small set of precise labels over many broad ones.
- If none of the labels fit, respond with an empty JSON array: []
- Output must be a JSON array of label names (strings) with no additional commentary.
Labels to apply:
1. bug — Reproducible defects in Codex products (CLI, VS Code extension, web, auth).
2. enhancement — Feature requests or usability improvements that ask for new capabilities, better ergonomics, or quality-of-life tweaks.
3. extension — VS Code (or other IDE) extension-specific issues.
4. windows-os — Bugs or friction specific to Windows environments (PowerShell behavior, path handling, copy/paste, OS-specific auth or tooling failures).
5. mcp — Topics involving Model Context Protocol servers/clients.
6. codex-web — Issues targeting the Codex web UI/Cloud experience.
8. azure — Problems or requests tied to Azure OpenAI deployments.
9. documentation — Updates or corrections needed in docs/README/config references (broken links, missing examples, outdated keys, clarification requests).
10. model-behavior — Undesirable LLM behavior: forgetting goals, refusing work, hallucinating environment details, quota misreports, or other reasoning/performance anomalies.
Issue information is available in environment variables:
ISSUE_NUMBER
ISSUE_TITLE
ISSUE_BODY
REPO_FULL_NAME
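A minimal sketch (assumed, not part of the prompt itself) of how a consumer could read these environment variables and validate the reply shape the prompt requires:

```python
import json
import os

# Issue context is provided via the environment variables listed above.
issue = {
    "number": os.environ.get("ISSUE_NUMBER"),
    "title": os.environ.get("ISSUE_TITLE"),
    "body": os.environ.get("ISSUE_BODY"),
    "repo": os.environ.get("REPO_FULL_NAME"),
}

# The reply must be a JSON array of label names drawn from the list above.
reply = '["bug", "windows-os"]'  # example output only
labels = json.loads(reply)
assert isinstance(labels, list) and all(isinstance(label, str) for label in labels)
```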

View File

@@ -27,7 +27,7 @@ jobs:
- name: Install dependencies
run: pnpm install --frozen-lockfile
# stage_npm_packages.py requires DotSlash when staging releases.
# build_npm_package.py requires DotSlash when staging releases.
- uses: facebook/install-dotslash@v2
- name: Stage npm package
@@ -37,12 +37,10 @@ jobs:
run: |
set -euo pipefail
CODEX_VERSION=0.40.0
OUTPUT_DIR="${RUNNER_TEMP}"
python3 ./scripts/stage_npm_packages.py \
PACK_OUTPUT="${RUNNER_TEMP}/codex-npm.tgz"
python3 ./codex-cli/scripts/build_npm_package.py \
--release-version "$CODEX_VERSION" \
--package codex \
--output-dir "$OUTPUT_DIR"
PACK_OUTPUT="${OUTPUT_DIR}/codex-npm-${CODEX_VERSION}.tgz"
--pack-output "$PACK_OUTPUT"
echo "pack_output=$PACK_OUTPUT" >> "$GITHUB_OUTPUT"
- name: Upload staged npm package artifact

View File

@@ -1,97 +0,0 @@
name: Issue Deduplicator
on:
issues:
types:
# - opened - disabled while testing
- labeled
jobs:
gather-duplicates:
name: Identify potential duplicates
if: ${{ github.event.action == 'opened' || (github.event.action == 'labeled' && github.event.label.name == 'codex-deduplicate') }}
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
codex_output: ${{ steps.codex.outputs.final_message }}
steps:
- uses: actions/checkout@v4
- name: Prepare Codex inputs
env:
GH_TOKEN: ${{ github.token }}
run: |
set -eo pipefail
CURRENT_ISSUE_FILE=codex-current-issue.json
EXISTING_ISSUES_FILE=codex-existing-issues.json
gh issue list --repo "${{ github.repository }}" \
--json number,title,body,createdAt \
--limit 1000 \
--state all \
--search "sort:created-desc" \
| jq '.' \
> "$EXISTING_ISSUES_FILE"
gh issue view "${{ github.event.issue.number }}" \
--repo "${{ github.repository }}" \
--json number,title,body \
| jq '.' \
> "$CURRENT_ISSUE_FILE"
- id: codex
uses: openai/codex-action@main
with:
openai_api_key: ${{ secrets.CODEX_OPENAI_API_KEY }}
prompt_file: .github/prompts/issue-deduplicator.txt
require_repo_write: false
codex_version: 0.43.0-alpha.16
comment-on-issue:
name: Comment with potential duplicates
needs: gather-duplicates
if: ${{ needs.gather-duplicates.result != 'skipped' }}
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
steps:
- name: Comment on issue
uses: actions/github-script@v7
env:
CODEX_OUTPUT: ${{ needs.gather-duplicates.outputs.codex_output }}
with:
github-token: ${{ github.token }}
script: |
let numbers;
try {
numbers = JSON.parse(process.env.CODEX_OUTPUT);
} catch (error) {
core.info(`Codex output was not valid JSON. Raw output: ${process.env.CODEX_OUTPUT}`);
return;
}
if (numbers.length === 0) {
core.info('Codex reported no potential duplicates.');
return;
}
const lines = ['Potential duplicates detected:', ...numbers.map((value) => `- #${value}`)];
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.payload.issue.number,
body: lines.join("\n"),
});
- name: Remove codex-deduplicate label
if: ${{ always() && github.event.action == 'labeled' && github.event.label.name == 'codex-deduplicate' }}
env:
GH_TOKEN: ${{ github.token }}
GH_REPO: ${{ github.repository }}
run: |
gh issue edit "${{ github.event.issue.number }}" --remove-label codex-deduplicate || true
echo "Attempted to remove label: codex-deduplicate"

View File

@@ -1,78 +0,0 @@
name: Issue Labeler
on:
issues:
types:
# - opened - disabled while testing
- labeled
jobs:
gather-labels:
name: Generate label suggestions
if: ${{ github.event.action == 'opened' || (github.event.action == 'labeled' && github.event.label.name == 'codex-label') }}
runs-on: ubuntu-latest
permissions:
contents: read
env:
ISSUE_NUMBER: ${{ github.event.issue.number }}
ISSUE_TITLE: ${{ github.event.issue.title }}
ISSUE_BODY: ${{ github.event.issue.body }}
REPO_FULL_NAME: ${{ github.repository }}
outputs:
codex_output: ${{ steps.codex.outputs.final_message }}
steps:
- uses: actions/checkout@v4
- id: codex
uses: openai/codex-action@main
with:
openai_api_key: ${{ secrets.CODEX_OPENAI_API_KEY }}
prompt_file: .github/prompts/issue-labeler.txt
require_repo_write: false
codex_version: 0.43.0-alpha.16
apply-labels:
name: Apply labels from Codex output
needs: gather-labels
if: ${{ needs.gather-labels.result != 'skipped' }}
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
env:
GH_TOKEN: ${{ github.token }}
GH_REPO: ${{ github.repository }}
ISSUE_NUMBER: ${{ github.event.issue.number }}
CODEX_OUTPUT: ${{ needs.gather-labels.outputs.codex_output }}
steps:
- name: Apply labels
run: |
json=${CODEX_OUTPUT//$'\r'/}
if [ -z "$json" ]; then
echo "Codex produced no output. Skipping label application."
exit 0
fi
if ! printf '%s' "$json" | jq -e 'type == "array"' >/dev/null 2>&1; then
echo "Codex output was not a JSON array. Raw output: $json"
exit 0
fi
labels=$(printf '%s' "$json" | jq -r '.[] | tostring')
if [ -z "$labels" ]; then
echo "Codex returned an empty array. Nothing to do."
exit 0
fi
cmd=(gh issue edit "$ISSUE_NUMBER")
while IFS= read -r label; do
cmd+=(--add-label "$label")
done <<< "$labels"
"${cmd[@]}" || true
- name: Remove codex-label trigger
if: ${{ always() && github.event.action == 'labeled' && github.event.label.name == 'codex-label' }}
run: |
gh issue edit "$ISSUE_NUMBER" --remove-label codex-label || true
echo "Attempted to remove label: codex-label"

View File

@@ -216,30 +216,31 @@ jobs:
echo "npm_tag=" >> "$GITHUB_OUTPUT"
fi
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
run_install: false
- name: Setup Node.js for npm packaging
uses: actions/setup-node@v5
with:
node-version: 22
- name: Install dependencies
run: pnpm install --frozen-lockfile
# stage_npm_packages.py requires DotSlash when staging releases.
# build_npm_package.py requires DotSlash when staging releases.
- uses: facebook/install-dotslash@v2
- name: Stage npm packages
- name: Stage codex CLI npm package
env:
GH_TOKEN: ${{ github.token }}
run: |
./scripts/stage_npm_packages.py \
--release-version "${{ steps.release_name.outputs.name }}" \
set -euo pipefail
TMP_DIR="${RUNNER_TEMP}/npm-stage"
./codex-cli/scripts/build_npm_package.py \
--package codex \
--release-version "${{ steps.release_name.outputs.name }}" \
--staging-dir "${TMP_DIR}" \
--pack-output "${GITHUB_WORKSPACE}/dist/npm/codex-npm-${{ steps.release_name.outputs.name }}.tgz"
- name: Stage responses API proxy npm package
env:
GH_TOKEN: ${{ github.token }}
run: |
set -euo pipefail
TMP_DIR="${RUNNER_TEMP}/npm-stage-responses"
./codex-cli/scripts/build_npm_package.py \
--package codex-responses-api-proxy \
--package codex-sdk
--release-version "${{ steps.release_name.outputs.name }}" \
--staging-dir "${TMP_DIR}" \
--pack-output "${GITHUB_WORKSPACE}/dist/npm/codex-responses-api-proxy-npm-${{ steps.release_name.outputs.name }}.tgz"
- name: Create GitHub Release
uses: softprops/action-gh-release@v2
@@ -299,10 +300,6 @@ jobs:
--repo "${GITHUB_REPOSITORY}" \
--pattern "codex-responses-api-proxy-npm-${version}.tgz" \
--dir dist/npm
gh release download "$tag" \
--repo "${GITHUB_REPOSITORY}" \
--pattern "codex-sdk-npm-${version}.tgz" \
--dir dist/npm
# No NODE_AUTH_TOKEN needed because we use OIDC.
- name: Publish to npm
@@ -319,7 +316,6 @@ jobs:
tarballs=(
"codex-npm-${VERSION}.tgz"
"codex-responses-api-proxy-npm-${VERSION}.tgz"
"codex-sdk-npm-${VERSION}.tgz"
)
for tarball in "${tarballs[@]}"; do

View File

@@ -83,7 +83,6 @@ Codex CLI supports a rich set of configuration options, with preferences stored
- [**Authentication**](./docs/authentication.md)
- [Auth methods](./docs/authentication.md#forcing-a-specific-auth-method-advanced)
- [Login on a "Headless" machine](./docs/authentication.md#connecting-on-a-headless-machine)
- [**Non-interactive mode**](./docs/exec.md)
- [**Advanced**](./docs/advanced.md)
- [Non-interactive / CI mode](./docs/advanced.md#non-interactive--ci-mode)
- [Tracing / verbose logging](./docs/advanced.md#tracing--verbose-logging)

View File

@@ -1,19 +1,11 @@
# npm releases
Use the staging helper in the repo root to generate npm tarballs for a release. For
example, to stage the CLI, responses proxy, and SDK packages for version `0.6.0`:
Run the following:
To build the 0.2.x or later version of the npm module, which runs the Rust version of the CLI, build it as follows:
```bash
./scripts/stage_npm_packages.py \
--release-version 0.6.0 \
--package codex \
--package codex-responses-api-proxy \
--package codex-sdk
./codex-cli/scripts/build_npm_package.py --release-version 0.6.0
```
This downloads the native artifacts once, hydrates `vendor/` for each package, and writes
tarballs to `dist/npm/`.
If you need to invoke `build_npm_package.py` directly, run
`codex-cli/scripts/install_native_deps.py` first and pass `--vendor-src` pointing to the
directory that contains the populated `vendor/` tree.
Note this will create `./codex-cli/vendor/` as a side-effect.

View File

@@ -3,6 +3,7 @@
import argparse
import json
import re
import shutil
import subprocess
import sys
@@ -13,25 +14,19 @@ SCRIPT_DIR = Path(__file__).resolve().parent
CODEX_CLI_ROOT = SCRIPT_DIR.parent
REPO_ROOT = CODEX_CLI_ROOT.parent
RESPONSES_API_PROXY_NPM_ROOT = REPO_ROOT / "codex-rs" / "responses-api-proxy" / "npm"
CODEX_SDK_ROOT = REPO_ROOT / "sdk" / "typescript"
GITHUB_REPO = "openai/codex"
PACKAGE_NATIVE_COMPONENTS: dict[str, list[str]] = {
"codex": ["codex", "rg"],
"codex-responses-api-proxy": ["codex-responses-api-proxy"],
"codex-sdk": ["codex"],
}
COMPONENT_DEST_DIR: dict[str, str] = {
"codex": "codex",
"codex-responses-api-proxy": "codex-responses-api-proxy",
"rg": "path",
}
# The docs are not clear on what the expected value/format of
# workflow/workflowName is:
# https://cli.github.com/manual/gh_run_list
WORKFLOW_NAME = ".github/workflows/rust-release.yml"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Build or stage the Codex CLI npm package.")
parser.add_argument(
"--package",
choices=("codex", "codex-responses-api-proxy", "codex-sdk"),
choices=("codex", "codex-responses-api-proxy"),
default="codex",
help="Which npm package to stage (default: codex).",
)
@@ -42,9 +37,14 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--release-version",
help=(
"Version to stage for npm release."
"Version to stage for npm release. When provided, the script also resolves the "
"matching rust-release workflow unless --workflow-url is supplied."
),
)
parser.add_argument(
"--workflow-url",
help="Optional GitHub Actions workflow run URL used to download native binaries.",
)
parser.add_argument(
"--staging-dir",
type=Path,
@@ -64,11 +64,6 @@ def parse_args() -> argparse.Namespace:
type=Path,
help="Path where the generated npm tarball should be written.",
)
parser.add_argument(
"--vendor-src",
type=Path,
help="Directory containing pre-installed native binaries to bundle (vendor root).",
)
return parser.parse_args()
@@ -91,19 +86,29 @@ def main() -> int:
try:
stage_sources(staging_dir, version, package)
vendor_src = args.vendor_src.resolve() if args.vendor_src else None
native_components = PACKAGE_NATIVE_COMPONENTS.get(package, [])
workflow_url = args.workflow_url
resolved_head_sha: str | None = None
if not workflow_url:
if release_version:
workflow = resolve_release_workflow(version)
workflow_url = workflow["url"]
resolved_head_sha = workflow.get("headSha")
else:
workflow_url = resolve_latest_alpha_workflow_url()
elif release_version:
try:
workflow = resolve_release_workflow(version)
resolved_head_sha = workflow.get("headSha")
except Exception:
resolved_head_sha = None
if native_components:
if vendor_src is None:
components_str = ", ".join(native_components)
raise RuntimeError(
"Native components "
f"({components_str}) required for package '{package}'. Provide --vendor-src "
"pointing to a directory containing pre-installed binaries."
)
if release_version and resolved_head_sha:
print(f"should `git checkout {resolved_head_sha}`")
copy_native_binaries(vendor_src, staging_dir, native_components)
if not workflow_url:
raise RuntimeError("Unable to determine workflow URL for native binaries.")
install_native_binaries(staging_dir, workflow_url, package)
if release_version:
staging_dir_str = str(staging_dir)
@@ -114,20 +119,12 @@ def main() -> int:
f" node {staging_dir_str}/bin/codex.js --version\n"
f" node {staging_dir_str}/bin/codex.js --help\n\n"
)
elif package == "codex-responses-api-proxy":
else:
print(
f"Staged version {version} for release in {staging_dir_str}\n\n"
"Verify the responses API proxy:\n"
f" node {staging_dir_str}/bin/codex-responses-api-proxy.js --help\n\n"
)
else:
print(
f"Staged version {version} for release in {staging_dir_str}\n\n"
"Verify the SDK contents:\n"
f" ls {staging_dir_str}/dist\n"
f" ls {staging_dir_str}/vendor\n"
" node -e \"import('./dist/index.js').then(() => console.log('ok'))\"\n\n"
)
else:
print(f"Staged package in {staging_dir}")
@@ -155,9 +152,10 @@ def prepare_staging_dir(staging_dir: Path | None) -> tuple[Path, bool]:
def stage_sources(staging_dir: Path, version: str, package: str) -> None:
bin_dir = staging_dir / "bin"
bin_dir.mkdir(parents=True, exist_ok=True)
if package == "codex":
bin_dir = staging_dir / "bin"
bin_dir.mkdir(parents=True, exist_ok=True)
shutil.copy2(CODEX_CLI_ROOT / "bin" / "codex.js", bin_dir / "codex.js")
rg_manifest = CODEX_CLI_ROOT / "bin" / "rg"
if rg_manifest.exists():
@@ -169,8 +167,6 @@ def stage_sources(staging_dir: Path, version: str, package: str) -> None:
package_json_path = CODEX_CLI_ROOT / "package.json"
elif package == "codex-responses-api-proxy":
bin_dir = staging_dir / "bin"
bin_dir.mkdir(parents=True, exist_ok=True)
launcher_src = RESPONSES_API_PROXY_NPM_ROOT / "bin" / "codex-responses-api-proxy.js"
shutil.copy2(launcher_src, bin_dir / "codex-responses-api-proxy.js")
@@ -179,9 +175,6 @@ def stage_sources(staging_dir: Path, version: str, package: str) -> None:
shutil.copy2(readme_src, staging_dir / "README.md")
package_json_path = RESPONSES_API_PROXY_NPM_ROOT / "package.json"
elif package == "codex-sdk":
package_json_path = CODEX_SDK_ROOT / "package.json"
stage_codex_sdk_sources(staging_dir)
else:
raise RuntimeError(f"Unknown package '{package}'.")
@@ -189,85 +182,91 @@ def stage_sources(staging_dir: Path, version: str, package: str) -> None:
package_json = json.load(fh)
package_json["version"] = version
if package == "codex-sdk":
scripts = package_json.get("scripts")
if isinstance(scripts, dict):
scripts.pop("prepare", None)
files = package_json.get("files")
if isinstance(files, list):
if "vendor" not in files:
files.append("vendor")
else:
package_json["files"] = ["dist", "vendor"]
with open(staging_dir / "package.json", "w", encoding="utf-8") as out:
json.dump(package_json, out, indent=2)
out.write("\n")
def run_command(cmd: list[str], cwd: Path | None = None) -> None:
print("+", " ".join(cmd))
subprocess.run(cmd, cwd=cwd, check=True)
def install_native_binaries(staging_dir: Path, workflow_url: str, package: str) -> None:
package_components = {
"codex": ["codex", "rg"],
"codex-responses-api-proxy": ["codex-responses-api-proxy"],
}
components = package_components.get(package)
if components is None:
raise RuntimeError(f"Unknown package '{package}'.")
cmd = ["./scripts/install_native_deps.py", "--workflow-url", workflow_url]
for component in components:
cmd.extend(["--component", component])
cmd.append(str(staging_dir))
subprocess.check_call(cmd, cwd=CODEX_CLI_ROOT)
def stage_codex_sdk_sources(staging_dir: Path) -> None:
package_root = CODEX_SDK_ROOT
run_command(["pnpm", "install", "--frozen-lockfile"], cwd=package_root)
run_command(["pnpm", "run", "build"], cwd=package_root)
dist_src = package_root / "dist"
if not dist_src.exists():
raise RuntimeError("codex-sdk build did not produce a dist directory.")
shutil.copytree(dist_src, staging_dir / "dist")
readme_src = package_root / "README.md"
if readme_src.exists():
shutil.copy2(readme_src, staging_dir / "README.md")
license_src = REPO_ROOT / "LICENSE"
if license_src.exists():
shutil.copy2(license_src, staging_dir / "LICENSE")
def resolve_latest_alpha_workflow_url() -> str:
version = determine_latest_alpha_version()
workflow = resolve_release_workflow(version)
return workflow["url"]
def copy_native_binaries(vendor_src: Path, staging_dir: Path, components: list[str]) -> None:
vendor_src = vendor_src.resolve()
if not vendor_src.exists():
raise RuntimeError(f"Vendor source directory not found: {vendor_src}")
components_set = {component for component in components if component in COMPONENT_DEST_DIR}
if not components_set:
return
vendor_dest = staging_dir / "vendor"
if vendor_dest.exists():
shutil.rmtree(vendor_dest)
vendor_dest.mkdir(parents=True, exist_ok=True)
for target_dir in vendor_src.iterdir():
if not target_dir.is_dir():
def determine_latest_alpha_version() -> str:
releases = list_releases()
best_key: tuple[int, int, int, int] | None = None
best_version: str | None = None
pattern = re.compile(r"^rust-v(\d+)\.(\d+)\.(\d+)-alpha\.(\d+)$")
for release in releases:
tag = release.get("tag_name", "")
match = pattern.match(tag)
if not match:
continue
key = tuple(int(match.group(i)) for i in range(1, 5))
if best_key is None or key > best_key:
best_key = key
best_version = (
f"{match.group(1)}.{match.group(2)}.{match.group(3)}-alpha.{match.group(4)}"
)
dest_target_dir = vendor_dest / target_dir.name
dest_target_dir.mkdir(parents=True, exist_ok=True)
if best_version is None:
raise RuntimeError("No alpha releases found when resolving workflow URL.")
return best_version
for component in components_set:
dest_dir_name = COMPONENT_DEST_DIR.get(component)
if dest_dir_name is None:
continue
src_component_dir = target_dir / dest_dir_name
if not src_component_dir.exists():
raise RuntimeError(
f"Missing native component '{component}' in vendor source: {src_component_dir}"
)
def list_releases() -> list[dict]:
stdout = subprocess.check_output(
["gh", "api", f"/repos/{GITHUB_REPO}/releases?per_page=100"],
text=True,
)
try:
releases = json.loads(stdout or "[]")
except json.JSONDecodeError as exc:
raise RuntimeError("Unable to parse releases JSON.") from exc
if not isinstance(releases, list):
raise RuntimeError("Unexpected response when listing releases.")
return releases
dest_component_dir = dest_target_dir / dest_dir_name
if dest_component_dir.exists():
shutil.rmtree(dest_component_dir)
shutil.copytree(src_component_dir, dest_component_dir)
def resolve_release_workflow(version: str) -> dict:
stdout = subprocess.check_output(
[
"gh",
"run",
"list",
"--branch",
f"rust-v{version}",
"--json",
"workflowName,url,headSha",
"--workflow",
WORKFLOW_NAME,
"--jq",
"first(.[])",
],
text=True,
)
workflow = json.loads(stdout or "[]")
if not workflow:
raise RuntimeError(f"Unable to find rust-release workflow for version {version}.")
return workflow
def run_npm_pack(staging_dir: Path, output_path: Path) -> Path:

codex-rs/Cargo.lock (generated)
View File

@@ -171,7 +171,8 @@ version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"codex-app-server-protocol",
"codex-protocol",
"mcp-types",
"serde",
"serde_json",
"tokio",
@@ -656,7 +657,6 @@ dependencies = [
"app_test_support",
"assert_cmd",
"base64",
"codex-app-server-protocol",
"codex-arg0",
"codex-common",
"codex-core",
@@ -665,6 +665,7 @@ dependencies = [
"codex-protocol",
"codex-utils-json-to-toml",
"core_test_support",
"mcp-types",
"os_info",
"pretty_assertions",
"serde",
@@ -678,21 +679,6 @@ dependencies = [
"wiremock",
]
[[package]]
name = "codex-app-server-protocol"
version = "0.0.0"
dependencies = [
"anyhow",
"codex-protocol",
"paste",
"pretty_assertions",
"serde",
"serde_json",
"strum_macros 0.27.2",
"ts-rs",
"uuid",
]
[[package]]
name = "codex-apply-patch"
version = "0.0.0"
@@ -764,7 +750,6 @@ dependencies = [
"clap",
"clap_complete",
"codex-app-server",
"codex-app-server-protocol",
"codex-arg0",
"codex-chatgpt",
"codex-cloud-tasks",
@@ -786,6 +771,8 @@ dependencies = [
"supports-color",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
@@ -835,7 +822,6 @@ name = "codex-common"
version = "0.0.0"
dependencies = [
"clap",
"codex-app-server-protocol",
"codex-core",
"codex-protocol",
"serde",
@@ -854,7 +840,6 @@ dependencies = [
"base64",
"bytes",
"chrono",
"codex-app-server-protocol",
"codex-apply-patch",
"codex-file-search",
"codex-mcp-client",
@@ -863,7 +848,6 @@ dependencies = [
"codex-rmcp-client",
"core_test_support",
"dirs",
"dunce",
"env-flags",
"escargot",
"eventsource-stream",
@@ -913,6 +897,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"chrono",
"clap",
"codex-arg0",
"codex-common",
@@ -1011,8 +996,8 @@ dependencies = [
"anyhow",
"base64",
"chrono",
"codex-app-server-protocol",
"codex-core",
"codex-protocol",
"core_test_support",
"rand 0.9.2",
"reqwest",
@@ -1088,7 +1073,6 @@ name = "codex-otel"
version = "0.0.0"
dependencies = [
"chrono",
"codex-app-server-protocol",
"codex-protocol",
"eventsource-stream",
"opentelemetry",
@@ -1121,6 +1105,7 @@ dependencies = [
"icu_locale_core",
"mcp-types",
"mime_guess",
"pretty_assertions",
"serde",
"serde_json",
"serde_with",
@@ -1139,7 +1124,8 @@ version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"codex-app-server-protocol",
"codex-protocol",
"mcp-types",
"ts-rs",
]
@@ -1187,7 +1173,6 @@ dependencies = [
"chrono",
"clap",
"codex-ansi-escape",
"codex-app-server-protocol",
"codex-arg0",
"codex-common",
"codex-core",
@@ -1200,7 +1185,6 @@ dependencies = [
"crossterm",
"diffy",
"dirs",
"dunce",
"image",
"insta",
"itertools 0.14.0",
@@ -1799,12 +1783,6 @@ version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f678cf4a922c215c63e0de95eb1ff08a958a81d47e485cf9da1e27bf6305cfa5"
[[package]]
name = "dunce"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
[[package]]
name = "dupe"
version = "0.9.1"
@@ -4071,7 +4049,7 @@ dependencies = [
"once_cell",
"socket2 0.5.10",
"tracing",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]

View File

@@ -3,7 +3,6 @@ members = [
"backend-client",
"ansi-escape",
"app-server",
"app-server-protocol",
"apply-patch",
"arg0",
"codex-backend-openapi-models",
@@ -48,7 +47,6 @@ edition = "2024"
app_test_support = { path = "app-server/tests/common" }
codex-ansi-escape = { path = "ansi-escape" }
codex-app-server = { path = "app-server" }
codex-app-server-protocol = { path = "app-server-protocol" }
codex-apply-patch = { path = "apply-patch" }
codex-arg0 = { path = "arg0" }
codex-chatgpt = { path = "chatgpt" }
@@ -97,7 +95,6 @@ derive_more = "2"
diffy = "0.4.2"
dirs = "6"
dotenvy = "0.15.7"
dunce = "1.0.4"
env-flags = "0.1.1"
env_logger = "0.11.5"
escargot = "0.5"
@@ -126,7 +123,6 @@ opentelemetry-semantic-conventions = "0.30.0"
opentelemetry_sdk = "0.30.0"
os_info = "3.12.0"
owo-colors = "4.2.0"
paste = "1.0.15"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
pathdiff = "0.2"

View File

@@ -1,24 +0,0 @@
[package]
edition = "2024"
name = "codex-app-server-protocol"
version = { workspace = true }
[lib]
name = "codex_app_server_protocol"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
codex-protocol = { workspace = true }
paste = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum_macros = { workspace = true }
ts-rs = { workspace = true }
uuid = { workspace = true, features = ["serde", "v7"] }
[dev-dependencies]
anyhow = { workspace = true }
pretty_assertions = { workspace = true }

View File

@@ -1,67 +0,0 @@
//! We do not do true JSON-RPC 2.0, as we neither send nor expect the
//! "jsonrpc": "2.0" field.
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
pub const JSONRPC_VERSION: &str = "2.0";
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Hash, Eq, TS)]
#[serde(untagged)]
pub enum RequestId {
String(String),
#[ts(type = "number")]
Integer(i64),
}
pub type Result = serde_json::Value;
/// Refers to any valid JSON-RPC object that can be decoded off the wire, or encoded to be sent.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
#[serde(untagged)]
pub enum JSONRPCMessage {
Request(JSONRPCRequest),
Notification(JSONRPCNotification),
Response(JSONRPCResponse),
Error(JSONRPCError),
}
/// A request that expects a response.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct JSONRPCRequest {
pub id: RequestId,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
/// A notification which does not expect a response.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct JSONRPCNotification {
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
/// A successful (non-error) response to a request.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct JSONRPCResponse {
pub id: RequestId,
pub result: Result,
}
/// A response to a request that indicates an error occurred.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct JSONRPCError {
pub error: JSONRPCErrorError,
pub id: RequestId,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct JSONRPCErrorError {
pub code: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
pub message: String,
}
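Because the enums above are `#[serde(untagged)]`, an incoming message is classified purely by which fields are present; per the module comment, no `"jsonrpc": "2.0"` field is sent or expected on the wire. A small Python sketch of that classification (the example payloads are hypothetical):

```python
import json

def classify(line: str) -> str:
    """Classify a wire message by the fields present, mirroring the untagged enum above."""
    msg = json.loads(line)
    if "method" in msg and "id" in msg:
        return "request"        # JSONRPCRequest: id + method (+ optional params)
    if "method" in msg:
        return "notification"   # JSONRPCNotification: method (+ optional params)
    if "result" in msg:
        return "response"       # JSONRPCResponse: id + result
    if "error" in msg:
        return "error"          # JSONRPCError: id + error {code, message, data?}
    raise ValueError("not a recognized message shape")

# Hypothetical examples; note the absence of a "jsonrpc": "2.0" field.
print(classify('{"id": 1, "method": "newConversation", "params": {}}'))  # request
print(classify('{"id": 1, "result": {}}'))                               # response
```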

View File

@@ -1,5 +0,0 @@
mod jsonrpc_lite;
mod protocol;
pub use jsonrpc_lite::*;
pub use protocol::*;

View File

@@ -22,8 +22,10 @@ codex-core = { workspace = true }
codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
# We should only be using mcp-types for JSON-RPC types: it would be nice to
# split this out into a separate crate at some point.
mcp-types = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = [

View File

@@ -3,57 +3,10 @@ use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotification;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::ApplyPatchApprovalResponse;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::ArchiveConversationResponse;
use codex_app_server_protocol::AuthStatusChangeNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::ExecCommandApprovalResponse;
use codex_app_server_protocol::ExecOneOffCommandParams;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::GetUserAgentResponse;
use codex_app_server_protocol::GetUserSavedConfigResponse;
use codex_app_server_protocol::GitDiffToRemoteResponse;
use codex_app_server_protocol::InputItem as WireInputItem;
use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListConversationsResponse;
use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::LoginApiKeyResponse;
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::SendUserTurnResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserSavedConfig;
use codex_core::AuthManager;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::Cursor as RolloutCursor;
use codex_core::INTERACTIVE_SESSION_SOURCES;
use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
@@ -83,12 +36,58 @@ use codex_core::protocol::ReviewDecision;
use codex_login::ServerOptions as LoginServerOptions;
use codex_login::ShutdownHandle;
use codex_login::run_login_server;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::APPLY_PATCH_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::ApplyPatchApprovalParams;
use codex_protocol::mcp_protocol::ApplyPatchApprovalResponse;
use codex_protocol::mcp_protocol::ArchiveConversationParams;
use codex_protocol::mcp_protocol::ArchiveConversationResponse;
use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
use codex_protocol::mcp_protocol::ClientRequest;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationSummary;
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
use codex_protocol::mcp_protocol::ExecCommandApprovalResponse;
use codex_protocol::mcp_protocol::ExecOneOffCommandParams;
use codex_protocol::mcp_protocol::FuzzyFileSearchParams;
use codex_protocol::mcp_protocol::FuzzyFileSearchResponse;
use codex_protocol::mcp_protocol::GetUserAgentResponse;
use codex_protocol::mcp_protocol::GetUserSavedConfigResponse;
use codex_protocol::mcp_protocol::GitDiffToRemoteResponse;
use codex_protocol::mcp_protocol::InputItem as WireInputItem;
use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::InterruptConversationResponse;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::ListConversationsResponse;
use codex_protocol::mcp_protocol::LoginApiKeyParams;
use codex_protocol::mcp_protocol::LoginApiKeyResponse;
use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification;
use codex_protocol::mcp_protocol::LoginChatGptResponse;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::SendUserTurnParams;
use codex_protocol::mcp_protocol::SendUserTurnResponse;
use codex_protocol::mcp_protocol::ServerNotification;
use codex_protocol::mcp_protocol::SessionConfiguredNotification;
use codex_protocol::mcp_protocol::SetDefaultModelParams;
use codex_protocol::mcp_protocol::SetDefaultModelResponse;
use codex_protocol::mcp_protocol::UserInfoResponse;
use codex_protocol::mcp_protocol::UserSavedConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_utils_json_to_toml::json_to_toml;
use mcp_types::JSONRPCErrorError;
use mcp_types::RequestId;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::path::PathBuf;
@@ -194,43 +193,28 @@ impl CodexMessageProcessor {
ClientRequest::LoginApiKey { request_id, params } => {
self.login_api_key(request_id, params).await;
}
ClientRequest::LoginChatGpt {
request_id,
params: _,
} => {
ClientRequest::LoginChatGpt { request_id } => {
self.login_chatgpt(request_id).await;
}
ClientRequest::CancelLoginChatGpt { request_id, params } => {
self.cancel_login_chatgpt(request_id, params.login_id).await;
}
ClientRequest::LogoutChatGpt {
request_id,
params: _,
} => {
ClientRequest::LogoutChatGpt { request_id } => {
self.logout_chatgpt(request_id).await;
}
ClientRequest::GetAuthStatus { request_id, params } => {
self.get_auth_status(request_id, params).await;
}
ClientRequest::GetUserSavedConfig {
request_id,
params: _,
} => {
ClientRequest::GetUserSavedConfig { request_id } => {
self.get_user_saved_config(request_id).await;
}
ClientRequest::SetDefaultModel { request_id, params } => {
self.set_default_model(request_id, params).await;
}
ClientRequest::GetUserAgent {
request_id,
params: _,
} => {
ClientRequest::GetUserAgent { request_id } => {
self.get_user_agent(request_id).await;
}
ClientRequest::UserInfo {
request_id,
params: _,
} => {
ClientRequest::UserInfo { request_id } => {
self.get_user_info(request_id).await;
}
ClientRequest::FuzzyFileSearch { request_id, params } => {
@@ -387,7 +371,7 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::CancelLoginChatGptResponse {},
codex_protocol::mcp_protocol::CancelLoginChatGptResponse {},
)
.await;
} else {
@@ -423,7 +407,7 @@ impl CodexMessageProcessor {
self.outgoing
.send_response(
request_id,
codex_app_server_protocol::LogoutChatGptResponse {},
codex_protocol::mcp_protocol::LogoutChatGptResponse {},
)
.await;
@@ -441,7 +425,7 @@ impl CodexMessageProcessor {
async fn get_auth_status(
&self,
request_id: RequestId,
params: codex_app_server_protocol::GetAuthStatusParams,
params: codex_protocol::mcp_protocol::GetAuthStatusParams,
) {
let include_token = params.include_token.unwrap_or(false);
let do_refresh = params.refresh_token.unwrap_or(false);
@@ -456,7 +440,7 @@ impl CodexMessageProcessor {
let requires_openai_auth = self.config.model_provider.requires_openai_auth;
let response = if !requires_openai_auth {
codex_app_server_protocol::GetAuthStatusResponse {
codex_protocol::mcp_protocol::GetAuthStatusResponse {
auth_method: None,
auth_token: None,
requires_openai_auth: Some(false),
@@ -476,13 +460,13 @@ impl CodexMessageProcessor {
(None, None)
}
};
codex_app_server_protocol::GetAuthStatusResponse {
codex_protocol::mcp_protocol::GetAuthStatusResponse {
auth_method: reported_auth_method,
auth_token: token_opt,
requires_openai_auth: Some(true),
}
}
None => codex_app_server_protocol::GetAuthStatusResponse {
None => codex_protocol::mcp_protocol::GetAuthStatusResponse {
auth_method: None,
auth_token: None,
requires_openai_auth: Some(true),
@@ -633,7 +617,7 @@ impl CodexMessageProcessor {
.await
{
Ok(output) => {
let response = ExecOneOffCommandResponse {
let response = ExecArbitraryCommandResponse {
exit_code: output.exit_code,
stdout: output.stdout.text,
stderr: output.stderr.text,
@@ -709,7 +693,6 @@ impl CodexMessageProcessor {
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
)
.await
{
@@ -810,7 +793,7 @@ impl CodexMessageProcessor {
});
// Reply with conversation id + model and initial messages (when present)
let response = codex_app_server_protocol::ResumeConversationResponse {
let response = codex_protocol::mcp_protocol::ResumeConversationResponse {
conversation_id,
model: session_configured.model.clone(),
initial_messages,
@@ -1270,8 +1253,9 @@ async fn apply_bespoke_event_handling(
reason,
grant_root,
};
let value = serde_json::to_value(&params).unwrap_or_default();
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.send_request(APPLY_PATCH_APPROVAL_METHOD, Some(value))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
tokio::spawn(async move {
@@ -1291,8 +1275,9 @@ async fn apply_bespoke_event_handling(
cwd,
reason,
};
let value = serde_json::to_value(&params).unwrap_or_default();
let rx = outgoing
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.send_request(EXEC_COMMAND_APPROVAL_METHOD, Some(value))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
@@ -1363,7 +1348,7 @@ fn derive_config_from_params(
async fn on_patch_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
receiver: oneshot::Receiver<mcp_types::Result>,
codex: Arc<CodexConversation>,
) {
let response = receiver.await;
@@ -1405,7 +1390,7 @@ async fn on_patch_approval_response(
async fn on_exec_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
receiver: oneshot::Receiver<mcp_types::Result>,
conversation: Arc<CodexConversation>,
) {
let response = receiver.await;

View File

@@ -4,8 +4,8 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_file_search as file_search;
use codex_protocol::mcp_protocol::FuzzyFileSearchResult;
use tokio::task::JoinSet;
use tracing::warn;

View File

@@ -8,7 +8,7 @@ use codex_common::CliConfigOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_app_server_protocol::JSONRPCMessage;
use mcp_types::JSONRPCMessage;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
@@ -111,17 +111,17 @@ pub async fn run_main(
let stdout_writer_handle = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(outgoing_message) = outgoing_rx.recv().await {
let Ok(value) = serde_json::to_value(outgoing_message) else {
error!("Failed to convert OutgoingMessage to JSON value");
continue;
};
match serde_json::to_string(&value) {
Ok(mut json) => {
json.push('\n');
let msg: JSONRPCMessage = outgoing_message.into();
match serde_json::to_string(&msg) {
Ok(json) => {
if let Err(e) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {e}");
break;
}
if let Err(e) = stdout.write_all(b"\n").await {
error!("Failed to write newline to stdout: {e}");
break;
}
}
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
}

View File

@@ -3,21 +3,20 @@ use std::path::PathBuf;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeResponse;
use codex_protocol::mcp_protocol::ClientInfo;
use codex_protocol::mcp_protocol::ClientRequest;
use codex_protocol::mcp_protocol::InitializeResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_protocol::protocol::SessionSource;
use mcp_types::JSONRPCError;
use mcp_types::JSONRPCErrorError;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
use std::sync::Arc;
pub(crate) struct MessageProcessor {
@@ -35,11 +34,8 @@ impl MessageProcessor {
config: Arc<Config>,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(config.codex_home.clone(), false);
let conversation_manager = Arc::new(ConversationManager::new(
auth_manager.clone(),
SessionSource::VSCode,
));
let auth_manager = AuthManager::shared(config.codex_home.clone());
let conversation_manager = Arc::new(ConversationManager::new(auth_manager.clone()));
let codex_message_processor = CodexMessageProcessor::new(
auth_manager,
conversation_manager,

View File

@@ -2,12 +2,16 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerRequestPayload;
use codex_protocol::mcp_protocol::ServerNotification;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCError;
use mcp_types::JSONRPCErrorError;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use mcp_types::Result;
use serde::Serialize;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
@@ -34,7 +38,8 @@ impl OutgoingMessageSender {
pub(crate) async fn send_request(
&self,
request: ServerRequestPayload,
method: &str,
params: Option<serde_json::Value>,
) -> oneshot::Receiver<Result> {
let id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::Relaxed));
let outgoing_message_id = id.clone();
@@ -44,8 +49,11 @@ impl OutgoingMessageSender {
request_id_to_callback.insert(id, tx_approve);
}
let outgoing_message =
OutgoingMessage::Request(request.request_with_id(outgoing_message_id));
let outgoing_message = OutgoingMessage::Request(OutgoingRequest {
id: outgoing_message_id,
method: method.to_string(),
params,
});
let _ = self.sender.send(outgoing_message);
rx_approve
}
@@ -108,10 +116,8 @@ impl OutgoingMessageSender {
}
/// Outgoing message from the server to the client.
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub(crate) enum OutgoingMessage {
Request(ServerRequest),
Request(OutgoingRequest),
Notification(OutgoingNotification),
/// AppServerNotification is specific to the case where this is run as an
/// "app server" as opposed to an MCP server.
@@ -120,6 +126,64 @@ pub(crate) enum OutgoingMessage {
Error(OutgoingError),
}
impl From<OutgoingMessage> for JSONRPCMessage {
fn from(val: OutgoingMessage) -> Self {
use OutgoingMessage::*;
match val {
Request(OutgoingRequest { id, method, params }) => {
JSONRPCMessage::Request(JSONRPCRequest {
jsonrpc: JSONRPC_VERSION.into(),
id,
method,
params,
})
}
Notification(OutgoingNotification { method, params }) => {
JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method,
params,
})
}
AppServerNotification(notification) => {
let method = notification.to_string();
let params = match notification.to_params() {
Ok(params) => Some(params),
Err(err) => {
warn!("failed to serialize notification params: {err}");
None
}
};
JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method,
params,
})
}
Response(OutgoingResponse { id, result }) => {
JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result,
})
}
Error(OutgoingError { id, error }) => JSONRPCMessage::Error(JSONRPCError {
jsonrpc: JSONRPC_VERSION.into(),
id,
error,
}),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct OutgoingRequest {
pub id: RequestId,
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub(crate) struct OutgoingNotification {
pub method: String,
@@ -141,7 +205,7 @@ pub(crate) struct OutgoingError {
#[cfg(test)]
mod tests {
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification;
use pretty_assertions::assert_eq;
use serde_json::json;
use uuid::Uuid;
@@ -157,17 +221,18 @@ mod tests {
error: None,
});
let jsonrpc_notification = OutgoingMessage::AppServerNotification(notification);
let jsonrpc_notification: JSONRPCMessage =
OutgoingMessage::AppServerNotification(notification).into();
assert_eq!(
json!({
"method": "loginChatGptComplete",
"params": {
JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: "2.0".into(),
method: "loginChatGptComplete".into(),
params: Some(json!({
"loginId": Uuid::nil(),
"success": true,
},
})),
}),
serde_json::to_value(jsonrpc_notification)
.expect("ensure the strum macros serialize the method field correctly"),
jsonrpc_notification,
"ensure the strum macros serialize the method field correctly"
);
}

View File

@@ -9,7 +9,8 @@ path = "lib.rs"
[dependencies]
anyhow = { workspace = true }
assert_cmd = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-protocol = { workspace = true }
mcp-types = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [

View File

@@ -2,8 +2,8 @@ mod mcp_process;
mod mock_model_server;
mod responses;
use codex_app_server_protocol::JSONRPCResponse;
pub use mcp_process::McpProcess;
use mcp_types::JSONRPCResponse;
pub use mock_model_server::create_mock_chat_completions_server;
pub use responses::create_apply_patch_sse_response;
pub use responses::create_final_assistant_message_sse_response;

View File

@@ -11,30 +11,29 @@ use tokio::process::ChildStdout;
use anyhow::Context;
use assert_cmd::prelude::*;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::CancelLoginChatGptParams;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::ArchiveConversationParams;
use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
use codex_protocol::mcp_protocol::ClientInfo;
use codex_protocol::mcp_protocol::ClientNotification;
use codex_protocol::mcp_protocol::GetAuthStatusParams;
use codex_protocol::mcp_protocol::InitializeParams;
use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::LoginApiKeyParams;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserTurnParams;
use codex_protocol::mcp_protocol::SetDefaultModelParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use std::process::Command as StdCommand;
use tokio::process::Command;
@@ -318,6 +317,7 @@ impl McpProcess {
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
let message = JSONRPCMessage::Request(JSONRPCRequest {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(request_id),
method: method.to_string(),
params,
@@ -331,8 +331,12 @@ impl McpProcess {
id: RequestId,
result: serde_json::Value,
) -> anyhow::Result<()> {
self.send_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse { id, result }))
.await
self.send_jsonrpc_message(JSONRPCMessage::Response(JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id,
result,
}))
.await
}
pub async fn send_notification(
@@ -341,6 +345,7 @@ impl McpProcess {
) -> anyhow::Result<()> {
let value = serde_json::to_value(notification)?;
self.send_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: value
.get("method")
.and_then(|m| m.as_str())
@@ -368,7 +373,7 @@ impl McpProcess {
Ok(message)
}
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<ServerRequest> {
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<JSONRPCRequest> {
eprintln!("in read_stream_until_request_message()");
loop {
@@ -379,9 +384,7 @@ impl McpProcess {
eprintln!("notification: {message:?}");
}
JSONRPCMessage::Request(jsonrpc_request) => {
return jsonrpc_request.try_into().with_context(
|| "failed to deserialize ServerRequest from JSONRPCRequest",
);
return Ok(jsonrpc_request);
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
@@ -423,7 +426,7 @@ impl McpProcess {
pub async fn read_stream_until_error_message(
&mut self,
request_id: RequestId,
) -> anyhow::Result<JSONRPCError> {
) -> anyhow::Result<mcp_types::JSONRPCError> {
loop {
let message = self.read_jsonrpc_message().await?;
match message {

View File

@@ -2,13 +2,13 @@ use std::path::Path;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::ArchiveConversationResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_protocol::mcp_protocol::ArchiveConversationParams;
use codex_protocol::mcp_protocol::ArchiveConversationResponse;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -2,13 +2,13 @@ use std::path::Path;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::LoginApiKeyResponse;
use codex_app_server_protocol::RequestId;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::GetAuthStatusParams;
use codex_protocol::mcp_protocol::GetAuthStatusResponse;
use codex_protocol::mcp_protocol::LoginApiKeyParams;
use codex_protocol::mcp_protocol::LoginApiKeyResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -5,26 +5,25 @@ use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::create_shell_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::ExecCommandApprovalParams;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::SendUserTurnResponse;
use codex_app_server_protocol::ServerRequest;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol_config_types::ReasoningEffort;
use codex_core::protocol_config_types::ReasoningSummary;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::SendUserTurnParams;
use codex_protocol::mcp_protocol::SendUserTurnResponse;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use std::env;
use tempfile::TempDir;
@@ -116,7 +115,7 @@ async fn test_codex_jsonrpc_conversation_flow() {
let send_user_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![codex_app_server_protocol::InputItem::Text {
items: vec![codex_protocol::mcp_protocol::InputItem::Text {
text: "text".to_string(),
}],
})
@@ -266,7 +265,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
let send_user_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![codex_app_server_protocol::InputItem::Text {
items: vec![codex_protocol::mcp_protocol::InputItem::Text {
text: "run python".to_string(),
}],
})
@@ -291,28 +290,11 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
.await
.expect("waiting for exec approval request timeout")
.expect("exec approval request");
let ServerRequest::ExecCommandApproval { request_id, params } = request else {
panic!("expected ExecCommandApproval request, got: {request:?}");
};
assert_eq!(
ExecCommandApprovalParams {
conversation_id,
call_id: "call1".to_string(),
command: vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
cwd: working_directory.clone(),
reason: None,
},
params
);
assert_eq!(request.method, EXEC_COMMAND_APPROVAL_METHOD);
// Approve so the first turn can complete
mcp.send_response(
request_id,
request.id,
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
)
.await
@@ -331,7 +313,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() {
let send_turn_id = mcp
.send_send_user_turn_request(SendUserTurnParams {
conversation_id,
items: vec![codex_app_server_protocol::InputItem::Text {
items: vec![codex_protocol::mcp_protocol::InputItem::Text {
text: "run python again".to_string(),
}],
cwd: working_directory.clone(),

View File

@@ -3,18 +3,18 @@ use std::path::Path;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::GetUserSavedConfigResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::Profile;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SandboxSettings;
use codex_app_server_protocol::Tools;
use codex_app_server_protocol::UserSavedConfig;
use codex_core::protocol::AskForApproval;
use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Verbosity;
use codex_protocol::mcp_protocol::GetUserSavedConfigResponse;
use codex_protocol::mcp_protocol::Profile;
use codex_protocol::mcp_protocol::SandboxSettings;
use codex_protocol::mcp_protocol::Tools;
use codex_protocol::mcp_protocol::UserSavedConfig;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -4,15 +4,15 @@ use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::InputItem;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::InputItem;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;

View File

@@ -1,6 +1,6 @@
use app_test_support::McpProcess;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;

View File

@@ -3,17 +3,17 @@
use std::path::Path;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::InterruptConversationResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_core::protocol::TurnAbortReason;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::InterruptConversationResponse;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use core_test_support::skip_if_no_network;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -100,7 +100,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
let send_user_id = mcp
.send_send_user_message_request(SendUserMessageParams {
conversation_id,
items: vec![codex_app_server_protocol::InputItem::Text {
items: vec![codex_protocol::mcp_protocol::InputItem::Text {
text: "run first sleep command".to_string(),
}],
})

View File

@@ -3,16 +3,16 @@ use std::path::Path;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListConversationsResponse;
use codex_app_server_protocol::NewConversationParams; // reused for overrides shape
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::ListConversationsResponse;
use codex_protocol::mcp_protocol::NewConversationParams; // reused for overrides shape
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::ResumeConversationResponse;
use codex_protocol::mcp_protocol::ServerNotification;
use codex_protocol::mcp_protocol::SessionConfiguredNotification;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;

View File

@@ -3,15 +3,15 @@ use std::time::Duration;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::CancelLoginChatGptParams;
use codex_app_server_protocol::CancelLoginChatGptResponse;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::LogoutChatGptResponse;
use codex_app_server_protocol::RequestId;
use codex_login::login_with_api_key;
use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
use codex_protocol::mcp_protocol::CancelLoginChatGptResponse;
use codex_protocol::mcp_protocol::GetAuthStatusParams;
use codex_protocol::mcp_protocol::GetAuthStatusResponse;
use codex_protocol::mcp_protocol::LoginChatGptResponse;
use codex_protocol::mcp_protocol::LogoutChatGptResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -4,17 +4,17 @@ use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::InputItem;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::AddConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::mcp_protocol::InputItem;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -2,11 +2,11 @@ use std::path::Path;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
use codex_core::config::ConfigToml;
use codex_protocol::mcp_protocol::SetDefaultModelParams;
use codex_protocol::mcp_protocol::SetDefaultModelResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -1,8 +1,8 @@
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::GetUserAgentResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_protocol::mcp_protocol::GetUserAgentResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

View File

@@ -5,14 +5,14 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::UserInfoResponse;
use codex_core::auth::AuthDotJson;
use codex_core::auth::get_auth_file;
use codex_core::auth::write_auth_json;
use codex_core::token_data::IdTokenInfo;
use codex_core::token_data::TokenData;
use codex_protocol::mcp_protocol::UserInfoResponse;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;

View File

@@ -28,7 +28,6 @@ codex-login = { workspace = true }
codex-mcp-server = { workspace = true }
codex-process-hardening = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-protocol-ts = { workspace = true }
codex-responses-api-proxy = { workspace = true }
codex-tui = { workspace = true }
@@ -44,6 +43,8 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }

View File

@@ -1,6 +1,7 @@
pub mod debug_sandbox;
mod exit_status;
pub mod login;
pub mod proto;
use clap::Parser;
use codex_common::CliConfigOverrides;

View File

@@ -1,4 +1,3 @@
use codex_app_server_protocol::AuthMode;
use codex_common::CliConfigOverrides;
use codex_core::CodexAuth;
use codex_core::auth::CLIENT_ID;
@@ -9,6 +8,7 @@ use codex_core::config::ConfigOverrides;
use codex_login::ServerOptions;
use codex_login::run_device_code_login;
use codex_login::run_login_server;
use codex_protocol::mcp_protocol::AuthMode;
use std::path::PathBuf;
pub async fn login_with_chatgpt(codex_home: PathBuf) -> std::io::Result<()> {

View File

@@ -12,6 +12,7 @@ use codex_cli::login::run_login_with_api_key;
use codex_cli::login::run_login_with_chatgpt;
use codex_cli::login::run_login_with_device_code;
use codex_cli::login::run_logout;
use codex_cli::proto;
use codex_cloud_tasks::Cli as CloudTasksCli;
use codex_common::CliConfigOverrides;
use codex_exec::Cli as ExecCli;
@@ -25,6 +26,7 @@ use supports_color::Stream;
mod mcp_cmd;
use crate::mcp_cmd::McpCli;
use crate::proto::ProtoCli;
/// Codex CLI
///
@@ -72,6 +74,10 @@ enum Subcommand {
/// [experimental] Run the app server.
AppServer,
/// Run the Protocol stream via stdin/stdout
#[clap(visible_alias = "p")]
Proto(ProtoCli),
/// Generate shell completion scripts.
Completion(CompletionCommand),
@@ -218,12 +224,25 @@ fn print_exit_messages(exit_info: AppExitInfo) {
}
}
/// As early as possible in the process lifecycle, apply hardening measures. We
/// skip this in debug builds to avoid interfering with debugging.
pub(crate) const CODEX_SECURE_MODE_ENV_VAR: &str = "CODEX_SECURE_MODE";
/// As early as possible in the process lifecycle, apply hardening measures
/// if the CODEX_SECURE_MODE environment variable is set to "1".
#[ctor::ctor]
#[cfg(not(debug_assertions))]
fn pre_main_hardening() {
codex_process_hardening::pre_main_hardening();
let secure_mode = match std::env::var(CODEX_SECURE_MODE_ENV_VAR) {
Ok(value) => value,
Err(_) => return,
};
if secure_mode == "1" {
codex_process_hardening::pre_main_hardening();
}
// Always clear this env var so child processes don't inherit it.
unsafe {
std::env::remove_var(CODEX_SECURE_MODE_ENV_VAR);
}
}
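Not part of the diff: a minimal sketch of how a wrapper process might opt a spawned codex binary into the hardening path added above. It assumes `codex` is on PATH; the variable name and the "1" value come from the constant introduced in this hunk, and codex clears the variable again so its own children do not inherit it.

use std::process::Command;

// Hypothetical wrapper (sketch, not part of this change): spawn codex with
// CODEX_SECURE_MODE=1 so the release-build ctor applies process hardening.
fn spawn_hardened_codex() -> std::io::Result<std::process::Child> {
    Command::new("codex")
        .env("CODEX_SECURE_MODE", "1")
        .spawn()
}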
fn main() -> anyhow::Result<()> {
@@ -313,6 +332,13 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
);
run_logout(logout_cli.config_overrides).await;
}
Some(Subcommand::Proto(mut proto_cli)) => {
prepend_config_flags(
&mut proto_cli.config_overrides,
root_config_overrides.clone(),
);
proto::run_main(proto_cli).await?;
}
Some(Subcommand::Completion(completion_cli)) => {
print_completion(completion_cli);
}
@@ -455,7 +481,7 @@ fn print_completion(cmd: CompletionCommand) {
mod tests {
use super::*;
use codex_core::protocol::TokenUsage;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
fn finalize_from_args(args: &[&str]) -> TuiCli {
let cli = MultitoolCli::try_parse_from(args).expect("parse");

codex-rs/cli/src/proto.rs (new file, 133 lines)
View File

@@ -0,0 +1,133 @@
use std::io::IsTerminal;
use clap::Parser;
use codex_common::CliConfigOverrides;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Submission;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tracing::error;
use tracing::info;
#[derive(Debug, Parser)]
pub struct ProtoCli {
#[clap(skip)]
pub config_overrides: CliConfigOverrides,
}
pub async fn run_main(opts: ProtoCli) -> anyhow::Result<()> {
if std::io::stdin().is_terminal() {
anyhow::bail!("Protocol mode expects stdin to be a pipe, not a terminal");
}
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.init();
let ProtoCli { config_overrides } = opts;
let overrides_vec = config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
let config = Config::load_with_cli_overrides(overrides_vec, ConfigOverrides::default())?;
// Use conversation_manager API to start a conversation
let conversation_manager =
ConversationManager::new(AuthManager::shared(config.codex_home.clone()));
let NewConversation {
conversation_id: _,
conversation,
session_configured,
} = conversation_manager.new_conversation(config).await?;
// Simulate streaming the session_configured event.
let synthetic_event = Event {
// Fake id value.
id: "".to_string(),
msg: EventMsg::SessionConfigured(session_configured),
};
let session_configured_event = match serde_json::to_string(&synthetic_event) {
Ok(s) => s,
Err(e) => {
error!("Failed to serialize session_configured: {e}");
return Err(anyhow::Error::from(e));
}
};
println!("{session_configured_event}");
// Task that reads JSON lines from stdin and forwards to Submission Queue
let sq_fut = {
let conversation = conversation.clone();
async move {
let stdin = BufReader::new(tokio::io::stdin());
let mut lines = stdin.lines();
loop {
let result = tokio::select! {
_ = tokio::signal::ctrl_c() => {
break
},
res = lines.next_line() => res,
};
match result {
Ok(Some(line)) => {
let line = line.trim();
if line.is_empty() {
continue;
}
match serde_json::from_str::<Submission>(line) {
Ok(sub) => {
if let Err(e) = conversation.submit_with_id(sub).await {
error!("{e:#}");
break;
}
}
Err(e) => {
error!("invalid submission: {e}");
}
}
}
_ => {
info!("Submission queue closed");
break;
}
}
}
}
};
// Task that reads events from the agent and prints them as JSON lines to stdout
let eq_fut = async move {
loop {
let event = tokio::select! {
_ = tokio::signal::ctrl_c() => break,
event = conversation.next_event() => event,
};
match event {
Ok(event) => {
let event_str = match serde_json::to_string(&event) {
Ok(s) => s,
Err(e) => {
error!("Failed to serialize event: {e}");
continue;
}
};
println!("{event_str}");
}
Err(e) => {
error!("{e:#}");
break;
}
}
}
info!("Event queue closed");
};
tokio::join!(sq_fut, eq_fut);
Ok(())
}
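For orientation, a hedged sketch of a client driving the new `codex proto` subcommand from another process. Only the line-oriented stdin/stdout framing shown in proto.rs above is assumed; the Submission and Event payload shapes live in `codex_core::protocol` and are not reproduced here.

use std::io::BufRead;
use std::io::BufReader;
use std::process::Command;
use std::process::Stdio;

// Hypothetical client (sketch only): submissions are written as JSON lines to
// the child's stdin; events are read back as JSON lines from its stdout.
fn main() -> std::io::Result<()> {
    let mut child = Command::new("codex")
        .arg("proto")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    // The first line printed by `codex proto` is the synthetic
    // `session_configured` event emitted before any submission is read.
    let stdout = child.stdout.take().expect("stdout is piped");
    let mut lines = BufReader::new(stdout).lines();
    if let Some(first) = lines.next() {
        println!("session_configured: {}", first?);
    }

    // A real client would keep stdin open and write Submission JSON lines here.
    drop(child.stdin.take());
    child.kill()?;
    child.wait()?;
    Ok(())
}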

View File

@@ -190,7 +190,7 @@ pub async fn run_main(_cli: Cli, _codex_linux_sandbox_exe: Option<PathBuf>) -> a
// Require ChatGPT login (SWIC). Exit with a clear message if missing.
let _token = match codex_core::config::find_codex_home()
.ok()
.map(|home| codex_login::AuthManager::new(home, false))
.map(codex_login::AuthManager::new)
.and_then(|am| am.auth())
{
Some(auth) => {
@@ -839,9 +839,6 @@ pub async fn run_main(_cli: Cli, _codex_linux_sandbox_exe: Option<PathBuf>) -> a
&& matches!(key.code, KeyCode::Char('n') | KeyCode::Char('N'))
|| matches!(key.code, KeyCode::Char('\u{000E}'));
if is_ctrl_n {
if app.new_task.is_none() {
continue;
}
if app.best_of_modal.is_some() {
app.best_of_modal = None;
needs_redraw = true;

View File

@@ -262,9 +262,9 @@ fn draw_footer(frame: &mut Frame, area: Rect, app: &mut App) {
help.push(": Apply ".dim());
}
help.push("o : Set Env ".dim());
help.push("Ctrl+N".dim());
help.push(format!(": Attempts {}x ", app.best_of_n).dim());
if app.new_task.is_some() {
help.push("Ctrl+N".dim());
help.push(format!(": Attempts {}x ", app.best_of_n).dim());
help.push("(editing new task) ".dim());
} else {
help.push("n : New Task ".dim());
@@ -1004,40 +1004,32 @@ pub fn draw_best_of_modal(frame: &mut Frame, area: Rect, app: &mut App) {
use ratatui::widgets::Wrap;
let inner = overlay_outer(area);
const MAX_WIDTH: u16 = 40;
const MIN_WIDTH: u16 = 20;
const MAX_HEIGHT: u16 = 12;
const MIN_HEIGHT: u16 = 6;
let modal_width = inner.width.min(MAX_WIDTH).max(inner.width.min(MIN_WIDTH));
let modal_height = inner
.height
.min(MAX_HEIGHT)
.max(inner.height.min(MIN_HEIGHT));
let modal_x = inner.x + (inner.width.saturating_sub(modal_width)) / 2;
let modal_y = inner.y + (inner.height.saturating_sub(modal_height)) / 2;
let modal_area = Rect::new(modal_x, modal_y, modal_width, modal_height);
let title = Line::from(vec!["Parallel Attempts".magenta().bold()]);
let block = overlay_block().title(title);
frame.render_widget(Clear, modal_area);
frame.render_widget(block.clone(), modal_area);
let content = overlay_content(modal_area);
frame.render_widget(Clear, inner);
frame.render_widget(block.clone(), inner);
let content = overlay_content(inner);
let rows = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Length(2), Constraint::Min(1)])
.split(content);
let hint = Paragraph::new(Line::from("Use ↑/↓ to choose, 1-4 jump".cyan().dim()))
.wrap(Wrap { trim: true });
let hint = Paragraph::new(Line::from(
"Use ↑/↓ to choose, 1-4 jump; Enter confirm, Esc cancel"
.cyan()
.dim(),
))
.wrap(Wrap { trim: true });
frame.render_widget(hint, rows[0]);
let selected = app.best_of_modal.as_ref().map(|m| m.selected).unwrap_or(0);
let options = [1usize, 2, 3, 4];
let mut items: Vec<ListItem> = Vec::new();
for &attempts in &options {
let noun = if attempts == 1 { "attempt" } else { "attempts" };
let mut spans: Vec<ratatui::text::Span> = vec![format!("{attempts} {noun:<8}").into()];
let mut spans: Vec<ratatui::text::Span> =
vec![format!("{attempts} attempt{}", if attempts == 1 { "" } else { "s" }).into()];
spans.push(" ".into());
spans.push(format!("{attempts}x parallel").dim());
if attempts == app.best_of_n {

View File

@@ -70,7 +70,7 @@ pub async fn build_chatgpt_headers() -> HeaderMap {
HeaderValue::from_str(&ua).unwrap_or(HeaderValue::from_static("codex-cli")),
);
if let Ok(home) = codex_core::config::find_codex_home() {
let am = codex_login::AuthManager::new(home, false);
let am = codex_login::AuthManager::new(home);
if let Some(auth) = am.auth()
&& let Ok(tok) = auth.get_token().await
&& !tok.is_empty()

View File

@@ -10,7 +10,6 @@ workspace = true
clap = { workspace = true, features = ["derive", "wrap_help"], optional = true }
codex-core = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
serde = { workspace = true, optional = true }
toml = { workspace = true, optional = true }

View File

@@ -1,5 +1,5 @@
use codex_app_server_protocol::AuthMode;
use codex_core::protocol_config_types::ReasoningEffort;
use codex_protocol::mcp_protocol::AuthMode;
/// A simple preset pairing a model slug with a reasoning effort.
#[derive(Debug, Clone, Copy)]
@@ -20,49 +20,49 @@ const PRESETS: &[ModelPreset] = &[
ModelPreset {
id: "gpt-5-codex-low",
label: "gpt-5-codex low",
description: "Fastest responses with limited reasoning",
description: "",
model: "gpt-5-codex",
effort: Some(ReasoningEffort::Low),
},
ModelPreset {
id: "gpt-5-codex-medium",
label: "gpt-5-codex medium",
description: "Dynamically adjusts reasoning based on the task",
description: "",
model: "gpt-5-codex",
effort: Some(ReasoningEffort::Medium),
},
ModelPreset {
id: "gpt-5-codex-high",
label: "gpt-5-codex high",
description: "Maximizes reasoning depth for complex or ambiguous problems",
description: "",
model: "gpt-5-codex",
effort: Some(ReasoningEffort::High),
},
ModelPreset {
id: "gpt-5-minimal",
label: "gpt-5 minimal",
description: "Fastest responses with little reasoning",
description: "— fastest responses with limited reasoning; ideal for coding, instructions, or lightweight tasks",
model: "gpt-5",
effort: Some(ReasoningEffort::Minimal),
},
ModelPreset {
id: "gpt-5-low",
label: "gpt-5 low",
description: "Balances speed with some reasoning; useful for straightforward queries and short explanations",
description: "— balances speed with some reasoning; useful for straightforward queries and short explanations",
model: "gpt-5",
effort: Some(ReasoningEffort::Low),
},
ModelPreset {
id: "gpt-5-medium",
label: "gpt-5 medium",
description: "Provides a solid balance of reasoning depth and latency for general-purpose tasks",
description: "— default setting; provides a solid balance of reasoning depth and latency for general-purpose tasks",
model: "gpt-5",
effort: Some(ReasoningEffort::Medium),
},
ModelPreset {
id: "gpt-5-high",
label: "gpt-5 high",
description: "Maximizes reasoning depth for complex or ambiguous problems",
description: "— maximizes reasoning depth for complex or ambiguous problems",
model: "gpt-5",
effort: Some(ReasoningEffort::High),
},

View File

@@ -24,10 +24,8 @@ codex-file-search = { workspace = true }
codex-mcp-client = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-otel = { workspace = true, features = ["otel"] }
dirs = { workspace = true }
dunce = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }

View File

@@ -5,6 +5,7 @@ You are Codex, based on GPT-5. You are running as a coding agent in the Codex CL
- The arguments to `shell` will be passed to execvp(). Most terminal commands should be prefixed with ["bash", "-lc"].
- Always set the `workdir` param when using the shell function. Do not use `cd` unless absolutely necessary.
- When searching for text or files, prefer using `rg` or `rg --files` respectively because `rg` is much faster than alternatives like `grep`. (If the `rg` command is not found, then use alternatives.)
- When editing or creating files, you MUST use apply_patch as a standalone tool without going through ["bash", "-lc"], `Python`, `cat`, `sed`, ... Example: functions.shell({"command":["apply_patch","*** Begin Patch\nAdd File: hello.txt\n+Hello, world!\n*** End Patch"]}).
## Editing constraints
@@ -89,7 +90,7 @@ You are producing plain text that will later be styled by the CLI. Follow these
- Headers: optional; short Title Case (1-3 words) wrapped in **…**; no blank line before the first bullet; add only if they truly help.
- Bullets: use - ; merge related points; keep to one line when possible; 4–6 per list ordered by importance; keep phrasing consistent.
- Monospace: backticks for commands/paths/env vars/code ids and inline examples; use for literal keyword bullets; never combine with **.
- Code samples or multi-line snippets should be wrapped in fenced code blocks; include an info string as often as possible.
- Code samples or multi-line snippets should be wrapped in fenced code blocks; add a language hint whenever obvious.
- Structure: group related bullets; order sections general → specific → supporting; for subsections, start with a bolded keyword bullet, then items; match complexity to the task.
- Tone: collaborative, concise, factual; present tense, active voice; self-contained; no "above/below"; parallel wording.
- Don'ts: no nested bullets/hierarchies; no ANSI codes; don't cram unrelated keywords; keep keyword lists short—wrap/reformat if long; avoid naming formatting styles in answers.

View File

@@ -15,7 +15,7 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use codex_app_server_protocol::AuthMode;
use codex_protocol::mcp_protocol::AuthMode;
use crate::token_data::PlanType;
use crate::token_data::TokenData;
@@ -73,7 +73,7 @@ impl CodexAuth {
/// Loads the available auth information from the auth.json.
pub fn from_codex_home(codex_home: &Path) -> std::io::Result<Option<CodexAuth>> {
load_auth(codex_home, false)
load_auth(codex_home)
}
pub async fn get_token_data(&self) -> Result<TokenData, std::io::Error> {
@@ -188,7 +188,6 @@ impl CodexAuth {
}
pub const OPENAI_API_KEY_ENV_VAR: &str = "OPENAI_API_KEY";
pub const CODEX_API_KEY_ENV_VAR: &str = "CODEX_API_KEY";
pub fn read_openai_api_key_from_env() -> Option<String> {
env::var(OPENAI_API_KEY_ENV_VAR)
@@ -197,13 +196,6 @@ pub fn read_openai_api_key_from_env() -> Option<String> {
.filter(|value| !value.is_empty())
}
pub fn read_codex_api_key_from_env() -> Option<String> {
env::var(CODEX_API_KEY_ENV_VAR)
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
pub fn get_auth_file(codex_home: &Path) -> PathBuf {
codex_home.join("auth.json")
}
@@ -229,18 +221,7 @@ pub fn login_with_api_key(codex_home: &Path, api_key: &str) -> std::io::Result<(
write_auth_json(&get_auth_file(codex_home), &auth_dot_json)
}
fn load_auth(
codex_home: &Path,
enable_codex_api_key_env: bool,
) -> std::io::Result<Option<CodexAuth>> {
if enable_codex_api_key_env && let Some(api_key) = read_codex_api_key_from_env() {
let client = crate::default_client::create_client();
return Ok(Some(CodexAuth::from_api_key_with_client(
api_key.as_str(),
client,
)));
}
fn load_auth(codex_home: &Path) -> std::io::Result<Option<CodexAuth>> {
let auth_file = get_auth_file(codex_home);
let client = crate::default_client::create_client();
let auth_dot_json = match try_read_auth_json(&auth_file) {
@@ -474,7 +455,7 @@ mod tests {
auth_dot_json,
auth_file: _,
..
} = super::load_auth(codex_home.path(), false).unwrap().unwrap();
} = super::load_auth(codex_home.path()).unwrap().unwrap();
assert_eq!(None, api_key);
assert_eq!(AuthMode::ChatGPT, mode);
@@ -513,7 +494,7 @@ mod tests {
)
.unwrap();
let auth = super::load_auth(dir.path(), false).unwrap().unwrap();
let auth = super::load_auth(dir.path()).unwrap().unwrap();
assert_eq!(auth.mode, AuthMode::ApiKey);
assert_eq!(auth.api_key, Some("sk-test-key".to_string()));
@@ -596,7 +577,6 @@ mod tests {
pub struct AuthManager {
codex_home: PathBuf,
inner: RwLock<CachedAuth>,
enable_codex_api_key_env: bool,
}
impl AuthManager {
@@ -604,14 +584,11 @@ impl AuthManager {
/// preferred auth method. Errors loading auth are swallowed; `auth()` will
/// simply return `None` in that case so callers can treat it as an
/// unauthenticated state.
pub fn new(codex_home: PathBuf, enable_codex_api_key_env: bool) -> Self {
let auth = load_auth(&codex_home, enable_codex_api_key_env)
.ok()
.flatten();
pub fn new(codex_home: PathBuf) -> Self {
let auth = CodexAuth::from_codex_home(&codex_home).ok().flatten();
Self {
codex_home,
inner: RwLock::new(CachedAuth { auth }),
enable_codex_api_key_env,
}
}
@@ -621,7 +598,6 @@ impl AuthManager {
Arc::new(Self {
codex_home: PathBuf::new(),
inner: RwLock::new(cached),
enable_codex_api_key_env: false,
})
}
@@ -633,9 +609,7 @@ impl AuthManager {
/// Force a reload of the auth information from auth.json. Returns
/// whether the auth value changed.
pub fn reload(&self) -> bool {
let new_auth = load_auth(&self.codex_home, self.enable_codex_api_key_env)
.ok()
.flatten();
let new_auth = CodexAuth::from_codex_home(&self.codex_home).ok().flatten();
if let Ok(mut guard) = self.inner.write() {
let changed = !AuthManager::auths_equal(&guard.auth, &new_auth);
guard.auth = new_auth;
@@ -654,8 +628,8 @@ impl AuthManager {
}
/// Convenience constructor returning an `Arc` wrapper.
pub fn shared(codex_home: PathBuf, enable_codex_api_key_env: bool) -> Arc<Self> {
Arc::new(Self::new(codex_home, enable_codex_api_key_env))
pub fn shared(codex_home: PathBuf) -> Arc<Self> {
Arc::new(Self::new(codex_home))
}
/// Attempt to refresh the current auth token (if any). On success, reload

View File

@@ -6,8 +6,6 @@ use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::error::CodexErr;
use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::model_family::ModelFamily;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
use crate::util::backoff;
@@ -322,18 +320,11 @@ pub(crate) async fn stream_chat_completions(
let status = res.status();
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
let body = (res.text().await).unwrap_or_default();
return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
status,
body,
request_id: None,
}));
return Err(CodexErr::UnexpectedStatus(status, body));
}
if attempt > max_retries {
return Err(CodexErr::RetryLimit(RetryLimitReachedError {
status,
request_id: None,
}));
return Err(CodexErr::RetryLimit(status));
}
let retry_after_secs = res

View File

@@ -5,11 +5,9 @@ use std::time::Duration;
use crate::AuthManager;
use crate::auth::CodexAuth;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use bytes::Bytes;
use codex_app_server_protocol::AuthMode;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use eventsource_stream::Eventsource;
use futures::prelude::*;
use regex_lite::Regex;
@@ -309,17 +307,14 @@ impl ModelClient {
.log_request(attempt, || req_builder.send())
.await;
let mut request_id = None;
if let Ok(resp) = &res {
request_id = resp
.headers()
.get("cf-ray")
.map(|v| v.to_str().unwrap_or_default().to_string());
trace!(
"Response status: {}, cf-ray: {:?}",
"Response status: {}, cf-ray: {}",
resp.status(),
request_id
resp.headers()
.get("cf-ray")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default()
);
}
@@ -379,11 +374,7 @@ impl ModelClient {
// Surface the error body to callers. Use `unwrap_or_default` per Clippy.
let body = res.text().await.unwrap_or_default();
return Err(StreamAttemptError::Fatal(CodexErr::UnexpectedStatus(
UnexpectedResponseError {
status,
body,
request_id: None,
},
status, body,
)));
}
@@ -414,7 +405,6 @@ impl ModelClient {
Err(StreamAttemptError::RetryableHttpError {
status,
retry_after,
request_id,
})
}
Err(e) => Err(StreamAttemptError::RetryableTransportError(e.into())),
@@ -458,7 +448,6 @@ enum StreamAttemptError {
RetryableHttpError {
status: StatusCode,
retry_after: Option<Duration>,
request_id: Option<String>,
},
RetryableTransportError(CodexErr),
Fatal(CodexErr),
@@ -483,13 +472,11 @@ impl StreamAttemptError {
fn into_error(self) -> CodexErr {
match self {
Self::RetryableHttpError {
status, request_id, ..
} => {
Self::RetryableHttpError { status, .. } => {
if status == StatusCode::INTERNAL_SERVER_ERROR {
CodexErr::InternalServerError
} else {
CodexErr::RetryLimit(RetryLimitReachedError { status, request_id })
CodexErr::RetryLimit(status)
}
}
Self::RetryableTransportError(error) => error,

View File

@@ -19,12 +19,11 @@ use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnContextItem;
@@ -111,7 +110,6 @@ use crate::protocol::Submission;
use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TurnDiffEvent;
use crate::protocol::ViewImageToolCallEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
@@ -178,7 +176,6 @@ impl Codex {
config: Config,
auth_manager: Arc<AuthManager>,
conversation_history: InitialHistory,
session_source: SessionSource,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -207,7 +204,6 @@ impl Codex {
auth_manager.clone(),
tx_event.clone(),
conversation_history,
session_source,
)
.await
.map_err(|e| {
@@ -342,7 +338,6 @@ impl Session {
auth_manager: Arc<AuthManager>,
tx_event: Sender<Event>,
initial_history: InitialHistory,
session_source: SessionSource,
) -> anyhow::Result<(Arc<Self>, TurnContext)> {
let ConfigureSession {
provider,
@@ -366,11 +361,7 @@ impl Session {
let conversation_id = ConversationId::default();
(
conversation_id,
RolloutRecorderParams::new(
conversation_id,
user_instructions.clone(),
session_source,
),
RolloutRecorderParams::new(conversation_id, user_instructions.clone()),
)
}
InitialHistory::Resumed(resumed_history) => (
@@ -2479,21 +2470,13 @@ async fn handle_function_call(
))
})?;
let abs = turn_context.resolve_path(Some(args.path));
sess.inject_input(vec![InputItem::LocalImage { path: abs.clone() }])
sess.inject_input(vec![InputItem::LocalImage { path: abs }])
.await
.map_err(|_| {
FunctionCallError::RespondToModel(
"unable to attach image (no active task)".to_string(),
)
})?;
sess.send_event(Event {
id: sub_id.clone(),
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: call_id.clone(),
path: abs,
}),
})
.await;
Ok("attached local image path".to_string())
}
@@ -3318,7 +3301,7 @@ mod tests {
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use codex_app_server_protocol::AuthMode;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;

View File

@@ -1,431 +1,25 @@
use shlex::split as shlex_split;
/// On Windows, we conservatively allow only clearly read-only PowerShell invocations
/// that match a small safelist. Anything else (including direct CMD commands) is unsafe.
pub fn is_safe_command_windows(command: &[String]) -> bool {
if let Some(commands) = try_parse_powershell_command_sequence(command) {
return commands
.iter()
.all(|cmd| is_safe_powershell_command(cmd.as_slice()));
}
// Only PowerShell invocations are allowed on Windows for now; anything else is unsafe.
false
}
/// Returns each command sequence if the invocation starts with a PowerShell binary.
/// For example, the tokens from `pwsh Get-ChildItem | Measure-Object` become two sequences.
fn try_parse_powershell_command_sequence(command: &[String]) -> Option<Vec<Vec<String>>> {
let (exe, rest) = command.split_first()?;
if !is_powershell_executable(exe) {
return None;
}
parse_powershell_invocation(rest)
}
/// Parses a PowerShell invocation into discrete command vectors, rejecting unsafe patterns.
fn parse_powershell_invocation(args: &[String]) -> Option<Vec<Vec<String>>> {
if args.is_empty() {
// Examples rejected here: "pwsh" and "powershell.exe" with no additional arguments.
return None;
}
let mut idx = 0;
while idx < args.len() {
let arg = &args[idx];
let lower = arg.to_ascii_lowercase();
match lower.as_str() {
"-command" | "/command" | "-c" => {
let script = args.get(idx + 1)?;
if idx + 2 != args.len() {
// Reject if there is more than one token representing the actual command.
// Examples rejected here: "pwsh -Command foo bar" and "powershell -c ls extra".
return None;
}
return parse_powershell_script(script);
}
_ if lower.starts_with("-command:") || lower.starts_with("/command:") => {
if idx + 1 != args.len() {
// Reject if there are more tokens after the command itself.
// Examples rejected here: "pwsh -Command:dir C:\\" and "powershell /Command:dir C:\\" with trailing args.
return None;
}
let script = arg.split_once(':')?.1;
return parse_powershell_script(script);
}
// Benign, no-arg flags we tolerate.
"-nologo" | "-noprofile" | "-noninteractive" | "-mta" | "-sta" => {
idx += 1;
continue;
}
// Explicitly forbidden/opaque or unnecessary for read-only operations.
"-encodedcommand" | "-ec" | "-file" | "/file" | "-windowstyle" | "-executionpolicy"
| "-workingdirectory" => {
// Examples rejected here: "pwsh -EncodedCommand ..." and "powershell -File script.ps1".
return None;
}
// Unknown switch → bail conservatively.
_ if lower.starts_with('-') => {
// Examples rejected here: "pwsh -UnknownFlag" and "powershell -foo bar".
return None;
}
// If we hit non-flag tokens, treat the remainder as a command sequence.
// This happens if powershell is invoked without -Command, e.g.
// ["pwsh", "-NoLogo", "git", "-c", "core.pager=cat", "status"]
_ => {
return split_into_commands(args[idx..].to_vec());
}
}
}
// Examples rejected here: "pwsh" and "powershell.exe -NoLogo" without a script.
None
}
/// Tokenizes an inline PowerShell script and delegates to the command splitter.
/// Examples of when this is called: pwsh.exe -Command '<script>' or pwsh.exe -Command:<script>
fn parse_powershell_script(script: &str) -> Option<Vec<Vec<String>>> {
let tokens = shlex_split(script)?;
split_into_commands(tokens)
}
/// Splits tokens into pipeline segments while ensuring no unsafe separators slip through.
/// e.g. Get-ChildItem | Measure-Object -> [['Get-ChildItem'], ['Measure-Object']]
fn split_into_commands(tokens: Vec<String>) -> Option<Vec<Vec<String>>> {
if tokens.is_empty() {
// Examples rejected here: "pwsh -Command ''" and "powershell -Command \"\"".
return None;
}
let mut commands = Vec::new();
let mut current = Vec::new();
for token in tokens.into_iter() {
match token.as_str() {
"|" | "||" | "&&" | ";" => {
if current.is_empty() {
// Examples rejected here: "pwsh -Command '| Get-ChildItem'" and "pwsh -Command '; dir'".
return None;
}
commands.push(current);
current = Vec::new();
}
// Reject if any token embeds separators, redirection, or call operator characters.
_ if token.contains(['|', ';', '>', '<', '&']) || token.contains("$(") => {
// Examples rejected here: "pwsh -Command 'dir|select'" and "pwsh -Command 'echo hi > out.txt'".
return None;
}
_ => current.push(token),
}
}
if current.is_empty() {
// Examples rejected here: "pwsh -Command 'dir |'" and "pwsh -Command 'Get-ChildItem ;'".
return None;
}
commands.push(current);
Some(commands)
}
/// Returns true when the executable name is one of the supported PowerShell binaries.
fn is_powershell_executable(exe: &str) -> bool {
matches!(
exe.to_ascii_lowercase().as_str(),
"powershell" | "powershell.exe" | "pwsh" | "pwsh.exe"
)
}
/// Validates that a parsed PowerShell command stays within our read-only safelist.
/// Everything before this is parsing, and rejecting things that make us feel uncomfortable.
fn is_safe_powershell_command(words: &[String]) -> bool {
if words.is_empty() {
// Examples rejected here: "pwsh -Command ''" and "pwsh -Command \"\"".
return false;
}
// Reject nested unsafe cmdlets inside parentheses or arguments
for w in words.iter() {
let inner = w
.trim_matches(|c| c == '(' || c == ')')
.trim_start_matches('-')
.to_ascii_lowercase();
if matches!(
inner.as_str(),
"set-content"
| "add-content"
| "out-file"
| "new-item"
| "remove-item"
| "move-item"
| "copy-item"
| "rename-item"
| "start-process"
| "stop-process"
) {
// Examples rejected here: "Write-Output (Set-Content foo6.txt 'abc')" and "Get-Content (New-Item bar.txt)".
return false;
}
}
// Block PowerShell call operator or any redirection explicitly.
if words.iter().any(|w| {
matches!(
w.as_str(),
"&" | ">" | ">>" | "1>" | "2>" | "2>&1" | "*>" | "<" | "<<"
)
}) {
// Examples rejected here: "pwsh -Command '& Remove-Item foo'" and "pwsh -Command 'Get-Content foo > bar'".
return false;
}
let command = words[0]
.trim_matches(|c| c == '(' || c == ')')
.trim_start_matches('-')
.to_ascii_lowercase();
match command.as_str() {
"echo" | "write-output" | "write-host" => true, // (no redirection allowed)
"dir" | "ls" | "get-childitem" | "gci" => true,
"cat" | "type" | "gc" | "get-content" => true,
"select-string" | "sls" | "findstr" => true,
"measure-object" | "measure" => true,
"get-location" | "gl" | "pwd" => true,
"test-path" | "tp" => true,
"resolve-path" | "rvpa" => true,
"select-object" | "select" => true,
"get-item" => true,
"git" => is_safe_git_command(words),
"rg" => is_safe_ripgrep(words),
// Extra safety: explicitly prohibit common side-effecting cmdlets regardless of args.
"set-content" | "add-content" | "out-file" | "new-item" | "remove-item" | "move-item"
| "copy-item" | "rename-item" | "start-process" | "stop-process" => {
// Examples rejected here: "pwsh -Command 'Set-Content notes.txt data'" and "pwsh -Command 'Remove-Item temp.log'".
false
}
_ => {
// Examples rejected here: "pwsh -Command 'Invoke-WebRequest https://example.com'" and "pwsh -Command 'Start-Service Spooler'".
false
}
}
}
/// Checks that an `rg` invocation avoids options that can spawn arbitrary executables.
fn is_safe_ripgrep(words: &[String]) -> bool {
const UNSAFE_RIPGREP_OPTIONS_WITH_ARGS: &[&str] = &["--pre", "--hostname-bin"];
const UNSAFE_RIPGREP_OPTIONS_WITHOUT_ARGS: &[&str] = &["--search-zip", "-z"];
!words.iter().skip(1).any(|arg| {
let arg_lc = arg.to_ascii_lowercase();
// Examples rejected here: "pwsh -Command 'rg --pre cat pattern'" and "pwsh -Command 'rg --search-zip pattern'".
UNSAFE_RIPGREP_OPTIONS_WITHOUT_ARGS.contains(&arg_lc.as_str())
|| UNSAFE_RIPGREP_OPTIONS_WITH_ARGS
.iter()
.any(|opt| arg_lc == *opt || arg_lc.starts_with(&format!("{opt}=")))
})
}
/// Ensures a Git command sticks to whitelisted read-only subcommands and flags.
fn is_safe_git_command(words: &[String]) -> bool {
const SAFE_SUBCOMMANDS: &[&str] = &["status", "log", "show", "diff", "cat-file"];
let mut iter = words.iter().skip(1);
while let Some(arg) = iter.next() {
let arg_lc = arg.to_ascii_lowercase();
if arg.starts_with('-') {
if arg.eq_ignore_ascii_case("-c") || arg.eq_ignore_ascii_case("--config") {
if iter.next().is_none() {
// Examples rejected here: "pwsh -Command 'git -c'" and "pwsh -Command 'git --config'".
return false;
}
continue;
}
if arg_lc.starts_with("-c=")
|| arg_lc.starts_with("--config=")
|| arg_lc.starts_with("--git-dir=")
|| arg_lc.starts_with("--work-tree=")
{
continue;
}
if arg.eq_ignore_ascii_case("--git-dir") || arg.eq_ignore_ascii_case("--work-tree") {
if iter.next().is_none() {
// Examples rejected here: "pwsh -Command 'git --git-dir'" and "pwsh -Command 'git --work-tree'".
return false;
}
continue;
}
continue;
}
return SAFE_SUBCOMMANDS.contains(&arg_lc.as_str());
}
// Examples rejected here: "pwsh -Command 'git'" and "pwsh -Command 'git status --short | Remove-Item foo'".
// This is a WIP. This will eventually contain a real list of common safe Windows commands.
pub fn is_safe_command_windows(_command: &[String]) -> bool {
false
}
#[cfg(test)]
mod tests {
use super::is_safe_command_windows;
use std::string::ToString;
/// Converts a slice of string literals into owned `String`s for the tests.
fn vec_str(args: &[&str]) -> Vec<String> {
args.iter().map(ToString::to_string).collect()
}
#[test]
fn recognizes_safe_powershell_wrappers() {
assert!(is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-NoLogo",
"-Command",
"Get-ChildItem -Path .",
])));
assert!(is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-NoProfile",
"-Command",
"git status",
])));
assert!(is_safe_command_windows(&vec_str(&[
"powershell.exe",
"Get-Content",
"Cargo.toml",
])));
// pwsh parity
assert!(is_safe_command_windows(&vec_str(&[
"pwsh.exe",
"-NoProfile",
"-Command",
"Get-ChildItem",
])));
}
#[test]
fn allows_read_only_pipelines_and_git_usage() {
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-NoLogo",
"-NoProfile",
"-Command",
"rg --files-with-matches foo | Measure-Object | Select-Object -ExpandProperty Count",
])));
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-NoLogo",
"-NoProfile",
"-Command",
"Get-Content foo.rs | Select-Object -Skip 200",
])));
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-NoLogo",
"-NoProfile",
"-Command",
"git -c core.pager=cat show HEAD:foo.rs",
])));
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-Command",
"-git cat-file -p HEAD:foo.rs",
])));
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-Command",
"(Get-Content foo.rs -Raw)",
])));
assert!(is_safe_command_windows(&vec_str(&[
"pwsh",
"-Command",
"Get-Item foo.rs | Select-Object Length",
])));
}
#[test]
fn rejects_powershell_commands_with_side_effects() {
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-NoLogo",
"-Command",
"Remove-Item foo.txt",
])));
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-NoProfile",
"-Command",
"rg --pre cat",
])));
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Set-Content foo.txt 'hello'",
])));
// Redirections are blocked
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"echo hi > out.txt",
])));
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Get-Content x | Out-File y",
])));
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Write-Output foo 2> err.txt",
])));
// Call operator is blocked
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"& Remove-Item foo",
])));
// Chained safe + unsafe must fail
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Get-ChildItem; Remove-Item foo",
])));
// Nested unsafe cmdlet inside safe command must fail
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Write-Output (Set-Content foo6.txt 'abc')",
])));
// Additional nested unsafe cmdlet examples must fail
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Write-Host (Remove-Item foo.txt)",
])));
assert!(!is_safe_command_windows(&vec_str(&[
"powershell.exe",
"-Command",
"Get-Content (New-Item bar.txt)",
])));
fn everything_is_unsafe() {
for cmd in [
vec_str(&["powershell.exe", "-NoLogo", "-Command", "echo hello"]),
vec_str(&["copy", "foo", "bar"]),
vec_str(&["del", "file.txt"]),
vec_str(&["powershell.exe", "Get-ChildItem"]),
] {
assert!(!is_safe_command_windows(&cmd));
}
}
}

View File

@@ -23,12 +23,12 @@ use crate::openai_model_info::get_model_info;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use anyhow::Context;
use codex_app_server_protocol::Tools;
use codex_app_server_protocol::UserSavedConfig;
use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Verbosity;
use codex_protocol::mcp_protocol::Tools;
use codex_protocol::mcp_protocol::UserSavedConfig;
use dirs::home_dir;
use serde::Deserialize;
use std::collections::BTreeMap;
@@ -141,9 +141,6 @@ pub struct Config {
/// Maximum number of bytes to include from an AGENTS.md project doc file.
pub project_doc_max_bytes: usize,
/// Additional filenames to try when looking for project-level docs.
pub project_doc_fallback_filenames: Vec<String>,
/// Directory containing all Codex state (defaults to `~/.codex` but can be
/// overridden by the `CODEX_HOME` environment variable).
pub codex_home: PathBuf,
@@ -673,9 +670,6 @@ pub struct ConfigToml {
/// Maximum number of bytes to include from an AGENTS.md project doc file.
pub project_doc_max_bytes: Option<usize>,
/// Ordered list of fallback filenames to look for when AGENTS.md is missing.
pub project_doc_fallback_filenames: Option<Vec<String>>,
/// Profile to use from the `profiles` map.
pub profile: Option<String>,
@@ -1044,19 +1038,6 @@ impl Config {
mcp_servers: cfg.mcp_servers,
model_providers,
project_doc_max_bytes: cfg.project_doc_max_bytes.unwrap_or(PROJECT_DOC_MAX_BYTES),
project_doc_fallback_filenames: cfg
.project_doc_fallback_filenames
.unwrap_or_default()
.into_iter()
.filter_map(|name| {
let trimmed = name.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
})
.collect(),
codex_home,
history,
file_opener: cfg.file_opener.unwrap_or(UriBasedFileOpener::VsCode),
@@ -1830,7 +1811,6 @@ model_verbosity = "high"
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
project_doc_fallback_filenames: Vec::new(),
codex_home: fixture.codex_home(),
history: History::default(),
file_opener: UriBasedFileOpener::VsCode,
@@ -1891,7 +1871,6 @@ model_verbosity = "high"
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
project_doc_fallback_filenames: Vec::new(),
codex_home: fixture.codex_home(),
history: History::default(),
file_opener: UriBasedFileOpener::VsCode,
@@ -1967,7 +1946,6 @@ model_verbosity = "high"
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
project_doc_fallback_filenames: Vec::new(),
codex_home: fixture.codex_home(),
history: History::default(),
file_opener: UriBasedFileOpener::VsCode,
@@ -2029,7 +2007,6 @@ model_verbosity = "high"
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
project_doc_fallback_filenames: Vec::new(),
codex_home: fixture.codex_home(),
history: History::default(),
file_opener: UriBasedFileOpener::VsCode,

View File

@@ -22,7 +22,7 @@ pub struct ConfigProfile {
pub experimental_instructions_file: Option<PathBuf>,
}
impl From<ConfigProfile> for codex_app_server_protocol::Profile {
impl From<ConfigProfile> for codex_protocol::mcp_protocol::Profile {
fn from(config_profile: ConfigProfile) -> Self {
Self {
model: config_profile.model,

View File

@@ -313,7 +313,7 @@ pub struct SandboxWorkspaceWrite {
pub exclude_slash_tmp: bool,
}
impl From<SandboxWorkspaceWrite> for codex_app_server_protocol::SandboxSettings {
impl From<SandboxWorkspaceWrite> for codex_protocol::mcp_protocol::SandboxSettings {
fn from(sandbox_workspace_write: SandboxWorkspaceWrite) -> Self {
Self {
writable_roots: sandbox_workspace_write.writable_roots,

View File

@@ -13,11 +13,10 @@ use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -36,25 +35,20 @@ pub struct NewConversation {
pub struct ConversationManager {
conversations: Arc<RwLock<HashMap<ConversationId, Arc<CodexConversation>>>>,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
}
impl ConversationManager {
pub fn new(auth_manager: Arc<AuthManager>, session_source: SessionSource) -> Self {
pub fn new(auth_manager: Arc<AuthManager>) -> Self {
Self {
conversations: Arc::new(RwLock::new(HashMap::new())),
auth_manager,
session_source,
}
}
/// Construct with a dummy AuthManager containing the provided CodexAuth.
/// Used for integration tests: should not be used by ordinary business logic.
pub fn with_auth(auth: CodexAuth) -> Self {
Self::new(
crate::AuthManager::from_auth_for_testing(auth),
SessionSource::Exec,
)
Self::new(crate::AuthManager::from_auth_for_testing(auth))
}
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
@@ -70,13 +64,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(
config,
auth_manager,
InitialHistory::New,
self.session_source,
)
.await?;
} = Codex::spawn(config, auth_manager, InitialHistory::New).await?;
self.finalize_spawn(codex, conversation_id).await
}
@@ -133,7 +121,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
} = Codex::spawn(config, auth_manager, initial_history).await?;
self.finalize_spawn(codex, conversation_id).await
}
@@ -167,7 +155,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, history, self.session_source).await?;
} = Codex::spawn(config, auth_manager, history).await?;
self.finalize_spawn(codex, conversation_id).await
}

View File

@@ -1,7 +1,7 @@
use crate::exec::ExecToolCallOutput;
use crate::token_data::KnownPlan;
use crate::token_data::PlanType;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::RateLimitSnapshot;
use reqwest::StatusCode;
use serde_json;
@@ -76,8 +76,8 @@ pub enum CodexErr {
Interrupted,
/// Unexpected HTTP status code.
#[error("{0}")]
UnexpectedStatus(UnexpectedResponseError),
#[error("unexpected status {0}: {1}")]
UnexpectedStatus(StatusCode, String),
#[error("{0}")]
UsageLimitReached(UsageLimitReachedError),
@@ -91,8 +91,8 @@ pub enum CodexErr {
InternalServerError,
/// Retry limit exceeded.
#[error("{0}")]
RetryLimit(RetryLimitReachedError),
#[error("exceeded retry limit, last status: {0}")]
RetryLimit(StatusCode),
/// Agent loop died unexpectedly
#[error("internal error; agent loop died unexpectedly")]
@@ -135,49 +135,6 @@ pub enum CodexErr {
EnvVar(EnvVarError),
}
#[derive(Debug)]
pub struct UnexpectedResponseError {
pub status: StatusCode,
pub body: String,
pub request_id: Option<String>,
}
impl std::fmt::Display for UnexpectedResponseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"unexpected status {}: {}{}",
self.status,
self.body,
self.request_id
.as_ref()
.map(|id| format!(", request id: {id}"))
.unwrap_or_default()
)
}
}
impl std::error::Error for UnexpectedResponseError {}
#[derive(Debug)]
pub struct RetryLimitReachedError {
pub status: StatusCode,
pub request_id: Option<String>,
}
impl std::fmt::Display for RetryLimitReachedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"exceeded retry limit, last status: {}{}",
self.status,
self.request_id
.as_ref()
.map(|id| format!(", request id: {id}"))
.unwrap_or_default()
)
}
}
#[derive(Debug)]
pub struct UsageLimitReachedError {
pub(crate) plan_type: Option<PlanType>,
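For context, a minimal sketch of matching the restored tuple-style error variants from inside the crate; with the dedicated error structs removed, only the status code and body are available (there is no request_id to report).

// Minimal sketch (from inside codex-core, where CodexErr lives): handling the
// restored tuple-style variants.
fn summarize(err: &crate::error::CodexErr) -> String {
    use crate::error::CodexErr;
    match err {
        CodexErr::UnexpectedStatus(status, body) => {
            format!("unexpected status {status}: {body}")
        }
        CodexErr::RetryLimit(status) => {
            format!("exceeded retry limit, last status: {status}")
        }
        other => other.to_string(),
    }
}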

View File

@@ -49,7 +49,7 @@ pub fn create_exec_command_tool_for_responses_api() -> ResponsesApiTool {
parameters: JsonSchema::Object {
properties,
required: Some(vec!["cmd".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
}
}
@@ -92,7 +92,7 @@ Can write control characters (\u0003 for Ctrl-C), or an empty string to just pol
parameters: JsonSchema::Object {
properties,
required: Some(vec!["session_id".to_string(), "chars".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
}
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use codex_app_server_protocol::GitSha;
use codex_protocol::mcp_protocol::GitSha;
use codex_protocol::protocol::GitInfo;
use futures::future::join_all;
use serde::Deserialize;

View File

@@ -67,7 +67,6 @@ pub mod terminal;
mod tool_apply_patch;
pub mod turn_diff_tracker;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
pub use rollout::SESSIONS_SUBDIR;
pub use rollout::SessionMeta;

View File

@@ -30,7 +30,7 @@ use tokio::io::AsyncReadExt;
use crate::config::Config;
use crate::config_types::HistoryPersistence;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(unix)]

View File

@@ -6,7 +6,7 @@
//! key. These override or extend the defaults at runtime.
use crate::CodexAuth;
use codex_app_server_protocol::AuthMode;
use codex_protocol::mcp_protocol::AuthMode;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;

View File

@@ -122,26 +122,6 @@ impl ToolsConfig {
}
}
/// Whether additional properties are allowed, and if so, any required schema
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub(crate) enum AdditionalProperties {
Boolean(bool),
Schema(Box<JsonSchema>),
}
impl From<bool> for AdditionalProperties {
fn from(b: bool) -> Self {
Self::Boolean(b)
}
}
impl From<JsonSchema> for AdditionalProperties {
fn from(s: JsonSchema) -> Self {
Self::Schema(Box::new(s))
}
}
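// Illustrative sketch: in the revision that defines AdditionalProperties, the
// untagged representation serializes the Boolean variant as a bare JSON bool,
// so `Some(false.into())` in the tool definitions below becomes
// `"additionalProperties": false` on the wire; the Schema variant nests a full
// schema object instead. Module and test names here are hypothetical.
#[cfg(test)]
mod additional_properties_sketch {
    use super::*;

    #[test]
    fn boolean_variant_serializes_as_bare_bool() {
        let allow: AdditionalProperties = false.into();
        assert_eq!(
            serde_json::to_value(&allow).expect("serialize"),
            serde_json::json!(false)
        );
    }
}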
/// Generic JSONSchema subset needed for our tool definitions
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "lowercase")]
@@ -174,7 +154,7 @@ pub(crate) enum JsonSchema {
rename = "additionalProperties",
skip_serializing_if = "Option::is_none"
)]
additional_properties: Option<AdditionalProperties>,
additional_properties: Option<bool>,
},
}
@@ -220,7 +200,7 @@ fn create_unified_exec_tool() -> OpenAiTool {
parameters: JsonSchema::Object {
properties,
required: Some(vec!["input".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
})
}
@@ -267,7 +247,7 @@ fn create_shell_tool() -> OpenAiTool {
parameters: JsonSchema::Object {
properties,
required: Some(vec!["command".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
})
}
@@ -291,7 +271,7 @@ fn create_view_image_tool() -> OpenAiTool {
parameters: JsonSchema::Object {
properties,
required: Some(vec!["path".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
})
}
@@ -728,130 +708,7 @@ mod tests {
"string_property".to_string(),
"number_property".to_string(),
]),
additional_properties: Some(false.into()),
},
),
]),
required: None,
additional_properties: None,
},
description: "Do something cool".to_string(),
strict: false,
})
);
}
#[test]
fn test_get_openai_tools_mcp_tools_with_additional_properties_schema() {
let model_family = find_family_for_model("o3").expect("o3 should be a valid model family");
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
&config,
Some(HashMap::from([(
"test_server/do_something_cool".to_string(),
mcp_types::Tool {
name: "do_something_cool".to_string(),
input_schema: ToolInputSchema {
properties: Some(serde_json::json!({
"string_argument": {
"type": "string",
},
"number_argument": {
"type": "number",
},
"object_argument": {
"type": "object",
"properties": {
"string_property": { "type": "string" },
"number_property": { "type": "number" },
},
"required": [
"string_property",
"number_property",
],
"additionalProperties": {
"type": "object",
"properties": {
"addtl_prop": { "type": "string" },
},
"required": [
"addtl_prop",
],
"additionalProperties": false,
},
},
})),
required: None,
r#type: "object".to_string(),
},
output_schema: None,
title: None,
annotations: None,
description: Some("Do something cool".to_string()),
},
)])),
);
assert_eq_tool_names(
&tools,
&[
"unified_exec",
"web_search",
"view_image",
"test_server/do_something_cool",
],
);
assert_eq!(
tools[3],
OpenAiTool::Function(ResponsesApiTool {
name: "test_server/do_something_cool".to_string(),
parameters: JsonSchema::Object {
properties: BTreeMap::from([
(
"string_argument".to_string(),
JsonSchema::String { description: None }
),
(
"number_argument".to_string(),
JsonSchema::Number { description: None }
),
(
"object_argument".to_string(),
JsonSchema::Object {
properties: BTreeMap::from([
(
"string_property".to_string(),
JsonSchema::String { description: None }
),
(
"number_property".to_string(),
JsonSchema::Number { description: None }
),
]),
required: Some(vec![
"string_property".to_string(),
"number_property".to_string(),
]),
additional_properties: Some(
JsonSchema::Object {
properties: BTreeMap::from([(
"addtl_prop".to_string(),
JsonSchema::String { description: None }
),]),
required: Some(vec!["addtl_prop".to_string(),]),
additional_properties: Some(false.into()),
}
.into()
),
additional_properties: Some(false),
},
),
]),

View File

@@ -32,7 +32,7 @@ pub(crate) static PLAN_TOOL: LazyLock<OpenAiTool> = LazyLock::new(|| {
items: Box::new(JsonSchema::Object {
properties: plan_item_props,
required: Some(vec!["step".to_string(), "status".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
}),
};
@@ -54,7 +54,7 @@ At most one step can be in_progress at a time.
parameters: JsonSchema::Object {
properties,
required: Some(vec!["plan".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
})
});

View File

@@ -1,7 +1,6 @@
//! Project-level documentation discovery.
//!
//! Project-level documentation is primarily stored in files named `AGENTS.md`.
//! Additional fallback filenames can be configured via `project_doc_fallback_filenames`.
//! Project-level documentation can be stored in files named `AGENTS.md`.
//! We include the concatenation of all files found along the path from the
//! repository root to the current working directory as follows:
//!
@@ -14,13 +13,12 @@
//! 3. We do **not** walk past the Git root.
use crate::config::Config;
use dunce::canonicalize as normalize_path;
use std::path::PathBuf;
use tokio::io::AsyncReadExt;
use tracing::error;
/// Default filename scanned for project-level docs.
pub const DEFAULT_PROJECT_DOC_FILENAME: &str = "AGENTS.md";
/// Currently, we only match the filename `AGENTS.md` exactly.
const CANDIDATE_FILENAMES: &[&str] = &["AGENTS.md"];
/// When both `Config::instructions` and the project doc are present, they will
/// be concatenated with the following separator.
@@ -110,7 +108,7 @@ pub async fn read_project_docs(config: &Config) -> std::io::Result<Option<String
/// is zero, returns an empty list.
pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBuf>> {
let mut dir = config.cwd.clone();
if let Ok(canon) = normalize_path(&dir) {
if let Ok(canon) = dir.canonicalize() {
dir = canon;
}
@@ -154,9 +152,8 @@ pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBu
};
let mut found: Vec<PathBuf> = Vec::new();
let candidate_filenames = candidate_filenames(config);
for d in search_dirs {
for name in &candidate_filenames {
for name in CANDIDATE_FILENAMES {
let candidate = d.join(name);
match std::fs::symlink_metadata(&candidate) {
Ok(md) => {
@@ -176,22 +173,6 @@ pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBu
Ok(found)
}
fn candidate_filenames<'a>(config: &'a Config) -> Vec<&'a str> {
let mut names: Vec<&'a str> =
Vec::with_capacity(1 + config.project_doc_fallback_filenames.len());
names.push(DEFAULT_PROJECT_DOC_FILENAME);
for candidate in &config.project_doc_fallback_filenames {
let candidate = candidate.as_str();
if candidate.is_empty() {
continue;
}
if !names.contains(&candidate) {
names.push(candidate);
}
}
names
}
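// Illustrative behavior of candidate_filenames (hypothetical input): with
// project_doc_fallback_filenames set to ["EXAMPLE.md", "", "AGENTS.md"], the
// empty entry is skipped and the duplicate of the default is dropped, so the
// resulting search order is ["AGENTS.md", "EXAMPLE.md"].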
#[cfg(test)]
mod tests {
use super::*;
@@ -221,20 +202,6 @@ mod tests {
config
}
fn make_config_with_fallback(
root: &TempDir,
limit: usize,
instructions: Option<&str>,
fallbacks: &[&str],
) -> Config {
let mut config = make_config(root, limit, instructions);
config.project_doc_fallback_filenames = fallbacks
.iter()
.map(std::string::ToString::to_string)
.collect();
config
}
/// AGENTS.md missing should yield `None`.
#[tokio::test]
async fn no_doc_file_returns_none() {
@@ -380,45 +347,4 @@ mod tests {
let res = get_user_instructions(&cfg).await.expect("doc expected");
assert_eq!(res, "root doc\n\ncrate doc");
}
/// When AGENTS.md is absent but a configured fallback exists, the fallback is used.
#[tokio::test]
async fn uses_configured_fallback_when_agents_missing() {
let tmp = tempfile::tempdir().expect("tempdir");
fs::write(tmp.path().join("EXAMPLE.md"), "example instructions").unwrap();
let cfg = make_config_with_fallback(&tmp, 4096, None, &["EXAMPLE.md"]);
let res = get_user_instructions(&cfg)
.await
.expect("fallback doc expected");
assert_eq!(res, "example instructions");
}
/// AGENTS.md remains preferred when both AGENTS.md and fallbacks are present.
#[tokio::test]
async fn agents_md_preferred_over_fallbacks() {
let tmp = tempfile::tempdir().expect("tempdir");
fs::write(tmp.path().join("AGENTS.md"), "primary").unwrap();
fs::write(tmp.path().join("EXAMPLE.md"), "secondary").unwrap();
let cfg = make_config_with_fallback(&tmp, 4096, None, &["EXAMPLE.md", ".example.md"]);
let res = get_user_instructions(&cfg)
.await
.expect("AGENTS.md should win");
assert_eq!(res, "primary");
let discovery = discover_project_doc_paths(&cfg).expect("discover paths");
assert_eq!(discovery.len(), 1);
assert!(
discovery[0]
.file_name()
.unwrap()
.to_string_lossy()
.eq(DEFAULT_PROJECT_DOC_FILENAME)
);
}
}

View File

@@ -17,7 +17,6 @@ use super::SESSIONS_SUBDIR;
use crate::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionSource;
/// Returned page of conversation summaries.
#[derive(Debug, Default, PartialEq)]
@@ -41,25 +40,10 @@ pub struct ConversationItem {
pub head: Vec<serde_json::Value>,
/// Last up to `TAIL_RECORD_LIMIT` JSONL response records parsed as JSON.
pub tail: Vec<serde_json::Value>,
/// RFC3339 timestamp string for when the session was created, if available.
pub created_at: Option<String>,
/// RFC3339 timestamp string for the most recent response in the tail, if available.
pub updated_at: Option<String>,
}
#[derive(Default)]
struct HeadTailSummary {
head: Vec<serde_json::Value>,
tail: Vec<serde_json::Value>,
saw_session_meta: bool,
saw_user_event: bool,
source: Option<SessionSource>,
created_at: Option<String>,
updated_at: Option<String>,
}
/// Hard cap to bound worst-case work per request.
const MAX_SCAN_FILES: usize = 10000;
const MAX_SCAN_FILES: usize = 100;
const HEAD_RECORD_LIMIT: usize = 10;
const TAIL_RECORD_LIMIT: usize = 10;
@@ -108,7 +92,6 @@ pub(crate) async fn get_conversations(
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
allowed_sources: &[SessionSource],
) -> io::Result<ConversationsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
@@ -124,8 +107,7 @@ pub(crate) async fn get_conversations(
let anchor = cursor.cloned();
let result =
traverse_directories_for_paths(root.clone(), page_size, anchor, allowed_sources).await?;
let result = traverse_directories_for_paths(root.clone(), page_size, anchor).await?;
Ok(result)
}
@@ -144,7 +126,6 @@ async fn traverse_directories_for_paths(
root: PathBuf,
page_size: usize,
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
) -> io::Result<ConversationsPage> {
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -198,33 +179,13 @@ async fn traverse_directories_for_paths(
}
// Read head and simultaneously detect message events within the same
// first N JSONL records to avoid a second file read.
let summary = read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
.await
.unwrap_or_default();
if !allowed_sources.is_empty()
&& !summary
.source
.is_some_and(|source| allowed_sources.iter().any(|s| s == &source))
{
continue;
}
let (head, tail, saw_session_meta, saw_user_event) =
read_head_and_tail(&path, HEAD_RECORD_LIMIT, TAIL_RECORD_LIMIT)
.await
.unwrap_or((Vec::new(), Vec::new(), false, false));
// Apply filters: must have session meta and at least one user message event
if summary.saw_session_meta && summary.saw_user_event {
let HeadTailSummary {
head,
tail,
created_at,
mut updated_at,
..
} = summary;
updated_at = updated_at.or_else(|| created_at.clone());
items.push(ConversationItem {
path,
head,
tail,
created_at,
updated_at,
});
if saw_session_meta && saw_user_event {
items.push(ConversationItem { path, head, tail });
}
}
}
@@ -332,15 +293,17 @@ async fn read_head_and_tail(
path: &Path,
head_limit: usize,
tail_limit: usize,
) -> io::Result<HeadTailSummary> {
) -> io::Result<(Vec<serde_json::Value>, Vec<serde_json::Value>, bool, bool)> {
use tokio::io::AsyncBufReadExt;
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
let mut lines = reader.lines();
let mut summary = HeadTailSummary::default();
let mut head: Vec<serde_json::Value> = Vec::new();
let mut saw_session_meta = false;
let mut saw_user_event = false;
while summary.head.len() < head_limit {
while head.len() < head_limit {
let line_opt = lines.next_line().await?;
let Some(line) = line_opt else { break };
let trimmed = line.trim();
@@ -353,23 +316,14 @@ async fn read_head_and_tail(
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
summary.source = Some(session_meta_line.meta.source);
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
if let Ok(val) = serde_json::to_value(session_meta_line) {
summary.head.push(val);
summary.saw_session_meta = true;
head.push(val);
saw_session_meta = true;
}
}
RolloutItem::ResponseItem(item) => {
summary.created_at = summary
.created_at
.clone()
.or_else(|| Some(rollout_line.timestamp.clone()));
if let Ok(val) = serde_json::to_value(item) {
summary.head.push(val);
head.push(val);
}
}
RolloutItem::TurnContext(_) => {
@@ -380,30 +334,28 @@ async fn read_head_and_tail(
}
RolloutItem::EventMsg(ev) => {
if matches!(ev, EventMsg::UserMessage(_)) {
summary.saw_user_event = true;
saw_user_event = true;
}
}
}
}
if tail_limit != 0 {
let (tail, updated_at) = read_tail_records(path, tail_limit).await?;
summary.tail = tail;
summary.updated_at = updated_at;
}
Ok(summary)
let tail = if tail_limit == 0 {
Vec::new()
} else {
read_tail_records(path, tail_limit).await?
};
Ok((head, tail, saw_session_meta, saw_user_event))
}
async fn read_tail_records(
path: &Path,
max_records: usize,
) -> io::Result<(Vec<serde_json::Value>, Option<String>)> {
async fn read_tail_records(path: &Path, max_records: usize) -> io::Result<Vec<serde_json::Value>> {
use std::io::SeekFrom;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
if max_records == 0 {
return Ok((Vec::new(), None));
return Ok(Vec::new());
}
const CHUNK_SIZE: usize = 8192;
@@ -411,28 +363,24 @@ async fn read_tail_records(
let mut file = tokio::fs::File::open(path).await?;
let mut pos = file.seek(SeekFrom::End(0)).await?;
if pos == 0 {
return Ok((Vec::new(), None));
return Ok(Vec::new());
}
let mut buffer: Vec<u8> = Vec::new();
let mut latest_timestamp: Option<String> = None;
loop {
let slice_start = match (pos > 0, buffer.iter().position(|&b| b == b'\n')) {
(true, Some(idx)) => idx + 1,
_ => 0,
};
let (tail, newest_ts) = collect_last_response_values(&buffer[slice_start..], max_records);
if latest_timestamp.is_none() {
latest_timestamp = newest_ts.clone();
}
let tail = collect_last_response_values(&buffer[slice_start..], max_records);
if tail.len() >= max_records || pos == 0 {
return Ok((tail, latest_timestamp.or(newest_ts)));
return Ok(tail);
}
let read_size = CHUNK_SIZE.min(pos as usize);
if read_size == 0 {
return Ok((tail, latest_timestamp.or(newest_ts)));
return Ok(tail);
}
pos -= read_size as u64;
file.seek(SeekFrom::Start(pos)).await?;
@@ -443,19 +391,15 @@ async fn read_tail_records(
}
}
fn collect_last_response_values(
buffer: &[u8],
max_records: usize,
) -> (Vec<serde_json::Value>, Option<String>) {
fn collect_last_response_values(buffer: &[u8], max_records: usize) -> Vec<serde_json::Value> {
use std::borrow::Cow;
if buffer.is_empty() || max_records == 0 {
return (Vec::new(), None);
return Vec::new();
}
let text: Cow<'_, str> = String::from_utf8_lossy(buffer);
let mut collected_rev: Vec<serde_json::Value> = Vec::new();
let mut latest_timestamp: Option<String> = None;
for line in text.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
@@ -463,13 +407,9 @@ fn collect_last_response_values(
}
let parsed: serde_json::Result<RolloutLine> = serde_json::from_str(trimmed);
let Ok(rollout_line) = parsed else { continue };
let RolloutLine { timestamp, item } = rollout_line;
if let RolloutItem::ResponseItem(item) = item
&& let Ok(val) = serde_json::to_value(&item)
if let RolloutItem::ResponseItem(item) = rollout_line.item
&& let Ok(val) = serde_json::to_value(item)
{
if latest_timestamp.is_none() {
latest_timestamp = Some(timestamp.clone());
}
collected_rev.push(val);
if collected_rev.len() == max_records {
break;
@@ -477,7 +417,7 @@ fn collect_last_response_values(
}
}
collected_rev.reverse();
(collected_rev, latest_timestamp)
collected_rev
}
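// Note on the backwards scan above: read_tail_records grows the buffer in
// 8 KiB chunks from the end of the file, and on each pass slice_start skips
// the (possibly partial) first line unless the whole file has been consumed
// (pos == 0), so only complete JSONL records are handed to
// collect_last_response_values.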
/// Locate a recorded conversation rollout file by its UUID string using the existing

View File

@@ -1,11 +1,7 @@
//! Rollout module: persistence and discovery of session rollout files.
use codex_protocol::protocol::SessionSource;
pub const SESSIONS_SUBDIR: &str = "sessions";
pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions";
pub const INTERACTIVE_SESSION_SOURCES: &[SessionSource] =
&[SessionSource::Cli, SessionSource::VSCode];
pub mod list;
pub(crate) mod policy;

View File

@@ -70,7 +70,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_) => false,
}
}

View File

@@ -6,7 +6,7 @@ use std::io::Error as IoError;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
@@ -32,7 +32,6 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
/// every update.
@@ -54,7 +53,6 @@ pub enum RolloutRecorderParams {
Create {
conversation_id: ConversationId,
instructions: Option<String>,
source: SessionSource,
},
Resume {
path: PathBuf,
@@ -73,15 +71,10 @@ enum RolloutCmd {
}
impl RolloutRecorderParams {
pub fn new(
conversation_id: ConversationId,
instructions: Option<String>,
source: SessionSource,
) -> Self {
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
Self::Create {
conversation_id,
instructions,
source,
}
}
@@ -96,9 +89,8 @@ impl RolloutRecorder {
codex_home: &Path,
page_size: usize,
cursor: Option<&Cursor>,
allowed_sources: &[SessionSource],
) -> std::io::Result<ConversationsPage> {
get_conversations(codex_home, page_size, cursor, allowed_sources).await
get_conversations(codex_home, page_size, cursor).await
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
@@ -109,7 +101,6 @@ impl RolloutRecorder {
RolloutRecorderParams::Create {
conversation_id,
instructions,
source,
} => {
let LogFileInfo {
file,
@@ -136,7 +127,6 @@ impl RolloutRecorder {
originator: originator().value.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
source,
}),
)
}

View File

@@ -12,14 +12,13 @@ use time::format_description::FormatItem;
use time::macros::format_description;
use uuid::Uuid;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::list::ConversationItem;
use crate::rollout::list::ConversationsPage;
use crate::rollout::list::Cursor;
use crate::rollout::list::get_conversation;
use crate::rollout::list::get_conversations;
use anyhow::Result;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
@@ -29,17 +28,13 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
const NO_SOURCE_FILTER: &[SessionSource] = &[];
fn write_session_file(
root: &Path,
ts_str: &str,
uuid: Uuid,
num_records: usize,
source: Option<SessionSource>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
@@ -57,23 +52,17 @@ fn write_session_file(
let file_path = dir.join(filename);
let mut file = File::create(file_path)?;
let mut payload = serde_json::json!({
"id": uuid,
"timestamp": ts_str,
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
});
if let Some(source) = source {
payload["source"] = serde_json::to_value(source).unwrap();
}
let meta = serde_json::json!({
"timestamp": ts_str,
"type": "session_meta",
"payload": payload,
"payload": {
"id": uuid,
"timestamp": ts_str,
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version"
}
});
writeln!(file, "{meta}")?;
@@ -110,34 +99,11 @@ async fn test_list_conversations_latest_first() {
let u3 = Uuid::from_u128(3);
// Create three sessions across three days
write_session_file(
home,
"2025-01-01T12-00-00",
u1,
3,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-01-02T12-00-00",
u2,
3,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-01-03T12-00-00",
u3,
3,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(home, "2025-01-01T12-00-00", u1, 3).unwrap();
write_session_file(home, "2025-01-02T12-00-00", u2, 3).unwrap();
write_session_file(home, "2025-01-03T12-00-00", u3, 3).unwrap();
let page = get_conversations(home, 10, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let page = get_conversations(home, 10, None).await.unwrap();
// Build expected objects
let p1 = home
@@ -165,8 +131,7 @@ async fn test_list_conversations_latest_first() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let head_2 = vec![serde_json::json!({
"id": u2,
@@ -174,8 +139,7 @@ async fn test_list_conversations_latest_first() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let head_1 = vec![serde_json::json!({
"id": u1,
@@ -183,8 +147,7 @@ async fn test_list_conversations_latest_first() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let expected_cursor: Cursor =
@@ -196,22 +159,16 @@ async fn test_list_conversations_latest_first() {
path: p1,
head: head_3,
tail: Vec::new(),
created_at: Some("2025-01-03T12-00-00".into()),
updated_at: Some("2025-01-03T12-00-00".into()),
},
ConversationItem {
path: p2,
head: head_2,
tail: Vec::new(),
created_at: Some("2025-01-02T12-00-00".into()),
updated_at: Some("2025-01-02T12-00-00".into()),
},
ConversationItem {
path: p3,
head: head_1,
tail: Vec::new(),
created_at: Some("2025-01-01T12-00-00".into()),
updated_at: Some("2025-01-01T12-00-00".into()),
},
],
next_cursor: Some(expected_cursor),
@@ -235,50 +192,13 @@ async fn test_pagination_cursor() {
let u5 = Uuid::from_u128(55);
// Oldest to newest
write_session_file(
home,
"2025-03-01T09-00-00",
u1,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-02T09-00-00",
u2,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-03T09-00-00",
u3,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-04T09-00-00",
u4,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(
home,
"2025-03-05T09-00-00",
u5,
1,
Some(SessionSource::VSCode),
)
.unwrap();
write_session_file(home, "2025-03-01T09-00-00", u1, 1).unwrap();
write_session_file(home, "2025-03-02T09-00-00", u2, 1).unwrap();
write_session_file(home, "2025-03-03T09-00-00", u3, 1).unwrap();
write_session_file(home, "2025-03-04T09-00-00", u4, 1).unwrap();
write_session_file(home, "2025-03-05T09-00-00", u5, 1).unwrap();
let page1 = get_conversations(home, 2, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let page1 = get_conversations(home, 2, None).await.unwrap();
let p5 = home
.join("sessions")
.join("2025")
@@ -297,8 +217,7 @@ async fn test_pagination_cursor() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let head_4 = vec![serde_json::json!({
"id": u4,
@@ -306,8 +225,7 @@ async fn test_pagination_cursor() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let expected_cursor1: Cursor =
serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap();
@@ -317,15 +235,11 @@ async fn test_pagination_cursor() {
path: p5,
head: head_5,
tail: Vec::new(),
created_at: Some("2025-03-05T09-00-00".into()),
updated_at: Some("2025-03-05T09-00-00".into()),
},
ConversationItem {
path: p4,
head: head_4,
tail: Vec::new(),
created_at: Some("2025-03-04T09-00-00".into()),
updated_at: Some("2025-03-04T09-00-00".into()),
},
],
next_cursor: Some(expected_cursor1.clone()),
@@ -334,14 +248,9 @@ async fn test_pagination_cursor() {
};
assert_eq!(page1, expected_page1);
let page2 = get_conversations(
home,
2,
page1.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
)
.await
.unwrap();
let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
.await
.unwrap();
let p3 = home
.join("sessions")
.join("2025")
@@ -360,8 +269,7 @@ async fn test_pagination_cursor() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let head_2 = vec![serde_json::json!({
"id": u2,
@@ -369,8 +277,7 @@ async fn test_pagination_cursor() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let expected_cursor2: Cursor =
serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap();
@@ -380,15 +287,11 @@ async fn test_pagination_cursor() {
path: p3,
head: head_3,
tail: Vec::new(),
created_at: Some("2025-03-03T09-00-00".into()),
updated_at: Some("2025-03-03T09-00-00".into()),
},
ConversationItem {
path: p2,
head: head_2,
tail: Vec::new(),
created_at: Some("2025-03-02T09-00-00".into()),
updated_at: Some("2025-03-02T09-00-00".into()),
},
],
next_cursor: Some(expected_cursor2.clone()),
@@ -397,14 +300,9 @@ async fn test_pagination_cursor() {
};
assert_eq!(page2, expected_page2);
let page3 = get_conversations(
home,
2,
page2.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
)
.await
.unwrap();
let page3 = get_conversations(home, 2, page2.next_cursor.as_ref())
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
@@ -417,8 +315,7 @@ async fn test_pagination_cursor() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let expected_cursor3: Cursor =
serde_json::from_str(&format!("\"2025-03-01T09-00-00|{u1}\"")).unwrap();
@@ -427,8 +324,6 @@ async fn test_pagination_cursor() {
path: p1,
head: head_1,
tail: Vec::new(),
created_at: Some("2025-03-01T09-00-00".into()),
updated_at: Some("2025-03-01T09-00-00".into()),
}],
next_cursor: Some(expected_cursor3),
num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
@@ -444,11 +339,9 @@ async fn test_get_conversation_contents() {
let uuid = Uuid::new_v4();
let ts = "2025-04-01T10-30-00";
write_session_file(home, ts, uuid, 2, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, uuid, 2).unwrap();
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let page = get_conversations(home, 1, None).await.unwrap();
let path = &page.items[0].path;
let content = get_conversation(path).await.unwrap();
@@ -466,8 +359,7 @@ async fn test_get_conversation_contents() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})];
let expected_cursor: Cursor = serde_json::from_str(&format!("\"{ts}|{uuid}\"")).unwrap();
let expected_page = ConversationsPage {
@@ -475,8 +367,6 @@ async fn test_get_conversation_contents() {
path: expected_path,
head: expected_head,
tail: Vec::new(),
created_at: Some(ts.into()),
updated_at: Some(ts.into()),
}],
next_cursor: Some(expected_cursor),
num_scanned_files: 1,
@@ -485,19 +375,7 @@ async fn test_get_conversation_contents() {
assert_eq!(page, expected_page);
// Entire file contents equality
let meta = serde_json::json!({
"timestamp": ts,
"type": "session_meta",
"payload": {
"id": uuid,
"timestamp": ts,
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
}
});
let meta = serde_json::json!({"timestamp": ts, "type": "session_meta", "payload": {"id": uuid, "timestamp": ts, "instructions": null, "cwd": ".", "originator": "test_originator", "cli_version": "test_version"}});
let user_event = serde_json::json!({
"timestamp": ts,
"type": "event_msg",
@@ -532,7 +410,6 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
cwd: ".".into(),
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
},
git: None,
}),
@@ -565,30 +442,25 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
}
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let page = get_conversations(home, 1, None).await?;
let item = page.items.first().expect("conversation item");
let tail_len = item.tail.len();
assert_eq!(tail_len, 10usize.min(total_messages));
let expected: Vec<serde_json::Value> = (total_messages - tail_len..total_messages)
.map(|idx| {
serde_json::json!({
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": format!("reply-{idx}"),
}
],
serde_json::to_value(ResponseItem::Message {
id: None,
role: "assistant".into(),
content: vec![ContentItem::OutputText {
text: format!("reply-{idx}"),
}],
})
.expect("serialize response item")
})
.collect();
assert_eq!(item.tail, expected);
assert_eq!(item.created_at.as_deref(), Some(ts));
let expected_updated = format!("{ts}-{last:02}", last = total_messages - 1);
assert_eq!(item.updated_at.as_deref(), Some(expected_updated.as_str()));
Ok(())
}
@@ -616,7 +488,6 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
cwd: ".".into(),
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
},
git: None,
}),
@@ -648,32 +519,25 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
}
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let page = get_conversations(home, 1, None).await?;
let tail = &page.items.first().expect("conversation item").tail;
assert_eq!(tail.len(), 3);
let expected: Vec<serde_json::Value> = (0..3)
.map(|idx| {
serde_json::json!({
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": format!("short-{idx}"),
}
],
serde_json::to_value(ResponseItem::Message {
id: None,
role: "assistant".into(),
content: vec![ContentItem::OutputText {
text: format!("short-{idx}"),
}],
})
.expect("serialize response item")
})
.collect();
assert_eq!(tail, &expected);
let expected_updated = format!("{ts}-{last:02}", last = 2);
assert_eq!(
page.items[0].updated_at.as_deref(),
Some(expected_updated.as_str())
);
Ok(())
}
@@ -701,7 +565,6 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
cwd: ".".into(),
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
},
git: None,
}),
@@ -747,30 +610,23 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
writeln!(file, "{}", serde_json::to_string(&shutdown_event)?)?;
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let page = get_conversations(home, 1, None).await?;
let tail = &page.items.first().expect("conversation item").tail;
let expected: Vec<serde_json::Value> = (0..4)
.map(|idx| {
serde_json::json!({
"type": "message",
"role": "assistant",
"content": [
{
"type": "output_text",
"text": format!("response-{idx}"),
}
],
serde_json::to_value(ResponseItem::Message {
id: None,
role: "assistant".into(),
content: vec![ContentItem::OutputText {
text: format!("response-{idx}"),
}],
})
.expect("serialize response item")
})
.collect();
assert_eq!(tail, &expected);
let expected_updated = format!("{ts}-{last:02}", last = 3);
assert_eq!(
page.items[0].updated_at.as_deref(),
Some(expected_updated.as_str())
);
Ok(())
}
@@ -785,13 +641,11 @@ async fn test_stable_ordering_same_second_pagination() {
let u2 = Uuid::from_u128(2);
let u3 = Uuid::from_u128(3);
write_session_file(home, ts, u1, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u2, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u3, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u1, 0).unwrap();
write_session_file(home, ts, u2, 0).unwrap();
write_session_file(home, ts, u3, 0).unwrap();
let page1 = get_conversations(home, 2, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let page1 = get_conversations(home, 2, None).await.unwrap();
let p3 = home
.join("sessions")
@@ -812,8 +666,7 @@ async fn test_stable_ordering_same_second_pagination() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"cli_version": "test_version"
})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
@@ -823,15 +676,11 @@ async fn test_stable_ordering_same_second_pagination() {
path: p3,
head: head(u3),
tail: Vec::new(),
created_at: Some(ts.to_string()),
updated_at: Some(ts.to_string()),
},
ConversationItem {
path: p2,
head: head(u2),
tail: Vec::new(),
created_at: Some(ts.to_string()),
updated_at: Some(ts.to_string()),
},
],
next_cursor: Some(expected_cursor1.clone()),
@@ -840,14 +689,9 @@ async fn test_stable_ordering_same_second_pagination() {
};
assert_eq!(page1, expected_page1);
let page2 = get_conversations(
home,
2,
page1.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
)
.await
.unwrap();
let page2 = get_conversations(home, 2, page1.next_cursor.as_ref())
.await
.unwrap();
let p1 = home
.join("sessions")
.join("2025")
@@ -860,8 +704,6 @@ async fn test_stable_ordering_same_second_pagination() {
path: p1,
head: head(u1),
tail: Vec::new(),
created_at: Some(ts.to_string()),
updated_at: Some(ts.to_string()),
}],
next_cursor: Some(expected_cursor2),
num_scanned_files: 3, // scanned u3, u2 (anchor), u1
@@ -869,59 +711,3 @@ async fn test_stable_ordering_same_second_pagination() {
};
assert_eq!(page2, expected_page2);
}
#[tokio::test]
async fn test_source_filter_excludes_non_matching_sessions() {
let temp = TempDir::new().unwrap();
let home = temp.path();
let interactive_id = Uuid::from_u128(42);
let non_interactive_id = Uuid::from_u128(77);
write_session_file(
home,
"2025-08-02T10-00-00",
interactive_id,
2,
Some(SessionSource::Cli),
)
.unwrap();
write_session_file(
home,
"2025-08-01T10-00-00",
non_interactive_id,
2,
Some(SessionSource::Exec),
)
.unwrap();
let interactive_only = get_conversations(home, 10, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let paths: Vec<_> = interactive_only
.items
.iter()
.map(|item| item.path.as_path())
.collect();
assert_eq!(paths.len(), 1);
assert!(paths.iter().all(|path| {
path.ends_with("rollout-2025-08-02T10-00-00-00000000-0000-0000-0000-00000000002a.jsonl")
}));
let all_sessions = get_conversations(home, 10, None, NO_SOURCE_FILTER)
.await
.unwrap();
let all_paths: Vec<_> = all_sessions
.items
.into_iter()
.map(|item| item.path)
.collect();
assert_eq!(all_paths.len(), 2);
assert!(all_paths.iter().any(|path| {
path.ends_with("rollout-2025-08-02T10-00-00-00000000-0000-0000-0000-00000000002a.jsonl")
}));
assert!(all_paths.iter().any(|path| {
path.ends_with("rollout-2025-08-01T10-00-00-00000000-0000-0000-0000-00000000004d.jsonl")
}));
}

View File

@@ -116,7 +116,7 @@ It is important to remember:
parameters: JsonSchema::Object {
properties,
required: Some(vec!["input".to_string()]),
additional_properties: Some(false.into()),
additional_properties: Some(false),
},
})
}

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use codex_app_server_protocol::AuthMode;
use codex_core::ContentItem;
use codex_core::LocalShellAction;
use codex_core::LocalShellExecAction;
@@ -13,7 +12,8 @@ use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
use futures::StreamExt;
use serde_json::Value;

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use tracing_test::traced_test;
use codex_app_server_protocol::AuthMode;
use codex_core::ContentItem;
use codex_core::ModelClient;
use codex_core::ModelProviderInfo;
@@ -11,7 +10,8 @@ use codex_core::ResponseItem;
use codex_core::WireApi;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use core_test_support::load_default_config_for_test;
use futures::StreamExt;
use tempfile::TempDir;

View File

@@ -1,5 +1,4 @@
#![allow(clippy::expect_used)]
use codex_core::auth::CODEX_API_KEY_ENV_VAR;
use std::path::Path;
use tempfile::TempDir;
use wiremock::MockServer;
@@ -15,7 +14,7 @@ impl TestCodexExecBuilder {
.expect("should find binary for codex-exec");
cmd.current_dir(self.cwd.path())
.env("CODEX_HOME", self.home.path())
.env(CODEX_API_KEY_ENV_VAR, "dummy");
.env("OPENAI_API_KEY", "dummy");
cmd
}
pub fn cmd_with_server(&self, server: &MockServer) -> assert_cmd::Command {

View File

@@ -76,7 +76,7 @@ async fn chat_mode_stream_cli() {
server.verify().await;
// Verify a new session rollout was created and is discoverable via list_conversations
let page = RolloutRecorder::list_conversations(home.path(), 10, None, &[])
let page = RolloutRecorder::list_conversations(home.path(), 10, None)
.await
.expect("list conversations");
assert!(

View File

@@ -1,4 +1,3 @@
use codex_app_server_protocol::AuthMode;
use codex_core::CodexAuth;
use codex_core::ContentItem;
use codex_core::ConversationManager;
@@ -17,9 +16,9 @@ use codex_core::built_in_model_providers;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SessionSource;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::WebSearchAction;
use core_test_support::load_default_config_for_test;
@@ -539,7 +538,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Ok(None) => panic!("No CodexAuth found in codex_home"),
Err(e) => panic!("Failed to load CodexAuth: {e}"),
};
let conversation_manager = ConversationManager::new(auth_manager, SessionSource::Exec);
let conversation_manager = ConversationManager::new(auth_manager);
let NewConversation {
conversation: codex,
..

View File

@@ -16,6 +16,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-arg0 = { workspace = true }
codex-common = { workspace = true, features = [
@@ -26,7 +27,6 @@ codex-common = { workspace = true, features = [
codex-core = { workspace = true }
codex-ollama = { workspace = true }
codex-protocol = { workspace = true }
opentelemetry-appender-tracing = { workspace = true }
owo-colors = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -40,6 +40,7 @@ tokio = { workspace = true, features = [
] }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
opentelemetry-appender-tracing = { workspace = true }
ts-rs = { workspace = true, features = [
"uuid-impl",
"serde-json-impl",
@@ -51,10 +52,10 @@ ts-rs = { workspace = true, features = [
assert_cmd = { workspace = true }
core_test_support = { workspace = true }
libc = { workspace = true }
mcp-types = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
uuid = { workspace = true }
walkdir = { workspace = true }
wiremock = { workspace = true }
mcp-types = { workspace = true }

View File

@@ -64,9 +64,20 @@ pub struct Cli {
pub color: Color,
/// Print events to stdout as JSONL.
#[arg(long = "json", alias = "experimental-json", default_value_t = false)]
#[arg(
long = "json",
default_value_t = false,
conflicts_with = "experimental_json"
)]
pub json: bool,
#[arg(
long = "experimental-json",
default_value_t = false,
conflicts_with = "json"
)]
pub experimental_json: bool,
/// Whether to include the plan tool in the conversation.
#[arg(long = "include-plan-tool", default_value_t = false)]
pub include_plan_tool: bool,
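// With the split above, `--json` and `--experimental-json` become separate
// boolean flags that clap rejects when passed together (conflicts_with),
// replacing the single field that accepted `--experimental-json` as an alias
// for `--json`.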

View File

@@ -21,8 +21,6 @@ pub(crate) trait EventProcessor {
/// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event) -> CodexStatus;
fn print_final_output(&mut self) {}
}
pub(crate) fn handle_last_message(last_agent_message: Option<&str>, output_file: &Path) {

View File

@@ -2,7 +2,10 @@ use codex_common::elapsed::format_duration;
use codex_common::elapsed::format_elapsed;
use codex_core::config::Config;
use codex_core::plan_tool::UpdatePlanArgs;
use codex_core::protocol::AgentMessageDeltaEvent;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::AgentReasoningDeltaEvent;
use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::ErrorEvent;
@@ -28,6 +31,7 @@ use owo_colors::OwoColorize;
use owo_colors::Style;
use shlex::try_join;
use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::time::Instant;
@@ -40,6 +44,7 @@ use codex_common::create_config_summary_entries;
/// a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
pub(crate) struct EventProcessorWithHumanOutput {
call_id_to_command: HashMap<String, ExecCommandBegin>,
call_id_to_patch: HashMap<String, PatchApplyBegin>,
// To ensure that --color=never is respected, ANSI escapes _must_ be added
@@ -57,8 +62,10 @@ pub(crate) struct EventProcessorWithHumanOutput {
/// Whether to include `AgentReasoning` events in the output.
show_agent_reasoning: bool,
show_raw_agent_reasoning: bool,
answer_started: bool,
reasoning_started: bool,
raw_reasoning_started: bool,
last_message_path: Option<PathBuf>,
last_total_token_usage: Option<codex_core::protocol::TokenUsageInfo>,
}
impl EventProcessorWithHumanOutput {
@@ -67,10 +74,12 @@ impl EventProcessorWithHumanOutput {
config: &Config,
last_message_path: Option<PathBuf>,
) -> Self {
let call_id_to_command = HashMap::new();
let call_id_to_patch = HashMap::new();
if with_ansi {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new().bold(),
italic: Style::new().italic(),
@@ -81,11 +90,14 @@ impl EventProcessorWithHumanOutput {
cyan: Style::new().cyan(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
answer_started: false,
reasoning_started: false,
raw_reasoning_started: false,
last_message_path,
last_total_token_usage: None,
}
} else {
Self {
call_id_to_command,
call_id_to_patch,
bold: Style::new(),
italic: Style::new(),
@@ -96,13 +108,19 @@ impl EventProcessorWithHumanOutput {
cyan: Style::new(),
show_agent_reasoning: !config.hide_agent_reasoning,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
answer_started: false,
reasoning_started: false,
raw_reasoning_started: false,
last_message_path,
last_total_token_usage: None,
}
}
}
}
struct ExecCommandBegin {
command: Vec<String>,
}
struct PatchApplyBegin {
start_time: Instant,
auto_approved: bool,
@@ -112,6 +130,9 @@ struct PatchApplyBegin {
#[macro_export]
macro_rules! ts_println {
($self:ident, $($arg:tt)*) => {{
let now = chrono::Utc::now();
let formatted = now.format("[%Y-%m-%dT%H:%M:%S]");
print!("{} ", formatted.style($self.dimmed));
println!($($arg)*);
}};
}
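// Illustrative output of ts_println!: a dimmed UTC timestamp prefix such as
// `[2025-01-01T12:00:00] ` followed by the formatted message, per the chrono
// format string above.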
@@ -120,12 +141,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
/// Print a concise summary of the effective configuration that will be used
/// for the session. This mirrors the information shown in the TUI welcome
/// screen.
fn print_config_summary(
&mut self,
config: &Config,
prompt: &str,
session_configured_event: &SessionConfiguredEvent,
) {
fn print_config_summary(&mut self, config: &Config, prompt: &str, _: &SessionConfiguredEvent) {
const VERSION: &str = env!("CARGO_PKG_VERSION");
ts_println!(
self,
@@ -133,11 +149,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
VERSION
);
let mut entries = create_config_summary_entries(config);
entries.push((
"session id",
session_configured_event.session_id.to_string(),
));
let entries = create_config_summary_entries(config);
for (key, value) in entries {
println!("{} {}", format!("{key}:").style(self.bold), value);
@@ -148,7 +160,12 @@ impl EventProcessor for EventProcessorWithHumanOutput {
// Echo the prompt that will be sent to the agent so it is visible in the
// transcript/logs before any events come in. Note the prompt may have been
// read from stdin, so it may not be visible in the terminal otherwise.
ts_println!(self, "{}\n{}", "user".style(self.cyan), prompt);
ts_println!(
self,
"{}\n{}",
"User instructions:".style(self.bold).style(self.cyan),
prompt
);
}
fn process_event(&mut self, event: Event) -> CodexStatus {
@@ -174,49 +191,126 @@ impl EventProcessor for EventProcessorWithHumanOutput {
return CodexStatus::InitiateShutdown;
}
EventMsg::TokenCount(ev) => {
self.last_total_token_usage = ev.info;
if let Some(usage_info) = ev.info {
ts_println!(
self,
"tokens used: {}",
format_with_separators(usage_info.total_token_usage.blended_total())
);
}
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if !self.answer_started {
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
self.answer_started = true;
}
print!("{delta}");
#[expect(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if !self.show_agent_reasoning {
return CodexStatus::Running;
}
if !self.reasoning_started {
ts_println!(
self,
"{}\n",
"thinking".style(self.italic).style(self.magenta),
);
self.reasoning_started = true;
}
print!("{delta}");
#[expect(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningSectionBreak(_) => {
if !self.show_agent_reasoning {
return CodexStatus::Running;
}
println!();
#[expect(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text }) => {
if self.show_raw_agent_reasoning {
if !self.show_raw_agent_reasoning {
return CodexStatus::Running;
}
if !self.raw_reasoning_started {
print!("{text}");
#[expect(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
} else {
println!();
self.raw_reasoning_started = false;
}
}
EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent {
delta,
}) => {
if !self.show_raw_agent_reasoning {
return CodexStatus::Running;
}
if !self.raw_reasoning_started {
self.raw_reasoning_started = true;
}
print!("{delta}");
#[expect(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// If answer_started is false, we have not received any delta yet, so print
// the message as a new answer.
if !self.answer_started {
ts_println!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
text,
"codex".style(self.italic).style(self.magenta),
message,
);
} else {
println!();
self.answer_started = false;
}
}
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id,
command,
cwd,
parsed_cmd: _,
}) => {
self.call_id_to_command.insert(
call_id,
ExecCommandBegin {
command: command.clone(),
},
);
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
message,
);
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent { command, cwd, .. }) => {
print!(
"{}\n{} in {}",
"exec".style(self.italic).style(self.magenta),
"{} {} in {}",
"exec".style(self.magenta),
escape_command(&command).style(self.bold),
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
aggregated_output,
duration,
exit_code,
..
}) => {
let duration = format!(" in {}", format_duration(duration));
let exec_command = self.call_id_to_command.remove(&call_id);
let (duration, call) = if let Some(ExecCommandBegin { command, .. }) = exec_command
{
(
format!(" in {}", format_duration(duration)),
format!("{}", escape_command(&command).style(self.bold)),
)
} else {
("".to_string(), format!("exec('{call_id}')"))
};
let truncated_output = aggregated_output
.lines()
@@ -225,11 +319,11 @@ impl EventProcessor for EventProcessorWithHumanOutput {
.join("\n");
match exit_code {
0 => {
let title = format!(" succeeded{duration}:");
let title = format!("{call} succeeded{duration}:");
ts_println!(self, "{}", title.style(self.green));
}
_ => {
let title = format!(" exited {exit_code}{duration}:");
let title = format!("{call} exited {exit_code}{duration}:");
ts_println!(self, "{}", title.style(self.red));
}
}
@@ -297,8 +391,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_println!(
self,
"{}",
"file update".style(self.magenta).style(self.italic),
"{} auto_approved={}:",
"apply_patch".style(self.magenta),
auto_approved,
);
// Pretty-print the patch summary with colored diff markers so
@@ -397,11 +492,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
}
EventMsg::TurnDiff(TurnDiffEvent { unified_diff }) => {
ts_println!(
self,
"{}",
"file update:".style(self.magenta).style(self.italic)
);
ts_println!(self, "{}", "turn diff:".style(self.magenta));
println!("{unified_diff}");
}
EventMsg::ExecApprovalRequest(_) => {
@@ -412,12 +503,17 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
ts_println!(
self,
"{}\n{}",
"thinking".style(self.italic).style(self.magenta),
agent_reasoning_event.text,
);
if !self.reasoning_started {
ts_println!(
self,
"{}\n{}",
"codex".style(self.italic).style(self.magenta),
agent_reasoning_event.text,
);
} else {
println!();
self.reasoning_started = false;
}
}
}
EventMsg::SessionConfigured(session_configured_event) => {
@@ -484,14 +580,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::ListCustomPromptsResponse(_) => {
// Currently ignored in exec output.
}
EventMsg::ViewImageToolCall(view) => {
ts_println!(
self,
"{} {}",
"viewed image".style(self.magenta),
view.path.display()
);
}
EventMsg::TurnAborted(abort_reason) => match abort_reason.reason {
TurnAbortReason::Interrupted => {
ts_println!(self, "task interrupted");
@@ -508,23 +596,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::UserMessage(_) => {}
EventMsg::EnteredReviewMode(_) => {}
EventMsg::ExitedReviewMode(_) => {}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
EventMsg::AgentReasoningRawContentDelta(_) => {}
}
CodexStatus::Running
}
fn print_final_output(&mut self) {
if let Some(usage_info) = &self.last_total_token_usage {
ts_println!(
self,
"{}\n{}",
"tokens used".style(self.magenta).style(self.italic),
format_with_separators(usage_info.total_token_usage.blended_total())
);
}
}
}
fn escape_command(command: &[String]) -> String {

View File

@@ -0,0 +1,64 @@
use std::collections::HashMap;
use std::path::PathBuf;
use codex_core::config::Config;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use serde_json::json;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use codex_common::create_config_summary_entries;
pub(crate) struct EventProcessorWithJsonOutput {
last_message_path: Option<PathBuf>,
}
impl EventProcessorWithJsonOutput {
pub fn new(last_message_path: Option<PathBuf>) -> Self {
Self { last_message_path }
}
}
impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, config: &Config, prompt: &str, _: &SessionConfiguredEvent) {
let entries = create_config_summary_entries(config)
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect::<HashMap<String, String>>();
#[expect(clippy::expect_used)]
let config_json =
serde_json::to_string(&entries).expect("Failed to serialize config summary to JSON");
println!("{config_json}");
let prompt_json = json!({
"prompt": prompt,
});
println!("{prompt_json}");
}
fn process_event(&mut self, event: Event) -> CodexStatus {
match event.msg {
EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) => {
// Suppress streaming events in JSON mode.
CodexStatus::Running
}
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}
CodexStatus::InitiateShutdown
}
EventMsg::ShutdownComplete => CodexStatus::Shutdown,
_ => {
if let Ok(line) = serde_json::to_string(&event) {
println!("{line}");
}
CodexStatus::Running
}
}
}
}
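// Illustrative JSONL stream from this processor (values hypothetical): one
// JSON object per line, starting with the config summary map, then
// {"prompt":"..."}, then each non-delta event serialized with serde_json;
// TaskComplete then initiates shutdown.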

View File

@@ -2,45 +2,34 @@ use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
/// Top-level JSONL events emitted by codex exec
/// Top-level events emitted on the Codex Exec thread stream.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(tag = "type")]
pub enum ThreadEvent {
/// Emitted when a new thread is started as the first event.
#[serde(rename = "thread.started")]
ThreadStarted(ThreadStartedEvent),
/// Emitted when a turn is started by sending a new prompt to the model.
/// A turn encompasses all events that happen while the agent is processing the prompt.
#[serde(rename = "turn.started")]
TurnStarted(TurnStartedEvent),
/// Emitted when a turn is completed. Typically right after the assistant's response.
#[serde(rename = "turn.completed")]
TurnCompleted(TurnCompletedEvent),
/// Indicates that a turn failed with an error.
#[serde(rename = "turn.failed")]
TurnFailed(TurnFailedEvent),
/// Emitted when a new item is added to the thread. Typically the item will be in an "in progress" state.
#[serde(rename = "item.started")]
ItemStarted(ItemStartedEvent),
/// Emitted when an item is updated.
#[serde(rename = "item.updated")]
ItemUpdated(ItemUpdatedEvent),
/// Signals that an item has reached a terminal state—either success or failure.
#[serde(rename = "item.completed")]
ItemCompleted(ItemCompletedEvent),
/// Represents an unrecoverable error emitted directly by the event stream.
#[serde(rename = "error")]
Error(ThreadErrorEvent),
}
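Aside: a rough illustration of how the internally tagged representation above serializes (a self-contained mirror with invented values, not the real crate types):

use serde::Serialize;

#[derive(Serialize)]
#[serde(tag = "type")]
enum DemoThreadEvent {
    #[serde(rename = "thread.started")]
    ThreadStarted { thread_id: String },
    #[serde(rename = "turn.started")]
    TurnStarted {},
}

fn main() {
    let ev = DemoThreadEvent::ThreadStarted {
        thread_id: "thread_123".into(),
    };
    // Prints: {"type":"thread.started","thread_id":"thread_123"}
    println!("{}", serde_json::to_string(&ev).unwrap());
}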
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct ThreadStartedEvent {
/// The identifier of the new thread. Can be used to resume the thread later.
pub thread_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct TurnStartedEvent {}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
@@ -53,14 +42,11 @@ pub struct TurnFailedEvent {
pub error: ThreadErrorEvent,
}
/// Describes the usage of tokens during a turn.
/// Minimal usage summary for a turn.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS, Default)]
pub struct Usage {
/// The number of input tokens used during the turn.
pub input_tokens: u64,
/// The number of cached input tokens used during the turn.
pub cached_input_tokens: u64,
/// The number of output tokens used during the turn.
pub output_tokens: u64,
}
@@ -95,46 +81,36 @@ pub struct ThreadItem {
/// Typed payloads for each supported thread item type.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(tag = "item_type", rename_all = "snake_case")]
pub enum ThreadItemDetails {
/// Response from the agent.
/// Either a natural-language response or a JSON string when structured output is requested.
AgentMessage(AgentMessageItem),
/// Agent's reasoning summary.
AssistantMessage(AssistantMessageItem),
Reasoning(ReasoningItem),
/// Tracks a command executed by the agent. The item starts when the command is
/// spawned, and completes when the process exits with an exit code.
CommandExecution(CommandExecutionItem),
/// Represents a set of file changes by the agent. The item is emitted only as a
/// completed event once the patch succeeds or fails.
FileChange(FileChangeItem),
/// Represents a call to an MCP tool. The item starts when the invocation is
/// dispatched and completes when the MCP server reports success or failure.
McpToolCall(McpToolCallItem),
/// Captures a web search request. It starts when the search is kicked off
/// and completes when results are returned to the agent.
WebSearch(WebSearchItem),
/// Tracks the agent's running to-do list. It starts when the plan is first
/// issued, updates as steps change state, and completes when the turn ends.
TodoList(TodoListItem),
/// Describes a non-fatal error surfaced as an item.
Error(ErrorItem),
}
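Aside: a sketch of how the `item_type` tag surfaces once a payload is wrapped in a thread item; the field sets and the use of `flatten` here are assumptions for illustration only, not the crate's actual layout:

use serde::Serialize;

#[derive(Serialize)]
struct DemoThreadItem {
    id: String,
    // Assumption: the details are flattened next to the id for this sketch.
    #[serde(flatten)]
    details: DemoDetails,
}

#[derive(Serialize)]
#[serde(tag = "item_type", rename_all = "snake_case")]
enum DemoDetails {
    AssistantMessage { text: String },
    CommandExecution { command: String, exit_code: i32 },
}

fn main() {
    let item = DemoThreadItem {
        id: "item_0".into(),
        details: DemoDetails::AssistantMessage { text: "hello".into() },
    };
    // Prints something like: {"id":"item_0","item_type":"assistant_message","text":"hello"}
    println!("{}", serde_json::to_string(&item).unwrap());
}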
/// Response from the agent.
/// Either a natural-language response or a JSON string when structured output is requested.
/// Session metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct AgentMessageItem {
pub struct SessionItem {
pub session_id: String,
}
/// Assistant message payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct AssistantMessageItem {
pub text: String,
}
/// Agent's reasoning summary.
/// Model reasoning summary payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct ReasoningItem {
pub text: String,
}
/// The status of a command execution.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)]
#[serde(rename_all = "snake_case")]
pub enum CommandExecutionStatus {
@@ -144,7 +120,7 @@ pub enum CommandExecutionStatus {
Failed,
}
/// A command executed by the agent.
/// Local shell command execution payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct CommandExecutionItem {
pub command: String,
@@ -154,14 +130,13 @@ pub struct CommandExecutionItem {
pub status: CommandExecutionStatus,
}
/// A set of file changes by the agent.
/// Single file change summary for a patch.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct FileUpdateChange {
pub path: String,
pub kind: PatchChangeKind,
}
/// The status of a file change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(rename_all = "snake_case")]
pub enum PatchApplyStatus {
@@ -169,14 +144,14 @@ pub enum PatchApplyStatus {
Failed,
}
/// A set of file changes by the agent.
/// Patch application payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct FileChangeItem {
pub changes: Vec<FileUpdateChange>,
pub status: PatchApplyStatus,
}
/// Indicates the type of the file change.
/// Known change kinds for a patch.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(rename_all = "snake_case")]
pub enum PatchChangeKind {
@@ -185,7 +160,6 @@ pub enum PatchChangeKind {
Update,
}
/// The status of an MCP tool call.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default, TS)]
#[serde(rename_all = "snake_case")]
pub enum McpToolCallStatus {
@@ -195,7 +169,6 @@ pub enum McpToolCallStatus {
Failed,
}
/// A call to an MCP tool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct McpToolCallItem {
pub server: String,
@@ -203,19 +176,16 @@ pub struct McpToolCallItem {
pub status: McpToolCallStatus,
}
/// A web search request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct WebSearchItem {
pub query: String,
}
/// An error notification.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct ErrorItem {
pub message: String,
}
/// An item in the agent's to-do list.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
pub struct TodoItem {
pub text: String,

View File

@@ -5,7 +5,7 @@ use std::sync::atomic::AtomicU64;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use crate::event_processor::handle_last_message;
use crate::exec_events::AgentMessageItem;
use crate::exec_events::AssistantMessageItem;
use crate::exec_events::CommandExecutionItem;
use crate::exec_events::CommandExecutionStatus;
use crate::exec_events::FileChangeItem;
@@ -29,7 +29,6 @@ use crate::exec_events::TurnCompletedEvent;
use crate::exec_events::TurnFailedEvent;
use crate::exec_events::TurnStartedEvent;
use crate::exec_events::Usage;
use crate::exec_events::WebSearchItem;
use codex_core::config::Config;
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
@@ -47,11 +46,10 @@ use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_core::protocol::WebSearchEndEvent;
use tracing::error;
use tracing::warn;
pub struct EventProcessorWithJsonOutput {
pub struct ExperimentalEventProcessorWithJsonOutput {
last_message_path: Option<PathBuf>,
next_event_id: AtomicU64,
// Tracks running commands by call_id, including the associated item id.
@@ -83,7 +81,7 @@ struct RunningMcpToolCall {
item_id: String,
}
impl EventProcessorWithJsonOutput {
impl ExperimentalEventProcessorWithJsonOutput {
pub fn new(last_message_path: Option<PathBuf>) -> Self {
Self {
last_message_path,
@@ -108,8 +106,6 @@ impl EventProcessorWithJsonOutput {
EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
EventMsg::PatchApplyEnd(ev) => self.handle_patch_apply_end(ev),
EventMsg::WebSearchBegin(_) => Vec::new(),
EventMsg::WebSearchEnd(ev) => self.handle_web_search_end(ev),
EventMsg::TokenCount(ev) => {
if let Some(info) = &ev.info {
self.last_total_token_usage = Some(info.total_token_usage.clone());
@@ -147,22 +143,11 @@ impl EventProcessorWithJsonOutput {
})]
}
fn handle_web_search_end(&self, ev: &WebSearchEndEvent) -> Vec<ThreadEvent> {
let item = ThreadItem {
id: self.get_next_item_id(),
details: ThreadItemDetails::WebSearch(WebSearchItem {
query: ev.query.clone(),
}),
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_agent_message(&self, payload: &AgentMessageEvent) -> Vec<ThreadEvent> {
let item = ThreadItem {
id: self.get_next_item_id(),
details: ThreadItemDetails::AgentMessage(AgentMessageItem {
details: ThreadItemDetails::AssistantMessage(AssistantMessageItem {
text: payload.message.clone(),
}),
};
@@ -420,7 +405,7 @@ impl EventProcessorWithJsonOutput {
}
}
impl EventProcessor for EventProcessorWithJsonOutput {
impl EventProcessor for ExperimentalEventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
id: "".to_string(),

View File

@@ -1,8 +1,9 @@
mod cli;
mod event_processor;
mod event_processor_with_human_output;
pub mod event_processor_with_jsonl_output;
pub mod event_processor_with_json_output;
pub mod exec_events;
pub mod experimental_event_processor_with_json_output;
pub use cli::Cli;
use codex_core::AuthManager;
@@ -17,12 +18,12 @@ use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::SessionSource;
use codex_core::protocol::TaskCompleteEvent;
use codex_ollama::DEFAULT_OSS_MODEL;
use codex_protocol::config_types::SandboxMode;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use event_processor_with_json_output::EventProcessorWithJsonOutput;
use experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use serde_json::Value;
use std::io::IsTerminal;
@@ -58,6 +59,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
color,
last_message_file,
json: json_mode,
experimental_json,
sandbox_mode: sandbox_mode_cli_arg,
prompt,
output_schema: output_schema_path,
@@ -210,8 +212,17 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
let _ = tracing_subscriber::registry().with(fmt_layer).try_init();
}
let mut event_processor: Box<dyn EventProcessor> = match json_mode {
true => Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())),
let mut event_processor: Box<dyn EventProcessor> = match (json_mode, experimental_json) {
(_, true) => Box::new(ExperimentalEventProcessorWithJsonOutput::new(
last_message_file.clone(),
)),
(true, _) => {
eprintln!(
"The existing `--json` output format is being deprecated. Please try the new format using `--experimental-json`."
);
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
}
_ => Box::new(EventProcessorWithHumanOutput::create_with_ansi(
stdout_with_ansi,
&config,
@@ -237,8 +248,8 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
std::process::exit(1);
}
let auth_manager = AuthManager::shared(config.codex_home.clone(), true);
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let conversation_manager =
ConversationManager::new(AuthManager::shared(config.codex_home.clone()));
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewConversation {
@@ -250,7 +261,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
if let Some(path) = resume_path {
conversation_manager
.resume_conversation_from_rollout(config.clone(), path, auth_manager.clone())
.resume_conversation_from_rollout(
config.clone(),
path,
AuthManager::shared(config.codex_home.clone()),
)
.await?
} else {
conversation_manager
@@ -364,7 +379,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
}
}
}
event_processor.print_final_output();
if error_seen {
std::process::exit(1);
}
@@ -377,9 +391,7 @@ async fn resolve_resume_path(
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
match codex_core::RolloutRecorder::list_conversations(&config.codex_home, 1, None, &[])
.await
{
match codex_core::RolloutRecorder::list_conversations(&config.codex_home, 1, None).await {
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
Err(e) => {
error!("Error listing conversations: {e}");

View File

@@ -12,9 +12,7 @@ use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_exec::event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use codex_exec::exec_events::AgentMessageItem;
use codex_exec::exec_events::AssistantMessageItem;
use codex_exec::exec_events::CommandExecutionItem;
use codex_exec::exec_events::CommandExecutionStatus;
use codex_exec::exec_events::ItemCompletedEvent;
@@ -36,7 +34,7 @@ use codex_exec::exec_events::TurnCompletedEvent;
use codex_exec::exec_events::TurnFailedEvent;
use codex_exec::exec_events::TurnStartedEvent;
use codex_exec::exec_events::Usage;
use codex_exec::exec_events::WebSearchItem;
use codex_exec::experimental_event_processor_with_json_output::ExperimentalEventProcessorWithJsonOutput;
use mcp_types::CallToolResult;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
@@ -51,10 +49,11 @@ fn event(id: &str, msg: EventMsg) -> Event {
#[test]
fn session_configured_produces_thread_started_event() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let session_id =
codex_protocol::ConversationId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")
.unwrap();
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let session_id = codex_protocol::mcp_protocol::ConversationId::from_string(
"67e55044-10b1-426f-9247-bb680e5fe0c8",
)
.unwrap();
let rollout_path = PathBuf::from("/tmp/rollout.json");
let ev = event(
"e1",
@@ -79,7 +78,7 @@ fn session_configured_produces_thread_started_event() {
#[test]
fn task_started_produces_turn_started_event() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let out = ep.collect_thread_events(&event(
"t1",
EventMsg::TaskStarted(codex_core::protocol::TaskStartedEvent {
@@ -90,36 +89,13 @@ fn task_started_produces_turn_started_event() {
assert_eq!(out, vec![ThreadEvent::TurnStarted(TurnStartedEvent {})]);
}
#[test]
fn web_search_end_emits_item_completed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let query = "rust async await".to_string();
let out = ep.collect_thread_events(&event(
"w1",
EventMsg::WebSearchEnd(WebSearchEndEvent {
call_id: "call-123".to_string(),
query: query.clone(),
}),
));
assert_eq!(
out,
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::WebSearch(WebSearchItem { query }),
},
})]
);
}
#[test]
fn plan_update_emits_todo_list_started_updated_and_completed() {
use codex_core::plan_tool::PlanItemArg;
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// First plan update => item.started (todo_list)
let first = event(
@@ -236,7 +212,7 @@ fn plan_update_emits_todo_list_started_updated_and_completed() {
#[test]
fn mcp_tool_call_begin_and_end_emit_item_events() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let invocation = McpInvocation {
server: "server_a".to_string(),
tool: "tool_x".to_string(),
@@ -296,7 +272,7 @@ fn mcp_tool_call_begin_and_end_emit_item_events() {
#[test]
fn mcp_tool_call_failure_sets_failed_status() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let invocation = McpInvocation {
server: "server_b".to_string(),
tool: "tool_y".to_string(),
@@ -343,7 +319,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
use codex_core::plan_tool::StepStatus;
use codex_core::plan_tool::UpdatePlanArgs;
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// First turn: start + complete
let start = event(
@@ -388,7 +364,7 @@ fn plan_update_after_complete_starts_new_todo_list_with_new_id() {
#[test]
fn agent_reasoning_produces_item_completed_reasoning() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let ev = event(
"e1",
EventMsg::AgentReasoning(AgentReasoningEvent {
@@ -410,8 +386,8 @@ fn agent_reasoning_produces_item_completed_reasoning() {
}
#[test]
fn agent_message_produces_item_completed_agent_message() {
let mut ep = EventProcessorWithJsonOutput::new(None);
fn agent_message_produces_item_completed_assistant_message() {
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let ev = event(
"e1",
EventMsg::AgentMessage(AgentMessageEvent {
@@ -424,7 +400,7 @@ fn agent_message_produces_item_completed_agent_message() {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
item: ThreadItem {
id: "item_0".to_string(),
details: ThreadItemDetails::AgentMessage(AgentMessageItem {
details: ThreadItemDetails::AssistantMessage(AssistantMessageItem {
text: "hello".to_string(),
}),
},
@@ -434,7 +410,7 @@ fn agent_message_produces_item_completed_agent_message() {
#[test]
fn error_event_produces_error() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let out = ep.collect_thread_events(&event(
"e1",
EventMsg::Error(codex_core::protocol::ErrorEvent {
@@ -451,7 +427,7 @@ fn error_event_produces_error() {
#[test]
fn stream_error_event_produces_error() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let out = ep.collect_thread_events(&event(
"e1",
EventMsg::StreamError(codex_core::protocol::StreamErrorEvent {
@@ -468,7 +444,7 @@ fn stream_error_event_produces_error() {
#[test]
fn error_followed_by_task_complete_produces_turn_failed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let error_event = event(
"e1",
@@ -501,7 +477,7 @@ fn error_followed_by_task_complete_produces_turn_failed() {
#[test]
fn exec_command_end_success_produces_completed_command_item() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// Begin -> no output
let begin = event(
@@ -561,7 +537,7 @@ fn exec_command_end_success_produces_completed_command_item() {
#[test]
fn exec_command_end_failure_produces_failed_command_item() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// Begin -> no output
let begin = event(
@@ -620,7 +596,7 @@ fn exec_command_end_failure_produces_failed_command_item() {
#[test]
fn exec_command_end_without_begin_is_ignored() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// End event arrives without a prior Begin; should produce no thread events.
let end_only = event(
@@ -641,7 +617,7 @@ fn exec_command_end_without_begin_is_ignored() {
#[test]
fn patch_apply_success_produces_item_completed_patchapply() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// Prepare a patch with multiple kinds of changes
let mut changes = std::collections::HashMap::new();
@@ -723,7 +699,7 @@ fn patch_apply_success_produces_item_completed_patchapply() {
#[test]
fn patch_apply_failure_produces_item_completed_patchapply_failed() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
let mut changes = std::collections::HashMap::new();
changes.insert(
@@ -777,7 +753,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() {
#[test]
fn task_complete_produces_turn_completed_with_usage() {
let mut ep = EventProcessorWithJsonOutput::new(None);
let mut ep = ExperimentalEventProcessorWithJsonOutput::new(None);
// First, feed a TokenCount event with known totals.
let usage = codex_core::protocol::TokenUsage {

View File

@@ -1,34 +0,0 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use core_test_support::responses::ev_completed;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex_exec::test_codex_exec;
use wiremock::Mock;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_uses_codex_api_key_env_var() -> anyhow::Result<()> {
let test = test_codex_exec();
let server = start_mock_server().await;
Mock::given(method("POST"))
.and(path("/v1/responses"))
.and(header("Authorization", "Bearer dummy"))
.respond_with(sse_response(sse(vec![ev_completed("request_0")])))
.expect(1)
.mount(&server)
.await;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("-C")
.arg(env!("CARGO_MANIFEST_DIR"))
.arg("echo testing codex api key")
.assert()
.success();
Ok(())
}

View File

@@ -1,6 +1,5 @@
// Aggregates all former standalone integration tests as modules.
mod apply_patch;
mod auth_env;
mod output_schema;
mod resume;
mod sandbox;

View File

@@ -1,9 +1,10 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Context;
use core_test_support::test_codex_exec::test_codex_exec;
use assert_cmd::prelude::*;
use serde_json::Value;
use std::path::Path;
use std::process::Command;
use std::string::ToString;
use tempfile::TempDir;
use uuid::Uuid;
use walkdir::WalkDir;
@@ -71,15 +72,18 @@ fn extract_conversation_id(path: &std::path::Path) -> String {
#[test]
fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture =
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cli_responses_fixture.sse");
let home = TempDir::new()?;
let fixture = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/cli_responses_fixture.sse");
// 1) First run: create a session with a unique marker in the content.
let marker = format!("resume-last-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");
test.cmd()
Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
@@ -90,7 +94,7 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
.success();
// Find the created session file containing the marker.
let sessions_dir = test.home_path().join("sessions");
let sessions_dir = home.path().join("sessions");
let path = find_session_file_containing_marker(&sessions_dir, &marker)
.expect("no session file found after first run");
@@ -98,7 +102,11 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
let marker2 = format!("resume-last-2-{}", Uuid::new_v4());
let prompt2 = format!("echo {marker2}");
test.cmd()
let mut binding = assert_cmd::Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?;
let cmd = binding
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
@@ -106,9 +114,8 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
.arg(env!("CARGO_MANIFEST_DIR"))
.arg(&prompt2)
.arg("resume")
.arg("--last")
.assert()
.success();
.arg("--last");
cmd.assert().success();
// Ensure the same file was updated and contains both markers.
let resumed_path = find_session_file_containing_marker(&sessions_dir, &marker2)
@@ -125,15 +132,18 @@ fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
#[test]
fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture =
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cli_responses_fixture.sse");
let home = TempDir::new()?;
let fixture = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/cli_responses_fixture.sse");
// 1) First run: create a session
let marker = format!("resume-by-id-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");
test.cmd()
Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
@@ -143,7 +153,7 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
.assert()
.success();
let sessions_dir = test.home_path().join("sessions");
let sessions_dir = home.path().join("sessions");
let path = find_session_file_containing_marker(&sessions_dir, &marker)
.expect("no session file found after first run");
let session_id = extract_conversation_id(&path);
@@ -156,7 +166,11 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
let marker2 = format!("resume-by-id-2-{}", Uuid::new_v4());
let prompt2 = format!("echo {marker2}");
test.cmd()
let mut binding = assert_cmd::Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?;
let cmd = binding
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
@@ -164,9 +178,8 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
.arg(env!("CARGO_MANIFEST_DIR"))
.arg(&prompt2)
.arg("resume")
.arg(&session_id)
.assert()
.success();
.arg(&session_id);
cmd.assert().success();
let resumed_path = find_session_file_containing_marker(&sessions_dir, &marker2)
.expect("no resumed session file containing marker2");
@@ -182,14 +195,17 @@ fn exec_resume_by_id_appends_to_existing_file() -> anyhow::Result<()> {
#[test]
fn exec_resume_preserves_cli_configuration_overrides() -> anyhow::Result<()> {
let test = test_codex_exec();
let fixture =
Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/cli_responses_fixture.sse");
let home = TempDir::new()?;
let fixture = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/cli_responses_fixture.sse");
let marker = format!("resume-config-{}", Uuid::new_v4());
let prompt = format!("echo {marker}");
test.cmd()
Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")
@@ -203,15 +219,17 @@ fn exec_resume_preserves_cli_configuration_overrides() -> anyhow::Result<()> {
.assert()
.success();
let sessions_dir = test.home_path().join("sessions");
let sessions_dir = home.path().join("sessions");
let path = find_session_file_containing_marker(&sessions_dir, &marker)
.expect("no session file found after first run");
let marker2 = format!("resume-config-2-{}", Uuid::new_v4());
let prompt2 = format!("echo {marker2}");
let output = test
.cmd()
let output = Command::cargo_bin("codex-exec")
.context("should find binary for codex-exec")?
.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local")
.arg("--skip-git-repo-check")

View File

@@ -10,7 +10,7 @@ workspace = true
base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
codex-core = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-protocol = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["json", "blocking"] }
serde = { workspace = true, features = ["derive"] }

View File

@@ -148,20 +148,20 @@ fn print_colored_warning_device_code() {
/// Full device code login flow.
pub async fn run_device_code_login(opts: ServerOptions) -> std::io::Result<()> {
let client = reqwest::Client::new();
let base_url = opts.issuer.trim_end_matches('/');
let api_base_url = format!("{}/api/accounts", opts.issuer.trim_end_matches('/'));
let auth_base_url = opts.issuer.trim_end_matches('/').to_owned();
print_colored_warning_device_code();
println!("⏳ Generating a new 9-digit device code for authentication...\n");
let uc = request_user_code(&client, &api_base_url, &opts.client_id).await?;
let uc = request_user_code(&client, &auth_base_url, &opts.client_id).await?;
println!(
"To authenticate, visit: {}/deviceauth/authorize and enter code: {}",
api_base_url, uc.user_code
opts.issuer.trim_end_matches('/'),
uc.user_code
);
let code_resp = poll_for_token(
&client,
&api_base_url,
&auth_base_url,
&uc.device_auth_id,
&uc.user_code,
uc.interval,
@@ -173,10 +173,10 @@ pub async fn run_device_code_login(opts: ServerOptions) -> std::io::Result<()> {
code_challenge: code_resp.code_challenge,
};
println!("authorization code received");
let redirect_uri = format!("{base_url}/deviceauth/callback");
let redirect_uri = format!("{}/deviceauth/callback", opts.issuer.trim_end_matches('/'));
let tokens = crate::server::exchange_code_for_tokens(
base_url,
&opts.issuer,
&opts.client_id,
&redirect_uri,
&pkce,

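Aside: the hunk above mostly reshuffles which base URL each device-auth endpoint hangs off, so the sketch below only illustrates the shared string handling (trimming the trailing slash before joining a path); the endpoint-to-base mapping itself is exactly what this diff changes:

/// Join a path onto the issuer without doubling slashes.
/// Purely illustrative; the real code builds these strings inline.
fn join_issuer(issuer: &str, path: &str) -> String {
    format!("{}{}", issuer.trim_end_matches('/'), path)
}

fn main() {
    assert_eq!(
        join_issuer("https://auth.openai.com/", "/deviceauth/callback"),
        "https://auth.openai.com/deviceauth/callback"
    );
    println!("{}", join_issuer("https://auth.openai.com", "/api/accounts"));
}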
View File

@@ -9,12 +9,10 @@ pub use server::ShutdownHandle;
pub use server::run_login_server;
// Re-export commonly used auth types and helpers from codex-core for compatibility
pub use codex_app_server_protocol::AuthMode;
pub use codex_core::AuthManager;
pub use codex_core::CodexAuth;
pub use codex_core::auth::AuthDotJson;
pub use codex_core::auth::CLIENT_ID;
pub use codex_core::auth::CODEX_API_KEY_ENV_VAR;
pub use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
pub use codex_core::auth::get_auth_file;
pub use codex_core::auth::login_with_api_key;
@@ -22,3 +20,4 @@ pub use codex_core::auth::logout;
pub use codex_core::auth::try_read_auth_json;
pub use codex_core::auth::write_auth_json;
pub use codex_core::token_data::TokenData;
pub use codex_protocol::mcp_protocol::AuthMode;

View File

@@ -25,7 +25,6 @@ use tiny_http::Header;
use tiny_http::Request;
use tiny_http::Response;
use tiny_http::Server;
use tiny_http::StatusCode;
const DEFAULT_ISSUER: &str = "https://auth.openai.com";
const DEFAULT_PORT: u16 = 1455;
@@ -149,15 +148,8 @@ pub fn run_login_server(opts: ServerOptions) -> io::Result<LoginServer> {
let _ = tokio::task::spawn_blocking(move || req.respond(response)).await;
None
}
HandledRequest::ResponseAndExit {
headers,
body,
result,
} => {
let _ = tokio::task::spawn_blocking(move || {
send_response_with_disconnect(req, headers, body)
})
.await;
HandledRequest::ResponseAndExit { response, result } => {
let _ = tokio::task::spawn_blocking(move || req.respond(response)).await;
Some(result)
}
HandledRequest::RedirectWithHeader(header) => {
@@ -193,8 +185,7 @@ enum HandledRequest {
Response(Response<Cursor<Vec<u8>>>),
RedirectWithHeader(Header),
ResponseAndExit {
headers: Vec<Header>,
body: Vec<u8>,
response: Response<Cursor<Vec<u8>>>,
result: io::Result<()>,
},
}
@@ -284,21 +275,20 @@ async fn process_request(
}
"/success" => {
let body = include_str!("assets/success.html");
let mut resp = Response::from_data(body.as_bytes());
if let Ok(h) = tiny_http::Header::from_bytes(
&b"Content-Type"[..],
&b"text/html; charset=utf-8"[..],
) {
resp.add_header(h);
}
HandledRequest::ResponseAndExit {
headers: match Header::from_bytes(
&b"Content-Type"[..],
&b"text/html; charset=utf-8"[..],
) {
Ok(header) => vec![header],
Err(_) => Vec::new(),
},
body: body.as_bytes().to_vec(),
response: resp,
result: Ok(()),
}
}
"/cancel" => HandledRequest::ResponseAndExit {
headers: Vec::new(),
body: b"Login cancelled".to_vec(),
response: Response::from_string("Login cancelled"),
result: Err(io::Error::new(
io::ErrorKind::Interrupted,
"Login cancelled",
@@ -308,50 +298,6 @@ async fn process_request(
}
}
/// tiny_http filters `Connection` headers out of `Response` objects, so using
/// `req.respond` never informs the client (or the library) that a keep-alive
/// socket should be closed. That leaves the per-connection worker parked in a
/// loop waiting for more requests, which in turn causes the next login attempt
/// to hang on the old connection. This helper bypasses tiny_http's response
/// machinery: it extracts the raw writer, prints the HTTP response manually,
/// and always appends `Connection: close`, ensuring the socket is closed from
/// the server side. Ideally, tiny_http would provide an API to control
/// server-side connection persistence, but it does not.
fn send_response_with_disconnect(
req: Request,
mut headers: Vec<Header>,
body: Vec<u8>,
) -> io::Result<()> {
let status = StatusCode(200);
let mut writer = req.into_writer();
let reason = status.default_reason_phrase();
write!(writer, "HTTP/1.1 {} {}\r\n", status.0, reason)?;
headers.retain(|h| !h.field.equiv("Connection"));
if let Ok(close_header) = Header::from_bytes(&b"Connection"[..], &b"close"[..]) {
headers.push(close_header);
}
let content_length_value = format!("{}", body.len());
if let Ok(content_length_header) =
Header::from_bytes(&b"Content-Length"[..], content_length_value.as_bytes())
{
headers.push(content_length_header);
}
for header in headers {
write!(
writer,
"{}: {}\r\n",
header.field.as_str(),
header.value.as_str()
)?;
}
writer.write_all(b"\r\n")?;
writer.write_all(&body)?;
writer.flush()
}
fn build_authorize_url(
issuer: &str,
client_id: &str,

View File

@@ -32,7 +32,7 @@ fn make_jwt(payload: serde_json::Value) -> String {
async fn mock_usercode_success(server: &MockServer) {
Mock::given(method("POST"))
.and(path("/api/accounts/deviceauth/usercode"))
.and(path("/deviceauth/usercode"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"device_auth_id": "device-auth-123",
"user_code": "CODE-12345",
@@ -45,7 +45,7 @@ async fn mock_usercode_success(server: &MockServer) {
async fn mock_usercode_failure(server: &MockServer, status: u16) {
Mock::given(method("POST"))
.and(path("/api/accounts/deviceauth/usercode"))
.and(path("/deviceauth/usercode"))
.respond_with(ResponseTemplate::new(status))
.mount(server)
.await;
@@ -58,7 +58,7 @@ async fn mock_poll_token_two_step(
) {
let c = counter.clone();
Mock::given(method("POST"))
.and(path("/api/accounts/deviceauth/token"))
.and(path("/deviceauth/token"))
.respond_with(move |_: &Request| {
let attempt = c.fetch_add(1, Ordering::SeqCst);
if attempt == 0 {
@@ -214,7 +214,7 @@ async fn device_code_login_integration_handles_error_payload() {
// /deviceauth/token → returns error payload with status 401
mock_poll_token_single(
&mock_server,
"/api/accounts/deviceauth/token",
"/deviceauth/token",
ResponseTemplate::new(401).set_body_json(json!({
"error": "authorization_declined",
"error_description": "Denied"

View File

@@ -22,7 +22,7 @@ use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::Submission;
use codex_core::protocol::TaskCompleteEvent;
use codex_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationId;
use mcp_types::CallToolResult;
use mcp_types::ContentBlock;
use mcp_types::RequestId;
@@ -280,7 +280,6 @@ async fn run_codex_tool_session_inner(
| EventMsg::ConversationPath(_)
| EventMsg::UserMessage(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_) => {
// For now, we do not do anything extra for these

View File

@@ -7,8 +7,7 @@ use crate::codex_tool_config::create_tool_for_codex_tool_call_param;
use crate::codex_tool_config::create_tool_for_codex_tool_call_reply_param;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::OutgoingMessageSender;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionSource;
use codex_protocol::mcp_protocol::ConversationId;
use codex_core::AuthManager;
use codex_core::ConversationManager;
@@ -53,9 +52,8 @@ impl MessageProcessor {
config: Arc<Config>,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(config.codex_home.clone(), false);
let conversation_manager =
Arc::new(ConversationManager::new(auth_manager, SessionSource::Mcp));
let auth_manager = AuthManager::shared(config.codex_home.clone());
let conversation_manager = Arc::new(ConversationManager::new(auth_manager));
Self {
outgoing,
initialized: false,

View File

@@ -234,8 +234,8 @@ mod tests {
use anyhow::Result;
use codex_core::protocol::EventMsg;
use codex_core::protocol::SessionConfiguredEvent;
use codex_protocol::ConversationId;
use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::mcp_protocol::ConversationId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::NamedTempFile;

View File

@@ -4,9 +4,9 @@ name = "codex-otel"
version = { workspace = true }
[lib]
doctest = false
name = "codex_otel"
path = "src/lib.rs"
doctest = false
[lints]
workspace = true
@@ -15,30 +15,25 @@ workspace = true
# Compile-time gate for OTLP support; disabled by default.
# Downstream crates can enable via `features = ["otel"]`.
default = []
otel = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-otlp", "tonic"]
otel = [
"opentelemetry",
"opentelemetry_sdk",
"opentelemetry-otlp",
"tonic",
]
[dependencies]
codex-protocol = { path = "../protocol" }
chrono = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-protocol = { workspace = true }
eventsource-stream = { workspace = true }
opentelemetry = { workspace = true, features = ["logs"], optional = true }
opentelemetry-otlp = { workspace = true, features = [
"grpc-tonic",
"http-proto",
"http-json",
"reqwest",
"reqwest-rustls",
], optional = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true, features = [
"logs",
"rt-tokio",
], optional = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum_macros = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, optional = true }
tracing = { workspace = true }
opentelemetry = { workspace = true, features = ["logs"], optional = true }
opentelemetry_sdk = { workspace = true, features = ["logs", "rt-tokio"], optional = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "http-proto", "http-json", "reqwest", "reqwest-rustls"], optional = true }
opentelemetry-semantic-conventions = { workspace = true }
tonic = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
reqwest = { workspace = true }
eventsource-stream = { workspace = true }
tokio = { workspace = true }
serde_json = { workspace = true }

View File

@@ -1,9 +1,9 @@
use chrono::SecondsFormat;
use chrono::Utc;
use codex_app_server_protocol::AuthMode;
use codex_protocol::ConversationId;
use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::InputItem;

View File

@@ -16,6 +16,7 @@ path = "src/main.rs"
[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-app-server-protocol = { workspace = true }
mcp-types = { workspace = true }
codex-protocol = { workspace = true }
ts-rs = { workspace = true }
clap = { workspace = true, features = ["derive"] }

Some files were not shown because too many files have changed in this diff.