Port SDK behavior tests to app-server harness

Move result extraction, stream_text, approval inheritance, model list, and compact coverage onto the pinned app-server integration harness so the remaining unit tests stay focused on generated models and transport internals.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-05-10 14:04:55 +03:00
parent a248323b7d
commit 5d0ba5a0be
5 changed files with 384 additions and 486 deletions

View File

@@ -77,11 +77,19 @@ class MockSseResponse:
return chunks
@dataclass(frozen=True)
class MockJsonResponse:
"""One queued JSON response served by the mock Responses API."""
body: Json
class MockResponsesServer:
"""Local HTTP server that records `/v1/responses` requests and returns SSE."""
def __init__(self) -> None:
self._responses: queue.Queue[MockSseResponse] = queue.Queue()
self._compact_responses: queue.Queue[MockJsonResponse] = queue.Queue()
self._requests: list[CapturedResponsesRequest] = []
self._requests_lock = threading.Lock()
self._server = _ResponsesHttpServer(("127.0.0.1", 0), _ResponsesHandler, self)
@@ -136,6 +144,21 @@ class MockResponsesServer:
)
)
def enqueue_compaction_summary(self, summary: str) -> None:
"""Queue a compact endpoint response with one synthetic compaction item."""
self._compact_responses.put(
MockJsonResponse(
body={
"output": [
{
"type": "compaction",
"encrypted_content": summary,
}
]
}
)
)
def requests(self) -> list[CapturedResponsesRequest]:
"""Return all recorded Responses API requests."""
with self._requests_lock:
@@ -180,6 +203,10 @@ class MockResponsesServer:
"""Return the next queued SSE response or fail the HTTP request."""
return self._responses.get_nowait()
def _next_compact_response(self) -> MockJsonResponse:
"""Return the next queued compact JSON response or fail the HTTP request."""
return self._compact_responses.get_nowait()
class AppServerHarness:
"""Test fixture that points a pinned runtime app-server at MockResponsesServer."""
@@ -276,11 +303,22 @@ class _ResponsesHandler(BaseHTTPRequestHandler):
self.send_error(404, f"unexpected GET {self.path}")
def do_POST(self) -> None:
"""Serve queued SSE responses for `/v1/responses` requests."""
"""Serve queued responses for `/v1/responses` and compact requests."""
length = int(self.headers.get("content-length", "0"))
body = self.rfile.read(length)
self.server.mock._record_request(self, body)
if self.path.endswith("/v1/responses/compact") or self.path.endswith(
"/responses/compact"
):
try:
response = self.server.mock._next_compact_response()
except queue.Empty:
self.send_error(500, "no queued compact response")
return
self._send_json(response.body)
return
if not (self.path.endswith("/v1/responses") or self.path.endswith("/responses")):
self.send_error(404, f"unexpected POST {self.path}")
return

View File

