@dataclass(frozen=True)
class CapturedResponsesRequest:
    """Recorded request sent by app-server to the mock Responses API.

    Holds the HTTP method, request path, headers, and raw body bytes exactly
    as they were captured by the mock server.
    """

    method: str
    path: str
    headers: dict[str, str]
    body: bytes

    def body_json(self) -> Json:
        """Decode the request body as UTF-8 JSON and return the parsed value."""
        return json.loads(self.body.decode("utf-8"))

    def input(self) -> list[Json]:
        """Return the Responses API ``input`` array from the request.

        Raises:
            AssertionError: if the body is not a JSON object, or its ``input``
                value is missing or not a list.
        """
        payload = self.body_json()
        # A non-object body has no `.get`; report it as a test failure rather
        # than letting an unrelated AttributeError escape.
        if not isinstance(payload, dict):
            raise AssertionError(f"expected JSON object body, got {payload!r}")
        value = payload.get("input")
        if not isinstance(value, list):
            raise AssertionError(f"expected input list, got {value!r}")
        return value

    def message_input_texts(self, role: str) -> list[str]:
        """Return all input_text strings for message inputs matching one role.

        String content is returned as-is; list content contributes the ``text``
        of every ``input_text`` span. Anything else is skipped.
        """
        texts: list[str] = []
        for item in self.input():
            # Skip malformed entries instead of crashing on `.get`.
            if not isinstance(item, dict):
                continue
            if item.get("type") != "message" or item.get("role") != role:
                continue
            content = item.get("content")
            if isinstance(content, str):
                texts.append(content)
                continue
            if not isinstance(content, list):
                continue
            texts.extend(
                span["text"]
                for span in content
                if isinstance(span, dict)
                and span.get("type") == "input_text"
                and isinstance(span.get("text"), str)
            )
        return texts

    def header(self, name: str) -> str | None:
        """Return a captured request header by case-insensitive name.

        Both the requested name and the stored keys are lowercased before
        comparison, so the lookup works however the mapping was built.
        """
        wanted = name.lower()
        for key, value in self.headers.items():
            if key.lower() == wanted:
                return value
        return None
class MockResponsesServer:
    """Local HTTP server that records `/v1/responses` requests and returns SSE.

    Use it as a context manager: entering starts the background serving
    thread, exiting stops it. Queued responses are handed out in the order
    they were enqueued.
    """

    def __init__(self) -> None:
        """Create the HTTP server on 127.0.0.1 with an OS-assigned port."""
        self._responses: queue.Queue[MockSseResponse] = queue.Queue()
        self._requests: list[CapturedResponsesRequest] = []
        self._requests_lock = threading.Lock()
        self._server = _ResponsesHttpServer(("127.0.0.1", 0), _ResponsesHandler, self)
        self._thread = threading.Thread(
            target=self._server.serve_forever,
            name="mock-responses-api",
            daemon=True,
        )

    def __enter__(self) -> MockResponsesServer:
        """Start serving in the background thread and return this server."""
        self._thread.start()
        return self

    def __exit__(self, _exc_type: object, _exc: object, _tb: object) -> None:
        """Stop the server; exceptions from the `with` body are not suppressed."""
        self.close()

    @property
    def url(self) -> str:
        """Return the base URL for app-server config."""
        host, port = self._server.server_address
        return f"http://{host}:{port}"

    def close(self) -> None:
        """Stop the background HTTP server thread and release the socket.

        Safe to call when the server was never started and safe to call more
        than once.
        """
        # BaseServer.shutdown() waits for serve_forever() to exit and blocks
        # forever if the loop never ran, so only request it for a live thread.
        serving = self._thread.is_alive()
        if serving:
            self._server.shutdown()
        self._server.server_close()
        if serving:
            self._thread.join(timeout=2)

    def enqueue_sse(
        self,
        body: str,
        *,
        delay_between_events_s: float = 0.0,
    ) -> None:
        """Queue one SSE body for the next `/v1/responses` request."""
        self._responses.put(
            MockSseResponse(
                body=body,
                delay_between_events_s=delay_between_events_s,
            )
        )

    def enqueue_assistant_message(self, text: str, *, response_id: str = "resp-1") -> None:
        """Queue a completed assistant-message model response."""
        self.enqueue_sse(
            sse(
                [
                    ev_response_created(response_id),
                    ev_assistant_message(f"msg-{response_id}", text),
                    ev_completed(response_id),
                ]
            )
        )

    def requests(self) -> list[CapturedResponsesRequest]:
        """Return a snapshot copy of all recorded Responses API requests."""
        with self._requests_lock:
            return list(self._requests)

    def single_request(self) -> CapturedResponsesRequest:
        """Return the only recorded request, failing if the count differs.

        Raises:
            AssertionError: if the number of recorded requests is not exactly 1.
        """
        requests = self.requests()
        if len(requests) != 1:
            raise AssertionError(f"expected 1 request, got {len(requests)}")
        return requests[0]

    def wait_for_requests(
        self,
        count: int,
        *,
        timeout_s: float = 5.0,
    ) -> list[CapturedResponsesRequest]:
        """Wait until at least `count` requests have been recorded.

        The recorded requests are always checked at least once, so a condition
        that is already satisfied succeeds even with `timeout_s=0`.

        Raises:
            AssertionError: if fewer than `count` requests arrived in time.
        """
        deadline = time.monotonic() + timeout_s
        while True:
            requests = self.requests()
            if len(requests) >= count:
                return requests
            # Test the deadline only after a failed check, so a request that
            # landed during the last sleep is still observed.
            if time.monotonic() >= deadline:
                raise AssertionError(f"expected {count} requests, got {len(requests)}")
            time.sleep(0.01)

    def _record_request(self, handler: BaseHTTPRequestHandler, body: bytes) -> None:
        """Record one inbound HTTP request from app-server.

        Header names are stored lowercased.
        """
        headers = {key.lower(): value for key, value in handler.headers.items()}
        request = CapturedResponsesRequest(
            method=handler.command,
            path=handler.path,
            headers=headers,
            body=body,
        )
        with self._requests_lock:
            self._requests.append(request)

    def _next_response(self) -> MockSseResponse:
        """Return the next queued SSE response.

        Raises:
            queue.Empty: if nothing is queued; the HTTP handler turns this
                into an error response.
        """
        return self._responses.get_nowait()
