Add Python SDK public API and examples

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Shaqayeq
2026-03-12 00:39:58 -07:00
parent c0528b9bd9
commit fd4beb8b37
41 changed files with 4124 additions and 32 deletions

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import asyncio
import time
from codex_app_server.async_client import AsyncAppServerClient
def test_async_client_serializes_transport_calls() -> None:
async def scenario() -> int:
client = AsyncAppServerClient()
active = 0
max_active = 0
def fake_model_list(include_hidden: bool = False) -> bool:
nonlocal active, max_active
active += 1
max_active = max(max_active, active)
time.sleep(0.05)
active -= 1
return include_hidden
client._sync.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(client.model_list(), client.model_list())
return max_active
assert asyncio.run(scenario()) == 1
def test_async_stream_text_is_incremental_and_blocks_parallel_calls() -> None:
async def scenario() -> tuple[str, list[str], bool]:
client = AsyncAppServerClient()
def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def]
yield "first"
time.sleep(0.03)
yield "second"
yield "third"
def fake_model_list(include_hidden: bool = False) -> str:
return "done"
client._sync.stream_text = fake_stream_text # type: ignore[method-assign]
client._sync.model_list = fake_model_list # type: ignore[method-assign]
stream = client.stream_text("thread-1", "hello")
first = await anext(stream)
blocked_before_stream_done = False
competing_call = asyncio.create_task(client.model_list())
await asyncio.sleep(0.01)
blocked_before_stream_done = not competing_call.done()
remaining: list[str] = []
async for item in stream:
remaining.append(item)
await competing_call
return first, remaining, blocked_before_stream_done
first, remaining, blocked = asyncio.run(scenario())
assert first == "first"
assert remaining == ["second", "third"]
assert blocked

View File