@@ -2,11 +2,9 @@ from __future__ import annotations
import asyncio
import time
from types import SimpleNamespace
from openai_codex.async_client import AsyncAppServerClient
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
)
from openai_codex.models import Notification, UnknownNotification
@@ -36,46 +34,6 @@ def test_async_client_allows_concurrent_transport_calls() -> None:
assert asyncio.run(scenario()) == 2
def test_async_stream_text_is_incremental_without_blocking_parallel_calls() -> None:
"""Async text streaming should yield incrementally without blocking other calls."""
async def scenario() -> tuple[str, list[str], bool]:
"""Start a stream, then prove another async client call can finish."""
client = AsyncAppServerClient()
def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def]
"""Yield one item before sleeping so the async wrapper can interleave."""
yield "first"
time.sleep(0.03)
yield "second"
yield "third"
def fake_model_list(include_hidden: bool = False) -> str:
"""Return immediately to prove the event loop was not monopolized."""
return "done"
client._sync.stream_text = fake_stream_text # type: ignore[method-assign]
client._sync.model_list = fake_model_list # type: ignore[method-assign]
stream = client.stream_text("thread-1", "hello")
first = await anext(stream)
competing_call = asyncio.create_task(client.model_list())
await asyncio.sleep(0.01)
competing_call_done_before_stream_done = competing_call.done()
remaining: list[str] = []
async for item in stream:
remaining.append(item)
await competing_call
return first, remaining, competing_call_done_before_stream_done
first, remaining, was_unblocked = asyncio.run(scenario())
assert first == "first"
assert remaining == ["second", "third"]
assert was_unblocked
def test_async_client_turn_notification_methods_delegate_to_sync_client() -> None:
"""Async turn routing methods should preserve sync-client registration semantics."""
async def scenario() -> tuple[list[tuple[str, str]], Notification, str]:
@@ -142,84 +100,3 @@ def test_async_client_turn_notification_methods_delegate_to_sync_client() -> Non
),
"turn-1",
)
def test_async_stream_text_uses_sync_turn_routing() -> None:
"""Async text streaming should consume the same per-turn routing path as sync."""
async def scenario() -> tuple[list[tuple[str, str]], list[str]]:
"""Record routing calls while streaming two deltas and one completion."""
client = AsyncAppServerClient()
notifications = [
Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": "first",
"itemId": "item-1",
"threadId": "thread-1",
"turnId": "turn-1",
}
),
),
Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": "second",
"itemId": "item-2",
"threadId": "thread-1",
"turnId": "turn-1",
}
),
),
Notification(
method="turn/completed",
payload=TurnCompletedNotification.model_validate(
{
"threadId": "thread-1",
"turn": {"id": "turn-1", "items": [], "status": "completed"},
}
),
),
]
calls: list[tuple[str, str]] = []
def fake_turn_start(thread_id: str, text: str, *, params=None): # type: ignore[no-untyped-def]
"""Return a started turn id while recording the request thread."""
calls.append(("turn_start", thread_id))
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
def fake_register(turn_id: str) -> None:
"""Record stream registration for the started turn."""
calls.append(("register", turn_id))
def fake_next(turn_id: str) -> Notification:
"""Return the next queued turn notification."""
calls.append(("next", turn_id))
return notifications.pop(0)
def fake_unregister(turn_id: str) -> None:
"""Record stream cleanup for the started turn."""
calls.append(("unregister", turn_id))
client._sync.turn_start = fake_turn_start # type: ignore[method-assign]
client._sync.register_turn_notifications = fake_register # type: ignore[method-assign]
client._sync.next_turn_notification = fake_next # type: ignore[method-assign]
client._sync.unregister_turn_notifications = fake_unregister # type: ignore[method-assign]
chunks = [chunk async for chunk in client.stream_text("thread-1", "hello")]
return calls, [chunk.delta for chunk in chunks]
calls, deltas = asyncio.run(scenario())
assert (calls, deltas) == (
[
("turn_start", "thread-1"),
("register", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("unregister", "turn-1"),
],
["first", "second"],
)

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from openai_codex.client import AppServerClient, _params_dict
from openai_codex.generated.notification_registry import notification_turn_id
@@ -19,23 +18,6 @@ from openai_codex.models import Notification, UnknownNotification
ROOT = Path(__file__).resolve().parents[1]
def test_thread_set_name_and_compact_use_current_rpc_methods() -> None:
client = AppServerClient()
calls: list[tuple[str, dict[str, Any] | None]] = []
def fake_request(method: str, params, *, response_model): # type: ignore[no-untyped-def]
calls.append((method, params))
return response_model.model_validate({})
client.request = fake_request # type: ignore[method-assign]
client.thread_set_name("thread-1", "sdk-name")
client.thread_compact("thread-1")
assert calls[0][0] == "thread/name/set"
assert calls[1][0] == "thread/compact/start"
def test_generated_params_models_are_snake_case_and_dump_by_alias() -> None:
params = ThreadListParams(search_term="needle", limit=5)

View File

@@ -4,10 +4,13 @@ import asyncio
from collections.abc import AsyncIterator, Iterable, Iterator
from typing import Any
import pytest
from app_server_harness import (
AppServerHarness,
ev_assistant_message,
ev_completed,
ev_failed,
ev_message_item_added,
ev_output_text_delta,
ev_response_created,
@@ -82,6 +85,26 @@ def _streaming_response(response_id: str, item_id: str, parts: list[str]) -> str
)
def _assistant_message_with_phase(
item_id: str,
text: str,
phase: MessagePhase,
) -> dict[str, Any]:
"""Build an assistant message event carrying app-server phase metadata."""
event = ev_assistant_message(item_id, text)
event["item"] = {**event["item"], "phase": phase.value}
return event
def _request_kind(request_path: str) -> str:
"""Classify captured mock-server request paths for compact assertions."""
if request_path.endswith("/responses/compact"):
return "compact"
if request_path.endswith("/responses"):
return "responses"
return request_path
def test_sync_thread_run_uses_pinned_app_server_and_mock_responses(
tmp_path,
) -> None:
@@ -145,6 +168,165 @@ def test_async_thread_run_uses_pinned_app_server_and_mock_responses(
asyncio.run(scenario())
def test_run_result_item_semantics_use_real_app_server(tmp_path) -> None:
"""RunResult should reflect real item notifications, not synthetic client queues."""
cases = [
(
"last unknown phase wins",
sse(
[
ev_response_created("items-last"),
ev_assistant_message("msg-items-first", "First message"),
ev_assistant_message("msg-items-second", "Second message"),
ev_completed("items-last"),
]
),
"Second message",
["First message", "Second message"],
),
(
"empty last message is preserved",
sse(
[
ev_response_created("items-empty"),
ev_assistant_message("msg-items-nonempty", "First message"),
ev_assistant_message("msg-items-empty", ""),
ev_completed("items-empty"),
]
),
"",
["First message", ""],
),
(
"commentary only is not final",
sse(
[
ev_response_created("items-commentary"),
_assistant_message_with_phase(
"msg-items-commentary",
"Commentary",
MessagePhase.commentary,
),
ev_completed("items-commentary"),
]
),
None,
["Commentary"],
),
]
with AppServerHarness(tmp_path) as harness:
for _, body, _, _ in cases:
harness.responses.enqueue_sse(body)
with Codex(config=harness.app_server_config()) as codex:
results = [
codex.thread_start().run(f"case: {name}") for name, _, _, _ in cases
]
assert [
{
"final_response": result.final_response,
"agent_messages": _agent_message_texts_from_items(result.items),
}
for result in results
] == [
{
"final_response": final_response,
"agent_messages": agent_messages,
}
for _, _, final_response, agent_messages in cases
]
def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None:
"""Thread.run should surface the failed turn error emitted by app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("failed-run"),
ev_failed("failed-run", "boom from mock model"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
with pytest.raises(RuntimeError, match="boom from mock model"):
thread.run("trigger failure")
def test_async_run_result_item_semantics_use_real_app_server(tmp_path) -> None:
"""Async RunResult should use the same real app-server notification mapping."""
async def scenario() -> None:
"""Run multiple async result cases against one app-server process."""
cases = [
(
"last async unknown phase wins",
sse(
[
ev_response_created("async-items-last"),
ev_assistant_message(
"msg-async-items-first",
"First async message",
),
ev_assistant_message(
"msg-async-items-second",
"Second async message",
),
ev_completed("async-items-last"),
]
),
"Second async message",
["First async message", "Second async message"],
),
(
"async commentary only is not final",
sse(
[
ev_response_created("async-items-commentary"),
_assistant_message_with_phase(
"msg-async-items-commentary",
"Async commentary",
MessagePhase.commentary,
),
ev_completed("async-items-commentary"),
]
),
None,
["Async commentary"],
),
]
with AppServerHarness(tmp_path) as harness:
for _, body, _, _ in cases:
harness.responses.enqueue_sse(body)
async with AsyncCodex(config=harness.app_server_config()) as codex:
results = [
await (await codex.thread_start()).run(f"case: {name}")
for name, _, _, _ in cases
]
assert [
{
"final_response": result.final_response,
"agent_messages": _agent_message_texts_from_items(result.items),
}
for result in results
] == [
{
"final_response": final_response,
"agent_messages": agent_messages,
}
for _, _, final_response, agent_messages in cases
]
asyncio.run(scenario())
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""A sync turn stream should expose deltas, completed items, and completion."""
with AppServerHarness(tmp_path) as harness:
@@ -233,6 +415,62 @@ def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
asyncio.run(scenario())
def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None:
"""AppServerClient.stream_text should stream through a real app-server turn."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"])
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001
assert [chunk.delta for chunk in chunks] == ["fir", "st"]
def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None:
"""Async stream_text should yield without blocking another app-server request."""
async def scenario() -> None:
"""Leave a stream open while another async request completes."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response(
"low-async-stream",
"msg-low-async-stream",
["one", "two", "three"],
),
delay_between_events_s=0.03,
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
stream = codex._client.stream_text( # noqa: SLF001
thread.id,
"low-level async",
)
first = await anext(stream)
models_task = asyncio.create_task(codex.models())
models = await asyncio.wait_for(models_task, timeout=1.0)
remaining = [chunk.delta async for chunk in stream]
assert {
"first": first.delta,
"remaining": remaining,
"models_payload_has_data": isinstance(
models.model_dump(by_alias=True, mode="json").get("data"),
list,
),
} == {
"first": "one",
"remaining": ["two", "three"],
"models_payload_has_data": True,
}
asyncio.run(scenario())
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two sync streams on one client should consume only their own notifications."""
with AppServerHarness(tmp_path) as harness:
@@ -338,6 +576,79 @@ def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
asyncio.run(scenario())
def test_approval_modes_preserve_real_app_server_state_without_override(
tmp_path,
) -> None:
"""Resume, fork, and next turn should inherit approval settings unless overridden."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1")
harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2")
with Codex(config=harness.app_server_config()) as codex:
source = codex.thread_start(approval_mode=ApprovalMode.deny_all)
resumed = codex.thread_resume(source.id)
forked = codex.thread_fork(source.id)
explicit_fork = codex.thread_fork(
source.id,
approval_mode=ApprovalMode.auto_review,
)
turn_thread = codex.thread_start()
first_result = turn_thread.run(
"deny this and later turns",
approval_mode=ApprovalMode.deny_all,
)
after_turn_override = codex._client.thread_resume( # noqa: SLF001
turn_thread.id,
ThreadResumeParams(thread_id=turn_thread.id),
)
second_result = turn_thread.run("inherit previous approval mode")
after_omitted_turn = codex._client.thread_resume( # noqa: SLF001
turn_thread.id,
ThreadResumeParams(thread_id=turn_thread.id),
)
inherited_policies = {
"resumed": _response_approval_policy(
codex._client.thread_resume( # noqa: SLF001
resumed.id,
ThreadResumeParams(thread_id=resumed.id),
)
),
"forked": _response_approval_policy(
codex._client.thread_resume( # noqa: SLF001
forked.id,
ThreadResumeParams(thread_id=forked.id),
)
),
"explicit_fork": _response_approval_policy(
codex._client.thread_resume( # noqa: SLF001
explicit_fork.id,
ThreadResumeParams(thread_id=explicit_fork.id),
)
),
"after_turn_override": _response_approval_policy(after_turn_override),
"after_omitted_turn": _response_approval_policy(after_omitted_turn),
}
assert {
"policies": inherited_policies,
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"policies": {
"resumed": AskForApprovalValue.never.value,
"forked": AskForApprovalValue.never.value,
"explicit_fork": AskForApprovalValue.on_request.value,
"after_turn_override": AskForApprovalValue.never.value,
"after_omitted_turn": AskForApprovalValue.never.value,
},
"final_responses": ["turn override", "turn inherited"],
}
def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None:
"""Omitted run approval mode should not rewrite the thread's stored setting."""
with AppServerHarness(tmp_path) as harness:
@@ -442,6 +753,38 @@ def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) -
}
def test_models_and_compact_use_real_app_server_rpcs(tmp_path) -> None:
"""Model listing and compaction should go through real app-server methods."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("history", response_id="compact-history")
harness.responses.enqueue_compaction_summary("compact summary")
with Codex(config=harness.app_server_config()) as codex:
models = codex.models(include_hidden=True)
thread = codex.thread_start()
run_result = thread.run("create history")
compact_response = thread.compact()
requests = harness.responses.wait_for_requests(2)
assert {
"models_payload_has_data": isinstance(
models.model_dump(by_alias=True, mode="json").get("data"),
list,
),
"run_final_response": run_result.final_response,
"compact_response": compact_response.model_dump(
by_alias=True,
mode="json",
),
"request_kinds": [_request_kind(request.path) for request in requests],
} == {
"models_payload_has_data": True,
"run_final_response": "history",
"compact_response": {},
"request_kinds": ["responses", "compact"],
}
def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None:
"""RunResult should use the final-answer item emitted by app-server."""
with AppServerHarness(tmp_path) as harness:

View File

@@ -1,29 +1,18 @@
from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import pytest
import openai_codex.api as public_api_module
from openai_codex.client import AppServerClient
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
TurnCompletedNotification,
TurnStartParams,
)
from openai_codex.models import InitializeResponse, Notification
from openai_codex.generated.v2_all import TurnStartParams
from openai_codex.models import InitializeResponse
from openai_codex.api import (
ApprovalMode,
AsyncCodex,
AsyncThread,
Codex,
Thread,
)
ROOT = Path(__file__).resolve().parents[1]
@@ -45,79 +34,6 @@ def _approval_settings(params: list[Any]) -> list[dict[str, object]]:
]
def _delta_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "delta-text",
) -> Notification:
return Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": text,
"itemId": "item-1",
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
error_message: str | None = None,
) -> Notification:
turn: dict[str, object] = {
"id": turn_id,
"items": [],
"status": status,
}
if error_message is not None:
turn["error"] = {"message": error_message}
return Notification(
method="turn/completed",
payload=TurnCompletedNotification.model_validate(
{
"threadId": thread_id,
"turn": turn,
}
),
)
def _item_completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "final text",
phase: MessagePhase | None = None,
) -> Notification:
"""Build a realistic completed-item notification accepted by generated models."""
item: dict[str, object] = {
"id": "item-1",
"text": text,
"type": "agentMessage",
}
if phase is not None:
item["phase"] = phase.value
return Notification(
method="item/completed",
payload=ItemCompletedNotification.model_validate(
{
# The pinned runtime schema requires completion timestamps.
"completedAtMs": 1,
"item": item,
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
@@ -244,264 +160,6 @@ def test_unknown_approval_mode_is_rejected() -> None:
public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type]
def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="Second message")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Second message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_preserves_empty_last_assistant_message() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == ""
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None:
client = AppServerClient()
final_answer_notification = _item_completed_notification(
text="Final answer",
phase=MessagePhase.final_answer,
)
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
final_answer_notification,
commentary_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Final answer"
assert result.items == [
final_answer_notification.payload.item,
commentary_notification.payload.item,
]
def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
client = AppServerClient()
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
def test_thread_run_raises_on_failed_turn() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(status="failed", error_message="boom"),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
with pytest.raises(RuntimeError, match="boom"):
Thread(client, "thread-1").run("hello")
def test_stream_text_registers_and_consumes_turn_notifications() -> None:
"""stream_text should register, consume, and unregister one turn queue."""
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(text="first"),
_delta_notification(text="second"),
_completed_notification(),
]
)
calls: list[tuple[str, str]] = []
client.turn_start = lambda thread_id, input_items, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
def fake_register(turn_id: str) -> None:
"""Record registration for the turn created by stream_text."""
calls.append(("register", turn_id))
def fake_next(turn_id: str) -> Notification:
"""Return the next queued notification for stream_text."""
calls.append(("next", turn_id))
return notifications.popleft()
def fake_unregister(turn_id: str) -> None:
"""Record cleanup for the turn created by stream_text."""
calls.append(("unregister", turn_id))
client.register_turn_notifications = fake_register # type: ignore[method-assign]
client.next_turn_notification = fake_next # type: ignore[method-assign]
client.unregister_turn_notifications = fake_unregister # type: ignore[method-assign]
chunks = list(client.stream_text("thread-1", "hello"))
assert ([chunk.delta for chunk in chunks], calls) == (
["first", "second"],
[
("register", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("unregister", "turn-1"),
],
)
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> (
None
):
"""Async run should use the last final assistant message as the response text."""
async def scenario() -> None:
"""Feed two completed agent messages through the async per-turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
first_item_notification = _item_completed_notification(
text="First async message"
)
second_item_notification = _item_completed_notification(
text="Second async message"
)
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
"""Return a synthetic turn id after AsyncThread.run builds input."""
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued notification for that synthetic turn."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response == "Second async message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
asyncio.run(scenario())
def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
"""Async Thread.run should ignore commentary-only messages for final text."""
async def scenario() -> None:
"""Feed a commentary item and completion through the async turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
"""Return a synthetic turn id for commentary-only output."""
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued commentary/completion notification."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
asyncio.run(scenario())
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",