Compare commits

...

2 Commits

Author SHA1 Message Date
Shaqayeq
db89d8d749 2026-03-10 Rebase python SDK public API on latest main 2026-03-10 01:12:23 -07:00
Shaqayeq
6baeec68bd python-sdk: generated type foundation (all v2 schemas) (#13953)
## Summary
Foundation PR only (base for PR #3).

This PR contains the SDK runtime foundation and generated artifacts:

- pinned runtime binary in `sdk/python/bin/` (`codex` or `codex.exe` by
platform)
- single maintenance script:
`sdk/python/scripts/update_sdk_artifacts.py`
- generated protocol/types artifacts under:
  - `sdk/python/src/codex_app_server/generated/protocol_types.py`
  - `sdk/python/src/codex_app_server/generated/schema_types.py`
  - `sdk/python/src/codex_app_server/generated/v2_all/*`
- generation-contract test wiring (`tests/test_contract_generation.py`)

## Release asset behavior
`update_sdk_artifacts.py` now:
- selects latest release by channel (`--channel stable|alpha`)
- resolves the correct asset for current OS/arch
- extracts platform binary (`codex` on macOS/Linux, `codex.exe` on
Windows)
- keeps runtime on single pinned binary source in `sdk/python/bin/`

## Scope boundary
-  PR #2 = binary + generation pipeline + generated types foundation
-  PR #2 does **not** include examples/integration logic polish (that
is PR #3)

## Validation
- Ran: `python scripts/update_sdk_artifacts.py --channel stable`
- Regenerated and committed resulting generated artifacts
- Local tests pass on branch
2026-03-10 01:00:46 -07:00
62 changed files with 14521 additions and 0 deletions

146
sdk/python/README.md Normal file
View File

@@ -0,0 +1,146 @@
# Codex App Server Python SDK (Experimental)
Experimental Python SDK for `codex app-server` JSON-RPC v2.
The generated wire models come from the bundled v2 schema and use snake_case Python fields while preserving camelCase wire serialization.
It gives you a small typed API for:
- starting or resuming threads
- creating turns from Python
- streaming events or waiting for a final `TurnResult`
- using the same shape in sync and async code
## Experimental
This SDK is still experimental.
- it is not published yet
- API details may still change before the first release
- packaging and release workflow are still evolving
Use it for local development, dogfooding, and iteration inside this repo. Do not treat it as a stable public package yet.
## What You Need
- Python `>=3.10`
- local Codex auth/session already configured
- this repo checked out locally
## Install From Source
```bash
cd sdk/python
python -m pip install -e .
```
The package includes bundled Codex runtime binaries and automatically selects the binary for the current platform through `AppServerConfig().codex_bin`.
## Core Model
The public API is intentionally small:
- `Codex` / `AsyncCodex`: session entrypoint
- `Thread` / `AsyncThread`: a conversation thread
- `Turn` / `AsyncTurn`: one user turn within a thread
- `TurnResult`: final status, text, items, and usage
Typical flow:
1. create a `Codex` client
2. start or resume a thread
3. create a turn from input
4. call `run()` or iterate `stream()`
## Quickstart
### Sync
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("status:", result.status)
print("text:", result.text)
```
### Async
```python
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
turn = await thread.turn(TextInput("Say hello in one sentence."))
result = await turn.run()
print("status:", result.status)
print("text:", result.text)
asyncio.run(main())
```
## Current Limitations
- Only one active `Turn.stream()` or `Turn.run()` consumer is supported per client instance.
- Starting a second active turn consumer on the same `Codex` or `AsyncCodex` raises `RuntimeError`.
- `Codex()` is eager and performs startup plus `initialize` in the constructor.
## Behavior Notes
- `AsyncCodex` is intended to be used with `async with AsyncCodex() as codex:`.
- `TurnResult.text` prefers streamed assistant deltas and falls back to completed raw response items when no deltas are emitted.
- For transient overload handling, use `retry_on_overload(...)`.
## Learn By Example
Runnable examples:
```bash
cd sdk/python
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```
More docs:
- Getting started: `docs/getting-started.md`
- API reference: `docs/api-reference.md`
- FAQ and pitfalls: `docs/faq.md`
- Examples index: `examples/README.md`
- Notebook walkthrough: `notebooks/sdk_walkthrough.ipynb`
## Maintainer Workflow
Refresh bundled binaries and generated artifacts with:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
```
or:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel alpha --bundle-all-platforms
```
## Compatibility
- Package name: `codex-app-server-sdk`
- SDK version in this repo: `0.2.0`
- Target protocol: Codex `app-server` JSON-RPC v2

View File

@@ -0,0 +1,180 @@
# Codex App Server SDK — API Reference
Public surface of `codex_app_server` for app-server v2.
This SDK surface is experimental. The current implementation intentionally allows only one active `Turn.stream()` or `Turn.run()` consumer per client instance at a time.
## Package Entry
```python
from codex_app_server import (
Codex,
AsyncCodex,
Thread,
AsyncThread,
Turn,
AsyncTurn,
TurnResult,
InitializeResult,
Input,
InputItem,
TextInput,
ImageInput,
LocalImageInput,
SkillInput,
MentionInput,
ThreadItem,
TurnStatus,
)
```
- Version: `codex_app_server.__version__`
- Requires Python >= 3.10
## Codex (sync)
```python
Codex(config: AppServerConfig | None = None)
```
Properties/methods:
- `metadata -> InitializeResult`
- `close() -> None`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> ThreadListResponse`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Thread`
- `thread_archive(thread_id: str) -> ThreadArchiveResponse`
- `thread_unarchive(thread_id: str) -> Thread`
- `models(*, include_hidden: bool = False) -> ModelListResponse`
Context manager:
```python
with Codex() as codex:
...
```
## AsyncCodex (async parity)
```python
AsyncCodex(config: AppServerConfig | None = None)
```
Properties/methods:
- `metadata -> InitializeResult`
- `close() -> Awaitable[None]`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> Awaitable[ThreadListResponse]`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_archive(thread_id: str) -> Awaitable[ThreadArchiveResponse]`
- `thread_unarchive(thread_id: str) -> Awaitable[AsyncThread]`
- `models(*, include_hidden: bool = False) -> Awaitable[ModelListResponse]`
Async context manager:
```python
async with AsyncCodex() as codex:
...
```
## Thread / AsyncThread
`Thread` and `AsyncThread` share the same shape and intent.
### Thread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Turn`
- `read(*, include_turns: bool = False) -> ThreadReadResponse`
- `set_name(name: str) -> ThreadSetNameResponse`
- `compact() -> ThreadCompactStartResponse`
### AsyncThread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurn]`
- `read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]`
- `set_name(name: str) -> Awaitable[ThreadSetNameResponse]`
- `compact() -> Awaitable[ThreadCompactStartResponse]`
## Turn / AsyncTurn
### Turn
- `steer(input: Input) -> TurnSteerResponse`
- `interrupt() -> TurnInterruptResponse`
- `stream() -> Iterator[Notification]`
- `run() -> TurnResult`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `Codex` instance raises `RuntimeError`
### AsyncTurn
- `steer(input: Input) -> Awaitable[TurnSteerResponse]`
- `interrupt() -> Awaitable[TurnInterruptResponse]`
- `stream() -> AsyncIterator[Notification]`
- `run() -> Awaitable[TurnResult]`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `AsyncCodex` instance raises `RuntimeError`
## TurnResult
```python
@dataclass
class TurnResult:
thread_id: str
turn_id: str
status: TurnStatus
error: TurnError | None
text: str
items: list[ThreadItem]
usage: ThreadTokenUsageUpdatedNotification | None
```
## Inputs
```python
@dataclass class TextInput: text: str
@dataclass class ImageInput: url: str
@dataclass class LocalImageInput: path: str
@dataclass class SkillInput: name: str; path: str
@dataclass class MentionInput: name: str; path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
```
## Retry + errors
```python
from codex_app_server import (
retry_on_overload,
JsonRpcError,
MethodNotFoundError,
InvalidParamsError,
ServerBusyError,
is_retryable_error,
)
```
- `retry_on_overload(...)` retries transient overload errors with exponential backoff + jitter.
- `is_retryable_error(exc)` checks if an exception is transient/overload-like.
## Example
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print(result.text)
```

83
sdk/python/docs/faq.md Normal file
View File

@@ -0,0 +1,83 @@
# FAQ
## Thread vs turn
- A `Thread` is conversation state.
- A `Turn` is one model execution inside that thread.
- Multi-turn chat means multiple turns on the same `Thread`.
## `run()` vs `stream()`
- `Turn.run()` / `AsyncTurn.run()` is the easiest path. It consumes events until completion and returns `TurnResult`.
- `Turn.stream()` / `AsyncTurn.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
Choose `run()` for most apps. Choose `stream()` for progress UIs, custom timeout logic, or custom parsing.
## Sync vs async clients
- `Codex` is the sync public API.
- `AsyncCodex` is an async replica of the same public API shape.
If your app is not already async, stay with `Codex`.
## Public kwargs are snake_case
Public API keyword names are snake_case. The SDK still maps them to wire camelCase under the hood.
If you are migrating older code, update these names:
- `approvalPolicy` -> `approval_policy`
- `baseInstructions` -> `base_instructions`
- `developerInstructions` -> `developer_instructions`
- `modelProvider` -> `model_provider`
- `modelProviders` -> `model_providers`
- `sortKey` -> `sort_key`
- `sourceKinds` -> `source_kinds`
- `outputSchema` -> `output_schema`
- `sandboxPolicy` -> `sandbox_policy`
## Why only `thread_start(...)` and `thread_resume(...)`?
The public API keeps only explicit lifecycle calls:
- `thread_start(...)` to create new threads
- `thread_resume(thread_id, ...)` to continue existing threads
This avoids duplicate ways to do the same operation and keeps behavior explicit.
## Why does constructor fail?
`Codex()` is eager: it starts transport and calls `initialize` in `__init__`.
Common causes:
- bundled runtime binary missing for your OS/arch under `src/codex_app_server/bin/*`
- local auth/session is missing
- incompatible/old app-server
Maintainers can refresh bundled binaries with:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
```
## Why does a turn "hang"?
A turn is complete only when `turn/completed` arrives for that turn ID.
- `run()` waits for this automatically.
- With `stream()`, keep consuming notifications until completion.
## How do I retry safely?
Use `retry_on_overload(...)` for transient overload failures (`ServerBusyError`).
Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundError`, fix inputs/version compatibility instead.
## Common pitfalls
- Starting a new thread for every prompt when you wanted continuity.
- Forgetting to `close()` (or not using context managers).
- Ignoring `TurnResult.status` and `TurnResult.error`.
- Mixing SDK input classes with raw dicts incorrectly.

View File

@@ -0,0 +1,96 @@
# Getting Started
This is the fastest path from install to a multi-turn thread using the public SDK surface.
The SDK is experimental. Treat the API, bundled runtime strategy, and packaging details as unstable until the first public release.
## 1) Install
From repo root:
```bash
cd sdk/python
python -m pip install -e .
```
Requirements:
- Python `>=3.10`
- bundled runtime binary for your platform (shipped in package)
- local Codex auth/session configured
## 2) Run your first turn (sync)
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Thread:", result.thread_id)
print("Turn:", result.turn_id)
print("Status:", result.status)
print("Text:", result.text)
```
What happened:
- `Codex()` started and initialized `codex app-server`.
- `thread_start(...)` created a thread.
- `turn(...).run()` consumed events until `turn/completed` and returned a `TurnResult`.
- one client can have only one active `Turn.stream()` / `Turn.run()` consumer at a time in the current experimental build
## 3) Continue the same thread (multi-turn)
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first = thread.turn(TextInput("Summarize Rust ownership in 2 bullets.")).run()
second = thread.turn(TextInput("Now explain it to a Python developer.")).run()
print("first:", first.text)
print("second:", second.text)
```
## 4) Async parity
```python
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Continue where we left off."))
result = await turn.run()
print(result.text)
asyncio.run(main())
```
## 5) Resume an existing thread
```python
from codex_app_server import Codex, TextInput
THREAD_ID = "thr_123" # replace with a real id
with Codex() as codex:
thread = codex.thread_resume(THREAD_ID)
result = thread.turn(TextInput("Continue where we left off.")).run()
print(result.text)
```
## 6) Next stops
- API surface and signatures: `docs/api-reference.md`
- Common decisions/pitfalls: `docs/faq.md`
- End-to-end runnable examples: `examples/README.md`

View File

@@ -0,0 +1,30 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Say hello in one sentence."))
result = await turn.run()
print("Status:", result.status)
print("Text:", result.text)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,20 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Status:", result.status)
print("Text:", result.text)

View File

@@ -0,0 +1,37 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Give 3 bullets about SIMD."))
result = await turn.run()
print("thread_id:", result.thread_id)
print("turn_id:", result.turn_id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", result.text)
print("items.count:", len(result.items))
if result.usage is None:
raise RuntimeError("missing usage for completed turn")
print("usage.thread_id:", result.usage.thread_id)
print("usage.turn_id:", result.usage.turn_id)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,28 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Give 3 bullets about SIMD.")).run()
print("thread_id:", result.thread_id)
print("turn_id:", result.turn_id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", result.text)
print("items.count:", len(result.items))
if result.usage is None:
raise RuntimeError("missing usage for completed turn")
print("usage.thread_id:", result.usage.thread_id)
print("usage.turn_id:", result.usage.turn_id)

View File

@@ -0,0 +1,44 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))
# Best effort controls: models can finish quickly, so races are expected.
try:
_ = await turn.steer(TextInput("Keep it brief and stop after 20 numbers."))
print("steer: sent")
except Exception as exc:
print("steer: skipped", type(exc).__name__)
try:
_ = await turn.interrupt()
print("interrupt: sent")
except Exception as exc:
print("interrupt: skipped", type(exc).__name__)
event_count = 0
async for event in turn.stream():
event_count += 1
print(event.method, event.payload)
print("events.count:", event_count)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,36 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))
# Best effort controls: models can finish quickly, so races are expected.
try:
_ = turn.steer(TextInput("Keep it brief and stop after 20 numbers."))
print("steer: sent")
except Exception as exc:
print("steer: skipped", type(exc).__name__)
try:
_ = turn.interrupt()
print("interrupt: sent")
except Exception as exc:
print("interrupt: skipped", type(exc).__name__)
event_count = 0
for event in turn.stream():
event_count += 1
print(event.method, event.payload)
print("events.count:", event_count)

View File

@@ -0,0 +1,28 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex
async def main() -> None:
async with AsyncCodex() as codex:
print("metadata:", codex.metadata)
models = await codex.models(include_hidden=True)
print("models.count:", len(models.data))
if models.data:
print("first model id:", models.data[0].id)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,20 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex
with Codex() as codex:
print("metadata:", codex.metadata)
models = codex.models()
print("models.count:", len(models.data))
if models.data:
print("first model id:", models.data[0].id)

View File

@@ -0,0 +1,32 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
original = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first_turn = await original.turn(TextInput("Tell me one fact about Saturn."))
first = await first_turn.run()
print("Created thread:", first.thread_id)
resumed = await codex.thread_resume(first.thread_id)
second_turn = await resumed.turn(TextInput("Continue with one more fact."))
second = await second_turn.run()
print(second.text)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,23 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
# Create an initial thread and turn so we have a real thread to resume.
original = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first = original.turn(TextInput("Tell me one fact about Saturn.")).run()
print("Created thread:", first.thread_id)
# Resume the existing thread by ID.
resumed = codex.thread_resume(first.thread_id)
second = resumed.turn(TextInput("Continue with one more fact.")).run()
print(second.text)

View File

