Add Python SDK thread.run convenience methods (#15088)

## TL;DR
Add `thread.run(...)` / `async thread.run(...)` convenience methods to
the Python SDK for the common case.

- add `RunInput = Input | str` and `RunResult` with `final_response`,
collected `items`, and optional `usage`
- keep `thread.turn(...)` strict and lower-level for streaming,
steering, interrupting, and raw generated `Turn` access
- update Python SDK docs, quickstart examples, and tests for the sync
and async convenience flows

## Validation
- `python3 -m pytest sdk/python/tests/test_public_api_signatures.py
sdk/python/tests/test_public_api_runtime_behavior.py`
- `python3 -m pytest
sdk/python/tests/test_real_app_server_integration.py -k
'thread_run_convenience or async_thread_run_convenience'` (skipped in
this environment)

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Shaqayeq
2026-03-18 17:57:48 -07:00
committed by GitHub
parent 825d09373d
commit 4fd2774614
12 changed files with 760 additions and 103 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
from types import SimpleNamespace
import pytest
@@ -10,14 +11,20 @@ import codex_app_server.api as public_api_module
from codex_app_server.client import AppServerClient
from codex_app_server.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnStatus,
)
from codex_app_server.models import InitializeResponse, Notification
from codex_app_server.api import (
AsyncCodex,
AsyncThread,
AsyncTurnHandle,
Codex,
RunResult,
Thread,
TurnHandle,
)
@@ -48,16 +55,78 @@ def _completed_notification(
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
error_message: str | None = None,
) -> Notification:
turn: dict[str, object] = {
"id": turn_id,
"items": [],
"status": status,
}
if error_message is not None:
turn["error"] = {"message": error_message}
return Notification(
method="turn/completed",
payload=TurnCompletedNotification.model_validate(
{
"threadId": thread_id,
"turn": {
"id": turn_id,
"items": [],
"status": status,
"turn": turn,
}
),
)
def _item_completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "final text",
phase: MessagePhase | None = None,
) -> Notification:
item: dict[str, object] = {
"id": "item-1",
"text": text,
"type": "agentMessage",
}
if phase is not None:
item["phase"] = phase.value
return Notification(
method="item/completed",
payload=ItemCompletedNotification.model_validate(
{
"item": item,
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _token_usage_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
) -> Notification:
return Notification(
method="thread/tokenUsage/updated",
payload=ThreadTokenUsageUpdatedNotification.model_validate(
{
"threadId": thread_id,
"turnId": turn_id,
"tokenUsage": {
"last": {
"cachedInputTokens": 1,
"inputTokens": 2,
"outputTokens": 3,
"reasoningOutputTokens": 4,
"totalTokens": 9,
},
"total": {
"cachedInputTokens": 5,
"inputTokens": 6,
"outputTokens": 7,
"reasoningOutputTokens": 8,
"totalTokens": 26,
},
},
}
),
@@ -225,6 +294,277 @@ def test_turn_run_returns_completed_turn_payload() -> None:
assert result.items == []
def test_thread_run_accepts_string_input_and_returns_run_result() -> None:
client = AppServerClient()
item_notification = _item_completed_notification(text="Hello.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
seen: dict[str, object] = {}
def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client.turn_start = fake_turn_start # type: ignore[method-assign]
result = Thread(client, "thread-1").run("hello")
assert seen["thread_id"] == "thread-1"
assert seen["wire_input"] == [{"type": "text", "text": "hello"}]
assert result == RunResult(
final_response="Hello.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
)
def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="Second message")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Second message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_preserves_empty_last_assistant_message() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == ""
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None:
client = AppServerClient()
final_answer_notification = _item_completed_notification(
text="Final answer",
phase=MessagePhase.final_answer,
)
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
final_answer_notification,
commentary_notification,
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Final answer"
assert result.items == [
final_answer_notification.payload.item,
commentary_notification.payload.item,
]
def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
client = AppServerClient()
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
def test_thread_run_raises_on_failed_turn() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(status="failed", error_message="boom"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
with pytest.raises(RuntimeError, match="boom"):
Thread(client, "thread-1").run("hello")
def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
item_notification = _item_completed_notification(text="Hello async.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
seen: dict[str, object] = {}
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert seen["thread_id"] == "thread-1"
assert seen["wire_input"] == [{"type": "text", "text": "hello"}]
assert result == RunResult(
final_response="Hello async.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
)
asyncio.run(scenario())
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
first_item_notification = _item_completed_notification(text="First async message")
second_item_notification = _item_completed_notification(text="Second async message")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response == "Second async message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
asyncio.run(scenario())
def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
asyncio.run(scenario())
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",

View File

@@ -4,7 +4,7 @@ import importlib.resources as resources
import inspect
from typing import Any
from codex_app_server import AppServerConfig
from codex_app_server import AppServerConfig, RunResult
from codex_app_server.models import InitializeResponse
from codex_app_server.api import AsyncCodex, AsyncThread, Codex, Thread
@@ -31,6 +31,10 @@ def test_root_exports_app_server_config() -> None:
assert AppServerConfig.__name__ == "AppServerConfig"
def test_root_exports_run_result() -> None:
assert RunResult.__name__ == "RunResult"
def test_package_includes_py_typed_marker() -> None:
marker = resources.files("codex_app_server").joinpath("py.typed")
assert marker.is_file()
@@ -101,6 +105,18 @@ def test_generated_public_signatures_are_snake_case_and_typed() -> None:
"service_tier",
"summary",
],
Thread.run: [
"approval_policy",
"approvals_reviewer",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
AsyncCodex.thread_start: [
"approval_policy",
"approvals_reviewer",
@@ -164,6 +180,18 @@ def test_generated_public_signatures_are_snake_case_and_typed() -> None:
"service_tier",
"summary",
],
AsyncThread.run: [
"approval_policy",
"approvals_reviewer",
"cwd",
"effort",
"model",
"output_schema",
"personality",
"sandbox_policy",
"service_tier",
"summary",
],
}
for fn, expected_kwargs in expected.items():

View File

@@ -265,6 +265,36 @@ def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> No
assert isinstance(data["persisted_items_count"], int)
def test_real_thread_run_convenience_smoke(runtime_env: PreparedRuntimeEnv) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import json
from codex_app_server import Codex
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = thread.run("say ok")
print(json.dumps({
"thread_id": thread.id,
"final_response": result.final_response,
"items_count": len(result.items),
"has_usage": result.usage is not None,
}))
"""
),
)
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["final_response"], str) and data["final_response"].strip()
assert isinstance(data["items_count"], int)
assert isinstance(data["has_usage"], bool)
def test_real_async_thread_turn_usage_and_ids_smoke(
runtime_env: PreparedRuntimeEnv,
) -> None:
@@ -308,6 +338,42 @@ def test_real_async_thread_turn_usage_and_ids_smoke(
assert isinstance(data["persisted_items_count"], int)
def test_real_async_thread_run_convenience_smoke(
runtime_env: PreparedRuntimeEnv,
) -> None:
data = _run_json_python(
runtime_env,
textwrap.dedent(
"""
import asyncio
import json
from codex_app_server import AsyncCodex
async def main():
async with AsyncCodex() as codex:
thread = await codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = await thread.run("say ok")
print(json.dumps({
"thread_id": thread.id,
"final_response": result.final_response,
"items_count": len(result.items),
"has_usage": result.usage is not None,
}))
asyncio.run(main())
"""
),
)
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["final_response"], str) and data["final_response"].strip()
assert isinstance(data["items_count"], int)
assert isinstance(data["has_usage"], bool)
def test_notebook_bootstrap_resolves_sdk_and_runtime_from_unrelated_cwd(
runtime_env: PreparedRuntimeEnv,
) -> None: