Compare commits

...

6 Commits

Author SHA1 Message Date
Shaqayeq
ffe4006782 2026-03-09 Flatten enum-like schema oneOfs for python codegen 2026-03-09 13:35:44 -07:00
Shaqayeq
eb98b16533 2026-03-09 Simplify experimental python SDK wire models 2026-03-09 12:42:16 -07:00
Shaqayeq
17818d6243 Merge branch 'main' into shaqayeq/python-sdk-types-foundation 2026-03-08 21:10:48 -07:00
Shaqayeq
0ef657d208 2026-03-08 Regenerate python SDK artifacts for current schema 2026-03-08 19:56:29 -07:00
Shaqayeq
4e4f81da5f 2026-03-08 Remove legacy python SDK notification aliases 2026-03-08 19:55:30 -07:00
Shaqayeq
35d03a0508 2026-03-07 Stabilize experimental python SDK foundation
Description:
- pin datamodel-code-generator to an exact version and invoke it through the active interpreter
- make generate_types own the maintained generated surfaces and regenerate committed artifacts
- make sdk/python tests hermetic and regeneration checks idempotent
2026-03-08 00:47:14 -08:00
26 changed files with 10418 additions and 0 deletions

79
sdk/python/README.md Normal file
View File

@@ -0,0 +1,79 @@
# Codex App Server Python SDK (Experimental)
Experimental Python SDK for `codex app-server` JSON-RPC v2 over stdio, with a small default surface optimized for real scripts and apps.
The generated wire-model layer is currently sourced from the bundled v2 schema and exposed as Pydantic models with snake_case Python fields that serialize back to the app-servers camelCase wire format.
## Install
```bash
cd sdk/python
python -m pip install -e .
```
## Quickstart
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5")
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print(result.text)
```
## Docs map
- Golden path tutorial: `docs/getting-started.md`
- API reference (signatures + behavior): `docs/api-reference.md`
- Common decisions and pitfalls: `docs/faq.md`
- Runnable examples index: `examples/README.md`
- Jupyter walkthrough notebook: `notebooks/sdk_walkthrough.ipynb`
## Examples
Start here:
```bash
cd sdk/python
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```
## Bundled runtime binaries (out of the box)
The SDK ships with platform-specific bundled binaries, so end users do not need updater scripts.
Runtime binary source (single source, no fallback):
- `src/codex_app_server/bin/darwin-arm64/codex`
- `src/codex_app_server/bin/darwin-x64/codex`
- `src/codex_app_server/bin/linux-arm64/codex`
- `src/codex_app_server/bin/linux-x64/codex`
- `src/codex_app_server/bin/windows-arm64/codex.exe`
- `src/codex_app_server/bin/windows-x64/codex.exe`
## Maintainer workflow (refresh binaries/types)
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
# or
python scripts/update_sdk_artifacts.py --channel alpha --bundle-all-platforms
```
This refreshes all bundled OS/arch binaries and regenerates protocol-derived Python types.
## Compatibility and versioning
- Package: `codex-app-server-sdk`
- Current SDK version in this repo: `0.2.0`
- Python: `>=3.10`
- Target protocol: Codex `app-server` JSON-RPC v2
- Recommendation: keep SDK and `codex` CLI reasonably up to date together
## Notes
- `Codex()` is eager and performs startup + `initialize` in the constructor.
- Use context managers (`with Codex() as codex:`) to ensure shutdown.
- For transient overload, use `codex_app_server.retry.retry_on_overload`.

65
sdk/python/docs/faq.md Normal file
View File