@@ -0,0 +1,70 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first = await (await thread.turn(TextInput("One sentence about structured planning."))).run()
second = await (await thread.turn(TextInput("Now restate it for a junior engineer."))).run()
reopened = await codex.thread_resume(thread.id)
listing_active = await codex.thread_list(limit=20, archived=False)
reading = await reopened.read(include_turns=True)
_ = await reopened.set_name("sdk-lifecycle-demo")
_ = await codex.thread_archive(reopened.id)
listing_archived = await codex.thread_list(limit=20, archived=True)
unarchived = await codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = await codex.thread_resume(
unarchived.id,
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
resumed_result = await (await resumed.turn(TextInput("Continue in one short sentence."))).run()
resumed_info = f"{resumed_result.turn_id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
forked_info = "n/a"
try:
forked = await codex.thread_fork(unarchived.id, model="gpt-5")
forked_result = await (await forked.turn(TextInput("Take a different angle in one short sentence."))).run()
forked_info = f"{forked_result.turn_id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
compact_info = "sent"
try:
_ = await unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
print("Lifecycle OK:", thread.id)
print("first:", first.turn_id, first.status)
print("second:", second.turn_id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,63 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first = thread.turn(TextInput("One sentence about structured planning.")).run()
second = thread.turn(TextInput("Now restate it for a junior engineer.")).run()
reopened = codex.thread_resume(thread.id)
listing_active = codex.thread_list(limit=20, archived=False)
reading = reopened.read(include_turns=True)
_ = reopened.set_name("sdk-lifecycle-demo")
_ = codex.thread_archive(reopened.id)
listing_archived = codex.thread_list(limit=20, archived=True)
unarchived = codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = codex.thread_resume(
unarchived.id,
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
resumed_info = f"{resumed_result.turn_id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
forked_info = "n/a"
try:
forked = codex.thread_fork(unarchived.id, model="gpt-5")
forked_result = forked.turn(TextInput("Take a different angle in one short sentence.")).run()
forked_info = f"{forked_result.turn_id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
compact_info = "sent"
try:
_ = unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
print("Lifecycle OK:", thread.id)
print("first:", first.turn_id, first.status)
print("second:", second.turn_id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)

View File

@@ -0,0 +1,35 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, ImageInput, TextInput
REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(
[
TextInput("What is in this image? Give 3 bullets."),
ImageInput(REMOTE_IMAGE_URL),
]
)
result = await turn.run()
print("Status:", result.status)
print(result.text)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,26 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, ImageInput, TextInput
REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(
[
TextInput("What is in this image? Give 3 bullets."),
ImageInput(REMOTE_IMAGE_URL),
]
).run()
print("Status:", result.status)
print(result.text)

View File

@@ -0,0 +1,38 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, LocalImageInput, TextInput
IMAGE_PATH = Path(__file__).resolve().parents[1] / "assets" / "sample_scene.png"
if not IMAGE_PATH.exists():
raise FileNotFoundError(f"Missing bundled image: {IMAGE_PATH}")
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(
[
TextInput("Read this local image and summarize what you see in 2 bullets."),
LocalImageInput(str(IMAGE_PATH.resolve())),
]
)
result = await turn.run()
print("Status:", result.status)
print(result.text)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,29 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, LocalImageInput, TextInput
IMAGE_PATH = Path(__file__).resolve().parents[1] / "assets" / "sample_scene.png"
if not IMAGE_PATH.exists():
raise FileNotFoundError(f"Missing bundled image: {IMAGE_PATH}")
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(
[
TextInput("Read this local image and summarize what you see in 2 bullets."),
LocalImageInput(str(IMAGE_PATH.resolve())),
]
).run()
print("Status:", result.status)
print(result.text)

View File

@@ -0,0 +1,23 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Say hello in one sentence."))
result = turn.run()
print("Thread:", result.thread_id)
print("Turn:", result.turn_id)
print("Text:", result.text.strip())

View File

@@ -0,0 +1,91 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar
from codex_app_server import (
AsyncCodex,
JsonRpcError,
ServerBusyError,
TextInput,
TurnStatus,
is_retryable_error,
)
ResultT = TypeVar("ResultT")
async def retry_on_overload_async(
op: Callable[[], Awaitable[ResultT]],
*,
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
jitter_ratio: float = 0.2,
) -> ResultT:
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
delay = initial_delay_s
attempt = 0
while True:
attempt += 1
try:
return await op()
except Exception as exc: # noqa: BLE001
if attempt >= max_attempts or not is_retryable_error(exc):
raise
jitter = delay * jitter_ratio
sleep_for = min(max_delay_s, delay) + random.uniform(-jitter, jitter)
if sleep_for > 0:
await asyncio.sleep(sleep_for)
delay = min(max_delay_s, delay * 2)
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
try:
result = await retry_on_overload_async(
_run_turn(thread, "Summarize retry best practices in 3 bullets."),
max_attempts=3,
initial_delay_s=0.25,
max_delay_s=2.0,
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
return
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
return
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", result.text)
def _run_turn(thread, prompt: str):
async def _inner():
turn = await thread.turn(TextInput(prompt))
return await turn.run()
return _inner
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,40 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import (
Codex,
JsonRpcError,
ServerBusyError,
TextInput,
TurnStatus,
retry_on_overload,
)
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
try:
result = retry_on_overload(
lambda: thread.turn(TextInput("Summarize retry best practices in 3 bullets.")).run(),
max_attempts=3,
initial_delay_s=0.25,
max_delay_s=2.0,
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
else:
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", result.text)

View File

@@ -0,0 +1,96 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AsyncCodex,
TextInput,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotificationPayload,
)
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
return (
"usage>\n"
f" last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
f" total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
)
async def main() -> None:
print("Codex async mini CLI. Type /exit to quit.")
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
print("Thread:", thread.id)
while True:
try:
user_input = (await asyncio.to_thread(input, "you> ")).strip()
except EOFError:
break
if not user_input:
continue
if user_input in {"/exit", "/quit"}:
break
turn = await thread.turn(TextInput(user_input))
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
async for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
continue
if isinstance(payload, TurnCompletedNotificationPayload):
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
status_text = _status_value(status)
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)
print(_format_usage(usage))
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,89 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import (
Codex,
TextInput,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotificationPayload,
)
print("Codex mini CLI. Type /exit to quit.")
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
return (
"usage>\n"
f" last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
f" total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
)
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
print("Thread:", thread.id)
while True:
try:
user_input = input("you> ").strip()
except EOFError:
break
if not user_input:
continue
if user_input in {"/exit", "/quit"}:
break
turn = thread.turn(TextInput(user_input))
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
continue
if isinstance(payload, TurnCompletedNotificationPayload):
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
status_text = _status_value(status)
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)
print(_format_usage(usage))

View File

@@ -0,0 +1,75 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AskForApproval,
AsyncCodex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
OUTPUT_SCHEMA = {
"type": "object",
"properties": {
"summary": {"type": "string"},
"actions": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["summary", "actions"],
"additionalProperties": False,
}
SANDBOX_POLICY = SandboxPolicy.model_validate(
{
"type": "readOnly",
"access": {"type": "fullAccess"},
}
)
SUMMARY = ReasoningSummary.model_validate("concise")
PROMPT = (
"Analyze a safe rollout plan for enabling a feature flag in production. "
"Return JSON matching the requested schema."
)
APPROVAL_POLICY = AskForApproval.model_validate("never")
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(
TextInput(PROMPT),
approval_policy=APPROVAL_POLICY,
cwd=str(Path.cwd()),
effort=ReasoningEffort.medium,
model="gpt-5",
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
sandbox_policy=SANDBOX_POLICY,
summary=SUMMARY,
)
result = await turn.run()
print("Status:", result.status)
print("Text:", result.text)
print("Usage:", result.usage)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,67 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import (
AskForApproval,
Codex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
OUTPUT_SCHEMA = {
"type": "object",
"properties": {
"summary": {"type": "string"},
"actions": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["summary", "actions"],
"additionalProperties": False,
}
SANDBOX_POLICY = SandboxPolicy.model_validate(
{
"type": "readOnly",
"access": {"type": "fullAccess"},
}
)
SUMMARY = ReasoningSummary.model_validate("concise")
PROMPT = (
"Analyze a safe rollout plan for enabling a feature flag in production. "
"Return JSON matching the requested schema."
)
APPROVAL_POLICY = AskForApproval.model_validate("never")
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(
TextInput(PROMPT),
approval_policy=APPROVAL_POLICY,
cwd=str(Path.cwd()),
effort=ReasoningEffort.medium,
model="gpt-5",
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
sandbox_policy=SANDBOX_POLICY,
summary=SUMMARY,
)
result = turn.run()
print("Status:", result.status)
print("Text:", result.text)
print("Usage:", result.usage)

View File

@@ -0,0 +1,117 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AskForApproval,
AsyncCodex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
REASONING_RANK = {
"none": 0,
"minimal": 1,
"low": 2,
"medium": 3,
"high": 4,
"xhigh": 5,
}
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
)
return ReasoningEffort(best.reasoning_effort.value)
OUTPUT_SCHEMA = {
"type": "object",
"properties": {
"summary": {"type": "string"},
"actions": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["summary", "actions"],
"additionalProperties": False,
}
SANDBOX_POLICY = SandboxPolicy.model_validate(
{
"type": "readOnly",
"access": {"type": "fullAccess"},
}
)
APPROVAL_POLICY = AskForApproval.model_validate("never")
async def main() -> None:
async with AsyncCodex() as codex:
models = await codex.models(include_hidden=True)
selected_model = _pick_highest_model(models.data)
selected_effort = _pick_highest_turn_effort(selected_model)
print("selected.model:", selected_model.model)
print("selected.effort:", selected_effort.value)
thread = await codex.thread_start(
model=selected_model.model,
config={"model_reasoning_effort": selected_effort.value},
)
first_turn = await thread.turn(
TextInput("Give one short sentence about reliable production releases."),
model=selected_model.model,
effort=selected_effort,
)
first = await first_turn.run()
print("agent.message:", first.text)
print("usage:", first.usage)
second_turn = await thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
approval_policy=APPROVAL_POLICY,
cwd=str(Path.cwd()),
effort=selected_effort,
model=selected_model.model,
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
sandbox_policy=SANDBOX_POLICY,
summary=ReasoningSummary.model_validate("concise"),
)
second = await second_turn.run()
print("agent.message.params:", second.text)
print("usage.params:", second.usage)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,108 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import (
AskForApproval,
Codex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
REASONING_RANK = {
"none": 0,
"minimal": 1,
"low": 2,
"medium": 3,
"high": 4,
"xhigh": 5,
}
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
)
return ReasoningEffort(best.reasoning_effort.value)
OUTPUT_SCHEMA = {
"type": "object",
"properties": {
"summary": {"type": "string"},
"actions": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["summary", "actions"],
"additionalProperties": False,
}
SANDBOX_POLICY = SandboxPolicy.model_validate(
{
"type": "readOnly",
"access": {"type": "fullAccess"},
}
)
APPROVAL_POLICY = AskForApproval.model_validate("never")
with Codex() as codex:
models = codex.models(include_hidden=True)
selected_model = _pick_highest_model(models.data)
selected_effort = _pick_highest_turn_effort(selected_model)
print("selected.model:", selected_model.model)
print("selected.effort:", selected_effort.value)
thread = codex.thread_start(
model=selected_model.model,
config={"model_reasoning_effort": selected_effort.value},
)
first = thread.turn(
TextInput("Give one short sentence about reliable production releases."),
model=selected_model.model,
effort=selected_effort,
).run()
print("agent.message:", first.text)
print("usage:", first.usage)
second = thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
approval_policy=APPROVAL_POLICY,
cwd=str(Path.cwd()),
effort=selected_effort,
model=selected_model.model,
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
sandbox_policy=SANDBOX_POLICY,
summary=ReasoningSummary.model_validate("concise"),
).run()
print("agent.message.params:", second.text)
print("usage.params:", second.usage)

View File

@@ -0,0 +1,70 @@
# Python SDK Examples
Each example folder contains runnable versions:
- `sync.py` (public sync surface: `Codex`)
- `async.py` (public async surface: `AsyncCodex`)
All examples intentionally use only public SDK exports from `codex_app_server`.
## Prerequisites
- Python `>=3.10`
- Install SDK dependencies for the same Python interpreter you will use to run examples
Recommended setup (from `sdk/python`):
```bash
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
python -m pip install -e .
```
## Run examples
From `sdk/python`:
```bash
python examples/<example-folder>/sync.py
python examples/<example-folder>/async.py
```
The examples bootstrap local imports from `sdk/python/src` automatically, so no `pip install -e .` step is required to run them from this repository checkout.
The only required install step is dependencies for your active interpreter.
## Recommended first run
```bash
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```
## Index
- `01_quickstart_constructor/`
- first run / sanity check
- `02_turn_run/`
- inspect full turn output fields
- `03_turn_stream_events/`
- stream and print raw notifications
- `04_models_and_metadata/`
- read server metadata and model list
- `05_existing_thread/`
- resume a real existing thread (created in-script)
- `06_thread_lifecycle_and_controls/`
- thread lifecycle + control calls
- `07_image_and_text/`
- remote image URL + text multimodal turn
- `08_local_image_and_text/`
- local image + text multimodal turn using bundled sample image
- `09_async_parity/`
- parity-style sync flow (see async parity in other examples)
- `10_error_handling_and_retry/`
- overload retry pattern + typed error handling structure
- `11_cli_mini_app/`
- interactive chat loop
- `12_turn_params_kitchen_sink/`
- one turn using most optional `turn(...)` params (sync + async)
- `13_model_select_and_turn_params/`
- list models, pick highest model + highest supported reasoning effort, run turns, print message and usage

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None:
if importlib.util.find_spec("pydantic") is not None:
return
python = sys.executable
raise RuntimeError(
"Missing required dependency: pydantic.\n"
f"Interpreter: {python}\n"
"Install dependencies with the same interpreter used to run this example:\n"
f" {python} -m pip install -e {sdk_python_dir}\n"
"If you installed with `pip` from another Python, reinstall using the command above."
)
def ensure_local_sdk_src() -> Path:
"""Add sdk/python/src to sys.path so examples run without installing the package."""
sdk_python_dir = Path(__file__).resolve().parents[1]
src_dir = sdk_python_dir / "src"
package_dir = src_dir / "codex_app_server"
if not package_dir.exists():
raise RuntimeError(f"Could not locate local SDK package at {package_dir}")
_ensure_runtime_dependencies(sdk_python_dir)
src_str = str(src_dir)
if src_str not in sys.path:
sys.path.insert(0, src_str)
return src_dir

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

View File

@@ -0,0 +1,535 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Codex Python SDK Walkthrough\n",
"\n",
"Public SDK surface only (`codex_app_server` root exports)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 1: bootstrap local SDK imports (no installation required)\n",
"import os\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
"if sys.version_info < (3, 10):\n",
" raise RuntimeError(\n",
" f'Notebook requires Python 3.10+; current interpreter is {sys.version.split()[0]}.'\n",
" )\n",
"\n",
"try:\n",
" _ = os.getcwd()\n",
"except FileNotFoundError:\n",
" os.chdir(str(Path.home()))\n",
"\n",
"\n",
"def _is_sdk_python_dir(path: Path) -> bool:\n",
" return (path / 'pyproject.toml').exists() and (path / 'src' / 'codex_app_server').exists()\n",
"\n",
"\n",
"def _iter_home_fallback_candidates(home: Path):\n",
" # bounded depth scan under home to support launching notebooks from unrelated cwd values\n",
" patterns = ('sdk/python', '*/sdk/python', '*/*/sdk/python', '*/*/*/sdk/python')\n",
" for pattern in patterns:\n",
" yield from home.glob(pattern)\n",
"\n",
"\n",
"def _find_sdk_python_dir(start: Path) -> Path | None:\n",
" checked = set()\n",
"\n",
" def _consider(candidate: Path) -> Path | None:\n",
" resolved = candidate.resolve()\n",
" if resolved in checked:\n",
" return None\n",
" checked.add(resolved)\n",
" if _is_sdk_python_dir(resolved):\n",
" return resolved\n",
" return None\n",
"\n",
" for candidate in [start, *start.parents]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" for candidate in [start / 'sdk' / 'python', *(parent / 'sdk' / 'python' for parent in start.parents)]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" env_dir = os.environ.get('CODEX_PYTHON_SDK_DIR')\n",
" if env_dir:\n",
" found = _consider(Path(env_dir).expanduser())\n",
" if found is not None:\n",
" return found\n",
"\n",
" for entry in sys.path:\n",
" if not entry:\n",
" continue\n",
" entry_path = Path(entry).expanduser()\n",
" for candidate in (entry_path, entry_path / 'sdk' / 'python'):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" home = Path.home()\n",
" for candidate in _iter_home_fallback_candidates(home):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" return None\n",
"\n",
"\n",
"repo_python_dir = _find_sdk_python_dir(Path.cwd())\n",
"if repo_python_dir is None:\n",
" raise RuntimeError('Could not locate sdk/python. Set CODEX_PYTHON_SDK_DIR to your sdk/python path.')\n",
"\n",
"src_dir = repo_python_dir / 'src'\n",
"if str(src_dir) not in sys.path:\n",
" sys.path.insert(0, str(src_dir))\n",
"\n",
"# Force fresh imports after SDK upgrades in the same notebook kernel.\n",
"for module_name in list(sys.modules):\n",
" if module_name == 'codex_app_server' or module_name.startswith('codex_app_server.'):\n",
" sys.modules.pop(module_name, None)\n",
"\n",
"print('Kernel:', sys.executable)\n",
"print('SDK source:', src_dir)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 2: imports (public only)\n",
"from codex_app_server import (\n",
" AsyncCodex,\n",
" Codex,\n",
" ImageInput,\n",
" LocalImageInput,\n",
" TextInput,\n",
" retry_on_overload,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 3: simple sync conversation\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(TextInput('Explain gradient descent in 3 bullets.'))\n",
" result = turn.run()\n",
"\n",
" print('server:', codex.metadata)\n",
" print('status:', result.status)\n",
" print(result.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 4: multi-turn continuity in same thread\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
"\n",
" first = thread.turn(TextInput('Give a short summary of transformers.')).run()\n",
" second = thread.turn(TextInput('Now explain that to a high-school student.')).run()\n",
"\n",
" print('first status:', first.status)\n",
" print('second status:', second.status)\n",
" print('second text:', second.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5: full thread lifecycle and branching (sync)\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" first = thread.turn(TextInput('One sentence about structured planning.')).run()\n",
" second = thread.turn(TextInput('Now restate it for a junior engineer.')).run()\n",
"\n",
" reopened = codex.thread_resume(thread.id)\n",
" listing_active = codex.thread_list(limit=20, archived=False)\n",
" reading = reopened.read(include_turns=True)\n",
"\n",
" _ = reopened.set_name('sdk-lifecycle-demo')\n",
" _ = codex.thread_archive(reopened.id)\n",
" listing_archived = codex.thread_list(limit=20, archived=True)\n",
" unarchived = codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = resumed.turn(TextInput('Continue in one short sentence.')).run()\n",
" resumed_info = f'{resumed_result.turn_id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = codex.thread_fork(unarchived.id, model='gpt-5')\n",
" forked_result = forked.turn(TextInput('Take a different angle in one short sentence.')).run()\n",
" forked_info = f'{forked_result.turn_id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.turn_id, first.status)\n",
" print('second:', second.turn_id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5b: one turn with most optional turn params\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" AskForApproval,\n",
" Personality,\n",
" ReasoningEffort,\n",
" ReasoningSummary,\n",
" SandboxPolicy,\n",
")\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"\n",
"sandbox_policy = SandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"summary = ReasoningSummary.model_validate('concise')\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(\n",
" TextInput('Propose a safe production feature-flag rollout. Return JSON matching the schema.'),\n",
" approval_policy=AskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=ReasoningEffort.medium,\n",
" model='gpt-5',\n",
" output_schema=output_schema,\n",
" personality=Personality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=summary,\n",
" )\n",
" result = turn.run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5c: choose highest model + highest supported reasoning, then run turns\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" AskForApproval,\n",
" Personality,\n",
" ReasoningEffort,\n",
" ReasoningSummary,\n",
" SandboxPolicy,\n",
")\n",
"\n",
"reasoning_rank = {\n",
" 'none': 0,\n",
" 'minimal': 1,\n",
" 'low': 2,\n",
" 'medium': 3,\n",
" 'high': 4,\n",
" 'xhigh': 5,\n",
"}\n",
"\n",
"\n",
"def pick_highest_model(models):\n",
" visible = [m for m in models if not m.hidden] or models\n",
" known_names = {m.id for m in visible} | {m.model for m in visible}\n",
" top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]\n",
" pool = top_candidates or visible\n",
" return max(pool, key=lambda m: (m.model, m.id))\n",
"\n",
"\n",
"def pick_highest_turn_effort(model) -> ReasoningEffort:\n",
" if not model.supported_reasoning_efforts:\n",
" return ReasoningEffort.medium\n",
" best = max(model.supported_reasoning_efforts, key=lambda opt: reasoning_rank.get(opt.reasoning_effort.value, -1))\n",
" return ReasoningEffort(best.reasoning_effort.value)\n",
"\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"sandbox_policy = SandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"\n",
"with Codex() as codex:\n",
" models = codex.models(include_hidden=True)\n",
" selected_model = pick_highest_model(models.data)\n",
" selected_effort = pick_highest_turn_effort(selected_model)\n",
"\n",
" print('selected.model:', selected_model.model)\n",
" print('selected.effort:', selected_effort.value)\n",
"\n",
" thread = codex.thread_start(model=selected_model.model, config={'model_reasoning_effort': selected_effort.value})\n",
"\n",
" first = thread.turn(\n",
" TextInput('Give one short sentence about reliable production releases.'),\n",
" model=selected_model.model,\n",
" effort=selected_effort,\n",
" ).run()\n",
" print('agent.message:', first.text)\n",
" print('usage:', first.usage)\n",
"\n",
" second = thread.turn(\n",
" TextInput('Return JSON for a safe feature-flag rollout plan.'),\n",
" approval_policy=AskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=selected_effort,\n",
" model=selected_model.model,\n",
" output_schema=output_schema,\n",
" personality=Personality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=ReasoningSummary.model_validate('concise'),\n",
" ).run()\n",
" print('agent.message.params:', second.text)\n",
" print('usage.params:', second.usage)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 6: multimodal with remote image\n",
"remote_image_url = 'https://raw.githubusercontent.com/github/explore/main/topics/python/python.png'\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('What do you see in this image? 3 bullets.'),\n",
" ImageInput(remote_image_url),\n",
" ]).run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 7: multimodal with local image (bundled asset)\n",
"local_image_path = repo_python_dir / 'examples' / 'assets' / 'sample_scene.png'\n",
"if not local_image_path.exists():\n",
" raise FileNotFoundError(f'Missing bundled image: {local_image_path}')\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('Describe this local image in 2 bullets.'),\n",
" LocalImageInput(str(local_image_path.resolve())),\n",
" ]).run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 8: retry-on-overload pattern\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
"\n",
" result = retry_on_overload(\n",
" lambda: thread.turn(TextInput('List 5 failure modes in distributed systems.')).run(),\n",
" max_attempts=3,\n",
" initial_delay_s=0.25,\n",
" max_delay_s=2.0,\n",
" )\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 9: full thread lifecycle and branching (async)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_lifecycle_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" first = await (await thread.turn(TextInput('One sentence about structured planning.'))).run()\n",
" second = await (await thread.turn(TextInput('Now restate it for a junior engineer.'))).run()\n",
"\n",
" reopened = await codex.thread_resume(thread.id)\n",
" listing_active = await codex.thread_list(limit=20, archived=False)\n",
" reading = await reopened.read(include_turns=True)\n",
"\n",
" _ = await reopened.set_name('sdk-lifecycle-demo')\n",
" _ = await codex.thread_archive(reopened.id)\n",
" listing_archived = await codex.thread_list(limit=20, archived=True)\n",
" unarchived = await codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = await codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = await (await resumed.turn(TextInput('Continue in one short sentence.'))).run()\n",
" resumed_info = f'{resumed_result.turn_id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = await codex.thread_fork(unarchived.id, model='gpt-5')\n",
" forked_result = await (await forked.turn(TextInput('Take a different angle in one short sentence.'))).run()\n",
" forked_info = f'{forked_result.turn_id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = await unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.turn_id, first.status)\n",
" print('second:', second.turn_id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
"\n",
"\n",
"await async_lifecycle_demo()\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 10: async stream + steer + interrupt (best effort)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_stream_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = await thread.turn(TextInput('Count from 1 to 200 with commas, then one summary sentence.'))\n",
"\n",
" try:\n",
" _ = await turn.steer(TextInput('Keep it brief and stop after 20 numbers.'))\n",
" print('steer: sent')\n",
" except Exception as e:\n",
" print('steer: skipped', type(e).__name__)\n",
"\n",
" try:\n",
" _ = await turn.interrupt()\n",
" print('interrupt: sent')\n",
" except Exception as e:\n",
" print('interrupt: skipped', type(e).__name__)\n",
"\n",
" event_count = 0\n",
" async for event in turn.stream():\n",
" event_count += 1\n",
" print(event.method, event.payload)\n",
"\n",
" print('events.count:', event_count)\n",
"\n",
"\n",
"await async_stream_demo()\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10+"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

63
sdk/python/pyproject.toml Normal file
View File

@@ -0,0 +1,63 @@
[build-system]
requires = ["hatchling>=1.24.0"]
build-backend = "hatchling.build"
[project]
name = "codex-app-server-sdk"
version = "0.2.0"
description = "Python SDK for Codex app-server v2"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }
authors = [{ name = "OpenClaw Assistant" }]
keywords = ["codex", "json-rpc", "sdk", "llm", "app-server"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = ["pydantic>=2.12"]
[project.urls]
Homepage = "https://github.com/openai/codex"
Repository = "https://github.com/openai/codex"
Issues = "https://github.com/openai/codex/issues"
[project.optional-dependencies]
dev = ["pytest>=8.0", "datamodel-code-generator==0.31.2"]
[tool.hatch.build]
exclude = [
".venv/**",
".venv2/**",
".pytest_cache/**",
"dist/**",
"build/**",
]
[tool.hatch.build.targets.wheel]
packages = ["src/codex_app_server"]
include = [
"src/codex_app_server/bin/**",
"src/codex_app_server/py.typed",
]
[tool.hatch.build.targets.sdist]
include = [
"src/codex_app_server/**",
"README.md",
"CHANGELOG.md",
"CONTRIBUTING.md",
"RELEASE_CHECKLIST.md",
"pyproject.toml",
]
[tool.pytest.ini_options]
addopts = "-q"
testpaths = ["tests"]

View File

@@ -0,0 +1,741 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import importlib
import json
import platform
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import types
import typing
import urllib.request
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, get_args, get_origin
def repo_root() -> Path:
return Path(__file__).resolve().parents[3]
def sdk_root() -> Path:
return repo_root() / "sdk" / "python"
def schema_bundle_path() -> Path:
return (
repo_root()
/ "codex-rs"
/ "app-server-protocol"
/ "schema"
/ "json"
/ "codex_app_server_protocol.v2.schemas.json"
)
def schema_root_dir() -> Path:
return repo_root() / "codex-rs" / "app-server-protocol" / "schema" / "json"
def _is_windows() -> bool:
return platform.system().lower().startswith("win")
def pinned_bin_path() -> Path:
name = "codex.exe" if _is_windows() else "codex"
return sdk_root() / "bin" / name
def bundled_platform_bin_path(platform_key: str) -> Path:
exe = "codex.exe" if platform_key.startswith("windows") else "codex"
return sdk_root() / "src" / "codex_app_server" / "bin" / platform_key / exe
PLATFORMS: dict[str, tuple[list[str], list[str]]] = {
"darwin-arm64": (["darwin", "apple-darwin", "macos"], ["aarch64", "arm64"]),
"darwin-x64": (["darwin", "apple-darwin", "macos"], ["x86_64", "amd64", "x64"]),
"linux-arm64": (["linux", "unknown-linux", "musl", "gnu"], ["aarch64", "arm64"]),
"linux-x64": (["linux", "unknown-linux", "musl", "gnu"], ["x86_64", "amd64", "x64"]),
"windows-arm64": (["windows", "pc-windows", "win", "msvc", "gnu"], ["aarch64", "arm64"]),
"windows-x64": (["windows", "pc-windows", "win", "msvc", "gnu"], ["x86_64", "amd64", "x64"]),
}
def run(cmd: list[str], cwd: Path) -> None:
subprocess.run(cmd, cwd=str(cwd), check=True)
def run_python_module(module: str, args: list[str], cwd: Path) -> None:
run([sys.executable, "-m", module, *args], cwd)
def platform_tokens() -> tuple[list[str], list[str]]:
sys_name = platform.system().lower()
machine = platform.machine().lower()
if sys_name == "darwin":
os_tokens = ["darwin", "apple-darwin", "macos"]
elif sys_name == "linux":
os_tokens = ["linux", "unknown-linux", "musl", "gnu"]
elif sys_name.startswith("win"):
os_tokens = ["windows", "pc-windows", "win", "msvc", "gnu"]
else:
raise RuntimeError(f"Unsupported OS: {sys_name}")
if machine in {"arm64", "aarch64"}:
arch_tokens = ["aarch64", "arm64"]
elif machine in {"x86_64", "amd64"}:
arch_tokens = ["x86_64", "amd64", "x64"]
else:
raise RuntimeError(f"Unsupported architecture: {machine}")
return os_tokens, arch_tokens
def pick_release(channel: str) -> dict[str, Any]:
releases = json.loads(
subprocess.check_output(["gh", "api", "repos/openai/codex/releases?per_page=50"], text=True)
)
if channel == "stable":
candidates = [r for r in releases if not r.get("prerelease") and not r.get("draft")]
else:
candidates = [r for r in releases if r.get("prerelease") and not r.get("draft")]
if not candidates:
raise RuntimeError(f"No {channel} release found")
return candidates[0]
def pick_asset(release: dict[str, Any], os_tokens: list[str], arch_tokens: list[str]) -> dict[str, Any]:
scored: list[tuple[int, dict[str, Any]]] = []
for asset in release.get("assets", []):
name = (asset.get("name") or "").lower()
# Accept only primary codex cli artifacts.
if not (name.startswith("codex-") or name == "codex"):
continue
if name.startswith("codex-responses") or name.startswith("codex-command-runner") or name.startswith("codex-windows-sandbox") or name.startswith("codex-npm"):
continue
if not (name.endswith(".tar.gz") or name.endswith(".zip")):
continue
os_score = sum(1 for t in os_tokens if t in name)
arch_score = sum(1 for t in arch_tokens if t in name)
if os_score == 0 or arch_score == 0:
continue
score = os_score * 10 + arch_score
scored.append((score, asset))
if not scored:
raise RuntimeError("Could not find matching codex CLI asset for this platform")
scored.sort(key=lambda x: x[0], reverse=True)
return scored[0][1]
def download(url: str, out: Path) -> None:
req = urllib.request.Request(url, headers={"User-Agent": "codex-python-sdk-updater"})
with urllib.request.urlopen(req) as resp, out.open("wb") as f:
shutil.copyfileobj(resp, f)
def extract_codex_binary(archive: Path, out_bin: Path) -> None:
with tempfile.TemporaryDirectory() as td:
tmp = Path(td)
if archive.name.endswith(".tar.gz"):
with tarfile.open(archive, "r:gz") as tar:
tar.extractall(tmp)
elif archive.name.endswith(".zip"):
with zipfile.ZipFile(archive) as zf:
zf.extractall(tmp)
else:
raise RuntimeError(f"Unsupported archive format: {archive}")
preferred_names = {"codex.exe", "codex"}
candidates = [
p for p in tmp.rglob("*") if p.is_file() and (p.name.lower() in preferred_names or p.name.lower().startswith("codex-"))
]
if not candidates:
raise RuntimeError("No codex binary found in release archive")
candidates.sort(key=lambda p: (p.name.lower() not in preferred_names, p.name.lower()))
out_bin.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(candidates[0], out_bin)
if not _is_windows():
out_bin.chmod(out_bin.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
def _download_asset_to_binary(release: dict[str, Any], os_tokens: list[str], arch_tokens: list[str], out_bin: Path) -> None:
asset = pick_asset(release, os_tokens, arch_tokens)
print(f"Asset: {asset.get('name')} -> {out_bin}")
with tempfile.TemporaryDirectory() as td:
archive = Path(td) / (asset.get("name") or "codex-release.tar.gz")
download(asset["browser_download_url"], archive)
extract_codex_binary(archive, out_bin)
def update_binary(channel: str) -> None:
if shutil.which("gh") is None:
raise RuntimeError("GitHub CLI (`gh`) is required to download release binaries")
release = pick_release(channel)
os_tokens, arch_tokens = platform_tokens()
print(f"Release: {release.get('tag_name')} ({channel})")
# refresh current platform in bundled runtime location
current_key = next((k for k, v in PLATFORMS.items() if v == (os_tokens, arch_tokens)), None)
out = bundled_platform_bin_path(current_key) if current_key else pinned_bin_path()
_download_asset_to_binary(release, os_tokens, arch_tokens, out)
print(f"Pinned binary updated: {out}")
def bundle_all_platform_binaries(channel: str) -> None:
if shutil.which("gh") is None:
raise RuntimeError("GitHub CLI (`gh`) is required to download release binaries")
release = pick_release(channel)
print(f"Release: {release.get('tag_name')} ({channel})")
for platform_key, (os_tokens, arch_tokens) in PLATFORMS.items():
_download_asset_to_binary(release, os_tokens, arch_tokens, bundled_platform_bin_path(platform_key))
print("Bundled all platform binaries.")
def _flatten_string_enum_one_of(definition: dict[str, Any]) -> bool:
branches = definition.get("oneOf")
if not isinstance(branches, list) or not branches:
return False
enum_values: list[str] = []
for branch in branches:
if not isinstance(branch, dict):
return False
if branch.get("type") != "string":
return False
enum = branch.get("enum")
if not isinstance(enum, list) or len(enum) != 1 or not isinstance(enum[0], str):
return False
extra_keys = set(branch) - {"type", "enum", "description", "title"}
if extra_keys:
return False
enum_values.append(enum[0])
description = definition.get("description")
title = definition.get("title")
definition.clear()
definition["type"] = "string"
definition["enum"] = enum_values
if isinstance(description, str):
definition["description"] = description
if isinstance(title, str):
definition["title"] = title
return True
def _normalized_schema_bundle_text() -> str:
schema = json.loads(schema_bundle_path().read_text())
definitions = schema.get("definitions", {})
if isinstance(definitions, dict):
for definition in definitions.values():
if isinstance(definition, dict):
_flatten_string_enum_one_of(definition)
return json.dumps(schema, indent=2, sort_keys=True) + "\n"
def generate_v2_all() -> None:
out_path = sdk_root() / "src" / "codex_app_server" / "generated" / "v2_all.py"
out_dir = out_path.parent
old_package_dir = out_dir / "v2_all"
if old_package_dir.exists():
shutil.rmtree(old_package_dir)
out_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory() as td:
normalized_bundle = Path(td) / schema_bundle_path().name
normalized_bundle.write_text(_normalized_schema_bundle_text())
run_python_module(
"datamodel_code_generator",
[
"--input",
str(normalized_bundle),
"--input-file-type",
"jsonschema",
"--output",
str(out_path),
"--output-model-type",
"pydantic_v2.BaseModel",
"--target-python-version",
"3.10",
"--snake-case-field",
"--allow-population-by-field-name",
"--use-union-operator",
"--reuse-model",
"--disable-timestamp",
"--use-double-quotes",
],
cwd=sdk_root(),
)
_normalize_generated_timestamps(out_path)
def _notification_specs() -> list[tuple[str, str]]:
server_notifications = json.loads((schema_root_dir() / "ServerNotification.json").read_text())
one_of = server_notifications.get("oneOf", [])
generated_source = (
sdk_root() / "src" / "codex_app_server" / "generated" / "v2_all.py"
).read_text()
specs: list[tuple[str, str]] = []
for variant in one_of:
props = variant.get("properties", {})
method_meta = props.get("method", {})
params_meta = props.get("params", {})
methods = method_meta.get("enum", [])
if len(methods) != 1:
continue
method = methods[0]
if not isinstance(method, str):
continue
ref = params_meta.get("$ref")
if not isinstance(ref, str) or not ref.startswith("#/definitions/"):
continue
class_name = ref.split("/")[-1]
if f"class {class_name}(" not in generated_source and f"{class_name} =" not in generated_source:
# Skip schema variants that are not emitted into the generated v2 surface.
continue
specs.append((method, class_name))
specs.sort()
return specs
def generate_notification_registry() -> None:
out = sdk_root() / "src" / "codex_app_server" / "generated" / "notification_registry.py"
specs = _notification_specs()
class_names = sorted({class_name for _, class_name in specs})
lines = [
"# Auto-generated by scripts/update_sdk_artifacts.py",
"# DO NOT EDIT MANUALLY.",
"",
"from __future__ import annotations",
"",
"from pydantic import BaseModel",
"",
]
for class_name in class_names:
lines.append(f"from .v2_all import {class_name}")
lines.extend(
[
"",
"NOTIFICATION_MODELS: dict[str, type[BaseModel]] = {",
]
)
for method, class_name in specs:
lines.append(f' "{method}": {class_name},')
lines.extend(["}", ""])
out.write_text("\n".join(lines))
def _normalize_generated_timestamps(root: Path) -> None:
timestamp_re = re.compile(r"^#\s+timestamp:\s+.+$", flags=re.MULTILINE)
py_files = [root] if root.is_file() else sorted(root.rglob("*.py"))
for py_file in py_files:
content = py_file.read_text()
normalized = timestamp_re.sub("# timestamp: <normalized>", content)
if normalized != content:
py_file.write_text(normalized)
FIELD_ANNOTATION_OVERRIDES: dict[str, str] = {
# Keep public API typed without falling back to `Any`.
"config": "JsonObject",
"output_schema": "JsonObject",
}
@dataclass(slots=True)
class PublicFieldSpec:
wire_name: str
py_name: str
annotation: str
required: bool
def _annotation_to_source(annotation: Any) -> str:
origin = get_origin(annotation)
if origin is typing.Annotated:
return _annotation_to_source(get_args(annotation)[0])
if origin in (typing.Union, types.UnionType):
parts: list[str] = []
for arg in get_args(annotation):
rendered = _annotation_to_source(arg)
if rendered not in parts:
parts.append(rendered)
return " | ".join(parts)
if origin is list:
args = get_args(annotation)
item = _annotation_to_source(args[0]) if args else "Any"
return f"list[{item}]"
if origin is dict:
args = get_args(annotation)
key = _annotation_to_source(args[0]) if args else "str"
val = _annotation_to_source(args[1]) if len(args) > 1 else "Any"
return f"dict[{key}, {val}]"
if annotation is Any or annotation is typing.Any:
return "Any"
if annotation is None or annotation is type(None):
return "None"
if isinstance(annotation, type):
if annotation.__module__ == "builtins":
return annotation.__name__
return annotation.__name__
return repr(annotation)
def _camel_to_snake(name: str) -> str:
head = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)
return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", head).lower()
def _load_public_fields(module_name: str, class_name: str, *, exclude: set[str] | None = None) -> list[PublicFieldSpec]:
exclude = exclude or set()
module = importlib.import_module(module_name)
model = getattr(module, class_name)
fields: list[PublicFieldSpec] = []
for name, field in model.model_fields.items():
if name in exclude:
continue
required = field.is_required()
annotation = _annotation_to_source(field.annotation)
override = FIELD_ANNOTATION_OVERRIDES.get(name)
if override is not None:
annotation = override if required else f"{override} | None"
fields.append(
PublicFieldSpec(
wire_name=name,
py_name=name,
annotation=annotation,
required=required,
)
)
return fields
def _kw_signature_lines(fields: list[PublicFieldSpec]) -> list[str]:
lines: list[str] = []
for field in fields:
default = "" if field.required else " = None"
lines.append(f" {field.py_name}: {field.annotation}{default},")
return lines
def _model_arg_lines(fields: list[PublicFieldSpec], *, indent: str = " ") -> list[str]:
return [f"{indent}{field.wire_name}={field.py_name}," for field in fields]
def _replace_generated_block(source: str, block_name: str, body: str) -> str:
start_tag = f" # BEGIN GENERATED: {block_name}"
end_tag = f" # END GENERATED: {block_name}"
pattern = re.compile(
rf"(?s){re.escape(start_tag)}\n.*?\n{re.escape(end_tag)}"
)
replacement = f"{start_tag}\n{body.rstrip()}\n{end_tag}"
updated, count = pattern.subn(replacement, source, count=1)
if count != 1:
raise RuntimeError(f"Could not update generated block: {block_name}")
return updated
def _render_codex_block(
thread_start_fields: list[PublicFieldSpec],
thread_list_fields: list[PublicFieldSpec],
resume_fields: list[PublicFieldSpec],
fork_fields: list[PublicFieldSpec],
) -> str:
lines = [
" def thread_start(",
" self,",
" *,",
*_kw_signature_lines(thread_start_fields),
" ) -> Thread:",
" params = ThreadStartParams(",
*_model_arg_lines(thread_start_fields),
" )",
" started = self._client.thread_start(params)",
" return Thread(self._client, started.thread.id)",
"",
" def thread_list(",
" self,",
" *,",
*_kw_signature_lines(thread_list_fields),
" ) -> ThreadListResponse:",
" params = ThreadListParams(",
*_model_arg_lines(thread_list_fields),
" )",
" return self._client.thread_list(params)",
"",
" def thread_resume(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(resume_fields),
" ) -> Thread:",
" params = ThreadResumeParams(",
" thread_id=thread_id,",
*_model_arg_lines(resume_fields),
" )",
" resumed = self._client.thread_resume(thread_id, params)",
" return Thread(self._client, resumed.thread.id)",
"",
" def thread_fork(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(fork_fields),
" ) -> Thread:",
" params = ThreadForkParams(",
" thread_id=thread_id,",
*_model_arg_lines(fork_fields),
" )",
" forked = self._client.thread_fork(thread_id, params)",
" return Thread(self._client, forked.thread.id)",
"",
" def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:",
" return self._client.thread_archive(thread_id)",
"",
" def thread_unarchive(self, thread_id: str) -> Thread:",
" unarchived = self._client.thread_unarchive(thread_id)",
" return Thread(self._client, unarchived.thread.id)",
]
return "\n".join(lines)
def _render_async_codex_block(
thread_start_fields: list[PublicFieldSpec],
thread_list_fields: list[PublicFieldSpec],
resume_fields: list[PublicFieldSpec],
fork_fields: list[PublicFieldSpec],
) -> str:
lines = [
" async def thread_start(",
" self,",
" *,",
*_kw_signature_lines(thread_start_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadStartParams(",
*_model_arg_lines(thread_start_fields),
" )",
" started = await self._client.thread_start(params)",
" return AsyncThread(self, started.thread.id)",
"",
" async def thread_list(",
" self,",
" *,",
*_kw_signature_lines(thread_list_fields),
" ) -> ThreadListResponse:",
" await self._ensure_initialized()",
" params = ThreadListParams(",
*_model_arg_lines(thread_list_fields),
" )",
" return await self._client.thread_list(params)",
"",
" async def thread_resume(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(resume_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadResumeParams(",
" thread_id=thread_id,",
*_model_arg_lines(resume_fields),
" )",
" resumed = await self._client.thread_resume(thread_id, params)",
" return AsyncThread(self, resumed.thread.id)",
"",
" async def thread_fork(",
" self,",
" thread_id: str,",
" *,",
*_kw_signature_lines(fork_fields),
" ) -> AsyncThread:",
" await self._ensure_initialized()",
" params = ThreadForkParams(",
" thread_id=thread_id,",
*_model_arg_lines(fork_fields),
" )",
" forked = await self._client.thread_fork(thread_id, params)",
" return AsyncThread(self, forked.thread.id)",
"",
" async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:",
" await self._ensure_initialized()",
" return await self._client.thread_archive(thread_id)",
"",
" async def thread_unarchive(self, thread_id: str) -> AsyncThread:",
" await self._ensure_initialized()",
" unarchived = await self._client.thread_unarchive(thread_id)",
" return AsyncThread(self, unarchived.thread.id)",
]
return "\n".join(lines)
def _render_thread_block(
turn_fields: list[PublicFieldSpec],
) -> str:
lines = [
" def turn(",
" self,",
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> Turn:",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
" thread_id=self.id,",
" input=wire_input,",
*_model_arg_lines(turn_fields),
" )",
" turn = self._client.turn_start(self.id, wire_input, params=params)",
" return Turn(self._client, self.id, turn.turn.id)",
]
return "\n".join(lines)
def _render_async_thread_block(
turn_fields: list[PublicFieldSpec],
) -> str:
lines = [
" async def turn(",
" self,",
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> AsyncTurn:",
" await self._codex._ensure_initialized()",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
" thread_id=self.id,",
" input=wire_input,",
*_model_arg_lines(turn_fields),
" )",
" turn = await self._codex._client.turn_start(",
" self.id,",
" wire_input,",
" params=params,",
" )",
" return AsyncTurn(self._codex, self.id, turn.turn.id)",
]
return "\n".join(lines)
def generate_public_api_flat_methods() -> None:
src_dir = sdk_root() / "src"
public_api_path = src_dir / "codex_app_server" / "public_api.py"
if not public_api_path.exists():
# PR2 can run codegen before the ergonomic public API layer is added.
return
src_dir_str = str(src_dir)
if src_dir_str not in sys.path:
sys.path.insert(0, src_dir_str)
thread_start_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadStartParams",
)
thread_list_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadListParams",
)
thread_resume_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadResumeParams",
exclude={"thread_id"},
)
thread_fork_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"ThreadForkParams",
exclude={"thread_id"},
)
turn_start_fields = _load_public_fields(
"codex_app_server.generated.v2_all",
"TurnStartParams",
exclude={"thread_id", "input"},
)
source = public_api_path.read_text()
source = _replace_generated_block(
source,
"Codex.flat_methods",
_render_codex_block(
thread_start_fields,
thread_list_fields,
thread_resume_fields,
thread_fork_fields,
),
)
source = _replace_generated_block(
source,
"AsyncCodex.flat_methods",
_render_async_codex_block(
thread_start_fields,
thread_list_fields,
thread_resume_fields,
thread_fork_fields,
),
)
source = _replace_generated_block(
source,
"Thread.flat_methods",
_render_thread_block(turn_start_fields),
)
source = _replace_generated_block(
source,
"AsyncThread.flat_methods",
_render_async_thread_block(turn_start_fields),
)
public_api_path.write_text(source)
def generate_types() -> None:
# v2_all is the authoritative generated surface.
generate_v2_all()
generate_notification_registry()
generate_public_api_flat_methods()
def main() -> None:
parser = argparse.ArgumentParser(description="Single SDK maintenance entrypoint")
parser.add_argument("--channel", choices=["stable", "alpha"], default="stable")
parser.add_argument("--types-only", action="store_true", help="Regenerate types only (skip binary update)")
parser.add_argument(
"--bundle-all-platforms",
action="store_true",
help="Download and bundle codex binaries for all supported OS/arch targets",
)
args = parser.parse_args()
if not args.types_only:
if args.bundle_all_platforms:
bundle_all_platform_binaries(args.channel)
else:
update_binary(args.channel)
generate_types()
print("Done.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,110 @@
from .client import AppServerConfig
from .errors import (
AppServerError,
AppServerRpcError,
InvalidParamsError,
InvalidRequestError,
InternalRpcError,
JsonRpcError,
MethodNotFoundError,
ParseError,
RetryLimitExceededError,
ServerBusyError,
TransportClosedError,
is_retryable_error,
)
from .generated.v2_types import (
ThreadItem,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotificationPayload,
)
from .public_api import (
AsyncCodex,
AsyncThread,
AsyncTurn,
Codex,
ImageInput,
InitializeResult,
Input,
InputItem,
LocalImageInput,
MentionInput,
SkillInput,
TextInput,
Thread,
Turn,
TurnResult,
)
from .public_types import (
AskForApproval,
Personality,
PlanType,
ReasoningEffort,
ReasoningSummary,
SandboxMode,
SandboxPolicy,
ThreadForkParams,
ThreadListParams,
ThreadResumeParams,
ThreadSortKey,
ThreadSourceKind,
ThreadStartParams,
TurnStartParams,
TurnStatus,
TurnSteerParams,
)
from .retry import retry_on_overload
__version__ = "0.2.0"
__all__ = [
"__version__",
"AppServerConfig",
"Codex",
"AsyncCodex",
"Thread",
"AsyncThread",
"Turn",
"AsyncTurn",
"TurnResult",
"InitializeResult",
"Input",
"InputItem",
"TextInput",
"ImageInput",
"LocalImageInput",
"SkillInput",
"MentionInput",
"ThreadItem",
"ThreadTokenUsageUpdatedNotification",
"TurnCompletedNotificationPayload",
"AskForApproval",
"Personality",
"PlanType",
"ReasoningEffort",
"ReasoningSummary",
"SandboxMode",
"SandboxPolicy",
"ThreadStartParams",
"ThreadResumeParams",
"ThreadListParams",
"ThreadSortKey",
"ThreadSourceKind",
"ThreadForkParams",
"TurnStatus",
"TurnStartParams",
"TurnSteerParams",
"retry_on_overload",
"AppServerError",
"TransportClosedError",
"JsonRpcError",
"AppServerRpcError",
"ParseError",
"InvalidRequestError",
"MethodNotFoundError",
"InvalidParamsError",
"InternalRpcError",
"ServerBusyError",
"RetryLimitExceededError",
"is_retryable_error",
]

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from typing import AsyncIterator, Callable, Iterable, ParamSpec, TypeVar
from pydantic import BaseModel
from .client import AppServerClient, AppServerConfig
from .generated.v2_all import (
AgentMessageDeltaNotification,
ModelListResponse,
ThreadArchiveResponse,
ThreadCompactStartResponse,
ThreadForkParams as V2ThreadForkParams,
ThreadForkResponse,
ThreadListParams as V2ThreadListParams,
ThreadListResponse,
ThreadReadResponse,
ThreadResumeParams as V2ThreadResumeParams,
ThreadResumeResponse,
ThreadSetNameResponse,
ThreadStartParams as V2ThreadStartParams,
ThreadStartResponse,
ThreadUnarchiveResponse,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams as V2TurnStartParams,
TurnStartResponse,
TurnSteerResponse,
)
from .models import InitializeResponse, JsonObject, Notification
ModelT = TypeVar("ModelT", bound=BaseModel)
ParamsT = ParamSpec("ParamsT")
ReturnT = TypeVar("ReturnT")
class AsyncAppServerClient:
"""Async wrapper around AppServerClient using thread offloading."""
def __init__(self, config: AppServerConfig | None = None) -> None:
self._sync = AppServerClient(config=config)
# Single stdio transport cannot be read safely from multiple threads.
self._transport_lock = asyncio.Lock()
async def __aenter__(self) -> "AsyncAppServerClient":
await self.start()
return self
async def __aexit__(self, _exc_type, _exc, _tb) -> None:
await self.close()
async def _call_sync(
self,
fn: Callable[ParamsT, ReturnT],
/,
*args: ParamsT.args,
**kwargs: ParamsT.kwargs,
) -> ReturnT:
async with self._transport_lock:
return await asyncio.to_thread(fn, *args, **kwargs)
@staticmethod
def _next_from_iterator(
iterator: Iterator[AgentMessageDeltaNotification],
) -> tuple[bool, AgentMessageDeltaNotification | None]:
try:
return True, next(iterator)
except StopIteration:
return False, None
async def start(self) -> None:
await self._call_sync(self._sync.start)
async def close(self) -> None:
await self._call_sync(self._sync.close)
async def initialize(self) -> InitializeResponse:
return await self._call_sync(self._sync.initialize)
def acquire_turn_consumer(self, turn_id: str) -> None:
self._sync.acquire_turn_consumer(turn_id)
def release_turn_consumer(self, turn_id: str) -> None:
self._sync.release_turn_consumer(turn_id)
async def request(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
) -> ModelT:
return await self._call_sync(
self._sync.request,
method,
params,
response_model=response_model,
)
async def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
return await self._call_sync(self._sync.thread_start, params)
async def thread_resume(
self,
thread_id: str,
params: V2ThreadResumeParams | JsonObject | None = None,
) -> ThreadResumeResponse:
return await self._call_sync(self._sync.thread_resume, thread_id, params)
async def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
return await self._call_sync(self._sync.thread_list, params)
async def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
return await self._call_sync(self._sync.thread_read, thread_id, include_turns)
async def thread_fork(
self,
thread_id: str,
params: V2ThreadForkParams | JsonObject | None = None,
) -> ThreadForkResponse:
return await self._call_sync(self._sync.thread_fork, thread_id, params)
async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return await self._call_sync(self._sync.thread_archive, thread_id)
async def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
return await self._call_sync(self._sync.thread_unarchive, thread_id)
async def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
return await self._call_sync(self._sync.thread_set_name, thread_id, name)
async def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
return await self._call_sync(self._sync.thread_compact, thread_id)
async def turn_start(
self,
thread_id: str,
input_items: list[JsonObject] | JsonObject | str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
return await self._call_sync(self._sync.turn_start, thread_id, input_items, params)
async def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
return await self._call_sync(self._sync.turn_interrupt, thread_id, turn_id)
async def turn_steer(
self,
thread_id: str,
expected_turn_id: str,
input_items: list[JsonObject] | JsonObject | str,
) -> TurnSteerResponse:
return await self._call_sync(
self._sync.turn_steer,
thread_id,
expected_turn_id,
input_items,
)
async def model_list(self, include_hidden: bool = False) -> ModelListResponse:
return await self._call_sync(self._sync.model_list, include_hidden)
async def request_with_retry_on_overload(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
) -> ModelT:
return await self._call_sync(
self._sync.request_with_retry_on_overload,
method,
params,
response_model=response_model,
max_attempts=max_attempts,
initial_delay_s=initial_delay_s,
max_delay_s=max_delay_s,
)
async def next_notification(self) -> Notification:
return await self._call_sync(self._sync.next_notification)
async def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
return await self._call_sync(self._sync.wait_for_turn_completed, turn_id)
async def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
return await self._call_sync(self._sync.stream_until_methods, methods)
async def stream_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> AsyncIterator[AgentMessageDeltaNotification]:
async with self._transport_lock:
iterator = self._sync.stream_text(thread_id, text, params)
while True:
has_value, chunk = await asyncio.to_thread(
self._next_from_iterator,
iterator,
)
if not has_value:
break
yield chunk

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,521 @@
from __future__ import annotations
import json
import os
import subprocess
import threading
import uuid
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Iterator, TypeVar
from pydantic import BaseModel
from .errors import AppServerError, TransportClosedError, map_jsonrpc_error
from .generated.notification_registry import NOTIFICATION_MODELS
from .generated.v2_all import (
AgentMessageDeltaNotification,
ModelListResponse,
ThreadArchiveResponse,
ThreadCompactStartResponse,
ThreadForkParams as V2ThreadForkParams,
ThreadForkResponse,
ThreadListParams as V2ThreadListParams,
ThreadListResponse,
ThreadReadResponse,
ThreadResumeParams as V2ThreadResumeParams,
ThreadResumeResponse,
ThreadSetNameResponse,
ThreadStartParams as V2ThreadStartParams,
ThreadStartResponse,
ThreadUnarchiveResponse,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams as V2TurnStartParams,
TurnStartResponse,
TurnSteerResponse,
)
from .models import (
InitializeResponse,
JsonObject,
JsonValue,
Notification,
UnknownNotification,
)
from .retry import retry_on_overload
ModelT = TypeVar("ModelT", bound=BaseModel)
ApprovalHandler = Callable[[str, JsonObject | None], JsonObject]
def _params_dict(
params: (
V2ThreadStartParams
| V2ThreadResumeParams
| V2ThreadListParams
| V2ThreadForkParams
| V2TurnStartParams
| JsonObject
| None
),
) -> JsonObject:
if params is None:
return {}
if hasattr(params, "model_dump"):
dumped = params.model_dump(
by_alias=True,
exclude_none=True,
mode="json",
)
if not isinstance(dumped, dict):
raise TypeError("Expected model_dump() to return dict")
return dumped
if isinstance(params, dict):
return params
raise TypeError(f"Expected generated params model or dict, got {type(params).__name__}")
def _bundled_codex_path() -> Path:
import platform
sys_name = platform.system().lower()
machine = platform.machine().lower()
if sys_name.startswith("darwin"):
platform_dir = "darwin-arm64" if machine in {"arm64", "aarch64"} else "darwin-x64"
exe = "codex"
elif sys_name.startswith("linux"):
platform_dir = "linux-arm64" if machine in {"arm64", "aarch64"} else "linux-x64"
exe = "codex"
elif sys_name.startswith("windows") or os.name == "nt":
platform_dir = "windows-arm64" if machine in {"arm64", "aarch64"} else "windows-x64"
exe = "codex.exe"
else:
raise RuntimeError(f"Unsupported OS for bundled codex binary: {sys_name}/{machine}")
return Path(__file__).resolve().parent / "bin" / platform_dir / exe
@dataclass(slots=True)
class AppServerConfig:
codex_bin: str = str(_bundled_codex_path())
launch_args_override: tuple[str, ...] | None = None
config_overrides: tuple[str, ...] = ()
cwd: str | None = None
env: dict[str, str] | None = None
client_name: str = "codex_python_sdk"
client_title: str = "Codex Python SDK"
client_version: str = "0.2.0"
experimental_api: bool = True
class AppServerClient:
"""Synchronous typed JSON-RPC client for `codex app-server` over stdio."""
def __init__(
self,
config: AppServerConfig | None = None,
approval_handler: ApprovalHandler | None = None,
) -> None:
self.config = config or AppServerConfig()
self._approval_handler = approval_handler or self._default_approval_handler
self._proc: subprocess.Popen[str] | None = None
self._lock = threading.Lock()
self._turn_consumer_lock = threading.Lock()
self._active_turn_consumer: str | None = None
self._pending_notifications: deque[Notification] = deque()
self._stderr_lines: deque[str] = deque(maxlen=400)
self._stderr_thread: threading.Thread | None = None
def __enter__(self) -> "AppServerClient":
self.start()
return self
def __exit__(self, _exc_type, _exc, _tb) -> None:
self.close()
def start(self) -> None:
if self._proc is not None:
return
if self.config.launch_args_override is not None:
args = list(self.config.launch_args_override)
else:
codex_bin = Path(self.config.codex_bin)
if not codex_bin.exists():
raise FileNotFoundError(
f"Pinned codex binary not found at {codex_bin}. Run `python scripts/update_sdk_artifacts.py --channel stable` from sdk/python."
)
args = [str(codex_bin)]
for kv in self.config.config_overrides:
args.extend(["--config", kv])
args.extend(["app-server", "--listen", "stdio://"])
env = os.environ.copy()
if self.config.env:
env.update(self.config.env)
self._proc = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=self.config.cwd,
env=env,
bufsize=1,
)
self._start_stderr_drain_thread()
def close(self) -> None:
if self._proc is None:
return
proc = self._proc
self._proc = None
self._active_turn_consumer = None
if proc.stdin:
proc.stdin.close()
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
proc.kill()
if self._stderr_thread and self._stderr_thread.is_alive():
self._stderr_thread.join(timeout=0.5)
def initialize(self) -> InitializeResponse:
result = self.request(
"initialize",
{
"clientInfo": {
"name": self.config.client_name,
"title": self.config.client_title,
"version": self.config.client_version,
},
"capabilities": {
"experimentalApi": self.config.experimental_api,
},
},
response_model=InitializeResponse,
)
self.notify("initialized", None)
return result
def request(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
) -> ModelT:
result = self._request_raw(method, params)
if not isinstance(result, dict):
raise AppServerError(f"{method} response must be a JSON object")
return response_model.model_validate(result)
def _request_raw(self, method: str, params: JsonObject | None = None) -> JsonValue:
request_id = str(uuid.uuid4())
self._write_message({"id": request_id, "method": method, "params": params or {}})
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
self._pending_notifications.append(
self._coerce_notification(msg["method"], msg.get("params"))
)
continue
if msg.get("id") != request_id:
continue
if "error" in msg:
err = msg["error"]
if isinstance(err, dict):
raise map_jsonrpc_error(
int(err.get("code", -32000)),
str(err.get("message", "unknown")),
err.get("data"),
)
raise AppServerError("Malformed JSON-RPC error response")
return msg.get("result")
def notify(self, method: str, params: JsonObject | None = None) -> None:
self._write_message({"method": method, "params": params or {}})
def next_notification(self) -> Notification:
if self._pending_notifications:
return self._pending_notifications.popleft()
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
return self._coerce_notification(msg["method"], msg.get("params"))
def acquire_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer is not None:
raise RuntimeError(
"Concurrent turn consumers are not yet supported in the experimental SDK. "
f"Client is already streaming turn {self._active_turn_consumer!r}; "
f"cannot start turn {turn_id!r} until the active consumer finishes."
)
self._active_turn_consumer = turn_id
def release_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer == turn_id:
self._active_turn_consumer = None
def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
return self.request("thread/start", _params_dict(params), response_model=ThreadStartResponse)
def thread_resume(
self,
thread_id: str,
params: V2ThreadResumeParams | JsonObject | None = None,
) -> ThreadResumeResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/resume", payload, response_model=ThreadResumeResponse)
def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
return self.request("thread/list", _params_dict(params), response_model=ThreadListResponse)
def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
return self.request(
"thread/read",
{"threadId": thread_id, "includeTurns": include_turns},
response_model=ThreadReadResponse,
)
def thread_fork(
self,
thread_id: str,
params: V2ThreadForkParams | JsonObject | None = None,
) -> ThreadForkResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/fork", payload, response_model=ThreadForkResponse)
def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return self.request("thread/archive", {"threadId": thread_id}, response_model=ThreadArchiveResponse)
def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
return self.request("thread/unarchive", {"threadId": thread_id}, response_model=ThreadUnarchiveResponse)
def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
return self.request(
"thread/name/set",
{"threadId": thread_id, "name": name},
response_model=ThreadSetNameResponse,
)
def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
return self.request(
"thread/compact/start",
{"threadId": thread_id},
response_model=ThreadCompactStartResponse,
)
def turn_start(
self,
thread_id: str,
input_items: list[JsonObject] | JsonObject | str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
payload = {
**_params_dict(params),
"threadId": thread_id,
"input": self._normalize_input_items(input_items),
}
return self.request("turn/start", payload, response_model=TurnStartResponse)
def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
return self.request(
"turn/interrupt",
{"threadId": thread_id, "turnId": turn_id},
response_model=TurnInterruptResponse,
)
def turn_steer(
self,
thread_id: str,
expected_turn_id: str,
input_items: list[JsonObject] | JsonObject | str,
) -> TurnSteerResponse:
return self.request(
"turn/steer",
{
"threadId": thread_id,
"expectedTurnId": expected_turn_id,
"input": self._normalize_input_items(input_items),
},
response_model=TurnSteerResponse,
)
def model_list(self, include_hidden: bool = False) -> ModelListResponse:
return self.request(
"model/list",
{"includeHidden": include_hidden},
response_model=ModelListResponse,
)
def request_with_retry_on_overload(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
) -> ModelT:
return retry_on_overload(
lambda: self.request(method, params, response_model=response_model),
max_attempts=max_attempts,
initial_delay_s=initial_delay_s,
max_delay_s=max_delay_s,
)
def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
while True:
notification = self.next_notification()
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
return notification.payload
def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
target_methods = {methods} if isinstance(methods, str) else set(methods)
out: list[Notification] = []
while True:
notification = self.next_notification()
out.append(notification)
if notification.method in target_methods:
return out
def stream_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> Iterator[AgentMessageDeltaNotification]:
started = self.turn_start(thread_id, text, params=params)
turn_id = started.turn.id
while True:
notification = self.next_notification()
if (
notification.method == "item/agentMessage/delta"
and isinstance(notification.payload, AgentMessageDeltaNotification)
and notification.payload.turn_id == turn_id
):
yield notification.payload
continue
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
break
def _coerce_notification(self, method: str, params: object) -> Notification:
params_dict = params if isinstance(params, dict) else {}
model = NOTIFICATION_MODELS.get(method)
if model is None:
return Notification(method=method, payload=UnknownNotification(params=params_dict))
try:
payload = model.model_validate(params_dict)
except Exception: # noqa: BLE001
return Notification(method=method, payload=UnknownNotification(params=params_dict))
return Notification(method=method, payload=payload)
def _normalize_input_items(
self,
input_items: list[JsonObject] | JsonObject | str,
) -> list[JsonObject]:
if isinstance(input_items, str):
return [{"type": "text", "text": input_items}]
if isinstance(input_items, dict):
return [input_items]
return input_items
def _default_approval_handler(self, method: str, params: JsonObject | None) -> JsonObject:
if method == "item/commandExecution/requestApproval":
return {"decision": "accept"}
if method == "item/fileChange/requestApproval":
return {"decision": "accept"}
return {}
def _start_stderr_drain_thread(self) -> None:
if self._proc is None or self._proc.stderr is None:
return
def _drain() -> None:
stderr = self._proc.stderr
if stderr is None:
return
for line in stderr:
self._stderr_lines.append(line.rstrip("\n"))
self._stderr_thread = threading.Thread(target=_drain, daemon=True)
self._stderr_thread.start()
def _stderr_tail(self, limit: int = 40) -> str:
return "\n".join(list(self._stderr_lines)[-limit:])
def _handle_server_request(self, msg: dict[str, JsonValue]) -> JsonObject:
method = msg["method"]
params = msg.get("params")
if not isinstance(method, str):
return {}
return self._approval_handler(
method,
params if isinstance(params, dict) else None,
)
def _write_message(self, payload: JsonObject) -> None:
if self._proc is None or self._proc.stdin is None:
raise TransportClosedError("app-server is not running")
with self._lock:
self._proc.stdin.write(json.dumps(payload) + "\n")
self._proc.stdin.flush()
def _read_message(self) -> dict[str, JsonValue]:
if self._proc is None or self._proc.stdout is None:
raise TransportClosedError("app-server is not running")
line = self._proc.stdout.readline()
if not line:
raise TransportClosedError(
f"app-server closed stdout. stderr_tail={self._stderr_tail()[:2000]}"
)
try:
message = json.loads(line)
except json.JSONDecodeError as exc:
raise AppServerError(f"Invalid JSON-RPC line: {line!r}") from exc
if not isinstance(message, dict):
raise AppServerError(f"Invalid JSON-RPC payload: {message!r}")
return message
def default_codex_home() -> str:
return str(Path.home() / ".codex")

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from typing import Any
class AppServerError(Exception):
"""Base exception for SDK errors."""
class JsonRpcError(AppServerError):
"""Raw JSON-RPC error wrapper from the server."""
def __init__(self, code: int, message: str, data: Any = None):
super().__init__(f"JSON-RPC error {code}: {message}")
self.code = code
self.message = message
self.data = data
class TransportClosedError(AppServerError):
"""Raised when the app-server transport closes unexpectedly."""
class AppServerRpcError(JsonRpcError):
"""Base typed error for JSON-RPC failures."""
class ParseError(AppServerRpcError):
pass
class InvalidRequestError(AppServerRpcError):
pass
class MethodNotFoundError(AppServerRpcError):
pass
class InvalidParamsError(AppServerRpcError):
pass
class InternalRpcError(AppServerRpcError):
pass
class ServerBusyError(AppServerRpcError):
"""Server is overloaded / unavailable and caller should retry."""
class RetryLimitExceededError(ServerBusyError):
"""Server exhausted internal retry budget for a retryable operation."""
def _contains_retry_limit_text(message: str) -> bool:
lowered = message.lower()
return "retry limit" in lowered or "too many failed attempts" in lowered
def _is_server_overloaded(data: Any) -> bool:
if data is None:
return False
if isinstance(data, str):
return data.lower() == "server_overloaded"
if isinstance(data, dict):
direct = (
data.get("codex_error_info")
or data.get("codexErrorInfo")
or data.get("errorInfo")
)
if isinstance(direct, str) and direct.lower() == "server_overloaded":
return True
if isinstance(direct, dict):
for value in direct.values():
if isinstance(value, str) and value.lower() == "server_overloaded":
return True
for value in data.values():
if _is_server_overloaded(value):
return True
if isinstance(data, list):
return any(_is_server_overloaded(value) for value in data)
return False
def map_jsonrpc_error(code: int, message: str, data: Any = None) -> JsonRpcError:
"""Map a raw JSON-RPC error into a richer SDK exception class."""
if code == -32700:
return ParseError(code, message, data)
if code == -32600:
return InvalidRequestError(code, message, data)
if code == -32601:
return MethodNotFoundError(code, message, data)
if code == -32602:
return InvalidParamsError(code, message, data)
if code == -32603:
return InternalRpcError(code, message, data)
if -32099 <= code <= -32000:
if _is_server_overloaded(data):
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return ServerBusyError(code, message, data)
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return AppServerRpcError(code, message, data)
return JsonRpcError(code, message, data)
def is_retryable_error(exc: BaseException) -> bool:
"""True if the exception is a transient overload-style error."""
if isinstance(exc, ServerBusyError):
return True
if isinstance(exc, JsonRpcError):
return _is_server_overloaded(exc.data)
return False

View File

@@ -0,0 +1 @@
"""Auto-generated Python types derived from the app-server schemas."""

View File

@@ -0,0 +1,102 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from pydantic import BaseModel
from .v2_all import AccountLoginCompletedNotification
from .v2_all import AccountRateLimitsUpdatedNotification
from .v2_all import AccountUpdatedNotification
from .v2_all import AgentMessageDeltaNotification
from .v2_all import AppListUpdatedNotification
from .v2_all import CommandExecOutputDeltaNotification
from .v2_all import CommandExecutionOutputDeltaNotification
from .v2_all import ConfigWarningNotification
from .v2_all import ContextCompactedNotification
from .v2_all import DeprecationNoticeNotification
from .v2_all import ErrorNotification
from .v2_all import FileChangeOutputDeltaNotification
from .v2_all import FuzzyFileSearchSessionCompletedNotification
from .v2_all import FuzzyFileSearchSessionUpdatedNotification
from .v2_all import HookCompletedNotification
from .v2_all import HookStartedNotification
from .v2_all import ItemCompletedNotification
from .v2_all import ItemStartedNotification
from .v2_all import McpServerOauthLoginCompletedNotification
from .v2_all import McpToolCallProgressNotification
from .v2_all import ModelReroutedNotification
from .v2_all import PlanDeltaNotification
from .v2_all import ReasoningSummaryPartAddedNotification
from .v2_all import ReasoningSummaryTextDeltaNotification
from .v2_all import ReasoningTextDeltaNotification
from .v2_all import ServerRequestResolvedNotification
from .v2_all import SkillsChangedNotification
from .v2_all import TerminalInteractionNotification
from .v2_all import ThreadArchivedNotification
from .v2_all import ThreadClosedNotification
from .v2_all import ThreadNameUpdatedNotification
from .v2_all import ThreadRealtimeClosedNotification
from .v2_all import ThreadRealtimeErrorNotification
from .v2_all import ThreadRealtimeItemAddedNotification
from .v2_all import ThreadRealtimeOutputAudioDeltaNotification
from .v2_all import ThreadRealtimeStartedNotification
from .v2_all import ThreadStartedNotification
from .v2_all import ThreadStatusChangedNotification
from .v2_all import ThreadTokenUsageUpdatedNotification
from .v2_all import ThreadUnarchivedNotification
from .v2_all import TurnCompletedNotification
from .v2_all import TurnDiffUpdatedNotification
from .v2_all import TurnPlanUpdatedNotification
from .v2_all import TurnStartedNotification
from .v2_all import WindowsSandboxSetupCompletedNotification
from .v2_all import WindowsWorldWritableWarningNotification
NOTIFICATION_MODELS: dict[str, type[BaseModel]] = {
"account/login/completed": AccountLoginCompletedNotification,
"account/rateLimits/updated": AccountRateLimitsUpdatedNotification,
"account/updated": AccountUpdatedNotification,
"app/list/updated": AppListUpdatedNotification,
"command/exec/outputDelta": CommandExecOutputDeltaNotification,
"configWarning": ConfigWarningNotification,
"deprecationNotice": DeprecationNoticeNotification,
"error": ErrorNotification,
"fuzzyFileSearch/sessionCompleted": FuzzyFileSearchSessionCompletedNotification,
"fuzzyFileSearch/sessionUpdated": FuzzyFileSearchSessionUpdatedNotification,
"hook/completed": HookCompletedNotification,
"hook/started": HookStartedNotification,
"item/agentMessage/delta": AgentMessageDeltaNotification,
"item/commandExecution/outputDelta": CommandExecutionOutputDeltaNotification,
"item/commandExecution/terminalInteraction": TerminalInteractionNotification,
"item/completed": ItemCompletedNotification,
"item/fileChange/outputDelta": FileChangeOutputDeltaNotification,
"item/mcpToolCall/progress": McpToolCallProgressNotification,
"item/plan/delta": PlanDeltaNotification,
"item/reasoning/summaryPartAdded": ReasoningSummaryPartAddedNotification,
"item/reasoning/summaryTextDelta": ReasoningSummaryTextDeltaNotification,
"item/reasoning/textDelta": ReasoningTextDeltaNotification,
"item/started": ItemStartedNotification,
"mcpServer/oauthLogin/completed": McpServerOauthLoginCompletedNotification,
"model/rerouted": ModelReroutedNotification,
"serverRequest/resolved": ServerRequestResolvedNotification,
"skills/changed": SkillsChangedNotification,
"thread/archived": ThreadArchivedNotification,
"thread/closed": ThreadClosedNotification,
"thread/compacted": ContextCompactedNotification,
"thread/name/updated": ThreadNameUpdatedNotification,
"thread/realtime/closed": ThreadRealtimeClosedNotification,
"thread/realtime/error": ThreadRealtimeErrorNotification,
"thread/realtime/itemAdded": ThreadRealtimeItemAddedNotification,
"thread/realtime/outputAudio/delta": ThreadRealtimeOutputAudioDeltaNotification,
"thread/realtime/started": ThreadRealtimeStartedNotification,
"thread/started": ThreadStartedNotification,
"thread/status/changed": ThreadStatusChangedNotification,
"thread/tokenUsage/updated": ThreadTokenUsageUpdatedNotification,
"thread/unarchived": ThreadUnarchivedNotification,
"turn/completed": TurnCompletedNotification,
"turn/diff/updated": TurnDiffUpdatedNotification,
"turn/plan/updated": TurnPlanUpdatedNotification,
"turn/started": TurnStartedNotification,
"windows/worldWritableWarning": WindowsWorldWritableWarningNotification,
"windowsSandbox/setupCompleted": WindowsSandboxSetupCompletedNotification,
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,23 @@
"""Stable aliases over the canonical generated v2 models."""
from .v2_all import (
ModelListResponse,
ThreadCompactStartResponse,
ThreadItem,
ThreadListResponse,
ThreadReadResponse,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification as TurnCompletedNotificationPayload,
TurnSteerResponse,
)
__all__ = [
"ModelListResponse",
"ThreadCompactStartResponse",
"ThreadItem",
"ThreadListResponse",
"ThreadReadResponse",
"ThreadTokenUsageUpdatedNotification",
"TurnCompletedNotificationPayload",
"TurnSteerResponse",
]

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TypeAlias
from pydantic import BaseModel
from .generated.v2_all import (
AccountLoginCompletedNotification,
AccountRateLimitsUpdatedNotification,
AccountUpdatedNotification,
AgentMessageDeltaNotification,
AppListUpdatedNotification,
CommandExecutionOutputDeltaNotification,
ConfigWarningNotification,
ContextCompactedNotification,
DeprecationNoticeNotification,
ErrorNotification,
FileChangeOutputDeltaNotification,
ItemCompletedNotification,
ItemStartedNotification,
McpServerOauthLoginCompletedNotification,
McpToolCallProgressNotification,
PlanDeltaNotification,
RawResponseItemCompletedNotification,
ReasoningSummaryPartAddedNotification,
ReasoningSummaryTextDeltaNotification,
ReasoningTextDeltaNotification,
TerminalInteractionNotification,
ThreadNameUpdatedNotification,
ThreadStartedNotification,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnDiffUpdatedNotification,
TurnPlanUpdatedNotification,
TurnStartedNotification,
WindowsWorldWritableWarningNotification,
)
JsonScalar: TypeAlias = str | int | float | bool | None
JsonValue: TypeAlias = JsonScalar | dict[str, "JsonValue"] | list["JsonValue"]
JsonObject: TypeAlias = dict[str, JsonValue]
@dataclass(slots=True)
class UnknownNotification:
params: JsonObject
NotificationPayload: TypeAlias = (
AccountLoginCompletedNotification
| AccountRateLimitsUpdatedNotification
| AccountUpdatedNotification
| AgentMessageDeltaNotification
| AppListUpdatedNotification
| CommandExecutionOutputDeltaNotification
| ConfigWarningNotification
| ContextCompactedNotification
| DeprecationNoticeNotification
| ErrorNotification
| FileChangeOutputDeltaNotification
| ItemCompletedNotification
| ItemStartedNotification
| McpServerOauthLoginCompletedNotification
| McpToolCallProgressNotification
| PlanDeltaNotification
| RawResponseItemCompletedNotification
| ReasoningSummaryPartAddedNotification
| ReasoningSummaryTextDeltaNotification
| ReasoningTextDeltaNotification
| TerminalInteractionNotification
| ThreadNameUpdatedNotification
| ThreadStartedNotification
| ThreadTokenUsageUpdatedNotification
| TurnCompletedNotification
| TurnDiffUpdatedNotification
| TurnPlanUpdatedNotification
| TurnStartedNotification
| WindowsWorldWritableWarningNotification
| UnknownNotification
)
@dataclass(slots=True)
class Notification:
method: str
payload: NotificationPayload
class ServerInfo(BaseModel):
name: str | None = None
version: str | None = None
class InitializeResponse(BaseModel):
serverInfo: ServerInfo | None = None
userAgent: str | None = None

View File

@@ -0,0 +1,790 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterator
from .async_client import AsyncAppServerClient
from .client import AppServerClient, AppServerConfig
from .generated.v2_all import (
AgentMessageDeltaNotification,
RawResponseItemCompletedNotification,
ThreadArchiveResponse,
ThreadSetNameResponse,
TurnError,
TurnInterruptResponse,
)
from .generated.v2_types import (
ModelListResponse,
ThreadCompactStartResponse,
ThreadItem,
ThreadListResponse,
ThreadReadResponse,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotificationPayload,
TurnSteerResponse,
)
from .models import InitializeResponse, JsonObject, Notification
from .public_types import (
AskForApproval,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxMode,
SandboxPolicy,
ThreadForkParams,
ThreadListParams,
ThreadResumeParams,
ThreadSortKey,
ThreadSourceKind,
ThreadStartParams,
TurnStartParams,
TurnStatus,
)
@dataclass(slots=True)
class TurnResult:
thread_id: str
turn_id: str
status: TurnStatus
error: TurnError | None
text: str
items: list[ThreadItem]
usage: ThreadTokenUsageUpdatedNotification | None = None
@dataclass(slots=True)
class TextInput:
text: str
@dataclass(slots=True)
class ImageInput:
url: str
@dataclass(slots=True)
class LocalImageInput:
path: str
@dataclass(slots=True)
class SkillInput:
name: str
path: str
@dataclass(slots=True)
class MentionInput:
name: str
path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
@dataclass(slots=True)
class InitializeResult:
server_name: str
server_version: str
user_agent: str
def _to_wire_item(item: InputItem) -> JsonObject:
if isinstance(item, TextInput):
return {"type": "text", "text": item.text}
if isinstance(item, ImageInput):
return {"type": "image", "url": item.url}
if isinstance(item, LocalImageInput):
return {"type": "localImage", "path": item.path}
if isinstance(item, SkillInput):
return {"type": "skill", "name": item.name, "path": item.path}
if isinstance(item, MentionInput):
return {"type": "mention", "name": item.name, "path": item.path}
raise TypeError(f"unsupported input item: {type(item)!r}")
def _to_wire_input(input: Input) -> list[JsonObject]:
if isinstance(input, list):
return [_to_wire_item(i) for i in input]
return [_to_wire_item(input)]
def _split_user_agent(user_agent: str) -> tuple[str | None, str | None]:
raw = user_agent.strip()
if not raw:
return None, None
if "/" in raw:
name, version = raw.split("/", 1)
return (name or None), (version or None)
parts = raw.split(maxsplit=1)
if len(parts) == 2:
return parts[0], parts[1]
return raw, None
def _enum_value(value: object) -> object:
return getattr(value, "value", value)
def _assistant_output_text_chunks(
notification: RawResponseItemCompletedNotification,
) -> list[str]:
item = notification.item.root
if _enum_value(getattr(item, "type", None)) != "message":
return []
if getattr(item, "role", None) != "assistant":
return []
chunks: list[str] = []
for content in getattr(item, "content", []) or []:
content_item = getattr(content, "root", content)
if _enum_value(getattr(content_item, "type", None)) != "output_text":
continue
text = getattr(content_item, "text", None)
if isinstance(text, str) and text:
chunks.append(text)
return chunks
def _build_turn_result(
completed: TurnCompletedNotificationPayload | None,
usage: ThreadTokenUsageUpdatedNotification | None,
delta_chunks: list[str],
raw_text_chunks: list[str],
) -> TurnResult:
if completed is None:
raise RuntimeError("turn completed event not received")
if completed.turn.status == TurnStatus.completed and usage is None:
raise RuntimeError(
"thread/tokenUsage/updated notification not received for completed turn"
)
text = "".join(delta_chunks) if delta_chunks else "".join(raw_text_chunks)
return TurnResult(
thread_id=completed.thread_id,
turn_id=completed.turn.id,
status=completed.turn.status,
error=completed.turn.error,
text=text,
items=list(completed.turn.items or []),
usage=usage,
)
class Codex:
"""Minimal typed SDK surface for app-server v2."""
def __init__(self, config: AppServerConfig | None = None) -> None:
self._client = AppServerClient(config=config)
try:
self._client.start()
self._init = self._parse_initialize(self._client.initialize())
except Exception:
self._client.close()
raise
def __enter__(self) -> "Codex":
return self
def __exit__(self, _exc_type, _exc, _tb) -> None:
self.close()
@staticmethod
def _parse_initialize(payload: InitializeResponse) -> InitializeResult:
user_agent = (payload.userAgent or "").strip()
server = payload.serverInfo
server_name: str | None = None
server_version: str | None = None
if server is not None:
server_name = (server.name or "").strip() or None
server_version = (server.version or "").strip() or None
if (server_name is None or server_version is None) and user_agent:
parsed_name, parsed_version = _split_user_agent(user_agent)
if server_name is None:
server_name = parsed_name
if server_version is None:
server_version = parsed_version
normalized_server_name = (server_name or "").strip()
normalized_server_version = (server_version or "").strip()
if not user_agent or not normalized_server_name or not normalized_server_version:
raise RuntimeError(
"initialize response missing required metadata "
f"(user_agent={user_agent!r}, server_name={normalized_server_name!r}, server_version={normalized_server_version!r})"
)
return InitializeResult(
server_name=normalized_server_name,
server_version=normalized_server_version,
user_agent=user_agent,
)
@property
def metadata(self) -> InitializeResult:
return self._init
def close(self) -> None:
self._client.close()
# BEGIN GENERATED: Codex.flat_methods
def thread_start(
self,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
ephemeral: bool | None = None,
model: str | None = None,
model_provider: str | None = None,
personality: Personality | None = None,
sandbox: SandboxMode | None = None,
service_name: str | None = None,
service_tier: ServiceTier | None = None,
) -> Thread:
params = ThreadStartParams(
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
ephemeral=ephemeral,
model=model,
model_provider=model_provider,
personality=personality,
sandbox=sandbox,
service_name=service_name,
service_tier=service_tier,
)
started = self._client.thread_start(params)
return Thread(self._client, started.thread.id)
def thread_list(
self,
*,
archived: bool | None = None,
cursor: str | None = None,
cwd: str | None = None,
limit: int | None = None,
model_providers: list[str] | None = None,
search_term: str | None = None,
sort_key: ThreadSortKey | None = None,
source_kinds: list[ThreadSourceKind] | None = None,
) -> ThreadListResponse:
params = ThreadListParams(
archived=archived,
cursor=cursor,
cwd=cwd,
limit=limit,
model_providers=model_providers,
search_term=search_term,
sort_key=sort_key,
source_kinds=source_kinds,
)
return self._client.thread_list(params)
def thread_resume(
self,
thread_id: str,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
model: str | None = None,
model_provider: str | None = None,
personality: Personality | None = None,
sandbox: SandboxMode | None = None,
service_tier: ServiceTier | None = None,
) -> Thread:
params = ThreadResumeParams(
thread_id=thread_id,
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
model=model,
model_provider=model_provider,
personality=personality,
sandbox=sandbox,
service_tier=service_tier,
)
resumed = self._client.thread_resume(thread_id, params)
return Thread(self._client, resumed.thread.id)
def thread_fork(
self,
thread_id: str,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
model: str | None = None,
model_provider: str | None = None,
sandbox: SandboxMode | None = None,
service_tier: ServiceTier | None = None,
) -> Thread:
params = ThreadForkParams(
thread_id=thread_id,
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
model=model,
model_provider=model_provider,
sandbox=sandbox,
service_tier=service_tier,
)
forked = self._client.thread_fork(thread_id, params)
return Thread(self._client, forked.thread.id)
def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return self._client.thread_archive(thread_id)
def thread_unarchive(self, thread_id: str) -> Thread:
unarchived = self._client.thread_unarchive(thread_id)
return Thread(self._client, unarchived.thread.id)
# END GENERATED: Codex.flat_methods
def models(self, *, include_hidden: bool = False) -> ModelListResponse:
return self._client.model_list(include_hidden=include_hidden)
class AsyncCodex:
"""Async mirror of :class:`Codex` with matching method shapes."""
def __init__(self, config: AppServerConfig | None = None) -> None:
self._client = AsyncAppServerClient(config=config)
self._init: InitializeResult | None = None
self._initialized = False
self._init_lock = asyncio.Lock()
async def __aenter__(self) -> "AsyncCodex":
await self._ensure_initialized()
return self
async def __aexit__(self, _exc_type, _exc, _tb) -> None:
await self.close()
async def _ensure_initialized(self) -> None:
if self._initialized:
return
async with self._init_lock:
if self._initialized:
return
try:
await self._client.start()
payload = await self._client.initialize()
self._init = Codex._parse_initialize(payload)
self._initialized = True
except Exception:
await self._client.close()
self._init = None
self._initialized = False
raise
@property
def metadata(self) -> InitializeResult:
if self._init is None:
raise RuntimeError(
"AsyncCodex is not initialized yet. Use `async with AsyncCodex()` or call an async API first."
)
return self._init
async def close(self) -> None:
await self._client.close()
self._init = None
self._initialized = False
# BEGIN GENERATED: AsyncCodex.flat_methods
async def thread_start(
self,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
ephemeral: bool | None = None,
model: str | None = None,
model_provider: str | None = None,
personality: Personality | None = None,
sandbox: SandboxMode | None = None,
service_name: str | None = None,
service_tier: ServiceTier | None = None,
) -> AsyncThread:
await self._ensure_initialized()
params = ThreadStartParams(
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
ephemeral=ephemeral,
model=model,
model_provider=model_provider,
personality=personality,
sandbox=sandbox,
service_name=service_name,
service_tier=service_tier,
)
started = await self._client.thread_start(params)
return AsyncThread(self, started.thread.id)
async def thread_list(
self,
*,
archived: bool | None = None,
cursor: str | None = None,
cwd: str | None = None,
limit: int | None = None,
model_providers: list[str] | None = None,
search_term: str | None = None,
sort_key: ThreadSortKey | None = None,
source_kinds: list[ThreadSourceKind] | None = None,
) -> ThreadListResponse:
await self._ensure_initialized()
params = ThreadListParams(
archived=archived,
cursor=cursor,
cwd=cwd,
limit=limit,
model_providers=model_providers,
search_term=search_term,
sort_key=sort_key,
source_kinds=source_kinds,
)
return await self._client.thread_list(params)
async def thread_resume(
self,
thread_id: str,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
model: str | None = None,
model_provider: str | None = None,
personality: Personality | None = None,
sandbox: SandboxMode | None = None,
service_tier: ServiceTier | None = None,
) -> AsyncThread:
await self._ensure_initialized()
params = ThreadResumeParams(
thread_id=thread_id,
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
model=model,
model_provider=model_provider,
personality=personality,
sandbox=sandbox,
service_tier=service_tier,
)
resumed = await self._client.thread_resume(thread_id, params)
return AsyncThread(self, resumed.thread.id)
async def thread_fork(
self,
thread_id: str,
*,
approval_policy: AskForApproval | None = None,
base_instructions: str | None = None,
config: JsonObject | None = None,
cwd: str | None = None,
developer_instructions: str | None = None,
model: str | None = None,
model_provider: str | None = None,
sandbox: SandboxMode | None = None,
service_tier: ServiceTier | None = None,
) -> AsyncThread:
await self._ensure_initialized()
params = ThreadForkParams(
thread_id=thread_id,
approval_policy=approval_policy,
base_instructions=base_instructions,
config=config,
cwd=cwd,
developer_instructions=developer_instructions,
model=model,
model_provider=model_provider,
sandbox=sandbox,
service_tier=service_tier,
)
forked = await self._client.thread_fork(thread_id, params)
return AsyncThread(self, forked.thread.id)
async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
await self._ensure_initialized()
return await self._client.thread_archive(thread_id)
async def thread_unarchive(self, thread_id: str) -> AsyncThread:
await self._ensure_initialized()
unarchived = await self._client.thread_unarchive(thread_id)
return AsyncThread(self, unarchived.thread.id)
# END GENERATED: AsyncCodex.flat_methods
async def models(self, *, include_hidden: bool = False) -> ModelListResponse:
await self._ensure_initialized()
return await self._client.model_list(include_hidden=include_hidden)
@dataclass(slots=True)
class Thread:
_client: AppServerClient
id: str
# BEGIN GENERATED: Thread.flat_methods
def turn(
self,
input: Input,
*,
approval_policy: AskForApproval | None = None,
cwd: str | None = None,
effort: ReasoningEffort | None = None,
model: str | None = None,
output_schema: JsonObject | None = None,
personality: Personality | None = None,
sandbox_policy: SandboxPolicy | None = None,
service_tier: ServiceTier | None = None,
summary: ReasoningSummary | None = None,
) -> Turn:
wire_input = _to_wire_input(input)
params = TurnStartParams(
thread_id=self.id,
input=wire_input,
approval_policy=approval_policy,
cwd=cwd,
effort=effort,
model=model,
output_schema=output_schema,
personality=personality,
sandbox_policy=sandbox_policy,
service_tier=service_tier,
summary=summary,
)
turn = self._client.turn_start(self.id, wire_input, params=params)
return Turn(self._client, self.id, turn.turn.id)
# END GENERATED: Thread.flat_methods
def read(self, *, include_turns: bool = False) -> ThreadReadResponse:
return self._client.thread_read(self.id, include_turns=include_turns)
def set_name(self, name: str) -> ThreadSetNameResponse:
return self._client.thread_set_name(self.id, name)
def compact(self) -> ThreadCompactStartResponse:
return self._client.thread_compact(self.id)
@dataclass(slots=True)
class AsyncThread:
_codex: AsyncCodex
id: str
# BEGIN GENERATED: AsyncThread.flat_methods
async def turn(
self,
input: Input,
*,
approval_policy: AskForApproval | None = None,
cwd: str | None = None,
effort: ReasoningEffort | None = None,
model: str | None = None,
output_schema: JsonObject | None = None,
personality: Personality | None = None,
sandbox_policy: SandboxPolicy | None = None,
service_tier: ServiceTier | None = None,
summary: ReasoningSummary | None = None,
) -> AsyncTurn:
await self._codex._ensure_initialized()
wire_input = _to_wire_input(input)
params = TurnStartParams(
thread_id=self.id,
input=wire_input,
approval_policy=approval_policy,
cwd=cwd,
effort=effort,
model=model,
output_schema=output_schema,
personality=personality,
sandbox_policy=sandbox_policy,
service_tier=service_tier,
summary=summary,
)
turn = await self._codex._client.turn_start(
self.id,
wire_input,
params=params,
)
return AsyncTurn(self._codex, self.id, turn.turn.id)
# END GENERATED: AsyncThread.flat_methods
async def read(self, *, include_turns: bool = False) -> ThreadReadResponse:
await self._codex._ensure_initialized()
return await self._codex._client.thread_read(self.id, include_turns=include_turns)
async def set_name(self, name: str) -> ThreadSetNameResponse:
await self._codex._ensure_initialized()
return await self._codex._client.thread_set_name(self.id, name)
async def compact(self) -> ThreadCompactStartResponse:
await self._codex._ensure_initialized()
return await self._codex._client.thread_compact(self.id)
@dataclass(slots=True)
class Turn:
_client: AppServerClient
thread_id: str
id: str
def steer(self, input: Input) -> TurnSteerResponse:
return self._client.turn_steer(self.thread_id, self.id, _to_wire_input(input))
def interrupt(self) -> TurnInterruptResponse:
return self._client.turn_interrupt(self.thread_id, self.id)
def stream(self) -> Iterator[Notification]:
# TODO: replace this client-wide experimental guard with per-turn event demux.
self._client.acquire_turn_consumer(self.id)
try:
while True:
event = self._client.next_notification()
yield event
if (
event.method == "turn/completed"
and isinstance(event.payload, TurnCompletedNotificationPayload)
and event.payload.turn.id == self.id
):
break
finally:
self._client.release_turn_consumer(self.id)
def run(self) -> TurnResult:
completed: TurnCompletedNotificationPayload | None = None
usage: ThreadTokenUsageUpdatedNotification | None = None
delta_chunks: list[str] = []
raw_text_chunks: list[str] = []
stream = self.stream()
try:
for event in stream:
payload = event.payload
if (
isinstance(payload, AgentMessageDeltaNotification)
and payload.turn_id == self.id
):
delta_chunks.append(payload.delta)
continue
if (
isinstance(payload, RawResponseItemCompletedNotification)
and payload.turn_id == self.id
):
raw_text_chunks.extend(_assistant_output_text_chunks(payload))
continue
if (
isinstance(payload, ThreadTokenUsageUpdatedNotification)
and payload.turn_id == self.id
):
usage = payload
continue
if (
isinstance(payload, TurnCompletedNotificationPayload)
and payload.turn.id == self.id
):
completed = payload
finally:
stream.close()
return _build_turn_result(completed, usage, delta_chunks, raw_text_chunks)
@dataclass(slots=True)
class AsyncTurn:
_codex: AsyncCodex
thread_id: str
id: str
async def steer(self, input: Input) -> TurnSteerResponse:
await self._codex._ensure_initialized()
return await self._codex._client.turn_steer(
self.thread_id,
self.id,
_to_wire_input(input),
)
async def interrupt(self) -> TurnInterruptResponse:
await self._codex._ensure_initialized()
return await self._codex._client.turn_interrupt(self.thread_id, self.id)
async def stream(self) -> AsyncIterator[Notification]:
await self._codex._ensure_initialized()
# TODO: replace this client-wide experimental guard with per-turn event demux.
self._codex._client.acquire_turn_consumer(self.id)
try:
while True:
event = await self._codex._client.next_notification()
yield event
if (
event.method == "turn/completed"
and isinstance(event.payload, TurnCompletedNotificationPayload)
and event.payload.turn.id == self.id
):
break
finally:
self._codex._client.release_turn_consumer(self.id)
async def run(self) -> TurnResult:
completed: TurnCompletedNotificationPayload | None = None
usage: ThreadTokenUsageUpdatedNotification | None = None
delta_chunks: list[str] = []
raw_text_chunks: list[str] = []
stream = self.stream()
try:
async for event in stream:
payload = event.payload
if (
isinstance(payload, AgentMessageDeltaNotification)
and payload.turn_id == self.id
):
delta_chunks.append(payload.delta)
continue
if (
isinstance(payload, RawResponseItemCompletedNotification)
and payload.turn_id == self.id
):
raw_text_chunks.extend(_assistant_output_text_chunks(payload))
continue
if (
isinstance(payload, ThreadTokenUsageUpdatedNotification)
and payload.turn_id == self.id
):
usage = payload
continue
if (
isinstance(payload, TurnCompletedNotificationPayload)
and payload.turn.id == self.id
):
completed = payload
finally:
await stream.aclose()
return _build_turn_result(completed, usage, delta_chunks, raw_text_chunks)

View File

@@ -0,0 +1,39 @@
"""Shallow public aliases over the generated v2 wire models."""
from .generated.v2_all import (
AskForApproval,
Personality,
PlanType,
ReasoningEffort,
ReasoningSummary,
SandboxMode,
SandboxPolicy,
ThreadForkParams,
ThreadListParams,
ThreadResumeParams,
ThreadSortKey,
ThreadSourceKind,
ThreadStartParams,
TurnStartParams,
TurnStatus,
TurnSteerParams,
)
__all__ = [
"AskForApproval",
"Personality",
"PlanType",
"ReasoningEffort",
"ReasoningSummary",
"SandboxMode",
"SandboxPolicy",
"ThreadForkParams",
"ThreadListParams",
"ThreadResumeParams",
"ThreadSortKey",
"ThreadSourceKind",
"ThreadStartParams",
"TurnStartParams",
"TurnStatus",
"TurnSteerParams",
]

View File

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import random
import time
from typing import Callable, TypeVar
from .errors import is_retryable_error
T = TypeVar("T")
def retry_on_overload(
op: Callable[[], T],
*,
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
jitter_ratio: float = 0.2,
) -> T:
"""Retry helper for transient server-overload errors."""
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
delay = initial_delay_s
attempt = 0
while True:
attempt += 1
try:
return op()
except Exception as exc:
if attempt >= max_attempts:
raise
if not is_retryable_error(exc):
raise
jitter = delay * jitter_ratio
sleep_for = min(max_delay_s, delay) + random.uniform(-jitter, jitter)
if sleep_for > 0:
time.sleep(sleep_for)
delay = min(max_delay_s, delay * 2)

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
src_str = str(SRC)
if src_str in sys.path:
sys.path.remove(src_str)
sys.path.insert(0, src_str)
for module_name in list(sys.modules):
if module_name == "codex_app_server" or module_name.startswith("codex_app_server."):
sys.modules.pop(module_name)

View File

@@ -0,0 +1,146 @@
from __future__ import annotations
import ast
import importlib.util
import json
import platform
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
def _load_update_script_module():
script_path = ROOT / "scripts" / "update_sdk_artifacts.py"
spec = importlib.util.spec_from_file_location("update_sdk_artifacts", script_path)
if spec is None or spec.loader is None:
raise AssertionError(f"Failed to load script module: {script_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_generation_has_single_maintenance_entrypoint_script() -> None:
scripts = sorted(p.name for p in (ROOT / "scripts").glob("*.py"))
assert scripts == ["update_sdk_artifacts.py"]
def test_generate_types_wires_all_generation_steps() -> None:
source = (ROOT / "scripts" / "update_sdk_artifacts.py").read_text()
tree = ast.parse(source)
generate_types_fn = next(
(node for node in tree.body if isinstance(node, ast.FunctionDef) and node.name == "generate_types"),
None,
)
assert generate_types_fn is not None
calls: list[str] = []
for node in generate_types_fn.body:
if isinstance(node, ast.Expr) and isinstance(node.value, ast.Call):
fn = node.value.func
if isinstance(fn, ast.Name):
calls.append(fn.id)
assert calls == [
"generate_v2_all",
"generate_notification_registry",
"generate_public_api_flat_methods",
]
def test_schema_normalization_only_flattens_string_literal_oneofs() -> None:
script = _load_update_script_module()
schema = json.loads(
(
ROOT.parent.parent
/ "codex-rs"
/ "app-server-protocol"
/ "schema"
/ "json"
/ "codex_app_server_protocol.v2.schemas.json"
).read_text()
)
definitions = schema["definitions"]
flattened = [
name
for name, definition in definitions.items()
if isinstance(definition, dict)
and script._flatten_string_enum_one_of(definition.copy())
]
assert flattened == [
"AuthMode",
"CommandExecOutputStream",
"ExperimentalFeatureStage",
"InputModality",
"MessagePhase",
]
def test_bundled_binaries_exist_for_all_supported_platforms() -> None:
script = _load_update_script_module()
for platform_key in script.PLATFORMS:
bin_path = script.bundled_platform_bin_path(platform_key)
assert bin_path.is_file(), f"Missing bundled binary: {bin_path}"
def test_default_runtime_uses_current_platform_bundled_binary() -> None:
client_source = (ROOT / "src" / "codex_app_server" / "client.py").read_text()
client_tree = ast.parse(client_source)
# Keep this assertion source-level so it works in both PR2 (types foundation)
# and PR3 (full SDK), regardless of runtime module wiring.
app_server_config = next(
(
node
for node in client_tree.body
if isinstance(node, ast.ClassDef) and node.name == "AppServerConfig"
),
None,
)
assert app_server_config is not None
codex_bin_field = next(
(
node
for node in app_server_config.body
if isinstance(node, ast.AnnAssign)
and isinstance(node.target, ast.Name)
and node.target.id == "codex_bin"
),
None,
)
assert codex_bin_field is not None
assert isinstance(codex_bin_field.value, ast.Call)
assert isinstance(codex_bin_field.value.func, ast.Name)
assert codex_bin_field.value.func.id == "str"
assert len(codex_bin_field.value.args) == 1
bundled_call = codex_bin_field.value.args[0]
assert isinstance(bundled_call, ast.Call)
assert isinstance(bundled_call.func, ast.Name)
assert bundled_call.func.id == "_bundled_codex_path"
bin_root = (ROOT / "src" / "codex_app_server" / "bin").resolve()
sys_name = platform.system().lower()
machine = platform.machine().lower()
is_arm = machine in {"arm64", "aarch64"}
if sys_name.startswith("darwin"):
platform_dir = "darwin-arm64" if is_arm else "darwin-x64"
exe = "codex"
elif sys_name.startswith("linux"):
platform_dir = "linux-arm64" if is_arm else "linux-x64"
exe = "codex"
elif sys_name.startswith("windows"):
platform_dir = "windows-arm64" if is_arm else "windows-x64"
exe = "codex.exe"
else:
raise AssertionError(f"Unsupported platform in test: {sys_name}/{machine}")
expected = (bin_root / platform_dir / exe).resolve()
assert expected.is_file()

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import asyncio
import time
from codex_app_server.async_client import AsyncAppServerClient
def test_async_client_serializes_transport_calls() -> None:
async def scenario() -> int:
client = AsyncAppServerClient()
active = 0
max_active = 0
def fake_model_list(include_hidden: bool = False) -> bool:
nonlocal active, max_active
active += 1
max_active = max(max_active, active)
time.sleep(0.05)
active -= 1
return include_hidden
client._sync.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(client.model_list(), client.model_list())
return max_active
assert asyncio.run(scenario()) == 1
def test_async_stream_text_is_incremental_and_blocks_parallel_calls() -> None:
async def scenario() -> tuple[str, list[str], bool]:
client = AsyncAppServerClient()
def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def]
yield "first"
time.sleep(0.03)
yield "second"
yield "third"
def fake_model_list(include_hidden: bool = False) -> str:
return "done"
client._sync.stream_text = fake_stream_text # type: ignore[method-assign]
client._sync.model_list = fake_model_list # type: ignore[method-assign]
stream = client.stream_text("thread-1", "hello")
first = await anext(stream)
blocked_before_stream_done = False
competing_call = asyncio.create_task(client.model_list())
await asyncio.sleep(0.01)
blocked_before_stream_done = not competing_call.done()
remaining: list[str] = []
async for item in stream:
remaining.append(item)
await competing_call
return first, remaining, blocked_before_stream_done
first, remaining, blocked = asyncio.run(scenario())
assert first == "first"
assert remaining == ["second", "third"]
assert blocked

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from codex_app_server.client import AppServerClient, _params_dict
from codex_app_server.generated.v2_all import ThreadListParams, ThreadTokenUsageUpdatedNotification
from codex_app_server.models import UnknownNotification
ROOT = Path(__file__).resolve().parents[1]
def test_thread_set_name_and_compact_use_current_rpc_methods() -> None:
client = AppServerClient()
calls: list[tuple[str, dict[str, Any] | None]] = []
def fake_request(method: str, params, *, response_model): # type: ignore[no-untyped-def]
calls.append((method, params))
return response_model.model_validate({})
client.request = fake_request # type: ignore[method-assign]
client.thread_set_name("thread-1", "sdk-name")
client.thread_compact("thread-1")
assert calls[0][0] == "thread/name/set"
assert calls[1][0] == "thread/compact/start"
def test_generated_params_models_are_snake_case_and_dump_by_alias() -> None:
params = ThreadListParams(search_term="needle", limit=5)
assert "search_term" in ThreadListParams.model_fields
dumped = _params_dict(params)
assert dumped == {"searchTerm": "needle", "limit": 5}
def test_generated_v2_bundle_has_single_shared_plan_type_definition() -> None:
source = (ROOT / "src" / "codex_app_server" / "generated" / "v2_all.py").read_text()
assert source.count("class PlanType(") == 1
def test_notifications_are_typed_with_canonical_v2_methods() -> None:
client = AppServerClient()
event = client._coerce_notification(
"thread/tokenUsage/updated",
{
"threadId": "thread-1",
"turnId": "turn-1",
"tokenUsage": {
"last": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
"total": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
},
},
)
assert event.method == "thread/tokenUsage/updated"
assert isinstance(event.payload, ThreadTokenUsageUpdatedNotification)
assert event.payload.turn_id == "turn-1"
def test_unknown_notifications_fall_back_to_unknown_payloads() -> None:
client = AppServerClient()
event = client._coerce_notification(
"unknown/notification",
{
"id": "evt-1",
"conversationId": "thread-1",
"msg": {"type": "turn_aborted"},
},
)
assert event.method == "unknown/notification"
assert isinstance(event.payload, UnknownNotification)
assert event.payload.params["msg"] == {"type": "turn_aborted"}
def test_invalid_notification_payload_falls_back_to_unknown() -> None:
client = AppServerClient()
event = client._coerce_notification("thread/tokenUsage/updated", {"threadId": "missing"})
assert event.method == "thread/tokenUsage/updated"
assert isinstance(event.payload, UnknownNotification)

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import os
import subprocess
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
GENERATED_TARGETS = [
Path("src/codex_app_server/generated/notification_registry.py"),
Path("src/codex_app_server/generated/v2_all.py"),
Path("src/codex_app_server/public_api.py"),
]
def _snapshot_target(root: Path, rel_path: Path) -> dict[str, bytes] | bytes | None:
target = root / rel_path
if not target.exists():
return None
if target.is_file():
return target.read_bytes()
snapshot: dict[str, bytes] = {}
for path in sorted(target.rglob("*")):
if path.is_file() and "__pycache__" not in path.parts:
snapshot[str(path.relative_to(target))] = path.read_bytes()
return snapshot
def _snapshot_targets(root: Path) -> dict[str, dict[str, bytes] | bytes | None]:
return {
str(rel_path): _snapshot_target(root, rel_path) for rel_path in GENERATED_TARGETS
}
def test_generated_files_are_up_to_date():
before = _snapshot_targets(ROOT)
# Regenerate contract artifacts via single maintenance entrypoint.
env = os.environ.copy()
python_bin = str(Path(sys.executable).parent)
env["PATH"] = f"{python_bin}{os.pathsep}{env.get('PATH', '')}"
subprocess.run(
[sys.executable, "scripts/update_sdk_artifacts.py", "--types-only"],
cwd=ROOT,
check=True,
env=env,
)
after = _snapshot_targets(ROOT)
assert before == after, "Generated files drifted after regeneration"

View File

@@ -0,0 +1,286 @@
from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
import pytest
import codex_app_server.public_api as public_api_module
from codex_app_server.client import AppServerClient
from codex_app_server.generated.v2_all import (
AgentMessageDeltaNotification,
RawResponseItemCompletedNotification,
ThreadTokenUsageUpdatedNotification,
)
from codex_app_server.models import InitializeResponse, Notification
from codex_app_server.public_api import AsyncCodex, AsyncTurn, Codex, Turn
from codex_app_server.public_types import TurnStatus
ROOT = Path(__file__).resolve().parents[1]
def _delta_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "delta-text",
) -> Notification:
return Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": text,
"itemId": "item-1",
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _raw_response_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "raw-text",
) -> Notification:
return Notification(
method="rawResponseItem/completed",
payload=RawResponseItemCompletedNotification.model_validate(
{
"item": {
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": text}],
},
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _usage_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
) -> Notification:
return Notification(
method="thread/tokenUsage/updated",
payload=ThreadTokenUsageUpdatedNotification.model_validate(
{
"threadId": thread_id,
"turnId": turn_id,
"tokenUsage": {
"last": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
"total": {
"cachedInputTokens": 0,
"inputTokens": 1,
"outputTokens": 2,
"reasoningOutputTokens": 0,
"totalTokens": 3,
},
},
}
),
)
def _completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
) -> Notification:
return Notification(
method="turn/completed",
payload=public_api_module.TurnCompletedNotificationPayload.model_validate(
{
"threadId": thread_id,
"turn": {
"id": turn_id,
"items": [],
"status": status,
},
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
class FakeClient:
def __init__(self, config=None) -> None: # noqa: ANN001,ARG002
self._closed = False
def start(self) -> None:
return None
def initialize(self) -> InitializeResponse:
return InitializeResponse.model_validate({})
def close(self) -> None:
self._closed = True
closed.append(True)
monkeypatch.setattr(public_api_module, "AppServerClient", FakeClient)
with pytest.raises(RuntimeError, match="missing required metadata"):
Codex()
assert closed == [True]
def test_async_codex_init_failure_closes_client() -> None:
async def scenario() -> None:
codex = AsyncCodex()
close_calls = 0
async def fake_start() -> None:
return None
async def fake_initialize() -> InitializeResponse:
return InitializeResponse.model_validate({})
async def fake_close() -> None:
nonlocal close_calls
close_calls += 1
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.close = fake_close # type: ignore[method-assign]
with pytest.raises(RuntimeError, match="missing required metadata"):
await codex.models()
assert close_calls == 1
assert codex._initialized is False
assert codex._init is None
asyncio.run(scenario())
def test_async_codex_initializes_only_once_under_concurrency() -> None:
async def scenario() -> None:
codex = AsyncCodex()
start_calls = 0
initialize_calls = 0
ready = asyncio.Event()
async def fake_start() -> None:
nonlocal start_calls
start_calls += 1
async def fake_initialize() -> InitializeResponse:
nonlocal initialize_calls
initialize_calls += 1
ready.set()
await asyncio.sleep(0.02)
return InitializeResponse.model_validate(
{
"userAgent": "codex-cli/1.2.3",
"serverInfo": {"name": "codex-cli", "version": "1.2.3"},
}
)
async def fake_model_list(include_hidden: bool = False): # noqa: ANN202,ARG001
await ready.wait()
return object()
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(codex.models(), codex.models())
assert start_calls == 1
assert initialize_calls == 1
asyncio.run(scenario())
def test_turn_stream_rejects_second_active_consumer() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
first_stream = Turn(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = Turn(client, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
next(second_stream)
first_stream.close()
def test_async_turn_stream_rejects_second_active_consumer() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurn(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurn(codex, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
await anext(second_stream)
await first_stream.aclose()
asyncio.run(scenario())
def test_turn_run_falls_back_to_completed_raw_response_text() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_raw_response_notification(text="hello from raw response"),
_usage_notification(),
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
result = Turn(client, "thread-1", "turn-1").run()
assert result.status == TurnStatus.completed
assert result.text == "hello from raw response"
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",
ROOT / "examples" / "10_error_handling_and_retry" / "async.py",
):
source = path.read_text()
assert '== "failed"' not in source
assert "TurnStatus.failed" in source

View File

@@ -0,0 +1,209 @@
from __future__ import annotations
import importlib.resources as resources
import inspect
from typing import Any
from codex_app_server import AppServerConfig
from codex_app_server.models import InitializeResponse
from codex_app_server.public_api import AsyncCodex, AsyncThread, Codex, Thread
def _keyword_only_names(fn: object) -> list[str]:
signature = inspect.signature(fn)
return [
param.name
for param in signature.parameters.values()
if param.kind == inspect.Parameter.KEYWORD_ONLY
]
def _assert_no_any_annotations(fn: object) -> None:
signature = inspect.signature(fn)
for param in signature.parameters.values():
if param.annotation is Any:
raise AssertionError(f"{fn} has public parameter typed as Any: {param.name}")
if signature.return_annotation is Any:
raise AssertionError(f"{fn} has public return annotation typed as Any")
def test_root_exports_app_server_config() -> None:
assert AppServerConfig.__name__ == "AppServerConfig"
def test_package_includes_py_typed_marker() -> None:
marker = resources.files("codex_app_server").joinpath("py.typed")
assert marker.is_file()
def test_generated_public_signatures_are_snake_case_and_typed() -> None:
expected = {
Codex.thread_start: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
Codex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
Codex.thread_resume: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
Codex.thread_fork: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"sandbox",
"service_tier",
],
Thread.turn: [
"approval_policy",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
AsyncCodex.thread_start: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"ephemeral",
"model",
"model_provider",
"personality",
"sandbox",
"service_name",
"service_tier",
],
AsyncCodex.thread_list: [
"archived",
"cursor",
"cwd",
"limit",
"model_providers",
"search_term",
"sort_key",
"source_kinds",
],
AsyncCodex.thread_resume: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"personality",
"sandbox",
"service_tier",
],
AsyncCodex.thread_fork: [
"approval_policy",
"base_instructions",
"config",
"cwd",
"developer_instructions",
"model",
"model_provider",
"sandbox",
"service_tier",
],
AsyncThread.turn: [
"approval_policy",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
}
for fn, expected_kwargs in expected.items():
actual = _keyword_only_names(fn)
assert actual == expected_kwargs, f"unexpected kwargs for {fn}: {actual}"
assert all(name == name.lower() for name in actual), f"non snake_case kwargs in {fn}: {actual}"
_assert_no_any_annotations(fn)
def test_lifecycle_methods_are_codex_scoped() -> None:
assert hasattr(Codex, "thread_resume")
assert hasattr(Codex, "thread_fork")
assert hasattr(Codex, "thread_archive")
assert hasattr(Codex, "thread_unarchive")
assert hasattr(AsyncCodex, "thread_resume")
assert hasattr(AsyncCodex, "thread_fork")
assert hasattr(AsyncCodex, "thread_archive")
assert hasattr(AsyncCodex, "thread_unarchive")
assert not hasattr(Codex, "thread")
assert not hasattr(AsyncCodex, "thread")
assert not hasattr(Thread, "resume")
assert not hasattr(Thread, "fork")
assert not hasattr(Thread, "archive")
assert not hasattr(Thread, "unarchive")
assert not hasattr(AsyncThread, "resume")
assert not hasattr(AsyncThread, "fork")
assert not hasattr(AsyncThread, "archive")
assert not hasattr(AsyncThread, "unarchive")
for fn in (
Codex.thread_archive,
Codex.thread_unarchive,
AsyncCodex.thread_archive,
AsyncCodex.thread_unarchive,
):
_assert_no_any_annotations(fn)
def test_initialize_metadata_parses_user_agent_shape() -> None:
parsed = Codex._parse_initialize(InitializeResponse.model_validate({"userAgent": "codex-cli/1.2.3"}))
assert parsed.user_agent == "codex-cli/1.2.3"
assert parsed.server_name == "codex-cli"
assert parsed.server_version == "1.2.3"
def test_initialize_metadata_requires_non_empty_information() -> None:
try:
Codex._parse_initialize(InitializeResponse.model_validate({}))
except RuntimeError as exc:
assert "missing required metadata" in str(exc)
else:
raise AssertionError("expected RuntimeError when initialize metadata is missing")

View File

@@ -0,0 +1,215 @@
from __future__ import annotations
import asyncio
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
from codex_app_server import AsyncCodex, Codex, TextInput
ROOT = Path(__file__).resolve().parents[1]
EXAMPLES_DIR = ROOT / "examples"
NOTEBOOK_PATH = ROOT / "notebooks" / "sdk_walkthrough.ipynb"
# 11_cli_mini_app is interactive; we still run it by feeding '/exit'.
EXAMPLE_CASES: list[tuple[str, str]] = [
("01_quickstart_constructor", "sync.py"),
("01_quickstart_constructor", "async.py"),
("02_turn_run", "sync.py"),
("02_turn_run", "async.py"),
("03_turn_stream_events", "sync.py"),
("03_turn_stream_events", "async.py"),
("04_models_and_metadata", "sync.py"),
("04_models_and_metadata", "async.py"),
("05_existing_thread", "sync.py"),
("05_existing_thread", "async.py"),
("06_thread_lifecycle_and_controls", "sync.py"),
("06_thread_lifecycle_and_controls", "async.py"),
("07_image_and_text", "sync.py"),
("07_image_and_text", "async.py"),
("08_local_image_and_text", "sync.py"),
("08_local_image_and_text", "async.py"),
("09_async_parity", "sync.py"),
# 09_async_parity async path is represented by 01 async + dedicated async-based cases above.
("10_error_handling_and_retry", "sync.py"),
("10_error_handling_and_retry", "async.py"),
("11_cli_mini_app", "sync.py"),
("11_cli_mini_app", "async.py"),
("12_turn_params_kitchen_sink", "sync.py"),
("12_turn_params_kitchen_sink", "async.py"),
("13_model_select_and_turn_params", "sync.py"),
("13_model_select_and_turn_params", "async.py"),
]
def _run_example(
folder: str, script: str, *, timeout_s: int = 150
) -> subprocess.CompletedProcess[str]:
path = EXAMPLES_DIR / folder / script
assert path.exists(), f"Missing example script: {path}"
env = os.environ.copy()
# Feed '/exit' only to interactive mini-cli examples.
stdin = "/exit\n" if folder == "11_cli_mini_app" else None
return subprocess.run(
[sys.executable, str(path)],
cwd=str(ROOT),
env=env,
input=stdin,
text=True,
capture_output=True,
timeout=timeout_s,
check=False,
)
def _notebook_cell_source(cell_index: int) -> str:
notebook = json.loads(NOTEBOOK_PATH.read_text())
return "".join(notebook["cells"][cell_index]["source"])
def test_real_initialize_and_model_list():
with Codex() as codex:
metadata = codex.metadata
assert isinstance(metadata.user_agent, str) and metadata.user_agent.strip()
assert isinstance(metadata.server_name, str) and metadata.server_name.strip()
assert isinstance(metadata.server_version, str) and metadata.server_version.strip()
models = codex.models(include_hidden=True)
assert isinstance(models.data, list)
def test_real_thread_and_turn_start_smoke():
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("hello")).run()
assert isinstance(result.thread_id, str) and result.thread_id.strip()
assert isinstance(result.turn_id, str) and result.turn_id.strip()
assert isinstance(result.items, list)
assert result.usage is not None
assert result.usage.thread_id == result.thread_id
assert result.usage.turn_id == result.turn_id
def test_real_async_thread_turn_usage_and_ids_smoke() -> None:
async def _run() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = await (await thread.turn(TextInput("say ok"))).run()
assert isinstance(result.thread_id, str) and result.thread_id.strip()
assert isinstance(result.turn_id, str) and result.turn_id.strip()
assert isinstance(result.items, list)
assert result.usage is not None
assert result.usage.thread_id == result.thread_id
assert result.usage.turn_id == result.turn_id
asyncio.run(_run())
def test_notebook_bootstrap_resolves_sdk_from_unrelated_cwd() -> None:
cell_1_source = _notebook_cell_source(1)
env = os.environ.copy()
env["CODEX_PYTHON_SDK_DIR"] = str(ROOT)
with tempfile.TemporaryDirectory() as temp_cwd:
result = subprocess.run(
[sys.executable, "-c", cell_1_source],
cwd=temp_cwd,
env=env,
text=True,
capture_output=True,
timeout=60,
check=False,
)
assert result.returncode == 0, (
f"Notebook bootstrap failed from unrelated cwd.\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
assert "SDK source:" in result.stdout
assert "codex_app_server" in result.stdout or "sdk/python/src" in result.stdout
def test_real_streaming_smoke_turn_completed():
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Reply with one short sentence."))
saw_delta = False
saw_completed = False
for evt in turn.stream():
if evt.method == "item/agentMessage/delta":
saw_delta = True
if evt.method == "turn/completed":
saw_completed = True
assert saw_completed
# Some environments can produce zero deltas for very short output;
# this assert keeps the smoke test informative but non-flaky.
assert isinstance(saw_delta, bool)
def test_real_turn_interrupt_smoke():
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Count from 1 to 200 with commas."))
# Best effort: interrupting quickly may race with completion on fast models.
_ = turn.interrupt()
# Confirm the session is still usable after interrupt race.
follow_up = thread.turn(TextInput("Say 'ok' only.")).run()
assert follow_up.status.value in {"completed", "failed"}
@pytest.mark.parametrize(("folder", "script"), EXAMPLE_CASES)
def test_real_examples_run_and_assert(folder: str, script: str):
result = _run_example(folder, script)
assert result.returncode == 0, (
f"Example failed: {folder}/{script}\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
)
out = result.stdout
# Minimal content assertions so we validate behavior, not just exit code.
if folder == "01_quickstart_constructor":
assert "Status:" in out and "Text:" in out
assert "Server: None None" not in out
elif folder == "02_turn_run":
assert "thread_id:" in out and "turn_id:" in out and "status:" in out
assert "usage: None" not in out
elif folder == "03_turn_stream_events":
assert "turn/completed" in out
elif folder == "04_models_and_metadata":
assert "models.count:" in out
assert "server_name=None" not in out
assert "server_version=None" not in out
elif folder == "05_existing_thread":
assert "Created thread:" in out
elif folder == "06_thread_lifecycle_and_controls":
assert "Lifecycle OK:" in out
elif folder in {"07_image_and_text", "08_local_image_and_text"}:
assert "completed" in out.lower() or "Status:" in out
elif folder == "09_async_parity":
assert "Thread:" in out and "Turn:" in out
elif folder == "10_error_handling_and_retry":
assert "Text:" in out
elif folder == "11_cli_mini_app":
assert "Thread:" in out
elif folder == "12_turn_params_kitchen_sink":
assert "Status:" in out and "Usage:" in out
elif folder == "13_model_select_and_turn_params":
assert "selected.model:" in out and "agent.message.params:" in out and "usage.params:" in out
assert "usage.params: None" not in out