class _ResponsesHandler(BaseHTTPRequestHandler):
    """HTTP handler for the subset of the Responses API used by SDK tests."""

    server: _ResponsesHttpServer

    def log_message(self, _format: str, *_args: object) -> None:
        """Silence default stderr logging; pytest failures print captured requests."""
        return None

    def do_GET(self) -> None:
        """Serve a minimal `/v1/models` response if app-server asks for models.

        Any other GET path is answered with 404.
        """
        if self.path.endswith("/v1/models") or self.path.endswith("/models"):
            self._send_json(
                {
                    "object": "list",
                    "data": [
                        {
                            "id": "mock-model",
                            "object": "model",
                            "created": 0,
                            "owned_by": "openai",
                        }
                    ],
                }
            )
            return
        self.send_error(404, f"unexpected GET {self.path}")

    def do_POST(self) -> None:
        """Serve queued SSE responses for `/v1/responses` requests.

        Every POST with a readable body is recorded on the owning mock, even
        when the path is unexpected. Replies are 400 for an unusable
        Content-Length, 404 for an unexpected path, 500 when no SSE response
        is queued, and otherwise a 200 event stream.
        """
        length = self._content_length()
        if length is None:
            # Without a usable length the body cannot be read safely.
            self.send_error(400, "invalid content-length")
            return
        body = self.rfile.read(length)
        self.server.mock._record_request(self, body)

        if not self.path.endswith(("/v1/responses", "/responses")):
            self.send_error(404, f"unexpected POST {self.path}")
            return

        try:
            response = self.server.mock._next_response()
        except queue.Empty:
            self.send_error(500, "no queued SSE response")
            return

        try:
            self.send_response(200)
            self.send_header("content-type", "text/event-stream")
            self.end_headers()
            for index, chunk in enumerate(response.chunks()):
                # The delay separates events, so nothing is slept before the
                # first chunk or after the last one.
                if index and response.delay_between_events_s:
                    time.sleep(response.delay_between_events_s)
                self.wfile.write(chunk)
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            # The client hung up mid-stream; there is nobody left to answer.
            return

    def _content_length(self) -> int | None:
        """Return the declared body length, or None when it is unusable.

        A missing header counts as 0. Non-numeric and negative values are
        rejected; a negative length would make `rfile.read` read until EOF.
        """
        raw = self.headers.get("content-length", "0")
        try:
            length = int(raw)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _send_json(self, payload: Json) -> None:
        """Write one 200 JSON response with an explicit content-length."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(200)
        self.send_header("content-type", "application/json")
        self.send_header("content-length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
def ev_message_item_added(item_id: str, text: str = "") -> Json:
    """Build the `response.output_item.added` event for an assistant message.

    `text` fills the single `output_text` content part and defaults to empty.
    """
    message: Json = {
        "type": "message",
        "role": "assistant",
        "id": item_id,
        "content": [{"type": "output_text", "text": text}],
    }
    return {"type": "response.output_item.added", "item": message}


def ev_output_text_delta(delta: str) -> Json:
    """Build a `response.output_text.delta` event carrying `delta`."""
    event: Json = {"type": "response.output_text.delta"}
    event["delta"] = delta
    return event


def ev_function_call(call_id: str, name: str, arguments: str) -> Json:
    """Build the `response.output_item.done` event for a function call.

    `arguments` is placed in the item unchanged.
    """
    call: Json = {
        "type": "function_call",
        "call_id": call_id,
        "name": name,
        "arguments": arguments,
    }
    return {"type": "response.output_item.done", "item": call}


def ev_failed(response_id: str, message: str) -> Json:
    """Build a `response.failed` event with a `server_error` error code."""
    error: Json = {"code": "server_error", "message": message}
    failed_response: Json = {"id": response_id, "error": error}
    return {"type": "response.failed", "response": failed_response}
def _response_approval_policy(response: Any) -> str:
    """Return the `approvalPolicy` value from the response's alias-keyed JSON dump."""
    dumped = response.model_dump(by_alias=True, mode="json")
    return dumped["approvalPolicy"]


