Add more SDK app-server integration coverage

Add new harness coverage for multimodal inputs, active turn controls, and archive lifecycle behavior through the pinned app-server.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-05-10 14:26:40 +03:00
parent feffa481bd
commit ad23385e39
2 changed files with 258 additions and 1 deletions

View File

@@ -55,6 +55,29 @@ class CapturedResponsesRequest:
texts.append(text)
return texts
def message_content_items(self, role: str) -> list[Json]:
    """Collect dict-shaped content parts from every message input of one role.

    Inputs are skipped when their ``type`` is not ``"message"``, their
    ``role`` differs from *role*, or their ``content`` is not a list.
    Parts that are not dicts are dropped.
    """
    collected: list[Json] = []
    for entry in self.input():
        if entry.get("type") == "message" and entry.get("role") == role:
            parts = entry.get("content")
            if isinstance(parts, list):
                collected += [part for part in parts if isinstance(part, dict)]
    return collected
def message_image_urls(self, role: str) -> list[str]:
    """List the string ``image_url`` of each ``input_image`` part for one role.

    Parts of any other ``type``, and image parts whose ``image_url`` is
    missing or not a string, are left out.
    """
    return [
        part["image_url"]
        for part in self.message_content_items(role)
        if part.get("type") == "input_image"
        and isinstance(part.get("image_url"), str)
    ]
def header(self, name: str) -> str | None:
"""Return a captured request header by case-insensitive name."""
return self.headers.get(name.lower())

View File

