Add Python SDK public API and examples (#14446)

## TL;DR
WIP esp the examples

Thin the Python SDK public surface so the wrapper layer returns
canonical app-server generated models directly.

- keeps `Codex` / `AsyncCodex` / `Thread` / `Turn` and input helpers,
but removes alias-only type layers and custom result models
- `metadata` now returns `InitializeResponse` and `run()` returns the
generated app-server `Turn`
- updates docs, examples, notebook, and tests to use canonical generated
types and regenerates `v2_all.py` against current schema
- keeps the pinned runtime-package integration flow and real integration
coverage

  ## Validation
  - `PYTHONPATH=sdk/python/src python3 -m pytest sdk/python/tests`
- `GH_TOKEN="$(gh auth token)" RUN_REAL_CODEX_TESTS=1
PYTHONPATH=sdk/python/src python3 -m pytest sdk/python/tests -rs`

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Shaqayeq
2026-03-17 16:05:56 -07:00
committed by GitHub
parent 0d1539e74c
commit fc75d07504
46 changed files with 5081 additions and 69 deletions

View File

@@ -2,9 +2,11 @@ from __future__ import annotations
import ast
import importlib.util
import io
import json
import sys
import tomllib
import urllib.error
from pathlib import Path
import pytest
@@ -23,6 +25,17 @@ def _load_update_script_module():
return module
def _load_runtime_setup_module():
runtime_setup_path = ROOT / "_runtime_setup.py"
spec = importlib.util.spec_from_file_location("_runtime_setup", runtime_setup_path)
if spec is None or spec.loader is None:
raise AssertionError(f"Failed to load runtime setup module: {runtime_setup_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_generation_has_single_maintenance_entrypoint_script() -> None:
scripts = sorted(p.name for p in (ROOT / "scripts").glob("*.py"))
assert scripts == ["update_sdk_artifacts.py"]
@@ -146,6 +159,39 @@ def test_runtime_package_template_has_no_checked_in_binaries() -> None:
) == ["__init__.py"]
def test_examples_readme_matches_pinned_runtime_version() -> None:
runtime_setup = _load_runtime_setup_module()
readme = (ROOT / "examples" / "README.md").read_text()
assert (
f"Current pinned runtime version: `{runtime_setup.pinned_runtime_version()}`"
in readme
)
def test_release_metadata_retries_without_invalid_auth(monkeypatch: pytest.MonkeyPatch) -> None:
runtime_setup = _load_runtime_setup_module()
authorizations: list[str | None] = []
def fake_urlopen(request):
authorization = request.headers.get("Authorization")
authorizations.append(authorization)
if authorization is not None:
raise urllib.error.HTTPError(
request.full_url,
401,
"Unauthorized",
hdrs=None,
fp=None,
)
return io.StringIO('{"assets": []}')
monkeypatch.setenv("GH_TOKEN", "invalid-token")
monkeypatch.setattr(runtime_setup.urllib.request, "urlopen", fake_urlopen)
assert runtime_setup._release_metadata("1.2.3") == {"assets": []}
assert authorizations == ["Bearer invalid-token", None]
def test_runtime_package_is_wheel_only_and_builds_platform_specific_wheels() -> None:
pyproject = tomllib.loads(
(ROOT.parent / "python-runtime" / "pyproject.toml").read_text()

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import asyncio
import time
from codex_app_server.async_client import AsyncAppServerClient
def test_async_client_serializes_transport_calls() -> None:
async def scenario() -> int:
client = AsyncAppServerClient()
active = 0
max_active = 0
def fake_model_list(include_hidden: bool = False) -> bool:
nonlocal active, max_active
active += 1
max_active = max(max_active, active)
time.sleep(0.05)
active -= 1
return include_hidden
client._sync.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(client.model_list(), client.model_list())
return max_active
assert asyncio.run(scenario()) == 1
def test_async_stream_text_is_incremental_and_blocks_parallel_calls() -> None:
async def scenario() -> tuple[str, list[str], bool]:
client = AsyncAppServerClient()
def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def]
yield "first"
time.sleep(0.03)
yield "second"
yield "third"
def fake_model_list(include_hidden: bool = False) -> str:
return "done"
client._sync.stream_text = fake_stream_text # type: ignore[method-assign]
client._sync.model_list = fake_model_list # type: ignore[method-assign]
stream = client.stream_text("thread-1", "hello")
first = await anext(stream)
blocked_before_stream_done = False
competing_call = asyncio.create_task(client.model_list())
await asyncio.sleep(0.01)
blocked_before_stream_done = not competing_call.done()
remaining: list[str] = []
async for item in stream:
remaining.append(item)
await competing_call
return first, remaining, blocked_before_stream_done
first, remaining, blocked = asyncio.run(scenario())
assert first == "first"
assert remaining == ["second", "third"]
assert blocked

View File

@@ -9,7 +9,7 @@ ROOT = Path(__file__).resolve().parents[1]
GENERATED_TARGETS = [
Path("src/codex_app_server/generated/notification_registry.py"),
Path("src/codex_app_server/generated/v2_all.py"),
Path("src/codex_app_server/public_api.py"),
Path("src/codex_app_server/api.py"),
]

View File

@@ -0,0 +1,235 @@
from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
import pytest
import codex_app_server.api as public_api_module
from codex_app_server.client import AppServerClient
from codex_app_server.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
TurnStatus,
)
from codex_app_server.models import InitializeResponse, Notification
from codex_app_server.api import (
AsyncCodex,
AsyncTurnHandle,
Codex,
TurnHandle,
)
ROOT = Path(__file__).resolve().parents[1]
def _delta_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "delta-text",
) -> Notification:
return Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": text,
"itemId": "item-1",
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
) -> Notification:
return Notification(
method="turn/completed",
payload=TurnCompletedNotification.model_validate(
{
"threadId": thread_id,
"turn": {
"id": turn_id,
"items": [],
"status": status,
},
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
class FakeClient:
def __init__(self, config=None) -> None: # noqa: ANN001,ARG002
self._closed = False
def start(self) -> None:
return None
def initialize(self) -> InitializeResponse:
return InitializeResponse.model_validate({})
def close(self) -> None:
self._closed = True
closed.append(True)
monkeypatch.setattr(public_api_module, "AppServerClient", FakeClient)
with pytest.raises(RuntimeError, match="missing required metadata"):
Codex()
assert closed == [True]
def test_async_codex_init_failure_closes_client() -> None:
async def scenario() -> None:
codex = AsyncCodex()
close_calls = 0
async def fake_start() -> None:
return None
async def fake_initialize() -> InitializeResponse:
return InitializeResponse.model_validate({})
async def fake_close() -> None:
nonlocal close_calls
close_calls += 1
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.close = fake_close # type: ignore[method-assign]
with pytest.raises(RuntimeError, match="missing required metadata"):
await codex.models()
assert close_calls == 1
assert codex._initialized is False
assert codex._init is None
asyncio.run(scenario())
def test_async_codex_initializes_only_once_under_concurrency() -> None:
async def scenario() -> None:
codex = AsyncCodex()
start_calls = 0
initialize_calls = 0
ready = asyncio.Event()
async def fake_start() -> None:
nonlocal start_calls
start_calls += 1
async def fake_initialize() -> InitializeResponse:
nonlocal initialize_calls
initialize_calls += 1
ready.set()
await asyncio.sleep(0.02)
return InitializeResponse.model_validate(
{
"userAgent": "codex-cli/1.2.3",
"serverInfo": {"name": "codex-cli", "version": "1.2.3"},
}
)
async def fake_model_list(include_hidden: bool = False): # noqa: ANN202,ARG001
await ready.wait()
return object()
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(codex.models(), codex.models())
assert start_calls == 1
assert initialize_calls == 1
asyncio.run(scenario())
def test_turn_stream_rejects_second_active_consumer() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
first_stream = TurnHandle(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = TurnHandle(client, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
next(second_stream)
first_stream.close()
def test_async_turn_stream_rejects_second_active_consumer() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
await anext(second_stream)
await first_stream.aclose()
asyncio.run(scenario())
def test_turn_run_returns_completed_turn_payload() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
result = TurnHandle(client, "thread-1", "turn-1").run()
assert result.id == "turn-1"
assert result.status == TurnStatus.completed
assert result.items == []
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",
ROOT / "examples" / "10_error_handling_and_retry" / "async.py",
):
source = path.read_text()
assert '== "failed"' not in source
assert "TurnStatus.failed" in source

View File

@@ -0,0 +1,222 @@
from __future__ import annotations
import importlib.resources as resources
import inspect
from typing import Any
from codex_app_server import AppServerConfig
from codex_app_server.models import InitializeResponse
from codex_app_server.api import AsyncCodex, AsyncThread, Codex, Thread
def _keyword_only_names(fn: object) -> list[str]:
signature = inspect.signature(fn)
return [
param.name
for param in signature.parameters.values()
if param.kind == inspect.Parameter.KEYWORD_ONLY
]
def _assert_no_any_annotations(fn: object) -> None:
signature = inspect.signature(fn)
for param in signature.parameters.values():
if param.annotation is Any:
raise AssertionError(f"{fn} has public parameter typed as Any: {param.name}")
if signature.return_annotation is Any:
raise AssertionError(f"{fn} has public return annotation typed as Any")
def test_root_exports_app_server_config() -> None:
assert AppServerConfig.__name__ == "AppServerConfig"
def test_package_includes_py_typed_marker() -> None:
marker = resources.files("codex_app_server").joinpath("py.typed")
assert marker.is_file()
def test_generated_public_signatures_are_snake_case_and_typed() -> None:
expected = {
Codex.thread_start: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
Codex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
Codex.thread_resume: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
Codex.thread_fork: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"sandbox",
"service_tier",
],
Thread.turn: [
"approval_policy",
"approvals_reviewer",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
AsyncCodex.thread_start: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
AsyncCodex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
AsyncCodex.thread_resume: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
AsyncCodex.thread_fork: [
"approval_policy",
"approvals_reviewer",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"sandbox",
"service_tier",
],
AsyncThread.turn: [
"approval_policy",
"approvals_reviewer",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
}
for fn, expected_kwargs in expected.items():
actual = _keyword_only_names(fn)
assert actual == expected_kwargs, f"unexpected kwargs for {fn}: {actual}"
assert all(name == name.lower() for name in actual), f"non snake_case kwargs in {fn}: {actual}"
_assert_no_any_annotations(fn)
def test_lifecycle_methods_are_codex_scoped() -> None:
assert hasattr(Codex, "thread_resume")
assert hasattr(Codex, "thread_fork")
assert hasattr(Codex, "thread_archive")
assert hasattr(Codex, "thread_unarchive")
assert hasattr(AsyncCodex, "thread_resume")
assert hasattr(AsyncCodex, "thread_fork")
assert hasattr(AsyncCodex, "thread_archive")
assert hasattr(AsyncCodex, "thread_unarchive")
assert not hasattr(Codex, "thread")
assert not hasattr(AsyncCodex, "thread")
assert not hasattr(Thread, "resume")
assert not hasattr(Thread, "fork")
assert not hasattr(Thread, "archive")
assert not hasattr(Thread, "unarchive")
assert not hasattr(AsyncThread, "resume")
assert not hasattr(AsyncThread, "fork")
assert not hasattr(AsyncThread, "archive")
assert not hasattr(AsyncThread, "unarchive")
for fn in (
Codex.thread_archive,
Codex.thread_unarchive,
AsyncCodex.thread_archive,
AsyncCodex.thread_unarchive,
):
_assert_no_any_annotations(fn)
def test_initialize_metadata_parses_user_agent_shape() -> None:
payload = InitializeResponse.model_validate({"userAgent": "codex-cli/1.2.3"})
parsed = Codex._validate_initialize(payload)
assert parsed is payload
assert parsed.userAgent == "codex-cli/1.2.3"
assert parsed.serverInfo is not None
assert parsed.serverInfo.name == "codex-cli"
assert parsed.serverInfo.version == "1.2.3"
def test_initialize_metadata_requires_non_empty_information() -> None:
try:
Codex._validate_initialize(InitializeResponse.model_validate({}))
except RuntimeError as exc:
assert "missing required metadata" in str(exc)
else:
raise AssertionError("expected RuntimeError when initialize metadata is missing")

View File

@@ -0,0 +1,479 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
import tempfile
import textwrap
from dataclasses import dataclass
from pathlib import Path
import pytest
ROOT = Path(__file__).resolve().parents[1]
EXAMPLES_DIR = ROOT / "examples"
NOTEBOOK_PATH = ROOT / "notebooks" / "sdk_walkthrough.ipynb"
root_str = str(ROOT)
if root_str not in sys.path:
sys.path.insert(0, root_str)
from _runtime_setup import ensure_runtime_package_installed, pinned_runtime_version
RUN_REAL_CODEX_TESTS = os.environ.get("RUN_REAL_CODEX_TESTS") == "1"
pytestmark = pytest.mark.skipif(
not RUN_REAL_CODEX_TESTS,
reason="set RUN_REAL_CODEX_TESTS=1 to run real Codex integration coverage",
)
# 11_cli_mini_app is interactive; we still run it by feeding one prompt, then '/exit'.
EXAMPLE_CASES: list[tuple[str, str]] = [
("01_quickstart_constructor", "sync.py"),
("01_quickstart_constructor", "async.py"),
("02_turn_run", "sync.py"),
("02_turn_run", "async.py"),
("03_turn_stream_events", "sync.py"),
("03_turn_stream_events", "async.py"),
("04_models_and_metadata", "sync.py"),
("04_models_and_metadata", "async.py"),
("05_existing_thread", "sync.py"),
("05_existing_thread", "async.py"),
("06_thread_lifecycle_and_controls", "sync.py"),
("06_thread_lifecycle_and_controls", "async.py"),
("07_image_and_text", "sync.py"),
("07_image_and_text", "async.py"),
("08_local_image_and_text", "sync.py"),
("08_local_image_and_text", "async.py"),
("09_async_parity", "sync.py"),
# 09_async_parity async path is represented by 01 async + dedicated async-based cases above.
("10_error_handling_and_retry", "sync.py"),
("10_error_handling_and_retry", "async.py"),
("11_cli_mini_app", "sync.py"),
("11_cli_mini_app", "async.py"),
("12_turn_params_kitchen_sink", "sync.py"),
("12_turn_params_kitchen_sink", "async.py"),
("13_model_select_and_turn_params", "sync.py"),
("13_model_select_and_turn_params", "async.py"),
("14_turn_controls", "sync.py"),
("14_turn_controls", "async.py"),
]
@dataclass(frozen=True)
class PreparedRuntimeEnv:
python: str
env: dict[str, str]
runtime_version: str
@pytest.fixture(scope="session")
def runtime_env(tmp_path_factory: pytest.TempPathFactory) -> PreparedRuntimeEnv:
runtime_version = pinned_runtime_version()
temp_root = tmp_path_factory.mktemp("python-runtime-env")
isolated_site = temp_root / "site-packages"
python = sys.executable
_run_command(
[
python,
"-m",
"pip",
"install",
"--target",
str(isolated_site),
"pydantic>=2.12",
],
cwd=ROOT,
env=os.environ.copy(),
timeout_s=240,
)
ensure_runtime_package_installed(
python,
ROOT,
install_target=isolated_site,
)
env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join([str(isolated_site), str(ROOT / "src")])
env["CODEX_PYTHON_SDK_DIR"] = str(ROOT)
return PreparedRuntimeEnv(python=python, env=env, runtime_version=runtime_version)
def _run_command(
args: list[str],
*,
cwd: Path,
env: dict[str, str],
timeout_s: int,
stdin: str | None = None,
) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=str(cwd),
env=env,
input=stdin,
text=True,
capture_output=True,
timeout=timeout_s,
check=False,
)
def _run_python(
runtime_env: PreparedRuntimeEnv,
source: str,
*,
cwd: Path | None = None,
timeout_s: int = 180,
) -> subprocess.CompletedProcess[str]:
return _run_command(
[str(runtime_env.python), "-c", source],
cwd=cwd or ROOT,
env=runtime_env.env,
timeout_s=timeout_s,
)
def _runtime_compatibility_hint(
runtime_env: PreparedRuntimeEnv,
*,
stdout: str,
stderr: str,
) -> str:
combined = f"{stdout}\n{stderr}"
if "ThreadStartResponse" in combined and "approvalsReviewer" in combined:
return (
"\nCompatibility hint:\n"
f"Pinned runtime {runtime_env.runtime_version} returned a thread/start payload "
"that is older than the current SDK schema and is missing "
"`approvalsReviewer`. Bump `sdk/python/_runtime_setup.py` to a matching "
"released runtime version.\n"
)
return ""
def _run_json_python(
runtime_env: PreparedRuntimeEnv,
source: str,
*,
cwd: Path | None = None,
timeout_s: int = 180,
) -> dict[str, object]:
result = _run_python(runtime_env, source, cwd=cwd, timeout_s=timeout_s)
assert result.returncode == 0, (
"Python snippet failed.\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
f"{_runtime_compatibility_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
)
return json.loads(result.stdout)
def _run_example(
runtime_env: PreparedRuntimeEnv,
folder: str,
script: str,
*,
timeout_s: int = 180,
) -> subprocess.CompletedProcess[str]:
path = EXAMPLES_DIR / folder / script
assert path.exists(), f"Missing example script: {path}"
stdin = (
"Give 3 short bullets on SIMD.\nNow rewrite that as 1 short sentence.\n/exit\n"
if folder == "11_cli_mini_app"
else None
)
return _run_command(
[str(runtime_env.python), str(path)],
cwd=ROOT,
env=runtime_env.env,
timeout_s=timeout_s,
stdin=stdin,
)
def _notebook_cell_source(cell_index: int) -> str:
notebook = json.loads(NOTEBOOK_PATH.read_text())
return "".join(notebook["cells"][cell_index]["source"])
def test_real_initialize_and_model_list(runtime_env: PreparedRuntimeEnv) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import json
from codex_app_server import Codex
with Codex() as codex:
models = codex.models(include_hidden=True)
server = codex.metadata.serverInfo
print(json.dumps({
"user_agent": codex.metadata.userAgent,
"server_name": None if server is None else server.name,
"server_version": None if server is None else server.version,
"model_count": len(models.data),
}))
"""
),
)
assert isinstance(data["user_agent"], str) and data["user_agent"].strip()
if data["server_name"] is not None:
assert isinstance(data["server_name"], str) and data["server_name"].strip()
if data["server_version"] is not None:
assert isinstance(data["server_version"], str) and data["server_version"].strip()
assert isinstance(data["model_count"], int)
def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import json
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = thread.turn(TextInput("hello")).run()
persisted = thread.read(include_turns=True)
persisted_turn = next(
(turn for turn in persisted.thread.turns or [] if turn.id == result.id),
None,
)
print(json.dumps({
"thread_id": thread.id,
"turn_id": result.id,
"status": result.status.value,
"items_count": len(result.items or []),
"persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
}))
"""
),
)
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
assert data["status"] == "completed"
assert isinstance(data["items_count"], int)
assert isinstance(data["persisted_items_count"], int)
def test_real_async_thread_turn_usage_and_ids_smoke(
runtime_env: PreparedRuntimeEnv,
) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import asyncio
import json
from codex_app_server import AsyncCodex, TextInput
async def main():
async with AsyncCodex() as codex:
thread = await codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = await (await thread.turn(TextInput("say ok"))).run()
persisted = await thread.read(include_turns=True)
persisted_turn = next(
(turn for turn in persisted.thread.turns or [] if turn.id == result.id),
None,
)
print(json.dumps({
"thread_id": thread.id,
"turn_id": result.id,
"status": result.status.value,
"items_count": len(result.items or []),
"persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
}))
asyncio.run(main())
"""
),
)
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
assert data["status"] == "completed"
assert isinstance(data["items_count"], int)
assert isinstance(data["persisted_items_count"], int)
def test_notebook_bootstrap_resolves_sdk_and_runtime_from_unrelated_cwd(
runtime_env: PreparedRuntimeEnv,
) -> None:
cell_1_source = _notebook_cell_source(1)
env = runtime_env.env.copy()
with tempfile.TemporaryDirectory() as temp_cwd:
result = _run_command(
[str(runtime_env.python), "-c", cell_1_source],
cwd=Path(temp_cwd),
env=env,
timeout_s=180,
)
assert result.returncode == 0, (
f"Notebook bootstrap failed from unrelated cwd.\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
assert "SDK source:" in result.stdout
assert f"Runtime package: {runtime_env.runtime_version}" in result.stdout
def test_notebook_sync_cell_smoke(runtime_env: PreparedRuntimeEnv) -> None:
source = "\n\n".join(
[
_notebook_cell_source(1),
_notebook_cell_source(2),
_notebook_cell_source(3),
]
)
result = _run_python(runtime_env, source, timeout_s=240)
assert result.returncode == 0, (
f"Notebook sync smoke failed.\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
)
assert "status:" in result.stdout
assert "server:" in result.stdout
def test_notebook_advanced_cell_smoke(runtime_env: PreparedRuntimeEnv) -> None:
source = "\n\n".join(
[
_notebook_cell_source(1),
_notebook_cell_source(2),
_notebook_cell_source(7),
]
)
result = _run_python(runtime_env, source, timeout_s=360)
assert result.returncode == 0, (
f"Notebook advanced smoke failed.\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
)
assert "selected.model:" in result.stdout
assert "agent.message.params:" in result.stdout
assert "items.params:" in result.stdout
def test_real_streaming_smoke_turn_completed(runtime_env: PreparedRuntimeEnv) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import json
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
turn = thread.turn(TextInput("Reply with one short sentence."))
saw_delta = False
saw_completed = False
for event in turn.stream():
if event.method == "item/agentMessage/delta":
saw_delta = True
if event.method == "turn/completed":
saw_completed = True
print(json.dumps({
"saw_delta": saw_delta,
"saw_completed": saw_completed,
}))
"""
),
)
assert data["saw_completed"] is True
assert isinstance(data["saw_delta"], bool)
def test_real_turn_interrupt_smoke(runtime_env: PreparedRuntimeEnv) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import json
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
turn = thread.turn(TextInput("Count from 1 to 200 with commas."))
turn.interrupt()
follow_up = thread.turn(TextInput("Say 'ok' only.")).run()
print(json.dumps({"status": follow_up.status.value}))
"""
),
)
assert data["status"] in {"completed", "failed"}
@pytest.mark.parametrize(("folder", "script"), EXAMPLE_CASES)
def test_real_examples_run_and_assert(
runtime_env: PreparedRuntimeEnv,
folder: str,
script: str,
) -> None:
result = _run_example(runtime_env, folder, script)
assert result.returncode == 0, (
f"Example failed: {folder}/{script}\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
f"{_runtime_compatibility_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
)
out = result.stdout
if folder == "01_quickstart_constructor":
assert "Status:" in out and "Text:" in out
assert "Server: unknown" not in out
elif folder == "02_turn_run":
assert "thread_id:" in out and "turn_id:" in out and "status:" in out
assert "persisted.items.count:" in out
elif folder == "03_turn_stream_events":
assert "stream.completed:" in out
assert "assistant>" in out
elif folder == "04_models_and_metadata":
assert "server:" in out
assert "models.count:" in out
assert "models:" in out
assert "metadata:" not in out
elif folder == "05_existing_thread":
assert "Created thread:" in out
elif folder == "06_thread_lifecycle_and_controls":
assert "Lifecycle OK:" in out
elif folder in {"07_image_and_text", "08_local_image_and_text"}:
assert "completed" in out.lower() or "Status:" in out
elif folder == "09_async_parity":
assert "Thread:" in out and "Turn:" in out
elif folder == "10_error_handling_and_retry":
assert "Text:" in out
elif folder == "11_cli_mini_app":
assert "Thread:" in out
assert out.count("assistant>") >= 2
assert out.count("assistant.status>") >= 2
assert out.count("usage>") >= 2
elif folder == "12_turn_params_kitchen_sink":
assert "Status:" in out
assert "summary:" in out
assert "actions:" in out
assert "Items:" in out
elif folder == "13_model_select_and_turn_params":
assert "selected.model:" in out and "agent.message.params:" in out and "items.params:" in out
elif folder == "14_turn_controls":
assert "steer.result:" in out and "steer.final.status:" in out
assert "interrupt.result:" in out and "interrupt.final.status:" in out