Add Python SDK mock app-server integration tests

Build deterministic Python SDK integration coverage around the pinned app-server runtime and a local mock Responses server. Port behavioral coverage off direct SDK monkeypatches wherever exercising the real app-server boundary is the more useful test seam.

Co-authored-by: Codex <noreply@openai.com>
Ahmed Ibrahim
2026-05-10 13:32:31 +03:00
parent f3e16de572
commit cad4bbdd64
3 changed files with 899 additions and 342 deletions
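
The pattern these tests establish: queue a canned SSE model response on the mock server, drive the SDK through a real pinned app-server process, then assert on the HTTP request the app-server actually sent. A minimal sketch using the AppServerHarness and Codex APIs defined in the files below (the test name is illustrative; tmp_path is pytest's standard fixture):

from pathlib import Path

from app_server_harness import AppServerHarness
from openai_codex import Codex


def test_round_trip_sketch(tmp_path: Path) -> None:
    with AppServerHarness(tmp_path) as harness:
        # Canned SSE reply for the next /v1/responses request.
        harness.responses.enqueue_assistant_message("hi there")
        with Codex(config=harness.app_server_config()) as codex:
            result = codex.thread_start().run("hello")
        # Assert on what app-server put on the wire, not on SDK internals.
        request = harness.responses.single_request()
        assert result.final_response == "hi there"
        assert request.message_input_texts("user") == ["hello"]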


@@ -0,0 +1,399 @@
from __future__ import annotations
import json
import queue
import shutil
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from openai_codex import AppServerConfig
Json = dict[str, Any]
@dataclass(frozen=True)
class CapturedResponsesRequest:
"""Recorded request sent by app-server to the mock Responses API."""
method: str
path: str
headers: dict[str, str]
body: bytes
def body_json(self) -> Json:
"""Decode the request body as JSON."""
return json.loads(self.body.decode("utf-8"))
def input(self) -> list[Json]:
"""Return the Responses API input array from the request."""
value = self.body_json().get("input")
if not isinstance(value, list):
raise AssertionError(f"expected input list, got {value!r}")
return value
def message_input_texts(self, role: str) -> list[str]:
"""Return all input_text strings for message inputs matching one role."""
texts: list[str] = []
for item in self.input():
if item.get("type") != "message" or item.get("role") != role:
continue
content = item.get("content")
if isinstance(content, str):
texts.append(content)
continue
if not isinstance(content, list):
continue
for span in content:
if isinstance(span, dict) and span.get("type") == "input_text":
text = span.get("text")
if isinstance(text, str):
texts.append(text)
return texts
def header(self, name: str) -> str | None:
"""Return a captured request header by case-insensitive name."""
return self.headers.get(name.lower())
@dataclass(frozen=True)
class MockSseResponse:
"""One queued SSE response served by the mock Responses API."""
body: str
delay_between_events_s: float = 0.0
def chunks(self) -> list[bytes]:
"""Split an SSE body into event chunks while preserving framing."""
chunks: list[bytes] = []
for part in self.body.split("\n\n"):
if not part:
continue
chunks.append(f"{part}\n\n".encode("utf-8"))
return chunks
class MockResponsesServer:
"""Local HTTP server that records `/v1/responses` requests and returns SSE."""
def __init__(self) -> None:
self._responses: queue.Queue[MockSseResponse] = queue.Queue()
self._requests: list[CapturedResponsesRequest] = []
self._requests_lock = threading.Lock()
self._server = _ResponsesHttpServer(("127.0.0.1", 0), _ResponsesHandler, self)
self._thread = threading.Thread(
target=self._server.serve_forever,
name="mock-responses-api",
daemon=True,
)
def __enter__(self) -> MockResponsesServer:
self._thread.start()
return self
def __exit__(self, _exc_type: object, _exc: object, _tb: object) -> None:
self.close()
@property
def url(self) -> str:
"""Return the base URL for app-server config."""
host, port = self._server.server_address
return f"http://{host}:{port}"
def close(self) -> None:
"""Stop the background HTTP server thread."""
self._server.shutdown()
self._server.server_close()
self._thread.join(timeout=2)
def enqueue_sse(
self,
body: str,
*,
delay_between_events_s: float = 0.0,
) -> None:
"""Queue one SSE body for the next `/v1/responses` request."""
self._responses.put(
MockSseResponse(
body=body,
delay_between_events_s=delay_between_events_s,
)
)
def enqueue_assistant_message(self, text: str, *, response_id: str = "resp-1") -> None:
"""Queue a completed assistant-message model response."""
self.enqueue_sse(
sse(
[
ev_response_created(response_id),
ev_assistant_message(f"msg-{response_id}", text),
ev_completed(response_id),
]
)
)
def requests(self) -> list[CapturedResponsesRequest]:
"""Return all recorded Responses API requests."""
with self._requests_lock:
return list(self._requests)
def single_request(self) -> CapturedResponsesRequest:
"""Return the only recorded request, failing if the count differs."""
requests = self.requests()
if len(requests) != 1:
raise AssertionError(f"expected 1 request, got {len(requests)}")
return requests[0]
def wait_for_requests(
self,
count: int,
*,
timeout_s: float = 5.0,
) -> list[CapturedResponsesRequest]:
"""Wait until at least `count` requests have been recorded."""
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
requests = self.requests()
if len(requests) >= count:
return requests
time.sleep(0.01)
requests = self.requests()
raise AssertionError(f"expected {count} requests, got {len(requests)}")
def _record_request(self, handler: BaseHTTPRequestHandler, body: bytes) -> None:
"""Record one inbound HTTP request from app-server."""
headers = {key.lower(): value for key, value in handler.headers.items()}
request = CapturedResponsesRequest(
method=handler.command,
path=handler.path,
headers=headers,
body=body,
)
with self._requests_lock:
self._requests.append(request)
def _next_response(self) -> MockSseResponse:
"""Return the next queued SSE response or fail the HTTP request."""
return self._responses.get_nowait()
class AppServerHarness:
"""Test fixture that points a pinned runtime app-server at MockResponsesServer."""
def __init__(self, tmp_path: Path) -> None:
self.tmp_path = tmp_path
self.codex_home = tmp_path / "codex-home"
self.workspace = tmp_path / "workspace"
self.responses = MockResponsesServer()
def __enter__(self) -> AppServerHarness:
self.codex_home.mkdir()
self.workspace.mkdir()
self.responses.__enter__()
self._write_config()
return self
def __exit__(self, _exc_type: object, _exc: object, _tb: object) -> None:
self.responses.__exit__(_exc_type, _exc, _tb)
shutil.rmtree(self.codex_home, ignore_errors=True)
shutil.rmtree(self.workspace, ignore_errors=True)
def app_server_config(self) -> AppServerConfig:
"""Build SDK config for an isolated pinned-runtime app-server process."""
return AppServerConfig(
cwd=str(self.workspace),
env={
"CODEX_HOME": str(self.codex_home),
"CODEX_APP_SERVER_DISABLE_MANAGED_CONFIG": "1",
"RUST_LOG": "warn",
},
)
def _write_config(self) -> None:
"""Write config.toml that routes model calls to the mock server."""
config_toml = self.codex_home / "config.toml"
config_toml.write_text(
f"""
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for Python SDK tests"
base_url = "{self.responses.url}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
""".lstrip()
)
class _ResponsesHttpServer(ThreadingHTTPServer):
"""ThreadingHTTPServer carrying a reference to the owning mock."""
def __init__(
self,
server_address: tuple[str, int],
handler_class: type[BaseHTTPRequestHandler],
mock: MockResponsesServer,
) -> None:
super().__init__(server_address, handler_class)
self.mock = mock
class _ResponsesHandler(BaseHTTPRequestHandler):
"""HTTP handler for the subset of the Responses API used by SDK tests."""
server: _ResponsesHttpServer
def log_message(self, _format: str, *_args: object) -> None:
"""Silence default stderr logging; pytest failures print captured requests."""
return None
def do_GET(self) -> None:
"""Serve a minimal `/v1/models` response if app-server asks for models."""
if self.path.endswith("/v1/models") or self.path.endswith("/models"):
self._send_json(
{
"object": "list",
"data": [
{
"id": "mock-model",
"object": "model",
"created": 0,
"owned_by": "openai",
}
],
}
)
return
self.send_error(404, f"unexpected GET {self.path}")
def do_POST(self) -> None:
"""Serve queued SSE responses for `/v1/responses` requests."""
length = int(self.headers.get("content-length", "0"))
body = self.rfile.read(length)
self.server.mock._record_request(self, body)
if not (self.path.endswith("/v1/responses") or self.path.endswith("/responses")):
self.send_error(404, f"unexpected POST {self.path}")
return
try:
response = self.server.mock._next_response()
except queue.Empty:
self.send_error(500, "no queued SSE response")
return
self.send_response(200)
self.send_header("content-type", "text/event-stream")
self.end_headers()
for chunk in response.chunks():
self.wfile.write(chunk)
self.wfile.flush()
if response.delay_between_events_s:
time.sleep(response.delay_between_events_s)
def _send_json(self, payload: Json) -> None:
"""Write one JSON response."""
body = json.dumps(payload).encode("utf-8")
self.send_response(200)
self.send_header("content-type", "application/json")
self.send_header("content-length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def sse(events: list[Json]) -> str:
"""Build an SSE body from Responses API event JSON objects."""
chunks: list[str] = []
for event in events:
event_type = event["type"]
chunks.append(f"event: {event_type}\ndata: {json.dumps(event)}\n")
return "\n".join(chunks) + "\n"
def ev_response_created(response_id: str) -> Json:
"""Return a minimal `response.created` event."""
return {"type": "response.created", "response": {"id": response_id}}
def ev_completed(response_id: str) -> Json:
"""Return a minimal `response.completed` event with usage."""
return {
"type": "response.completed",
"response": {
"id": response_id,
"usage": {
"input_tokens": 1,
"input_tokens_details": None,
"output_tokens": 1,
"output_tokens_details": None,
"total_tokens": 2,
},
},
}
def ev_assistant_message(item_id: str, text: str) -> Json:
"""Return a completed assistant message output item."""
return {
"type": "response.output_item.done",
"item": {
"type": "message",
"role": "assistant",
"id": item_id,
"content": [{"type": "output_text", "text": text}],
},
}
def ev_message_item_added(item_id: str, text: str = "") -> Json:
"""Return an assistant message added event before streaming deltas."""
return {
"type": "response.output_item.added",
"item": {
"type": "message",
"role": "assistant",
"id": item_id,
"content": [{"type": "output_text", "text": text}],
},
}
def ev_output_text_delta(delta: str) -> Json:
"""Return an output-text delta event."""
return {
"type": "response.output_text.delta",
"delta": delta,
}
def ev_function_call(call_id: str, name: str, arguments: str) -> Json:
"""Return a completed function-call output item."""
return {
"type": "response.output_item.done",
"item": {
"type": "function_call",
"call_id": call_id,
"name": name,
"arguments": arguments,
},
}
def ev_failed(response_id: str, message: str) -> Json:
"""Return a failed model response event."""
return {
"type": "response.failed",
"response": {
"id": response_id,
"error": {"code": "server_error", "message": message},
},
}
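
For reference, enqueue_assistant_message queues a body equivalent to the sketch below; MockSseResponse.chunks() then splits it back on blank lines so each event:/data: frame is written (and optionally delayed) separately. Event payloads are abbreviated in the comments:

body = sse(
    [
        ev_response_created("resp-1"),
        ev_assistant_message("msg-resp-1", "Hello from the mock."),
        ev_completed("resp-1"),
    ]
)
# body is framed as:
#   event: response.created
#   data: {"type": "response.created", "response": {"id": "resp-1"}}
#
#   event: response.output_item.done
#   data: {...assistant message item...}
#
#   event: response.completed
#   data: {...response with usage...}
# chunks() restores one bytes frame per event, each ending with "\n\n".
frames = MockSseResponse(body=body).chunks()
assert len(frames) == 3
assert all(frame.endswith(b"\n\n") for frame in frames)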


@@ -0,0 +1,500 @@
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Iterator
from typing import Any
from app_server_harness import (
AppServerHarness,
ev_assistant_message,
ev_completed,
ev_message_item_added,
ev_output_text_delta,
ev_response_created,
sse,
)
from openai_codex import ApprovalMode, AsyncCodex, Codex, TextInput
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
AskForApprovalValue,
ItemCompletedNotification,
MessagePhase,
ThreadResumeParams,
TurnCompletedNotification,
TurnStatus,
)
from openai_codex.models import Notification
def _response_approval_policy(response: Any) -> str:
"""Return serialized approvalPolicy from a generated thread response."""
return response.model_dump(by_alias=True, mode="json")["approvalPolicy"]
def _response_approval_settings(response: Any) -> dict[str, object]:
"""Return only approval fields from a generated thread response."""
dumped = response.model_dump(by_alias=True, mode="json")
return {
key: dumped[key]
for key in ("approvalPolicy", "approvalsReviewer")
if key in dumped
}
def _agent_message_texts(events: list[Notification]) -> list[str]:
"""Extract completed agent-message text from SDK notifications."""
texts: list[str] = []
for event in events:
if not isinstance(event.payload, ItemCompletedNotification):
continue
item = event.payload.item.root
if item.type == "agentMessage":
texts.append(item.text)
return texts
def _next_sync_delta(stream: Iterator[Notification]) -> str:
"""Advance a sync turn stream until the next agent-message text delta."""
for event in stream:
if isinstance(event.payload, AgentMessageDeltaNotification):
return event.payload.delta
raise AssertionError("stream completed before an agent-message delta")
async def _next_async_delta(stream: AsyncIterator[Notification]) -> str:
"""Advance an async turn stream until the next agent-message text delta."""
async for event in stream:
if isinstance(event.payload, AgentMessageDeltaNotification):
return event.payload.delta
raise AssertionError("stream completed before an agent-message delta")
def _streaming_response(response_id: str, item_id: str, parts: list[str]) -> str:
"""Build an SSE stream with text deltas and a final assistant message."""
return sse(
[
ev_response_created(response_id),
ev_message_item_added(item_id),
*[ev_output_text_delta(part) for part in parts],
ev_assistant_message(item_id, "".join(parts)),
ev_completed(response_id),
]
)
def test_sync_thread_run_uses_pinned_app_server_and_mock_responses(
tmp_path,
) -> None:
"""Drive Thread.run through the pinned app-server and inspect the HTTP request."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("Hello from the mock.", response_id="run-1")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
result = thread.run("hello")
request = harness.responses.single_request()
body = request.body_json()
assert {
"final_response": result.final_response,
"agent_messages": [item.root.text for item in result.items],
"has_usage": result.usage is not None,
"request_model": body["model"],
"request_stream": body["stream"],
"request_user_texts": request.message_input_texts("user"),
} == {
"final_response": "Hello from the mock.",
"agent_messages": ["Hello from the mock."],
"has_usage": True,
"request_model": "mock-model",
"request_stream": True,
"request_user_texts": ["hello"],
}
def test_async_thread_run_uses_pinned_app_server_and_mock_responses(
tmp_path,
) -> None:
"""Async Thread.run should exercise the same app-server boundary."""
async def scenario() -> None:
"""Run the async client against a real app-server process."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"Hello async.",
response_id="async-run-1",
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
result = await thread.run("async hello")
request = harness.responses.single_request()
assert {
"final_response": result.final_response,
"agent_messages": [item.root.text for item in result.items],
"request_user_texts": request.message_input_texts("user"),
} == {
"final_response": "Hello async.",
"agent_messages": ["Hello async."],
"request_user_texts": ["async hello"],
}
asyncio.run(scenario())
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""A sync turn stream should expose deltas, completed items, and completion."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response("stream-1", "msg-stream-1", ["hel", "lo"])
)
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
stream = thread.turn(TextInput("stream please")).stream()
events = list(stream)
assert {
"deltas": [
event.payload.delta
for event in events
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": _agent_message_texts(events),
"completed_statuses": [
event.payload.turn.status
for event in events
if isinstance(event.payload, TurnCompletedNotification)
],
} == {
"deltas": ["hel", "lo"],
"agent_messages": ["hello"],
"completed_statuses": [TurnStatus.completed],
}
def test_turn_run_returns_completed_turn_from_real_app_server(tmp_path) -> None:
"""TurnHandle.run should wait for the app-server completion notification."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
turn = thread.turn(TextInput("complete this turn"))
completed = turn.run()
assert {
"turn_id": completed.id,
"status": completed.status,
"items": completed.items,
} == {
"turn_id": turn.id,
"status": TurnStatus.completed,
"items": [],
}
def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
"""An async turn stream should expose the same notification sequence."""
async def scenario() -> None:
"""Stream one async turn against the real pinned app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"])
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
turn = await thread.turn(TextInput("async stream please"))
events = [event async for event in turn.stream()]
assert {
"deltas": [
event.payload.delta
for event in events
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": _agent_message_texts(events),
"completed_statuses": [
event.payload.turn.status
for event in events
if isinstance(event.payload, TurnCompletedNotification)
],
} == {
"deltas": ["as", "ync"],
"agent_messages": ["async"],
"completed_statuses": [TurnStatus.completed],
}
asyncio.run(scenario())
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two sync streams on one client should consume only their own notifications."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response("first-stream", "msg-first", ["one-", "done"]),
delay_between_events_s=0.01,
)
harness.responses.enqueue_sse(
_streaming_response("second-stream", "msg-second", ["two-", "done"]),
delay_between_events_s=0.01,
)
with Codex(config=harness.app_server_config()) as codex:
first_thread = codex.thread_start()
second_thread = codex.thread_start()
first_turn = first_thread.turn(TextInput("first"))
second_turn = second_thread.turn(TextInput("second"))
first_stream = first_turn.stream()
second_stream = second_turn.stream()
first_first_delta = _next_sync_delta(first_stream)
second_first_delta = _next_sync_delta(second_stream)
first_second_delta = _next_sync_delta(first_stream)
second_second_delta = _next_sync_delta(second_stream)
first_tail = list(first_stream)
second_tail = list(second_stream)
assert {
"interleaved_deltas": [
first_first_delta,
second_first_delta,
first_second_delta,
second_second_delta,
],
"first_agent_messages": _agent_message_texts(first_tail),
"second_agent_messages": _agent_message_texts(second_tail),
} == {
"interleaved_deltas": ["one-", "two-", "done", "done"],
"first_agent_messages": ["one-done"],
"second_agent_messages": ["two-done"],
}
def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
"""Two async streams on one client should consume only their own notifications."""
async def scenario() -> None:
"""Interleave async stream consumers against one app-server process."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
_streaming_response("async-first", "msg-async-first", ["a1", "-done"]),
delay_between_events_s=0.01,
)
harness.responses.enqueue_sse(
_streaming_response("async-second", "msg-async-second", ["a2", "-done"]),
delay_between_events_s=0.01,
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
first_thread = await codex.thread_start()
second_thread = await codex.thread_start()
first_turn = await first_thread.turn(TextInput("async first"))
second_turn = await second_thread.turn(TextInput("async second"))
first_stream = first_turn.stream()
second_stream = second_turn.stream()
first_first_delta = await _next_async_delta(first_stream)
second_first_delta = await _next_async_delta(second_stream)
first_second_delta = await _next_async_delta(first_stream)
second_second_delta = await _next_async_delta(second_stream)
first_tail = [event async for event in first_stream]
second_tail = [event async for event in second_stream]
assert {
"interleaved_deltas": [
first_first_delta,
second_first_delta,
first_second_delta,
second_second_delta,
],
"first_agent_messages": _agent_message_texts(first_tail),
"second_agent_messages": _agent_message_texts(second_tail),
} == {
"interleaved_deltas": ["a1", "a2", "-done", "-done"],
"first_agent_messages": ["a1-done"],
"second_agent_messages": ["a2-done"],
}
asyncio.run(scenario())
def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None:
"""Omitted run approval mode should not rewrite the thread's stored setting."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("locked down", response_id="approval-1")
harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start(approval_mode=ApprovalMode.deny_all)
start_state = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
first_result = thread.run("keep approvals denied")
after_default_run = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
second_result = thread.run(
"allow auto review now",
approval_mode=ApprovalMode.auto_review,
)
after_override_run = codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
assert {
"start_policy": _response_approval_policy(start_state),
"after_default_policy": _response_approval_policy(after_default_run),
"after_override_settings": _response_approval_settings(after_override_run),
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"start_policy": AskForApprovalValue.never.value,
"after_default_policy": AskForApprovalValue.never.value,
"after_override_settings": {
"approvalPolicy": AskForApprovalValue.on_request.value,
"approvalsReviewer": "auto_review",
},
"final_responses": ["locked down", "reviewable"],
}
def test_async_thread_run_approval_mode_persists_until_explicit_override(
tmp_path,
) -> None:
"""Async omitted run approval mode should leave stored settings alone."""
async def scenario() -> None:
"""Use the async client to verify persisted app-server approval state."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message(
"async locked down",
response_id="async-approval-1",
)
harness.responses.enqueue_assistant_message(
"async reviewable",
response_id="async-approval-2",
)
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all)
start_state = await codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
first_result = await thread.run("keep async approvals denied")
after_default_run = await codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
second_result = await thread.run(
"allow async auto review now",
approval_mode=ApprovalMode.auto_review,
)
after_override_run = await codex._client.thread_resume( # noqa: SLF001
thread.id,
ThreadResumeParams(thread_id=thread.id),
)
assert {
"start_policy": _response_approval_policy(start_state),
"after_default_policy": _response_approval_policy(after_default_run),
"after_override_settings": _response_approval_settings(after_override_run),
"final_responses": [
first_result.final_response,
second_result.final_response,
],
} == {
"start_policy": AskForApprovalValue.never.value,
"after_default_policy": AskForApprovalValue.never.value,
"after_override_settings": {
"approvalPolicy": AskForApprovalValue.on_request.value,
"approvalsReviewer": "auto_review",
},
"final_responses": ["async locked down", "async reviewable"],
}
asyncio.run(scenario())
def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) -> None:
"""Thread lifecycle helpers should operate through app-server JSON-RPC."""
with AppServerHarness(tmp_path) as harness:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
thread.set_name("sdk integration thread")
named = thread.read(include_turns=True)
forked = codex.thread_fork(thread.id)
codex.thread_archive(thread.id)
unarchived = codex.thread_unarchive(thread.id)
listed = codex.thread_list(limit=10)
assert {
"name": named.thread.name,
"fork_parent": forked.id != thread.id,
"unarchived_id": unarchived.id,
"listed_ids": sorted(item.id for item in listed.data),
} == {
"name": "sdk integration thread",
"fork_parent": True,
"unarchived_id": thread.id,
"listed_ids": sorted([thread.id, forked.id]),
}
def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None:
"""RunResult should use the final-answer item emitted by app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
[
ev_response_created("phase-1"),
{
**ev_assistant_message("msg-commentary", "Commentary"),
"item": {
**ev_assistant_message("msg-commentary", "Commentary")["item"],
"phase": MessagePhase.commentary.value,
},
},
{
**ev_assistant_message("msg-final", "Final answer"),
"item": {
**ev_assistant_message("msg-final", "Final answer")["item"],
"phase": MessagePhase.final_answer.value,
},
},
ev_completed("phase-1"),
]
)
)
with Codex(config=harness.app_server_config()) as codex:
result = codex.thread_start().run("choose final answer")
assert {
"final_response": result.final_response,
"items": [
{
"text": item.root.text,
"phase": None if item.root.phase is None else item.root.phase.value,
}
for item in result.items
],
} == {
"final_response": "Final answer",
"items": [
{"text": "Commentary", "phase": MessagePhase.commentary.value},
{"text": "Final answer", "phase": MessagePhase.final_answer.value},
],
}
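
Distilling the approval-state readback pattern used above: thread_resume returns the thread settings app-server has persisted, so a test can pin the ApprovalMode mapping directly. A minimal sketch (test name illustrative; it reaches into the private _client exactly as the tests above do):

from pathlib import Path

from app_server_harness import AppServerHarness
from openai_codex import ApprovalMode, Codex
from openai_codex.generated.v2_all import ThreadResumeParams


def test_deny_all_persists_never_sketch(tmp_path: Path) -> None:
    with AppServerHarness(tmp_path) as harness:
        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start(approval_mode=ApprovalMode.deny_all)
            state = codex._client.thread_resume(  # noqa: SLF001
                thread.id,
                ThreadResumeParams(thread_id=thread.id),
            )
            # deny_all is stored as the "never" approval policy.
            dumped = state.model_dump(by_alias=True, mode="json")
            assert dumped["approvalPolicy"] == "never"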


@@ -14,21 +14,16 @@ from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnStartParams,
TurnStatus,
)
from openai_codex.models import InitializeResponse, Notification
from openai_codex.api import (
ApprovalMode,
AsyncCodex,
AsyncThread,
AsyncTurnHandle,
Codex,
RunResult,
Thread,
TurnHandle,
)
ROOT = Path(__file__).resolve().parents[1]
@@ -123,38 +118,6 @@ def _item_completed_notification(
)
def _token_usage_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
) -> Notification:
return Notification(
method="thread/tokenUsage/updated",
payload=ThreadTokenUsageUpdatedNotification.model_validate(
{
"threadId": thread_id,
"turnId": turn_id,
"tokenUsage": {
"last": {
"cachedInputTokens": 1,
"inputTokens": 2,
"outputTokens": 3,
"reasoningOutputTokens": 4,
"totalTokens": 9,
},
"total": {
"cachedInputTokens": 5,
"inputTokens": 6,
"outputTokens": 7,
"reasoningOutputTokens": 8,
"totalTokens": 26,
},
},
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
@@ -261,64 +224,6 @@ def _approval_mode_turn_params(approval_mode: ApprovalMode) -> TurnStartParams:
)
class CapturingApprovalClient:
"""Collect wrapper params at the app-server client boundary."""
def __init__(self) -> None:
self.params: list[Any] = []
def thread_start(self, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-1"))
def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id=thread_id))
def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork"))
def turn_start(
self,
thread_id: str,
input: object, # noqa: A002
*,
params: Any,
) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn"))
class CapturingAsyncApprovalClient:
"""Async mirror of CapturingApprovalClient for public async wrappers."""
def __init__(self) -> None:
self.params: list[Any] = []
async def thread_start(self, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-1"))
async def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id=thread_id))
async def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork"))
async def turn_start(
self,
thread_id: str,
input: object, # noqa: A002
*,
params: Any,
) -> SimpleNamespace:
self.params.append(params)
return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn"))
def test_approval_modes_serialize_to_expected_start_params() -> None:
"""ApprovalMode should map to the app-server params sent for new work."""
assert {
@@ -339,195 +244,6 @@ def test_unknown_approval_mode_is_rejected() -> None:
public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type]
def test_approval_defaults_preserve_existing_sync_thread_settings() -> None:
"""Only thread creation should write approval defaults unless callers override."""
client = CapturingApprovalClient()
codex = Codex.__new__(Codex)
codex._client = client
started = codex.thread_start(approval_mode=ApprovalMode.deny_all)
started.turn([])
codex.thread_resume("existing-thread")
codex.thread_fork("existing-thread")
started.turn([], approval_mode=ApprovalMode.auto_review)
assert _approval_settings(client.params) == [
{"approvalPolicy": "never"},
{},
{},
{},
{
"approvalPolicy": "on-request",
"approvalsReviewer": "auto_review",
},
]
def test_approval_defaults_preserve_existing_async_thread_settings() -> None:
"""Async wrappers should follow the same approval override semantics."""
async def scenario() -> None:
client = CapturingAsyncApprovalClient()
codex = AsyncCodex()
codex._client = client # type: ignore[assignment]
codex._initialized = True
started = await codex.thread_start(approval_mode=ApprovalMode.deny_all)
await started.turn([])
await codex.thread_resume("existing-thread")
await codex.thread_fork("existing-thread")
await started.turn([], approval_mode=ApprovalMode.auto_review)
assert _approval_settings(client.params) == [
{"approvalPolicy": "never"},
{},
{},
{},
{
"approvalPolicy": "on-request",
"approvalsReviewer": "auto_review",
},
]
asyncio.run(scenario())
def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two sync TurnHandle streams should advance independently on one client."""
client = AppServerClient()
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
client.next_turn_notification = lambda turn_id: notifications[turn_id].popleft() # type: ignore[method-assign]
first_stream = TurnHandle(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = TurnHandle(client, "thread-1", "turn-2").stream()
assert next(second_stream).method == "item/agentMessage/delta"
assert next(first_stream).method == "turn/completed"
assert next(second_stream).method == "turn/completed"
first_stream.close()
second_stream.close()
def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two async TurnHandle streams should advance independently on one client."""
async def scenario() -> None:
"""Interleave two async streams backed by separate per-turn queues."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this stream test."""
return None
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
async def fake_next_notification(turn_id: str) -> Notification:
"""Return the next notification from the requested per-turn queue."""
return notifications[turn_id].popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream()
assert (await anext(second_stream)).method == "item/agentMessage/delta"
assert (await anext(first_stream)).method == "turn/completed"
assert (await anext(second_stream)).method == "turn/completed"
await first_stream.aclose()
await second_stream.aclose()
asyncio.run(scenario())
def test_turn_run_returns_completed_turn_payload() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
result = TurnHandle(client, "thread-1", "turn-1").run()
assert result.id == "turn-1"
assert result.status == TurnStatus.completed
assert result.items == []
def test_thread_run_accepts_string_input_and_returns_run_result() -> None:
"""Sync Thread.run should preserve approval settings unless explicitly overridden."""
client = AppServerClient()
item_notification = _item_completed_notification(text="Hello.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
seen: dict[str, object] = {}
def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client.turn_start = fake_turn_start # type: ignore[method-assign]
result = Thread(client, "thread-1").run("hello")
assert (
seen["thread_id"],
seen["wire_input"],
_approval_settings([seen["params"]]),
result,
) == (
"thread-1",
[{"type": "text", "text": "hello"}],
[{}],
RunResult(
final_response="Hello.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
),
)
def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
@@ -694,64 +410,6 @@ def test_stream_text_registers_and_consumes_turn_notifications() -> None:
)
def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
"""Async Thread.run should preserve approvals while collecting routed results."""
async def scenario() -> None:
"""Feed item, usage, and completion events through the async turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
item_notification = _item_completed_notification(text="Hello async.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
seen: dict[str, object] = {}
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
"""Capture normalized input and return a synthetic turn id."""
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued notification for the synthetic turn."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert (
seen["thread_id"],
seen["wire_input"],
_approval_settings([seen["params"]]),
result,
) == (
"thread-1",
[{"type": "text", "text": "hello"}],
[{}],
RunResult(
final_response="Hello async.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
),
)
asyncio.run(scenario())
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> (
None
):