@@ -16,7 +16,14 @@ from app_server_harness import (
ev_response_created,
sse,
)
from openai_codex import ApprovalMode, AsyncCodex, Codex, TextInput
from openai_codex import (
ApprovalMode,
AsyncCodex,
Codex,
ImageInput,
LocalImageInput,
TextInput,
)
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
AskForApprovalValue,
@@ -28,6 +35,79 @@ from openai_codex.generated.v2_all import (
)
from openai_codex.models import Notification
# 68-byte PNG fixture written to disk by the local-image test.
# Spelled out chunk by chunk: signature, IHDR (1x1, bit depth 8, colour type 6),
# a single IDAT, and IEND.
TINY_PNG_BYTES = bytes.fromhex(
    "89504e470d0a1a0a"  # PNG signature
    "0000000d" "49484452"  # IHDR: length 13, chunk type
    "00000001" "00000001"  # width 1, height 1
    "0806000000"  # bit depth 8, colour type 6, compression/filter/interlace 0
    "1f15c489"  # IHDR CRC
    "0000000b" "49444154"  # IDAT: length 11, chunk type
    "789c636000020000050001"  # IDAT payload
    "7a5eab3f"  # IDAT CRC
    "00000000" "49454e44"  # IEND: length 0, chunk type
    "ae426082"  # IEND CRC
)
def _response_approval_policy(response: Any) -> str:
"""Return serialized approvalPolicy from a generated thread response."""
@@ -327,6 +407,64 @@ def test_async_run_result_item_semantics_use_real_app_server(tmp_path) -> None:
asyncio.run(scenario())
def test_multimodal_inputs_reach_responses_api_through_real_app_server(
    tmp_path,
) -> None:
    """A remote image URL and an on-disk image should both reach the Responses API.

    Runs two turns on one thread (remote image, then local image) and checks
    the final responses, the user text, and the captured ``image_url`` values.
    """
    remote_image_url = "https://example.com/codex.png"
    local_image = tmp_path / "local.png"
    local_image.write_bytes(TINY_PNG_BYTES)

    with AppServerHarness(tmp_path) as harness:
        # Queue one assistant reply per turn.
        harness.responses.enqueue_assistant_message(
            "remote image received",
            response_id="remote-image",
        )
        harness.responses.enqueue_assistant_message(
            "local image received",
            response_id="local-image",
        )

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()
            remote_result = thread.run(
                [
                    TextInput("Describe the remote image."),
                    ImageInput(remote_image_url),
                ]
            )
            local_result = thread.run(
                [
                    TextInput("Describe the local image."),
                    LocalImageInput(str(local_image)),
                ]
            )
            requests = harness.responses.wait_for_requests(2)

            # NOTE(review): index 0 is read from each request here, while the
            # steer test reads index -1; confirm which end holds the newest
            # user input if earlier turns are replayed in later requests.
            final_responses = [
                remote_result.final_response,
                local_result.final_response,
            ]
            first_user_texts = [
                captured.message_input_texts("user")[0] for captured in requests
            ]
            remote_seen_url = requests[0].message_image_urls("user")[0]
            local_seen_url = requests[1].message_image_urls("user")[0]
            # The local image is only checked for its prefix, not its payload.
            local_is_png_data_url = local_seen_url.startswith(
                "data:image/png;base64,"
            )

            observed = {
                "final_responses": final_responses,
                "first_user_texts": first_user_texts,
                "image_url_shapes": [remote_seen_url, local_is_png_data_url],
            }
            expected = {
                "final_responses": ["remote image received", "local image received"],
                "first_user_texts": [
                    "Describe the remote image.",
                    "Describe the local image.",
                ],
                "image_url_shapes": [remote_image_url, True],
            }
            assert observed == expected
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""A sync turn stream should expose deltas, completed items, and completion."""
with AppServerHarness(tmp_path) as harness:
@@ -471,6 +609,82 @@ def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> Non
asyncio.run(scenario())
def test_turn_steer_adds_follow_up_input_through_real_app_server(tmp_path) -> None:
    """Steering a running turn should produce a second Responses request.

    Checks that the steer response names the same turn, that both agent
    messages are streamed, and that each captured request ends with the
    expected user text.
    """
    with AppServerHarness(tmp_path) as harness:
        # The first reply is streamed with a 0.2 s gap between events,
        # presumably so the turn is still active when steer() is called.
        first_reply = _streaming_response(
            "steer-first",
            "msg-steer-first",
            ["before steer"],
        )
        harness.responses.enqueue_sse(first_reply, delay_between_events_s=0.2)
        harness.responses.enqueue_assistant_message(
            "after steer",
            response_id="steer-second",
        )

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()
            turn = thread.turn(TextInput("Start a steerable turn."))
            # Wait for the first model request before steering.
            harness.responses.wait_for_requests(1)
            steer = turn.steer(TextInput("Use this steering input."))
            events = list(turn.stream())
            requests = harness.responses.wait_for_requests(2)

            last_user_texts = [
                captured.message_input_texts("user")[-1] for captured in requests
            ]
            observed = {
                "steered_turn_id": steer.turn_id,
                "turn_id": turn.id,
                "agent_messages": _agent_message_texts(events),
                "last_user_texts": last_user_texts,
            }
            expected = {
                "steered_turn_id": turn.id,
                "turn_id": turn.id,
                "agent_messages": ["before steer", "after steer"],
                "last_user_texts": [
                    "Start a steerable turn.",
                    "Use this steering input.",
                ],
            }
            assert observed == expected
def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None:
    """An interrupted turn should finish as interrupted and not block the thread.

    Checks the (empty) interrupt response payload, the interrupted turn's
    final status, and that a follow-up run on the same thread gets its reply.
    """
    with AppServerHarness(tmp_path) as harness:
        # The first reply is streamed with a 0.2 s gap between events,
        # presumably so the turn is still active when interrupt() is called.
        slow_reply = _streaming_response(
            "interrupt-first",
            "msg-interrupt-first",
            ["still ", "running"],
        )
        harness.responses.enqueue_sse(slow_reply, delay_between_events_s=0.2)
        harness.responses.enqueue_assistant_message(
            "after interrupt",
            response_id="interrupt-follow-up",
        )

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()
            interrupted_turn = thread.turn(TextInput("Start a long turn."))
            # Wait for the first model request before interrupting.
            harness.responses.wait_for_requests(1)
            interrupt_response = interrupted_turn.interrupt()
            completed = interrupted_turn.run()
            follow_up = thread.run("Continue after the interrupt.")

            interrupt_payload = interrupt_response.model_dump(
                by_alias=True,
                mode="json",
            )
            observed = {
                "interrupt_response": interrupt_payload,
                "interrupted_status": completed.status,
                "follow_up": follow_up.final_response,
            }
            expected = {
                "interrupt_response": {},
                "interrupted_status": TurnStatus.interrupted,
                "follow_up": "after interrupt",
            }
            assert observed == expected
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two sync streams on one client should consume only their own notifications."""
with AppServerHarness(tmp_path) as harness:
@@ -756,6 +970,26 @@ def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) -
}
def test_archive_unarchive_round_trip_uses_real_app_server(tmp_path) -> None:
    """Archiving then unarchiving a thread should leave it readable by its id.

    Checks the (empty) archive response payload and that both the unarchived
    handle and a subsequent read report the original thread id.
    """
    with AppServerHarness(tmp_path) as harness:
        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()
            archived = codex.thread_archive(thread.id)
            unarchived = codex.thread_unarchive(thread.id)
            read = unarchived.read()

            archive_payload = archived.model_dump(by_alias=True, mode="json")
            observed = {
                "archive_response": archive_payload,
                "unarchived_id": unarchived.id,
                "read_id": read.thread.id,
            }
            expected = {
                "archive_response": {},
                "unarchived_id": thread.id,
                "read_id": thread.id,
            }
            assert observed == expected
def test_models_and_compact_use_real_app_server_rpcs(tmp_path) -> None:
"""Model listing and compaction should go through real app-server methods."""
with AppServerHarness(tmp_path) as harness: