Files
codex/sdk/python/tests/test_app_server_streaming.py
Ahmed Ibrahim aa9e8f0262 [8/8] Add Python SDK Ruff formatting (#22021)
## Why

The Python SDK needs the same tight formatter/lint loop as the rest of
the repo: a safe Ruff autofix pass, Ruff formatting, editor save
behavior, and CI checks that catch drift. Without that loop, SDK changes
can land with formatting or import ordering that differs from what
reviewers and CI expect.

## What

- Add Ruff configuration to `sdk/python/pyproject.toml`, excluding
generated protocol code and notebooks from the normal lint/format pass.
- Update `just fmt` so it still formats Rust and also runs Python SDK
Ruff autofix and formatting.
- Add Python SDK CI steps for `ruff check` and `ruff format --check`
before pytest.
- Recommend the Ruff VS Code extension and enable Python
format/fix/organize-on-save so Cmd+S uses the same tooling.
- Apply the resulting Ruff formatting to SDK Python files, examples, and
the checked-in generated `v2_all.py` output emitted by the pinned
generator.
- Add a guard test for the `just fmt` recipe so it keeps working from
both Rust and Python SDK working directories.

## Stack

1. #21891 `[1/8]` Pin Python SDK runtime dependency
2. #21893 `[2/8]` Generate Python SDK types from pinned runtime
3. #21895 `[3/8]` Run Python SDK tests in CI
4. #21896 `[4/8]` Define Python SDK public API surface
5. #21905 `[5/8]` Rename Python SDK package to `openai-codex`
6. #21910 `[6/8]` Add high-level Python SDK approval mode
7. #22014 `[7/8]` Add Python SDK app-server integration harness
8. This PR `[8/8]` Add Python SDK Ruff formatting

## Verification

- Added `test_root_fmt_recipe_formats_rust_and_python_sdk` for the
shared format recipe.
- Ran `just fmt` after the recipe update.

---------

Co-authored-by: Codex <noreply@openai.com>
2026-05-12 01:10:29 +03:00

266 lines
9.7 KiB
Python

from __future__ import annotations
import asyncio
from app_server_harness import AppServerHarness
from app_server_helpers import (
agent_message_texts,
next_async_delta,
next_sync_delta,
streaming_response,
)
from openai_codex import AsyncCodex, Codex, TextInput
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
TurnStatus,
)
def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
    """Verify a synchronous turn stream surfaces deltas, message text, and completion.

    One SSE response carrying the text chunks ``"he"`` and ``"llo"`` is enqueued on
    the harness. The events collected from the turn stream must contain those two
    deltas, the agent message text ``"hello"``, and one completed turn status.
    """
    with AppServerHarness(tmp_path) as harness:
        sse_body = streaming_response("stream-1", "msg-stream-1", ["he", "llo"])
        harness.responses.enqueue_sse(sse_body)

        with Codex(config=harness.app_server_config()) as codex:
            turn_stream = codex.thread_start().turn(TextInput("stream please")).stream()
            # Drain the stream fully before inspecting any of its events.
            notifications = list(turn_stream)

            delta_texts = [
                item.payload.delta
                for item in notifications
                if isinstance(item.payload, AgentMessageDeltaNotification)
            ]
            message_texts = agent_message_texts(notifications)
            turn_statuses = [
                item.payload.turn.status
                for item in notifications
                if isinstance(item.payload, TurnCompletedNotification)
            ]

            observed = {
                "deltas": delta_texts,
                "agent_messages": message_texts,
                "completed_statuses": turn_statuses,
            }
            expected = {
                "deltas": ["he", "llo"],
                "agent_messages": ["hello"],
                "completed_statuses": [TurnStatus.completed],
            }
            assert observed == expected