@@ -0,0 +1,286 @@
from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
import pytest
import codex_app_server.public_api as public_api_module
from codex_app_server.client import AppServerClient
from codex_app_server.generated.v2_all import (
AgentMessageDeltaNotification,
RawResponseItemCompletedNotification,
ThreadTokenUsageUpdatedNotification,
)
from codex_app_server.models import InitializeResponse, Notification
from codex_app_server.public_api import AsyncCodex, AsyncTurn, Codex, Turn
from codex_app_server.public_types import TurnStatus
ROOT = Path(__file__).resolve().parents[1]
def _delta_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "delta-text",
) -> Notification:
return Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": text,
"itemId": "item-1",
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _raw_response_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "raw-text",
) -> Notification:
return Notification(
method="rawResponseItem/completed",
payload=RawResponseItemCompletedNotification.model_validate(
{
"item": {
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": text}],
},
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _usage_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
) -> Notification:
return Notification(
method="thread/tokenUsage/updated",
payload=ThreadTokenUsageUpdatedNotification.model_validate(
{
"threadId": thread_id,
"turnId": turn_id,
"tokenUsage": {
"last": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
"total": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
},
}
),
)
def _completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
) -> Notification:
return Notification(
method="turn/completed",
payload=public_api_module.TurnCompletedNotificationPayload.model_validate(
{
"threadId": thread_id,
"turn": {
"id": turn_id,
"items": [],
"status": status,
},
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
class FakeClient:
def __init__(self, config=None) -> None: # noqa: ANN001,ARG002
self._closed = False
def start(self) -> None:
return None
def initialize(self) -> InitializeResponse:
return InitializeResponse.model_validate({})
def close(self) -> None:
self._closed = True
closed.append(True)
monkeypatch.setattr(public_api_module, "AppServerClient", FakeClient)
with pytest.raises(RuntimeError, match="missing required metadata"):
Codex()
assert closed == [True]
def test_async_codex_init_failure_closes_client() -> None:
async def scenario() -> None:
codex = AsyncCodex()
close_calls = 0
async def fake_start() -> None:
return None
async def fake_initialize() -> InitializeResponse:
return InitializeResponse.model_validate({})
async def fake_close() -> None:
nonlocal close_calls
close_calls += 1
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.close = fake_close # type: ignore[method-assign]
with pytest.raises(RuntimeError, match="missing required metadata"):
await codex.models()
assert close_calls == 1
assert codex._initialized is False
assert codex._init is None
asyncio.run(scenario())
def test_async_codex_initializes_only_once_under_concurrency() -> None:
async def scenario() -> None:
codex = AsyncCodex()
start_calls = 0
initialize_calls = 0
ready = asyncio.Event()
async def fake_start() -> None:
nonlocal start_calls
start_calls += 1
async def fake_initialize() -> InitializeResponse:
nonlocal initialize_calls
initialize_calls += 1
ready.set()
await asyncio.sleep(0.02)
return InitializeResponse.model_validate(
{
"userAgent": "codex-cli/1.2.3",
"serverInfo": {"name": "codex-cli", "version": "1.2.3"},
}
)
async def fake_model_list(include_hidden: bool = False): # noqa: ANN202,ARG001
await ready.wait()
return object()
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(codex.models(), codex.models())
assert start_calls == 1
assert initialize_calls == 1
asyncio.run(scenario())
def test_turn_stream_rejects_second_active_consumer() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
first_stream = Turn(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = Turn(client, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
next(second_stream)
first_stream.close()
def test_async_turn_stream_rejects_second_active_consumer() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurn(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurn(codex, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
await anext(second_stream)
await first_stream.aclose()
asyncio.run(scenario())
def test_turn_run_falls_back_to_completed_raw_response_text() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_raw_response_notification(text="hello from raw response"),
_usage_notification(),
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
result = Turn(client, "thread-1", "turn-1").run()
assert result.status == TurnStatus.completed
assert result.text == "hello from raw response"
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",
ROOT / "examples" / "10_error_handling_and_retry" / "async.py",
):
source = path.read_text()
assert '== "failed"' not in source
assert "TurnStatus.failed" in source

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import importlib.resources as resources
import inspect
from typing import Any
from codex_app_server import AppServerConfig
from codex_app_server.models import InitializeResponse
from codex_app_server.public_api import AsyncCodex, AsyncThread, Codex, Thread
def _keyword_only_names(fn: object) -> list[str]:
signature = inspect.signature(fn)
return [
param.name
for param in signature.parameters.values()
if param.kind == inspect.Parameter.KEYWORD_ONLY
]
def _assert_no_any_annotations(fn: object) -> None:
signature = inspect.signature(fn)
for param in signature.parameters.values():
if param.annotation is Any:
raise AssertionError(f"{fn} has public parameter typed as Any: {param.name}")
if signature.return_annotation is Any:
raise AssertionError(f"{fn} has public return annotation typed as Any")
def test_root_exports_app_server_config() -> None:
assert AppServerConfig.__name__ == "AppServerConfig"
def test_package_includes_py_typed_marker() -> None:
marker = resources.files("codex_app_server").joinpath("py.typed")
assert marker.is_file()
def test_generated_public_signatures_are_snake_case_and_typed() -> None:
expected = {
Codex.thread_start: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
Codex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
Codex.thread_resume: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
Codex.thread_fork: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"sandbox",
"service_tier",
],
Thread.turn: [
"approval_policy",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
AsyncCodex.thread_start: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
AsyncCodex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
AsyncCodex.thread_resume: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
AsyncCodex.thread_fork: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"sandbox",
"service_tier",
],
AsyncThread.turn: [
"approval_policy",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
}
for fn, expected_kwargs in expected.items():
actual = _keyword_only_names(fn)
assert actual == expected_kwargs, f"unexpected kwargs for {fn}: {actual}"
assert all(name == name.lower() for name in actual), f"non snake_case kwargs in {fn}: {actual}"
_assert_no_any_annotations(fn)
def test_lifecycle_methods_are_codex_scoped() -> None:
assert hasattr(Codex, "thread_resume")
assert hasattr(Codex, "thread_fork")
assert hasattr(Codex, "thread_archive")
assert hasattr(Codex, "thread_unarchive")
assert hasattr(AsyncCodex, "thread_resume")
assert hasattr(AsyncCodex, "thread_fork")
assert hasattr(AsyncCodex, "thread_archive")
assert hasattr(AsyncCodex, "thread_unarchive")
assert not hasattr(Codex, "thread")
assert not hasattr(AsyncCodex, "thread")
assert not hasattr(Thread, "resume")
assert not hasattr(Thread, "fork")
assert not hasattr(Thread, "archive")
assert not hasattr(Thread, "unarchive")
assert not hasattr(AsyncThread, "resume")
assert not hasattr(AsyncThread, "fork")
assert not hasattr(AsyncThread, "archive")
assert not hasattr(AsyncThread, "unarchive")
for fn in (
Codex.thread_archive,
Codex.thread_unarchive,
AsyncCodex.thread_archive,
AsyncCodex.thread_unarchive,
):
_assert_no_any_annotations(fn)
def test_initialize_metadata_parses_user_agent_shape() -> None:
parsed = Codex._parse_initialize(InitializeResponse.model_validate({"userAgent": "codex-cli/1.2.3"}))
assert parsed.user_agent == "codex-cli/1.2.3"
assert parsed.server_name == "codex-cli"
assert parsed.server_version == "1.2.3"
def test_initialize_metadata_requires_non_empty_information() -> None:
try:
Codex._parse_initialize(InitializeResponse.model_validate({}))
except RuntimeError as exc:
assert "missing required metadata" in str(exc)
else:
raise AssertionError("expected RuntimeError when initialize metadata is missing")

View File

@@ -0,0 +1,237 @@
from __future__ import annotations
import asyncio
import json
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from codex_app_server import AppServerConfig, AsyncCodex, Codex, TextInput
ROOT = Path(__file__).resolve().parents[1]
EXAMPLES_DIR = ROOT / "examples"
NOTEBOOK_PATH = ROOT / "notebooks" / "sdk_walkthrough.ipynb"
RUN_REAL_CODEX_TESTS = os.environ.get("RUN_REAL_CODEX_TESTS") == "1"
pytestmark = pytest.mark.skipif(
not RUN_REAL_CODEX_TESTS,
reason="set RUN_REAL_CODEX_TESTS=1 to run real Codex integration coverage",
)
# 11_cli_mini_app is interactive; we still run it by feeding '/exit'.
EXAMPLE_CASES: list[tuple[str, str]] = [
("01_quickstart_constructor", "sync.py"),
("01_quickstart_constructor", "async.py"),
("02_turn_run", "sync.py"),
("02_turn_run", "async.py"),
("03_turn_stream_events", "sync.py"),
("03_turn_stream_events", "async.py"),
("04_models_and_metadata", "sync.py"),
("04_models_and_metadata", "async.py"),
("05_existing_thread", "sync.py"),
("05_existing_thread", "async.py"),
("06_thread_lifecycle_and_controls", "sync.py"),
("06_thread_lifecycle_and_controls", "async.py"),
("07_image_and_text", "sync.py"),
("07_image_and_text", "async.py"),
("08_local_image_and_text", "sync.py"),
("08_local_image_and_text", "async.py"),
("09_async_parity", "sync.py"),
# 09_async_parity async path is represented by 01 async + dedicated async-based cases above.
("10_error_handling_and_retry", "sync.py"),
("10_error_handling_and_retry", "async.py"),
("11_cli_mini_app", "sync.py"),
("11_cli_mini_app", "async.py"),
("12_turn_params_kitchen_sink", "sync.py"),
("12_turn_params_kitchen_sink", "async.py"),
("13_model_select_and_turn_params", "sync.py"),
("13_model_select_and_turn_params", "async.py"),
]
def _run_example(
folder: str, script: str, *, timeout_s: int = 150
) -> subprocess.CompletedProcess[str]:
path = EXAMPLES_DIR / folder / script
assert path.exists(), f"Missing example script: {path}"
env = os.environ.copy()
env.setdefault(
"CODEX_PYTHON_SDK_CODEX_BIN",
_real_test_config().codex_bin or "",
)
# Feed '/exit' only to interactive mini-cli examples.
stdin = "/exit\n" if folder == "11_cli_mini_app" else None
return subprocess.run(
[sys.executable, str(path)],
cwd=str(ROOT),
env=env,
input=stdin,
text=True,
capture_output=True,
timeout=timeout_s,
check=False,
)
def _notebook_cell_source(cell_index: int) -> str:
notebook = json.loads(NOTEBOOK_PATH.read_text())
return "".join(notebook["cells"][cell_index]["source"])
def _real_test_config() -> AppServerConfig:
codex_bin = os.environ.get("CODEX_PYTHON_SDK_CODEX_BIN") or shutil.which("codex")
if codex_bin is None:
raise RuntimeError(
"Real SDK integration tests require a Codex CLI binary.\n"
"Set RUN_REAL_CODEX_TESTS=1 and CODEX_PYTHON_SDK_CODEX_BIN=/absolute/path/to/codex, "
"or ensure `codex` is on PATH."
)
return AppServerConfig(codex_bin=codex_bin)
def test_real_initialize_and_model_list():
with Codex(config=_real_test_config()) as codex:
metadata = codex.metadata
assert isinstance(metadata.user_agent, str) and metadata.user_agent.strip()
assert isinstance(metadata.server_name, str) and metadata.server_name.strip()
assert isinstance(metadata.server_version, str) and metadata.server_version.strip()
models = codex.models(include_hidden=True)
assert isinstance(models.data, list)
def test_real_thread_and_turn_start_smoke():
with Codex(config=_real_test_config()) as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("hello")).run()
assert isinstance(result.thread_id, str) and result.thread_id.strip()
assert isinstance(result.turn_id, str) and result.turn_id.strip()
assert isinstance(result.items, list)
assert result.usage is not None
assert result.usage.thread_id == result.thread_id
assert result.usage.turn_id == result.turn_id
def test_real_async_thread_turn_usage_and_ids_smoke() -> None:
async def _run() -> None:
async with AsyncCodex(config=_real_test_config()) as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = await (await thread.turn(TextInput("say ok"))).run()
assert isinstance(result.thread_id, str) and result.thread_id.strip()
assert isinstance(result.turn_id, str) and result.turn_id.strip()
assert isinstance(result.items, list)
assert result.usage is not None
assert result.usage.thread_id == result.thread_id
assert result.usage.turn_id == result.turn_id
asyncio.run(_run())
def test_notebook_bootstrap_resolves_sdk_from_unrelated_cwd() -> None:
cell_1_source = _notebook_cell_source(1)
env = os.environ.copy()
env["CODEX_PYTHON_SDK_DIR"] = str(ROOT)
with tempfile.TemporaryDirectory() as temp_cwd:
result = subprocess.run(
[sys.executable, "-c", cell_1_source],
cwd=temp_cwd,
env=env,
text=True,
capture_output=True,
timeout=60,
check=False,
)
assert result.returncode == 0, (
f"Notebook bootstrap failed from unrelated cwd.\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
assert "SDK source:" in result.stdout
assert "codex_app_server" in result.stdout or "sdk/python/src" in result.stdout
def test_real_streaming_smoke_turn_completed():
with Codex(config=_real_test_config()) as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Reply with one short sentence."))
saw_delta = False
saw_completed = False
for evt in turn.stream():
if evt.method == "item/agentMessage/delta":
saw_delta = True
if evt.method == "turn/completed":
saw_completed = True
assert saw_completed
# Some environments can produce zero deltas for very short output;
# this assert keeps the smoke test informative but non-flaky.
assert isinstance(saw_delta, bool)
def test_real_turn_interrupt_smoke():
with Codex(config=_real_test_config()) as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Count from 1 to 200 with commas."))
# Best effort: interrupting quickly may race with completion on fast models.
_ = turn.interrupt()
# Confirm the session is still usable after interrupt race.
follow_up = thread.turn(TextInput("Say 'ok' only.")).run()
assert follow_up.status.value in {"completed", "failed"}
@pytest.mark.parametrize(("folder", "script"), EXAMPLE_CASES)
def test_real_examples_run_and_assert(folder: str, script: str):
result = _run_example(folder, script)
assert result.returncode == 0, (
f"Example failed: {folder}/{script}\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
out = result.stdout
# Minimal content assertions so we validate behavior, not just exit code.
if folder == "01_quickstart_constructor":
assert "Status:" in out and "Text:" in out
assert "Server: None None" not in out
elif folder == "02_turn_run":
assert "thread_id:" in out and "turn_id:" in out and "status:" in out
assert "usage: None" not in out
elif folder == "03_turn_stream_events":
assert "turn/completed" in out
elif folder == "04_models_and_metadata":
assert "models.count:" in out
assert "server_name=None" not in out
assert "server_version=None" not in out
elif folder == "05_existing_thread":
assert "Created thread:" in out
elif folder == "06_thread_lifecycle_and_controls":
assert "Lifecycle OK:" in out
elif folder in {"07_image_and_text", "08_local_image_and_text"}:
assert "completed" in out.lower() or "Status:" in out
elif folder == "09_async_parity":
assert "Thread:" in out and "Turn:" in out
elif folder == "10_error_handling_and_retry":
assert "Text:" in out
elif folder == "11_cli_mini_app":
assert "Thread:" in out
elif folder == "12_turn_params_kitchen_sink":
assert "Status:" in out and "Usage:" in out
elif folder == "13_model_select_and_turn_params":
assert "selected.model:" in out and "agent.message.params:" in out and "usage.params:" in out
assert "usage.params: None" not in out