@@ -0,0 +1,65 @@
# FAQ
## Thread vs turn
- A `Thread` is conversation state.
- A `Turn` is one model execution inside that thread.
- Multi-turn chat means multiple turns on the same `Thread`.
## `run()` vs `stream()`
- `Turn.run()` is the easiest path. It consumes events until completion and returns `TurnResult`.
- `Turn.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
Choose `run()` for most apps. Choose `stream()` for progress UIs, custom timeout logic, or custom parsing.
## Sync vs async clients
- `Codex` is the minimal sync SDK and best default.
- `AsyncAppServerClient` wraps the sync transport with `asyncio.to_thread(...)` for async-friendly call sites.
If your app is not already async, stay with `Codex`.
## `thread(...)` vs `thread_resume(...)`
- `codex.thread(thread_id)` only binds a local helper to an existing thread ID.
- `codex.thread_resume(thread_id, ...)` performs a `thread/resume` RPC and can apply overrides (model, instructions, sandbox, etc.).
Use `thread(...)` for simple continuation. Use `thread_resume(...)` when you need explicit resume semantics or override fields.
## Why does constructor fail?
`Codex()` is eager: it starts transport and calls `initialize` in `__init__`.
Common causes:
- bundled runtime binary missing for your OS/arch under `src/codex_app_server/bin/*`
- local auth/session is missing
- incompatible/old app-server
Maintainers can refresh bundled binaries with:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
```
## Why does a turn "hang"?
A turn is complete only when `turn/completed` arrives for that turn ID.
- `run()` waits for this automatically.
- With `stream()`, make sure you keep consuming notifications until completion.
## How do I retry safely?
Use `retry_on_overload(...)` for transient overload failures (`ServerBusyError`).
Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundError`, fix inputs/version compatibility instead.
## Common pitfalls
- Starting a new thread for every prompt when you wanted continuity.
- Forgetting to `close()` (or not using `with Codex() as codex:`).
- Ignoring `TurnResult.status` and `TurnResult.error`.
- Mixing SDK input classes with raw dicts incorrectly in minimal API paths.

View File

@@ -0,0 +1,75 @@
# Getting Started
This is the fastest path from install to a multi-turn thread using the minimal SDK surface.
## 1) Install
From repo root:
```bash
cd sdk/python
python -m pip install -e .
```
Requirements:
- Python `>=3.10`
- bundled runtime binary for your platform (shipped in package)
- Local Codex auth/session configured
## 2) Run your first turn
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5")
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Thread:", result.thread_id)
print("Turn:", result.turn_id)
print("Status:", result.status)
print("Text:", result.text)
```
What happened:
- `Codex()` started and initialized `codex app-server`.
- `thread_start(...)` created a thread.
- `turn(...).run()` consumed events until `turn/completed` and returned a `TurnResult`.
## 3) Continue the same thread (multi-turn)
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5")
first = thread.turn(TextInput("Summarize Rust ownership in 2 bullets.")).run()
second = thread.turn(TextInput("Now explain it to a Python developer.")).run()
print("first:", first.text)
print("second:", second.text)
```
## 4) Resume an existing thread
```python
from codex_app_server import Codex, TextInput
THREAD_ID = "thr_123" # replace with a real id
with Codex() as codex:
thread = codex.thread(THREAD_ID)
result = thread.turn(TextInput("Continue where we left off.")).run()
print(result.text)
```
## 5) Next stops
- API surface and signatures: `docs/api-reference.md`
- Common decisions/pitfalls: `docs/faq.md`
- End-to-end runnable examples: `examples/README.md`

63
sdk/python/pyproject.toml Normal file
View File

@@ -0,0 +1,63 @@
[build-system]
requires = ["hatchling>=1.24.0"]
build-backend = "hatchling.build"
[project]
name = "codex-app-server-sdk"
version = "0.2.0"
description = "Python SDK for Codex app-server v2"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }
authors = [{ name = "OpenClaw Assistant" }]
keywords = ["codex", "json-rpc", "sdk", "llm", "app-server"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = ["pydantic>=2.12"]
[project.urls]
Homepage = "https://github.com/openai/codex"
Repository = "https://github.com/openai/codex"
Issues = "https://github.com/openai/codex/issues"
[project.optional-dependencies]
dev = ["pytest>=8.0", "datamodel-code-generator==0.31.2"]
[tool.hatch.build]
exclude = [
".venv/**",
".venv2/**",
".pytest_cache/**",
"dist/**",
"build/**",
]
[tool.hatch.build.targets.wheel]
packages = ["src/codex_app_server"]
include = [
"src/codex_app_server/bin/**",
"src/codex_app_server/py.typed",
]
[tool.hatch.build.targets.sdist]
include = [
"src/codex_app_server/**",
"README.md",
"CHANGELOG.md",
"CONTRIBUTING.md",
"RELEASE_CHECKLIST.md",
"pyproject.toml",
]
[tool.pytest.ini_options]
addopts = "-q"
testpaths = ["tests"]

View File

@@ -0,0 +1,796 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import importlib
import json
import platform
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import types
import typing
import urllib.request
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, get_args, get_origin
def repo_root() -> Path:
return Path(__file__).resolve().parents[3]
def sdk_root() -> Path:
return repo_root() / "sdk" / "python"
def schema_bundle_path() -> Path:
return (
repo_root()
/ "codex-rs"
/ "app-server-protocol"
/ "schema"
/ "json"
/ "codex_app_server_protocol.v2.schemas.json"
)
def schema_root_dir() -> Path:
return repo_root() / "codex-rs" / "app-server-protocol" / "schema" / "json"
def _is_windows() -> bool:
return platform.system().lower().startswith("win")
def pinned_bin_path() -> Path:
name = "codex.exe" if _is_windows() else "codex"
return sdk_root() / "bin" / name
def bundled_platform_bin_path(platform_key: str) -> Path:
exe = "codex.exe" if platform_key.startswith("windows") else "codex"
return sdk_root() / "src" / "codex_app_server" / "bin" / platform_key / exe
PLATFORMS: dict[str, tuple[list[str], list[str]]] = {
"darwin-arm64": (["darwin", "apple-darwin", "macos"], ["aarch64", "arm64"]),
"darwin-x64": (["darwin", "apple-darwin", "macos"], ["x86_64", "amd64", "x64"]),
"linux-arm64": (["linux", "unknown-linux", "musl", "gnu"], ["aarch64", "arm64"]),
"linux-x64": (["linux", "unknown-linux", "musl", "gnu"], ["x86_64", "amd64", "x64"]),
"windows-arm64": (["windows", "pc-windows", "win", "msvc", "gnu"], ["aarch64", "arm64"]),
"windows-x64": (["windows", "pc-windows", "win", "msvc", "gnu"], ["x86_64", "amd64", "x64"]),
}
def run(cmd: list[str], cwd: Path) -> None:
subprocess.run(cmd, cwd=str(cwd), check=True)
def run_python_module(module: str, args: list[str], cwd: Path) -> None:
run([sys.executable, "-m", module, *args], cwd)
def platform_tokens() -> tuple[list[str], list[str]]:
sys_name = platform.system().lower()
machine = platform.machine().lower()
if sys_name == "darwin":
os_tokens = ["darwin", "apple-darwin", "macos"]
elif sys_name == "linux":
os_tokens = ["linux", "unknown-linux", "musl", "gnu"]
elif sys_name.startswith("win"):
os_tokens = ["windows", "pc-windows", "win", "msvc", "gnu"]
else:
raise RuntimeError(f"Unsupported OS: {sys_name}")
if machine in {"arm64", "aarch64"}:
arch_tokens = ["aarch64", "arm64"]
elif machine in {"x86_64", "amd64"}:
arch_tokens = ["x86_64", "amd64", "x64"]
else:
raise RuntimeError(f"Unsupported architecture: {machine}")
return os_tokens, arch_tokens
def pick_release(channel: str) -> dict[str, Any]:
releases = json.loads(
subprocess.check_output(["gh", "api", "repos/openai/codex/releases?per_page=50"], text=True)
)
if channel == "stable":
candidates = [r for r in releases if not r.get("prerelease") and not r.get("draft")]
else:
candidates = [r for r in releases if r.get("prerelease") and not r.get("draft")]
if not candidates:
raise RuntimeError(f"No {channel} release found")
return candidates[0]
def pick_asset(release: dict[str, Any], os_tokens: list[str], arch_tokens: list[str]) -> dict[str, Any]:
scored: list[tuple[int, dict[str, Any]]] = []
for asset in release.get("assets", []):
name = (asset.get("name") or "").lower()
# Accept only primary codex cli artifacts.
if not (name.startswith("codex-") or name == "codex"):
continue
if name.startswith("codex-responses") or name.startswith("codex-command-runner") or name.startswith("codex-windows-sandbox") or name.startswith("codex-npm"):
continue
if not (name.endswith(".tar.gz") or name.endswith(".zip")):
continue
os_score = sum(1 for t in os_tokens if t in name)
arch_score = sum(1 for t in arch_tokens if t in name)
if os_score == 0 or arch_score == 0:
continue
score = os_score * 10 + arch_score
scored.append((score, asset))
if not scored:
raise RuntimeError("Could not find matching codex CLI asset for this platform")
scored.sort(key=lambda x: x[0], reverse=True)
return scored[0][1]
def download(url: str, out: Path) -> None:
req = urllib.request.Request(url, headers={"User-Agent": "codex-python-sdk-updater"})
with urllib.request.urlopen(req) as resp, out.open("wb") as f:
shutil.copyfileobj(resp, f)
def extract_codex_binary(archive: Path, out_bin: Path) -> None:
with tempfile.TemporaryDirectory() as td:
tmp = Path(td)
if archive.name.endswith(".tar.gz"):
with tarfile.open(archive, "r:gz") as tar:
tar.extractall(tmp)
elif archive.name.endswith(".zip"):
with zipfile.ZipFile(archive) as zf:
zf.extractall(tmp)
else:
raise RuntimeError(f"Unsupported archive format: {archive}")
preferred_names = {"codex.exe", "codex"}
candidates = [
p for p in tmp.rglob("*") if p.is_file() and (p.name.lower() in preferred_names or p.name.lower().startswith("codex-"))
]
if not candidates:
raise RuntimeError("No codex binary found in release archive")
candidates.sort(key=lambda p: (p.name.lower() not in preferred_names, p.name.lower()))
out_bin.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(candidates[0], out_bin)
if not _is_windows():
out_bin.chmod(out_bin.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
def _download_asset_to_binary(release: dict[str, Any], os_tokens: list[str], arch_tokens: list[str], out_bin: Path) -> None:
asset = pick_asset(release, os_tokens, arch_tokens)
print(f"Asset: {asset.get('name')} -> {out_bin}")
with tempfile.TemporaryDirectory() as td:
archive = Path(td) / (asset.get("name") or "codex-release.tar.gz")
download(asset["browser_download_url"], archive)
extract_codex_binary(archive, out_bin)
def update_binary(channel: str) -> None:
if shutil.which("gh") is None:
raise RuntimeError("GitHub CLI (`gh`) is required to download release binaries")
release = pick_release(channel)
os_tokens, arch_tokens = platform_tokens()
print(f"Release: {release.get('tag_name')} ({channel})")
# refresh current platform in bundled runtime location
current_key = next((k for k, v in PLATFORMS.items() if v == (os_tokens, arch_tokens)), None)
out = bundled_platform_bin_path(current_key) if current_key else pinned_bin_path()
_download_asset_to_binary(release, os_tokens, arch_tokens, out)
print(f"Pinned binary updated: {out}")
def bundle_all_platform_binaries(channel: str) -> None:
if shutil.which("gh") is None:
raise RuntimeError("GitHub CLI (`gh`) is required to download release binaries")
release = pick_release(channel)
print(f"Release: {release.get('tag_name')} ({channel})")
for platform_key, (os_tokens, arch_tokens) in PLATFORMS.items():
_download_asset_to_binary(release, os_tokens, arch_tokens, bundled_platform_bin_path(platform_key))
print("Bundled all platform binaries.")
def _flatten_string_enum_one_of(definition: dict[str, Any]) -> bool:
branches = definition.get("oneOf")
if not isinstance(branches, list) or not branches:
return False
enum_values: list[str] = []
for branch in branches:
if not isinstance(branch, dict):
return False
if branch.get("type") != "string":
return False
enum = branch.get("enum")
if not isinstance(enum, list) or len(enum) != 1 or not isinstance(enum[0], str):
return False
extra_keys = set(branch) - {"type", "enum", "description", "title"}
if extra_keys:
return False
enum_values.append(enum[0])
description = definition.get("description")
title = definition.get("title")
definition.clear()
definition["type"] = "string"
definition["enum"] = enum_values
if isinstance(description, str):
definition["description"] = description
if isinstance(title, str):
definition["title"] = title
return True
def _normalized_schema_bundle_text() -> str:
schema = json.loads(schema_bundle_path().read_text())
definitions = schema.get("definitions", {})
if isinstance(definitions, dict):
for definition in definitions.values():
if isinstance(definition, dict):
_flatten_string_enum_one_of(definition)
return json.dumps(schema, indent=2, sort_keys=True) + "\n"
def generate_v2_all() -> None:
out_path = sdk_root() / "src" / "codex_app_server" / "generated" / "v2_all.py"
out_dir = out_path.parent
old_package_dir = out_dir / "v2_all"
if old_package_dir.exists():
shutil.rmtree(old_package_dir)
out_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory() as td:
normalized_bundle = Path(td) / schema_bundle_path().name
normalized_bundle.write_text(_normalized_schema_bundle_text())
run_python_module(
"datamodel_code_generator",
[
"--input",
str(normalized_bundle),
"--input-file-type",
"jsonschema",
"--output",
str(out_path),
"--output-model-type",
"pydantic_v2.BaseModel",
"--target-python-version",
"3.10",
"--snake-case-field",
"--allow-population-by-field-name",
"--use-union-operator",
"--reuse-model",
"--disable-timestamp",
"--use-double-quotes",
],
cwd=sdk_root(),
)
_normalize_generated_timestamps(out_path)
def _notification_specs() -> list[tuple[str, str]]:
server_notifications = json.loads((schema_root_dir() / "ServerNotification.json").read_text())
one_of = server_notifications.get("oneOf", [])
generated_source = (
sdk_root() / "src" / "codex_app_server" / "generated" / "v2_all.py"
).read_text()
specs: list[tuple[str, str]] = []
for variant in one_of:
props = variant.get("properties", {})
method_meta = props.get("method", {})
params_meta = props.get("params", {})
methods = method_meta.get("enum", [])
if len(methods) != 1:
continue
method = methods[0]
if not isinstance(method, str):
continue
ref = params_meta.get("$ref")
if not isinstance(ref, str) or not ref.startswith("#/definitions/"):
continue
class_name = ref.split("/")[-1]
if f"class {class_name}(" not in generated_source and f"{class_name} =" not in generated_source:
# Skip schema variants that are not emitted into the generated v2 surface.
continue
specs.append((method, class_name))
specs.sort()
return specs
def generate_notification_registry() -> None:
out = sdk_root() / "src" / "codex_app_server" / "generated" / "notification_registry.py"
specs = _notification_specs()
class_names = sorted({class_name for _, class_name in specs})
lines = [
"# Auto-generated by scripts/update_sdk_artifacts.py",
"# DO NOT EDIT MANUALLY.",
"",
"from __future__ import annotations",
"",
"from pydantic import BaseModel",
"",
]
for class_name in class_names:
lines.append(f"from .v2_all import {class_name}")
lines.extend(
[
"",
"NOTIFICATION_MODELS: dict[str, type[BaseModel]] = {",
]
)
for method, class_name in specs:
lines.append(f' "{method}": {class_name},')
lines.extend(["}", ""])
out.write_text("\n".join(lines))
def _event_msg_types() -> list[str]:
schema = json.loads((schema_root_dir() / "codex_app_server_protocol.schemas.json").read_text())
definitions = schema.get("definitions", {})
event_msg = definitions.get("EventMsg", {})
one_of = event_msg.get("oneOf", [])
types: set[str] = set()
for variant in one_of:
props = variant.get("properties", {})
type_meta = props.get("type", {})
enum_values = type_meta.get("enum", [])
if len(enum_values) != 1:
continue
value = enum_values[0]
if isinstance(value, str):
types.add(value)
return sorted(types)
def generate_codex_event_types() -> None:
out = sdk_root() / "src" / "codex_app_server" / "generated" / "codex_event_types.py"
event_types = _event_msg_types()
literal_values = ", ".join(repr(event_type) for event_type in event_types)
event_type_alias = f"Literal[{literal_values}]" if literal_values else "str"
lines = [
"# Auto-generated by scripts/update_sdk_artifacts.py",
"# DO NOT EDIT MANUALLY.",
"",
"from __future__ import annotations",
"",
"from typing import Any, Literal",
"",
"from pydantic import BaseModel, ConfigDict",
"",
f"CodexEventType = {event_type_alias}",
"",
"",
"class CodexEventMessage(BaseModel):",
" model_config = ConfigDict(extra=\"allow\")",
" type: CodexEventType | str",
"",
"",
"class CodexEventNotification(BaseModel):",
" id: str | None = None",
" conversationId: str | None = None",
" msg: CodexEventMessage | dict[str, Any]",
"",
]
out.write_text("\n".join(lines))
def _normalize_generated_timestamps(root: Path) -> None:
timestamp_re = re.compile(r"^#\s+timestamp:\s+.+$", flags=re.MULTILINE)
py_files = [root] if root.is_file() else sorted(root.rglob("*.py"))
for py_file in py_files:
content = py_file.read_text()
normalized = timestamp_re.sub("# timestamp: <normalized>", content)
if normalized != content:
py_file.write_text(normalized)
FIELD_ANNOTATION_OVERRIDES: dict[str, str] = {
# Keep public API typed without falling back to `Any`.
"config": "JsonObject",
"output_schema": "JsonObject",
}
@dataclass(slots=True)
class PublicFieldSpec:
wire_name: str
py_name: str
annotation: str
required: bool
def _annotation_to_source(annotation: Any) -> str:
origin = get_origin(annotation)
if origin is typing.Annotated:
return _annotation_to_source(get_args(annotation)[0])
if origin in (typing.Union, types.UnionType):
parts: list[str] = []
for arg in get_args(annotation):
rendered = _annotation_to_source(arg)
if rendered not in parts:
parts.append(rendered)
return " | ".join(parts)
if origin is list:
args = get_args(annotation)
item = _annotation_to_source(args[0]) if args else "Any"
return f"list[{item}]"
if origin is dict:
args = get_args(annotation)
key = _annotation_to_source(args[0]) if args else "str"
val = _annotation_to_source(args[1]) if len(args) > 1 else "Any"
return f"dict[{key}, {val}]"
if annotation is Any or annotation is typing.Any:
return "Any"
if annotation is None or annotation is type(None):
return "None"
if isinstance(annotation, type):
if annotation.__module__ == "builtins":
return annotation.__name__
return annotation.__name__
return repr(annotation)
def _camel_to_snake(name: str) -> str:
head = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", head).lower()
def _load_public_fields(module_name: str, class_name: str, *, exclude: set[str] | None = None) -> list[PublicFieldSpec]:
exclude = exclude or set()
module = importlib.import_module(module_name)
model = getattr(module, class_name)
fields: list[PublicFieldSpec] = []
for name, field in model.model_fields.items():
if name in exclude:
continue
required = field.is_required()
annotation = _annotation_to_source(field.annotation)
override = FIELD_ANNOTATION_OVERRIDES.get(name)
if override is not None:
annotation = override if required else f"{override} | None"
fields.append(
PublicFieldSpec(
wire_name=name,
py_name=name,
annotation=annotation,
required=required,
)
)
return fields
def _kw_signature_lines(fields: list[PublicFieldSpec]) -> list[str]:
lines: list[str] = []
for field in fields:
default = "" if field.required else " = None"
lines.append(f" {field.py_name}: {field.annotation}{default},")
return lines
def _model_arg_lines(fields: list[PublicFieldSpec], *, indent: str = " ") -> list[str]:
return [f"{indent}{field.wire_name}={field.py_name}," for field in fields]
def _replace_generated_block(source: str, block_name: str, body: str) -> str:
start_tag = f" # BEGIN GENERATED: {block_name}"
end_tag = f" # END GENERATED: {block_name}"
pattern = re.compile(
rf"(?s){re.escape(start_tag)}\n.*?\n{re.escape(end_tag)}"
)
replacement = f"{start_tag}\n{body.rstrip()}\n{end_tag}"
updated, count = pattern.subn(replacement, source, count=1)
if count != 1:
raise RuntimeError(f"Could not update generated block: {block_name}")
return updated
def _render_codex_block(
thread_start_fields: list[PublicFieldSpec],
thread_list_fields: list[PublicFieldSpec],
resume_fields: list[PublicFieldSpec],
fork_fields: list[PublicFieldSpec],
) -> str:
lines = [
" def thread_start(",
" self,",
" *,",
*_kw_signature_lines(thread_start_fields),
" ) -> Thread:",
" params = ThreadStartParams(",
*_model_arg_lines(thread_start_fields),
" )",
" started = self._client.thread_start(params)",
" return Thread(self._client, started.thread.id)",
"",
" def thread_list(",
" self,",
" *,",
*_kw_signature_lines(thread_list_fields),
" ) -> ThreadListResponse:",
" params = ThreadListParams(",
*_model_arg_lines(thread_list_fields),
" )",
" return self._client.thread_list(params)",
"",
" def thread_resume(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(resume_fields),
" ) -> Thread:",
" params = ThreadResumeParams(",
" thread_id=thread_id,",
*_model_arg_lines(resume_fields),
" )",
" resumed = self._client.thread_resume(thread_id, params)",
" return Thread(self._client, resumed.thread.id)",
"",
" def thread_fork(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(fork_fields),
" ) -> Thread:",
" params = ThreadForkParams(",
" thread_id=thread_id,",
*_model_arg_lines(fork_fields),
" )",
" forked = self._client.thread_fork(thread_id, params)",
" return Thread(self._client, forked.thread.id)",
"",
" def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:",
" return self._client.thread_archive(thread_id)",
"",
" def thread_unarchive(self, thread_id: str) -> Thread:",
" unarchived = self._client.thread_unarchive(thread_id)",
" return Thread(self._client, unarchived.thread.id)",
]
return "\n".join(lines)
def _render_async_codex_block(
thread_start_fields: list[PublicFieldSpec],
thread_list_fields: list[PublicFieldSpec],
resume_fields: list[PublicFieldSpec],
fork_fields: list[PublicFieldSpec],
) -> str:
lines = [
" async def thread_start(",
" self,",
" *,",
*_kw_signature_lines(thread_start_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadStartParams(",
*_model_arg_lines(thread_start_fields),
" )",
" started = await self._client.thread_start(params)",
" return AsyncThread(self, started.thread.id)",
"",
" async def thread_list(",
" self,",
" *,",
*_kw_signature_lines(thread_list_fields),
" ) -> ThreadListResponse:",
" await self._ensure_initialized()",
" params = ThreadListParams(",
*_model_arg_lines(thread_list_fields),
" )",
" return await self._client.thread_list(params)",
"",
" async def thread_resume(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(resume_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadResumeParams(",
" thread_id=thread_id,",
*_model_arg_lines(resume_fields),
" )",
" resumed = await self._client.thread_resume(thread_id, params)",
" return AsyncThread(self, resumed.thread.id)",
"",
" async def thread_fork(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(fork_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadForkParams(",
" thread_id=thread_id,",
*_model_arg_lines(fork_fields),
" )",
" forked = await self._client.thread_fork(thread_id, params)",
" return AsyncThread(self, forked.thread.id)",
"",
" async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:",
" await self._ensure_initialized()",
" return await self._client.thread_archive(thread_id)",
"",
" async def thread_unarchive(self, thread_id: str) -> AsyncThread:",
" await self._ensure_initialized()",
" unarchived = await self._client.thread_unarchive(thread_id)",
" return AsyncThread(self, unarchived.thread.id)",
]
return "\n".join(lines)
def _render_thread_block(
turn_fields: list[PublicFieldSpec],
) -> str:
lines = [
" def turn(",
" self,",
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> Turn:",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
" thread_id=self.id,",
" input=wire_input,",
*_model_arg_lines(turn_fields),
" )",
" turn = self._client.turn_start(self.id, wire_input, params=params)",
" return Turn(self._client, self.id, turn.turn.id)",
]
return "\n".join(lines)
def _render_async_thread_block(
turn_fields: list[PublicFieldSpec],
) -> str:
lines = [
" async def turn(",
" self,",
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> AsyncTurn:",
" await self._codex._ensure_initialized()",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
" thread_id=self.id,",
" input=wire_input,",
*_model_arg_lines(turn_fields),
" )",
" turn = await self._codex._client.turn_start(",
" self.id,",
" wire_input,",
" params=params,",
" )",
" return AsyncTurn(self._codex, self.id, turn.turn.id)",
]
return "\n".join(lines)
def generate_public_api_flat_methods() -> None:
src_dir = sdk_root() / "src"
public_api_path = src_dir / "codex_app_server" / "public_api.py"
if not public_api_path.exists():
# PR2 can run codegen before the ergonomic public API layer is added.
return
src_dir_str = str(src_dir)
if src_dir_str not in sys.path:
sys.path.insert(0, src_dir_str)
thread_start_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadStartParams",
)
thread_list_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadListParams",
)
thread_resume_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadResumeParams",
exclude={"thread_id"},
)
thread_fork_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadForkParams",
exclude={"thread_id"},
)
turn_start_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"TurnStartParams",
exclude={"thread_id", "input"},
)
source = public_api_path.read_text()
source = _replace_generated_block(
source,
"Codex.flat_methods",
_render_codex_block(
thread_start_fields,
thread_list_fields,
thread_resume_fields,
thread_fork_fields,
),
)
source = _replace_generated_block(
source,
"AsyncCodex.flat_methods",
_render_async_codex_block(
thread_start_fields,
thread_list_fields,
thread_resume_fields,
thread_fork_fields,
),
)
source = _replace_generated_block(
source,
"Thread.flat_methods",
_render_thread_block(turn_start_fields),
)
source = _replace_generated_block(
source,
"AsyncThread.flat_methods",
_render_async_thread_block(turn_start_fields),
)
public_api_path.write_text(source)
def generate_types() -> None:
# v2_all is the authoritative generated surface.
generate_v2_all()
generate_notification_registry()
generate_codex_event_types()
generate_public_api_flat_methods()
def main() -> None:
parser = argparse.ArgumentParser(description="Single SDK maintenance entrypoint")
parser.add_argument("--channel", choices=["stable", "alpha"], default="stable")
parser.add_argument("--types-only", action="store_true", help="Regenerate types only (skip binary update)")
parser.add_argument(
"--bundle-all-platforms",
action="store_true",
help="Download and bundle codex binaries for all supported OS/arch targets",
)
args = parser.parse_args()
if not args.types_only:
if args.bundle_all_platforms:
bundle_all_platform_binaries(args.channel)
else:
update_binary(args.channel)
generate_types()
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,13 @@
from .client import AppServerClient, AppServerConfig
from .errors import AppServerError, JsonRpcError, TransportClosedError
from .generated.codex_event_types import CodexEventNotification, CodexEventType
__all__ = [
"AppServerClient",
"AppServerConfig",
"AppServerError",
"JsonRpcError",
"TransportClosedError",
"CodexEventNotification",
"CodexEventType",
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,605 @@
from __future__ import annotations
import json
import os
import subprocess
import threading
import uuid
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Iterator, TypeVar
from pydantic import BaseModel
from .errors import AppServerError, TransportClosedError, map_jsonrpc_error
from .generated.codex_event_types import CodexEventNotification
from .generated.notification_registry import NOTIFICATION_MODELS
from .generated.v2_all import (
AgentMessageDeltaNotification,
ModelListResponse,
ThreadArchiveResponse,
ThreadCompactStartResponse,
ThreadForkParams as V2ThreadForkParams,
ThreadForkResponse,
ThreadListParams as V2ThreadListParams,
ThreadListResponse,
ThreadReadResponse,
ThreadResumeParams as V2ThreadResumeParams,
ThreadResumeResponse,
ThreadSetNameResponse,
ThreadStartParams as V2ThreadStartParams,
ThreadStartResponse,
ThreadUnarchiveResponse,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams as V2TurnStartParams,
TurnStartResponse,
TurnSteerResponse,
)
from .models import (
InitializeResponse,
JsonObject,
JsonValue,
Notification,
TextTurnResult,
UnknownNotification,
)
from .retry import retry_on_overload
ModelT = TypeVar("ModelT", bound=BaseModel)
ApprovalHandler = Callable[[str, JsonObject | None], JsonObject]
def _params_dict(
params: (
V2ThreadStartParams
| V2ThreadResumeParams
| V2ThreadListParams
| V2ThreadForkParams
| V2TurnStartParams
| JsonObject
| None
),
) -> JsonObject:
if params is None:
return {}
if hasattr(params, "model_dump"):
dumped = params.model_dump(
by_alias=True,
exclude_none=True,
mode="json",
)
if not isinstance(dumped, dict):
raise TypeError("Expected model_dump() to return dict")
return dumped
if isinstance(params, dict):
return params
raise TypeError(f"Expected generated params model or dict, got {type(params).__name__}")
def _bundled_codex_path() -> Path:
import platform
sys_name = platform.system().lower()
machine = platform.machine().lower()
if sys_name.startswith("darwin"):
platform_dir = "darwin-arm64" if machine in {"arm64", "aarch64"} else "darwin-x64"
exe = "codex"
elif sys_name.startswith("linux"):
platform_dir = "linux-arm64" if machine in {"arm64", "aarch64"} else "linux-x64"
exe = "codex"
elif sys_name.startswith("windows") or os.name == "nt":
platform_dir = "windows-arm64" if machine in {"arm64", "aarch64"} else "windows-x64"
exe = "codex.exe"
else:
raise RuntimeError(f"Unsupported OS for bundled codex binary: {sys_name}/{machine}")
return Path(__file__).resolve().parent / "bin" / platform_dir / exe
@dataclass(slots=True)
class AppServerConfig:
codex_bin: str = str(_bundled_codex_path())
launch_args_override: tuple[str, ...] | None = None
config_overrides: tuple[str, ...] = ()
cwd: str | None = None
env: dict[str, str] | None = None
client_name: str = "codex_python_sdk"
client_title: str = "Codex Python SDK"
client_version: str = "0.2.0"
experimental_api: bool = True
class AppServerClient:
"""Synchronous typed JSON-RPC client for `codex app-server` over stdio."""
def __init__(
self,
config: AppServerConfig | None = None,
approval_handler: ApprovalHandler | None = None,
) -> None:
self.config = config or AppServerConfig()
self._approval_handler = approval_handler or self._default_approval_handler
self._proc: subprocess.Popen[str] | None = None
self._lock = threading.Lock()
self._turn_consumer_lock = threading.Lock()
self._active_turn_consumer: str | None = None
self._pending_notifications: deque[Notification] = deque()
self._stderr_lines: deque[str] = deque(maxlen=400)
self._stderr_thread: threading.Thread | None = None
def __enter__(self) -> "AppServerClient":
self.start()
return self
def __exit__(self, _exc_type, _exc, _tb) -> None:
self.close()
def start(self) -> None:
if self._proc is not None:
return
if self.config.launch_args_override is not None:
args = list(self.config.launch_args_override)
else:
codex_bin = Path(self.config.codex_bin)
if not codex_bin.exists():
raise FileNotFoundError(
f"Pinned codex binary not found at {codex_bin}. Run `python scripts/update_sdk_artifacts.py --channel stable` from sdk/python."
)
args = [str(codex_bin)]
for kv in self.config.config_overrides:
args.extend(["--config", kv])
args.extend(["app-server", "--listen", "stdio://"])
env = os.environ.copy()
if self.config.env:
env.update(self.config.env)
self._proc = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=self.config.cwd,
env=env,
bufsize=1,
)
self._start_stderr_drain_thread()
def close(self) -> None:
if self._proc is None:
return
proc = self._proc
self._proc = None
self._active_turn_consumer = None
if proc.stdin:
proc.stdin.close()
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
proc.kill()
if self._stderr_thread and self._stderr_thread.is_alive():
self._stderr_thread.join(timeout=0.5)
def initialize(self) -> InitializeResponse:
result = self.request(
"initialize",
{
"clientInfo": {
"name": self.config.client_name,
"title": self.config.client_title,
"version": self.config.client_version,
},
"capabilities": {
"experimentalApi": self.config.experimental_api,
},
},
response_model=InitializeResponse,
)
self.notify("initialized", None)
return result
def request(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
) -> ModelT:
result = self._request_raw(method, params)
if not isinstance(result, dict):
raise AppServerError(f"{method} response must be a JSON object")
return response_model.model_validate(result)
def _request_raw(self, method: str, params: JsonObject | None = None) -> JsonValue:
request_id = str(uuid.uuid4())
self._write_message({"id": request_id, "method": method, "params": params or {}})
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
self._pending_notifications.append(
self._coerce_notification(msg["method"], msg.get("params"))
)
continue
if msg.get("id") != request_id:
continue
if "error" in msg:
err = msg["error"]
if isinstance(err, dict):
raise map_jsonrpc_error(
int(err.get("code", -32000)),
str(err.get("message", "unknown")),
err.get("data"),
)
raise AppServerError("Malformed JSON-RPC error response")
return msg.get("result")
def notify(self, method: str, params: JsonObject | None = None) -> None:
self._write_message({"method": method, "params": params or {}})
def next_notification(self) -> Notification:
if self._pending_notifications:
return self._pending_notifications.popleft()
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
return self._coerce_notification(msg["method"], msg.get("params"))
def acquire_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer is not None:
raise RuntimeError(
"Concurrent turn consumers are not yet supported in the experimental SDK. "
f"Client is already streaming turn {self._active_turn_consumer!r}; "
f"cannot start turn {turn_id!r} until the active consumer finishes."
)
self._active_turn_consumer = turn_id
def release_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer == turn_id:
self._active_turn_consumer = None
def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
return self.request("thread/start", _params_dict(params), response_model=ThreadStartResponse)
def thread_resume(
self,
thread_id: str,
params: V2ThreadResumeParams | JsonObject | None = None,
) -> ThreadResumeResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/resume", payload, response_model=ThreadResumeResponse)
def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
return self.request("thread/list", _params_dict(params), response_model=ThreadListResponse)
def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
return self.request(
"thread/read",
{"threadId": thread_id, "includeTurns": include_turns},
response_model=ThreadReadResponse,
)
def thread_fork(
self,
thread_id: str,
params: V2ThreadForkParams | JsonObject | None = None,
) -> ThreadForkResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/fork", payload, response_model=ThreadForkResponse)
def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return self.request("thread/archive", {"threadId": thread_id}, response_model=ThreadArchiveResponse)
def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
return self.request("thread/unarchive", {"threadId": thread_id}, response_model=ThreadUnarchiveResponse)
def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
return self.request(
"thread/name/set",
{"threadId": thread_id, "name": name},
response_model=ThreadSetNameResponse,
)
def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
return self.request(
"thread/compact/start",
{"threadId": thread_id},
response_model=ThreadCompactStartResponse,
)
def turn_start(
self,
thread_id: str,
input_items: list[JsonObject] | JsonObject | str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
payload = {
**_params_dict(params),
"threadId": thread_id,
"input": self._normalize_input_items(input_items),
}
return self.request("turn/start", payload, response_model=TurnStartResponse)
def turn_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
return self.turn_start(thread_id, text, params=params)
def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
return self.request(
"turn/interrupt",
{"threadId": thread_id, "turnId": turn_id},
response_model=TurnInterruptResponse,
)
def turn_steer(
self,
thread_id: str,
expected_turn_id: str,
input_items: list[JsonObject] | JsonObject | str,
) -> TurnSteerResponse:
return self.request(
"turn/steer",
{
"threadId": thread_id,
"expectedTurnId": expected_turn_id,
"input": self._normalize_input_items(input_items),
},
response_model=TurnSteerResponse,
)
def model_list(self, include_hidden: bool = False) -> ModelListResponse:
return self.request(
"model/list",
{"includeHidden": include_hidden},
response_model=ModelListResponse,
)
def request_with_retry_on_overload(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
) -> ModelT:
return retry_on_overload(
lambda: self.request(method, params, response_model=response_model),
max_attempts=max_attempts,
initial_delay_s=initial_delay_s,
max_delay_s=max_delay_s,
)
def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
while True:
notification = self.next_notification()
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
return notification.payload
def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
target_methods = {methods} if isinstance(methods, str) else set(methods)
out: list[Notification] = []
while True:
notification = self.next_notification()
out.append(notification)
if notification.method in target_methods:
return out
def run_text_turn(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TextTurnResult:
started = self.turn_text(thread_id, text, params=params)
turn_id = started.turn.id
deltas: list[AgentMessageDeltaNotification] = []
completed: TurnCompletedNotification | None = None
while True:
notification = self.next_notification()
if (
notification.method == "item/agentMessage/delta"
and isinstance(notification.payload, AgentMessageDeltaNotification)
and notification.payload.turn_id == turn_id
):
deltas.append(notification.payload)
continue
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
completed = notification.payload
break
if completed is None:
raise AppServerError("turn/completed notification not received")
return TextTurnResult(
thread_id=thread_id,
turn_id=turn_id,
deltas=deltas,
completed=completed,
)
def ask_result(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
active_thread_id = thread_id
if active_thread_id is None:
start_params = V2ThreadStartParams(model=model) if model else None
started = self.thread_start(start_params)
active_thread_id = started.thread.id
return self.run_text_turn(active_thread_id, text)
def ask(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
return self.ask_result(text, model=model, thread_id=thread_id)
def stream_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> Iterator[AgentMessageDeltaNotification]:
started = self.turn_text(thread_id, text, params=params)
turn_id = started.turn.id
while True:
notification = self.next_notification()
if (
notification.method == "item/agentMessage/delta"
and isinstance(notification.payload, AgentMessageDeltaNotification)
and notification.payload.turn_id == turn_id
):
yield notification.payload
continue
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
break
def _coerce_notification(self, method: str, params: object) -> Notification:
params_dict = params if isinstance(params, dict) else {}
if method.startswith("codex/event/"):
event_params = dict(params_dict)
for key in ("id", "conversationId"):
value = event_params.get(key)
if isinstance(value, str) and value.strip() == "":
event_params[key] = None
try:
payload = CodexEventNotification.model_validate(event_params)
except Exception: # noqa: BLE001
return Notification(method=method, payload=UnknownNotification(params=params_dict))
return Notification(method=method, payload=payload)
model = NOTIFICATION_MODELS.get(method)
if model is None:
return Notification(method=method, payload=UnknownNotification(params=params_dict))
try:
payload = model.model_validate(params_dict)
except Exception: # noqa: BLE001
return Notification(method=method, payload=UnknownNotification(params=params_dict))
return Notification(method=method, payload=payload)
def _normalize_input_items(
self,
input_items: list[JsonObject] | JsonObject | str,
) -> list[JsonObject]:
if isinstance(input_items, str):
return [{"type": "text", "text": input_items}]
if isinstance(input_items, dict):
return [input_items]
return input_items
def _default_approval_handler(self, method: str, params: JsonObject | None) -> JsonObject:
if method == "item/commandExecution/requestApproval":
return {"decision": "accept"}
if method == "item/fileChange/requestApproval":
return {"decision": "accept"}
return {}
def _start_stderr_drain_thread(self) -> None:
if self._proc is None or self._proc.stderr is None:
return
def _drain() -> None:
stderr = self._proc.stderr
if stderr is None:
return
for line in stderr:
self._stderr_lines.append(line.rstrip("\n"))
self._stderr_thread = threading.Thread(target=_drain, daemon=True)
self._stderr_thread.start()
def _stderr_tail(self, limit: int = 40) -> str:
return "\n".join(list(self._stderr_lines)[-limit:])
def _handle_server_request(self, msg: dict[str, JsonValue]) -> JsonObject:
method = msg["method"]
params = msg.get("params")
if not isinstance(method, str):
return {}
return self._approval_handler(
method,
params if isinstance(params, dict) else None,
)
def _write_message(self, payload: JsonObject) -> None:
if self._proc is None or self._proc.stdin is None:
raise TransportClosedError("app-server is not running")
with self._lock:
self._proc.stdin.write(json.dumps(payload) + "\n")
self._proc.stdin.flush()
def _read_message(self) -> dict[str, JsonValue]:
if self._proc is None or self._proc.stdout is None:
raise TransportClosedError("app-server is not running")
line = self._proc.stdout.readline()
if not line:
raise TransportClosedError(
f"app-server closed stdout. stderr_tail={self._stderr_tail()[:2000]}"
)
try:
message = json.loads(line)
except json.JSONDecodeError as exc:
raise AppServerError(f"Invalid JSON-RPC line: {line!r}") from exc
if not isinstance(message, dict):
raise AppServerError(f"Invalid JSON-RPC payload: {message!r}")
return message
def default_codex_home() -> str:
return str(Path.home() / ".codex")

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from typing import Any
class AppServerError(Exception):
"""Base exception for SDK errors."""
class JsonRpcError(AppServerError):
"""Raw JSON-RPC error wrapper from the server."""
def __init__(self, code: int, message: str, data: Any = None):
super().__init__(f"JSON-RPC error {code}: {message}")
self.code = code
self.message = message
self.data = data
class TransportClosedError(AppServerError):
"""Raised when the app-server transport closes unexpectedly."""
class AppServerRpcError(JsonRpcError):
"""Base typed error for JSON-RPC failures."""
class ParseError(AppServerRpcError):
pass
class InvalidRequestError(AppServerRpcError):
pass
class MethodNotFoundError(AppServerRpcError):
pass
class InvalidParamsError(AppServerRpcError):
pass
class InternalRpcError(AppServerRpcError):
pass
class ServerBusyError(AppServerRpcError):
"""Server is overloaded / unavailable and caller should retry."""
class RetryLimitExceededError(ServerBusyError):
"""Server exhausted internal retry budget for a retryable operation."""
def _contains_retry_limit_text(message: str) -> bool:
lowered = message.lower()
return "retry limit" in lowered or "too many failed attempts" in lowered
def _is_server_overloaded(data: Any) -> bool:
if data is None:
return False
if isinstance(data, str):
return data.lower() == "server_overloaded"
if isinstance(data, dict):
direct = (
data.get("codex_error_info")
or data.get("codexErrorInfo")
or data.get("errorInfo")
)
if isinstance(direct, str) and direct.lower() == "server_overloaded":
return True
if isinstance(direct, dict):
for value in direct.values():
if isinstance(value, str) and value.lower() == "server_overloaded":
return True
for value in data.values():
if _is_server_overloaded(value):
return True
if isinstance(data, list):
return any(_is_server_overloaded(value) for value in data)
return False
def map_jsonrpc_error(code: int, message: str, data: Any = None) -> JsonRpcError:
"""Map a raw JSON-RPC error into a richer SDK exception class."""
if code == -32700:
return ParseError(code, message, data)
if code == -32600:
return InvalidRequestError(code, message, data)
if code == -32601:
return MethodNotFoundError(code, message, data)
if code == -32602:
return InvalidParamsError(code, message, data)
if code == -32603:
return InternalRpcError(code, message, data)
if -32099 <= code <= -32000:
if _is_server_overloaded(data):
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return ServerBusyError(code, message, data)
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return AppServerRpcError(code, message, data)
return JsonRpcError(code, message, data)
def is_retryable_error(exc: BaseException) -> bool:
"""True if the exception is a transient overload-style error."""
if isinstance(exc, ServerBusyError):
return True
if isinstance(exc, JsonRpcError):
return _is_server_overloaded(exc.data)
return False

View File

@@ -0,0 +1 @@
"""Auto-generated Python types derived from the app-server schemas."""

View File

@@ -0,0 +1,21 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
CodexEventType = Literal['agent_message', 'agent_message_content_delta', 'agent_message_delta', 'agent_reasoning', 'agent_reasoning_delta', 'agent_reasoning_raw_content', 'agent_reasoning_raw_content_delta', 'agent_reasoning_section_break', 'apply_patch_approval_request', 'background_event', 'collab_agent_interaction_begin', 'collab_agent_interaction_end', 'collab_agent_spawn_begin', 'collab_agent_spawn_end', 'collab_close_begin', 'collab_close_end', 'collab_resume_begin', 'collab_resume_end', 'collab_waiting_begin', 'collab_waiting_end', 'context_compacted', 'deprecation_notice', 'dynamic_tool_call_request', 'dynamic_tool_call_response', 'elicitation_request', 'entered_review_mode', 'error', 'exec_approval_request', 'exec_command_begin', 'exec_command_end', 'exec_command_output_delta', 'exited_review_mode', 'get_history_entry_response', 'image_generation_begin', 'image_generation_end', 'item_completed', 'item_started', 'list_custom_prompts_response', 'list_remote_skills_response', 'list_skills_response', 'mcp_list_tools_response', 'mcp_startup_complete', 'mcp_startup_update', 'mcp_tool_call_begin', 'mcp_tool_call_end', 'model_reroute', 'patch_apply_begin', 'patch_apply_end', 'plan_delta', 'plan_update', 'raw_response_item', 'realtime_conversation_closed', 'realtime_conversation_realtime', 'realtime_conversation_started', 'reasoning_content_delta', 'reasoning_raw_content_delta', 'remote_skill_downloaded', 'request_permissions', 'request_user_input', 'session_configured', 'shutdown_complete', 'skills_update_available', 'stream_error', 'task_complete', 'task_started', 'terminal_interaction', 'thread_name_updated', 'thread_rolled_back', 'token_count', 'turn_aborted', 'turn_diff', 'undo_completed', 'undo_started', 'user_message', 'view_image_tool_call', 'warning', 'web_search_begin', 'web_search_end']
class CodexEventMessage(BaseModel):
model_config = ConfigDict(extra="allow")
type: CodexEventType | str
class CodexEventNotification(BaseModel):
id: str | None = None
conversationId: str | None = None
msg: CodexEventMessage | dict[str, Any]

View File

@@ -0,0 +1,98 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from pydantic import BaseModel
from .v2_all import AccountLoginCompletedNotification
from .v2_all import AccountRateLimitsUpdatedNotification
from .v2_all import AccountUpdatedNotification
from .v2_all import AgentMessageDeltaNotification
from .v2_all import AppListUpdatedNotification
from .v2_all import CommandExecOutputDeltaNotification
from .v2_all import CommandExecutionOutputDeltaNotification
from .v2_all import ConfigWarningNotification
from .v2_all import ContextCompactedNotification
from .v2_all import DeprecationNoticeNotification
from .v2_all import ErrorNotification
from .v2_all import FileChangeOutputDeltaNotification
from .v2_all import FuzzyFileSearchSessionCompletedNotification
from .v2_all import FuzzyFileSearchSessionUpdatedNotification
from .v2_all import ItemCompletedNotification
from .v2_all import ItemStartedNotification
from .v2_all import McpServerOauthLoginCompletedNotification
from .v2_all import McpToolCallProgressNotification
from .v2_all import ModelReroutedNotification
from .v2_all import PlanDeltaNotification
from .v2_all import ReasoningSummaryPartAddedNotification
from .v2_all import ReasoningSummaryTextDeltaNotification
from .v2_all import ReasoningTextDeltaNotification
from .v2_all import ServerRequestResolvedNotification
from .v2_all import SkillsChangedNotification
from .v2_all import TerminalInteractionNotification
from .v2_all import ThreadArchivedNotification
from .v2_all import ThreadClosedNotification
from .v2_all import ThreadNameUpdatedNotification
from .v2_all import ThreadRealtimeClosedNotification
from .v2_all import ThreadRealtimeErrorNotification
from .v2_all import ThreadRealtimeItemAddedNotification
from .v2_all import ThreadRealtimeOutputAudioDeltaNotification
from .v2_all import ThreadRealtimeStartedNotification
from .v2_all import ThreadStartedNotification
from .v2_all import ThreadStatusChangedNotification
from .v2_all import ThreadTokenUsageUpdatedNotification
from .v2_all import ThreadUnarchivedNotification
from .v2_all import TurnCompletedNotification
from .v2_all import TurnDiffUpdatedNotification
from .v2_all import TurnPlanUpdatedNotification
from .v2_all import TurnStartedNotification
from .v2_all import WindowsSandboxSetupCompletedNotification
from .v2_all import WindowsWorldWritableWarningNotification
NOTIFICATION_MODELS: dict[str, type[BaseModel]] = {
"account/login/completed": AccountLoginCompletedNotification,
"account/rateLimits/updated": AccountRateLimitsUpdatedNotification,
"account/updated": AccountUpdatedNotification,
"app/list/updated": AppListUpdatedNotification,
"command/exec/outputDelta": CommandExecOutputDeltaNotification,
"configWarning": ConfigWarningNotification,
"deprecationNotice": DeprecationNoticeNotification,
"error": ErrorNotification,
"fuzzyFileSearch/sessionCompleted": FuzzyFileSearchSessionCompletedNotification,
"fuzzyFileSearch/sessionUpdated": FuzzyFileSearchSessionUpdatedNotification,
"item/agentMessage/delta": AgentMessageDeltaNotification,
"item/commandExecution/outputDelta": CommandExecutionOutputDeltaNotification,
"item/commandExecution/terminalInteraction": TerminalInteractionNotification,
"item/completed": ItemCompletedNotification,
"item/fileChange/outputDelta": FileChangeOutputDeltaNotification,
"item/mcpToolCall/progress": McpToolCallProgressNotification,
"item/plan/delta": PlanDeltaNotification,
"item/reasoning/summaryPartAdded": ReasoningSummaryPartAddedNotification,
"item/reasoning/summaryTextDelta": ReasoningSummaryTextDeltaNotification,
"item/reasoning/textDelta": ReasoningTextDeltaNotification,
"item/started": ItemStartedNotification,
"mcpServer/oauthLogin/completed": McpServerOauthLoginCompletedNotification,
"model/rerouted": ModelReroutedNotification,
"serverRequest/resolved": ServerRequestResolvedNotification,
"skills/changed": SkillsChangedNotification,
"thread/archived": ThreadArchivedNotification,
"thread/closed": ThreadClosedNotification,
"thread/compacted": ContextCompactedNotification,
"thread/name/updated": ThreadNameUpdatedNotification,
"thread/realtime/closed": ThreadRealtimeClosedNotification,
"thread/realtime/error": ThreadRealtimeErrorNotification,
"thread/realtime/itemAdded": ThreadRealtimeItemAddedNotification,
"thread/realtime/outputAudio/delta": ThreadRealtimeOutputAudioDeltaNotification,
"thread/realtime/started": ThreadRealtimeStartedNotification,
"thread/started": ThreadStartedNotification,
"thread/status/changed": ThreadStatusChangedNotification,
"thread/tokenUsage/updated": ThreadTokenUsageUpdatedNotification,
"thread/unarchived": ThreadUnarchivedNotification,
"turn/completed": TurnCompletedNotification,
"turn/diff/updated": TurnDiffUpdatedNotification,
"turn/plan/updated": TurnPlanUpdatedNotification,
"turn/started": TurnStartedNotification,
"windows/worldWritableWarning": WindowsWorldWritableWarningNotification,
"windowsSandbox/setupCompleted": WindowsSandboxSetupCompletedNotification,
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,25 @@
"""Stable aliases over full v2 autogenerated models (datamodel-code-generator)."""
from .v2_all.ModelListResponse import ModelListResponse
from .v2_all.ThreadCompactStartResponse import ThreadCompactStartResponse
from .v2_all.ThreadListResponse import ThreadListResponse
from .v2_all.ThreadReadResponse import ThreadReadResponse
from .v2_all.ThreadTokenUsageUpdatedNotification import (
ThreadTokenUsageUpdatedNotification,
)
from .v2_all.TurnCompletedNotification import ThreadItem153 as ThreadItem
from .v2_all.TurnCompletedNotification import (
TurnCompletedNotification as TurnCompletedNotificationPayload,
)
from .v2_all.TurnSteerResponse import TurnSteerResponse
__all__ = [
"ModelListResponse",
"ThreadCompactStartResponse",
"ThreadListResponse",
"ThreadReadResponse",
"ThreadTokenUsageUpdatedNotification",
"TurnCompletedNotificationPayload",
"TurnSteerResponse",
"ThreadItem",
]

View File

@@ -0,0 +1,107 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TypeAlias
from pydantic import BaseModel
from .generated.codex_event_types import CodexEventNotification
from .generated.v2_all import (
AccountLoginCompletedNotification,
AccountRateLimitsUpdatedNotification,
AccountUpdatedNotification,
AgentMessageDeltaNotification,
AppListUpdatedNotification,
CommandExecutionOutputDeltaNotification,
ConfigWarningNotification,
ContextCompactedNotification,
DeprecationNoticeNotification,
ErrorNotification,
FileChangeOutputDeltaNotification,
ItemCompletedNotification,
ItemStartedNotification,
McpServerOauthLoginCompletedNotification,
McpToolCallProgressNotification,
PlanDeltaNotification,
RawResponseItemCompletedNotification,
ReasoningSummaryPartAddedNotification,
ReasoningSummaryTextDeltaNotification,
ReasoningTextDeltaNotification,
TerminalInteractionNotification,
ThreadNameUpdatedNotification,
ThreadStartedNotification,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnDiffUpdatedNotification,
TurnPlanUpdatedNotification,
TurnStartedNotification,
WindowsWorldWritableWarningNotification,
)
JsonScalar: TypeAlias = str | int | float | bool | None
JsonValue: TypeAlias = JsonScalar | dict[str, "JsonValue"] | list["JsonValue"]
JsonObject: TypeAlias = dict[str, JsonValue]
@dataclass(slots=True)
class UnknownNotification:
params: JsonObject
NotificationPayload: TypeAlias = (
AccountLoginCompletedNotification
| AccountRateLimitsUpdatedNotification
| AccountUpdatedNotification
| AgentMessageDeltaNotification
| AppListUpdatedNotification
| CommandExecutionOutputDeltaNotification
| ConfigWarningNotification
| ContextCompactedNotification
| DeprecationNoticeNotification
| ErrorNotification
| FileChangeOutputDeltaNotification
| ItemCompletedNotification
| ItemStartedNotification
| McpServerOauthLoginCompletedNotification
| McpToolCallProgressNotification
| PlanDeltaNotification
| RawResponseItemCompletedNotification
| ReasoningSummaryPartAddedNotification
| ReasoningSummaryTextDeltaNotification
| ReasoningTextDeltaNotification
| TerminalInteractionNotification
| ThreadNameUpdatedNotification
| ThreadStartedNotification
| ThreadTokenUsageUpdatedNotification
| TurnCompletedNotification
| TurnDiffUpdatedNotification
| TurnPlanUpdatedNotification
| TurnStartedNotification
| WindowsWorldWritableWarningNotification
| CodexEventNotification
| UnknownNotification
)
@dataclass(slots=True)
class Notification:
method: str
payload: NotificationPayload
class ServerInfo(BaseModel):
name: str | None = None
version: str | None = None
class InitializeResponse(BaseModel):
serverInfo: ServerInfo | None = None
userAgent: str | None = None
@dataclass(slots=True)
class TextTurnResult:
thread_id: str
turn_id: str
deltas: list[AgentMessageDeltaNotification]
completed: TurnCompletedNotification

View File

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import random
import time
from typing import Callable, TypeVar
from .errors import is_retryable_error
T = TypeVar("T")
def retry_on_overload(
op: Callable[[], T],
*,
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
jitter_ratio: float = 0.2,
) -> T:
"""Retry helper for transient server-overload errors."""
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
delay = initial_delay_s
attempt = 0
while True:
attempt += 1
try:
return op()
except Exception as exc:
if attempt >= max_attempts:
raise
if not is_retryable_error(exc):
raise
jitter = delay * jitter_ratio
sleep_for = min(max_delay_s, delay) + random.uniform(-jitter, jitter)
if sleep_for > 0:
time.sleep(sleep_for)
delay = min(max_delay_s, delay * 2)

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
src_str = str(SRC)
if src_str in sys.path:
sys.path.remove(src_str)
sys.path.insert(0, src_str)
for module_name in list(sys.modules):
if module_name == "codex_app_server" or module_name.startswith("codex_app_server."):
sys.modules.pop(module_name)

View File

@@ -0,0 +1,147 @@
from __future__ import annotations
import ast
import importlib.util
import json
import platform
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
def _load_update_script_module():
script_path = ROOT / "scripts" / "update_sdk_artifacts.py"
spec = importlib.util.spec_from_file_location("update_sdk_artifacts", script_path)
if spec is None or spec.loader is None:
raise AssertionError(f"Failed to load script module: {script_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_generation_has_single_maintenance_entrypoint_script() -> None:
scripts = sorted(p.name for p in (ROOT / "scripts").glob("*.py"))
assert scripts == ["update_sdk_artifacts.py"]
def test_generate_types_wires_all_generation_steps() -> None:
source = (ROOT / "scripts" / "update_sdk_artifacts.py").read_text()
tree = ast.parse(source)
generate_types_fn = next(
(node for node in tree.body if isinstance(node, ast.FunctionDef) and node.name == "generate_types"),
None,
)
assert generate_types_fn is not None
calls: list[str] = []
for node in generate_types_fn.body:
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Call):
fn = node.value.func
if isinstance(fn, ast.Name):
calls.append(fn.id)
assert calls == [
"generate_v2_all",
"generate_notification_registry",
"generate_codex_event_types",
"generate_public_api_flat_methods",
]
def test_schema_normalization_only_flattens_string_literal_oneofs() -> None:
script = _load_update_script_module()
schema = json.loads(
(
ROOT.parent.parent
/ "codex-rs"
/ "app-server-protocol"
/ "schema"
/ "json"
/ "codex_app_server_protocol.v2.schemas.json"
).read_text()
)
definitions = schema["definitions"]
flattened = [
name
for name, definition in definitions.items()
if isinstance(definition, dict)
and script._flatten_string_enum_one_of(definition.copy())
]
assert flattened == [
"AuthMode",
"CommandExecOutputStream",
"ExperimentalFeatureStage",
"InputModality",
"MessagePhase",
]
def test_bundled_binaries_exist_for_all_supported_platforms() -> None:
script = _load_update_script_module()
for platform_key in script.PLATFORMS:
bin_path = script.bundled_platform_bin_path(platform_key)
assert bin_path.is_file(), f"Missing bundled binary: {bin_path}"
def test_default_runtime_uses_current_platform_bundled_binary() -> None:
client_source = (ROOT / "src" / "codex_app_server" / "client.py").read_text()
client_tree = ast.parse(client_source)
# Keep this assertion source-level so it works in both PR2 (types foundation)
# and PR3 (full SDK), regardless of runtime module wiring.
app_server_config = next(
(
node
for node in client_tree.body
if isinstance(node, ast.ClassDef) and node.name == "AppServerConfig"
),
None,
)
assert app_server_config is not None
codex_bin_field = next(
(
node
for node in app_server_config.body
if isinstance(node, ast.AnnAssign)
and isinstance(node.target, ast.Name)
and node.target.id == "codex_bin"
),
None,
)
assert codex_bin_field is not None
assert isinstance(codex_bin_field.value, ast.Call)
assert isinstance(codex_bin_field.value.func, ast.Name)
assert codex_bin_field.value.func.id == "str"
assert len(codex_bin_field.value.args) == 1
bundled_call = codex_bin_field.value.args[0]
assert isinstance(bundled_call, ast.Call)
assert isinstance(bundled_call.func, ast.Name)
assert bundled_call.func.id == "_bundled_codex_path"
bin_root = (ROOT / "src" / "codex_app_server" / "bin").resolve()
sys_name = platform.system().lower()
machine = platform.machine().lower()
is_arm = machine in {"arm64", "aarch64"}
if sys_name.startswith("darwin"):
platform_dir = "darwin-arm64" if is_arm else "darwin-x64"
exe = "codex"
elif sys_name.startswith("linux"):
platform_dir = "linux-arm64" if is_arm else "linux-x64"
exe = "codex"
elif sys_name.startswith("windows"):
platform_dir = "windows-arm64" if is_arm else "windows-x64"
exe = "codex.exe"
else:
raise AssertionError(f"Unsupported platform in test: {sys_name}/{machine}")
expected = (bin_root / platform_dir / exe).resolve()
assert expected.is_file()

View File

@@ -0,0 +1,112 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from codex_app_server.client import AppServerClient, _params_dict
from codex_app_server.generated.codex_event_types import CodexEventNotification
from codex_app_server.generated.v2_all import ThreadListParams, ThreadTokenUsageUpdatedNotification
from codex_app_server.models import UnknownNotification
ROOT = Path(__file__).resolve().parents[1]
def test_thread_set_name_and_compact_use_current_rpc_methods() -> None:
client = AppServerClient()
calls: list[tuple[str, dict[str, Any] | None]] = []
def fake_request(method: str, params, *, response_model): # type: ignore[no-untyped-def]
calls.append((method, params))
return response_model.model_validate({})
client.request = fake_request # type: ignore[method-assign]
client.thread_set_name("thread-1", "sdk-name")
client.thread_compact("thread-1")
assert calls[0][0] == "thread/name/set"
assert calls[1][0] == "thread/compact/start"
def test_generated_params_models_are_snake_case_and_dump_by_alias() -> None:
params = ThreadListParams(search_term="needle", limit=5)
assert "search_term" in ThreadListParams.model_fields
dumped = _params_dict(params)
assert dumped == {"searchTerm": "needle", "limit": 5}
def test_generated_v2_bundle_has_single_shared_plan_type_definition() -> None:
source = (ROOT / "src" / "codex_app_server" / "generated" / "v2_all.py").read_text()
assert source.count("class PlanType(") == 1
def test_notifications_are_typed_with_canonical_v2_methods() -> None:
client = AppServerClient()
event = client._coerce_notification(
"thread/tokenUsage/updated",
{
"threadId": "thread-1",
"turnId": "turn-1",
"tokenUsage": {
"last": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
"total": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
},
},
)
assert event.method == "thread/tokenUsage/updated"
assert isinstance(event.payload, ThreadTokenUsageUpdatedNotification)
assert event.payload.turn_id == "turn-1"
def test_codex_event_notifications_are_typed() -> None:
client = AppServerClient()
event = client._coerce_notification(
"codex/event/turn_aborted",
{
"id": "evt-1",
"conversationId": "thread-1",
"msg": {"type": "turn_aborted"},
},
)
assert event.method == "codex/event/turn_aborted"
assert isinstance(event.payload, CodexEventNotification)
assert event.payload.msg.type == "turn_aborted"
def test_codex_event_blank_ids_are_normalized_to_none() -> None:
client = AppServerClient()
event = client._coerce_notification(
"codex/event/mcp_startup_complete",
{
"id": "",
"conversationId": "",
"msg": {"type": "mcp_startup_complete"},
},
)
assert isinstance(event.payload, CodexEventNotification)
assert event.payload.id is None
assert event.payload.conversationId is None
def test_invalid_notification_payload_falls_back_to_unknown() -> None:
client = AppServerClient()
event = client._coerce_notification("thread/tokenUsage/updated", {"threadId": "missing"})
assert event.method == "thread/tokenUsage/updated"
assert isinstance(event.payload, UnknownNotification)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import os
import subprocess
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
GENERATED_TARGETS = [
Path("src/codex_app_server/generated/codex_event_types.py"),
Path("src/codex_app_server/generated/notification_registry.py"),
Path("src/codex_app_server/generated/v2_all.py"),
Path("src/codex_app_server/public_api.py"),
]
def _snapshot_target(root: Path, rel_path: Path) -> dict[str, bytes] | bytes | None:
target = root / rel_path
if not target.exists():
return None
if target.is_file():
return target.read_bytes()
snapshot: dict[str, bytes] = {}
for path in sorted(target.rglob("*")):
if path.is_file() and "__pycache__" not in path.parts:
snapshot[str(path.relative_to(target))] = path.read_bytes()
return snapshot
def _snapshot_targets(root: Path) -> dict[str, dict[str, bytes] | bytes | None]:
return {
str(rel_path): _snapshot_target(root, rel_path) for rel_path in GENERATED_TARGETS
}
def test_generated_files_are_up_to_date():
before = _snapshot_targets(ROOT)
# Regenerate contract artifacts via single maintenance entrypoint.
env = os.environ.copy()
python_bin = str(Path(sys.executable).parent)
env["PATH"] = f"{python_bin}{os.pathsep}{env.get('PATH', '')}"
subprocess.run(
[sys.executable, "scripts/update_sdk_artifacts.py", "--types-only"],
cwd=ROOT,
check=True,
env=env,
)
after = _snapshot_targets(ROOT)
assert before == after, "Generated files drifted after regeneration"