def _response_approval_settings(response: Any) -> dict[str, object]:
    """Return the approval-related fields present in a thread response dump.

    Only `approvalPolicy` and `approvalsReviewer` are kept, in that order, and
    a key absent from the dump is left out.
    """
    dumped = response.model_dump(by_alias=True, mode="json")
    settings: dict[str, object] = {}
    for field_name in ("approvalPolicy", "approvalsReviewer"):
        if field_name in dumped:
            settings[field_name] = dumped[field_name]
    return settings


def _agent_message_texts(events: list[Notification]) -> list[str]:
    """Collect the text of every completed `agentMessage` item, in event order."""
    return [
        notification.payload.item.root.text
        for notification in events
        if isinstance(notification.payload, ItemCompletedNotification)
        and notification.payload.item.root.type == "agentMessage"
    ]


def _next_sync_delta(stream: Iterator[Notification]) -> str:
    """Consume a sync turn stream up to and including its next text delta.

    Raises:
        AssertionError: if the stream ends before a delta notification.
    """
    for notification in stream:
        payload = notification.payload
        if not isinstance(payload, AgentMessageDeltaNotification):
            continue
        return payload.delta
    raise AssertionError("stream completed before an agent-message delta")


async def _next_async_delta(stream: AsyncIterator[Notification]) -> str:
    """Consume an async turn stream up to and including its next text delta.

    Raises:
        AssertionError: if the stream ends before a delta notification.
    """
    async for notification in stream:
        payload = notification.payload
        if not isinstance(payload, AgentMessageDeltaNotification):
            continue
        return payload.delta
    raise AssertionError("stream completed before an agent-message delta")
