diff --git a/sdk/python/tests/pinned_app_server_helpers.py b/sdk/python/tests/pinned_app_server_helpers.py new file mode 100644 index 0000000000..e11390d527 --- /dev/null +++ b/sdk/python/tests/pinned_app_server_helpers.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Iterable, Iterator +from typing import Any + +from app_server_harness import ( + ev_assistant_message, + ev_completed, + ev_message_item_added, + ev_output_text_delta, + ev_response_created, + sse, +) +from openai_codex.generated.v2_all import ( + AgentMessageDeltaNotification, + ItemCompletedNotification, + MessagePhase, +) +from openai_codex.models import Notification + +TINY_PNG_BYTES = bytes( + [ + 137, + 80, + 78, + 71, + 13, + 10, + 26, + 10, + 0, + 0, + 0, + 13, + 73, + 72, + 68, + 82, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 1, + 8, + 6, + 0, + 0, + 0, + 31, + 21, + 196, + 137, + 0, + 0, + 0, + 11, + 73, + 68, + 65, + 84, + 120, + 156, + 99, + 96, + 0, + 2, + 0, + 0, + 5, + 0, + 1, + 122, + 94, + 171, + 63, + 0, + 0, + 0, + 0, + 73, + 69, + 78, + 68, + 174, + 66, + 96, + 130, + ] +) + + +def response_approval_policy(response: Any) -> str: + """Return serialized approvalPolicy from a generated thread response.""" + return response.model_dump(by_alias=True, mode="json")["approvalPolicy"] + + +def agent_message_texts(events: list[Notification]) -> list[str]: + """Extract completed agent-message text from SDK notifications.""" + texts: list[str] = [] + for event in events: + if not isinstance(event.payload, ItemCompletedNotification): + continue + item = event.payload.item.root + if item.type == "agentMessage": + texts.append(item.text) + return texts + + +def agent_message_texts_from_items(items: Iterable[Any]) -> list[str]: + """Extract agent-message text from completed run result items.""" + texts: list[str] = [] + for item in items: + root = item.root + if root.type == "agentMessage": + texts.append(root.text) + return texts + + +def next_sync_delta(stream: Iterator[Notification]) -> str: + """Advance a sync turn stream until the next agent-message text delta.""" + for event in stream: + if isinstance(event.payload, AgentMessageDeltaNotification): + return event.payload.delta + raise AssertionError("stream completed before an agent-message delta") + + +async def next_async_delta(stream: AsyncIterator[Notification]) -> str: + """Advance an async turn stream until the next agent-message text delta.""" + async for event in stream: + if isinstance(event.payload, AgentMessageDeltaNotification): + return event.payload.delta + raise AssertionError("stream completed before an agent-message delta") + + +def streaming_response(response_id: str, item_id: str, parts: list[str]) -> str: + """Build an SSE stream with text deltas and a final assistant message.""" + return sse( + [ + ev_response_created(response_id), + ev_message_item_added(item_id), + *[ev_output_text_delta(part) for part in parts], + ev_assistant_message(item_id, "".join(parts)), + ev_completed(response_id), + ] + ) + + +def assistant_message_with_phase( + item_id: str, + text: str, + phase: MessagePhase, +) -> dict[str, Any]: + """Build an assistant message event carrying app-server phase metadata.""" + event = ev_assistant_message(item_id, text) + event["item"] = {**event["item"], "phase": phase.value} + return event + + +def request_kind(request_path: str) -> str: + """Classify captured mock-server request paths for compact assertions.""" + if request_path.endswith("/responses/compact"): + return "compact" + if request_path.endswith("/responses"): + return "responses" + return request_path diff --git a/sdk/python/tests/test_mock_app_server_integration.py b/sdk/python/tests/test_mock_app_server_integration.py deleted file mode 100644 index 08e3e5c0d6..0000000000 --- a/sdk/python/tests/test_mock_app_server_integration.py +++ /dev/null @@ -1,1076 +0,0 @@ -from __future__ import annotations - -import asyncio -from collections.abc import AsyncIterator, Iterable, Iterator -from typing import Any - -import pytest - -from app_server_harness import ( - AppServerHarness, - ev_assistant_message, - ev_completed, - ev_failed, - ev_message_item_added, - ev_output_text_delta, - ev_response_created, - sse, -) -from openai_codex import ( - ApprovalMode, - AsyncCodex, - Codex, - ImageInput, - LocalImageInput, - TextInput, -) -from openai_codex.generated.v2_all import ( - AgentMessageDeltaNotification, - AskForApprovalValue, - ItemCompletedNotification, - MessagePhase, - ThreadResumeParams, - TurnCompletedNotification, - TurnStatus, -) -from openai_codex.models import Notification - -TINY_PNG_BYTES = bytes( - [ - 137, - 80, - 78, - 71, - 13, - 10, - 26, - 10, - 0, - 0, - 0, - 13, - 73, - 72, - 68, - 82, - 0, - 0, - 0, - 1, - 0, - 0, - 0, - 1, - 8, - 6, - 0, - 0, - 0, - 31, - 21, - 196, - 137, - 0, - 0, - 0, - 11, - 73, - 68, - 65, - 84, - 120, - 156, - 99, - 96, - 0, - 2, - 0, - 0, - 5, - 0, - 1, - 122, - 94, - 171, - 63, - 0, - 0, - 0, - 0, - 73, - 69, - 78, - 68, - 174, - 66, - 96, - 130, - ] -) - - -def _response_approval_policy(response: Any) -> str: - """Return serialized approvalPolicy from a generated thread response.""" - return response.model_dump(by_alias=True, mode="json")["approvalPolicy"] - - -def _agent_message_texts(events: list[Notification]) -> list[str]: - """Extract completed agent-message text from SDK notifications.""" - texts: list[str] = [] - for event in events: - if not isinstance(event.payload, ItemCompletedNotification): - continue - item = event.payload.item.root - if item.type == "agentMessage": - texts.append(item.text) - return texts - - -def _agent_message_texts_from_items(items: Iterable[Any]) -> list[str]: - """Extract agent-message text from completed run result items.""" - texts: list[str] = [] - for item in items: - root = item.root - if root.type == "agentMessage": - texts.append(root.text) - return texts - - -def _next_sync_delta(stream: Iterator[Notification]) -> str: - """Advance a sync turn stream until the next agent-message text delta.""" - for event in stream: - if isinstance(event.payload, AgentMessageDeltaNotification): - return event.payload.delta - raise AssertionError("stream completed before an agent-message delta") - - -async def _next_async_delta(stream: AsyncIterator[Notification]) -> str: - """Advance an async turn stream until the next agent-message text delta.""" - async for event in stream: - if isinstance(event.payload, AgentMessageDeltaNotification): - return event.payload.delta - raise AssertionError("stream completed before an agent-message delta") - - -def _streaming_response(response_id: str, item_id: str, parts: list[str]) -> str: - """Build an SSE stream with text deltas and a final assistant message.""" - return sse( - [ - ev_response_created(response_id), - ev_message_item_added(item_id), - *[ev_output_text_delta(part) for part in parts], - ev_assistant_message(item_id, "".join(parts)), - ev_completed(response_id), - ] - ) - - -def _assistant_message_with_phase( - item_id: str, - text: str, - phase: MessagePhase, -) -> dict[str, Any]: - """Build an assistant message event carrying app-server phase metadata.""" - event = ev_assistant_message(item_id, text) - event["item"] = {**event["item"], "phase": phase.value} - return event - - -def _request_kind(request_path: str) -> str: - """Classify captured mock-server request paths for compact assertions.""" - if request_path.endswith("/responses/compact"): - return "compact" - if request_path.endswith("/responses"): - return "responses" - return request_path - - -def test_sync_thread_run_uses_pinned_app_server_and_mock_responses( - tmp_path, -) -> None: - """Drive Thread.run through the pinned app-server and inspect the HTTP request.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("Hello from the mock.", response_id="run-1") - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - result = thread.run("hello") - - request = harness.responses.single_request() - - body = request.body_json() - assert { - "final_response": result.final_response, - "agent_messages": _agent_message_texts_from_items(result.items), - "has_usage": result.usage is not None, - "request_model": body["model"], - "request_stream": body["stream"], - "request_user_texts": request.message_input_texts("user")[-1:], - } == { - "final_response": "Hello from the mock.", - "agent_messages": ["Hello from the mock."], - "has_usage": True, - "request_model": "mock-model", - "request_stream": True, - "request_user_texts": ["hello"], - } - - -def test_async_thread_run_uses_pinned_app_server_and_mock_responses( - tmp_path, -) -> None: - """Async Thread.run should exercise the same app-server boundary.""" - - async def scenario() -> None: - """Run the async client against a real app-server process.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message( - "Hello async.", - response_id="async-run-1", - ) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - thread = await codex.thread_start() - result = await thread.run("async hello") - - request = harness.responses.single_request() - - assert { - "final_response": result.final_response, - "agent_messages": _agent_message_texts_from_items(result.items), - "request_user_texts": request.message_input_texts("user")[-1:], - } == { - "final_response": "Hello async.", - "agent_messages": ["Hello async."], - "request_user_texts": ["async hello"], - } - - asyncio.run(scenario()) - - -def test_run_result_item_semantics_use_real_app_server(tmp_path) -> None: - """RunResult should reflect real item notifications, not synthetic client queues.""" - cases = [ - ( - "last unknown phase wins", - sse( - [ - ev_response_created("items-last"), - ev_assistant_message("msg-items-first", "First message"), - ev_assistant_message("msg-items-second", "Second message"), - ev_completed("items-last"), - ] - ), - "Second message", - ["First message", "Second message"], - ), - ( - "empty last message is preserved", - sse( - [ - ev_response_created("items-empty"), - ev_assistant_message("msg-items-nonempty", "First message"), - ev_assistant_message("msg-items-empty", ""), - ev_completed("items-empty"), - ] - ), - "", - ["First message", ""], - ), - ( - "commentary only is not final", - sse( - [ - ev_response_created("items-commentary"), - _assistant_message_with_phase( - "msg-items-commentary", - "Commentary", - MessagePhase.commentary, - ), - ev_completed("items-commentary"), - ] - ), - None, - ["Commentary"], - ), - ] - - with AppServerHarness(tmp_path) as harness: - for _, body, _, _ in cases: - harness.responses.enqueue_sse(body) - - with Codex(config=harness.app_server_config()) as codex: - results = [ - codex.thread_start().run(f"case: {name}") for name, _, _, _ in cases - ] - - assert [ - { - "final_response": result.final_response, - "agent_messages": _agent_message_texts_from_items(result.items), - } - for result in results - ] == [ - { - "final_response": final_response, - "agent_messages": agent_messages, - } - for _, _, final_response, agent_messages in cases - ] - - -def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None: - """Thread.run should surface the failed turn error emitted by app-server.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - sse( - [ - ev_response_created("failed-run"), - ev_failed("failed-run", "boom from mock model"), - ] - ) - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - with pytest.raises(RuntimeError, match="boom from mock model"): - thread.run("trigger failure") - - -def test_async_run_result_item_semantics_use_real_app_server(tmp_path) -> None: - """Async RunResult should use the same real app-server notification mapping.""" - - async def scenario() -> None: - """Run multiple async result cases against one app-server process.""" - cases = [ - ( - "last async unknown phase wins", - sse( - [ - ev_response_created("async-items-last"), - ev_assistant_message( - "msg-async-items-first", - "First async message", - ), - ev_assistant_message( - "msg-async-items-second", - "Second async message", - ), - ev_completed("async-items-last"), - ] - ), - "Second async message", - ["First async message", "Second async message"], - ), - ( - "async commentary only is not final", - sse( - [ - ev_response_created("async-items-commentary"), - _assistant_message_with_phase( - "msg-async-items-commentary", - "Async commentary", - MessagePhase.commentary, - ), - ev_completed("async-items-commentary"), - ] - ), - None, - ["Async commentary"], - ), - ] - - with AppServerHarness(tmp_path) as harness: - for _, body, _, _ in cases: - harness.responses.enqueue_sse(body) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - results = [ - await (await codex.thread_start()).run(f"case: {name}") - for name, _, _, _ in cases - ] - - assert [ - { - "final_response": result.final_response, - "agent_messages": _agent_message_texts_from_items(result.items), - } - for result in results - ] == [ - { - "final_response": final_response, - "agent_messages": agent_messages, - } - for _, _, final_response, agent_messages in cases - ] - - asyncio.run(scenario()) - - -def test_multimodal_inputs_reach_responses_api_through_real_app_server( - tmp_path, -) -> None: - """Remote and local image inputs should survive the SDK and app-server boundary.""" - local_image = tmp_path / "local.png" - local_image.write_bytes(TINY_PNG_BYTES) - remote_image_url = "https://example.com/codex.png" - - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message( - "remote image received", - response_id="remote-image", - ) - harness.responses.enqueue_assistant_message( - "local image received", - response_id="local-image", - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - remote_result = thread.run( - [ - TextInput("Describe the remote image."), - ImageInput(remote_image_url), - ] - ) - local_result = thread.run( - [ - TextInput("Describe the local image."), - LocalImageInput(str(local_image)), - ] - ) - requests = harness.responses.wait_for_requests(2) - - assert { - "final_responses": [ - remote_result.final_response, - local_result.final_response, - ], - "contains_user_prompts": [ - "Describe the remote image." in requests[0].message_input_texts("user"), - "Describe the local image." in requests[1].message_input_texts("user"), - ], - "image_url_shapes": [ - requests[0].message_image_urls("user")[0], - requests[1].message_image_urls("user")[-1].startswith( - "data:image/png;base64," - ), - ], - } == { - "final_responses": ["remote image received", "local image received"], - "contains_user_prompts": [True, True], - "image_url_shapes": [remote_image_url, True], - } - - -def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None: - """A sync turn stream should expose deltas, completed items, and completion.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("stream-1", "msg-stream-1", ["hel", "lo"]) - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - stream = thread.turn(TextInput("stream please")).stream() - events = list(stream) - - assert { - "deltas": [ - event.payload.delta - for event in events - if isinstance(event.payload, AgentMessageDeltaNotification) - ], - "agent_messages": _agent_message_texts(events), - "completed_statuses": [ - event.payload.turn.status - for event in events - if isinstance(event.payload, TurnCompletedNotification) - ], - } == { - "deltas": ["hel", "lo"], - "agent_messages": ["hello"], - "completed_statuses": [TurnStatus.completed], - } - - -def test_turn_run_returns_completed_turn_from_real_app_server(tmp_path) -> None: - """TurnHandle.run should wait for the app-server completion notification.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1") - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - turn = thread.turn(TextInput("complete this turn")) - completed = turn.run() - - assert { - "turn_id": completed.id, - "status": completed.status, - "items": completed.items, - } == { - "turn_id": turn.id, - "status": TurnStatus.completed, - "items": [], - } - - -def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None: - """An async turn stream should expose the same notification sequence.""" - - async def scenario() -> None: - """Stream one async turn against the real pinned app-server.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"]) - ) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - thread = await codex.thread_start() - turn = await thread.turn(TextInput("async stream please")) - events = [event async for event in turn.stream()] - - assert { - "deltas": [ - event.payload.delta - for event in events - if isinstance(event.payload, AgentMessageDeltaNotification) - ], - "agent_messages": _agent_message_texts(events), - "completed_statuses": [ - event.payload.turn.status - for event in events - if isinstance(event.payload, TurnCompletedNotification) - ], - } == { - "deltas": ["as", "ync"], - "agent_messages": ["async"], - "completed_statuses": [TurnStatus.completed], - } - - asyncio.run(scenario()) - - -def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None: - """AppServerClient.stream_text should stream through a real app-server turn.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"]) - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001 - - assert [chunk.delta for chunk in chunks] == ["fir", "st"] - - -def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None: - """Async stream_text should yield without blocking another app-server request.""" - - async def scenario() -> None: - """Leave a stream open while another async request completes.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response( - "low-async-stream", - "msg-low-async-stream", - ["one", "two", "three"], - ), - delay_between_events_s=0.03, - ) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - thread = await codex.thread_start() - stream = codex._client.stream_text( # noqa: SLF001 - thread.id, - "low-level async", - ) - first = await anext(stream) - models_task = asyncio.create_task(codex.models()) - models = await asyncio.wait_for(models_task, timeout=1.0) - remaining = [chunk.delta async for chunk in stream] - - assert { - "first": first.delta, - "remaining": remaining, - "models_payload_has_data": isinstance( - models.model_dump(by_alias=True, mode="json").get("data"), - list, - ), - } == { - "first": "one", - "remaining": ["two", "three"], - "models_payload_has_data": True, - } - - asyncio.run(scenario()) - - -def test_turn_steer_adds_follow_up_input_through_real_app_server(tmp_path) -> None: - """Steering an active turn should create a follow-up Responses request.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("steer-first", "msg-steer-first", ["before steer"]), - delay_between_events_s=0.2, - ) - harness.responses.enqueue_assistant_message( - "after steer", - response_id="steer-second", - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - turn = thread.turn(TextInput("Start a steerable turn.")) - harness.responses.wait_for_requests(1) - steer = turn.steer(TextInput("Use this steering input.")) - events = list(turn.stream()) - requests = harness.responses.wait_for_requests(2) - - assert { - "steered_turn_id": steer.turn_id, - "turn_id": turn.id, - "agent_messages": _agent_message_texts(events), - "last_user_texts": [ - request.message_input_texts("user")[-1] for request in requests - ], - } == { - "steered_turn_id": turn.id, - "turn_id": turn.id, - "agent_messages": ["before steer", "after steer"], - "last_user_texts": [ - "Start a steerable turn.", - "Use this steering input.", - ], - } - - -def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None: - """Interrupting an active turn should complete it and leave the thread usable.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response( - "interrupt-first", - "msg-interrupt-first", - ["still ", "running"], - ), - delay_between_events_s=0.2, - ) - harness.responses.enqueue_assistant_message( - "after interrupt", - response_id="interrupt-follow-up", - ) - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - interrupted_turn = thread.turn(TextInput("Start a long turn.")) - harness.responses.wait_for_requests(1) - interrupt_response = interrupted_turn.interrupt() - completed = interrupted_turn.run() - follow_up = thread.run("Continue after the interrupt.") - - assert { - "interrupt_response": interrupt_response.model_dump( - by_alias=True, - mode="json", - ), - "interrupted_status": completed.status, - "follow_up": follow_up.final_response, - } == { - "interrupt_response": {}, - "interrupted_status": TurnStatus.interrupted, - "follow_up": "after interrupt", - } - - -def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None: - """Two sync streams on one client should consume only their own notifications.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("first-stream", "msg-first", ["one-", "done"]), - delay_between_events_s=0.01, - ) - harness.responses.enqueue_sse( - _streaming_response("second-stream", "msg-second", ["two-", "done"]), - delay_between_events_s=0.01, - ) - - with Codex(config=harness.app_server_config()) as codex: - first_thread = codex.thread_start() - second_thread = codex.thread_start() - first_turn = first_thread.turn(TextInput("first")) - second_turn = second_thread.turn(TextInput("second")) - - first_stream = first_turn.stream() - second_stream = second_turn.stream() - first_first_delta = _next_sync_delta(first_stream) - second_first_delta = _next_sync_delta(second_stream) - first_second_delta = _next_sync_delta(first_stream) - second_second_delta = _next_sync_delta(second_stream) - first_tail = list(first_stream) - second_tail = list(second_stream) - - assert { - "streams": sorted( - [ - ( - first_first_delta, - first_second_delta, - _agent_message_texts(first_tail), - ), - ( - second_first_delta, - second_second_delta, - _agent_message_texts(second_tail), - ), - ] - ), - } == { - "streams": [ - ("one-", "done", ["one-done"]), - ("two-", "done", ["two-done"]), - ], - } - - -def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None: - """Two async streams on one client should consume only their own notifications.""" - - async def scenario() -> None: - """Interleave async stream consumers against one app-server process.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - _streaming_response("async-first", "msg-async-first", ["a1", "-done"]), - delay_between_events_s=0.01, - ) - harness.responses.enqueue_sse( - _streaming_response("async-second", "msg-async-second", ["a2", "-done"]), - delay_between_events_s=0.01, - ) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - first_thread = await codex.thread_start() - second_thread = await codex.thread_start() - first_turn = await first_thread.turn(TextInput("async first")) - second_turn = await second_thread.turn(TextInput("async second")) - - first_stream = first_turn.stream() - second_stream = second_turn.stream() - first_first_delta = await _next_async_delta(first_stream) - second_first_delta = await _next_async_delta(second_stream) - first_second_delta = await _next_async_delta(first_stream) - second_second_delta = await _next_async_delta(second_stream) - first_tail = [event async for event in first_stream] - second_tail = [event async for event in second_stream] - - assert { - "streams": sorted( - [ - ( - first_first_delta, - first_second_delta, - _agent_message_texts(first_tail), - ), - ( - second_first_delta, - second_second_delta, - _agent_message_texts(second_tail), - ), - ] - ), - } == { - "streams": [ - ("a1", "-done", ["a1-done"]), - ("a2", "-done", ["a2-done"]), - ], - } - - asyncio.run(scenario()) - - -def test_approval_modes_preserve_real_app_server_state_without_override( - tmp_path, -) -> None: - """Resume, fork, and next turn should inherit approval settings unless overridden.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("source seeded", response_id="turn-mode-0") - harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1") - harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2") - - with Codex(config=harness.app_server_config()) as codex: - source = codex.thread_start(approval_mode=ApprovalMode.deny_all) - source_result = source.run("seed the source rollout") - resumed = codex.thread_resume(source.id) - forked = codex.thread_fork(source.id) - explicit_fork = codex.thread_fork( - source.id, - approval_mode=ApprovalMode.auto_review, - ) - - turn_thread = codex.thread_start() - first_result = turn_thread.run( - "deny this and later turns", - approval_mode=ApprovalMode.deny_all, - ) - after_turn_override = codex._client.thread_resume( # noqa: SLF001 - turn_thread.id, - ThreadResumeParams(thread_id=turn_thread.id), - ) - second_result = turn_thread.run("inherit previous approval mode") - after_omitted_turn = codex._client.thread_resume( # noqa: SLF001 - turn_thread.id, - ThreadResumeParams(thread_id=turn_thread.id), - ) - - inherited_policies = { - "resumed": _response_approval_policy( - codex._client.thread_resume( # noqa: SLF001 - resumed.id, - ThreadResumeParams(thread_id=resumed.id), - ) - ), - "forked": _response_approval_policy( - codex._client.thread_resume( # noqa: SLF001 - forked.id, - ThreadResumeParams(thread_id=forked.id), - ) - ), - "explicit_fork": _response_approval_policy( - codex._client.thread_resume( # noqa: SLF001 - explicit_fork.id, - ThreadResumeParams(thread_id=explicit_fork.id), - ) - ), - "after_turn_override": _response_approval_policy(after_turn_override), - "after_omitted_turn": _response_approval_policy(after_omitted_turn), - } - - assert { - "policies": inherited_policies, - "final_responses": [ - source_result.final_response, - first_result.final_response, - second_result.final_response, - ], - } == { - "policies": { - "resumed": AskForApprovalValue.never.value, - "forked": AskForApprovalValue.never.value, - "explicit_fork": AskForApprovalValue.on_request.value, - "after_turn_override": AskForApprovalValue.never.value, - "after_omitted_turn": AskForApprovalValue.never.value, - }, - "final_responses": ["source seeded", "turn override", "turn inherited"], - } - - -def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None: - """Omitted run approval mode should not rewrite the thread's stored setting.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("locked down", response_id="approval-1") - harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2") - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start(approval_mode=ApprovalMode.deny_all) - - first_result = thread.run("keep approvals denied") - after_default_run = codex._client.thread_resume( # noqa: SLF001 - thread.id, - ThreadResumeParams(thread_id=thread.id), - ) - second_result = thread.run( - "allow auto review now", - approval_mode=ApprovalMode.auto_review, - ) - after_override_run = codex._client.thread_resume( # noqa: SLF001 - thread.id, - ThreadResumeParams(thread_id=thread.id), - ) - - assert { - "after_default_policy": _response_approval_policy(after_default_run), - "after_override_policy": _response_approval_policy(after_override_run), - "final_responses": [ - first_result.final_response, - second_result.final_response, - ], - } == { - "after_default_policy": AskForApprovalValue.never.value, - "after_override_policy": AskForApprovalValue.on_request.value, - "final_responses": ["locked down", "reviewable"], - } - - -def test_async_thread_run_approval_mode_persists_until_explicit_override( - tmp_path, -) -> None: - """Async omitted run approval mode should leave stored settings alone.""" - - async def scenario() -> None: - """Use the async client to verify persisted app-server approval state.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message( - "async locked down", - response_id="async-approval-1", - ) - harness.responses.enqueue_assistant_message( - "async reviewable", - response_id="async-approval-2", - ) - - async with AsyncCodex(config=harness.app_server_config()) as codex: - thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all) - first_result = await thread.run("keep async approvals denied") - after_default_run = await codex._client.thread_resume( # noqa: SLF001 - thread.id, - ThreadResumeParams(thread_id=thread.id), - ) - second_result = await thread.run( - "allow async auto review now", - approval_mode=ApprovalMode.auto_review, - ) - after_override_run = await codex._client.thread_resume( # noqa: SLF001 - thread.id, - ThreadResumeParams(thread_id=thread.id), - ) - - assert { - "after_default_policy": _response_approval_policy(after_default_run), - "after_override_policy": _response_approval_policy(after_override_run), - "final_responses": [ - first_result.final_response, - second_result.final_response, - ], - } == { - "after_default_policy": AskForApprovalValue.never.value, - "after_override_policy": AskForApprovalValue.on_request.value, - "final_responses": ["async locked down", "async reviewable"], - } - - asyncio.run(scenario()) - - -def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) -> None: - """Thread lifecycle helpers should operate through app-server JSON-RPC.""" - with AppServerHarness(tmp_path) as harness: - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - thread.set_name("sdk integration thread") - named = thread.read(include_turns=True) - forked = codex.thread_fork(thread.id) - - assert { - "name": named.thread.name, - "fork_parent": forked.id != thread.id, - } == { - "name": "sdk integration thread", - "fork_parent": True, - } - - -def test_archive_unarchive_round_trip_uses_real_app_server(tmp_path) -> None: - """Archive helpers should use real app-server lifecycle RPCs.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("materialized", response_id="archive-seed") - - with Codex(config=harness.app_server_config()) as codex: - thread = codex.thread_start() - seeded = thread.run("materialize this thread before archive") - archived = codex.thread_archive(thread.id) - unarchived = codex.thread_unarchive(thread.id) - read = unarchived.read() - - assert { - "seeded_response": seeded.final_response, - "archive_response": archived.model_dump(by_alias=True, mode="json"), - "unarchived_id": unarchived.id, - "read_id": read.thread.id, - } == { - "seeded_response": "materialized", - "archive_response": {}, - "unarchived_id": thread.id, - "read_id": thread.id, - } - - -def test_models_and_compact_use_real_app_server_rpcs(tmp_path) -> None: - """Model listing and compaction should go through real app-server methods.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_assistant_message("history", response_id="compact-history") - harness.responses.enqueue_assistant_message( - "compact summary", - response_id="compact-summary", - ) - - with Codex(config=harness.app_server_config()) as codex: - models = codex.models(include_hidden=True) - thread = codex.thread_start() - run_result = thread.run("create history") - compact_response = thread.compact() - requests = harness.responses.wait_for_requests(2) - - assert { - "models_payload_has_data": isinstance( - models.model_dump(by_alias=True, mode="json").get("data"), - list, - ), - "run_final_response": run_result.final_response, - "compact_response": compact_response.model_dump( - by_alias=True, - mode="json", - ), - "request_kinds": [_request_kind(request.path) for request in requests], - } == { - "models_payload_has_data": True, - "run_final_response": "history", - "compact_response": {}, - "request_kinds": ["responses", "responses"], - } - - -def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None: - """RunResult should use the final-answer item emitted by app-server.""" - with AppServerHarness(tmp_path) as harness: - harness.responses.enqueue_sse( - sse( - [ - ev_response_created("phase-1"), - { - **ev_assistant_message("msg-commentary", "Commentary"), - "item": { - **ev_assistant_message("msg-commentary", "Commentary")["item"], - "phase": MessagePhase.commentary.value, - }, - }, - { - **ev_assistant_message("msg-final", "Final answer"), - "item": { - **ev_assistant_message("msg-final", "Final answer")["item"], - "phase": MessagePhase.final_answer.value, - }, - }, - ev_completed("phase-1"), - ] - ) - ) - - with Codex(config=harness.app_server_config()) as codex: - result = codex.thread_start().run("choose final answer") - - assert { - "final_response": result.final_response, - "items": [ - { - "text": item.root.text, - "phase": None if item.root.phase is None else item.root.phase.value, - } - for item in result.items - if item.root.type == "agentMessage" - ], - } == { - "final_response": "Final answer", - "items": [ - {"text": "Commentary", "phase": MessagePhase.commentary.value}, - {"text": "Final answer", "phase": MessagePhase.final_answer.value}, - ], - } diff --git a/sdk/python/tests/test_pinned_app_server_approval_modes_integration.py b/sdk/python/tests/test_pinned_app_server_approval_modes_integration.py new file mode 100644 index 0000000000..12200f8102 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_approval_modes_integration.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import asyncio + +from app_server_harness import AppServerHarness +from openai_codex import ApprovalMode, AsyncCodex, Codex +from openai_codex.generated.v2_all import AskForApprovalValue, ThreadResumeParams +from pinned_app_server_helpers import response_approval_policy + + +def test_thread_resume_inherits_deny_all_approval_mode(tmp_path) -> None: + """Resuming a thread should preserve its stored approval mode.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("source seeded", response_id="resume-mode") + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + resumed = codex.thread_resume(source.id) + resumed_state = codex._client.thread_resume( # noqa: SLF001 + resumed.id, + ThreadResumeParams(thread_id=resumed.id), + ) + + assert { + "final_response": result.final_response, + "resumed_policy": response_approval_policy(resumed_state), + } == { + "final_response": "source seeded", + "resumed_policy": AskForApprovalValue.never.value, + } + + +def test_thread_fork_inherits_deny_all_approval_mode(tmp_path) -> None: + """Forking without an override should preserve the source approval mode.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("source seeded", response_id="fork-mode") + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + forked = codex.thread_fork(source.id) + forked_state = codex._client.thread_resume( # noqa: SLF001 + forked.id, + ThreadResumeParams(thread_id=forked.id), + ) + + assert { + "final_response": result.final_response, + "forked_is_distinct": forked.id != source.id, + "forked_policy": response_approval_policy(forked_state), + } == { + "final_response": "source seeded", + "forked_is_distinct": True, + "forked_policy": AskForApprovalValue.never.value, + } + + +def test_thread_fork_can_override_approval_mode(tmp_path) -> None: + """Forking with an explicit approval mode should send an override.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "source seeded", + response_id="fork-override-mode", + ) + + with Codex(config=harness.app_server_config()) as codex: + source = codex.thread_start(approval_mode=ApprovalMode.deny_all) + result = source.run("seed the source rollout") + forked = codex.thread_fork( + source.id, + approval_mode=ApprovalMode.auto_review, + ) + forked_state = codex._client.thread_resume( # noqa: SLF001 + forked.id, + ThreadResumeParams(thread_id=forked.id), + ) + + assert { + "final_response": result.final_response, + "forked_policy": response_approval_policy(forked_state), + } == { + "final_response": "source seeded", + "forked_policy": AskForApprovalValue.on_request.value, + } + + +def test_turn_approval_mode_persists_until_next_turn(tmp_path) -> None: + """A turn-level approval override should apply to later omitted-arg turns.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1") + harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + first_result = thread.run( + "deny this and later turns", + approval_mode=ApprovalMode.deny_all, + ) + after_turn_override = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = thread.run("inherit previous approval mode") + after_omitted_turn = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_turn_override": response_approval_policy(after_turn_override), + "after_omitted_turn": response_approval_policy(after_omitted_turn), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_turn_override": AskForApprovalValue.never.value, + "after_omitted_turn": AskForApprovalValue.never.value, + "final_responses": ["turn override", "turn inherited"], + } + + +def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None: + """Omitted run approval mode should not rewrite the thread's stored setting.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("locked down", response_id="approval-1") + harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start(approval_mode=ApprovalMode.deny_all) + + first_result = thread.run("keep approvals denied") + after_default_run = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = thread.run( + "allow auto review now", + approval_mode=ApprovalMode.auto_review, + ) + after_override_run = codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_default_policy": response_approval_policy(after_default_run), + "after_override_policy": response_approval_policy(after_override_run), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_default_policy": AskForApprovalValue.never.value, + "after_override_policy": AskForApprovalValue.on_request.value, + "final_responses": ["locked down", "reviewable"], + } + + +def test_async_thread_run_approval_mode_persists_until_explicit_override( + tmp_path, +) -> None: + """Async omitted run approval mode should leave stored settings alone.""" + + async def scenario() -> None: + """Use the async client to verify persisted app-server approval state.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "async locked down", + response_id="async-approval-1", + ) + harness.responses.enqueue_assistant_message( + "async reviewable", + response_id="async-approval-2", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all) + first_result = await thread.run("keep async approvals denied") + after_default_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = await thread.run( + "allow async auto review now", + approval_mode=ApprovalMode.auto_review, + ) + after_override_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "after_default_policy": response_approval_policy(after_default_run), + "after_override_policy": response_approval_policy(after_override_run), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "after_default_policy": AskForApprovalValue.never.value, + "after_override_policy": AskForApprovalValue.on_request.value, + "final_responses": ["async locked down", "async reviewable"], + } + + asyncio.run(scenario()) diff --git a/sdk/python/tests/test_pinned_app_server_input_integration.py b/sdk/python/tests/test_pinned_app_server_input_integration.py new file mode 100644 index 0000000000..f5d02cab20 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_input_integration.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from app_server_harness import AppServerHarness +from openai_codex import Codex, ImageInput, LocalImageInput, TextInput +from pinned_app_server_helpers import TINY_PNG_BYTES + + +def test_remote_image_input_reaches_responses_api_through_pinned_app_server( + tmp_path, +) -> None: + """Remote image inputs should survive the SDK and app-server boundary.""" + remote_image_url = "https://example.com/codex.png" + + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "remote image received", + response_id="remote-image", + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run( + [ + TextInput("Describe the remote image."), + ImageInput(remote_image_url), + ] + ) + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "contains_user_prompt": "Describe the remote image." + in request.message_input_texts("user"), + "image_urls": request.message_image_urls("user"), + } == { + "final_response": "remote image received", + "contains_user_prompt": True, + "image_urls": [remote_image_url], + } + + +def test_local_image_input_reaches_responses_api_through_pinned_app_server( + tmp_path, +) -> None: + """Local image inputs should become data URLs after crossing the app-server.""" + local_image = tmp_path / "local.png" + local_image.write_bytes(TINY_PNG_BYTES) + + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "local image received", + response_id="local-image", + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run( + [ + TextInput("Describe the local image."), + LocalImageInput(str(local_image)), + ] + ) + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "contains_user_prompt": "Describe the local image." + in request.message_input_texts("user"), + "image_url_is_png_data_url": request.message_image_urls("user")[-1].startswith( + "data:image/png;base64," + ), + } == { + "final_response": "local image received", + "contains_user_prompt": True, + "image_url_is_png_data_url": True, + } diff --git a/sdk/python/tests/test_pinned_app_server_lifecycle_integration.py b/sdk/python/tests/test_pinned_app_server_lifecycle_integration.py new file mode 100644 index 0000000000..8f79e69c55 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_lifecycle_integration.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from app_server_harness import AppServerHarness +from openai_codex import Codex +from pinned_app_server_helpers import request_kind + + +def test_thread_set_name_and_read_use_pinned_app_server(tmp_path) -> None: + """Thread naming should round-trip through app-server JSON-RPC.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + thread.set_name("sdk integration thread") + named = thread.read(include_turns=True) + + assert {"thread_name": named.thread.name} == { + "thread_name": "sdk integration thread", + } + + +def test_thread_fork_returns_distinct_thread_from_pinned_app_server(tmp_path) -> None: + """Thread fork should return a distinct thread from app-server.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + forked = codex.thread_fork(thread.id) + + assert {"forked_is_distinct": forked.id != thread.id} == { + "forked_is_distinct": True, + } + + +def test_archive_unarchive_round_trip_uses_materialized_rollout(tmp_path) -> None: + """Archive helpers should work once the app-server has persisted a rollout.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("materialized", response_id="archive-seed") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + seeded = thread.run("materialize this thread before archive") + archived = codex.thread_archive(thread.id) + unarchived = codex.thread_unarchive(thread.id) + read = unarchived.read() + + assert { + "seeded_response": seeded.final_response, + "archive_response": archived.model_dump(by_alias=True, mode="json"), + "unarchived_id": unarchived.id, + "read_id": read.thread.id, + } == { + "seeded_response": "materialized", + "archive_response": {}, + "unarchived_id": thread.id, + "read_id": thread.id, + } + + +def test_models_use_pinned_app_server_rpc(tmp_path) -> None: + """Model listing should go through the pinned app-server method.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + models = codex.models(include_hidden=True) + + assert { + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + } == {"models_payload_has_data": True} + + +def test_compact_uses_pinned_app_server_rpc_and_mock_responses(tmp_path) -> None: + """Compaction should run through app-server and hit the mock Responses boundary.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("history", response_id="compact-history") + harness.responses.enqueue_assistant_message( + "compact summary", + response_id="compact-summary", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + run_result = thread.run("create history") + compact_response = thread.compact() + requests = harness.responses.wait_for_requests(2) + + assert { + "run_final_response": run_result.final_response, + "compact_response": compact_response.model_dump( + by_alias=True, + mode="json", + ), + "request_kinds": [request_kind(request.path) for request in requests], + } == { + "run_final_response": "history", + "compact_response": {}, + "request_kinds": ["responses", "responses"], + } diff --git a/sdk/python/tests/test_pinned_app_server_run_integration.py b/sdk/python/tests/test_pinned_app_server_run_integration.py new file mode 100644 index 0000000000..27329764b4 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_run_integration.py @@ -0,0 +1,307 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from app_server_harness import ( + AppServerHarness, + ev_assistant_message, + ev_completed, + ev_failed, + ev_response_created, + sse, +) +from openai_codex import AsyncCodex, Codex +from openai_codex.generated.v2_all import MessagePhase +from pinned_app_server_helpers import ( + agent_message_texts_from_items, + assistant_message_with_phase, +) + + +def test_sync_thread_run_uses_pinned_app_server_and_mock_responses( + tmp_path, +) -> None: + """Drive Thread.run through the pinned app-server and inspect the HTTP request.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("Hello from the mock.", response_id="run-1") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + result = thread.run("hello") + + request = harness.responses.single_request() + + body = request.body_json() + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + "has_usage": result.usage is not None, + "request_model": body["model"], + "request_stream": body["stream"], + "request_user_texts": request.message_input_texts("user")[-1:], + } == { + "final_response": "Hello from the mock.", + "agent_messages": ["Hello from the mock."], + "has_usage": True, + "request_model": "mock-model", + "request_stream": True, + "request_user_texts": ["hello"], + } + + +def test_async_thread_run_uses_pinned_app_server_and_mock_responses( + tmp_path, +) -> None: + """Async Thread.run should exercise the same app-server boundary.""" + + async def scenario() -> None: + """Run the async client against a real app-server process.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "Hello async.", + response_id="async-run-1", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + result = await thread.run("async hello") + + request = harness.responses.single_request() + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + "request_user_texts": request.message_input_texts("user")[-1:], + } == { + "final_response": "Hello async.", + "agent_messages": ["Hello async."], + "request_user_texts": ["async hello"], + } + + asyncio.run(scenario()) + + +def test_sync_run_result_uses_last_unknown_phase_message(tmp_path) -> None: + """RunResult should use the last unknown-phase agent message as final text.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-last"), + ev_assistant_message("msg-items-first", "First message"), + ev_assistant_message("msg-items-second", "Second message"), + ev_completed("items-last"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: last unknown phase wins") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "Second message", + "agent_messages": ["First message", "Second message"], + } + + +def test_sync_run_result_preserves_empty_last_message(tmp_path) -> None: + """RunResult should preserve an empty final agent message instead of skipping it.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-empty"), + ev_assistant_message("msg-items-nonempty", "First message"), + ev_assistant_message("msg-items-empty", ""), + ev_completed("items-empty"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: empty last message") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "", + "agent_messages": ["First message", ""], + } + + +def test_sync_run_result_does_not_promote_commentary_only_to_final(tmp_path) -> None: + """RunResult final_response should stay unset when app-server marks only commentary.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("items-commentary"), + assistant_message_with_phase( + "msg-items-commentary", + "Commentary", + MessagePhase.commentary, + ), + ev_completed("items-commentary"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("case: commentary only") + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": None, + "agent_messages": ["Commentary"], + } + + +def test_async_run_result_uses_last_unknown_phase_message(tmp_path) -> None: + """Async RunResult should use the last unknown-phase agent message.""" + + async def scenario() -> None: + """Run one async result-mapping case against a pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("async-items-last"), + ev_assistant_message( + "msg-async-items-first", + "First async message", + ), + ev_assistant_message( + "msg-async-items-second", + "Second async message", + ), + ev_completed("async-items-last"), + ] + ) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + result = await (await codex.thread_start()).run( + "case: async last unknown phase" + ) + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": "Second async message", + "agent_messages": ["First async message", "Second async message"], + } + + asyncio.run(scenario()) + + +def test_async_run_result_does_not_promote_commentary_only_to_final( + tmp_path, +) -> None: + """Async RunResult final_response should stay unset for commentary-only output.""" + + async def scenario() -> None: + """Run one async commentary mapping case against a pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("async-items-commentary"), + assistant_message_with_phase( + "msg-async-items-commentary", + "Async commentary", + MessagePhase.commentary, + ), + ev_completed("async-items-commentary"), + ] + ) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + result = await (await codex.thread_start()).run( + "case: async commentary only" + ) + + assert { + "final_response": result.final_response, + "agent_messages": agent_message_texts_from_items(result.items), + } == { + "final_response": None, + "agent_messages": ["Async commentary"], + } + + asyncio.run(scenario()) + + +def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None: + """Thread.run should surface the failed turn error emitted by app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("failed-run"), + ev_failed("failed-run", "boom from mock model"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + with pytest.raises(RuntimeError, match="boom from mock model"): + thread.run("trigger failure") + + +def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None: + """RunResult should use the final-answer item emitted by app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + sse( + [ + ev_response_created("phase-1"), + { + **ev_assistant_message("msg-commentary", "Commentary"), + "item": { + **ev_assistant_message("msg-commentary", "Commentary")["item"], + "phase": MessagePhase.commentary.value, + }, + }, + { + **ev_assistant_message("msg-final", "Final answer"), + "item": { + **ev_assistant_message("msg-final", "Final answer")["item"], + "phase": MessagePhase.final_answer.value, + }, + }, + ev_completed("phase-1"), + ] + ) + ) + + with Codex(config=harness.app_server_config()) as codex: + result = codex.thread_start().run("choose final answer") + + assert { + "final_response": result.final_response, + "items": [ + { + "text": item.root.text, + "phase": None if item.root.phase is None else item.root.phase.value, + } + for item in result.items + if item.root.type == "agentMessage" + ], + } == { + "final_response": "Final answer", + "items": [ + {"text": "Commentary", "phase": MessagePhase.commentary.value}, + {"text": "Final answer", "phase": MessagePhase.final_answer.value}, + ], + } diff --git a/sdk/python/tests/test_pinned_app_server_stream_integration.py b/sdk/python/tests/test_pinned_app_server_stream_integration.py new file mode 100644 index 0000000000..3820c25d07 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_stream_integration.py @@ -0,0 +1,266 @@ +from __future__ import annotations + +import asyncio + +from app_server_harness import AppServerHarness +from openai_codex import AsyncCodex, Codex, TextInput +from openai_codex.generated.v2_all import ( + AgentMessageDeltaNotification, + TurnCompletedNotification, + TurnStatus, +) +from pinned_app_server_helpers import ( + agent_message_texts, + next_async_delta, + next_sync_delta, + streaming_response, +) + + +def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None: + """A sync turn stream should expose deltas, completed items, and completion.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("stream-1", "msg-stream-1", ["hel", "lo"]) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + stream = thread.turn(TextInput("stream please")).stream() + events = list(stream) + + assert { + "deltas": [ + event.payload.delta + for event in events + if isinstance(event.payload, AgentMessageDeltaNotification) + ], + "agent_messages": agent_message_texts(events), + "completed_statuses": [ + event.payload.turn.status + for event in events + if isinstance(event.payload, TurnCompletedNotification) + ], + } == { + "deltas": ["hel", "lo"], + "agent_messages": ["hello"], + "completed_statuses": [TurnStatus.completed], + } + + +def test_turn_run_returns_completed_turn_from_pinned_app_server(tmp_path) -> None: + """TurnHandle.run should wait for the app-server completion notification.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1") + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + turn = thread.turn(TextInput("complete this turn")) + completed = turn.run() + + assert { + "turn_id": completed.id, + "status": completed.status, + "items": completed.items, + } == { + "turn_id": turn.id, + "status": TurnStatus.completed, + "items": [], + } + + +def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None: + """An async turn stream should expose the same notification sequence.""" + + async def scenario() -> None: + """Stream one async turn against the real pinned app-server.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"]) + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + turn = await thread.turn(TextInput("async stream please")) + events = [event async for event in turn.stream()] + + assert { + "deltas": [ + event.payload.delta + for event in events + if isinstance(event.payload, AgentMessageDeltaNotification) + ], + "agent_messages": agent_message_texts(events), + "completed_statuses": [ + event.payload.turn.status + for event in events + if isinstance(event.payload, TurnCompletedNotification) + ], + } == { + "deltas": ["as", "ync"], + "agent_messages": ["async"], + "completed_statuses": [TurnStatus.completed], + } + + asyncio.run(scenario()) + + +def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None: + """AppServerClient.stream_text should stream through a real app-server turn.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"]) + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001 + + assert [chunk.delta for chunk in chunks] == ["fir", "st"] + + +def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None: + """Async stream_text should yield without blocking another app-server request.""" + + async def scenario() -> None: + """Leave a stream open while another async request completes.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response( + "low-async-stream", + "msg-low-async-stream", + ["one", "two", "three"], + ), + delay_between_events_s=0.03, + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start() + stream = codex._client.stream_text( # noqa: SLF001 + thread.id, + "low-level async", + ) + first = await anext(stream) + models_task = asyncio.create_task(codex.models()) + models = await asyncio.wait_for(models_task, timeout=1.0) + remaining = [chunk.delta async for chunk in stream] + + assert { + "first": first.delta, + "remaining": remaining, + "models_payload_has_data": isinstance( + models.model_dump(by_alias=True, mode="json").get("data"), + list, + ), + } == { + "first": "one", + "remaining": ["two", "three"], + "models_payload_has_data": True, + } + + asyncio.run(scenario()) + + +def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None: + """Two sync streams on one client should consume only their own notifications.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("first-stream", "msg-first", ["one-", "done"]), + delay_between_events_s=0.01, + ) + harness.responses.enqueue_sse( + streaming_response("second-stream", "msg-second", ["two-", "done"]), + delay_between_events_s=0.01, + ) + + with Codex(config=harness.app_server_config()) as codex: + first_thread = codex.thread_start() + second_thread = codex.thread_start() + first_turn = first_thread.turn(TextInput("first")) + second_turn = second_thread.turn(TextInput("second")) + + first_stream = first_turn.stream() + second_stream = second_turn.stream() + first_first_delta = next_sync_delta(first_stream) + second_first_delta = next_sync_delta(second_stream) + first_second_delta = next_sync_delta(first_stream) + second_second_delta = next_sync_delta(second_stream) + first_tail = list(first_stream) + second_tail = list(second_stream) + + assert { + "streams": sorted( + [ + ( + first_first_delta, + first_second_delta, + agent_message_texts(first_tail), + ), + ( + second_first_delta, + second_second_delta, + agent_message_texts(second_tail), + ), + ] + ), + } == { + "streams": [ + ("one-", "done", ["one-done"]), + ("two-", "done", ["two-done"]), + ], + } + + +def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None: + """Two async streams on one client should consume only their own notifications.""" + + async def scenario() -> None: + """Interleave async stream consumers against one app-server process.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("async-first", "msg-async-first", ["a1", "-done"]), + delay_between_events_s=0.01, + ) + harness.responses.enqueue_sse( + streaming_response("async-second", "msg-async-second", ["a2", "-done"]), + delay_between_events_s=0.01, + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + first_thread = await codex.thread_start() + second_thread = await codex.thread_start() + first_turn = await first_thread.turn(TextInput("async first")) + second_turn = await second_thread.turn(TextInput("async second")) + + first_stream = first_turn.stream() + second_stream = second_turn.stream() + first_first_delta = await next_async_delta(first_stream) + second_first_delta = await next_async_delta(second_stream) + first_second_delta = await next_async_delta(first_stream) + second_second_delta = await next_async_delta(second_stream) + first_tail = [event async for event in first_stream] + second_tail = [event async for event in second_stream] + + assert { + "streams": sorted( + [ + ( + first_first_delta, + first_second_delta, + agent_message_texts(first_tail), + ), + ( + second_first_delta, + second_second_delta, + agent_message_texts(second_tail), + ), + ] + ), + } == { + "streams": [ + ("a1", "-done", ["a1-done"]), + ("a2", "-done", ["a2-done"]), + ], + } + + asyncio.run(scenario()) diff --git a/sdk/python/tests/test_pinned_app_server_turn_controls_integration.py b/sdk/python/tests/test_pinned_app_server_turn_controls_integration.py new file mode 100644 index 0000000000..ab3fec8285 --- /dev/null +++ b/sdk/python/tests/test_pinned_app_server_turn_controls_integration.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from app_server_harness import AppServerHarness +from openai_codex import Codex, TextInput +from openai_codex.generated.v2_all import TurnStatus +from pinned_app_server_helpers import agent_message_texts, streaming_response + + +def test_turn_steer_adds_follow_up_input_through_pinned_app_server(tmp_path) -> None: + """Steering an active turn should create a follow-up Responses request.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response("steer-first", "msg-steer-first", ["before steer"]), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after steer", + response_id="steer-second", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + turn = thread.turn(TextInput("Start a steerable turn.")) + harness.responses.wait_for_requests(1) + steer = turn.steer(TextInput("Use this steering input.")) + events = list(turn.stream()) + requests = harness.responses.wait_for_requests(2) + + assert { + "steered_turn_id": steer.turn_id, + "turn_id": turn.id, + "agent_messages": agent_message_texts(events), + "last_user_texts": [ + request.message_input_texts("user")[-1] for request in requests + ], + } == { + "steered_turn_id": turn.id, + "turn_id": turn.id, + "agent_messages": ["before steer", "after steer"], + "last_user_texts": [ + "Start a steerable turn.", + "Use this steering input.", + ], + } + + +def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None: + """Interrupting an active turn should complete it and leave the thread usable.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + streaming_response( + "interrupt-first", + "msg-interrupt-first", + ["still ", "running"], + ), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after interrupt", + response_id="interrupt-follow-up", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + interrupted_turn = thread.turn(TextInput("Start a long turn.")) + harness.responses.wait_for_requests(1) + interrupt_response = interrupted_turn.interrupt() + completed = interrupted_turn.run() + follow_up = thread.run("Continue after the interrupt.") + + assert { + "interrupt_response": interrupt_response.model_dump( + by_alias=True, + mode="json", + ), + "interrupted_status": completed.status, + "follow_up": follow_up.final_response, + } == { + "interrupt_response": {}, + "interrupted_status": TurnStatus.interrupted, + "follow_up": "after interrupt", + }