def test_turn_run_returns_completed_turn(tmp_path) -> None:
    """Verify that running a turn handle returns the completed turn.

    The returned turn must carry the same id as the handle that produced it, a
    completed status, and an empty ``items`` list.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1")

        with Codex(config=harness.app_server_config()) as codex:
            handle = codex.thread_start().turn(TextInput("complete this turn"))
            finished = handle.run()

            observed = {
                "turn_id": finished.id,
                "status": finished.status,
                "items": finished.items,
            }
            expected = {
                "turn_id": handle.id,
                "status": TurnStatus.completed,
                "items": [],
            }
            assert observed == expected
def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
    """Verify an async turn stream surfaces deltas, message text, and completion.

    Mirrors the synchronous streaming test, but drives the turn through
    ``AsyncCodex`` inside a coroutine executed with ``asyncio.run``.
    """

    async def stream_one_turn() -> None:
        """Stream a single async turn and check the events it produced."""
        with AppServerHarness(tmp_path) as harness:
            sse_body = streaming_response("async-stream-1", "msg-async-stream-1", ["as", "ync"])
            harness.responses.enqueue_sse(sse_body)

            async with AsyncCodex(config=harness.app_server_config()) as codex:
                thread = await codex.thread_start()
                turn = await thread.turn(TextInput("async stream please"))

                # Collect every event before inspecting any of them.
                notifications = []
                async for notification in turn.stream():
                    notifications.append(notification)

                delta_texts = [
                    item.payload.delta
                    for item in notifications
                    if isinstance(item.payload, AgentMessageDeltaNotification)
                ]
                message_texts = agent_message_texts(notifications)
                turn_statuses = [
                    item.payload.turn.status
                    for item in notifications
                    if isinstance(item.payload, TurnCompletedNotification)
                ]

                observed = {
                    "deltas": delta_texts,
                    "agent_messages": message_texts,
                    "completed_statuses": turn_statuses,
                }
                expected = {
                    "deltas": ["as", "ync"],
                    "agent_messages": ["async"],
                    "completed_statuses": [TurnStatus.completed],
                }
                assert observed == expected

    asyncio.run(stream_one_turn())
def test_low_level_sync_stream_text_uses_real_turn_routing(tmp_path) -> None:
    """Verify the client's low-level ``stream_text`` yields the enqueued text chunks.

    The private ``_client`` is reached on purpose here, hence the lint suppression.
    """
    with AppServerHarness(tmp_path) as harness:
        sse_body = streaming_response("low-sync-stream", "msg-low-sync-stream", ["fir", "st"])
        harness.responses.enqueue_sse(sse_body)

        with Codex(config=harness.app_server_config()) as codex:
            thread_id = codex.thread_start().id
            text_stream = codex._client.stream_text(thread_id, "low-level sync")  # noqa: SLF001
            # Materialize all chunks first, then compare only their delta text.
            received = list(text_stream)
            received_deltas = [piece.delta for piece in received]
            assert received_deltas == ["fir", "st"]
def test_low_level_async_stream_text_allows_parallel_model_list(tmp_path) -> None:
    """Verify a models request can finish while an async text stream is still open.

    The first chunk is read, a ``models()`` call is awaited with a one-second
    timeout, and only then is the rest of the stream consumed.
    """

    async def overlap_stream_and_models() -> None:
        """Read one chunk, list models, then drain the remaining chunks."""
        with AppServerHarness(tmp_path) as harness:
            sse_body = streaming_response(
                "low-async-stream",
                "msg-low-async-stream",
                ["one", "two", "three"],
            )
            # NOTE(review): the delay presumably spaces out the SSE events so the
            # stream is still in flight during the models call; confirm in harness.
            harness.responses.enqueue_sse(sse_body, delay_between_events_s=0.03)

            async with AsyncCodex(config=harness.app_server_config()) as codex:
                thread = await codex.thread_start()
                text_stream = codex._client.stream_text(  # noqa: SLF001
                    thread.id,
                    "low-level async",
                )
                head = await anext(text_stream)

                # Issue a second request while the stream above is not exhausted.
                pending_models = asyncio.create_task(codex.models())
                listing = await asyncio.wait_for(pending_models, timeout=1.0)

                tail_deltas = [piece.delta async for piece in text_stream]

                first_delta = head.delta
                models_payload = listing.model_dump(by_alias=True, mode="json")
                observed = {
                    "first": first_delta,
                    "remaining": tail_deltas,
                    "models_payload_has_data": isinstance(models_payload.get("data"), list),
                }
                expected = {
                    "first": "one",
                    "remaining": ["two", "three"],
                    "models_payload_has_data": True,
                }
                assert observed == expected

    asyncio.run(overlap_stream_and_models())
def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
    """Verify two sync turn streams on one client each see only their own events.

    Two SSE responses are enqueued, two threads each start a turn, and the two
    streams are read alternately. The per-stream results are sorted before the
    comparison, so the check does not depend on which turn got which response.
    """
    with AppServerHarness(tmp_path) as harness:
        harness.responses.enqueue_sse(
            streaming_response("first-stream", "msg-first", ["one-", "done"]),
            delay_between_events_s=0.01,
        )
        harness.responses.enqueue_sse(
            streaming_response("second-stream", "msg-second", ["two-", "done"]),
            delay_between_events_s=0.01,
        )

        with Codex(config=harness.app_server_config()) as codex:
            thread_a = codex.thread_start()
            thread_b = codex.thread_start()
            turn_a = thread_a.turn(TextInput("first"))
            turn_b = thread_b.turn(TextInput("second"))
            stream_a = turn_a.stream()
            stream_b = turn_b.stream()

            # Alternate between the streams: one delta from A, one from B, twice.
            deltas_a = []
            deltas_b = []
            for _ in range(2):
                deltas_a.append(next_sync_delta(stream_a))
                deltas_b.append(next_sync_delta(stream_b))

            rest_a = list(stream_a)
            rest_b = list(stream_b)

            summaries = [
                (deltas_a[0], deltas_a[1], agent_message_texts(rest_a)),
                (deltas_b[0], deltas_b[1], agent_message_texts(rest_b)),
            ]
            summaries.sort()

            expected_summaries = [
                ("one-", "done", ["one-done"]),
                ("two-", "done", ["two-done"]),
            ]
            assert {"streams": summaries} == {"streams": expected_summaries}
def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
    """Verify two async turn streams on one client each see only their own events.

    Async counterpart of the interleaved sync test: both streams are read
    alternately inside one coroutine that is executed with ``asyncio.run``.
    """

    async def interleave_two_streams() -> None:
        """Alternate reads between two async turn streams and compare the results."""
        with AppServerHarness(tmp_path) as harness:
            harness.responses.enqueue_sse(
                streaming_response("async-first", "msg-async-first", ["a1", "-done"]),
                delay_between_events_s=0.01,
            )
            harness.responses.enqueue_sse(
                streaming_response("async-second", "msg-async-second", ["a2", "-done"]),
                delay_between_events_s=0.01,
            )

            async with AsyncCodex(config=harness.app_server_config()) as codex:
                thread_a = await codex.thread_start()
                thread_b = await codex.thread_start()
                turn_a = await thread_a.turn(TextInput("async first"))
                turn_b = await thread_b.turn(TextInput("async second"))
                stream_a = turn_a.stream()
                stream_b = turn_b.stream()

                # Alternate between the streams: one delta from A, one from B, twice.
                deltas_a = []
                deltas_b = []
                for _ in range(2):
                    deltas_a.append(await next_async_delta(stream_a))
                    deltas_b.append(await next_async_delta(stream_b))

                rest_a = [event async for event in stream_a]
                rest_b = [event async for event in stream_b]

                summaries = [
                    (deltas_a[0], deltas_a[1], agent_message_texts(rest_a)),
                    (deltas_b[0], deltas_b[1], agent_message_texts(rest_b)),
                ]
                summaries.sort()

                expected_summaries = [
                    ("a1", "-done", ["a1-done"]),
                    ("a2", "-done", ["a2-done"]),
                ]
                assert {"streams": summaries} == {"streams": expected_summaries}

    asyncio.run(interleave_two_streams())