Reduce approval mode test mocking

Replace fake sync and async client approval tests with direct serialization checks using generated TurnStartParams, while keeping existing run-path coverage for the default behavior.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-05-10 12:45:11 +03:00
parent 934eda61c3
commit 3b0b5a58e1

View File

@@ -16,6 +16,7 @@ from openai_codex.generated.v2_all import (
MessagePhase,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
TurnStartParams,
TurnStatus,
)
from openai_codex.models import InitializeResponse, Notification
@@ -26,7 +27,6 @@ from openai_codex.api import (
AsyncTurnHandle,
Codex,
RunResult,
TextInput,
Thread,
TurnHandle,
)
@@ -248,229 +248,37 @@ def test_async_codex_initializes_only_once_under_concurrency() -> None:
asyncio.run(scenario())
def test_sync_api_maps_approval_modes_for_started_work() -> None:
"""Sync start methods should serialize only supported approval modes."""
captured: list[Any] = []
class FakeClient:
def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client = FakeClient()
codex = object.__new__(Codex)
codex._client = client
codex.thread_start()
codex.thread_resume("thread-1")
codex.thread_fork("thread-1")
Thread(client, "thread-1").turn(TextInput("hello"))
codex.thread_start(approval_mode=ApprovalMode.deny_all)
codex.thread_resume("thread-1", approval_mode=ApprovalMode.deny_all)
codex.thread_fork("thread-1", approval_mode=ApprovalMode.deny_all)
Thread(client, "thread-1").turn(
TextInput("hello"),
approval_mode=ApprovalMode.deny_all,
def _approval_mode_turn_params(approval_mode: ApprovalMode) -> TurnStartParams:
"""Build real generated turn params from one public approval mode."""
approval_policy, approvals_reviewer = public_api_module._approval_mode_settings(
approval_mode
)
assert _approval_settings(captured) == [
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
]
def test_sync_api_rejects_unknown_approval_mode_before_rpc() -> None:
"""Unknown approval modes should fail before building any client request."""
calls: list[str] = []
class FailOnRpcClient:
def thread_start(self, _params: object) -> SimpleNamespace:
calls.append("thread_start")
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None, # noqa: ARG002
) -> SimpleNamespace:
calls.append("turn_start")
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client = FailOnRpcClient()
codex = object.__new__(Codex)
codex._client = client
errors: list[str] = []
for call in (
lambda: codex.thread_start(approval_mode="allow_all"), # type: ignore[arg-type]
lambda: Thread(client, "thread-1").turn( # type: ignore[arg-type]
TextInput("hello"),
approval_mode="allow_all",
),
):
with pytest.raises(ValueError) as exc_info:
call()
errors.append(str(exc_info.value))
assert (errors, calls) == (
[
"approval_mode must be one of: deny_all, auto_review",
"approval_mode must be one of: deny_all, auto_review",
],
[],
return TurnStartParams(
thread_id="thread-1",
input=[],
approval_policy=approval_policy,
approvals_reviewer=approvals_reviewer,
)
def test_async_api_maps_approval_modes_for_started_work() -> None:
"""Async start methods should serialize only supported approval modes."""
async def scenario() -> None:
"""Exercise the async wrappers without spawning a real app server."""
captured: list[Any] = []
class FakeAsyncClient:
async def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
async def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
async def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
async def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
codex = AsyncCodex()
codex._client = FakeAsyncClient()
codex._initialized = True
await codex.thread_start()
await codex.thread_resume("thread-1")
await codex.thread_fork("thread-1")
await AsyncThread(codex, "thread-1").turn(TextInput("hello"))
await codex.thread_start(approval_mode=ApprovalMode.deny_all)
await codex.thread_resume(
"thread-1",
approval_mode=ApprovalMode.deny_all,
)
await codex.thread_fork("thread-1", approval_mode=ApprovalMode.deny_all)
await AsyncThread(codex, "thread-1").turn(
TextInput("hello"),
approval_mode=ApprovalMode.deny_all,
)
assert _approval_settings(captured) == [
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "on-request", "approvalsReviewer": "auto_review"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
{"approvalPolicy": "never"},
]
asyncio.run(scenario())
def test_approval_modes_serialize_to_expected_start_params() -> None:
"""ApprovalMode should map to the app-server params sent for new work."""
assert {
mode.value: _approval_settings([_approval_mode_turn_params(mode)])[0]
for mode in ApprovalMode
} == {
"deny_all": {"approvalPolicy": "never"},
"auto_review": {
"approvalPolicy": "on-request",
"approvalsReviewer": "auto_review",
},
}
def test_async_api_rejects_unknown_approval_mode_before_rpc() -> None:
"""Unknown async approval modes should fail before awaiting client calls."""
async def scenario() -> None:
"""Exercise async validation without starting a real app-server process."""
calls: list[str] = []
class FailOnAsyncRpcClient:
async def thread_start(self, _params: object) -> SimpleNamespace:
calls.append("thread_start")
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
async def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None, # noqa: ARG002
) -> SimpleNamespace:
calls.append("turn_start")
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
codex = AsyncCodex()
codex._client = FailOnAsyncRpcClient()
codex._initialized = True
errors: list[str] = []
for call in (
lambda: codex.thread_start(approval_mode="allow_all"), # type: ignore[arg-type]
lambda: AsyncThread(codex, "thread-1").turn( # type: ignore[arg-type]
TextInput("hello"),
approval_mode="allow_all",
),
):
with pytest.raises(ValueError) as exc_info:
await call()
errors.append(str(exc_info.value))
assert (errors, calls) == (
[
"approval_mode must be one of: deny_all, auto_review",
"approval_mode must be one of: deny_all, auto_review",
],
[],
)
asyncio.run(scenario())
def test_unknown_approval_mode_is_rejected() -> None:
"""Invalid approval modes should fail before params are constructed."""
with pytest.raises(ValueError, match="deny_all, auto_review"):
public_api_module._approval_mode_settings("allow_all") # type: ignore[arg-type]
def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
@@ -506,6 +314,7 @@ def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two async TurnHandle streams should advance independently on one client."""
async def scenario() -> None:
"""Interleave two async streams backed by separate per-turn queues."""
codex = AsyncCodex()
@@ -775,6 +584,7 @@ def test_stream_text_registers_and_consumes_turn_notifications() -> None:
def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
"""Async Thread.run should normalize string input and collect routed results."""
async def scenario() -> None:
"""Feed item, usage, and completion events through the async turn stream."""
codex = AsyncCodex()
@@ -834,6 +644,7 @@ def test_async_thread_run_uses_last_completed_assistant_message_as_final_respons
None
):
"""Async run should use the last final assistant message as the response text."""
async def scenario() -> None:
"""Feed two completed agent messages through the async per-turn stream."""
codex = AsyncCodex()
@@ -881,6 +692,7 @@ def test_async_thread_run_uses_last_completed_assistant_message_as_final_respons
def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None:
"""Async Thread.run should ignore commentary-only messages for final text."""
async def scenario() -> None:
"""Feed a commentary item and completion through the async turn stream."""
codex = AsyncCodex()