Split pinned app-server integration tests by behavior

Break the large integration test module into focused run, input, stream, turn-control, approval-mode, and lifecycle test files, with shared helpers for the mock Responses boundary (the shared pattern is sketched below).

Co-authored-by: Codex <noreply@openai.com>
Ahmed Ibrahim
2026-05-10 14:47:06 +03:00
parent daf4694e4a
commit 57edbbf4eb
8 changed files with 1202 additions and 1076 deletions
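Every split file repeats the same shape: start the pinned app-server harness, enqueue a canned reply on the mock Responses boundary, drive the SDK, then assert on both the SDK result and the captured HTTP request. A minimal sketch of that shared pattern — the test name and prompt strings are illustrative, but every call shown appears in the diffs below:

from app_server_harness import AppServerHarness
from openai_codex import Codex


def test_shared_pattern_sketch(tmp_path) -> None:
    """Illustrative only: the shape every split test file repeats."""
    with AppServerHarness(tmp_path) as harness:
        # Queue one canned assistant reply on the mock Responses boundary.
        harness.responses.enqueue_assistant_message("hi", response_id="sketch-1")
        with Codex(config=harness.app_server_config()) as codex:
            result = codex.thread_start().run("hello")
        # Assert on the SDK-visible result and on the captured HTTP request.
        request = harness.responses.single_request()
        assert result.final_response == "hi"
        assert request.message_input_texts("user")[-1] == "hello"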

pinned_app_server_helpers.py

@@ -0,0 +1,168 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Iterable, Iterator
from typing import Any
from app_server_harness import (
ev_assistant_message,
ev_completed,
ev_message_item_added,
ev_output_text_delta,
ev_response_created,
sse,
)
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
)
from openai_codex.models import Notification
# A 68-byte 1x1 transparent RGBA PNG used as a local-image test fixture.
TINY_PNG_BYTES = bytes(
    [
        # PNG signature.
        137, 80, 78, 71, 13, 10, 26, 10,
        # IHDR chunk: 13-byte payload, 1x1 pixels, 8-bit RGBA, then CRC.
        0, 0, 0, 13, 73, 72, 68, 82,
        0, 0, 0, 1, 0, 0, 0, 1,
        8, 6, 0, 0, 0,
        31, 21, 196, 137,
        # IDAT chunk: 11-byte zlib stream encoding one transparent pixel, then CRC.
        0, 0, 0, 11, 73, 68, 65, 84,
        120, 156, 99, 96, 0, 2, 0, 0, 5, 0, 1,
        122, 94, 171, 63,
        # IEND chunk: empty payload, then CRC.
        0, 0, 0, 0, 73, 69, 78, 68,
        174, 66, 96, 130,
    ]
)
def response_approval_policy(response: Any) -> str:
"""Return serialized approvalPolicy from a generated thread response."""
return response.model_dump(by_alias=True, mode="json")["approvalPolicy"]
def agent_message_texts(events: list[Notification]) -> list[str]:
"""Extract completed agent-message text from SDK notifications."""
texts: list[str] = []
for event in events:
if not isinstance(event.payload, ItemCompletedNotification):
continue
item = event.payload.item.root
if item.type == "agentMessage":
texts.append(item.text)
return texts
def agent_message_texts_from_items(items: Iterable[Any]) -> list[str]:
"""Extract agent-message text from completed run result items."""
texts: list[str] = []
for item in items:
root = item.root
if root.type == "agentMessage":
texts.append(root.text)
return texts
def next_sync_delta(stream: Iterator[Notification]) -> str:
"""Advance a sync turn stream until the next agent-message text delta."""
for event in stream:
if isinstance(event.payload, AgentMessageDeltaNotification):
return event.payload.delta
raise AssertionError("stream completed before an agent-message delta")
async def next_async_delta(stream: AsyncIterator[Notification]) -> str:
"""Advance an async turn stream until the next agent-message text delta."""
async for event in stream:
if isinstance(event.payload, AgentMessageDeltaNotification):
return event.payload.delta
raise AssertionError("stream completed before an agent-message delta")
def streaming_response(response_id: str, item_id: str, parts: list[str]) -> str:
"""Build an SSE stream with text deltas and a final assistant message."""
return sse(
[
ev_response_created(response_id),
ev_message_item_added(item_id),
*[ev_output_text_delta(part) for part in parts],
ev_assistant_message(item_id, "".join(parts)),
ev_completed(response_id),
]
)
def assistant_message_with_phase(
item_id: str,
text: str,
phase: MessagePhase,
) -> dict[str, Any]:
"""Build an assistant message event carrying app-server phase metadata."""
event = ev_assistant_message(item_id, text)
event["item"] = {**event["item"], "phase": phase.value}
return event
def request_kind(request_path: str) -> str:
"""Classify captured mock-server request paths for compact assertions."""
if request_path.endswith("/responses/compact"):
return "compact"
if request_path.endswith("/responses"):
return "responses"
return request_path

File diff suppressed because it is too large.


@@ -0,0 +1,207 @@
from __future__ import annotations
import asyncio
from app_server_harness import AppServerHarness
from openai_codex import ApprovalMode, AsyncCodex, Codex
from openai_codex.generated.v2_all import AskForApprovalValue, ThreadResumeParams
from pinned_app_server_helpers import response_approval_policy
def test_thread_resume_inherits_deny_all_approval_mode(tmp_path) -> None:
"""Resuming a thread should preserve its stored approval mode."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("source seeded", response_id="resume-mode")
with Codex(config=harness.app_server_config()) as codex:
source = codex.thread_start(approval_mode=ApprovalMode.deny_all)
result = source.run("seed the source rollout")
resumed = codex.thread_resume(source.id)
resumed_state = codex._client.thread_resume( # noqa: SLF001
resumed.id,
ThreadResumeParams(thread_id=resumed.id),
)
assert {
"final_response": result.final_response,
"resumed_policy": response_approval_policy(resumed_state),
} == {
"final_response": "source seeded",
"resumed_policy": AskForApprovalValue.never.value,
}
def test_thread_fork_inherits_deny_all_approval_mode(tmp_path) -> None:
"""Forking without an override should preserve the source approval mode."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("source seeded", response_id="fork-mode")
with Codex(config=harness.app_server_config()) as codex:
source = codex.thread_start(approval_mode=ApprovalMode.deny_all)
result = source.run("seed the source rollout")
forked = codex.thread_fork(source.id)
forked_state = codex._client.thread_resume( # noqa: SLF001
forked.id,
ThreadResumeParams(thread_id=forked.id),
)
assert {
"final_response": result.final_response,
"forked_is_distinct": forked.id != source.id,
"forked_policy": response_approval_policy(forked_state),
} == {
"final_response": "source seeded",
"forked_is_distinct": True,
"forked_policy": AskForApprovalValue.never.value,
}
def test_thread_fork_can_override_approval_mode(tmp_path) -> None:
"""Forking with an explicit approval mode should send an override."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"source seeded",
response_id="fork-override-mode",
)
with Codex(config=harness.app_server_config()) as codex:
source = codex.thread_start(approval_mode=ApprovalMode.deny_all)
result = source.run("seed the source rollout")
forked = codex.thread_fork(
source.id,
approval_mode=ApprovalMode.auto_review,
)
forked_state = codex._client.thread_resume( # noqa: SLF001
forked.id,
ThreadResumeParams(thread_id=forked.id),
)
assert {
"final_response": result.final_response,
"forked_policy": response_approval_policy(forked_state),
} == {
"final_response": "source seeded",
"forked_policy": AskForApprovalValue.on_request.value,
}
def test_turn_approval_mode_persists_until_next_turn(tmp_path) -> None:
"""A turn-level approval override should apply to later omitted-arg turns."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("turn override", response_id="turn-mode-1")
harness.responses.enqueue_assistant_message("turn inherited", response_id="turn-mode-2")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
first_result = thread.run(
"deny this and later turns",
approval_mode=ApprovalMode.deny_all,
)
after_turn_override = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
second_result = thread.run("inherit previous approval mode")
after_omitted_turn = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
assert {
"after_turn_override": response_approval_policy(after_turn_override),
"after_omitted_turn": response_approval_policy(after_omitted_turn),
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"after_turn_override": AskForApprovalValue.never.value,
"after_omitted_turn": AskForApprovalValue.never.value,
"final_responses": ["turn override", "turn inherited"],
}
def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None:
"""Omitted run approval mode should not rewrite the thread's stored setting."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("locked down", response_id="approval-1")
harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start(approval_mode=ApprovalMode.deny_all)
first_result = thread.run("keep approvals denied")
after_default_run = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
second_result = thread.run(
"allow auto review now",
approval_mode=ApprovalMode.auto_review,
)
after_override_run = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
assert {
"after_default_policy": response_approval_policy(after_default_run),
"after_override_policy": response_approval_policy(after_override_run),
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"after_default_policy": AskForApprovalValue.never.value,
"after_override_policy": AskForApprovalValue.on_request.value,
"final_responses": ["locked down", "reviewable"],
}
def test_async_thread_run_approval_mode_persists_until_explicit_override(
tmp_path,
) -> None:
"""Async omitted run approval mode should leave stored settings alone."""
async def scenario() -> None:
"""Use the async client to verify persisted app-server approval state."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"async locked down",
response_id="async-approval-1",
)
harness.responses.enqueue_assistant_message(
"async reviewable",
response_id="async-approval-2",
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all)
first_result = await thread.run("keep async approvals denied")
after_default_run = await codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
second_result = await thread.run(
"allow async auto review now",
approval_mode=ApprovalMode.auto_review,
)
after_override_run = await codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
assert {
"after_default_policy": response_approval_policy(after_default_run),
"after_override_policy": response_approval_policy(after_override_run),
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"after_default_policy": AskForApprovalValue.never.value,
"after_override_policy": AskForApprovalValue.on_request.value,
"final_responses": ["async locked down", "async reviewable"],
}
asyncio.run(scenario())


@@ -0,0 +1,74 @@
from __future__ import annotations
from app_server_harness import AppServerHarness
from openai_codex import Codex, ImageInput, LocalImageInput, TextInput
from pinned_app_server_helpers import TINY_PNG_BYTES
def test_remote_image_input_reaches_responses_api_through_pinned_app_server(
tmp_path,
) -> None:
"""Remote image inputs should survive the SDK and app-server boundary."""
remote_image_url = "https://example.com/codex.png"
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"remote image received",
response_id="remote-image",
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run(
[
TextInput("Describe the remote image."),
ImageInput(remote_image_url),
]
)
request = harness.responses.single_request()
assert {
"final_response": result.final_response,
"contains_user_prompt": "Describe the remote image."
in request.message_input_texts("user"),
"image_urls": request.message_image_urls("user"),
} == {
"final_response": "remote image received",
"contains_user_prompt": True,
"image_urls": [remote_image_url],
}
def test_local_image_input_reaches_responses_api_through_pinned_app_server(
tmp_path,
) -> None:
"""Local image inputs should become data URLs after crossing the app-server."""
local_image = tmp_path / "local.png"
local_image.write_bytes(TINY_PNG_BYTES)
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"local image received",
response_id="local-image",
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run(
[
TextInput("Describe the local image."),
LocalImageInput(str(local_image)),
]
)
request = harness.responses.single_request()
assert {
"final_response": result.final_response,
"contains_user_prompt": "Describe the local image."
in request.message_input_texts("user"),
"image_url_is_png_data_url": request.message_image_urls("user")[-1].startswith(
"data:image/png;base64,"
),
} == {
"final_response": "local image received",
"contains_user_prompt": True,
"image_url_is_png_data_url": True,
}


@@ -0,0 +1,98 @@
from __future__ import annotations
from app_server_harness import AppServerHarness
from openai_codex import Codex
from pinned_app_server_helpers import request_kind
def test_thread_set_name_and_read_use_pinned_app_server(tmp_path) -> None:
"""Thread naming should round-trip through app-server JSON-RPC."""
with AppServerHarness(tmp_path) as harness:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
thread.set_name("sdk integration thread")
named = thread.read(include_turns=True)
assert {"thread_name": named.thread.name} == {
"thread_name": "sdk integration thread",
}
def test_thread_fork_returns_distinct_thread_from_pinned_app_server(tmp_path) -> None:
"""Thread fork should return a distinct thread from app-server."""
with AppServerHarness(tmp_path) as harness:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
forked = codex.thread_fork(thread.id)
assert {"forked_is_distinct": forked.id != thread.id} == {
"forked_is_distinct": True,
}
def test_archive_unarchive_round_trip_uses_materialized_rollout(tmp_path) -> None:
"""Archive helpers should work once the app-server has persisted a rollout."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("materialized", response_id="archive-seed")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
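            # Seed one turn so the app-server materializes a rollout before archiving.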
seeded = thread.run("materialize this thread before archive")
archived = codex.thread_archive(thread.id)
unarchived = codex.thread_unarchive(thread.id)
read = unarchived.read()
assert {
"seeded_response": seeded.final_response,
"archive_response": archived.model_dump(by_alias=True, mode="json"),
"unarchived_id": unarchived.id,
"read_id": read.thread.id,
} == {
"seeded_response": "materialized",
"archive_response": {},
"unarchived_id": thread.id,
"read_id": thread.id,
}
def test_models_use_pinned_app_server_rpc(tmp_path) -> None:
"""Model listing should go through the pinned app-server method."""
with AppServerHarness(tmp_path) as harness:
with Codex(config=harness.app_server_config()) as codex:
models = codex.models(include_hidden=True)
assert {
"models_payload_has_data": isinstance(
models.model_dump(by_alias=True, mode="json").get("data"),
list,
),
} == {"models_payload_has_data": True}
def test_compact_uses_pinned_app_server_rpc_and_mock_responses(tmp_path) -> None:
"""Compaction should run through app-server and hit the mock Responses boundary."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("history", response_id="compact-history")
harness.responses.enqueue_assistant_message(
"compact summary",
response_id="compact-summary",
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
run_result = thread.run("create history")
compact_response = thread.compact()
requests = harness.responses.wait_for_requests(2)
assert {
"run_final_response": run_result.final_response,
"compact_response": compact_response.model_dump(
by_alias=True,
mode="json",
),
"request_kinds": [request_kind(request.path) for request in requests],
} == {
"run_final_response": "history",
"compact_response": {},
"request_kinds": ["responses", "responses"],
}


@@ -0,0 +1,307 @@
from __future__ import annotations
import asyncio
import pytest
from app_server_harness import (
AppServerHarness,
ev_assistant_message,
ev_completed,
ev_failed,
ev_response_created,
sse,
)
from openai_codex import AsyncCodex, Codex
from openai_codex.generated.v2_all import MessagePhase
from pinned_app_server_helpers import (
agent_message_texts_from_items,
assistant_message_with_phase,
)
def test_sync_thread_run_uses_pinned_app_server_and_mock_responses(
tmp_path,
) -> None:
"""Drive Thread.run through the pinned app-server and inspect the HTTP request."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("Hello from the mock.", response_id="run-1")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
result = thread.run("hello")
request = harness.responses.single_request()
body = request.body_json()
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
"has_usage": result.usage is not None,
"request_model": body["model"],
"request_stream": body["stream"],
"request_user_texts": request.message_input_texts("user")[-1:],
} == {
"final_response": "Hello from the mock.",
"agent_messages": ["Hello from the mock."],
"has_usage": True,
"request_model": "mock-model",
"request_stream": True,
"request_user_texts": ["hello"],
}
def test_async_thread_run_uses_pinned_app_server_and_mock_responses(
tmp_path,
) -> None:
"""Async Thread.run should exercise the same app-server boundary."""
async def scenario() -> None:
"""Run the async client against a real app-server process."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"Hello async.",
response_id="async-run-1",
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
result = await thread.run("async hello")
request = harness.responses.single_request()
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
"request_user_texts": request.message_input_texts("user")[-1:],
} == {
"final_response": "Hello async.",
"agent_messages": ["Hello async."],
"request_user_texts": ["async hello"],
}
asyncio.run(scenario())
def test_sync_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""RunResult should use the last unknown-phase agent message as final text."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("items-last"),
ev_assistant_message("msg-items-first", "First message"),
ev_assistant_message("msg-items-second", "Second message"),
ev_completed("items-last"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run("case: last unknown phase wins")
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
} == {
"final_response": "Second message",
"agent_messages": ["First message", "Second message"],
}
def test_sync_run_result_preserves_empty_last_message(tmp_path) -> None:
"""RunResult should preserve an empty final agent message instead of skipping it."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("items-empty"),
ev_assistant_message("msg-items-nonempty", "First message"),
ev_assistant_message("msg-items-empty", ""),
ev_completed("items-empty"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run("case: empty last message")
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
} == {
"final_response": "",
"agent_messages": ["First message", ""],
}
def test_sync_run_result_does_not_promote_commentary_only_to_final(tmp_path) -> None:
"""RunResult final_response should stay unset when app-server marks only commentary."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("items-commentary"),
assistant_message_with_phase(
"msg-items-commentary",
"Commentary",
MessagePhase.commentary,
),
ev_completed("items-commentary"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run("case: commentary only")
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
} == {
"final_response": None,
"agent_messages": ["Commentary"],
}
def test_async_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""Async RunResult should use the last unknown-phase agent message."""
async def scenario() -> None:
"""Run one async result-mapping case against a pinned app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("async-items-last"),
ev_assistant_message(
"msg-async-items-first",
"First async message",
),
ev_assistant_message(
"msg-async-items-second",
"Second async message",
),
ev_completed("async-items-last"),
]
)
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
result = await (await codex.thread_start()).run(
"case: async last unknown phase"
)
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
} == {
"final_response": "Second async message",
"agent_messages": ["First async message", "Second async message"],
}
asyncio.run(scenario())
def test_async_run_result_does_not_promote_commentary_only_to_final(
tmp_path,
) -> None:
"""Async RunResult final_response should stay unset for commentary-only output."""
async def scenario() -> None:
"""Run one async commentary mapping case against a pinned app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("async-items-commentary"),
assistant_message_with_phase(
"msg-async-items-commentary",
"Async commentary",
MessagePhase.commentary,
),
ev_completed("async-items-commentary"),
]
)
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
result = await (await codex.thread_start()).run(
"case: async commentary only"
)
assert {
"final_response": result.final_response,
"agent_messages": agent_message_texts_from_items(result.items),
} == {
"final_response": None,
"agent_messages": ["Async commentary"],
}
asyncio.run(scenario())
def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) -> None:
"""Thread.run should surface the failed turn error emitted by app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("failed-run"),
ev_failed("failed-run", "boom from mock model"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
with pytest.raises(RuntimeError, match="boom from mock model"):
thread.run("trigger failure")
def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None:
"""RunResult should use the final-answer item emitted by app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("phase-1"),
                    assistant_message_with_phase(
                        "msg-commentary",
                        "Commentary",
                        MessagePhase.commentary,
                    ),
                    assistant_message_with_phase(
                        "msg-final",
                        "Final answer",
                        MessagePhase.final_answer,
                    ),
ev_completed("phase-1"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run("choose final answer")
assert {
"final_response": result.final_response,
"items": [
{
"text": item.root.text,
"phase": None if item.root.phase is None else item.root.phase.value,
}
for item in result.items
if item.root.type == "agentMessage"
],
} == {
"final_response": "Final answer",
"items": [
{"text": "Commentary", "phase": MessagePhase.commentary.value},
{"text": "Final answer", "phase": MessagePhase.final_answer.value},
],
}


@@ -0,0 +1,266 @@
from __future__ import annotations
import asyncio
from app_server_harness import AppServerHarness
from openai_codex import AsyncCodex, Codex, TextInput
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
TurnStatus,
)
from pinned_app_server_helpers import (
agent_message_texts,
next_async_delta,
next_sync_delta,
streaming_response,
)
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""A sync turn stream should expose deltas, completed items, and completion."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response("stream-1", "msg-stream-1", ["hel", "lo"])
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
stream = thread.turn(TextInput("stream please")).stream()
events = list(stream)
assert {
"deltas": [
event.payload.delta
for event in events
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": agent_message_texts(events),
"completed_statuses": [
event.payload.turn.status
for event in events
if isinstance(event.payload, TurnCompletedNotification)
],
} == {
"deltas": ["hel", "lo"],
"agent_messages": ["hello"],
"completed_statuses": [TurnStatus.completed],
}
def test_turn_run_returns_completed_turn_from_pinned_app_server(tmp_path) -> None:
"""TurnHandle.run should wait for the app-server completion notification."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
turn = thread.turn(TextInput("complete this turn"))
completed = turn.run()
assert {
"turn_id": completed.id,
"status": completed.status,
"items": completed.items,
} == {
"turn_id": turn.id,
"status": TurnStatus.completed,
"items": [],
}
def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""An async turn stream should expose the same notification sequence."""
async def scenario() -> None:
"""Stream one async turn against the real pinned app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"])
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
turn = await thread.turn(TextInput("async stream please"))
events = [event async for event in turn.stream()]
assert {
"deltas": [
event.payload.delta
for event in events
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": agent_message_texts(events),
"completed_statuses": [
event.payload.turn.status
for event in events
if isinstance(event.payload, TurnCompletedNotification)
],
} == {
"deltas": ["as", "ync"],
"agent_messages": ["async"],
"completed_statuses": [TurnStatus.completed],
}
asyncio.run(scenario())
def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None:
"""AppServerClient.stream_text should stream through a real app-server turn."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"])
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
chunks = list(codex._client.stream_text(thread.id, "low-level sync")) # noqa: SLF001
assert [chunk.delta for chunk in chunks] == ["fir", "st"]
def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None:
"""Async stream_text should yield without blocking another app-server request."""
async def scenario() -> None:
"""Leave a stream open while another async request completes."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response(
"low-async-stream",
"msg-low-async-stream",
["one", "two", "three"],
),
delay_between_events_s=0.03,
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
stream = codex._client.stream_text( # noqa: SLF001
thread.id,
"low-level async",
)
first = await anext(stream)
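                # The turn stream is still open here; a parallel RPC must complete without waiting for it.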
models_task = asyncio.create_task(codex.models())
models = await asyncio.wait_for(models_task, timeout=1.0)
remaining = [chunk.delta async for chunk in stream]
assert {
"first": first.delta,
"remaining": remaining,
"models_payload_has_data": isinstance(
models.model_dump(by_alias=True, mode="json").get("data"),
list,
),
} == {
"first": "one",
"remaining": ["two", "three"],
"models_payload_has_data": True,
}
asyncio.run(scenario())
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two sync streams on one client should consume only their own notifications."""
with AppServerHarness(tmp_path) as harness:
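        # Delay both SSE responses so the two turns stay in flight at the same time.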
harness.responses.enqueue_sse(
streaming_response("first-stream", "msg-first", ["one-", "done"]),
delay_between_events_s=0.01,
)
harness.responses.enqueue_sse(
streaming_response("second-stream", "msg-second", ["two-", "done"]),
delay_between_events_s=0.01,
)
with Codex(config=harness.app_server_config()) as codex:
first_thread = codex.thread_start()
second_thread = codex.thread_start()
first_turn = first_thread.turn(TextInput("first"))
second_turn = second_thread.turn(TextInput("second"))
first_stream = first_turn.stream()
second_stream = second_turn.stream()
first_first_delta = next_sync_delta(first_stream)
second_first_delta = next_sync_delta(second_stream)
first_second_delta = next_sync_delta(first_stream)
second_second_delta = next_sync_delta(second_stream)
first_tail = list(first_stream)
second_tail = list(second_stream)
assert {
"streams": sorted(
[
(
first_first_delta,
first_second_delta,
agent_message_texts(first_tail),
),
(
second_first_delta,
second_second_delta,
agent_message_texts(second_tail),
),
]
),
} == {
"streams": [
("one-", "done", ["one-done"]),
("two-", "done", ["two-done"]),
],
}
def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two async streams on one client should consume only their own notifications."""
async def scenario() -> None:
"""Interleave async stream consumers against one app-server process."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response("async-first", "msg-async-first", ["a1", "-done"]),
delay_between_events_s=0.01,
)
harness.responses.enqueue_sse(
streaming_response("async-second", "msg-async-second", ["a2", "-done"]),
delay_between_events_s=0.01,
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
first_thread = await codex.thread_start()
second_thread = await codex.thread_start()
first_turn = await first_thread.turn(TextInput("async first"))
second_turn = await second_thread.turn(TextInput("async second"))
first_stream = first_turn.stream()
second_stream = second_turn.stream()
first_first_delta = await next_async_delta(first_stream)
second_first_delta = await next_async_delta(second_stream)
first_second_delta = await next_async_delta(first_stream)
second_second_delta = await next_async_delta(second_stream)
first_tail = [event async for event in first_stream]
second_tail = [event async for event in second_stream]
assert {
"streams": sorted(
[
(
first_first_delta,
first_second_delta,
agent_message_texts(first_tail),
),
(
second_first_delta,
second_second_delta,
agent_message_texts(second_tail),
),
]
),
} == {
"streams": [
("a1", "-done", ["a1-done"]),
("a2", "-done", ["a2-done"]),
],
}
asyncio.run(scenario())


@@ -0,0 +1,82 @@
from __future__ import annotations
from app_server_harness import AppServerHarness
from openai_codex import Codex, TextInput
from openai_codex.generated.v2_all import TurnStatus
from pinned_app_server_helpers import agent_message_texts, streaming_response
def test_turn_steer_adds_follow_up_input_through_pinned_app_server(tmp_path) -> None:
"""Steering an active turn should create a follow-up Responses request."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response("steer-first", "msg-steer-first", ["before steer"]),
delay_between_events_s=0.2,
)
harness.responses.enqueue_assistant_message(
"after steer",
response_id="steer-second",
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
turn = thread.turn(TextInput("Start a steerable turn."))
harness.responses.wait_for_requests(1)
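            # The first Responses request is now in flight, so the steer lands mid-turn.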
steer = turn.steer(TextInput("Use this steering input."))
events = list(turn.stream())
requests = harness.responses.wait_for_requests(2)
assert {
"steered_turn_id": steer.turn_id,
"turn_id": turn.id,
"agent_messages": agent_message_texts(events),
"last_user_texts": [
request.message_input_texts("user")[-1] for request in requests
],
} == {
"steered_turn_id": turn.id,
"turn_id": turn.id,
"agent_messages": ["before steer", "after steer"],
"last_user_texts": [
"Start a steerable turn.",
"Use this steering input.",
],
}
def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None:
"""Interrupting an active turn should complete it and leave the thread usable."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
streaming_response(
"interrupt-first",
"msg-interrupt-first",
["still ", "running"],
),
delay_between_events_s=0.2,
)
harness.responses.enqueue_assistant_message(
"after interrupt",
response_id="interrupt-follow-up",
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
interrupted_turn = thread.turn(TextInput("Start a long turn."))
harness.responses.wait_for_requests(1)
interrupt_response = interrupted_turn.interrupt()
completed = interrupted_turn.run()
follow_up = thread.run("Continue after the interrupt.")
assert {
"interrupt_response": interrupt_response.model_dump(
by_alias=True,
mode="json",
),
"interrupted_status": completed.status,
"follow_up": follow_up.final_response,
} == {
"interrupt_response": {},
"interrupted_status": TurnStatus.interrupted,
"follow_up": "after interrupt",
}