mirror of
https://github.com/openai/codex.git
synced 2026-05-20 11:12:43 +00:00
## Summary
- Allow thread.turn and turn.steer, including async variants, to accept
RunInput so plain strings work alongside typed input objects.
- Export RunInput and update the SDK artifact generator so regenerated
turn methods keep the same signature and normalization.
- Update docs, examples, notebook cells, and tests to use string
shorthand for text-only turns while keeping typed inputs for multimodal
input.
## Validation
- uv run --extra dev ruff format .
- uv run --extra dev ruff check --output-format=github .
- python3 -m py_compile sdk/python/src/openai_codex/__init__.py
sdk/python/src/openai_codex/api.py
sdk/python/src/openai_codex/_inputs.py
sdk/python/scripts/update_sdk_artifacts.py
sdk/python/tests/test_public_api_signatures.py
sdk/python/tests/test_app_server_streaming.py
sdk/python/tests/test_app_server_turn_controls.py
sdk/python/tests/test_real_app_server_integration.py
- python3 -c "import json;
json.load(open('sdk/python/notebooks/sdk_walkthrough.ipynb'))"
- sdk/python/.venv/bin/python -c "import inspect, openai_codex; from
openai_codex import Thread, AsyncThread, TurnHandle, AsyncTurnHandle,
RunInput; funcs=[Thread.run, Thread.turn, AsyncThread.run,
AsyncThread.turn, TurnHandle.steer, AsyncTurnHandle.steer]; assert
all(inspect.signature(fn).parameters['input'].annotation == 'RunInput'
for fn in funcs); assert RunInput is openai_codex.RunInput"
275 lines
10 KiB
Python
275 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from app_server_harness import AppServerHarness
|
|
from app_server_helpers import (
|
|
agent_message_texts,
|
|
agent_message_texts_from_items,
|
|
next_async_delta,
|
|
next_sync_delta,
|
|
streaming_response,
|
|
)
|
|
|
|
from openai_codex import AsyncCodex, Codex
|
|
from openai_codex.generated.v2_all import (
|
|
AgentMessageDeltaNotification,
|
|
TurnCompletedNotification,
|
|
TurnStatus,
|
|
)
|
|
|
|
|
|
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
|
|
"""A sync turn stream should expose deltas, completed items, and completion."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(streaming_response("stream-1", "msg-stream-1", ["he", "llo"]))
|
|
|
|
with Codex(config=harness.app_server_config()) as codex:
|
|
thread = codex.thread_start()
|
|
stream = thread.turn("stream please").stream()
|
|
events = list(stream)
|
|
request = harness.responses.single_request()
|
|
|
|
assert {
|
|
"deltas": [
|
|
event.payload.delta
|
|
for event in events
|
|
if isinstance(event.payload, AgentMessageDeltaNotification)
|
|
],
|
|
"agent_messages": agent_message_texts(events),
|
|
"request_user_texts": request.message_input_texts("user")[-1:],
|
|
"completed_statuses": [
|
|
event.payload.turn.status
|
|
for event in events
|
|
if isinstance(event.payload, TurnCompletedNotification)
|
|
],
|
|
} == {
|
|
"deltas": ["he", "llo"],
|
|
"agent_messages": ["hello"],
|
|
"request_user_texts": ["stream please"],
|
|
"completed_statuses": [TurnStatus.completed],
|
|
}
|
|
|
|
|
|
def test_turn_run_returns_completed_turn(tmp_path) -> None:
|
|
"""TurnHandle.run should collect output and completion metadata."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1")
|
|
|
|
with Codex(config=harness.app_server_config()) as codex:
|
|
thread = codex.thread_start()
|
|
turn = thread.turn("complete this turn")
|
|
completed = turn.run()
|
|
|
|
assert {
|
|
"turn_id": completed.id,
|
|
"status": completed.status,
|
|
"agent_messages": agent_message_texts_from_items(completed.items),
|
|
"final_response": completed.final_response,
|
|
} == {
|
|
"turn_id": turn.id,
|
|
"status": TurnStatus.completed,
|
|
"agent_messages": ["turn complete"],
|
|
"final_response": "turn complete",
|
|
}
|
|
|
|
|
|
def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
|
|
"""An async turn stream should expose the same notification sequence."""
|
|
|
|
async def scenario() -> None:
|
|
"""Stream one async turn against the real pinned app-server."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"])
|
|
)
|
|
|
|
async with AsyncCodex(config=harness.app_server_config()) as codex:
|
|
thread = await codex.thread_start()
|
|
turn = await thread.turn("async stream please")
|
|
events = [event async for event in turn.stream()]
|
|
request = harness.responses.single_request()
|
|
|
|
assert {
|
|
"deltas": [
|
|
event.payload.delta
|
|
for event in events
|
|
if isinstance(event.payload, AgentMessageDeltaNotification)
|
|
],
|
|
"agent_messages": agent_message_texts(events),
|
|
"request_user_texts": request.message_input_texts("user")[-1:],
|
|
"completed_statuses": [
|
|
event.payload.turn.status
|
|
for event in events
|
|
if isinstance(event.payload, TurnCompletedNotification)
|
|
],
|
|
} == {
|
|
"deltas": ["as", "ync"],
|
|
"agent_messages": ["async"],
|
|
"request_user_texts": ["async stream please"],
|
|
"completed_statuses": [TurnStatus.completed],
|
|
}
|
|
|
|
asyncio.run(scenario())
|
|
|
|
|
|
def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None:
|
|
"""AppServerClient.stream_text should stream through a real app-server turn."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"])
|
|
)
|
|
|
|
with Codex(config=harness.app_server_config()) as codex:
|
|
thread = codex.thread_start()
|
|
chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001
|
|
|
|
assert [chunk.delta for chunk in chunks] == ["fir", "st"]
|
|
|
|
|
|
def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None:
|
|
"""Async stream_text should yield without blocking another app-server request."""
|
|
|
|
async def scenario() -> None:
|
|
"""Leave a stream open while another async request completes."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(
|
|
streaming_response(
|
|
"low-async-stream",
|
|
"msg-low-async-stream",
|
|
["one", "two", "three"],
|
|
),
|
|
delay_between_events_s=0.03,
|
|
)
|
|
|
|
async with AsyncCodex(config=harness.app_server_config()) as codex:
|
|
thread = await codex.thread_start()
|
|
stream = codex._client.stream_text( # noqa: SLF001
|
|
thread.id,
|
|
"low-level async",
|
|
)
|
|
first = await anext(stream)
|
|
models_task = asyncio.create_task(codex.models())
|
|
models = await asyncio.wait_for(models_task, timeout=1.0)
|
|
remaining = [chunk.delta async for chunk in stream]
|
|
|
|
assert {
|
|
"first": first.delta,
|
|
"remaining": remaining,
|
|
"models_payload_has_data": isinstance(
|
|
models.model_dump(by_alias=True, mode="json").get("data"),
|
|
list,
|
|
),
|
|
} == {
|
|
"first": "one",
|
|
"remaining": ["two", "three"],
|
|
"models_payload_has_data": True,
|
|
}
|
|
|
|
asyncio.run(scenario())
|
|
|
|
|
|
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
|
|
"""Two sync streams on one client should consume only their own notifications."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("first-stream", "msg-first", ["one-", "done"]),
|
|
delay_between_events_s=0.01,
|
|
)
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("second-stream", "msg-second", ["two-", "done"]),
|
|
delay_between_events_s=0.01,
|
|
)
|
|
|
|
with Codex(config=harness.app_server_config()) as codex:
|
|
first_thread = codex.thread_start()
|
|
second_thread = codex.thread_start()
|
|
first_turn = first_thread.turn("first")
|
|
second_turn = second_thread.turn("second")
|
|
|
|
first_stream = first_turn.stream()
|
|
second_stream = second_turn.stream()
|
|
first_first_delta = next_sync_delta(first_stream)
|
|
second_first_delta = next_sync_delta(second_stream)
|
|
first_second_delta = next_sync_delta(first_stream)
|
|
second_second_delta = next_sync_delta(second_stream)
|
|
first_tail = list(first_stream)
|
|
second_tail = list(second_stream)
|
|
|
|
assert {
|
|
"streams": sorted(
|
|
[
|
|
(
|
|
first_first_delta,
|
|
first_second_delta,
|
|
agent_message_texts(first_tail),
|
|
),
|
|
(
|
|
second_first_delta,
|
|
second_second_delta,
|
|
agent_message_texts(second_tail),
|
|
),
|
|
]
|
|
),
|
|
} == {
|
|
"streams": [
|
|
("one-", "done", ["one-done"]),
|
|
("two-", "done", ["two-done"]),
|
|
],
|
|
}
|
|
|
|
|
|
def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
|
|
"""Two async streams on one client should consume only their own notifications."""
|
|
|
|
async def scenario() -> None:
|
|
"""Interleave async stream consumers against one app-server process."""
|
|
with AppServerHarness(tmp_path) as harness:
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("async-first", "msg-async-first", ["a1", "-done"]),
|
|
delay_between_events_s=0.01,
|
|
)
|
|
harness.responses.enqueue_sse(
|
|
streaming_response("async-second", "msg-async-second", ["a2", "-done"]),
|
|
delay_between_events_s=0.01,
|
|
)
|
|
|
|
async with AsyncCodex(config=harness.app_server_config()) as codex:
|
|
first_thread = await codex.thread_start()
|
|
second_thread = await codex.thread_start()
|
|
first_turn = await first_thread.turn("async first")
|
|
second_turn = await second_thread.turn("async second")
|
|
|
|
first_stream = first_turn.stream()
|
|
second_stream = second_turn.stream()
|
|
first_first_delta = await next_async_delta(first_stream)
|
|
second_first_delta = await next_async_delta(second_stream)
|
|
first_second_delta = await next_async_delta(first_stream)
|
|
second_second_delta = await next_async_delta(second_stream)
|
|
first_tail = [event async for event in first_stream]
|
|
second_tail = [event async for event in second_stream]
|
|
|
|
assert {
|
|
"streams": sorted(
|
|
[
|
|
(
|
|
first_first_delta,
|
|
first_second_delta,
|
|
agent_message_texts(first_tail),
|
|
),
|
|
(
|
|
second_first_delta,
|
|
second_second_delta,
|
|
agent_message_texts(second_tail),
|
|
),
|
|
]
|
|
),
|
|
} == {
|
|
"streams": [
|
|
("a1", "-done", ["a1-done"]),
|
|
("a2", "-done", ["a2-done"]),
|
|
],
|
|
}
|
|
|
|
asyncio.run(scenario())
|