mirror of
https://github.com/openai/codex.git
synced 2026-05-14 16:22:51 +00:00
## Why The Python SDK needs the same tight formatter/lint loop as the rest of the repo: a safe Ruff autofix pass, Ruff formatting, editor save behavior, and CI checks that catch drift. Without that loop, SDK changes can land with formatting or import ordering that differs from what reviewers and CI expect. ## What - Add Ruff configuration to `sdk/python/pyproject.toml`, excluding generated protocol code and notebooks from the normal lint/format pass. - Update `just fmt` so it still formats Rust and also runs Python SDK Ruff autofix and formatting. - Add Python SDK CI steps for `ruff check` and `ruff format --check` before pytest. - Recommend the Ruff VS Code extension and enable Python format/fix/organize-on-save so Cmd+S uses the same tooling. - Apply the resulting Ruff formatting to SDK Python files, examples, and the checked-in generated `v2_all.py` output emitted by the pinned generator. - Add a guard test for the `just fmt` recipe so it keeps working from both Rust and Python SDK working directories. ## Stack 1. #21891 `[1/8]` Pin Python SDK runtime dependency 2. #21893 `[2/8]` Generate Python SDK types from pinned runtime 3. #21895 `[3/8]` Run Python SDK tests in CI 4. #21896 `[4/8]` Define Python SDK public API surface 5. #21905 `[5/8]` Rename Python SDK package to `openai-codex` 6. #21910 `[6/8]` Add high-level Python SDK approval mode 7. #22014 `[7/8]` Add Python SDK app-server integration harness 8. This PR `[8/8]` Add Python SDK Ruff formatting ## Verification - Added `test_root_fmt_recipe_formats_rust_and_python_sdk` for the shared format recipe. - Ran `just fmt` after the recipe update. --------- Co-authored-by: Codex <noreply@openai.com>
156 lines
4.7 KiB
Python
156 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import importlib.util
|
|
import sys
|
|
import tempfile
|
|
import zlib
|
|
from pathlib import Path
|
|
from typing import Iterable, Iterator
|
|
|
|
# Absolute path of the sdk/python directory; this file lives one level below it
# (e.g. sdk/python/examples/), hence parents[1].
_SDK_PYTHON_DIR = Path(__file__).resolve().parents[1]
_SDK_PYTHON_STR = str(_SDK_PYTHON_DIR)
# Make sibling helper modules importable when examples are run directly,
# without installing the package.
if _SDK_PYTHON_STR not in sys.path:
    sys.path.insert(0, _SDK_PYTHON_STR)

# Deliberately imported *after* the sys.path tweak above: _runtime_setup lives
# in sdk/python and only resolves once that directory is on sys.path.
from _runtime_setup import ensure_runtime_package_installed
|
|
|
|
|
|
def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None:
|
|
if importlib.util.find_spec("pydantic") is not None:
|
|
return
|
|
|
|
python = sys.executable
|
|
raise RuntimeError(
|
|
"Missing required dependency: pydantic.\n"
|
|
f"Interpreter: {python}\n"
|
|
"Install dependencies with the same interpreter used to run this example:\n"
|
|
f" cd {sdk_python_dir} && uv sync\n"
|
|
"Then activate `.venv`, or reinstall with the Python interpreter above."
|
|
)
|
|
|
|
|
|
def ensure_local_sdk_src() -> Path:
    """Add sdk/python/src to sys.path so examples run without installing the package."""
    root = _SDK_PYTHON_DIR
    source_root = root / "src"
    package_dir = source_root / "openai_codex"
    # Sanity-check that this checkout actually contains the SDK package before
    # touching sys.path.
    if not package_dir.exists():
        raise RuntimeError(f"Could not locate local SDK package at {package_dir}")

    _ensure_runtime_dependencies(root)

    entry = str(source_root)
    if entry not in sys.path:
        sys.path.insert(0, entry)
    return source_root
