From becbd2a1272b64413c22bcbfe67f3bae55b2f6ab Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sat, 9 May 2026 10:23:06 +0300 Subject: [PATCH] Document SDK turn routing helpers Co-authored-by: Codex --- sdk/python/scripts/update_sdk_artifacts.py | 5 ++++ .../src/codex_app_server/_message_router.py | 2 ++ sdk/python/src/codex_app_server/api.py | 2 ++ .../src/codex_app_server/async_client.py | 29 +++++++++++++++++++ sdk/python/src/codex_app_server/client.py | 12 ++++++++ .../generated/notification_registry.py | 1 + .../test_artifact_workflow_and_binaries.py | 2 ++ .../tests/test_async_client_behavior.py | 19 ++++++++++++ sdk/python/tests/test_client_rpc_methods.py | 7 +++++ .../tests/test_public_api_runtime_behavior.py | 24 +++++++++++++++ 10 files changed, 103 insertions(+) diff --git a/sdk/python/scripts/update_sdk_artifacts.py b/sdk/python/scripts/update_sdk_artifacts.py index 4ff6f0c24f..c44a1444c0 100755 --- a/sdk/python/scripts/update_sdk_artifacts.py +++ b/sdk/python/scripts/update_sdk_artifacts.py @@ -61,6 +61,7 @@ def staged_runtime_bin_path(root: Path) -> Path: def staged_runtime_resource_path(root: Path, resource: Path) -> Path: + """Stage runtime helper binaries beside the main bundled Codex binary.""" # Runtime wheels include the whole bin/ directory, so helper executables # should be staged beside the main Codex binary instead of changing the # package template for each platform. @@ -588,6 +589,7 @@ def _notification_specs() -> list[tuple[str, str]]: def _notification_turn_id_specs( specs: list[tuple[str, str]], ) -> tuple[list[str], list[str]]: + """Classify generated notification payloads by where the turn id lives.""" server_notifications = json.loads( (schema_root_dir() / "ServerNotification.json").read_text() ) @@ -615,6 +617,7 @@ def _notification_turn_id_specs( def _type_tuple_source(class_names: list[str]) -> str: + """Render a generated tuple literal for notification payload classes.""" if not class_names: return "()" if len(class_names) == 1: @@ -623,6 +626,7 @@ def _type_tuple_source(class_names: list[str]) -> str: def generate_notification_registry() -> None: + """Regenerate notification models and routing metadata from generated schemas.""" out = ( sdk_root() / "src" @@ -666,6 +670,7 @@ def generate_notification_registry() -> None: "", "", "def notification_turn_id(payload: BaseModel) -> str | None:", + ' """Return the turn id carried by generated notification payload metadata."""', " if isinstance(payload, DIRECT_TURN_ID_NOTIFICATION_TYPES):", " return payload.turn_id if isinstance(payload.turn_id, str) else None", " if isinstance(payload, NESTED_TURN_NOTIFICATION_TYPES):", diff --git a/sdk/python/src/codex_app_server/_message_router.py b/sdk/python/src/codex_app_server/_message_router.py index 6de575166c..fb466f958c 100644 --- a/sdk/python/src/codex_app_server/_message_router.py +++ b/sdk/python/src/codex_app_server/_message_router.py @@ -22,6 +22,7 @@ class MessageRouter: """ def __init__(self) -> None: + """Create empty response, turn, and global notification queues.""" self._lock = threading.Lock() self._response_waiters: dict[str, queue.Queue[ResponseQueueItem]] = {} self._turn_notifications: dict[str, queue.Queue[NotificationQueueItem]] = {} @@ -144,6 +145,7 @@ class MessageRouter: self._global_notifications.put(exc) def _notification_turn_id(self, notification: Notification) -> str | None: + """Extract routing ids from known generated payloads or raw unknown payloads.""" payload = notification.payload if isinstance(payload, UnknownNotification): raw_turn_id = payload.params.get("turnId") diff --git a/sdk/python/src/codex_app_server/api.py b/sdk/python/src/codex_app_server/api.py index 6f61a55868..886e4dd826 100644 --- a/sdk/python/src/codex_app_server/api.py +++ b/sdk/python/src/codex_app_server/api.py @@ -678,6 +678,7 @@ class TurnHandle: return self._client.turn_interrupt(self.thread_id, self.id) def stream(self) -> Iterator[Notification]: + """Yield only notifications routed to this turn handle.""" self._client.register_turn_notifications(self.id) try: while True: @@ -730,6 +731,7 @@ class AsyncTurnHandle: return await self._codex._client.turn_interrupt(self.thread_id, self.id) async def stream(self) -> AsyncIterator[Notification]: + """Yield only notifications routed to this async turn handle.""" await self._codex._ensure_initialized() self._codex._client.register_turn_notifications(self.id) try: diff --git a/sdk/python/src/codex_app_server/async_client.py b/sdk/python/src/codex_app_server/async_client.py index 6768c7d9fc..581d14abe9 100644 --- a/sdk/python/src/codex_app_server/async_client.py +++ b/sdk/python/src/codex_app_server/async_client.py @@ -40,13 +40,16 @@ class AsyncAppServerClient: """Async wrapper around AppServerClient using thread offloading.""" def __init__(self, config: AppServerConfig | None = None) -> None: + """Create the wrapped sync client that owns the transport process.""" self._sync = AppServerClient(config=config) async def __aenter__(self) -> "AsyncAppServerClient": + """Start the app-server process when entering an async context.""" await self.start() return self async def __aexit__(self, _exc_type, _exc, _tb) -> None: + """Close the app-server process when leaving an async context.""" await self.close() async def _call_sync( @@ -56,30 +59,37 @@ class AsyncAppServerClient: *args: ParamsT.args, **kwargs: ParamsT.kwargs, ) -> ReturnT: + """Run a blocking sync-client operation without blocking the event loop.""" return await asyncio.to_thread(fn, *args, **kwargs) @staticmethod def _next_from_iterator( iterator: Iterator[AgentMessageDeltaNotification], ) -> tuple[bool, AgentMessageDeltaNotification | None]: + """Convert StopIteration into a value that can cross asyncio.to_thread.""" try: return True, next(iterator) except StopIteration: return False, None async def start(self) -> None: + """Start the wrapped sync client in a worker thread.""" await self._call_sync(self._sync.start) async def close(self) -> None: + """Close the wrapped sync client in a worker thread.""" await self._call_sync(self._sync.close) async def initialize(self) -> InitializeResponse: + """Initialize the app-server session.""" return await self._call_sync(self._sync.initialize) def register_turn_notifications(self, turn_id: str) -> None: + """Register a turn notification queue on the wrapped sync client.""" self._sync.register_turn_notifications(turn_id) def unregister_turn_notifications(self, turn_id: str) -> None: + """Unregister a turn notification queue on the wrapped sync client.""" self._sync.unregister_turn_notifications(turn_id) async def request( @@ -89,6 +99,7 @@ class AsyncAppServerClient: *, response_model: type[ModelT], ) -> ModelT: + """Send a typed JSON-RPC request through the wrapped sync client.""" return await self._call_sync( self._sync.request, method, @@ -99,6 +110,7 @@ class AsyncAppServerClient: async def thread_start( self, params: V2ThreadStartParams | JsonObject | None = None ) -> ThreadStartResponse: + """Start a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_start, params) async def thread_resume( @@ -106,16 +118,19 @@ class AsyncAppServerClient: thread_id: str, params: V2ThreadResumeParams | JsonObject | None = None, ) -> ThreadResumeResponse: + """Resume a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_resume, thread_id, params) async def thread_list( self, params: V2ThreadListParams | JsonObject | None = None ) -> ThreadListResponse: + """List threads using the wrapped sync client.""" return await self._call_sync(self._sync.thread_list, params) async def thread_read( self, thread_id: str, include_turns: bool = False ) -> ThreadReadResponse: + """Read a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_read, thread_id, include_turns) async def thread_fork( @@ -123,18 +138,23 @@ class AsyncAppServerClient: thread_id: str, params: V2ThreadForkParams | JsonObject | None = None, ) -> ThreadForkResponse: + """Fork a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_fork, thread_id, params) async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse: + """Archive a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_archive, thread_id) async def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse: + """Unarchive a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_unarchive, thread_id) async def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse: + """Rename a thread using the wrapped sync client.""" return await self._call_sync(self._sync.thread_set_name, thread_id, name) async def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse: + """Start thread compaction using the wrapped sync client.""" return await self._call_sync(self._sync.thread_compact, thread_id) async def turn_start( @@ -143,6 +163,7 @@ class AsyncAppServerClient: input_items: list[JsonObject] | JsonObject | str, params: V2TurnStartParams | JsonObject | None = None, ) -> TurnStartResponse: + """Start a turn using the wrapped sync client.""" return await self._call_sync( self._sync.turn_start, thread_id, input_items, params ) @@ -150,6 +171,7 @@ class AsyncAppServerClient: async def turn_interrupt( self, thread_id: str, turn_id: str ) -> TurnInterruptResponse: + """Interrupt a turn using the wrapped sync client.""" return await self._call_sync(self._sync.turn_interrupt, thread_id, turn_id) async def turn_steer( @@ -158,6 +180,7 @@ class AsyncAppServerClient: expected_turn_id: str, input_items: list[JsonObject] | JsonObject | str, ) -> TurnSteerResponse: + """Send steering input to a turn using the wrapped sync client.""" return await self._call_sync( self._sync.turn_steer, thread_id, @@ -166,6 +189,7 @@ class AsyncAppServerClient: ) async def model_list(self, include_hidden: bool = False) -> ModelListResponse: + """List models using the wrapped sync client.""" return await self._call_sync(self._sync.model_list, include_hidden) async def request_with_retry_on_overload( @@ -178,6 +202,7 @@ class AsyncAppServerClient: initial_delay_s: float = 0.25, max_delay_s: float = 2.0, ) -> ModelT: + """Send a typed request with the sync client's overload retry policy.""" return await self._call_sync( self._sync.request_with_retry_on_overload, method, @@ -189,12 +214,15 @@ class AsyncAppServerClient: ) async def next_notification(self) -> Notification: + """Wait for the next global notification without blocking the event loop.""" return await self._call_sync(self._sync.next_notification) async def next_turn_notification(self, turn_id: str) -> Notification: + """Wait for the next notification routed to one turn.""" return await self._call_sync(self._sync.next_turn_notification, turn_id) async def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification: + """Wait for the completion notification routed to one turn.""" return await self._call_sync(self._sync.wait_for_turn_completed, turn_id) async def stream_text( @@ -203,6 +231,7 @@ class AsyncAppServerClient: text: str, params: V2TurnStartParams | JsonObject | None = None, ) -> AsyncIterator[AgentMessageDeltaNotification]: + """Stream text deltas from one turn without monopolizing the event loop.""" iterator = self._sync.stream_text(thread_id, text, params) while True: has_value, chunk = await asyncio.to_thread( diff --git a/sdk/python/src/codex_app_server/client.py b/sdk/python/src/codex_app_server/client.py index ce3df4e416..26d23e3fa7 100644 --- a/sdk/python/src/codex_app_server/client.py +++ b/sdk/python/src/codex_app_server/client.py @@ -243,6 +243,7 @@ class AppServerClient: return response_model.model_validate(result) def _request_raw(self, method: str, params: JsonObject | None = None) -> JsonValue: + """Send a JSON-RPC request and wait for the reader thread to route its response.""" request_id = str(uuid.uuid4()) waiter = self._router.create_response_waiter(request_id) @@ -260,18 +261,23 @@ class AppServerClient: return item def notify(self, method: str, params: JsonObject | None = None) -> None: + """Send a JSON-RPC notification without waiting for a response.""" self._write_message({"method": method, "params": params or {}}) def next_notification(self) -> Notification: + """Return the next notification that is not scoped to an active turn.""" return self._router.next_global_notification() def register_turn_notifications(self, turn_id: str) -> None: + """Start routing notifications for one turn into its dedicated queue.""" self._router.register_turn(turn_id) def unregister_turn_notifications(self, turn_id: str) -> None: + """Stop routing notifications for one turn into its dedicated queue.""" self._router.unregister_turn(turn_id) def next_turn_notification(self, turn_id: str) -> Notification: + """Return the next routed notification for the requested turn id.""" return self._router.next_turn_notification(turn_id) def thread_start( @@ -349,6 +355,7 @@ class AppServerClient: input_items: list[JsonObject] | JsonObject | str, params: V2TurnStartParams | JsonObject | None = None, ) -> TurnStartResponse: + """Start a turn and register its notification queue as early as possible.""" payload = { **_params_dict(params), "threadId": thread_id, @@ -406,6 +413,7 @@ class AppServerClient: ) def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification: + """Block on the routed turn stream until the matching completion arrives.""" self.register_turn_notifications(turn_id) try: while True: @@ -425,6 +433,7 @@ class AppServerClient: text: str, params: V2TurnStartParams | JsonObject | None = None, ) -> Iterator[AgentMessageDeltaNotification]: + """Start a text turn and yield only its agent-message delta payloads.""" started = self.turn_start(thread_id, text, params=params) turn_id = started.turn.id self.register_turn_notifications(turn_id) @@ -477,6 +486,7 @@ class AppServerClient: def _default_approval_handler( self, method: str, params: JsonObject | None ) -> JsonObject: + """Accept approval requests when the caller did not provide a handler.""" if method == "item/commandExecution/requestApproval": return {"decision": "accept"} if method == "item/fileChange/requestApproval": @@ -498,6 +508,7 @@ class AppServerClient: self._stderr_thread.start() def _start_reader_thread(self) -> None: + """Start the sole stdout reader that fans messages into router queues.""" if self._proc is None or self._proc.stdout is None: return @@ -505,6 +516,7 @@ class AppServerClient: self._reader_thread.start() def _reader_loop(self) -> None: + """Continuously classify transport messages into requests, responses, and events.""" try: while True: msg = self._read_message() diff --git a/sdk/python/src/codex_app_server/generated/notification_registry.py b/sdk/python/src/codex_app_server/generated/notification_registry.py index b44ca2a436..3319f4edb5 100644 --- a/sdk/python/src/codex_app_server/generated/notification_registry.py +++ b/sdk/python/src/codex_app_server/generated/notification_registry.py @@ -165,6 +165,7 @@ NESTED_TURN_NOTIFICATION_TYPES: tuple[type[BaseModel], ...] = ( def notification_turn_id(payload: BaseModel) -> str | None: + """Return the turn id carried by generated notification payload metadata.""" if isinstance(payload, DIRECT_TURN_ID_NOTIFICATION_TYPES): return payload.turn_id if isinstance(payload.turn_id, str) else None if isinstance(payload, NESTED_TURN_NOTIFICATION_TYPES): diff --git a/sdk/python/tests/test_artifact_workflow_and_binaries.py b/sdk/python/tests/test_artifact_workflow_and_binaries.py index e9b4e6a8bb..93be06ca8a 100644 --- a/sdk/python/tests/test_artifact_workflow_and_binaries.py +++ b/sdk/python/tests/test_artifact_workflow_and_binaries.py @@ -352,6 +352,7 @@ def test_stage_runtime_release_can_pin_wheel_platform_tag(tmp_path: Path) -> Non def test_stage_runtime_release_copies_resource_binaries(tmp_path: Path) -> None: + """Runtime staging should copy every helper binary into the wheel bin dir.""" script = _load_update_script_module() fake_binary = tmp_path / script.runtime_binary_name() helper = tmp_path / "helper" @@ -382,6 +383,7 @@ def test_stage_runtime_release_copies_resource_binaries(tmp_path: Path) -> None: def test_runtime_resource_binaries_are_included_by_wheel_config( tmp_path: Path, ) -> None: + """The runtime wheel config should include helper binaries beside Codex.""" script = _load_update_script_module() fake_binary = tmp_path / script.runtime_binary_name() helper = tmp_path / "helper" diff --git a/sdk/python/tests/test_async_client_behavior.py b/sdk/python/tests/test_async_client_behavior.py index 0c4e8096fb..51359f041f 100644 --- a/sdk/python/tests/test_async_client_behavior.py +++ b/sdk/python/tests/test_async_client_behavior.py @@ -13,12 +13,15 @@ from codex_app_server.models import Notification, UnknownNotification def test_async_client_allows_concurrent_transport_calls() -> None: + """Async wrappers should offload sync calls so concurrent awaits can overlap.""" async def scenario() -> int: + """Run two blocking sync calls and report peak overlap.""" client = AsyncAppServerClient() active = 0 max_active = 0 def fake_model_list(include_hidden: bool = False) -> bool: + """Simulate a blocking sync transport call.""" nonlocal active, max_active active += 1 max_active = max(max_active, active) @@ -34,16 +37,20 @@ def test_async_client_allows_concurrent_transport_calls() -> None: def test_async_stream_text_is_incremental_without_blocking_parallel_calls() -> None: + """Async text streaming should yield incrementally without blocking other calls.""" async def scenario() -> tuple[str, list[str], bool]: + """Start a stream, then prove another async client call can finish.""" client = AsyncAppServerClient() def fake_stream_text(thread_id: str, text: str, params=None): # type: ignore[no-untyped-def] + """Yield one item before sleeping so the async wrapper can interleave.""" yield "first" time.sleep(0.03) yield "second" yield "third" def fake_model_list(include_hidden: bool = False) -> str: + """Return immediately to prove the event loop was not monopolized.""" return "done" client._sync.stream_text = fake_stream_text # type: ignore[method-assign] @@ -70,7 +77,9 @@ def test_async_stream_text_is_incremental_without_blocking_parallel_calls() -> N def test_async_client_turn_notification_methods_delegate_to_sync_client() -> None: + """Async turn routing methods should preserve sync-client registration semantics.""" async def scenario() -> tuple[list[tuple[str, str]], Notification, str]: + """Record the sync-client calls made by async turn notification wrappers.""" client = AsyncAppServerClient() event = Notification( method="unknown/direct", @@ -85,16 +94,20 @@ def test_async_client_turn_notification_methods_delegate_to_sync_client() -> Non calls: list[tuple[str, str]] = [] def fake_register(turn_id: str) -> None: + """Record turn registration through the wrapped sync client.""" calls.append(("register", turn_id)) def fake_unregister(turn_id: str) -> None: + """Record turn unregistration through the wrapped sync client.""" calls.append(("unregister", turn_id)) def fake_next(turn_id: str) -> Notification: + """Return one routed notification through the wrapped sync client.""" calls.append(("next", turn_id)) return event def fake_wait(turn_id: str) -> TurnCompletedNotification: + """Return one completion through the wrapped sync client.""" calls.append(("wait", turn_id)) return completed @@ -132,7 +145,9 @@ def test_async_client_turn_notification_methods_delegate_to_sync_client() -> Non def test_async_stream_text_uses_sync_turn_routing() -> None: + """Async text streaming should consume the same per-turn routing path as sync.""" async def scenario() -> tuple[list[tuple[str, str]], list[str]]: + """Record routing calls while streaming two deltas and one completion.""" client = AsyncAppServerClient() notifications = [ Notification( @@ -170,17 +185,21 @@ def test_async_stream_text_uses_sync_turn_routing() -> None: calls: list[tuple[str, str]] = [] def fake_turn_start(thread_id: str, text: str, *, params=None): # type: ignore[no-untyped-def] + """Return a started turn id while recording the request thread.""" calls.append(("turn_start", thread_id)) return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) def fake_register(turn_id: str) -> None: + """Record stream registration for the started turn.""" calls.append(("register", turn_id)) def fake_next(turn_id: str) -> Notification: + """Return the next queued turn notification.""" calls.append(("next", turn_id)) return notifications.pop(0) def fake_unregister(turn_id: str) -> None: + """Record stream cleanup for the started turn.""" calls.append(("unregister", turn_id)) client._sync.turn_start = fake_turn_start # type: ignore[method-assign] diff --git a/sdk/python/tests/test_client_rpc_methods.py b/sdk/python/tests/test_client_rpc_methods.py index 07b88215a8..a049960b07 100644 --- a/sdk/python/tests/test_client_rpc_methods.py +++ b/sdk/python/tests/test_client_rpc_methods.py @@ -135,6 +135,7 @@ def test_invalid_notification_payload_falls_back_to_unknown() -> None: def test_generated_notification_turn_id_handles_known_payload_shapes() -> None: + """Generated routing metadata should cover direct, nested, and unscoped payloads.""" direct = AgentMessageDeltaNotification.model_validate( { "delta": "hello", @@ -159,6 +160,7 @@ def test_generated_notification_turn_id_handles_known_payload_shapes() -> None: def test_turn_notification_router_demuxes_registered_turns() -> None: + """The router should deliver out-of-order turn events to the matching queues.""" client = AppServerClient() client.register_turn_notifications("turn-1") client.register_turn_notifications("turn-2") @@ -201,6 +203,7 @@ def test_turn_notification_router_demuxes_registered_turns() -> None: def test_client_reader_routes_interleaved_turn_notifications_by_turn_id() -> None: + """Reader-loop routing should preserve order within each interleaved turn stream.""" client = AppServerClient() client.register_turn_notifications("turn-1") client.register_turn_notifications("turn-2") @@ -245,6 +248,7 @@ def test_client_reader_routes_interleaved_turn_notifications_by_turn_id() -> Non ] def fake_read_message() -> dict[str, object]: + """Feed the reader loop a realistic interleaved stdout sequence.""" if messages: return messages.pop(0) raise EOFError @@ -278,6 +282,7 @@ def test_client_reader_routes_interleaved_turn_notifications_by_turn_id() -> Non def test_turn_notification_router_buffers_events_before_registration() -> None: + """Early turn events should be replayed once their TurnHandle registers.""" client = AppServerClient() client._router.route_notification( client._coerce_notification( @@ -302,6 +307,7 @@ def test_turn_notification_router_buffers_events_before_registration() -> None: def test_turn_notification_router_clears_unregistered_turn_when_completed() -> None: + """A completed unregistered turn should not leave a pending queue behind.""" client = AppServerClient() client._router.route_notification( client._coerce_notification( @@ -328,6 +334,7 @@ def test_turn_notification_router_clears_unregistered_turn_when_completed() -> N def test_turn_notification_router_routes_unknown_turn_notifications() -> None: + """Unknown notifications should still route when their raw params carry a turn id.""" client = AppServerClient() client.register_turn_notifications("turn-1") client.register_turn_notifications("turn-2") diff --git a/sdk/python/tests/test_public_api_runtime_behavior.py b/sdk/python/tests/test_public_api_runtime_behavior.py index 4f0fc45a7c..3cf18c4e47 100644 --- a/sdk/python/tests/test_public_api_runtime_behavior.py +++ b/sdk/python/tests/test_public_api_runtime_behavior.py @@ -227,6 +227,7 @@ def test_async_codex_initializes_only_once_under_concurrency() -> None: def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None: + """Two sync TurnHandle streams should advance independently on one client.""" client = AppServerClient() notifications: dict[str, deque[Notification]] = { "turn-1": deque( @@ -257,10 +258,13 @@ def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None: def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None: + """Two async TurnHandle streams should advance independently on one client.""" async def scenario() -> None: + """Interleave two async streams backed by separate per-turn queues.""" codex = AsyncCodex() async def fake_ensure_initialized() -> None: + """Avoid starting a real app-server process for this stream test.""" return None notifications: dict[str, deque[Notification]] = { @@ -279,6 +283,7 @@ def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None: } async def fake_next_notification(turn_id: str) -> Notification: + """Return the next notification from the requested per-turn queue.""" return notifications[turn_id].popleft() codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] @@ -468,6 +473,7 @@ def test_thread_run_raises_on_failed_turn() -> None: def test_stream_text_registers_and_consumes_turn_notifications() -> None: + """stream_text should register, consume, and unregister one turn queue.""" client = AppServerClient() notifications: deque[Notification] = deque( [ @@ -482,13 +488,16 @@ def test_stream_text_registers_and_consumes_turn_notifications() -> None: ) def fake_register(turn_id: str) -> None: + """Record registration for the turn created by stream_text.""" calls.append(("register", turn_id)) def fake_next(turn_id: str) -> Notification: + """Return the next queued notification for stream_text.""" calls.append(("next", turn_id)) return notifications.popleft() def fake_unregister(turn_id: str) -> None: + """Record cleanup for the turn created by stream_text.""" calls.append(("unregister", turn_id)) client.register_turn_notifications = fake_register # type: ignore[method-assign] @@ -510,10 +519,13 @@ def test_stream_text_registers_and_consumes_turn_notifications() -> None: def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: + """Async Thread.run should normalize string input and collect routed results.""" async def scenario() -> None: + """Feed item, usage, and completion events through the async turn stream.""" codex = AsyncCodex() async def fake_ensure_initialized() -> None: + """Avoid starting a real app-server process for this run test.""" return None item_notification = _item_completed_notification(text="Hello async.") @@ -528,12 +540,14 @@ def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: seen: dict[str, object] = {} async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 + """Capture normalized input and return a synthetic turn id.""" seen["thread_id"] = thread_id seen["wire_input"] = wire_input seen["params"] = params return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) async def fake_next_notification(_turn_id: str) -> Notification: + """Return the next queued notification for the synthetic turn.""" return notifications.popleft() codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] @@ -556,10 +570,13 @@ def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> ( None ): + """Async run should use the last final assistant message as the response text.""" async def scenario() -> None: + """Feed two completed agent messages through the async per-turn stream.""" codex = AsyncCodex() async def fake_ensure_initialized() -> None: + """Avoid starting a real app-server process for this run test.""" return None first_item_notification = _item_completed_notification( @@ -577,9 +594,11 @@ def test_async_thread_run_uses_last_completed_assistant_message_as_final_respons ) async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 + """Return a synthetic turn id after AsyncThread.run builds input.""" return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) async def fake_next_notification(_turn_id: str) -> Notification: + """Return the next queued notification for that synthetic turn.""" return notifications.popleft() codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] @@ -598,10 +617,13 @@ def test_async_thread_run_uses_last_completed_assistant_message_as_final_respons def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None: + """Async Thread.run should ignore commentary-only messages for final text.""" async def scenario() -> None: + """Feed a commentary item and completion through the async turn stream.""" codex = AsyncCodex() async def fake_ensure_initialized() -> None: + """Avoid starting a real app-server process for this run test.""" return None commentary_notification = _item_completed_notification( @@ -616,9 +638,11 @@ def test_async_thread_run_returns_none_when_only_commentary_messages_complete() ) async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 + """Return a synthetic turn id for commentary-only output.""" return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) async def fake_next_notification(_turn_id: str) -> Notification: + """Return the next queued commentary/completion notification.""" return notifications.popleft() codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]