diff --git a/sdk/python/tests/app_server_harness.py b/sdk/python/tests/app_server_harness.py index 7bc6469727..1a80a39d63 100644 --- a/sdk/python/tests/app_server_harness.py +++ b/sdk/python/tests/app_server_harness.py @@ -77,11 +77,19 @@ class MockSseResponse: return chunks +@dataclass(frozen=True) +class MockJsonResponse: + """One queued JSON response served by the mock Responses API.""" + + body: Json + + class MockResponsesServer: """Local HTTP server that records `/v1/responses` requests and returns SSE.""" def __init__(self) -> None: self._responses: queue.Queue[MockSseResponse] = queue.Queue() + self._compact_responses: queue.Queue[MockJsonResponse] = queue.Queue() self._requests: list[CapturedResponsesRequest] = [] self._requests_lock = threading.Lock() self._server = _ResponsesHttpServer(("127.0.0.1", 0), _ResponsesHandler, self) @@ -136,6 +144,21 @@ class MockResponsesServer: ) ) + def enqueue_compaction_summary(self, summary: str) -> None: + """Queue a compact endpoint response with one synthetic compaction item.""" + self._compact_responses.put( + MockJsonResponse( + body={ + "output": [ + { + "type": "compaction", + "encrypted_content": summary, + } + ] + } + ) + ) + def requests(self) -> list[CapturedResponsesRequest]: """Return all recorded Responses API requests.""" with self._requests_lock: @@ -180,6 +203,10 @@ class MockResponsesServer: """Return the next queued SSE response or fail the HTTP request.""" return self._responses.get_nowait() + def _next_compact_response(self) -> MockJsonResponse: + """Return the next queued compact JSON response or fail the HTTP request.""" + return self._compact_responses.get_nowait() + class AppServerHarness: """Test fixture that points a pinned runtime app-server at MockResponsesServer.""" @@ -276,11 +303,22 @@ class _ResponsesHandler(BaseHTTPRequestHandler): self.send_error(404, f"unexpected GET {self.path}") def do_POST(self) -> None: - """Serve queued SSE responses for `/v1/responses` requests.""" + """Serve queued responses for `/v1/responses` and compact requests.""" length = int(self.headers.get("content-length", "0")) body = self.rfile.read(length) self.server.mock._record_request(self, body) + if self.path.endswith("/v1/responses/compact") or self.path.endswith( + "/responses/compact" + ): + try: + response = self.server.mock._next_compact_response() + except queue.Empty: + self.send_error(500, "no queued compact response") + return + self._send_json(response.body) + return + if not (self.path.endswith("/v1/responses") or self.path.endswith("/responses")): self.send_error(404, f"unexpected POST {self.path}") return diff --git a/sdk/python/tests/test_async_client_behavior.py b/sdk/python/tests/test_async_client_behavior.py index 97e2a1d779..4e48fbbfee 100644 --- a/sdk/python/tests/test_async_client_behavior.py +++ b/sdk/python/tests/test_async_client_behavior.py @@ -2,11 +2,9 @@ from __future__ import annotations import asyncio import time -from types import SimpleNamespace from openai_codex.async_client import AsyncAppServerClient from openai_codex.generated.v2_all import ( - AgentMessageDeltaNotification, TurnCompletedNotification, ) from openai_codex.models import Notification, UnknownNotification @@ -36,46 +34,6 @@ def test_async_client_allows_concurrent_transport_calls() -> None: assert asyncio.run(scenario()) == 2 -def test_async_stream_text_is_incremental_without_blocking_parallel_calls() -> None: - """Async text streaming should yield incrementally without blocking other calls.""" - async def scenario() -> tuple[str, list[str], bool]: - """Start a stream, then prove another async client call can finish.""" - client = AsyncAppServerClient() - - def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def] - """Yield one item before sleeping so the async wrapper can interleave.""" - yield "first" - time.sleep(0.03) - yield "second" - yield "third" - - def fake_model_list(include_hidden: bool = False) -> str: - """Return immediately to prove the event loop was not monopolized.""" - return "done" - - client._sync.stream_text = fake_stream_text # type: ignore[method-assign] - client._sync.model_list = fake_model_list # type: ignore[method-assign] - - stream = client.stream_text("thread-1", "hello") - first = await anext(stream) - - competing_call = asyncio.create_task(client.model_list()) - await asyncio.sleep(0.01) - competing_call_done_before_stream_done = competing_call.done() - - remaining: list[str] = [] - async for item in stream: - remaining.append(item) - - await competing_call - return first, remaining, competing_call_done_before_stream_done - - first, remaining, was_unblocked = asyncio.run(scenario()) - assert first == "first" - assert remaining == ["second", "third"] - assert was_unblocked - - def test_async_client_turn_notification_methods_delegate_to_sync_client() -> None: """Async turn routing methods should preserve sync-client registration semantics.""" async def scenario() -> tuple[list[tuple[str, str]], Notification, str]: @@ -142,84 +100,3 @@ def test_async_client_turn_notification_methods_delegate_to_sync_client() -> Non ), "turn-1", ) - - -def test_async_stream_text_uses_sync_turn_routing() -> None: - """Async text streaming should consume the same per-turn routing path as sync.""" - async def scenario() -> tuple[list[tuple[str, str]], list[str]]: - """Record routing calls while streaming two deltas and one completion.""" - client = AsyncAppServerClient() - notifications = [ - Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": "first", - "itemId": "item-1", - "threadId": "thread-1", - "turnId": "turn-1", - } - ), - ), - Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": "second", - "itemId": "item-2", - "threadId": "thread-1", - "turnId": "turn-1", - } - ), - ), - Notification( - method="turn/completed", - payload=TurnCompletedNotification.model_validate( - { - "threadId": "thread-1", - "turn": {"id": "turn-1", "items": [], "status": "completed"}, - } - ), - ), - ] - calls: list[tuple[str, str]] = [] - - def fake_turn_start(thread_id: str, text: str, *, params=None): # type: ignore[no-untyped-def] - """Return a started turn id while recording the request thread.""" - calls.append(("turn_start", thread_id)) - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - def fake_register(turn_id: str) -> None: - """Record stream registration for the started turn.""" - calls.append(("register", turn_id)) - - def fake_next(turn_id: str) -> Notification: - """Return the next queued turn notification.""" - calls.append(("next", turn_id)) - return notifications.pop(0) - - def fake_unregister(turn_id: str) -> None: - """Record stream cleanup for the started turn.""" - calls.append(("unregister", turn_id)) - - client._sync.turn_start = fake_turn_start # type: ignore[method-assign] - client._sync.register_turn_notifications = fake_register # type: ignore[method-assign] - client._sync.next_turn_notification = fake_next # type: ignore[method-assign] - client._sync.unregister_turn_notifications = fake_unregister # type: ignore[method-assign] - - chunks = [chunk async for chunk in client.stream_text("thread-1", "hello")] - return calls, [chunk.delta for chunk in chunks] - - calls, deltas = asyncio.run(scenario()) - - assert (calls, deltas) == ( - [ - ("turn_start", "thread-1"), - ("register", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("unregister", "turn-1"), - ], - ["first", "second"], - ) diff --git a/sdk/python/tests/test_client_rpc_methods.py b/sdk/python/tests/test_client_rpc_methods.py index 95f9f606ce..f2c5020d55 100644 --- a/sdk/python/tests/test_client_rpc_methods.py +++ b/sdk/python/tests/test_client_rpc_methods.py @@ -1,7 +1,6 @@ from __future__ import annotations from pathlib import Path -from typing import Any from openai_codex.client import AppServerClient, _params_dict from openai_codex.generated.notification_registry import notification_turn_id @@ -19,23 +18,6 @@ from openai_codex.models import Notification, UnknownNotification ROOT = Path(__file__).resolve().parents[1] -def test_thread_set_name_and_compact_use_current_rpc_methods() -> None: - client = AppServerClient() - calls: list[tuple[str, dict[str, Any] | None]] = [] - - def fake_request(method: str, params, *, response_model): # type: ignore[no-untyped-def] - calls.append((method, params)) - return response_model.model_validate({}) - - client.request = fake_request # type: ignore[method-assign] - - client.thread_set_name("thread-1", "sdk-name") - client.thread_compact("thread-1") - - assert calls[0][0] == "thread/name/set" - assert calls[1][0] == "thread/compact/start" - - def test_generated_params_models_are_snake_case_and_dump_by_alias() -> None: params = ThreadListParams(search_term="needle", limit=5) diff --git a/sdk/python/tests/test_mock_app_server_integration.py b/sdk/python/tests/test_mock_app_server_integration.py index c9e879ec77..1d50969230 100644 --- a/sdk/python/tests/test_mock_app_server_integration.py +++ b/sdk/python/tests/test_mock_app_server_integration.py @@ -4,10 +4,13 @@ import asyncio from collections.abc import AsyncIterator, Iterable, Iterator from typing import Any +import pytest + from app_server_harness import ( AppServerHarness, ev_assistant_message, ev_completed, + ev_failed, ev_message_item_added, ev_output_text_delta, ev_response_created, @@ -82,6 +85,26 @@ def _streaming_response(response_id: str, item_id: str, parts: list[str]) -> str ) +def _assistant_message_with_phase( + item_id: str, + text: str, + phase: MessagePhase, +) -> dict[str, Any]: + """Build an assistant message event carrying app-server phase metadata.""" + event = ev_assistant_message(item_id, text) + event["item"] = {**event["item"], "phase": phase.value} + return event + + +def _request_kind(request_path: str) -> str: + """Classify captured mock-server request paths for compact assertions.""" + if request_path.endswith("/responses/compact"): + return "compact" + if request_path.endswith("/responses"): + return "responses" + return request_path + + def test_sync_thread_run_uses_pinned_app_server_and_mock_responses( tmp_path, ) -> None: @@ -145,6 +168,165 @@ def test_async_thread_run_uses_pinned_app_server_and_mock_responses( asyncio.run(scenario()) +def test_run_result_item_semantics_use_real_app_server(tmp_path) -> None: + """RunResult should reflect real item notifications, not synthetic client queues.""" + cases = [ + ( + "last unknown phase wins", + sse( + [ + ev_response_created("items-last"), + ev_assistant_message("msg-items-first", "First message"), + ev_assistant_message("msg-items-second", "Second message"), + ev_completed("items-last"), + ] + ), + "Second message", + ["First message", "Second message"], + ), + ( + "empty last message is preserved", + sse( + [ + ev_response_created("items-empty"), + ev_assistant_message("msg-items-nonempty", "First message"), + ev_assistant_message("msg-items-empty", ""), + ev_completed("items-empty"), + ] + ), + "", + ["First message", ""], + ), + ( + "commentary only is not final", + sse( + [ + ev_response_created("items-commentary"), + _assistant_message_with_phase( + "msg-items-commentary", + "Commentary", + MessagePhase.commentary, + ), + ev_completed("items-commentary"), + ] + ), + None, + ["Commentary"], + ), + ] + + with AppServerHarness(tmp_path) as harness: + for _, body, _, _ in cases: + harness.responses.enqueue_sse(body) + + with Codex(config=harness.app_server_config()) as codex: + results = [ + codex.thread_start().run(f"case: {name}") for name, _, _, _ in cases + ] + + assert [ + { + "final_response": result.final_response, + "agent_messages": _agent_message_texts_from_items(result.items), + } + for result in results + ] == [ + { + "final_response": final_response, + "agent_messages": agent_messages, + } + for _, _, final_response, agent_messages in cases + ] + + +def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None: + """Thread.run should surface the failed turn error emitted by app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("failed-run"), + ev_failed("failed-run", "boom from mock model"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + with pytest.raises(RuntimeError, match="boom from mock model"): + thread.run("trigger failure") + + +def test_async_run_result_item_semantics_use_real_app_server(tmp_path) -> None: + """Async RunResult should use the same real app-server notification mapping.""" + + async def scenario() -> None: + """Run multiple async result cases against one app-server process.""" + cases = [ + ( + "last async unknown phase wins", + sse( + [ + ev_response_created("async-items-last"), + ev_assistant_message( + "msg-async-items-first", + "First async message", + ), + ev_assistant_message( + "msg-async-items-second", + "Second async message", + ), + ev_completed("async-items-last"), + ] + ), + "Second async message", + ["First async message", "Second async message"], + ), + ( + "async commentary only is not final", + sse( + [ + ev_response_created("async-items-commentary"), + _assistant_message_with_phase( + "msg-async-items-commentary", + "Async commentary", + MessagePhase.commentary, + ), + ev_completed("async-items-commentary"), + ] + ), + None, + ["Async commentary"], + ), + ] + + with AppServerHarness(tmp_path) as harness: + for _, body, _, _ in cases: + harness.responses.enqueue_sse(body) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + results = [ + await (await codex.thread_start()).run(f"case: {name}") + for name, _, _, _ in cases + ] + + assert [ + { + "final_response": result.final_response, + "agent_messages": _agent_message_texts_from_items(result.items), + } + for result in results + ] == [ + { + "final_response": final_response, + "agent_messages": agent_messages, + } + for _, _, final_response, agent_messages in cases + ] + + asyncio.run(scenario()) + + def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None: """A sync turn stream should expose deltas, completed items, and completion.""" with AppServerHarness(tmp_path) as harness: @@ -233,6 +415,62 @@ def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None: asyncio.run(scenario()) +def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None: + """AppServerClient.stream_text should stream through a real app-server turn.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + _streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"]) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001 + + assert [chunk.delta for chunk in chunks] == ["fir", "st"] + + +def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None: + """Async stream_text should yield without blocking another app-server request.""" + + async def scenario() -> None: + """Leave a stream open while another async request completes.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + _streaming_response( + "low-async-stream", + "msg-low-async-stream", + ["one", "two", "three"], + ), + delay_between_events_s=0.03, + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + stream = codex._client.stream_text( # noqa: SLF001 + thread.id, + "low-level async", + ) + first = await anext(stream) + models_task = asyncio.create_task(codex.models()) + models = await asyncio.wait_for(models_task, timeout=1.0) + remaining = [chunk.delta async for chunk in stream] + + assert { + "first": first.delta, + "remaining": remaining, + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + } == { + "first": "one", + "remaining": ["two", "three"], + "models_payload_has_data": True, + } + + asyncio.run(scenario()) + + def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None: """Two sync streams on one client should consume only their own notifications.""" with AppServerHarness(tmp_path) as harness: @@ -338,6 +576,79 @@ def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None: asyncio.run(scenario()) +def test_approval_modes_preserve_real_app_server_state_without_override( + tmp_path, +) -> None: + """Resume, fork, and next turn should inherit approval settings unless overridden.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1") + harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2") + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + resumed = codex.thread_resume(source.id) + forked = codex.thread_fork(source.id) + explicit_fork = codex.thread_fork( + source.id, + approval_mode=ApprovalMode.auto_review, + ) + + turn_thread = codex.thread_start() + first_result = turn_thread.run( + "deny this and later turns", + approval_mode=ApprovalMode.deny_all, + ) + after_turn_override = codex._client.thread_resume( # noqa: SLF001 + turn_thread.id, + ThreadResumeParams(thread_id=turn_thread.id), + ) + second_result = turn_thread.run("inherit previous approval mode") + after_omitted_turn = codex._client.thread_resume( # noqa: SLF001 + turn_thread.id, + ThreadResumeParams(thread_id=turn_thread.id), + ) + + inherited_policies = { + "resumed": _response_approval_policy( + codex._client.thread_resume( # noqa: SLF001 + resumed.id, + ThreadResumeParams(thread_id=resumed.id), + ) + ), + "forked": _response_approval_policy( + codex._client.thread_resume( # noqa: SLF001 + forked.id, + ThreadResumeParams(thread_id=forked.id), + ) + ), + "explicit_fork": _response_approval_policy( + codex._client.thread_resume( # noqa: SLF001 + explicit_fork.id, + ThreadResumeParams(thread_id=explicit_fork.id), + ) + ), + "after_turn_override": _response_approval_policy(after_turn_override), + "after_omitted_turn": _response_approval_policy(after_omitted_turn), + } + + assert { + "policies": inherited_policies, + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "policies": { + "resumed": AskForApprovalValue.never.value, + "forked": AskForApprovalValue.never.value, + "explicit_fork": AskForApprovalValue.on_request.value, + "after_turn_override": AskForApprovalValue.never.value, + "after_omitted_turn": AskForApprovalValue.never.value, + }, + "final_responses": ["turn override", "turn inherited"], + } + + def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None: """Omitted run approval mode should not rewrite the thread's stored setting.""" with AppServerHarness(tmp_path) as harness: @@ -442,6 +753,38 @@ def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) - } +def test_models_and_compact_use_real_app_server_rpcs(tmp_path) -> None: + """Model listing and compaction should go through real app-server methods.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("history", response_id="compact-history") + harness.responses.enqueue_compaction_summary("compact summary") + + with Codex(config=harness.app_server_config()) as codex: + models = codex.models(include_hidden=True) + thread = codex.thread_start() + run_result = thread.run("create history") + compact_response = thread.compact() + requests = harness.responses.wait_for_requests(2) + + assert { + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + "run_final_response": run_result.final_response, + "compact_response": compact_response.model_dump( + by_alias=True, + mode="json", + ), + "request_kinds": [_request_kind(request.path) for request in requests], + } == { + "models_payload_has_data": True, + "run_final_response": "history", + "compact_response": {}, + "request_kinds": ["responses", "compact"], + } + + def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None: """RunResult should use the final-answer item emitted by app-server.""" with AppServerHarness(tmp_path) as harness: diff --git a/sdk/python/tests/test_public_api_runtime_behavior.py b/sdk/python/tests/test_public_api_runtime_behavior.py index 2d170ef1de..fb52da59b3 100644 --- a/sdk/python/tests/test_public_api_runtime_behavior.py +++ b/sdk/python/tests/test_public_api_runtime_behavior.py @@ -1,29 +1,18 @@ from __future__ import annotations import asyncio -from collections import deque from pathlib import Path -from types import SimpleNamespace from typing import Any import pytest import openai_codex.api as public_api_module -from openai_codex.client import AppServerClient -from openai_codex.generated.v2_all import ( - AgentMessageDeltaNotification, - ItemCompletedNotification, - MessagePhase, - TurnCompletedNotification, - TurnStartParams, -) -from openai_codex.models import InitializeResponse, Notification +from openai_codex.generated.v2_all import TurnStartParams +from openai_codex.models import InitializeResponse from openai_codex.api import ( ApprovalMode, AsyncCodex, - AsyncThread, Codex, - Thread, ) ROOT = Path(__file__).resolve().parents[1] @@ -45,79 +34,6 @@ def _approval_settings(params: list[Any]) -> list[dict[str, object]]: ] -def _delta_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - text: str = "delta-text", -) -> Notification: - return Notification( - method="item/agentMessage/delta", - payload=AgentMessageDeltaNotification.model_validate( - { - "delta": text, - "itemId": "item-1", - "threadId": thread_id, - "turnId": turn_id, - } - ), - ) - - -def _completed_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - status: str = "completed", - error_message: str | None = None, -) -> Notification: - turn: dict[str, object] = { - "id": turn_id, - "items": [], - "status": status, - } - if error_message is not None: - turn["error"] = {"message": error_message} - return Notification( - method="turn/completed", - payload=TurnCompletedNotification.model_validate( - { - "threadId": thread_id, - "turn": turn, - } - ), - ) - - -def _item_completed_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", - text: str = "final text", - phase: MessagePhase | None = None, -) -> Notification: - """Build a realistic completed-item notification accepted by generated models.""" - item: dict[str, object] = { - "id": "item-1", - "text": text, - "type": "agentMessage", - } - if phase is not None: - item["phase"] = phase.value - return Notification( - method="item/completed", - payload=ItemCompletedNotification.model_validate( - { - # The pinned runtime schema requires completion timestamps. - "completedAtMs": 1, - "item": item, - "threadId": thread_id, - "turnId": turn_id, - } - ), - ) - - def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None: closed: list[bool] = [] @@ -244,264 +160,6 @@ def test_unknown_approval_mode_is_rejected() -> None: public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type] -def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None: - client = AppServerClient() - first_item_notification = _item_completed_notification(text="First message") - second_item_notification = _item_completed_notification(text="Second message") - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "Second message" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - -def test_thread_run_preserves_empty_last_assistant_message() -> None: - client = AppServerClient() - first_item_notification = _item_completed_notification(text="First message") - second_item_notification = _item_completed_notification(text="") - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - -def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None: - client = AppServerClient() - final_answer_notification = _item_completed_notification( - text="Final answer", - phase=MessagePhase.final_answer, - ) - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - final_answer_notification, - commentary_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response == "Final answer" - assert result.items == [ - final_answer_notification.payload.item, - commentary_notification.payload.item, - ] - - -def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None: - client = AppServerClient() - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - commentary_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - result = Thread(client, "thread-1").run("hello") - - assert result.final_response is None - assert result.items == [commentary_notification.payload.item] - - -def test_thread_run_raises_on_failed_turn() -> None: - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _completed_notification(status="failed", error_message="boom"), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - with pytest.raises(RuntimeError, match="boom"): - Thread(client, "thread-1").run("hello") - - -def test_stream_text_registers_and_consumes_turn_notifications() -> None: - """stream_text should register, consume, and unregister one turn queue.""" - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _delta_notification(text="first"), - _delta_notification(text="second"), - _completed_notification(), - ] - ) - calls: list[tuple[str, str]] = [] - client.turn_start = lambda thread_id, input_items, *, params=None: SimpleNamespace( # noqa: ARG005,E731 - turn=SimpleNamespace(id="turn-1") - ) - - def fake_register(turn_id: str) -> None: - """Record registration for the turn created by stream_text.""" - calls.append(("register", turn_id)) - - def fake_next(turn_id: str) -> Notification: - """Return the next queued notification for stream_text.""" - calls.append(("next", turn_id)) - return notifications.popleft() - - def fake_unregister(turn_id: str) -> None: - """Record cleanup for the turn created by stream_text.""" - calls.append(("unregister", turn_id)) - - client.register_turn_notifications = fake_register # type: ignore[method-assign] - client.next_turn_notification = fake_next # type: ignore[method-assign] - client.unregister_turn_notifications = fake_unregister # type: ignore[method-assign] - - chunks = list(client.stream_text("thread-1", "hello")) - - assert ([chunk.delta for chunk in chunks], calls) == ( - ["first", "second"], - [ - ("register", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("next", "turn-1"), - ("unregister", "turn-1"), - ], - ) - - -def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> ( - None -): - """Async run should use the last final assistant message as the response text.""" - - async def scenario() -> None: - """Feed two completed agent messages through the async per-turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - first_item_notification = _item_completed_notification( - text="First async message" - ) - second_item_notification = _item_completed_notification( - text="Second async message" - ) - notifications: deque[Notification] = deque( - [ - first_item_notification, - second_item_notification, - _completed_notification(), - ] - ) - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 - """Return a synthetic turn id after AsyncThread.run builds input.""" - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued notification for that synthetic turn.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert result.final_response == "Second async message" - assert result.items == [ - first_item_notification.payload.item, - second_item_notification.payload.item, - ] - - asyncio.run(scenario()) - - -def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None: - """Async Thread.run should ignore commentary-only messages for final text.""" - - async def scenario() -> None: - """Feed a commentary item and completion through the async turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - commentary_notification = _item_completed_notification( - text="Commentary", - phase=MessagePhase.commentary, - ) - notifications: deque[Notification] = deque( - [ - commentary_notification, - _completed_notification(), - ] - ) - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 - """Return a synthetic turn id for commentary-only output.""" - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued commentary/completion notification.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert result.final_response is None - assert result.items == [commentary_notification.payload.item] - - asyncio.run(scenario()) - - def test_retry_examples_compare_status_with_enum() -> None: for path in ( ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",