Return TurnResult from Python turn handles

This commit is contained in:
Ahmed Ibrahim
2026-05-17 06:01:12 -07:00
parent 4c89772314
commit fab7cba2c9
42 changed files with 401 additions and 684 deletions

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -26,19 +21,14 @@ async def main() -> None:
)
turn = await thread.turn(TextInput("Give 3 bullets about SIMD."))
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("thread_id:", thread.id)
print("turn_id:", result.id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", assistant_text_from_turn(persisted_turn))
print(
"persisted.items.count:",
0 if persisted_turn is None else len(persisted_turn.items or []),
)
print("text:", result.final_response)
print("items.count:", len(result.items))
if __name__ == "__main__":

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -19,16 +14,11 @@ from openai_codex import Codex, TextInput
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Give 3 bullets about SIMD.")).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("thread_id:", thread.id)
print("turn_id:", result.id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", assistant_text_from_turn(persisted_turn))
print(
"persisted.items.count:",
0 if persisted_turn is None else len(persisted_turn.items or []),
)
print("text:", result.final_response)
print("items.count:", len(result.items))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -29,7 +24,8 @@ async def main() -> None:
event_count = 0
saw_started = False
saw_delta = False
completed_status = "unknown"
completed_status = None
completed_texts = []
async for event in turn.stream():
event_count += 1
@@ -38,24 +34,27 @@ async def main() -> None:
print("stream.started")
continue
if event.method == "item/agentMessage/delta":
delta = getattr(event.payload, "delta", "")
delta = event.payload.delta
if delta:
if not saw_delta:
print("assistant> ", end="", flush=True)
print(delta, end="", flush=True)
saw_delta = True
continue
if event.method == "item/completed":
root = event.payload.item.root
if root.type == "agentMessage":
completed_texts.append(root.text)
continue
if event.method == "turn/completed":
completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
completed_status = event.payload.turn.status.value
if completed_status is None:
raise RuntimeError("stream ended without turn/completed")
if saw_delta:
print()
else:
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
final_text = "".join(completed_texts).strip()
print("assistant>", final_text)
print("stream.started.seen:", saw_started)

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -23,7 +18,8 @@ with Codex(config=runtime_config()) as codex:
event_count = 0
saw_started = False
saw_delta = False
completed_status = "unknown"
completed_status = None
completed_texts = []
for event in turn.stream():
event_count += 1
@@ -32,24 +28,27 @@ with Codex(config=runtime_config()) as codex:
print("stream.started")
continue
if event.method == "item/agentMessage/delta":
delta = getattr(event.payload, "delta", "")
delta = event.payload.delta
if delta:
if not saw_delta:
print("assistant> ", end="", flush=True)
print(delta, end="", flush=True)
saw_delta = True
continue
if event.method == "item/completed":
root = event.payload.item.root
if root.type == "agentMessage":
completed_texts.append(root.text)
continue
if event.method == "turn/completed":
completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
completed_status = event.payload.turn.status.value
if completed_status is None:
raise RuntimeError("stream ended without turn/completed")
if saw_delta:
print()
else:
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
final_text = "".join(completed_texts).strip()
print("assistant>", final_text)
print("stream.started.seen:", saw_started)

View File

@@ -19,7 +19,7 @@ async def main() -> None:
print("server:", server_label(codex.metadata))
models = await codex.models()
print("models.count:", len(models.data))
print("models:", ", ".join(model.id for model in models.data[:5]) or "[none]")
print("models:", ", ".join(model.id for model in models.data[:5]))
if __name__ == "__main__":

View File

@@ -15,4 +15,4 @@ with Codex(config=runtime_config()) as codex:
print("server:", server_label(codex.metadata))
models = codex.models()
print("models.count:", len(models.data))
print("models:", ", ".join(model.id for model in models.data[:5]) or "[none]")
print("models:", ", ".join(model.id for model in models.data[:5]))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -32,9 +27,7 @@ async def main() -> None:
resumed = await codex.thread_resume(original.id)
second_turn = await resumed.turn(TextInput("Continue with one more fact."))
second = await second_turn.run()
persisted = await resumed.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print(assistant_text_from_turn(persisted_turn))
print(second.final_response)
if __name__ == "__main__":

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -25,6 +20,4 @@ with Codex(config=runtime_config()) as codex:
# Resume the existing thread by ID.
resumed = codex.thread_resume(original.id)
second = resumed.turn(TextInput("Continue with one more fact.")).run()
persisted = resumed.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print(assistant_text_from_turn(persisted_turn))
print(second.final_response)

View File

@@ -33,45 +33,31 @@ async def main() -> None:
listing_archived = await codex.thread_list(limit=20, archived=True)
unarchived = await codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = await codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = await (
await resumed.turn(TextInput("Continue in one short sentence."))
).run()
resumed_info = f"{resumed_result.id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
resumed = await codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = await (
await resumed.turn(TextInput("Continue in one short sentence."))
).run()
forked_info = "n/a"
try:
forked = await codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = await (
await forked.turn(TextInput("Take a different angle in one short sentence."))
).run()
forked_info = f"{forked_result.id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
forked = await codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = await (
await forked.turn(TextInput("Take a different angle in one short sentence."))
).run()
compact_info = "sent"
try:
_ = await unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
compact_result = await unarchived.compact()
print("Lifecycle OK:", thread.id)
print("first:", first.id, first.status)
print("second:", second.id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("read.turns:", len(reading.thread.turns))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)
print("resumed:", resumed_result.id, resumed_result.status)
print("forked:", forked_result.id, forked_result.status)
print("compact:", compact_result.model_dump(mode="json", by_alias=True))
if __name__ == "__main__":

View File

@@ -25,40 +25,26 @@ with Codex(config=runtime_config()) as codex:
listing_archived = codex.thread_list(limit=20, archived=True)
unarchived = codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
resumed_info = f"{resumed_result.id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
resumed = codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
forked_info = "n/a"
try:
forked = codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = forked.turn(
TextInput("Take a different angle in one short sentence.")
).run()
forked_info = f"{forked_result.id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
forked = codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = forked.turn(
TextInput("Take a different angle in one short sentence.")
).run()
compact_info = "sent"
try:
_ = unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
compact_result = unarchived.compact()
print("Lifecycle OK:", thread.id)
print("first:", first.id, first.status)
print("second:", second.id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("read.turns:", len(reading.thread.turns))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)
print("resumed:", resumed_result.id, resumed_result.status)
print("forked:", forked_result.id, forked_result.status)
print("compact:", compact_result.model_dump(mode="json", by_alias=True))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -33,11 +28,9 @@ async def main() -> None:
]
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)
if __name__ == "__main__":

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -26,8 +21,6 @@ with Codex(config=runtime_config()) as codex:
ImageInput(REMOTE_IMAGE_URL),
]
).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)

