Files
codex/sdk/python/tests/test_public_api_runtime_behavior.py
Ahmed Ibrahim ffe6e44a03 Add high-level Python SDK approval mode
Expose approval_mode with deny_all and auto_review options on the high-level Python SDK, and map those choices to generated app-server approval params internally.

Update examples, docs, notebooks, and public API tests to use the new mode instead of raw generated approval fields.

Co-authored-by: Codex <noreply@openai.com>
2026-05-10 11:44:34 +03:00

866 lines
30 KiB
Python

from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import pytest
import openai_codex.api as public_api_module
from openai_codex.client import AppServerClient
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
ItemCompletedNotification,
MessagePhase,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnStatus,
)
from openai_codex.models import InitializeResponse, Notification
from openai_codex.api import (
ApprovalMode,
AsyncCodex,
AsyncThread,
AsyncTurnHandle,
Codex,
RunResult,
TextInput,
Thread,
TurnHandle,
)
from openai_codex.types import AskForApproval
ROOT = Path(__file__).resolve().parents[1]
def _approval_settings(params: list[Any]) -> list[dict[str, object]]:
"""Return serialized approval settings from captured Pydantic params."""
return [
{
key: value
for key, value in param.model_dump(
by_alias=True,
exclude_none=True,
mode="json",
).items()
if key in {"approvalPolicy", "approvalsReviewer"}
}
for param in params
]
def _delta_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "delta-text",
) -> Notification:
return Notification(
method="item/agentMessage/delta",
payload=AgentMessageDeltaNotification.model_validate(
{
"delta": text,
"itemId": "item-1",
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
status: str = "completed",
error_message: str | None = None,
) -> Notification:
turn: dict[str, object] = {
"id": turn_id,
"items": [],
"status": status,
}
if error_message is not None:
turn["error"] = {"message": error_message}
return Notification(
method="turn/completed",
payload=TurnCompletedNotification.model_validate(
{
"threadId": thread_id,
"turn": turn,
}
),
)
def _item_completed_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
text: str = "final text",
phase: MessagePhase | None = None,
) -> Notification:
"""Build a realistic completed-item notification accepted by generated models."""
item: dict[str, object] = {
"id": "item-1",
"text": text,
"type": "agentMessage",
}
if phase is not None:
item["phase"] = phase.value
return Notification(
method="item/completed",
payload=ItemCompletedNotification.model_validate(
{
# The pinned runtime schema requires completion timestamps.
"completedAtMs": 1,
"item": item,
"threadId": thread_id,
"turnId": turn_id,
}
),
)
def _token_usage_notification(
*,
thread_id: str = "thread-1",
turn_id: str = "turn-1",
) -> Notification:
return Notification(
method="thread/tokenUsage/updated",
payload=ThreadTokenUsageUpdatedNotification.model_validate(
{
"threadId": thread_id,
"turnId": turn_id,
"tokenUsage": {
"last": {
"cachedInputTokens": 1,
"inputTokens": 2,
"outputTokens": 3,
"reasoningOutputTokens": 4,
"totalTokens": 9,
},
"total": {
"cachedInputTokens": 5,
"inputTokens": 6,
"outputTokens": 7,
"reasoningOutputTokens": 8,
"totalTokens": 26,
},
},
}
),
)
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
closed: list[bool] = []
class FakeClient:
def __init__(self, config=None) -> None: # noqa: ANN001,ARG002
self._closed = False
def start(self) -> None:
return None
def initialize(self) -> InitializeResponse:
return InitializeResponse.model_validate({})
def close(self) -> None:
self._closed = True
closed.append(True)
monkeypatch.setattr(public_api_module, "AppServerClient", FakeClient)
with pytest.raises(RuntimeError, match="missing required metadata"):
Codex()
assert closed == [True]
def test_async_codex_init_failure_closes_client() -> None:
async def scenario() -> None:
codex = AsyncCodex()
close_calls = 0
async def fake_start() -> None:
return None
async def fake_initialize() -> InitializeResponse:
return InitializeResponse.model_validate({})
async def fake_close() -> None:
nonlocal close_calls
close_calls += 1
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.close = fake_close # type: ignore[method-assign]
with pytest.raises(RuntimeError, match="missing required metadata"):
await codex.models()
assert close_calls == 1
assert codex._initialized is False
assert codex._init is None
asyncio.run(scenario())
def test_async_codex_initializes_only_once_under_concurrency() -> None:
async def scenario() -> None:
codex = AsyncCodex()
start_calls = 0
initialize_calls = 0
ready = asyncio.Event()
async def fake_start() -> None:
nonlocal start_calls
start_calls += 1
async def fake_initialize() -> InitializeResponse:
nonlocal initialize_calls
initialize_calls += 1
ready.set()
await asyncio.sleep(0.02)
return InitializeResponse.model_validate(
{
"userAgent": "codex-cli/1.2.3",
"serverInfo": {"name": "codex-cli", "version": "1.2.3"},
}
)
async def fake_model_list(include_hidden: bool = False): # noqa: ANN202,ARG001
await ready.wait()
return object()
codex._client.start = fake_start # type: ignore[method-assign]
codex._client.initialize = fake_initialize # type: ignore[method-assign]
codex._client.model_list = fake_model_list # type: ignore[method-assign]
await asyncio.gather(codex.models(), codex.models())
assert start_calls == 1
assert initialize_calls == 1
asyncio.run(scenario())
def test_ask_for_approval_exposes_simple_policy_constants() -> None:
"""AskForApproval should expose enum-like aliases for simple policies."""
assert {
"untrusted": AskForApproval.untrusted.model_dump(mode="json"),
"on_failure": AskForApproval.on_failure.model_dump(mode="json"),
"on_request": AskForApproval.on_request.model_dump(mode="json"),
"never": AskForApproval.never.model_dump(mode="json"),
} == {
"untrusted": "untrusted",
"on_failure": "on-failure",
"on_request": "on-request",
"never": "never",
}
def test_sync_api_maps_approval_modes_for_started_work() -> None:
"""Sync start methods should serialize only supported approval modes."""
captured: list[Any] = []
class FakeClient:
def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client = FakeClient()
codex = object.__new__(Codex)
codex._client = client
codex.thread_start()
codex.thread_resume("thread-1")
codex.thread_fork("thread-1")
Thread(client, "thread-1").turn(TextInput("hello"))
codex.thread_start(approval_mode=ApprovalMode.auto_review)
codex.thread_resume("thread-1", approval_mode=ApprovalMode.auto_review)
codex.thread_fork("thread-1", approval_mode=ApprovalMode.auto_review)
Thread(client, "thread-1").turn(
TextInput("hello"),
approval_mode=ApprovalMode.auto_review,
)
assert _approval_settings(captured) == [
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
]
def test_async_api_maps_approval_modes_for_started_work() -> None:
"""Async start methods should serialize only supported approval modes."""
async def scenario() -> None:
"""Exercise the async wrappers without spawning a real app server."""
captured: list[Any] = []
class FakeAsyncClient:
async def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
async def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
async def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
async def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
codex = AsyncCodex()
codex._client = FakeAsyncClient()
codex._initialized = True
await codex.thread_start()
await codex.thread_resume("thread-1")
await codex.thread_fork("thread-1")
await AsyncThread(codex, "thread-1").turn(TextInput("hello"))
await codex.thread_start(approval_mode=ApprovalMode.auto_review)
await codex.thread_resume(
"thread-1",
approval_mode=ApprovalMode.auto_review,
)
await codex.thread_fork("thread-1", approval_mode=ApprovalMode.auto_review)
await AsyncThread(codex, "thread-1").turn(
TextInput("hello"),
approval_mode=ApprovalMode.auto_review,
)
assert _approval_settings(captured) == [
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
]
asyncio.run(scenario())
def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two sync TurnHandle streams should advance independently on one client."""
client = AppServerClient()
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
client.next_turn_notification = lambda turn_id: notifications[turn_id].popleft() # type: ignore[method-assign]
first_stream = TurnHandle(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = TurnHandle(client, "thread-1", "turn-2").stream()
assert next(second_stream).method == "item/agentMessage/delta"
assert next(first_stream).method == "turn/completed"
assert next(second_stream).method == "turn/completed"
first_stream.close()
second_stream.close()
def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two async TurnHandle streams should advance independently on one client."""
async def scenario() -> None:
"""Interleave two async streams backed by separate per-turn queues."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this stream test."""
return None
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
async def fake_next_notification(turn_id: str) -> Notification:
"""Return the next notification from the requested per-turn queue."""
return notifications[turn_id].popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream()
assert (await anext(second_stream)).method == "item/agentMessage/delta"
assert (await anext(first_stream)).method == "turn/completed"
assert (await anext(second_stream)).method == "turn/completed"
await first_stream.aclose()
await second_stream.aclose()
asyncio.run(scenario())
def test_turn_run_returns_completed_turn_payload() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
result = TurnHandle(client, "thread-1", "turn-1").run()
assert result.id == "turn-1"
assert result.status == TurnStatus.completed
assert result.items == []
def test_thread_run_accepts_string_input_and_returns_run_result() -> None:
client = AppServerClient()
item_notification = _item_completed_notification(text="Hello.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
seen: dict[str, object] = {}
def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client.turn_start = fake_turn_start # type: ignore[method-assign]
result = Thread(client, "thread-1").run(
"hello",
approval_mode=ApprovalMode.auto_review,
)
assert (
seen["thread_id"],
seen["wire_input"],
_approval_settings([seen["params"]]),
result,
) == (
"thread-1",
[{"type": "text", "text": "hello"}],
[{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"}],
RunResult(
final_response="Hello.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
),
)
def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="Second message")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Second message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_preserves_empty_last_assistant_message() -> None:
client = AppServerClient()
first_item_notification = _item_completed_notification(text="First message")
second_item_notification = _item_completed_notification(text="")
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == ""
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None:
client = AppServerClient()
final_answer_notification = _item_completed_notification(
text="Final answer",
phase=MessagePhase.final_answer,
)
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
final_answer_notification,
commentary_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response == "Final answer"
assert result.items == [
final_answer_notification.payload.item,
commentary_notification.payload.item,
]
def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
client = AppServerClient()
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
result = Thread(client, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
def test_thread_run_raises_on_failed_turn() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_completed_notification(status="failed", error_message="boom"),
]
)
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
with pytest.raises(RuntimeError, match="boom"):
Thread(client, "thread-1").run("hello")
def test_stream_text_registers_and_consumes_turn_notifications() -> None:
"""stream_text should register, consume, and unregister one turn queue."""
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(text="first"),
_delta_notification(text="second"),
_completed_notification(),
]
)
calls: list[tuple[str, str]] = []
client.turn_start = lambda thread_id, input_items, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
def fake_register(turn_id: str) -> None:
"""Record registration for the turn created by stream_text."""
calls.append(("register", turn_id))
def fake_next(turn_id: str) -> Notification:
"""Return the next queued notification for stream_text."""
calls.append(("next", turn_id))
return notifications.popleft()
def fake_unregister(turn_id: str) -> None:
"""Record cleanup for the turn created by stream_text."""
calls.append(("unregister", turn_id))
client.register_turn_notifications = fake_register # type: ignore[method-assign]
client.next_turn_notification = fake_next # type: ignore[method-assign]
client.unregister_turn_notifications = fake_unregister # type: ignore[method-assign]
chunks = list(client.stream_text("thread-1", "hello"))
assert ([chunk.delta for chunk in chunks], calls) == (
["first", "second"],
[
("register", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("unregister", "turn-1"),
],
)
def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
"""Async Thread.run should normalize string input and collect routed results."""
async def scenario() -> None:
"""Feed item, usage, and completion events through the async turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
item_notification = _item_completed_notification(text="Hello async.")
usage_notification = _token_usage_notification()
notifications: deque[Notification] = deque(
[
item_notification,
usage_notification,
_completed_notification(),
]
)
seen: dict[str, object] = {}
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
"""Capture normalized input and return a synthetic turn id."""
seen["thread_id"] = thread_id
seen["wire_input"] = wire_input
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued notification for the synthetic turn."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run(
"hello",
approval_mode=ApprovalMode.auto_review,
)
assert (
seen["thread_id"],
seen["wire_input"],
_approval_settings([seen["params"]]),
result,
) == (
"thread-1",
[{"type": "text", "text": "hello"}],
[{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"}],
RunResult(
final_response="Hello async.",
items=[item_notification.payload.item],
usage=usage_notification.payload.token_usage,
),
)
asyncio.run(scenario())
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> (
None
):
"""Async run should use the last final assistant message as the response text."""
async def scenario() -> None:
"""Feed two completed agent messages through the async per-turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
first_item_notification = _item_completed_notification(
text="First async message"
)
second_item_notification = _item_completed_notification(
text="Second async message"
)
notifications: deque[Notification] = deque(
[
first_item_notification,
second_item_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
"""Return a synthetic turn id after AsyncThread.run builds input."""
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued notification for that synthetic turn."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response == "Second async message"
assert result.items == [
first_item_notification.payload.item,
second_item_notification.payload.item,
]
asyncio.run(scenario())
def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
"""Async Thread.run should ignore commentary-only messages for final text."""
async def scenario() -> None:
"""Feed a commentary item and completion through the async turn stream."""
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
"""Avoid starting a real app-server process for this run test."""
return None
commentary_notification = _item_completed_notification(
text="Commentary",
phase=MessagePhase.commentary,
)
notifications: deque[Notification] = deque(
[
commentary_notification,
_completed_notification(),
]
)
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
"""Return a synthetic turn id for commentary-only output."""
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification(_turn_id: str) -> Notification:
"""Return the next queued commentary/completion notification."""
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
assert result.final_response is None
assert result.items == [commentary_notification.payload.item]
asyncio.run(scenario())
def test_retry_examples_compare_status_with_enum() -> None:
for path in (
ROOT / "examples" / "10_error_handling_and_retry" / "sync.py",
ROOT / "examples" / "10_error_handling_and_retry" / "async.py",
):
source = path.read_text()
assert '== "failed"' not in source
assert "TurnStatus.failed" in source