Default Python SDK approval policy to never

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-05-09 12:03:09 +03:00
parent 78c0d5ca3d
commit d80a43263f
10 changed files with 220 additions and 18 deletions

View File

@@ -4,6 +4,7 @@ import asyncio
from collections import deque
from pathlib import Path
from types import SimpleNamespace
from typing import Any
import pytest
@@ -24,13 +25,23 @@ from openai_codex.api import (
AsyncTurnHandle,
Codex,
RunResult,
TextInput,
Thread,
TurnHandle,
)
from openai_codex.types import AskForApproval
ROOT = Path(__file__).resolve().parents[1]
def _approval_policy_values(params: list[Any]) -> list[object]:
"""Return serialized approval policies from captured Pydantic params."""
return [
param.model_dump(by_alias=True, mode="json").get("approvalPolicy")
for param in params
]
def _delta_notification(
*,
thread_id: str = "thread-1",
@@ -229,6 +240,130 @@ def test_async_codex_initializes_only_once_under_concurrency() -> None:
asyncio.run(scenario())
def test_ask_for_approval_exposes_simple_policy_constants() -> None:
"""AskForApproval should expose enum-like aliases for simple policies."""
assert {
"untrusted": AskForApproval.untrusted.model_dump(mode="json"),
"on_failure": AskForApproval.on_failure.model_dump(mode="json"),
"on_request": AskForApproval.on_request.model_dump(mode="json"),
"never": AskForApproval.never.model_dump(mode="json"),
} == {
"untrusted": "untrusted",
"on_failure": "on-failure",
"on_request": "on-request",
"never": "never",
}
def test_sync_api_forces_approval_policy_never_for_started_work() -> None:
"""Sync start methods should send never until approval handling exists."""
captured: list[Any] = []
class FakeClient:
def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
client = FakeClient()
codex = object.__new__(Codex)
codex._client = client
codex.thread_start(approval_policy=AskForApproval.on_request)
codex.thread_resume("thread-1", approval_policy=AskForApproval.on_request)
codex.thread_fork("thread-1", approval_policy=AskForApproval.on_request)
Thread(client, "thread-1").turn(
TextInput("hello"),
approval_policy=AskForApproval.on_request,
)
assert _approval_policy_values(captured) == ["never", "never", "never", "never"]
def test_async_api_forces_approval_policy_never_for_started_work() -> None:
"""Async start methods should send never until approval handling exists."""
async def scenario() -> None:
"""Exercise the async wrappers without spawning a real app server."""
captured: list[Any] = []
class FakeAsyncClient:
async def thread_start(self, params: object) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-started"))
async def thread_resume(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-resumed"))
async def thread_fork(
self,
_thread_id: str,
params: object,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(thread=SimpleNamespace(id="thread-forked"))
async def turn_start(
self,
_thread_id: str,
_wire_input: object,
*,
params: object | None = None,
) -> SimpleNamespace:
captured.append(params)
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
codex = AsyncCodex()
codex._client = FakeAsyncClient()
codex._initialized = True
await codex.thread_start(approval_policy=AskForApproval.on_request)
await codex.thread_resume("thread-1", approval_policy=AskForApproval.on_request)
await codex.thread_fork("thread-1", approval_policy=AskForApproval.on_request)
await AsyncThread(codex, "thread-1").turn(
TextInput("hello"),
approval_policy=AskForApproval.on_request,
)
assert _approval_policy_values(captured) == [
"never",
"never",
"never",
"never",
]
asyncio.run(scenario())
def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
"""Two sync TurnHandle streams should advance independently on one client."""
client = AppServerClient()