mirror of
https://github.com/openai/codex.git
synced 2026-04-23 22:24:57 +00:00
5.8 KiB
5.8 KiB
Codex App Server SDK — API Reference
Public surface of codex_app_server for app-server v2.
This SDK surface is experimental. The current implementation intentionally allows only one active Turn.stream() or Turn.run() consumer per client instance at a time.
Package Entry
from codex_app_server import (
Codex,
AsyncCodex,
Thread,
AsyncThread,
Turn,
AsyncTurn,
TurnResult,
InitializeResult,
Input,
InputItem,
TextInput,
ImageInput,
LocalImageInput,
SkillInput,
MentionInput,
ThreadItem,
TurnStatus,
)
- Version:
codex_app_server.__version__ - Requires Python >= 3.10
Codex (sync)
Codex(config: AppServerConfig | None = None)
Properties/methods:
metadata -> InitializeResultclose() -> Nonethread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Threadthread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> ThreadListResponsethread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Threadthread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Threadthread_archive(thread_id: str) -> ThreadArchiveResponsethread_unarchive(thread_id: str) -> Threadmodels(*, include_hidden: bool = False) -> ModelListResponse
Context manager:
with Codex() as codex:
...
AsyncCodex (async parity)
AsyncCodex(config: AppServerConfig | None = None)
Properties/methods:
metadata -> InitializeResultclose() -> Awaitable[None]thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> Awaitable[ThreadListResponse]thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Awaitable[AsyncThread]thread_archive(thread_id: str) -> Awaitable[ThreadArchiveResponse]thread_unarchive(thread_id: str) -> Awaitable[AsyncThread]models(*, include_hidden: bool = False) -> Awaitable[ModelListResponse]
Async context manager:
async with AsyncCodex() as codex:
...
Thread / AsyncThread
Thread and AsyncThread share the same shape and intent.
Thread
turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Turnread(*, include_turns: bool = False) -> ThreadReadResponseset_name(name: str) -> ThreadSetNameResponsecompact() -> ThreadCompactStartResponse
AsyncThread
turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurn]read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]set_name(name: str) -> Awaitable[ThreadSetNameResponse]compact() -> Awaitable[ThreadCompactStartResponse]
Turn / AsyncTurn
Turn
steer(input: Input) -> TurnSteerResponseinterrupt() -> TurnInterruptResponsestream() -> Iterator[Notification]run() -> TurnResult
Behavior notes:
stream()andrun()are exclusive per client instance in the current experimental build- starting a second turn consumer on the same
Codexinstance raisesRuntimeError
AsyncTurn
steer(input: Input) -> Awaitable[TurnSteerResponse]interrupt() -> Awaitable[TurnInterruptResponse]stream() -> AsyncIterator[Notification]run() -> Awaitable[TurnResult]
Behavior notes:
stream()andrun()are exclusive per client instance in the current experimental build- starting a second turn consumer on the same
AsyncCodexinstance raisesRuntimeError
TurnResult
@dataclass
class TurnResult:
thread_id: str
turn_id: str
status: TurnStatus
error: TurnError | None
text: str
items: list[ThreadItem]
usage: ThreadTokenUsageUpdatedNotification | None
Inputs
@dataclass class TextInput: text: str
@dataclass class ImageInput: url: str
@dataclass class LocalImageInput: path: str
@dataclass class SkillInput: name: str; path: str
@dataclass class MentionInput: name: str; path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
Retry + errors
from codex_app_server import (
retry_on_overload,
JsonRpcError,
MethodNotFoundError,
InvalidParamsError,
ServerBusyError,
is_retryable_error,
)
retry_on_overload(...)retries transient overload errors with exponential backoff + jitter.is_retryable_error(exc)checks if an exception is transient/overload-like.
Example
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print(result.text)