def test_sync_thread_run_uses_pinned_app_server_and_mock_responses(
    tmp_path,
) -> None:
    """Thread.run should produce the mocked reply and send the expected model request."""
    with AppServerHarness(tmp_path) as harness:
        mock_api = harness.responses
        mock_api.enqueue_assistant_message("Hello from the mock.", response_id="run-1")

        with Codex(config=harness.app_server_config()) as codex:
            result = codex.thread_start().run("hello")

        # Exactly one model call must have reached the mock.
        request = mock_api.single_request()

    payload = request.body_json()
    observed = {
        "final_response": result.final_response,
        "agent_messages": [item.root.text for item in result.items],
        "has_usage": result.usage is not None,
        "request_model": payload["model"],
        "request_stream": payload["stream"],
        "request_user_texts": request.message_input_texts("user"),
    }
    expected = {
        "final_response": "Hello from the mock.",
        "agent_messages": ["Hello from the mock."],
        "has_usage": True,
        "request_model": "mock-model",
        "request_stream": True,
        "request_user_texts": ["hello"],
    }
    assert observed == expected
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
    """Streaming one sync turn should yield its deltas, final message, and completion."""
    sse_body = _streaming_response("stream-1", "msg-stream-1", ["hel", "lo"])
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_sse(sse_body)

        with Codex(config=harness.app_server_config()) as codex:
            turn = codex.thread_start().turn(TextInput("stream please"))
            # Drain the stream completely before the client shuts down.
            events = list(turn.stream())

    deltas = []
    completed_statuses = []
    for event in events:
        payload = event.payload
        if isinstance(payload, AgentMessageDeltaNotification):
            deltas.append(payload.delta)
        if isinstance(payload, TurnCompletedNotification):
            completed_statuses.append(payload.turn.status)

    observed = {
        "deltas": deltas,
        "agent_messages": _agent_message_texts(events),
        "completed_statuses": completed_statuses,
    }
    expected = {
        "deltas": ["hel", "lo"],
        "agent_messages": ["hello"],
        "completed_statuses": [TurnStatus.completed],
    }
    assert observed == expected
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
    """Two sync streams on one client should consume only their own notifications.

    Two turns are started on separate threads of the same client, and their
    streams are then pulled alternately. Each stream must yield only the
    deltas and the final agent message of its own turn.
    """
    with AppServerHarness(tmp_path) as harness:
        # One SSE response per turn, with a short pause between events.
        # NOTE(review): the mock hands out queued responses in request-arrival
        # order, so this assumes the first turn's model request reaches the
        # mock before the second turn's — confirm app-server guarantees that.
        harness.responses.enqueue_sse(
            _streaming_response("first-stream", "msg-first", ["one-", "done"]),
            delay_between_events_s=0.01,
        )
        harness.responses.enqueue_sse(
            _streaming_response("second-stream", "msg-second", ["two-", "done"]),
            delay_between_events_s=0.01,
        )

        with Codex(config=harness.app_server_config()) as codex:
            # Both turns are started before either stream is consumed.
            first_thread = codex.thread_start()
            second_thread = codex.thread_start()
            first_turn = first_thread.turn(TextInput("first"))
            second_turn = second_thread.turn(TextInput("second"))

            first_stream = first_turn.stream()
            second_stream = second_turn.stream()
            # Alternate between the two streams, one delta at a time; this
            # pull order is what the assertion below checks.
            first_first_delta = _next_sync_delta(first_stream)
            second_first_delta = _next_sync_delta(second_stream)
            first_second_delta = _next_sync_delta(first_stream)
            second_second_delta = _next_sync_delta(second_stream)
            # Exhaust what remains of each stream after its two deltas.
            first_tail = list(first_stream)
            second_tail = list(second_stream)

    assert {
        "interleaved_deltas": [
            first_first_delta,
            second_first_delta,
            first_second_delta,
            second_second_delta,
        ],
        "first_agent_messages": _agent_message_texts(first_tail),
        "second_agent_messages": _agent_message_texts(second_tail),
    } == {
        "interleaved_deltas": ["one-", "two-", "done", "done"],
        "first_agent_messages": ["one-done"],
        "second_agent_messages": ["two-done"],
    }
def test_thread_run_approval_mode_persists_until_explicit_override(tmp_path) -> None:
    """Omitted run approval mode should not rewrite the thread's stored setting.

    The thread is created with `ApprovalMode.deny_all`, run once with no
    approval mode, then run again with `ApprovalMode.auto_review`. The
    approval fields are read back after each step and compared.
    """
    with AppServerHarness(tmp_path) as harness:
        # One queued model response per `thread.run` call below.
        harness.responses.enqueue_assistant_message("locked down", response_id="approval-1")
        harness.responses.enqueue_assistant_message("reviewable", response_id="approval-2")

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start(approval_mode=ApprovalMode.deny_all)

            # The private client's thread_resume response is used as the
            # read-back of the thread's approval settings at each checkpoint.
            start_state = codex._client.thread_resume(  # noqa: SLF001
                thread.id,
                ThreadResumeParams(thread_id=thread.id),
            )
            # No approval_mode here: the setting read back must be unchanged.
            first_result = thread.run("keep approvals denied")
            after_default_run = codex._client.thread_resume(  # noqa: SLF001
                thread.id,
                ThreadResumeParams(thread_id=thread.id),
            )
            # Explicit override: the setting read back must now differ.
            second_result = thread.run(
                "allow auto review now",
                approval_mode=ApprovalMode.auto_review,
            )
            after_override_run = codex._client.thread_resume(  # noqa: SLF001
                thread.id,
                ThreadResumeParams(thread_id=thread.id),
            )

    assert {
        "start_policy": _response_approval_policy(start_state),
        "after_default_policy": _response_approval_policy(after_default_run),
        "after_override_settings": _response_approval_settings(after_override_run),
        "final_responses": [
            first_result.final_response,
            second_result.final_response,
        ],
    } == {
        "start_policy": AskForApprovalValue.never.value,
        "after_default_policy": AskForApprovalValue.never.value,
        "after_override_settings": {
            "approvalPolicy": AskForApprovalValue.on_request.value,
            "approvalsReviewer": "auto_review",
        },
        "final_responses": ["locked down", "reviewable"],
    }