View File

@@ -6,9 +6,7 @@ if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
temporary_sample_image_path,
)
@@ -36,11 +34,9 @@ async def main() -> None:
]
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)
if __name__ == "__main__":

View File

@@ -6,9 +6,7 @@ if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
temporary_sample_image_path,
)
@@ -29,8 +27,6 @@ with temporary_sample_image_path() as image_path:
LocalImageInput(str(image_path.resolve())),
]
).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)

View File

@@ -5,13 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
server_label,
)
from _bootstrap import ensure_local_sdk_src, runtime_config, server_label
ensure_local_sdk_src()
@@ -23,9 +17,7 @@ with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Say hello in one sentence."))
result = turn.run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Thread:", thread.id)
print("Turn:", result.id)
print("Text:", assistant_text_from_turn(persisted_turn).strip())
print("Text:", result.final_response.strip())

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -26,7 +21,6 @@ from openai_codex import (
TextInput,
is_retryable_error,
)
from openai_codex.types import TurnStatus
ResultT = TypeVar("ResultT")
@@ -73,19 +67,12 @@ async def main() -> None:
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
return
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
return
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", assistant_text_from_turn(persisted_turn))
print("Text:", result.final_response)
def _run_turn(thread, prompt: str):

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -21,7 +16,6 @@ from openai_codex import (
TextInput,
retry_on_overload,
)
from openai_codex.types import TurnStatus
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
@@ -35,13 +29,7 @@ with Codex(config=runtime_config()) as codex:
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
else:
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", assistant_text_from_turn(persisted_turn))
print("Text:", result.final_response)

View File

@@ -21,19 +21,9 @@ from openai_codex.types import (
)
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
def _format_usage(usage: object) -> str:
last = usage.last
total = usage.total
return (
"usage>\n"
f" last: input={last.input_tokens} output={last.output_tokens} reasoning={last.reasoning_output_tokens} total={last.total_tokens} cached={last.cached_input_tokens}\n"
@@ -65,16 +55,14 @@ async def main() -> None:
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
async for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
delta = payload.delta
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
@@ -83,12 +71,13 @@ async def main() -> None:
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
print()
if status is None:
raise RuntimeError("stream ended without turn/completed")
if usage is None:
raise RuntimeError("stream ended without token usage")
status_text = _status_value(status)
status_text = status.value
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)

View File

@@ -21,19 +21,9 @@ from openai_codex.types import (
print("Codex mini CLI. Type /exit to quit.")
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
def _format_usage(usage: object) -> str:
last = usage.last
total = usage.total
return (
"usage>\n"
f" last: input={last.input_tokens} output={last.output_tokens} reasoning={last.reasoning_output_tokens} total={last.total_tokens} cached={last.cached_input_tokens}\n"
@@ -60,16 +50,14 @@ with Codex(config=runtime_config()) as codex:
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
delta = payload.delta
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
@@ -78,12 +66,13 @@ with Codex(config=runtime_config()) as codex:
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
print()
if status is None:
raise RuntimeError("stream ended without turn/completed")
if usage is None:
raise RuntimeError("stream ended without token usage")
status_text = _status_value(status)
status_text = status.value
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)

View File

@@ -6,12 +6,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -60,9 +55,7 @@ async def main() -> None:
summary=SUMMARY,
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
structured_text = assistant_text_from_turn(persisted_turn).strip()
structured_text = result.final_response.strip()
try:
structured = json.loads(structured_text)
except json.JSONDecodeError as exc:
@@ -70,8 +63,8 @@ async def main() -> None:
f"Expected JSON matching OUTPUT_SCHEMA, got: {structured_text!r}"
) from exc
summary = structured.get("summary")
actions = structured.get("actions")
summary = structured["summary"]
actions = structured["actions"]
if (
not isinstance(summary, str)
or not isinstance(actions, list)
@@ -86,7 +79,7 @@ async def main() -> None:
print("actions:")
for action in actions:
print("-", action)
print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))
print("Items:", len(result.items))
if __name__ == "__main__":

View File

@@ -6,12 +6,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -54,9 +49,7 @@ with Codex(config=runtime_config()) as codex:
summary=SUMMARY,
)
result = turn.run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
structured_text = assistant_text_from_turn(persisted_turn).strip()
structured_text = result.final_response.strip()
try:
structured = json.loads(structured_text)
except json.JSONDecodeError as exc:
@@ -64,8 +57,8 @@ with Codex(config=runtime_config()) as codex:
f"Expected JSON matching OUTPUT_SCHEMA, got: {structured_text!r}"
) from exc
summary = structured.get("summary")
actions = structured.get("actions")
summary = structured["summary"]
actions = structured["actions"]
if (
not isinstance(summary, str)
or not isinstance(actions, list)
@@ -80,4 +73,4 @@ with Codex(config=runtime_config()) as codex:
print("actions:")
for action in actions:
print("-", action)
print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))
print("Items:", len(result.items))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -35,29 +30,27 @@ REASONING_RANK = {
"high": 4,
"xhigh": 5,
}
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next(
(m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None
)
if preferred is not None:
return preferred
visible = [m for m in models if not m.hidden]
if not visible:
raise RuntimeError("models response did not include visible models")
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
if not top_candidates:
raise RuntimeError("models response did not include top-level visible models")
return max(top_candidates, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
raise RuntimeError(f"{model.model} did not advertise supported reasoning efforts")
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
key=lambda option: REASONING_RANK[option.reasoning_effort.value],
)
return ReasoningEffort(best.reasoning_effort.value)
@@ -103,13 +96,9 @@ async def main() -> None:
effort=selected_effort,
)
first = await first_turn.run()
persisted = await thread.read(include_turns=True)
first_persisted_turn = find_turn_by_id(persisted.thread.turns, first.id)
print("agent.message:", assistant_text_from_turn(first_persisted_turn))
print(
"items:", 0 if first_persisted_turn is None else len(first_persisted_turn.items or [])
)
print("agent.message:", first.final_response)
print("items:", len(first.items))
second_turn = await thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
@@ -122,14 +111,9 @@ async def main() -> None:
summary=ReasoningSummary.model_validate("concise"),
)
second = await second_turn.run()
persisted = await thread.read(include_turns=True)
second_persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print("agent.message.params:", assistant_text_from_turn(second_persisted_turn))
print(
"items.params:",
0 if second_persisted_turn is None else len(second_persisted_turn.items or []),
)
print("agent.message.params:", second.final_response)
print("items.params:", len(second.items))
if __name__ == "__main__":

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -33,29 +28,27 @@ REASONING_RANK = {
"high": 4,
"xhigh": 5,
}
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next(
(m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None
)
if preferred is not None:
return preferred
visible = [m for m in models if not m.hidden]
if not visible:
raise RuntimeError("models response did not include visible models")
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
if not top_candidates:
raise RuntimeError("models response did not include top-level visible models")
return max(top_candidates, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
raise RuntimeError(f"{model.model} did not advertise supported reasoning efforts")
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
key=lambda option: REASONING_RANK[option.reasoning_effort.value],
)
return ReasoningEffort(best.reasoning_effort.value)
@@ -99,11 +92,9 @@ with Codex(config=runtime_config()) as codex:
model=selected_model.model,
effort=selected_effort,
).run()
persisted = thread.read(include_turns=True)
first_turn = find_turn_by_id(persisted.thread.turns, first.id)
print("agent.message:", assistant_text_from_turn(first_turn))
print("items:", 0 if first_turn is None else len(first_turn.items or []))
print("agent.message:", first.final_response)
print("items:", len(first.items))
second = thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
@@ -115,8 +106,6 @@ with Codex(config=runtime_config()) as codex:
sandbox_policy=SANDBOX_POLICY,
summary=ReasoningSummary.model_validate("concise"),
).run()
persisted = thread.read(include_turns=True)
second_turn = find_turn_by_id(persisted.thread.turns, second.id)
print("agent.message.params:", assistant_text_from_turn(second_turn))
print("items.params:", 0 if second_turn is None else len(second_turn.items or []))
print("agent.message.params:", second.final_response)
print("items.params:", len(second.items))

View File

@@ -5,11 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -26,56 +22,50 @@ async def main() -> None:
steer_turn = await thread.turn(
TextInput("Count from 1 to 40 with commas, then one summary sentence.")
)
steer_result = "sent"
try:
_ = await steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
except Exception as exc:
steer_result = f"skipped {type(exc).__name__}"
steer_result = await steer_turn.steer(
TextInput("Keep it brief and stop after 10 numbers.")
)
steer_event_count = 0
steer_completed_status = "unknown"
steer_completed_turn = None
steer_completed_status = None
steer_deltas = []
async for event in steer_turn.stream():
steer_event_count += 1
if event.method == "item/agentMessage/delta":
steer_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
steer_completed_turn = event.payload.turn
steer_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
steer_completed_status = event.payload.turn.status.value
steer_preview = (
assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
)
if steer_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
steer_preview = "".join(steer_deltas).strip()
interrupt_turn = await thread.turn(
TextInput("Count from 1 to 200 with commas, then one summary sentence.")
)
interrupt_result = "sent"
try:
_ = await interrupt_turn.interrupt()
except Exception as exc:
interrupt_result = f"skipped {type(exc).__name__}"
interrupt_result = await interrupt_turn.interrupt()
interrupt_event_count = 0
interrupt_completed_status = "unknown"
interrupt_completed_turn = None
interrupt_completed_status = None
interrupt_deltas = []
async for event in interrupt_turn.stream():
interrupt_event_count += 1
if event.method == "item/agentMessage/delta":
interrupt_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
interrupt_completed_turn = event.payload.turn
interrupt_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
interrupt_completed_status = event.payload.turn.status.value
interrupt_preview = (
assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
)
if interrupt_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
interrupt_preview = "".join(interrupt_deltas).strip()
print("steer.result:", steer_result)
print("steer.result:", steer_result.model_dump(mode="json", by_alias=True))
print("steer.final.status:", steer_completed_status)
print("steer.events.count:", steer_event_count)
print("steer.assistant.preview:", steer_preview)
print("interrupt.result:", interrupt_result)
print("interrupt.result:", interrupt_result.model_dump(mode="json", by_alias=True))
print("interrupt.final.status:", interrupt_completed_status)
print("interrupt.events.count:", interrupt_event_count)
print("interrupt.assistant.preview:", interrupt_preview)

View File

