Route Python SDK turn notifications by ID (#21778)

## Why

The Python SDK previously protected the stdio transport with a single
active turn-consumer guard. That avoided competing reads from stdout,
but it also meant one `Codex`/`AsyncCodex` client could not stream
multiple active turns at the same time. Notifications could also arrive
before the caller received a `TurnHandle` and registered for streaming,
so the SDK needed an explicit routing layer instead of letting
individual API calls read directly from the shared transport.

## What Changed

- Added a private `MessageRouter` that owns per-request response queues,
per-turn notification queues, pending turn-notification replay, and
global notification delivery behind a single stdout reader thread.
- Generated typed notification routing metadata so turn IDs come from
known payload shapes instead of router-side attribute guessing, with
explicit fallback handling for unknown notification payloads.
- Updated sync and async turn streaming so `TurnHandle.stream()`/`run()`
and `stream_text()` consume only notifications for their own turn ID,
while `AsyncAppServerClient` no longer serializes all transport calls
behind one async lock.
- Cleared pending turn-notification buffers when unregistered turns
complete so never-consumed turn handles do not leave stale queues
behind.
- Removed the internal stream-until helper now that turn completion
waiting can register directly with routed turn notifications.
- Updated Python SDK docs and focused tests for concurrent transport
calls, interleaved turn routing, buffered early notifications, unknown
notification routing, async delegation, and routed turn completion
behavior.

## Validation

- `uv run --extra dev ruff format scripts/update_sdk_artifacts.py
src/codex_app_server/_message_router.py src/codex_app_server/client.py
src/codex_app_server/generated/notification_registry.py
tests/test_client_rpc_methods.py
tests/test_public_api_runtime_behavior.py
tests/test_async_client_behavior.py`
- `uv run --extra dev ruff check scripts/update_sdk_artifacts.py
src/codex_app_server/_message_router.py src/codex_app_server/client.py
src/codex_app_server/generated/notification_registry.py
tests/test_client_rpc_methods.py
tests/test_public_api_runtime_behavior.py
tests/test_async_client_behavior.py`
- `uv run --extra dev pytest tests/test_client_rpc_methods.py
tests/test_public_api_runtime_behavior.py
tests/test_async_client_behavior.py`
- `git diff --check`

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-05-09 07:16:23 +03:00
committed by GitHub
parent 77d9223e9f
commit ebe75bb683
11 changed files with 916 additions and 197 deletions

View File

@@ -226,54 +226,74 @@ def test_async_codex_initializes_only_once_under_concurrency() -> None:
asyncio.run(scenario())
def test_turn_stream_rejects_second_active_consumer() -> None:
def test_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
client.next_turn_notification = lambda turn_id: notifications[turn_id].popleft() # type: ignore[method-assign]
first_stream = TurnHandle(client, "thread-1", "turn-1").stream()
assert next(first_stream).method == "item/agentMessage/delta"
second_stream = TurnHandle(client, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
next(second_stream)
assert next(second_stream).method == "item/agentMessage/delta"
assert next(first_stream).method == "turn/completed"
assert next(second_stream).method == "turn/completed"
first_stream.close()
second_stream.close()
def test_async_turn_stream_rejects_second_active_consumer() -> None:
def test_async_turn_streams_can_consume_multiple_turns_on_one_client() -> None:
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
notifications: deque[Notification] = deque(
[
_delta_notification(turn_id="turn-1"),
_completed_notification(turn_id="turn-1"),
]
)
notifications: dict[str, deque[Notification]] = {
"turn-1": deque(
[
_delta_notification(turn_id="turn-1", text="one"),
_completed_notification(turn_id="turn-1"),
]
),
"turn-2": deque(
[
_delta_notification(turn_id="turn-2", text="two"),
_completed_notification(turn_id="turn-2"),
]
),
}
async def fake_next_notification() -> Notification:
return notifications.popleft()
async def fake_next_notification(turn_id: str) -> Notification:
return notifications[turn_id].popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream()
assert (await anext(first_stream)).method == "item/agentMessage/delta"
second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream()
with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
await anext(second_stream)
assert (await anext(second_stream)).method == "item/agentMessage/delta"
assert (await anext(first_stream)).method == "turn/completed"
assert (await anext(second_stream)).method == "turn/completed"
await first_stream.aclose()
await second_stream.aclose()
asyncio.run(scenario())
@@ -285,7 +305,7 @@ def test_turn_run_returns_completed_turn_payload() -> None:
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
result = TurnHandle(client, "thread-1", "turn-1").run()
@@ -305,7 +325,7 @@ def test_thread_run_accepts_string_input_and_returns_run_result() -> None:
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
seen: dict[str, object] = {}
def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202
@@ -338,7 +358,7 @@ def test_thread_run_uses_last_completed_assistant_message_as_final_response() ->
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
@@ -363,7 +383,7 @@ def test_thread_run_preserves_empty_last_assistant_message() -> None:
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
@@ -394,7 +414,7 @@ def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> Non
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
@@ -420,7 +440,7 @@ def test_thread_run_returns_none_when_only_commentary_messages_complete() -> Non
_completed_notification(),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
@@ -438,7 +458,7 @@ def test_thread_run_raises_on_failed_turn() -> None:
_completed_notification(status="failed", error_message="boom"),
]
)
client.next_notification = notifications.popleft # type: ignore[method-assign]
client.next_turn_notification = lambda _turn_id: notifications.popleft() # type: ignore[method-assign]
client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
@@ -447,6 +467,48 @@ def test_thread_run_raises_on_failed_turn() -> None:
Thread(client, "thread-1").run("hello")
def test_stream_text_registers_and_consumes_turn_notifications() -> None:
client = AppServerClient()
notifications: deque[Notification] = deque(
[
_delta_notification(text="first"),
_delta_notification(text="second"),
_completed_notification(),
]
)
calls: list[tuple[str, str]] = []
client.turn_start = lambda thread_id, input_items, *, params=None: SimpleNamespace( # noqa: ARG005,E731
turn=SimpleNamespace(id="turn-1")
)
def fake_register(turn_id: str) -> None:
calls.append(("register", turn_id))
def fake_next(turn_id: str) -> Notification:
calls.append(("next", turn_id))
return notifications.popleft()
def fake_unregister(turn_id: str) -> None:
calls.append(("unregister", turn_id))
client.register_turn_notifications = fake_register # type: ignore[method-assign]
client.next_turn_notification = fake_next # type: ignore[method-assign]
client.unregister_turn_notifications = fake_unregister # type: ignore[method-assign]
chunks = list(client.stream_text("thread-1", "hello"))
assert ([chunk.delta for chunk in chunks], calls) == (
["first", "second"],
[
("register", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("next", "turn-1"),
("unregister", "turn-1"),
],
)
def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
async def scenario() -> None:
codex = AsyncCodex()
@@ -471,12 +533,12 @@ def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
seen["params"] = params
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
async def fake_next_notification(_turn_id: str) -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
@@ -491,15 +553,21 @@ def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None:
asyncio.run(scenario())
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> None:
def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> (
None
):
async def scenario() -> None:
codex = AsyncCodex()
async def fake_ensure_initialized() -> None:
return None
first_item_notification = _item_completed_notification(text="First async message")
second_item_notification = _item_completed_notification(text="Second async message")
first_item_notification = _item_completed_notification(
text="First async message"
)
second_item_notification = _item_completed_notification(
text="Second async message"
)
notifications: deque[Notification] = deque(
[
first_item_notification,
@@ -511,12 +579,12 @@ def test_async_thread_run_uses_last_completed_assistant_message_as_final_respons
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
async def fake_next_notification(_turn_id: str) -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")
@@ -550,12 +618,12 @@ def test_async_thread_run_returns_none_when_only_commentary_messages_complete()
async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001
return SimpleNamespace(turn=SimpleNamespace(id="turn-1"))
async def fake_next_notification() -> Notification:
async def fake_next_notification(_turn_id: str) -> Notification:
return notifications.popleft()
codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign]
codex._client.turn_start = fake_turn_start # type: ignore[method-assign]
codex._client.next_notification = fake_next_notification # type: ignore[method-assign]
codex._client.next_turn_notification = fake_next_notification # type: ignore[method-assign]
result = await AsyncThread(codex, "thread-1").run("hello")