app-server approval state.""" + with AppServerHarness(tmp_path) as harness: + harness.responses.enqueue_assistant_message( + "async locked down", + response_id="async-approval-1", + ) + harness.responses.enqueue_assistant_message( + "async reviewable", + response_id="async-approval-2", + ) + + async with AsyncCodex(config=harness.app_server_config()) as codex: + thread = await codex.thread_start(approval_mode=ApprovalMode.deny_all) + start_state = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + first_result = await thread.run("keep async approvals denied") + after_default_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + second_result = await thread.run( + "allow async auto review now", + approval_mode=ApprovalMode.auto_review, + ) + after_override_run = await codex._client.thread_resume( # noqa: SLF001 + thread.id, + ThreadResumeParams(thread_id=thread.id), + ) + + assert { + "start_policy": _response_approval_policy(start_state), + "after_default_policy": _response_approval_policy(after_default_run), + "after_override_settings": _response_approval_settings(after_override_run), + "final_responses": [ + first_result.final_response, + second_result.final_response, + ], + } == { + "start_policy": AskForApprovalValue.never.value, + "after_default_policy": AskForApprovalValue.never.value, + "after_override_settings": { + "approvalPolicy": AskForApprovalValue.on_request.value, + "approvalsReviewer": "auto_review", + }, + "final_responses": ["async locked down", "async reviewable"], + } + + asyncio.run(scenario()) + + +def test_thread_lifecycle_uses_real_app_server_without_model_mocking(tmp_path) -> None: + """Thread lifecycle helpers should operate through app-server JSON-RPC.""" + with AppServerHarness(tmp_path) as harness: + with Codex(config=harness.app_server_config()) as codex: + thread = codex.thread_start() + thread.set_name("sdk 
def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None:
    """RunResult should report the final-answer message, not the commentary one."""

    def phased_message(item_id: str, text: str, phase: MessagePhase) -> dict:
        """Return an assistant-message event whose item also carries `phase`."""
        event = ev_assistant_message(item_id, text)
        event["item"] = {**event["item"], "phase": phase.value}
        return event

    events = [
        ev_response_created("phase-1"),
        phased_message("msg-commentary", "Commentary", MessagePhase.commentary),
        phased_message("msg-final", "Final answer", MessagePhase.final_answer),
        ev_completed("phase-1"),
    ]

    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_sse(sse(events))

        with Codex(config=harness.app_server_config()) as codex:
            thread = codex.thread_start()
            result = thread.run("choose final answer")

    observed_items = []
    for item in result.items:
        phase = item.root.phase
        observed_items.append(
            {
                "text": item.root.text,
                "phase": None if phase is None else phase.value,
            }
        )

    observed = {
        "final_response": result.final_response,
        "items": observed_items,
    }
    expected = {
        "final_response": "Final answer",
        "items": [
            {"text": "Commentary", "phase": MessagePhase.commentary.value},
            {"text": "Final answer", "phase": MessagePhase.final_answer.value},
        ],
    }
    assert observed == expected
b/sdk/python/tests/test_public_api_runtime_behavior.py @@ -14,21 +14,16 @@ from openai_codex.generated.v2_all import ( AgentMessageDeltaNotification, ItemCompletedNotification, MessagePhase, - ThreadTokenUsageUpdatedNotification, TurnCompletedNotification, TurnStartParams, - TurnStatus, ) from openai_codex.models import InitializeResponse, Notification from openai_codex.api import ( ApprovalMode, AsyncCodex, AsyncThread, - AsyncTurnHandle, Codex, - RunResult, Thread, - TurnHandle, ) ROOT = Path(__file__).resolve().parents[1] @@ -123,38 +118,6 @@ def _item_completed_notification( ) -def _token_usage_notification( - *, - thread_id: str = "thread-1", - turn_id: str = "turn-1", -) -> Notification: - return Notification( - method="thread/tokenUsage/updated", - payload=ThreadTokenUsageUpdatedNotification.model_validate( - { - "threadId": thread_id, - "turnId": turn_id, - "tokenUsage": { - "last": { - "cachedInputTokens": 1, - "inputTokens": 2, - "outputTokens": 3, - "reasoningOutputTokens": 4, - "totalTokens": 9, - }, - "total": { - "cachedInputTokens": 5, - "inputTokens": 6, - "outputTokens": 7, - "reasoningOutputTokens": 8, - "totalTokens": 26, - }, - }, - } - ), - ) - - def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None: closed: list[bool] = [] @@ -261,64 +224,6 @@ def _approval_mode_turn_params(approval_mode: ApprovalMode) -> TurnStartParams: ) -class CapturingApprovalClient: - """Collect wrapper params at the app-server client boundary.""" - - def __init__(self) -> None: - self.params: list[Any] = [] - - def thread_start(self, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id="thread-1")) - - def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=thread_id)) - - def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return 
SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork")) - - def turn_start( - self, - thread_id: str, - input: object, # noqa: A002 - *, - params: Any, - ) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn")) - - -class CapturingAsyncApprovalClient: - """Async mirror of CapturingApprovalClient for public async wrappers.""" - - def __init__(self) -> None: - self.params: list[Any] = [] - - async def thread_start(self, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id="thread-1")) - - async def thread_resume(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=thread_id)) - - async def thread_fork(self, thread_id: str, params: Any) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(thread=SimpleNamespace(id=f"{thread_id}-fork")) - - async def turn_start( - self, - thread_id: str, - input: object, # noqa: A002 - *, - params: Any, - ) -> SimpleNamespace: - self.params.append(params) - return SimpleNamespace(turn=SimpleNamespace(id=f"{thread_id}-turn")) - - def test_approval_modes_serialize_to_expected_start_params() -> None: """ApprovalMode should map to the app-server params sent for new work.""" assert { @@ -339,195 +244,6 @@ def test_unknown_approval_mode_is_rejected() -> None: public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type] -def test_approval_defaults_preserve_existing_sync_thread_settings() -> None: - """Only thread creation should write approval defaults unless callers override.""" - client = CapturingApprovalClient() - codex = Codex.__new__(Codex) - codex._client = client - - started = codex.thread_start(approval_mode=ApprovalMode.deny_all) - started.turn([]) - codex.thread_resume("existing-thread") - codex.thread_fork("existing-thread") - started.turn([], 
approval_mode=ApprovalMode.auto_review) - - assert _approval_settings(client.params) == [ - {"approvalPolicy": "never"}, - {}, - {}, - {}, - { - "approvalPolicy": "on-request", - "approvalsReviewer": "auto_review", - }, - ] - - -def test_approval_defaults_preserve_existing_async_thread_settings() -> None: - """Async wrappers should follow the same approval override semantics.""" - - async def scenario() -> None: - client = CapturingAsyncApprovalClient() - codex = AsyncCodex() - codex._client = client # type: ignore[assignment] - codex._initialized = True - - started = await codex.thread_start(approval_mode=ApprovalMode.deny_all) - await started.turn([]) - await codex.thread_resume("existing-thread") - await codex.thread_fork("existing-thread") - await started.turn([], approval_mode=ApprovalMode.auto_review) - - assert _approval_settings(client.params) == [ - {"approvalPolicy": "never"}, - {}, - {}, - {}, - { - "approvalPolicy": "on-request", - "approvalsReviewer": "auto_review", - }, - ] - - asyncio.run(scenario()) - - -def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None: - """Two sync TurnHandle streams should advance independently on one client.""" - client = AppServerClient() - notifications: dict[str, deque[Notification]] = { - "turn-1": deque( - [ - _delta_notification(turn_id="turn-1", text="one"), - _completed_notification(turn_id="turn-1"), - ] - ), - "turn-2": deque( - [ - _delta_notification(turn_id="turn-2", text="two"), - _completed_notification(turn_id="turn-2"), - ] - ), - } - client.next_turn_notification = lambda turn_id: notifications[turn_id].popleft() # type: ignore[method-assign] - - first_stream = TurnHandle(client, "thread-1", "turn-1").stream() - assert next(first_stream).method == "item/agentMessage/delta" - - second_stream = TurnHandle(client, "thread-1", "turn-2").stream() - assert next(second_stream).method == "item/agentMessage/delta" - assert next(first_stream).method == "turn/completed" - assert 
next(second_stream).method == "turn/completed" - - first_stream.close() - second_stream.close() - - -def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None: - """Two async TurnHandle streams should advance independently on one client.""" - - async def scenario() -> None: - """Interleave two async streams backed by separate per-turn queues.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this stream test.""" - return None - - notifications: dict[str, deque[Notification]] = { - "turn-1": deque( - [ - _delta_notification(turn_id="turn-1", text="one"), - _completed_notification(turn_id="turn-1"), - ] - ), - "turn-2": deque( - [ - _delta_notification(turn_id="turn-2", text="two"), - _completed_notification(turn_id="turn-2"), - ] - ), - } - - async def fake_next_notification(turn_id: str) -> Notification: - """Return the next notification from the requested per-turn queue.""" - return notifications[turn_id].popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream() - assert (await anext(first_stream)).method == "item/agentMessage/delta" - - second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream() - assert (await anext(second_stream)).method == "item/agentMessage/delta" - assert (await anext(first_stream)).method == "turn/completed" - assert (await anext(second_stream)).method == "turn/completed" - - await first_stream.aclose() - await second_stream.aclose() - - asyncio.run(scenario()) - - -def test_turn_run_returns_completed_turn_payload() -> None: - client = AppServerClient() - notifications: deque[Notification] = deque( - [ - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: 
ignore[method-assign] - - result = TurnHandle(client, "thread-1", "turn-1").run() - - assert result.id == "turn-1" - assert result.status == TurnStatus.completed - assert result.items == [] - - -def test_thread_run_accepts_string_input_and_returns_run_result() -> None: - """Sync Thread.run should preserve approval settings unless explicitly overridden.""" - client = AppServerClient() - item_notification = _item_completed_notification(text="Hello.") - usage_notification = _token_usage_notification() - notifications: deque[Notification] = deque( - [ - item_notification, - usage_notification, - _completed_notification(), - ] - ) - client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign] - seen: dict[str, object] = {} - - def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 - seen["thread_id"] = thread_id - seen["wire_input"] = wire_input - seen["params"] = params - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - client.turn_start = fake_turn_start # type: ignore[method-assign] - - result = Thread(client, "thread-1").run("hello") - - assert ( - seen["thread_id"], - seen["wire_input"], - _approval_settings([seen["params"]]), - result, - ) == ( - "thread-1", - [{"type": "text", "text": "hello"}], - [{}], - RunResult( - final_response="Hello.", - items=[item_notification.payload.item], - usage=usage_notification.payload.token_usage, - ), - ) - - def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None: client = AppServerClient() first_item_notification = _item_completed_notification(text="First message") @@ -694,64 +410,6 @@ def test_stream_text_registers_and_consumes_turn_notifications() -> None: ) -def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: - """Async Thread.run should preserve approvals while collecting routed results.""" - - async def scenario() -> None: - """Feed item, usage, and completion events 
through the async turn stream.""" - codex = AsyncCodex() - - async def fake_ensure_initialized() -> None: - """Avoid starting a real app-server process for this run test.""" - return None - - item_notification = _item_completed_notification(text="Hello async.") - usage_notification = _token_usage_notification() - notifications: deque[Notification] = deque( - [ - item_notification, - usage_notification, - _completed_notification(), - ] - ) - seen: dict[str, object] = {} - - async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 - """Capture normalized input and return a synthetic turn id.""" - seen["thread_id"] = thread_id - seen["wire_input"] = wire_input - seen["params"] = params - return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) - - async def fake_next_notification(_turn_id: str) -> Notification: - """Return the next queued notification for the synthetic turn.""" - return notifications.popleft() - - codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] - codex._client.turn_start = fake_turn_start # type: ignore[method-assign] - codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign] - - result = await AsyncThread(codex, "thread-1").run("hello") - - assert ( - seen["thread_id"], - seen["wire_input"], - _approval_settings([seen["params"]]), - result, - ) == ( - "thread-1", - [{"type": "text", "text": "hello"}], - [{}], - RunResult( - final_response="Hello async.", - items=[item_notification.payload.item], - usage=usage_notification.payload.token_usage, - ), - ) - - asyncio.run(scenario()) - - def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> ( None ):