Mirror of https://github.com/openai/codex.git, synced 2026-02-06 00:43:40 +00:00.

Compare commits: 3 commits, from queue/stee... to codex/ci-f...

- ee193c9ec1
- 012bf21679
- 55edfc386a
EXEC_PLAN.md (new file, 152 lines)
@@ -0,0 +1,152 @@
|
||||
# Codex Execution Plans (ExecPlans):
|
||||
|
||||
This document describes the requirements for an execution plan ("ExecPlan"), a design document that a coding agent can follow to deliver a working feature or system change. Treat the reader as a complete beginner to this repository: they have only the current working tree and the single ExecPlan file you provide. There is no memory of prior plans and no external context.
|
||||
|
||||
## How to use ExecPlans and PLANS.md
|
||||
|
||||
When authoring an executable specification (ExecPlan), follow PLANS.md _to the letter_. If it is not in your context, refresh your memory by reading the entire PLANS.md file. Be thorough in reading (and re-reading) source material to produce an accurate specification. When creating a spec, start from the skeleton and flesh it out as you do your research.
|
||||
|
||||
When implementing an executable specification (ExecPlan), do not prompt the user for "next steps"; simply proceed to the next milestone. Keep all sections up to date; at every stopping point, add or split entries in the Progress list to affirmatively state the progress made and the next steps. Resolve ambiguities autonomously, and commit frequently.
|
||||
|
||||
When discussing an executable specification (ExecPlan), record decisions in a log in the spec for posterity; it should be unambiguously clear why any change to the specification was made. ExecPlans are living documents, and it should always be possible to restart from _only_ the ExecPlan and no other work.
|
||||
|
||||
When researching a design with challenging requirements or significant unknowns, use milestones to implement proof of concepts, "toy implementations", etc., that allow validating whether the user's proposal is feasible. Read the source code of libraries by finding or acquiring them, research deeply, and include prototypes to guide a fuller implementation.
|
||||
|
||||
## Requirements
|
||||
|
||||
NON-NEGOTIABLE REQUIREMENTS:
|
||||
|
||||
* Every ExecPlan must be fully self-contained. Self-contained means that in its current form it contains all knowledge and instructions needed for a novice to succeed.
|
||||
* Every ExecPlan is a living document. Contributors are required to revise it as progress is made, as discoveries occur, and as design decisions are finalized. Each revision must remain fully self-contained.
|
||||
* Every ExecPlan must enable a complete novice to implement the feature end-to-end without prior knowledge of this repo.
|
||||
* Every ExecPlan must produce a demonstrably working behavior, not merely code changes to "meet a definition".
|
||||
* Every ExecPlan must define every term of art in plain language, or avoid the term entirely.
|
||||
|
||||
Purpose and intent come first. Begin by explaining, in a few sentences, why the work matters from a user's perspective: what someone can do after this change that they could not do before, and how to see it working. Then guide the reader through the exact steps to achieve that outcome, including what to edit, what to run, and what they should observe.
|
||||
|
||||
The agent executing your plan can list files, read files, search, run the project, and run tests. It does not know any prior context and cannot infer what you meant from earlier milestones. Repeat any assumption you rely on. Do not point to external blogs or docs; if knowledge is required, embed it in the plan itself in your own words. If an ExecPlan builds upon a prior ExecPlan and that file is checked in, incorporate it by reference. If it is not, you must include all relevant context from that plan.
|
||||
|
||||
## Formatting
|
||||
|
||||
Format and envelope are simple and strict. Each ExecPlan must be one single fenced code block labeled as `md` that begins and ends with triple backticks. Do not nest additional triple-backtick code fences inside; when you need to show commands, transcripts, diffs, or code, present them as indented blocks within that single fence. Use indentation for clarity rather than code fences inside an ExecPlan to avoid prematurely closing the ExecPlan's code fence. Use two newlines after every heading, use # and ## and so on, and correct syntax for ordered and unordered lists.
|
||||
|
||||
When writing an ExecPlan to a Markdown (.md) file where the content of the file *is only* the single ExecPlan, you should omit the triple backticks.
|
||||
|
||||
Write in plain prose. Prefer sentences over lists. Avoid checklists, tables, and long enumerations unless brevity would obscure meaning. Checklists are permitted only in the `Progress` section, where they are mandatory. Narrative sections must remain prose-first.
|
||||
|
||||
## Guidelines
|
||||
|
||||
Self-containment and plain language are paramount. If you introduce a phrase that is not ordinary English ("daemon", "middleware", "RPC gateway", "filter graph"), define it immediately and remind the reader how it manifests in this repository (for example, by naming the files or commands where it appears). Do not say "as defined previously" or "according to the architecture doc." Include the needed explanation here, even if you repeat yourself.
|
||||
|
||||
Avoid common failure modes. Do not rely on undefined jargon. Do not describe "the letter of a feature" so narrowly that the resulting code compiles but does nothing meaningful. Do not outsource key decisions to the reader. When ambiguity exists, resolve it in the plan itself and explain why you chose that path. Err on the side of over-explaining user-visible effects and under-specifying incidental implementation details.
|
||||
|
||||
Anchor the plan with observable outcomes. State what the user can do after implementation, the commands to run, and the outputs they should see. Acceptance should be phrased as behavior a human can verify ("after starting the server, navigating to [http://localhost:8080/health](http://localhost:8080/health) returns HTTP 200 with body OK") rather than internal attributes ("added a HealthCheck struct"). If a change is internal, explain how its impact can still be demonstrated (for example, by running tests that fail before and pass after, and by showing a scenario that uses the new behavior).
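For instance, the health-endpoint acceptance above can be captured as a short check that a novice can run verbatim once the server is started (the URL and expected body come from the example and are purely illustrative):

```python
import urllib.request

# Illustrative acceptance check for the example above; assumes the example server is already running.
with urllib.request.urlopen("http://localhost:8080/health") as response:
    assert response.status == 200
    assert response.read() == b"OK"
print("health check passed")
```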
|
||||
|
||||
Specify repository context explicitly. Name files with full repository-relative paths, name functions and modules precisely, and describe where new files should be created. If touching multiple areas, include a short orientation paragraph that explains how those parts fit together so a novice can navigate confidently. When running commands, show the working directory and exact command line. When outcomes depend on environment, state the assumptions and provide alternatives when reasonable.
|
||||
|
||||
Be idempotent and safe. Write the steps so they can be run multiple times without causing damage or drift. If a step can fail halfway, include how to retry or adapt. If a migration or destructive operation is necessary, spell out backups or safe fallbacks. Prefer additive, testable changes that can be validated as you go.
|
||||
|
||||
Validation is not optional. Include instructions to run tests, to start the system if applicable, and to observe it doing something useful. Describe comprehensive testing for any new features or capabilities. Include expected outputs and error messages so a novice can tell success from failure. Where possible, show how to prove that the change is effective beyond compilation (for example, through a small end-to-end scenario, a CLI invocation, or an HTTP request/response transcript). State the exact test commands appropriate to the project’s toolchain and how to interpret their results.
|
||||
|
||||
Capture evidence. When your steps produce terminal output, short diffs, or logs, include them inside the single fenced block as indented examples. Keep them concise and focused on what proves success. If you need to include a patch, prefer file-scoped diffs or small excerpts that a reader can recreate by following your instructions rather than pasting large blobs.
|
||||
|
||||
## Milestones
|
||||
|
||||
Milestones are narrative, not bureaucracy. If you break the work into milestones, introduce each with a brief paragraph that describes the scope, what will exist at the end of the milestone that did not exist before, the commands to run, and the acceptance you expect to observe. Keep it readable as a story: goal, work, result, proof. Progress and milestones are distinct: milestones tell the story, progress tracks granular work. Both must exist. Never abbreviate a milestone merely for the sake of brevity, and do not leave out details that could be crucial to a future implementation.
|
||||
|
||||
Each milestone must be independently verifiable and incrementally implement the overall goal of the execution plan.
|
||||
|
||||
## Living plans and design decisions
|
||||
|
||||
* ExecPlans are living documents. As you make key design decisions, update the plan to record both the decision and the thinking behind it. Record all decisions in the `Decision Log` section.
|
||||
* ExecPlans must contain and maintain a `Progress` section, a `Surprises & Discoveries` section, a `Decision Log`, and an `Outcomes & Retrospective` section. These are not optional.
|
||||
* When you discover optimizer behavior, performance tradeoffs, unexpected bugs, or inverse/unapply semantics that shaped your approach, capture those observations in the `Surprises & Discoveries` section with short evidence snippets (test output is ideal).
|
||||
* If you change course mid-implementation, document why in the `Decision Log` and reflect the implications in `Progress`. Plans are guides for the next contributor as much as checklists for you.
|
||||
* At completion of a major task or the full plan, write an `Outcomes & Retrospective` entry summarizing what was achieved, what remains, and lessons learned.
|
||||
|
||||
## Prototyping milestones and parallel implementations
|
||||
|
||||
It is acceptable, and often encouraged, to include explicit prototyping milestones when they de-risk a larger change. Examples: adding a low-level operator to a dependency to validate feasibility, or exploring two composition orders while measuring optimizer effects. Keep prototypes additive and testable. Clearly label the scope as “prototyping”; describe how to run and observe results; and state the criteria for promoting or discarding the prototype.
|
||||
|
||||
Prefer additive code changes followed by subtractions that keep tests passing. Parallel implementations (e.g., keeping an adapter alongside an older path during migration) are fine when they reduce risk or enable tests to continue passing during a large migration. Describe how to validate both paths and how to retire one safely with tests. When working with multiple new libraries or feature areas, consider creating spikes that evaluate the feasibility of these features _independently_ of one another, proving that the external library performs as expected and implements the features we need in isolation.
|
||||
|
||||
## Skeleton of a Good ExecPlan
|
||||
|
||||
```md
|
||||
# <Short, action-oriented description>
|
||||
|
||||
This ExecPlan is a living document. The sections `Progress`, `Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds.
|
||||
|
||||
If a PLANS.md file is checked into the repo, reference its path from the repository root here and note that this document must be maintained in accordance with PLANS.md.
|
||||
|
||||
## Purpose / Big Picture
|
||||
|
||||
Explain in a few sentences what someone gains after this change and how they can see it working. State the user-visible behavior you will enable.
|
||||
|
||||
## Progress
|
||||
|
||||
Use a list with checkboxes to summarize granular steps. Every stopping point must be documented here, even if it requires splitting a partially completed task into two (“done” vs. “remaining”). This section must always reflect the actual current state of the work.
|
||||
|
||||
- [x] (2025-10-01 13:00Z) Example completed step.
|
||||
- [ ] Example incomplete step.
|
||||
- [ ] Example partially completed step (completed: X; remaining: Y).
|
||||
|
||||
Use timestamps to measure rates of progress.
|
||||
|
||||
## Surprises & Discoveries
|
||||
|
||||
Document unexpected behaviors, bugs, optimizations, or insights discovered during implementation. Provide concise evidence.
|
||||
|
||||
- Observation: …
|
||||
Evidence: …
|
||||
|
||||
## Decision Log
|
||||
|
||||
Record every decision made while working on the plan in the format:
|
||||
|
||||
- Decision: …
|
||||
Rationale: …
|
||||
Date/Author: …
|
||||
|
||||
## Outcomes & Retrospective
|
||||
|
||||
Summarize outcomes, gaps, and lessons learned at major milestones or at completion. Compare the result against the original purpose.
|
||||
|
||||
## Context and Orientation
|
||||
|
||||
Describe the current state relevant to this task as if the reader knows nothing. Name the key files and modules by full path. Define any non-obvious term you will use. Do not refer to prior plans.
|
||||
|
||||
## Plan of Work
|
||||
|
||||
Describe, in prose, the sequence of edits and additions. For each edit, name the file and location (function, module) and what to insert or change. Keep it concrete and minimal.
|
||||
|
||||
## Concrete Steps
|
||||
|
||||
State the exact commands to run and where to run them (working directory). When a command generates output, show a short expected transcript so the reader can compare. This section must be updated as work proceeds.
|
||||
|
||||
## Validation and Acceptance
|
||||
|
||||
Describe how to start or exercise the system and what to observe. Phrase acceptance as behavior, with specific inputs and outputs. If tests are involved, say "run <project’s test command> and expect <N> passed; the new test <name> fails before the change and passes after".
|
||||
|
||||
## Idempotence and Recovery
|
||||
|
||||
If steps can be repeated safely, say so. If a step is risky, provide a safe retry or rollback path. Keep the environment clean after completion.
|
||||
|
||||
## Artifacts and Notes
|
||||
|
||||
Include the most important transcripts, diffs, or snippets as indented examples. Keep them concise and focused on what proves success.
|
||||
|
||||
## Interfaces and Dependencies
|
||||
|
||||
Be prescriptive. Name the libraries, modules, and services to use and why. Specify the types, traits/interfaces, and function signatures that must exist at the end of the milestone. Prefer stable names and paths such as `crate::module::function` or `package.submodule.Interface`. E.g.:
|
||||
|
||||
In crates/foo/planner.rs, define:
|
||||
|
||||
pub trait Planner {
|
||||
fn plan(&self, observed: &Observed) -> Vec<Action>;
|
||||
}
|
||||
```
|
||||
|
||||
If you follow the guidance above, a single, stateless agent -- or a human novice -- can read your ExecPlan from top to bottom and produce a working, observable result. That is the bar: SELF-CONTAINED, SELF-SUFFICIENT, NOVICE-GUIDING, OUTCOME-FOCUSED.
|
||||
|
||||
When you revise a plan, you must ensure your changes are comprehensively reflected across all sections, including the living document sections, and you must write a note at the bottom of the plan describing the change and the reason why. ExecPlans must describe not just the what but the why for almost everything.
|
||||
sdk/python/EXEC_PLAN.md (new file, 99 lines)
@@ -0,0 +1,99 @@
|
||||
# Build a Python SDK for Codex CLI
|
||||
|
||||
This ExecPlan is a living document. The sections `Progress`, `Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds. There is no PLANS.md file in this repository; all guidance comes from EXEC_PLAN.md.
|
||||
|
||||
## Purpose / Big Picture
|
||||
|
||||
Deliver a first-class Python SDK that mirrors the TypeScript SDK in `sdk/typescript`. Python developers should be able to embed the Codex CLI agent by instantiating a client, starting or resuming threads, and running turns synchronously or as a streamed generator of events. Success means someone can pip-install the package locally (editable install is fine), run the sample code from the README, and observe structured events and final responses without referring back to the TypeScript sources.
|
||||
|
||||
## Progress
|
||||
|
||||
- [x] (2025-12-05 18:31Z) Reviewed EXEC_PLAN.md, TypeScript SDK sources/tests, and exec event schema to scope the Python SDK.
|
||||
- [x] (2025-12-05 18:44Z) Established Python package scaffold and parity API surface (modules, classes, type hints).
|
||||
- [x] (2025-12-05 18:44Z) Implemented process runner, thread logic, schema handling, and streaming/parsing.
|
||||
- [x] (2025-12-05 18:45Z) Added README, examples, and tests exercising options, streaming, cancellation, and schema handling.
|
||||
- [ ] (2025-12-05 18:45Z) Ran the unittest suite (proxy-backed tests skipped in the sandbox due to loopback bind restrictions); needs a re-run in an environment that allows local HTTP servers to validate fully.
|
||||
|
||||
## Surprises & Discoveries
|
||||
|
||||
- `pip install -e .` with build isolation failed offline; re-running with `--no-build-isolation` was then blocked by site-packages permissions. Worked around by running the tests with `PYTHONPATH` instead of installing.
|
||||
- Sandbox denied binding a loopback HTTP server, so tests that rely on SSE proxies were skipped in this environment. The proxy helper now skips cleanly when sockets are unavailable.
|
||||
|
||||
## Decision Log
|
||||
|
||||
- Decision: Follow the TypeScript SDK behavior and default flags, using the Codex CLI (`codex exec --experimental-json`) as the transport. Adopt a Pythonic surface (snake_case, context managers where useful) while keeping method names close to TS (`start_thread`, `resume_thread`, `run`, `run_streamed`).
|
||||
Rationale: Minimizes divergence for users moving between languages while respecting Python conventions.
|
||||
Date/Author: 2025-12-05 / assistant
|
||||
- Decision: Skip proxy-backed tests when the sandbox forbids binding a loopback HTTP server.
|
||||
Rationale: Allows local test invocation without hard failures in restricted environments while preserving coverage when sockets are available.
|
||||
Date/Author: 2025-12-05 / assistant
|
||||
|
||||
## Outcomes & Retrospective
|
||||
|
||||
- To be filled after implementation and validation.
|
||||
|
||||
## Context and Orientation
|
||||
|
||||
- The TypeScript SDK lives in `sdk/typescript`. Key files: `src/codex.ts`, `src/thread.ts`, `src/exec.ts`, `src/events.ts`, `src/items.ts`, and `src/outputSchemaFile.ts`. Tests in `sdk/typescript/tests` exercise threading, streaming, options, env overrides, output schema handling, images, additional directories, and abort signals.
|
||||
- The Codex CLI emits JSONL events described in `codex-rs/exec/src/exec_events.rs`. Events include `thread.started`, `turn.started`, `item.*`, `turn.completed`, `turn.failed`, and `error`, with item payloads such as `agent_message`, `command_execution`, `file_change`, `mcp_tool_call`, `web_search`, `todo_list`, and `error`.
|
||||
- The CLI is invoked as `codex exec --experimental-json` with flags like `--model`, `--sandbox`, `--cd`, `--add-dir`, `--skip-git-repo-check`, `--output-schema`, and `--config` entries for `model_reasoning_effort`, `sandbox_workspace_write.network_access`, `features.web_search_request`, and `approval_policy`. Images are forwarded via repeated `--image` flags.
|
||||
- The TypeScript SDK writes output schemas to a temp file and cleans them up after each turn. It aggregates text inputs separated by blank lines, forwards images, sets `CODEX_INTERNAL_ORIGINATOR_OVERRIDE` to `codex_sdk_ts`, and injects `OPENAI_BASE_URL` and `CODEX_API_KEY` into the child env unless overridden.
|
||||
- Tests use a local Codex binary built from `codex-rs` (e.g., `codex-rs/target/debug/codex`) and a lightweight HTTP proxy in tests to capture `/responses` requests and stream SSE events. We should reuse this strategy with Python’s stdlib to avoid network calls.
|
||||
|
||||
## Plan of Work
|
||||
|
||||
First, scaffold a Python package under `sdk/python` with `pyproject.toml`, `README.md`, and `src/codex_sdk` module files. Mirror the TS module layout: `codex.py` (entry client), `thread.py` (thread state and run/run_streamed), `exec.py` (process runner), `types.py` (events/items dataclasses), and `schema_file.py` (temp schema handling). Export a `Codex` class that owns a `CodexExec` runner and produces `Thread` instances via `start_thread`/`resume_thread`. Thread options should include model, sandbox_mode, working_directory, skip_git_repo_check, model_reasoning_effort, network_access_enabled, web_search_enabled, approval_policy, and additional_directories; turn options should allow `output_schema` and `cancellation` (event or asyncio task cancellation).
|
||||
|
||||
Implement `CodexExec.run` to spawn the Codex CLI with the same flags as TS, accept a `signal`/cancel hook, wire stdin/stdout, and yield decoded lines. Build env injection mirroring TS: inherit `os.environ` unless overridden, set `CODEX_INTERNAL_ORIGINATOR_OVERRIDE=codex_sdk_py`, and overlay `OPENAI_BASE_URL`/`CODEX_API_KEY` when provided. Provide binary resolution similar to TS (`vendor/<target-triple>/codex/codex[.exe]`) with optional override path.
|
||||
|
||||
Implement event parsing into typed dataclasses, raising clear errors on JSON decode or unexpected structures. `Thread.run_streamed` should normalize inputs (string or list of `{type: "text"|"local_image"}` dicts), concatenate text segments with blank lines, collect image paths, and call `CodexExec.run`. As events stream, update `thread_id` on `thread.started`, forward each parsed event to the caller, and on completion capture `usage` and `final_response` (last `agent_message` text) while collecting items. `Thread.run` should drain the generator and either return `{items, final_response, usage}` or raise on `turn.failed` or stream-level errors. Ensure output schema temp dir is cleaned even on exceptions.
|
||||
|
||||
Add tests with `unittest` mirroring TS coverage: successful run collecting items/usage; repeated runs continue thread and include previous assistant output; resume_thread by id; options mapped to CLI flags; env override; output schema temp file creation and cleanup; input concatenation; image forwarding; additional_directories; working_directory with and without `skip_git_repo_check`; originator header; turn failure surfaces errors; streaming path yields events. Implement an HTTP proxy helper (akin to `responsesProxy.ts`) using `http.server` + SSE formatting to capture requests for assertions. Provide a simple abort/cancel test using `threading.Event` or `asyncio` cancellation to ensure process termination.
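As a minimal sketch of that abort/cancel case (illustrative only; the binary path below is an example, and the real suite resolves the Codex binary as described in Concrete Steps):

    import threading
    import unittest

    from codex_sdk import Codex, CodexOptions, TurnOptions
    from codex_sdk.exec import CancelledError

    class CancellationSketch(unittest.TestCase):
        def test_cancel_before_start(self) -> None:
            cancel = threading.Event()
            cancel.set()  # request cancellation before the turn begins
            codex = Codex(CodexOptions(codex_path_override="../codex-rs/target/debug/codex"))
            stream = codex.start_thread().run_streamed("hello", TurnOptions(cancellation_event=cancel))
            with self.assertRaises(CancelledError):
                next(stream.events)  # the runner raises before the CLI is ever spawned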
|
||||
|
||||
Document usage in `sdk/python/README.md` with quickstart, streaming example, structured output, image input, resume thread, working directory controls, and env overrides. Note Python version requirement (e.g., 3.9+), install steps (`pip install -e .`), and test command.
|
||||
|
||||
## Concrete Steps
|
||||
|
||||
- Work in `sdk/python`.
|
||||
- Create packaging scaffold: `pyproject.toml` with `setuptools`, `src/codex_sdk/__init__.py`, and module files. Add `README.md` referencing examples below.
|
||||
- Port core logic from TS to Python modules as described above.
|
||||
- Write tests in `sdk/python/tests` using stdlib `unittest` and helper proxy server; point Codex path to `../codex-rs/target/debug/codex`.
|
||||
- Commands to run (once implemented):
|
||||
- Build/install editable for development: `cd sdk/python && python -m pip install -e .`
|
||||
- Run tests: `cd sdk/python && python -m unittest discover -v`
|
||||
- Optional manual demo: `python -m examples.basic` once examples exist.
|
||||
- Keep this section updated if commands change during implementation.
|
||||
|
||||
## Validation and Acceptance
|
||||
|
||||
Acceptance hinges on observable behavior: installing the package locally, running the quickstart script, and seeing a turn complete with streamed events and final response text. Automated validation: `python -m unittest discover -v` passes, including cases for options mapping, output schema lifecycle, image forwarding, and cancellation. Manual validation: start a thread, call `run` twice to confirm thread continuity, and `resume_thread` with saved id. The SDK should emit helpful errors on malformed events or CLI failures.
|
||||
|
||||
## Idempotence and Recovery
|
||||
|
||||
All commands are safe to re-run. Package builds are additive; re-running tests or installs is safe. Temp schema directories and proxy servers should be cleaned in `finally` blocks; retrying a failed test should not leak files or processes. If the CLI is missing, the SDK should raise a clear error rather than leaving partial state.
|
||||
|
||||
## Artifacts and Notes
|
||||
|
||||
- Keep key test transcripts or sample outputs short and inline in this plan as progress is made.
|
||||
- Note deviations from TS behavior with rationale in the Decision Log.
|
||||
|
||||
## Interfaces and Dependencies
|
||||
|
||||
- Public classes/functions to expose in `codex_sdk`:
|
||||
- `Codex(codex_path_override: Optional[str] = None, base_url: Optional[str] = None, api_key: Optional[str] = None, env: Optional[Dict[str, str]] = None)`
|
||||
- `Codex.start_thread(options: ThreadOptions = None) -> Thread`
|
||||
- `Codex.resume_thread(thread_id: str, options: ThreadOptions = None) -> Thread`
|
||||
- `Thread.run(input: Input, turn_options: TurnOptions = None) -> TurnResult`
|
||||
- `Thread.run_streamed(input: Input, turn_options: TurnOptions = None) -> StreamedTurn` where `StreamedTurn.events` is an iterator/generator of `ThreadEvent`.
|
||||
- `Thread.id` property returning the current thread id (or None before first turn).
|
||||
- Types:
|
||||
- `Input = Union[str, List[InputEntry]]` with `InputEntry` dicts `{ "type": "text", "text": str }` or `{ "type": "local_image", "path": str }`.
|
||||
- `ThreadEvent`/`ThreadItem` dataclasses mirroring `codex-rs/exec/src/exec_events.rs`.
|
||||
- `TurnResult` holding `items: List[ThreadItem]`, `final_response: str`, `usage: Optional[Usage]`.
|
||||
- `ThreadOptions` (model, sandbox_mode, working_directory, skip_git_repo_check, model_reasoning_effort, network_access_enabled, web_search_enabled, approval_policy, additional_directories) and `TurnOptions` (output_schema, signal/cancel hook).
|
||||
- Dependencies: prefer Python stdlib; avoid third-party packages unless already vendored. Use `subprocess`, `json`, `tempfile`, `pathlib`, `typing`, `dataclasses`, and `asyncio` if needed for cancellation support.
|
||||
|
||||
---
|
||||
|
||||
Revision note: Initial version created to guide Python SDK implementation based on the TypeScript SDK, following EXEC_PLAN.md requirements.
|
||||
Revision note: Updated after implementing the Python SDK, adding docs/tests, and documenting sandbox-related test skips (2025-12-05).
|
||||
sdk/python/README.md (new file, 129 lines)
@@ -0,0 +1,129 @@
|
||||
# Codex SDK for Python
|
||||
|
||||
Embed the Codex agent in Python workflows by spawning the Codex CLI and consuming structured events.
|
||||
|
||||
The SDK launches the bundled `codex` binary (or one at a custom path), writes the prompt to its stdin, and reads JSONL events from its stdout.
|
||||
|
||||
## Installation
|
||||
|
||||
Until packages are published, install locally in editable mode:
|
||||
|
||||
```
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
Requires Python 3.9+ and a Codex binary reachable either via the packaged vendor path or `codex_path_override`.
|
||||
|
||||
## Quickstart
|
||||
|
||||
```python
|
||||
from codex_sdk import Codex
|
||||
|
||||
codex = Codex()
|
||||
thread = codex.start_thread()
|
||||
turn = thread.run("Diagnose the test failure and propose a fix")
|
||||
|
||||
print(turn.final_response)
|
||||
print(turn.items)
|
||||
```
|
||||
|
||||
Call `run()` again on the same `Thread` to continue the conversation.
|
||||
|
||||
```python
|
||||
next_turn = thread.run("Implement the fix")
|
||||
```
|
||||
|
||||
## Streaming responses
|
||||
|
||||
`run()` buffers events. To react to intermediate progress—tool calls, streamed responses, and file change notifications—use `run_streamed()` instead. It returns a generator of structured events.
|
||||
|
||||
```python
|
||||
from codex_sdk import ThreadEvent
|
||||
|
||||
stream = thread.run_streamed("Diagnose the test failure and propose a fix")
|
||||
|
||||
for event in stream.events:
|
||||
if event.type == "item.completed":
|
||||
print("item", event.item)
|
||||
elif event.type == "turn.completed":
|
||||
print("usage", event.usage)
|
||||
```
|
||||
|
||||
## Structured output
|
||||
|
||||
Provide a JSON schema per turn to receive structured assistant responses.
|
||||
|
||||
```python
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"summary": {"type": "string"},
|
||||
"status": {"type": "string", "enum": ["ok", "action_required"]},
|
||||
},
|
||||
"required": ["summary", "status"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
from codex_sdk import TurnOptions
|
||||
|
||||
turn = thread.run("Summarize repository status", TurnOptions(output_schema=schema))
|
||||
print(turn.final_response)
|
||||
```
|
||||
|
||||
## Attaching images
|
||||
|
||||
Pass structured input entries when including images alongside text. Text entries are concatenated into the prompt; image entries are forwarded to the Codex CLI via `--image`.
|
||||
|
||||
```python
|
||||
turn = thread.run([
|
||||
{"type": "text", "text": "Describe these screenshots"},
|
||||
{"type": "local_image", "path": "./ui.png"},
|
||||
{"type": "local_image", "path": "./diagram.jpg"},
|
||||
])
|
||||
```
|
||||
|
||||
## Resuming an existing thread
|
||||
|
||||
Threads persist in `~/.codex/sessions`. If you lose the in-memory `Thread`, reconstruct it with `resume_thread()` and keep going.
|
||||
|
||||
```python
|
||||
import os
|
||||
|
||||
saved_thread_id = os.environ["CODEX_THREAD_ID"]
|
||||
thread = codex.resume_thread(saved_thread_id)
|
||||
thread.run("Implement the fix")
|
||||
```
|
||||
|
||||
## Working directory controls
|
||||
|
||||
Codex runs in the current working directory by default; set `working_directory` to run it elsewhere. To bypass the Git repository check (useful for temporary directories), pass `skip_git_repo_check=True` when creating a thread.
|
||||
|
||||
```python
|
||||
from codex_sdk import ThreadOptions
|
||||
|
||||
thread = codex.start_thread(ThreadOptions(working_directory="/path/to/project", skip_git_repo_check=True))
|
||||
```
|
||||
|
||||
## Controlling the Codex CLI environment
|
||||
|
||||
By default, the CLI inherits `os.environ`. Override it when you need a sandboxed environment and the SDK will inject required variables (`OPENAI_BASE_URL`, `CODEX_API_KEY`, and the SDK originator marker).
|
||||
|
||||
```python
|
||||
from codex_sdk import CodexOptions
|
||||
|
||||
codex = Codex(CodexOptions(env={"PATH": "/usr/local/bin"}))
|
||||
```
|
||||
|
||||
## Options reference
|
||||
|
||||
- `CodexOptions`: `codex_path_override`, `base_url`, `api_key`, `env`
|
||||
- `ThreadOptions`: `model`, `sandbox_mode`, `working_directory`, `skip_git_repo_check`, `model_reasoning_effort`, `network_access_enabled`, `web_search_enabled`, `approval_policy`, `additional_directories`
|
||||
- `TurnOptions`: `output_schema`, `cancellation_event`
|
||||
|
||||
## Running tests
|
||||
|
||||
```
|
||||
python -m unittest discover -v
|
||||
```
|
||||
|
||||
Set `codex_path_override` if the bundled binary is unavailable; the test suite expects a Codex binary and will exercise a local HTTP proxy to avoid external calls.
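For example, to point the SDK at a locally built debug binary (the relative path is illustrative; adjust it to your checkout):

```python
from codex_sdk import Codex, CodexOptions

# Example path to a locally built Codex binary; adjust as needed.
codex = Codex(CodexOptions(codex_path_override="../codex-rs/target/debug/codex"))
```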
|
||||
sdk/python/pyproject.toml (new file, 35 lines)
@@ -0,0 +1,35 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "codex-sdk"
|
||||
version = "0.0.0.dev0"
|
||||
description = "Python SDK for Codex CLI."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9"
|
||||
license = { text = "Apache-2.0" }
|
||||
authors = [{ name = "OpenAI" }]
|
||||
keywords = ["openai", "codex", "sdk", "python"]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Programming Language :: Python :: 3.13",
|
||||
"Topic :: Software Development :: Libraries",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/openai/codex"
|
||||
Repository = "https://github.com/openai/codex"
|
||||
|
||||
[tool.setuptools]
|
||||
package-dir = { "" = "src" }
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
sdk/python/src/codex_sdk/__init__.py (new file, 39 lines)
@@ -0,0 +1,39 @@
from .codex import Codex
from .options import CodexOptions, ThreadOptions, TurnOptions
from .thread import Input, StreamedTurn, Thread, ThreadRunError, TurnResult
from .types import (
    AgentMessageItem,
    CommandExecutionItem,
    ErrorItem,
    FileChangeItem,
    McpToolCallItem,
    ReasoningItem,
    ThreadEvent,
    ThreadItem,
    TodoListItem,
    Usage,
    WebSearchItem,
)

__all__ = [
    "Codex",
    "CodexOptions",
    "ThreadOptions",
    "TurnOptions",
    "Thread",
    "ThreadRunError",
    "TurnResult",
    "StreamedTurn",
    "Input",
    "ThreadEvent",
    "ThreadItem",
    "Usage",
    "AgentMessageItem",
    "ReasoningItem",
    "CommandExecutionItem",
    "FileChangeItem",
    "McpToolCallItem",
    "WebSearchItem",
    "TodoListItem",
    "ErrorItem",
]
New binary files (not shown):
sdk/python/src/codex_sdk/__pycache__/__init__.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/codex.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/exec.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/options.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/schema_file.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/thread.cpython-312.pyc
sdk/python/src/codex_sdk/__pycache__/types.cpython-312.pyc
sdk/python/src/codex_sdk/codex.py (new file, 22 lines)
@@ -0,0 +1,22 @@
from __future__ import annotations

from typing import Optional

from .exec import CodexExec
from .options import CodexOptions, ThreadOptions
from .thread import Thread


class Codex:
    """Main entry point for interacting with the Codex agent."""

    def __init__(self, options: Optional[CodexOptions] = None) -> None:
        opts = options or CodexOptions()
        self._options = opts
        self._exec = CodexExec(opts.codex_path_override, opts.env)

    def start_thread(self, options: Optional[ThreadOptions] = None) -> Thread:
        return Thread(self._exec, self._options, options or ThreadOptions(), None)

    def resume_thread(self, thread_id: str, options: Optional[ThreadOptions] = None) -> Thread:
        return Thread(self._exec, self._options, options or ThreadOptions(), thread_id)
sdk/python/src/codex_sdk/exec.py (new file, 196 lines)
@@ -0,0 +1,196 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import platform
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Generator, List, Optional
|
||||
|
||||
from .options import CancellationEvent
|
||||
|
||||
INTERNAL_ORIGINATOR_ENV = "CODEX_INTERNAL_ORIGINATOR_OVERRIDE"
|
||||
PYTHON_SDK_ORIGINATOR = "codex_sdk_py"
|
||||
|
||||
|
||||
class CancelledError(Exception):
|
||||
"""Raised when a turn is cancelled before completion."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class CodexExecArgs:
|
||||
input: str
|
||||
base_url: Optional[str] = None
|
||||
api_key: Optional[str] = None
|
||||
thread_id: Optional[str] = None
|
||||
images: Optional[List[str]] = None
|
||||
model: Optional[str] = None
|
||||
sandbox_mode: Optional[str] = None
|
||||
working_directory: Optional[str] = None
|
||||
additional_directories: Optional[List[str]] = None
|
||||
skip_git_repo_check: bool = False
|
||||
output_schema_file: Optional[str] = None
|
||||
model_reasoning_effort: Optional[str] = None
|
||||
cancellation_event: Optional[CancellationEvent] = None
|
||||
network_access_enabled: Optional[bool] = None
|
||||
web_search_enabled: Optional[bool] = None
|
||||
approval_policy: Optional[str] = None
|
||||
|
||||
|
||||
class CodexExec:
|
||||
def __init__(self, executable_path: Optional[str] = None, env: Optional[Dict[str, str]] = None) -> None:
|
||||
self.executable_path = executable_path or find_codex_path()
|
||||
self.env_override = env
|
||||
|
||||
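# run() spawns the Codex CLI ("codex exec --experimental-json" plus per-turn flags), yields raw JSONL
# lines from its stdout, and raises CancelledError whenever the cancellation event is observed to be set.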
def run(self, args: CodexExecArgs) -> Generator[str, None, None]:
|
||||
cancel_event = args.cancellation_event
|
||||
if cancel_event and cancel_event.is_set():
|
||||
raise CancelledError("Turn cancelled before start")
|
||||
|
||||
command_args: list[str] = ["exec", "--experimental-json"]
|
||||
if args.model:
|
||||
command_args.extend(["--model", args.model])
|
||||
if args.sandbox_mode:
|
||||
command_args.extend(["--sandbox", args.sandbox_mode])
|
||||
if args.working_directory:
|
||||
command_args.extend(["--cd", args.working_directory])
|
||||
if args.additional_directories:
|
||||
for extra_dir in args.additional_directories:
|
||||
command_args.extend(["--add-dir", extra_dir])
|
||||
if args.skip_git_repo_check:
|
||||
command_args.append("--skip-git-repo-check")
|
||||
if args.output_schema_file:
|
||||
command_args.extend(["--output-schema", args.output_schema_file])
|
||||
if args.model_reasoning_effort:
|
||||
command_args.extend(["--config", f'model_reasoning_effort="{args.model_reasoning_effort}"'])
|
||||
if args.network_access_enabled is not None:
|
||||
command_args.extend(
|
||||
["--config", f"sandbox_workspace_write.network_access={str(args.network_access_enabled).lower()}"]
|
||||
)
|
||||
if args.web_search_enabled is not None:
|
||||
command_args.extend(["--config", f"features.web_search_request={str(args.web_search_enabled).lower()}"])
|
||||
if args.approval_policy:
|
||||
command_args.extend(["--config", f'approval_policy="{args.approval_policy}"'])
|
||||
if args.images:
|
||||
for image in args.images:
|
||||
command_args.extend(["--image", image])
|
||||
if args.thread_id:
|
||||
command_args.extend(["resume", args.thread_id])
|
||||
|
||||
env = self._build_env(args)
|
||||
|
||||
process: Optional[subprocess.Popen[str]] = None
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
[self.executable_path, *command_args],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
env=env,
|
||||
)
|
||||
if not process.stdin or not process.stdout:
|
||||
raise RuntimeError("Failed to open stdio for Codex process")
|
||||
|
||||
process.stdin.write(args.input)
|
||||
process.stdin.close()
|
||||
|
||||
if cancel_event and cancel_event.is_set():
|
||||
raise CancelledError("Turn cancelled before first event")
|
||||
|
||||
for raw_line in process.stdout:
|
||||
if cancel_event and cancel_event.is_set():
|
||||
raise CancelledError("Turn cancelled")
|
||||
yield raw_line.rstrip("\r\n")
|
||||
|
||||
process.wait()
|
||||
if cancel_event and cancel_event.is_set():
|
||||
raise CancelledError("Turn cancelled after process exit")
|
||||
if process.returncode:
|
||||
stderr_output = process.stderr.read() if process.stderr else ""
|
||||
raise RuntimeError(f"Codex Exec exited with code {process.returncode}: {stderr_output}")
|
||||
except CancelledError:
|
||||
if process:
|
||||
terminate_process(process)
|
||||
raise
|
||||
except Exception:
|
||||
if process:
|
||||
terminate_process(process)
|
||||
raise
|
||||
finally:
|
||||
if process:
|
||||
if process.poll() is None:
|
||||
terminate_process(process)
|
||||
if process.stdout and not process.stdout.closed:
|
||||
process.stdout.close()
|
||||
if process.stderr and not process.stderr.closed:
|
||||
process.stderr.close()
|
||||
|
||||
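# _build_env() starts from the caller-supplied env override (or a copy of os.environ), then adds the SDK
# originator marker and, when provided, OPENAI_BASE_URL and CODEX_API_KEY.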
def _build_env(self, args: CodexExecArgs) -> Dict[str, str]:
|
||||
env: Dict[str, str] = {}
|
||||
if self.env_override is not None:
|
||||
env.update(self.env_override)
|
||||
else:
|
||||
env.update({key: value for key, value in os.environ.items() if value is not None})
|
||||
|
||||
if INTERNAL_ORIGINATOR_ENV not in env:
|
||||
env[INTERNAL_ORIGINATOR_ENV] = PYTHON_SDK_ORIGINATOR
|
||||
if args.base_url:
|
||||
env["OPENAI_BASE_URL"] = args.base_url
|
||||
if args.api_key:
|
||||
env["CODEX_API_KEY"] = args.api_key
|
||||
return env
|
||||
|
||||
|
||||
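# terminate_process() asks the child process to exit, waits briefly, and falls back to kill() if it is still alive.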
def terminate_process(process: subprocess.Popen[str]) -> None:
|
||||
try:
|
||||
if process.poll() is None:
|
||||
process.terminate()
|
||||
try:
|
||||
process.wait(timeout=2)
|
||||
except Exception:
|
||||
if process.poll() is None:
|
||||
process.kill()
|
||||
except Exception:
|
||||
try:
|
||||
if process.poll() is None:
|
||||
process.kill()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
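# find_codex_path() maps the current OS and architecture to a target triple and expects the bundled binary
# under vendor/<target-triple>/codex/ next to this package; it raises if the binary is missing.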
def find_codex_path() -> str:
|
||||
platform_name = sys.platform
|
||||
machine = platform.machine().lower()
|
||||
|
||||
target_triple = None
|
||||
if platform_name.startswith("linux") or platform_name == "android":
|
||||
if machine in {"x86_64", "amd64"}:
|
||||
target_triple = "x86_64-unknown-linux-musl"
|
||||
elif machine in {"aarch64", "arm64"}:
|
||||
target_triple = "aarch64-unknown-linux-musl"
|
||||
elif platform_name == "darwin":
|
||||
if machine in {"x86_64", "amd64"}:
|
||||
target_triple = "x86_64-apple-darwin"
|
||||
elif machine in {"arm64", "aarch64"}:
|
||||
target_triple = "aarch64-apple-darwin"
|
||||
elif platform_name == "win32":
|
||||
if machine in {"x86_64", "amd64"}:
|
||||
target_triple = "x86_64-pc-windows-msvc"
|
||||
elif machine in {"arm64", "aarch64"}:
|
||||
target_triple = "aarch64-pc-windows-msvc"
|
||||
|
||||
if target_triple is None:
|
||||
raise RuntimeError(f"Unsupported platform: {platform_name} ({machine})")
|
||||
|
||||
package_root = Path(__file__).resolve().parent.parent
|
||||
vendor_root = package_root / "vendor" / target_triple / "codex"
|
||||
binary_name = "codex.exe" if platform_name == "win32" else "codex"
|
||||
binary_path = vendor_root / binary_name
|
||||
if not binary_path.exists():
|
||||
raise RuntimeError(
|
||||
f"Codex binary not found at {binary_path}. "
|
||||
"Install Codex or provide codex_path_override when creating the client."
|
||||
)
|
||||
return str(binary_path)
|
||||
sdk/python/src/codex_sdk/options.py (new file, 36 lines)
@@ -0,0 +1,36 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Protocol


class CancellationEvent(Protocol):
    def is_set(self) -> bool:
        ...


@dataclass
class CodexOptions:
    codex_path_override: Optional[str] = None
    base_url: Optional[str] = None
    api_key: Optional[str] = None
    env: Optional[Dict[str, str]] = None


@dataclass
class ThreadOptions:
    model: Optional[str] = None
    sandbox_mode: Optional[str] = None
    working_directory: Optional[str] = None
    skip_git_repo_check: bool = False
    model_reasoning_effort: Optional[str] = None
    network_access_enabled: Optional[bool] = None
    web_search_enabled: Optional[bool] = None
    approval_policy: Optional[str] = None
    additional_directories: Optional[list[str]] = None


@dataclass
class TurnOptions:
    output_schema: Optional[object] = None
    cancellation_event: Optional[CancellationEvent] = None
sdk/python/src/codex_sdk/schema_file.py (new file, 38 lines)
@@ -0,0 +1,38 @@
from __future__ import annotations

import json
import shutil
import tempfile
from pathlib import Path
from typing import Callable, Optional, Tuple


SchemaFile = Tuple[Optional[str], Callable[[], None]]


def create_output_schema_file(schema: object) -> SchemaFile:
    if schema is None:
        return None, _noop_cleanup

    if not isinstance(schema, dict):
        raise ValueError("output_schema must be a plain JSON object")

    schema_dir = Path(tempfile.mkdtemp(prefix="codex-output-schema-"))
    schema_path = schema_dir / "schema.json"

    def cleanup() -> None:
        try:
            shutil.rmtree(schema_dir, ignore_errors=True)
        except Exception:
            pass

    try:
        schema_path.write_text(json.dumps(schema), encoding="utf-8")
        return str(schema_path), cleanup
    except Exception:
        cleanup()
        raise


def _noop_cleanup() -> None:
    return None
sdk/python/src/codex_sdk/thread.py (new file, 140 lines)
@@ -0,0 +1,140 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Generator, List, Optional, Union
|
||||
|
||||
from .exec import CancelledError, CodexExec, CodexExecArgs
|
||||
from .options import CodexOptions, ThreadOptions, TurnOptions
|
||||
from .schema_file import create_output_schema_file
|
||||
from .types import (
|
||||
AgentMessageItem,
|
||||
ItemCompletedEvent,
|
||||
ThreadErrorEvent,
|
||||
ThreadEvent,
|
||||
ThreadItem,
|
||||
ThreadStartedEvent,
|
||||
TurnCompletedEvent,
|
||||
TurnFailedEvent,
|
||||
Usage,
|
||||
parse_thread_event,
|
||||
)
|
||||
|
||||
InputEntry = dict
|
||||
Input = Union[str, List[InputEntry]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TurnResult:
|
||||
items: List[ThreadItem]
|
||||
final_response: str
|
||||
usage: Optional[Usage]
|
||||
|
||||
|
||||
@dataclass
|
||||
class StreamedTurn:
|
||||
events: Generator[ThreadEvent, None, None]
|
||||
|
||||
|
||||
class ThreadRunError(Exception):
|
||||
"""Raised when a turn fails."""
|
||||
|
||||
|
||||
class Thread:
|
||||
def __init__(
|
||||
self,
|
||||
exec_client: CodexExec,
|
||||
options: CodexOptions,
|
||||
thread_options: ThreadOptions,
|
||||
thread_id: Optional[str] = None,
|
||||
) -> None:
|
||||
self._exec = exec_client
|
||||
self._options = options
|
||||
self._thread_options = thread_options
|
||||
self._id = thread_id
|
||||
|
||||
@property
|
||||
def id(self) -> Optional[str]:
|
||||
return self._id
|
||||
|
||||
def run_streamed(self, input: Input, turn_options: Optional[TurnOptions] = None) -> StreamedTurn:
|
||||
return StreamedTurn(events=self._run_streamed_internal(input, turn_options or TurnOptions()))
|
||||
|
||||
def _run_streamed_internal(
|
||||
self, input: Input, turn_options: TurnOptions
|
||||
) -> Generator[ThreadEvent, None, None]:
|
||||
prompt, images = normalize_input(input)
|
||||
schema_path, cleanup = create_output_schema_file(turn_options.output_schema)
|
||||
args = CodexExecArgs(
|
||||
input=prompt,
|
||||
base_url=self._options.base_url,
|
||||
api_key=self._options.api_key,
|
||||
thread_id=self._id,
|
||||
images=images,
|
||||
model=self._thread_options.model,
|
||||
sandbox_mode=self._thread_options.sandbox_mode,
|
||||
working_directory=self._thread_options.working_directory,
|
||||
additional_directories=self._thread_options.additional_directories,
|
||||
skip_git_repo_check=self._thread_options.skip_git_repo_check,
|
||||
output_schema_file=schema_path,
|
||||
model_reasoning_effort=self._thread_options.model_reasoning_effort,
|
||||
cancellation_event=turn_options.cancellation_event,
|
||||
network_access_enabled=self._thread_options.network_access_enabled,
|
||||
web_search_enabled=self._thread_options.web_search_enabled,
|
||||
approval_policy=self._thread_options.approval_policy,
|
||||
)
|
||||
generator = self._exec.run(args)
|
||||
try:
|
||||
for line in generator:
|
||||
event = parse_thread_event(line)
|
||||
if isinstance(event, ThreadStartedEvent):
|
||||
self._id = event.thread_id
|
||||
yield event
|
||||
finally:
|
||||
cleanup()
|
||||
|
||||
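# run() drains the streamed events: it collects completed items, keeps the last agent_message as
# final_response, records usage from turn.completed, and raises ThreadRunError on turn.failed or error.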
def run(self, input: Input, turn_options: Optional[TurnOptions] = None) -> TurnResult:
|
||||
generator = self._run_streamed_internal(input, turn_options or TurnOptions())
|
||||
items: List[ThreadItem] = []
|
||||
final_response = ""
|
||||
usage: Optional[Usage] = None
|
||||
turn_failure: Optional[str] = None
|
||||
try:
|
||||
try:
|
||||
for event in generator:
|
||||
if isinstance(event, ItemCompletedEvent):
|
||||
if isinstance(event.item, AgentMessageItem):
|
||||
final_response = event.item.text
|
||||
items.append(event.item)
|
||||
elif isinstance(event, TurnCompletedEvent):
|
||||
usage = event.usage
|
||||
elif isinstance(event, TurnFailedEvent):
|
||||
turn_failure = event.error.message
|
||||
break
|
||||
elif isinstance(event, ThreadErrorEvent):
|
||||
turn_failure = event.message
|
||||
break
|
||||
except CancelledError:
|
||||
raise
|
||||
finally:
|
||||
generator.close()
|
||||
if turn_failure:
|
||||
raise ThreadRunError(turn_failure)
|
||||
return TurnResult(items=items, final_response=final_response, usage=usage)
|
||||
|
||||
|
||||
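# normalize_input() accepts a plain string or a list of {"type": "text" | "local_image"} entries and returns
# the prompt (text entries joined by blank lines) plus the image paths to forward via --image.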
def normalize_input(input: Input) -> tuple[str, List[str]]:
|
||||
if isinstance(input, str):
|
||||
return input, []
|
||||
prompt_parts: List[str] = []
|
||||
images: List[str] = []
|
||||
for item in input:
|
||||
item_type = item.get("type")
|
||||
if item_type == "text":
|
||||
text = item.get("text")
|
||||
if text is not None:
|
||||
prompt_parts.append(str(text))
|
||||
elif item_type == "local_image":
|
||||
path = item.get("path")
|
||||
if path:
|
||||
images.append(str(path))
|
||||
return "\n\n".join(prompt_parts), images
|
||||
sdk/python/src/codex_sdk/types.py (new file, 266 lines)
@@ -0,0 +1,266 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
JsonDict = Dict[str, Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Usage:
|
||||
input_tokens: int = 0
|
||||
cached_input_tokens: int = 0
|
||||
output_tokens: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThreadError:
|
||||
message: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentMessageItem:
|
||||
id: str
|
||||
type: str
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReasoningItem:
|
||||
id: str
|
||||
type: str
|
||||
text: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandExecutionItem:
|
||||
id: str
|
||||
type: str
|
||||
command: str
|
||||
aggregated_output: str
|
||||
status: str
|
||||
exit_code: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileUpdateChange:
|
||||
path: str
|
||||
kind: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileChangeItem:
|
||||
id: str
|
||||
type: str
|
||||
changes: List[FileUpdateChange]
|
||||
status: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class McpToolCallResult:
|
||||
content: List[JsonDict]
|
||||
structured_content: Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class McpToolCallError:
|
||||
message: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class McpToolCallItem:
|
||||
id: str
|
||||
type: str
|
||||
server: str
|
||||
tool: str
|
||||
arguments: Any
|
||||
status: str
|
||||
result: Optional[McpToolCallResult] = None
|
||||
error: Optional[McpToolCallError] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class WebSearchItem:
|
||||
id: str
|
||||
type: str
|
||||
query: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TodoItem:
|
||||
text: str
|
||||
completed: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class TodoListItem:
|
||||
id: str
|
||||
type: str
|
||||
items: List[TodoItem]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ErrorItem:
|
||||
id: str
|
||||
type: str
|
||||
message: str
|
||||
|
||||
|
||||
ThreadItem = Union[
|
||||
AgentMessageItem,
|
||||
ReasoningItem,
|
||||
CommandExecutionItem,
|
||||
FileChangeItem,
|
||||
McpToolCallItem,
|
||||
WebSearchItem,
|
||||
TodoListItem,
|
||||
ErrorItem,
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThreadStartedEvent:
|
||||
type: str
|
||||
thread_id: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TurnStartedEvent:
|
||||
type: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TurnCompletedEvent:
|
||||
type: str
|
||||
usage: Usage
|
||||
|
||||
|
||||
@dataclass
|
||||
class TurnFailedEvent:
|
||||
type: str
|
||||
error: ThreadError
|
||||
|
||||
|
||||
@dataclass
|
||||
class ItemStartedEvent:
|
||||
type: str
|
||||
item: ThreadItem
|
||||
|
||||
|
||||
@dataclass
|
||||
class ItemUpdatedEvent:
|
||||
type: str
|
||||
item: ThreadItem
|
||||
|
||||
|
||||
@dataclass
|
||||
class ItemCompletedEvent:
|
||||
type: str
|
||||
item: ThreadItem
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThreadErrorEvent:
|
||||
type: str
|
||||
message: str
|
||||
|
||||
|
||||
ThreadEvent = Union[
|
||||
ThreadStartedEvent,
|
||||
TurnStartedEvent,
|
||||
TurnCompletedEvent,
|
||||
TurnFailedEvent,
|
||||
ItemStartedEvent,
|
||||
ItemUpdatedEvent,
|
||||
ItemCompletedEvent,
|
||||
ThreadErrorEvent,
|
||||
]
|
||||
|
||||
|
||||
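# parse_thread_event() decodes one JSONL payload emitted by `codex exec --experimental-json` and returns
# the matching typed event; unknown event types raise ValueError.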
def parse_thread_event(payload: Union[str, JsonDict]) -> ThreadEvent:
|
||||
data = json.loads(payload) if isinstance(payload, str) else dict(payload)
|
||||
event_type = data.get("type")
|
||||
if event_type == "thread.started":
|
||||
return ThreadStartedEvent(type=event_type, thread_id=str(data["thread_id"]))
|
||||
if event_type == "turn.started":
|
||||
return TurnStartedEvent(type=event_type)
|
||||
if event_type == "turn.completed":
|
||||
usage_data = data.get("usage", {}) or {}
|
||||
usage = Usage(
|
||||
input_tokens=int(usage_data.get("input_tokens", 0) or 0),
|
||||
cached_input_tokens=int(usage_data.get("cached_input_tokens", 0) or 0),
|
||||
output_tokens=int(usage_data.get("output_tokens", 0) or 0),
|
||||
)
|
||||
return TurnCompletedEvent(type=event_type, usage=usage)
|
||||
if event_type == "turn.failed":
|
||||
error = data.get("error") or {}
|
||||
return TurnFailedEvent(type=event_type, error=ThreadError(message=str(error.get("message", ""))))
|
||||
if event_type == "item.started":
|
||||
return ItemStartedEvent(type=event_type, item=parse_thread_item(data["item"]))
|
||||
if event_type == "item.updated":
|
||||
return ItemUpdatedEvent(type=event_type, item=parse_thread_item(data["item"]))
|
||||
if event_type == "item.completed":
|
||||
return ItemCompletedEvent(type=event_type, item=parse_thread_item(data["item"]))
|
||||
if event_type == "error":
|
||||
return ThreadErrorEvent(type=event_type, message=str(data.get("message", "")))
|
||||
raise ValueError(f"Unsupported event type: {event_type}")
|
||||
|
||||
|
||||
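# parse_thread_item() maps an item payload (agent_message, reasoning, command_execution, file_change,
# mcp_tool_call, web_search, todo_list, error) onto the corresponding dataclass above.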
def parse_thread_item(data: JsonDict) -> ThreadItem:
|
||||
item_type = data.get("type")
|
||||
item_id = str(data.get("id", ""))
|
||||
if item_type == "agent_message":
|
||||
return AgentMessageItem(id=item_id, type=item_type, text=str(data.get("text", "")))
|
||||
if item_type == "reasoning":
|
||||
return ReasoningItem(id=item_id, type=item_type, text=str(data.get("text", "")))
|
||||
if item_type == "command_execution":
|
||||
return CommandExecutionItem(
|
||||
id=item_id,
|
||||
type=item_type,
|
||||
command=str(data.get("command", "")),
|
||||
aggregated_output=str(data.get("aggregated_output", "")),
|
||||
exit_code=data.get("exit_code"),
|
||||
status=str(data.get("status", "")),
|
||||
)
|
||||
if item_type == "file_change":
|
||||
changes_data = data.get("changes") or []
|
||||
changes = [
|
||||
FileUpdateChange(path=str(change.get("path", "")), kind=str(change.get("kind", "")))
|
||||
for change in changes_data
|
||||
]
|
||||
return FileChangeItem(id=item_id, type=item_type, changes=changes, status=str(data.get("status", "")))
|
||||
if item_type == "mcp_tool_call":
|
||||
result_data = data.get("result")
|
||||
error_data = data.get("error")
|
||||
result = None
|
||||
if isinstance(result_data, dict):
|
||||
result = McpToolCallResult(
|
||||
content=list(result_data.get("content") or []),
|
||||
structured_content=result_data.get("structured_content"),
|
||||
)
|
||||
error = None
|
||||
if isinstance(error_data, dict):
|
||||
error = McpToolCallError(message=str(error_data.get("message", "")))
|
||||
return McpToolCallItem(
|
||||
id=item_id,
|
||||
type=item_type,
|
||||
server=str(data.get("server", "")),
|
||||
tool=str(data.get("tool", "")),
|
||||
arguments=data.get("arguments"),
|
||||
status=str(data.get("status", "")),
|
||||
result=result,
|
||||
error=error,
|
||||
)
|
||||
if item_type == "web_search":
|
||||
return WebSearchItem(id=item_id, type=item_type, query=str(data.get("query", "")))
|
||||
if item_type == "todo_list":
|
||||
todos_data = data.get("items") or []
|
||||
todos = [
|
||||
TodoItem(text=str(todo.get("text", "")), completed=bool(todo.get("completed", False)))
|
||||
for todo in todos_data
|
||||
]
|
||||
return TodoListItem(id=item_id, type=item_type, items=todos)
|
||||
if item_type == "error":
|
||||
return ErrorItem(id=item_id, type=item_type, message=str(data.get("message", "")))
|
||||
raise ValueError(f"Unsupported item type: {item_type}")
|
||||
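For orientation (not part of the diff): a minimal usage sketch of the two parsers defined above, assuming the item and event dataclasses from earlier in this file are in scope. The payload shape mirrors the "item.completed" branch of parse_thread_event.

    # Hypothetical sketch, not from the commit: parse one serialized event.
    raw = '{"type": "item.completed", "item": {"id": "item_1", "type": "agent_message", "text": "Hi!"}}'
    event = parse_thread_event(raw)
    assert isinstance(event, ItemCompletedEvent)
    assert isinstance(event.item, AgentMessageItem)
    assert event.item.text == "Hi!"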
1  sdk/python/tests/__init__.py  Normal file
@@ -0,0 +1 @@
#

BIN  sdk/python/tests/__pycache__/__init__.cpython-312.pyc  Normal file
Binary file not shown.
BIN  sdk/python/tests/__pycache__/responses_proxy.cpython-312.pyc  Normal file
Binary file not shown.
BIN  sdk/python/tests/__pycache__/test_sdk.cpython-312.pyc  Normal file
Binary file not shown.

130  sdk/python/tests/responses_proxy.py  Normal file
@@ -0,0 +1,130 @@
import json
import threading
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple

DEFAULT_RESPONSE_ID = "resp_mock"
DEFAULT_MESSAGE_ID = "msg_mock"


class _ServerState:
    def __init__(self, responses: Iterable[Dict[str, Any]], status_code: int) -> None:
        self.responses = iter(responses)
        self.requests: List[RecordedRequest] = []
        self.status_code = status_code
        self.error: Optional[Exception] = None


class RecordedRequest:
    def __init__(self, body: str, headers: Dict[str, str]) -> None:
        self.body = body
        self.json = json.loads(body)
        self.headers = headers


def format_sse_event(event: Dict[str, Any]) -> str:
    return f"event: {event['type']}\n" + f"data: {json.dumps(event)}\n\n"


def start_responses_test_proxy(
    response_bodies: Iterable[Dict[str, Any]], status_code: int = HTTPStatus.OK
) -> Tuple[str, List[RecordedRequest], Callable[[], None]]:
    responses_iterable = response_bodies if isinstance(response_bodies, Generator) else list(response_bodies)
    state = _ServerState(responses_iterable, int(status_code))

    class Handler(BaseHTTPRequestHandler):
        def log_message(self, fmt: str, *args: Any) -> None:  # pragma: no cover - silence stderr noise
            return

        def _read_body(self) -> str:
            length = int(self.headers.get("content-length", "0"))
            return self.rfile.read(length).decode("utf-8")

        def do_POST(self) -> None:  # noqa: N802
            if self.path != "/responses":
                self.send_error(HTTPStatus.NOT_FOUND)
                return
            body = self._read_body()
            state.requests.append(RecordedRequest(body, dict(self.headers)))
            try:
                response = next(state.responses)
            except Exception as exc:  # pragma: no cover - defensive
                state.error = exc
                self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, explain=str(exc))
                return

            self.send_response(state.status_code)
            self.send_header("content-type", "text/event-stream")
            self.end_headers()
            for event in response["events"]:
                self.wfile.write(format_sse_event(event).encode("utf-8"))
                self.wfile.flush()

    try:
        server = HTTPServer(("127.0.0.1", 0), Handler)
    except PermissionError as exc:
        raise RuntimeError("Cannot bind loopback HTTP server inside sandbox") from exc
    address, port = server.server_address
    url = f"http://{address}:{port}"

    def serve() -> None:
        with server:
            server.serve_forever(poll_interval=0.1)

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return url, state.requests, lambda: _stop_server(server, thread)


def _stop_server(server: HTTPServer, thread: threading.Thread) -> None:
    server.shutdown()
    thread.join(timeout=2)


def sse(*events: Dict[str, Any]) -> Dict[str, Any]:
    return {"kind": "sse", "events": list(events)}


def response_started(response_id: str = DEFAULT_RESPONSE_ID) -> Dict[str, Any]:
    return {"type": "response.created", "response": {"id": response_id}}


def assistant_message(text: str, item_id: str = DEFAULT_MESSAGE_ID) -> Dict[str, Any]:
    return {
        "type": "response.output_item.done",
        "item": {
            "type": "message",
            "role": "assistant",
            "id": item_id,
            "content": [{"type": "output_text", "text": text}],
        },
    }


def shell_call() -> Dict[str, Any]:
    command = ["bash", "-lc", "echo 'Hello, world!'"]
    return {
        "type": "response.output_item.done",
        "item": {
            "type": "function_call",
            "call_id": f"call_id{threading.get_ident()}",
            "name": "shell",
            "arguments": json.dumps({"command": command, "timeout_ms": 100}),
        },
    }


def response_failed(error_message: str) -> Dict[str, Any]:
    return {"type": "error", "error": {"code": "rate_limit_exceeded", "message": error_message}}


def response_completed(response_id: str = DEFAULT_RESPONSE_ID, usage: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    usage_payload = usage or {
        "input_tokens": 42,
        "input_tokens_details": {"cached_tokens": 12},
        "output_tokens": 5,
        "output_tokens_details": None,
        "total_tokens": 47,
    }
    return {"type": "response.completed", "response": {"id": response_id, "usage": usage_payload}}
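For orientation (not part of the diff): a minimal sketch of how the helpers above compose in a test, using only functions defined in this file. start_responses_test_proxy returns the proxy base URL, the list of recorded requests, and a close callback that must be invoked to shut the server down.

    # Hypothetical sketch, not from the commit: serve one fake /responses stream.
    url, recorded_requests, close = start_responses_test_proxy(
        [sse(response_started(), assistant_message("Hi!"), response_completed())]
    )
    try:
        ...  # point the client under test at f"{url}/responses", then inspect recorded_requests
    finally:
        close()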
452  sdk/python/tests/test_sdk.py  Normal file
@@ -0,0 +1,452 @@
import json
import os
import subprocess
import tempfile
import threading
import time
import unittest
import unittest.mock
from contextlib import contextmanager
from pathlib import Path
from typing import List, Tuple

from codex_sdk import (
    Codex,
    CodexOptions,
    ThreadOptions,
    ThreadRunError,
    TurnOptions,
)
from codex_sdk.exec import INTERNAL_ORIGINATOR_ENV
from codex_sdk.types import ThreadEvent

from .responses_proxy import (
    assistant_message,
    response_completed,
    response_failed,
    response_started,
    shell_call,
    sse,
    start_responses_test_proxy,
)

CODEX_EXEC_PATH = Path(__file__).resolve().parents[2] / "codex-rs" / "target" / "debug" / "codex"


def expect_pair(args: List[str], flag: str, value: str) -> None:
    self_index = args.index(flag)
    assert args[self_index + 1] == value


@contextmanager
def spy_popen() -> Tuple[list, list]:
    real_popen = subprocess.Popen
    calls: list = []
    envs: list = []

    def wrapper(*args, **kwargs):
        calls.append(list(args[0]))
        envs.append(kwargs.get("env"))
        return real_popen(*args, **kwargs)

    with unittest.mock.patch("codex_sdk.exec.subprocess.Popen", side_effect=wrapper):
        yield calls, envs


class CodexSdkTests(unittest.TestCase):
    def _start_proxy(self, bodies):
        try:
            return start_responses_test_proxy(bodies)
        except RuntimeError as exc:
            self.skipTest(str(exc))

    def test_returns_thread_events(self) -> None:
        url, requests, close = self._start_proxy([sse(response_started(), assistant_message("Hi!"), response_completed())])
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            result = thread.run("Hello, world!")

            self.assertEqual(result.final_response, "Hi!")
            self.assertEqual(len(result.items), 1)
            self.assertIsNotNone(result.usage)
            self.assertIsNotNone(thread.id)
            self.assertGreater(len(requests), 0)
        finally:
            close()

    def test_run_twice_continues_thread(self) -> None:
        url, requests, close = self._start_proxy(
            [
                sse(response_started("response_1"), assistant_message("First response", "item_1"), response_completed("response_1")),
                sse(response_started("response_2"), assistant_message("Second response", "item_2"), response_completed("response_2")),
            ]
        )
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            thread.run("first input")
            thread.run("second input")

            second_request = requests[1]
            payload = second_request.json
            assistant_entry = next((entry for entry in payload["input"] if entry.get("role") == "assistant"), None)
            self.assertIsNotNone(assistant_entry)
            content = assistant_entry.get("content") or []
            assistant_text = next((item.get("text") for item in content if item.get("type") == "output_text"), None)
            self.assertEqual(assistant_text, "First response")
        finally:
            close()

    def test_resume_thread_by_id(self) -> None:
        url, requests, close = self._start_proxy(
            [
                sse(response_started("response_1"), assistant_message("First response", "item_1"), response_completed("response_1")),
                sse(response_started("response_2"), assistant_message("Second response", "item_2"), response_completed("response_2")),
            ]
        )
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            original = client.start_thread()
            original.run("first input")

            resumed = client.resume_thread(original.id or "")
            result = resumed.run("second input")

            self.assertEqual(resumed.id, original.id)
            self.assertEqual(result.final_response, "Second response")

            second_request = requests[1]
            payload = second_request.json
            assistant_entry = next((entry for entry in payload["input"] if entry.get("role") == "assistant"), None)
            content = assistant_entry.get("content") if assistant_entry else []
            assistant_text = next((item.get("text") for item in content if item.get("type") == "output_text"), None)
            self.assertEqual(assistant_text, "First response")
        finally:
            close()

    def test_run_streamed(self) -> None:
        url, requests, close = self._start_proxy(
            [sse(response_started(), assistant_message("Hi!"), response_completed())]
        )
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            streamed = thread.run_streamed("Hello, world!")

            events: List[ThreadEvent] = list(streamed.events)
            self.assertEqual(len(events), 4)
            self.assertIsNotNone(thread.id)
            self.assertGreater(len(requests), 0)
        finally:
            close()

    def test_thread_options_passed_to_exec(self) -> None:
        url, requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Turn options applied", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(model="gpt-test-1", sandbox_mode="workspace-write"))
                thread.run("apply options")
                command_args = calls[0]
                self.assertIn("--sandbox", command_args)
                self.assertIn("workspace-write", command_args)
                self.assertIn("--model", command_args)
                self.assertIn("gpt-test-1", command_args)
            finally:
                close()

    def test_model_reasoning_effort_flag(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Reasoning effort applied", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(model_reasoning_effort="high"))
                thread.run("apply reasoning effort")
                command_args = calls[0]
                expect_pair(command_args, "--config", 'model_reasoning_effort="high"')
            finally:
                close()

    def test_network_access_flag(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Network access enabled", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(network_access_enabled=True))
                thread.run("test network access")
                command_args = calls[0]
                expect_pair(command_args, "--config", "sandbox_workspace_write.network_access=true")
            finally:
                close()

    def test_web_search_flag(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Web search enabled", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(web_search_enabled=True))
                thread.run("test web search")
                command_args = calls[0]
                expect_pair(command_args, "--config", "features.web_search_request=true")
            finally:
                close()

    def test_approval_policy_flag(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Approval policy set", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(approval_policy="on-request"))
                thread.run("test approval policy")
                command_args = calls[0]
                expect_pair(command_args, "--config", 'approval_policy="on-request"')
            finally:
                close()

    def test_env_override(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Custom env", "item_1"), response_completed("response_1"))]
        )
        os.environ["CODEX_ENV_SHOULD_NOT_LEAK"] = "leak"
        with spy_popen() as (_calls, envs):
            try:
                client = Codex(
                    CodexOptions(
                        codex_path_override=str(CODEX_EXEC_PATH),
                        base_url=url,
                        api_key="test",
                        env={"CUSTOM_ENV": "custom"},
                    )
                )
                thread = client.start_thread()
                thread.run("custom env")

                spawn_env = envs[0]
                self.assertIsNotNone(spawn_env)
                if spawn_env:
                    self.assertEqual(spawn_env.get("CUSTOM_ENV"), "custom")
                    self.assertIsNone(spawn_env.get("CODEX_ENV_SHOULD_NOT_LEAK"))
                    self.assertEqual(spawn_env.get("OPENAI_BASE_URL"), url)
                    self.assertEqual(spawn_env.get("CODEX_API_KEY"), "test")
                    self.assertEqual(spawn_env.get(INTERNAL_ORIGINATOR_ENV), "codex_sdk_py")
            finally:
                os.environ.pop("CODEX_ENV_SHOULD_NOT_LEAK", None)
                close()

    def test_additional_directories(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Additional directories applied", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(additional_directories=["../backend", "/tmp/shared"]))
                thread.run("test additional dirs")
                command_args = calls[0]
                forwarded = []
                for index, arg in enumerate(command_args):
                    if arg == "--add-dir" and index + 1 < len(command_args):
                        forwarded.append(command_args[index + 1])
                self.assertEqual(forwarded, ["../backend", "/tmp/shared"])
            finally:
                close()

    def test_output_schema_written_and_cleaned(self) -> None:
        url, requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Structured response", "item_1"), response_completed("response_1"))]
        )
        schema = {
            "type": "object",
            "properties": {"answer": {"type": "string"}},
            "required": ["answer"],
            "additionalProperties": False,
        }
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread()
                thread.run("structured", TurnOptions(output_schema=schema))

                payload = requests[0].json
                text = payload.get("text")
                self.assertIsNotNone(text)
                if text:
                    self.assertEqual(
                        text.get("format"),
                        {"name": "codex_output_schema", "type": "json_schema", "strict": True, "schema": schema},
                    )

                command_args = calls[0]
                schema_index = command_args.index("--output-schema")
                schema_path = command_args[schema_index + 1]
                self.assertFalse(Path(schema_path).exists())
            finally:
                close()

    def test_combines_text_segments(self) -> None:
        url, requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Combined input applied", "item_1"), response_completed("response_1"))]
        )
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            thread.run(
                [
                    {"type": "text", "text": "Describe file changes"},
                    {"type": "text", "text": "Focus on impacted tests"},
                ]
            )

            payload = requests[0].json
            last_user = payload["input"][-1]
            text = last_user["content"][0]["text"]
            self.assertEqual(text, "Describe file changes\n\nFocus on impacted tests")
        finally:
            close()

    def test_forwards_images(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Images applied", "item_1"), response_completed("response_1"))]
        )
        temp_dir = tempfile.mkdtemp(prefix="codex-images-")
        images = [str(Path(temp_dir) / "first.png"), str(Path(temp_dir) / "second.jpg")]
        for index, image_path in enumerate(images):
            Path(image_path).write_text(f"image-{index}", encoding="utf-8")
        with spy_popen() as (calls, _envs):
            try:
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread()
                thread.run(
                    [
                        {"type": "text", "text": "describe the images"},
                        {"type": "local_image", "path": images[0]},
                        {"type": "local_image", "path": images[1]},
                    ]
                )
                command_args = calls[0]
                forwarded = []
                for index, arg in enumerate(command_args):
                    if arg == "--image" and index + 1 < len(command_args):
                        forwarded.append(command_args[index + 1])
                self.assertEqual(forwarded, images)
            finally:
                close()
        try:
            for image in images:
                Path(image).unlink(missing_ok=True)
            Path(temp_dir).rmdir()
        except Exception:
            pass

    def test_working_directory(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Working directory applied", "item_1"), response_completed("response_1"))]
        )
        with spy_popen() as (calls, _envs):
            try:
                working_dir = tempfile.mkdtemp(prefix="codex-working-dir-")
                client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
                thread = client.start_thread(ThreadOptions(working_directory=working_dir, skip_git_repo_check=True))
                thread.run("use custom working directory")
                command_args = calls[0]
                expect_pair(command_args, "--cd", working_dir)
            finally:
                close()
        try:
            Path(working_dir).rmdir()
        except Exception:
            pass

    def test_working_directory_without_git_check_fails(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started("response_1"), assistant_message("Working directory applied", "item_1"), response_completed("response_1"))]
        )
        try:
            working_dir = tempfile.mkdtemp(prefix="codex-working-dir-")
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread(ThreadOptions(working_directory=working_dir))
            with self.assertRaises(ThreadRunError):
                thread.run("use custom working directory")
        finally:
            close()
        try:
            Path(working_dir).rmdir()
        except Exception:
            pass

    def test_originator_header(self) -> None:
        url, requests, close = self._start_proxy(
            [sse(response_started(), assistant_message("Hi!"), response_completed())]
        )
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            thread.run("Hello, originator!")

            originator = requests[0].headers.get("originator")
            self.assertIn("codex_sdk_py", originator)
        finally:
            close()

    def test_turn_failure_raises(self) -> None:
        def failure_events():
            yield sse(response_started("response_1"))
            while True:
                yield sse(response_failed("rate limit exceeded"))

        url, _requests, close = self._start_proxy(failure_events())
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            with self.assertRaises(ThreadRunError):
                thread.run("fail")
        finally:
            close()

    def test_cancellation_before_start(self) -> None:
        url, _requests, close = self._start_proxy(
            [sse(response_started(), shell_call(), response_completed())]
        )
        cancel_event = threading.Event()
        cancel_event.set()
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            with self.assertRaises(Exception):
                thread.run("Hello, world!", TurnOptions(cancellation_event=cancel_event))
        finally:
            close()

    def test_cancellation_during_iteration(self) -> None:
        def endless_shell_calls():
            while True:
                yield sse(response_started(), shell_call(), response_completed())

        url, _requests, close = self._start_proxy(endless_shell_calls())
        cancel_event = threading.Event()
        try:
            client = Codex(CodexOptions(codex_path_override=str(CODEX_EXEC_PATH), base_url=url, api_key="test"))
            thread = client.start_thread()
            turn = thread.run_streamed("Hello, world!", TurnOptions(cancellation_event=cancel_event))

            def cancel_soon():
                time.sleep(0.05)
                cancel_event.set()

            threading.Thread(target=cancel_soon, daemon=True).start()
            with self.assertRaises(Exception):
                for _event in turn.events:
                    pass
        finally:
            close()
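Note (not part of the diff): the suite above uses only the standard-library unittest runner, so a plausible way to run it, assuming the codex_sdk package is importable and a debug codex binary has been built at the CODEX_EXEC_PATH location computed above, is python -m unittest from the sdk/python directory; tests that need the loopback HTTP proxy skip themselves when the server cannot be bound.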