|
|
|
|
|
|
def runtime_config():
    """Return an example-friendly AppServerConfig for repo-source SDK usage."""
    from openai_codex import AppServerConfig

    # Make sure the pinned runtime package is importable for the interpreter
    # running this example before handing back a config that assumes it.
    ensure_runtime_package_installed(sys.executable, _SDK_PYTHON_DIR)
    config = AppServerConfig()
    return config
|
|
|
|
|
|
def _png_chunk(chunk_type: bytes, data: bytes) -> bytes:
|
|
import struct
|
|
|
|
payload = chunk_type + data
|
|
checksum = zlib.crc32(payload) & 0xFFFFFFFF
|
|
return struct.pack(">I", len(data)) + payload + struct.pack(">I", checksum)
|
|
|
|
|
|
def _generated_sample_png_bytes() -> bytes:
    """Build a tiny in-memory PNG: a 96x96 truecolor image with four color quadrants."""
    import struct

    width, height = 96, 96
    # Quadrant colors, addressed as quadrants[row_is_bottom][col_is_right].
    quadrants = (
        ((120, 180, 255), (255, 220, 90)),  # top-left, top-right
        ((90, 180, 95), (180, 85, 85)),  # bottom-left, bottom-right
    )

    raw = bytearray()
    for y in range(height):
        raw.append(0)  # PNG filter type 0 (None) for every scanline
        for x in range(width):
            raw.extend(quadrants[y >= height // 2][x >= width // 2])

    # IHDR: 8-bit depth, color type 2 (truecolor), default compression,
    # filter method, no interlace.
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    return (
        b"\x89PNG\r\n\x1a\n"
        + _png_chunk(b"IHDR", ihdr)
        + _png_chunk(b"IDAT", zlib.compress(bytes(raw)))
        + _png_chunk(b"IEND", b"")
    )
|
|
|
|
|
|
@contextlib.contextmanager
def temporary_sample_image_path() -> Iterator[Path]:
    """Yield the path of a freshly generated sample PNG; remove it on exit."""
    with tempfile.TemporaryDirectory(prefix="codex-python-example-image-") as scratch:
        target = Path(scratch) / "generated_sample.png"
        target.write_bytes(_generated_sample_png_bytes())
        yield target
|
|
|
|
|
|
def server_label(metadata: object) -> str:
    """Describe the server as "name version", else the userAgent, else "unknown"."""

    def _clean(obj: object, attr: str) -> str:
        # Treat a missing object, a missing attribute, and a None value all as "".
        if obj is None:
            return ""
        return (getattr(obj, attr, None) or "").strip()

    server = getattr(metadata, "serverInfo", None)
    name = _clean(server, "name")
    version = _clean(server, "version")
    if name and version:
        return f"{name} {version}"

    return _clean(metadata, "userAgent") or "unknown"
|
|
|
|
|
|
def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None:
|
|
for turn in turns or []:
|
|
if getattr(turn, "id", None) == turn_id:
|
|
return turn
|
|
return None
|
|
|
|
|
|
def assistant_text_from_turn(turn: object | None) -> str:
|
|
if turn is None:
|
|
return ""
|
|
|
|
chunks: list[str] = []
|
|
for item in getattr(turn, "items", []) or []:
|
|
raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item
|
|
if not isinstance(raw_item, dict):
|
|
continue
|
|
|
|
item_type = raw_item.get("type")
|
|
if item_type == "agentMessage":
|
|
text = raw_item.get("text")
|
|
if isinstance(text, str) and text:
|
|
chunks.append(text)
|
|
continue
|
|
|
|
if item_type != "message" or raw_item.get("role") != "assistant":
|
|
continue
|
|
|
|
for content in raw_item.get("content") or []:
|
|
if not isinstance(content, dict) or content.get("type") != "output_text":
|
|
continue
|
|
text = content.get("text")
|
|
if isinstance(text, str) and text:
|
|
chunks.append(text)
|
|
|
|
return "".join(chunks)
|