@@ -5,11 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -20,54 +16,48 @@ with Codex(config=runtime_config()) as codex:
steer_turn = thread.turn(
TextInput("Count from 1 to 40 with commas, then one summary sentence.")
)
steer_result = "sent"
try:
_ = steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
except Exception as exc:
steer_result = f"skipped {type(exc).__name__}"
steer_result = steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
steer_event_count = 0
steer_completed_status = "unknown"
steer_completed_turn = None
steer_completed_status = None
steer_deltas = []
for event in steer_turn.stream():
steer_event_count += 1
if event.method == "item/agentMessage/delta":
steer_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
steer_completed_turn = event.payload.turn
steer_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
steer_completed_status = event.payload.turn.status.value
steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
if steer_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
steer_preview = "".join(steer_deltas).strip()
interrupt_turn = thread.turn(
TextInput("Count from 1 to 200 with commas, then one summary sentence.")
)
interrupt_result = "sent"
try:
_ = interrupt_turn.interrupt()
except Exception as exc:
interrupt_result = f"skipped {type(exc).__name__}"
interrupt_result = interrupt_turn.interrupt()
interrupt_event_count = 0
interrupt_completed_status = "unknown"
interrupt_completed_turn = None
interrupt_completed_status = None
interrupt_deltas = []
for event in interrupt_turn.stream():
interrupt_event_count += 1
if event.method == "item/agentMessage/delta":
interrupt_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
interrupt_completed_turn = event.payload.turn
interrupt_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
interrupt_completed_status = event.payload.turn.status.value
interrupt_preview = (
assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
)
if interrupt_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
interrupt_preview = "".join(interrupt_deltas).strip()
print("steer.result:", steer_result)
print("steer.result:", steer_result.model_dump(mode="json", by_alias=True))
print("steer.final.status:", steer_completed_status)
print("steer.events.count:", steer_event_count)
print("steer.assistant.preview:", steer_preview)
print("interrupt.result:", interrupt_result)
print("interrupt.result:", interrupt_result.model_dump(mode="json", by_alias=True))
print("interrupt.final.status:", interrupt_completed_status)
print("interrupt.events.count:", interrupt_event_count)
print("interrupt.assistant.preview:", interrupt_preview)

View File

@@ -81,6 +81,6 @@ python examples/01_quickstart_constructor/async.py
- `13_model_select_and_turn_params/`
- list models, pick highest model + highest supported reasoning effort, run turns, print message and usage
- `14_turn_controls/`
- separate best-effort `steer()` and `interrupt()` demos with concise summaries
- separate `steer()` and `interrupt()` demos with concise summaries
- `15_login_and_account/`
- browser-login handle lifecycle, cancellation, and account inspection

View File

@@ -6,7 +6,7 @@ import sys
import tempfile
import zlib
from pathlib import Path
from typing import Iterable, Iterator
from typing import Any, Iterator
_SDK_PYTHON_DIR = Path(__file__).resolve().parents[1]
_SDK_PYTHON_STR = str(_SDK_PYTHON_DIR)
@@ -103,53 +103,5 @@ def temporary_sample_image_path() -> Iterator[Path]:
yield image_path
def server_label(metadata: object) -> str:
server = getattr(metadata, "serverInfo", None)
server_name = ((getattr(server, "name", None) or "") if server is not None else "").strip()
server_version = (
(getattr(server, "version", None) or "") if server is not None else ""
).strip()
if server_name and server_version:
return f"{server_name} {server_version}"
user_agent = (
(getattr(metadata, "userAgent", None) or "") if metadata is not None else ""
).strip()
return user_agent or "unknown"
def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None:
for turn in turns or []:
if getattr(turn, "id", None) == turn_id:
return turn
return None
def assistant_text_from_turn(turn: object | None) -> str:
if turn is None:
return ""
chunks: list[str] = []
for item in getattr(turn, "items", []) or []:
raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item
if not isinstance(raw_item, dict):
continue
item_type = raw_item.get("type")
if item_type == "agentMessage":
text = raw_item.get("text")
if isinstance(text, str) and text:
chunks.append(text)
continue
if item_type != "message" or raw_item.get("role") != "assistant":
continue
for content in raw_item.get("content") or []:
if not isinstance(content, dict) or content.get("type") != "output_text":
continue
text = content.get("text")
if isinstance(text, str) and text:
chunks.append(text)
return "".join(chunks)
def server_label(metadata: Any) -> str:
return f"{metadata.serverInfo.name} {metadata.serverInfo.version}"