From ad23385e39c793aecb4e21021dd073f2b9b6808d Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Sun, 10 May 2026 14:26:40 +0300 Subject: [PATCH] Add more SDK app-server integration coverage Add new harness coverage for multimodal inputs, active turn controls, and archive lifecycle behavior through the pinned app-server. Co-authored-by: Codex --- sdk/python/tests/app_server_harness.py | 23 ++ .../tests/test_mock_app_server_integration.py | 236 +++++++++++++++++- 2 files changed, 258 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/app_server_harness.py b/sdk/python/tests/app_server_harness.py index 7bc6469727..bd62e63eda 100644 --- a/sdk/python/tests/app_server_harness.py +++ b/sdk/python/tests/app_server_harness.py @@ -55,6 +55,29 @@ class CapturedResponsesRequest: texts.append(text) return texts + def message_content_items(self, role: str) -> list[Json]: + """Return structured content items for message inputs matching one role.""" + items: list[Json] = [] + for item in self.input(): + if item.get("type") != "message" or item.get("role") != role: + continue + content = item.get("content") + if not isinstance(content, list): + continue + items.extend(part for part in content if isinstance(part, dict)) + return items + + def message_image_urls(self, role: str) -> list[str]: + """Return all input_image URLs for message inputs matching one role.""" + urls: list[str] = [] + for item in self.message_content_items(role): + if item.get("type") != "input_image": + continue + image_url = item.get("image_url") + if isinstance(image_url, str): + urls.append(image_url) + return urls + def header(self, name: str) -> str | None: """Return a captured request header by case-insensitive name.""" return self.headers.get(name.lower()) diff --git a/sdk/python/tests/test_mock_app_server_integration.py b/sdk/python/tests/test_mock_app_server_integration.py index 1a241ec92f..133e3f8627 100644 --- a/sdk/python/tests/test_mock_app_server_integration.py +++ b/sdk/python/tests/test_mock_app_server_integration.py @@ -16,7 +16,14 @@ from app_server_harness import ( ev_response_created, sse, ) -from openai_codex import ApprovalMode, AsyncCodex, Codex, TextInput +from openai_codex import ( + ApprovalMode, + AsyncCodex, + Codex, + ImageInput, + LocalImageInput, + TextInput, +) from openai_codex.generated.v2_all import ( AgentMessageDeltaNotification, AskForApprovalValue, @@ -28,6 +35,79 @@ from openai_codex.generated.v2_all import ( ) from openai_codex.models import Notification +TINY_PNG_BYTES = bytes( + [ + 137, + 80, + 78, + 71, + 13, + 10, + 26, + 10, + 0, + 0, + 0, + 13, + 73, + 72, + 68, + 82, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 1, + 8, + 6, + 0, + 0, + 0, + 31, + 21, + 196, + 137, + 0, + 0, + 0, + 11, + 73, + 68, + 65, + 84, + 120, + 156, + 99, + 96, + 0, + 2, + 0, + 0, + 5, + 0, + 1, + 122, + 94, + 171, + 63, + 0, + 0, + 0, + 0, + 73, + 69, + 78, + 68, + 174, + 66, + 96, + 130, + ] +) + def _response_approval_policy(response: Any) -> str: """Return serialized approvalPolicy from a generated thread response.""" @@ -327,6 +407,64 @@ def test_async_run_result_item_semantics_use_real_app_server(tmp_path) -> None: asyncio.run(scenario()) +def test_multimodal_inputs_reach_responses_api_through_real_app_server( + tmp_path, +) -> None: + """Remote and local image inputs should survive the SDK and app-server boundary.""" + local_image = tmp_path / "local.png" + local_image.write_bytes(TINY_PNG_BYTES) + remote_image_url = "https://example.com/codex.png" + + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "remote image received", + response_id="remote-image", + ) + harness.responses.enqueue_assistant_message( + "local image received", + response_id="local-image", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + remote_result = thread.run( + [ + TextInput("Describe the remote image."), + ImageInput(remote_image_url), + ] + ) + local_result = thread.run( + [ + TextInput("Describe the local image."), + LocalImageInput(str(local_image)), + ] + ) + requests = harness.responses.wait_for_requests(2) + + assert { + "final_responses": [ + remote_result.final_response, + local_result.final_response, + ], + "first_user_texts": [ + request.message_input_texts("user")[0] for request in requests + ], + "image_url_shapes": [ + requests[0].message_image_urls("user")[0], + requests[1].message_image_urls("user")[0].startswith( + "data:image/png;base64," + ), + ], + } == { + "final_responses": ["remote image received", "local image received"], + "first_user_texts": [ + "Describe the remote image.", + "Describe the local image.", + ], + "image_url_shapes": [remote_image_url, True], + } + + def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None: """A sync turn stream should expose deltas, completed items, and completion.""" with AppServerHarness(tmp_path) as harness: @@ -471,6 +609,82 @@ def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> Non asyncio.run(scenario()) +def test_turn_steer_adds_follow_up_input_through_real_app_server(tmp_path) -> None: + """Steering an active turn should create a follow-up Responses request.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + _streaming_response("steer-first", "msg-steer-first", ["before steer"]), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after steer", + response_id="steer-second", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + turn = thread.turn(TextInput("Start a steerable turn.")) + harness.responses.wait_for_requests(1) + steer = turn.steer(TextInput("Use this steering input.")) + events = list(turn.stream()) + requests = harness.responses.wait_for_requests(2) + + assert { + "steered_turn_id": steer.turn_id, + "turn_id": turn.id, + "agent_messages": _agent_message_texts(events), + "last_user_texts": [ + request.message_input_texts("user")[-1] for request in requests + ], + } == { + "steered_turn_id": turn.id, + "turn_id": turn.id, + "agent_messages": ["before steer", "after steer"], + "last_user_texts": [ + "Start a steerable turn.", + "Use this steering input.", + ], + } + + +def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None: + """Interrupting an active turn should complete it and leave the thread usable.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_sse( + _streaming_response( + "interrupt-first", + "msg-interrupt-first", + ["still ", "running"], + ), + delay_between_events_s=0.2, + ) + harness.responses.enqueue_assistant_message( + "after interrupt", + response_id="interrupt-follow-up", + ) + + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + interrupted_turn = thread.turn(TextInput("Start a long turn.")) + harness.responses.wait_for_requests(1) + interrupt_response = interrupted_turn.interrupt() + completed = interrupted_turn.run() + follow_up = thread.run("Continue after the interrupt.") + + assert { + "interrupt_response": interrupt_response.model_dump( + by_alias=True, + mode="json", + ), + "interrupted_status": completed.status, + "follow_up": follow_up.final_response, + } == { + "interrupt_response": {}, + "interrupted_status": TurnStatus.interrupted, + "follow_up": "after interrupt", + } + + def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None: """Two sync streams on one client should consume only their own notifications.""" with AppServerHarness(tmp_path) as harness: @@ -756,6 +970,26 @@ def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) - } +def test_archive_unarchive_round_trip_uses_real_app_server(tmp_path) -> None: + """Archive helpers should use real app-server lifecycle RPCs.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + archived = codex.thread_archive(thread.id) + unarchived = codex.thread_unarchive(thread.id) + read = unarchived.read() + + assert { + "archive_response": archived.model_dump(by_alias=True, mode="json"), + "unarchived_id": unarchived.id, + "read_id": read.thread.id, + } == { + "archive_response": {}, + "unarchived_id": thread.id, + "read_id": thread.id, + } + + def test_models_and_compact_use_real_app_server_rpcs(tmp_path) -> None: """Model listing and compaction should go through real app-server methods.""" with AppServerHarness(tmp_path) as harness: