Compare commits

...

49 Commits

Author SHA1 Message Date
Shaqayeq
b72b071e63 2026-03-08 Clarify experimental python SDK README
Description:
- rewrite the README around install, core concepts, and quickstart usage
- make the experimental status and current limitations obvious up front
- keep maintainer details but move them after the user-facing guidance
2026-03-08 00:56:41 -08:00
Shaqayeq
4bcc7f5a60 2026-03-08 Clarify experimental python SDK README
Description:
- rewrite the README around install, core concepts, and quickstart usage
- make the experimental status and current limitations obvious up front
- keep maintainer details but move them after the user-facing guidance
2026-03-08 00:55:07 -08:00
Shaqayeq
9a3a9374aa 2026-03-07 Harden experimental python SDK runtime
Description:
- clean up sync and async initialization failure paths and serialize async first use
- guard against concurrent turn consumers and fall back to completed raw response text
- mark the SDK as experimental, fix examples, and add runtime regression coverage
2026-03-08 00:55:07 -08:00
Shaqayeq
1c38a51e69 readme 2026-03-08 00:55:07 -08:00
Shaqayeq
99f760b167 python-sdk: clarify example setup and missing dependency guidance 2026-03-08 00:55:07 -08:00
Shaqayeq
66f3579e60 python-sdk: keep generation workflow test in PR2 baseline 2026-03-08 00:55:06 -08:00
Shaqayeq
6089b1dbf4 python-sdk: enforce initialize metadata and turn usage reporting 2026-03-08 00:55:06 -08:00
Shaqayeq
70a1b059e3 python-sdk: add notebook bootstrap + integration coverage 2026-03-08 00:55:06 -08:00
Shaqayeq
ba35a1db1c python-sdk: run real integration tests unconditionally 2026-03-08 00:55:06 -08:00
Shaqayeq
80f85ee875 python-sdk: remove unused thread session wrapper layer 2026-03-08 00:55:06 -08:00
Shaqayeq
964901919e python-sdk: remove duplicate thread bind api from public surface 2026-03-08 00:55:05 -08:00
Shaqayeq
73d8c49dba python-sdk: remove dead helpers and unused error payload 2026-03-08 00:55:05 -08:00
Shaqayeq
870ea5d567 python-sdk: normalize blank codex event ids 2026-03-08 00:55:05 -08:00
Shaqayeq
175eda790b python-sdk: fix notification typing and out-of-box examples/notebook 2026-03-08 00:55:05 -08:00
Shaqayeq
28692c0daf python-sdk: align generation workflow test with registry/event codegen 2026-03-08 00:55:05 -08:00
Shaqayeq
1f26141189 python-sdk: move thread lifecycle transitions to Codex 2026-03-08 00:55:05 -08:00
Shaqayeq
8762bcb040 python-sdk: add model selection and advanced turn params examples 2026-03-08 00:55:04 -08:00
Shaqayeq
62c8871eb4 python-sdk: enforce generation workflow and binary selection 2026-03-08 00:55:04 -08:00
Shaqayeq
39a9b450b1 python-sdk: stream mini-cli output and add turn params example 2026-03-08 00:55:04 -08:00
Shaqayeq
cf41126ae5 python-sdk: fix image examples with stable assets 2026-03-08 00:55:04 -08:00
Shaqayeq
cac276fc5d python-sdk: tighten typed public API parity 2026-03-08 00:55:04 -08:00
Shaqayeq
35552ef1e7 python-sdk: add complex sync/async public API coverage flows 2026-03-08 00:55:03 -08:00
Shaqayeq
208c933412 python-sdk: generate flat typed public API signatures 2026-03-08 00:55:03 -08:00
Shaqayeq
9e917cb26e python-sdk: make examples resilient and fix async startup 2026-03-08 00:55:03 -08:00
Shaqayeq
bc2b30fb76 python-sdk: remove dead typing and unused context args 2026-03-08 00:55:03 -08:00
Shaqayeq
8dc631408b python-sdk: harden notebook bootstrap for AsyncCodex import 2026-03-08 00:55:03 -08:00
Shaqayeq
4b7cdfe5f5 python-sdk: add AsyncCodex parity and migrate examples/notebook to public API 2026-03-08 00:55:02 -08:00
Shaqayeq
30abc6bfa4 python-sdk: enforce typed public API and typed examples 2026-03-08 00:55:02 -08:00
Shaqayeq
e008114906 Restructure public API hierarchy: thread methods on Thread, turn actions on Turn, typed params only 2026-03-08 00:55:02 -08:00
Shaqayeq
307ccc7020 Move turn_steer/turn_interrupt to Thread and keep turn on Thread as single entry 2026-03-08 00:55:02 -08:00
Shaqayeq
a3c3f64188 Add flattened public type aliases and use them in public_api 2026-03-08 00:55:02 -08:00
Shaqayeq
fad7b459f8 Fix notebook bootstrap path handling and restore example/test compatibility after typing changes 2026-03-08 00:55:01 -08:00
Shaqayeq
9ad038b48b Rename minimal.py to public_api.py 2026-03-08 00:55:01 -08:00
Shaqayeq
1580432ee7 Notebook: use public SDK APIs only (no _client internals) 2026-03-08 00:55:01 -08:00
Shaqayeq
76585b071b Notebook: remove non-guaranteed temperature param; keep SDK-backed examples only 2026-03-08 00:55:01 -08:00
Shaqayeq
f4c686ff79 Notebook: add reasoning effort and thread/turn parameter examples 2026-03-08 00:55:01 -08:00
Shaqayeq
d6b3275e6e Simplify notebook to researcher-style feature walkthrough cells 2026-03-08 00:55:01 -08:00
Shaqayeq
ea72c908d1 Turn notebook into multi-case SDK validation harness with failure diagnostics 2026-03-08 00:55:00 -08:00
Shaqayeq
03d271399b Rewrite advanced notebook: single bootstrap cell, concurrent flows, and robust error handling 2026-03-08 00:55:00 -08:00
Shaqayeq
386a8c8801 Notebook: install SDK dependencies in kernel via editable install 2026-03-08 00:55:00 -08:00
Shaqayeq
6cd34511d9 Add notebook bootstrap cell for relative local SDK imports 2026-03-08 00:55:00 -08:00
Shaqayeq
fa7d4e2d94 Add advanced Python SDK walkthrough notebook 2026-03-08 00:55:00 -08:00
Shaqayeq
2ff8e1fa4e Finalize PR3: rebase on PR2, pin executable binary, and make integration examples pass 2026-03-08 00:54:59 -08:00
Shaqayeq
35d03a0508 2026-03-07 Stabilize experimental python SDK foundation
Description:
- pin datamodel-code-generator to an exact version and invoke it through the active interpreter
- make generate_types own the maintained generated surfaces and regenerate committed artifacts
- make sdk/python tests hermetic and regeneration checks idempotent
2026-03-08 00:47:14 -08:00
Michael Bolin
dcc4d7b634 linux-sandbox: honor split filesystem policies in bwrap (#13453)
## Why

After `#13449`, the Linux helper could receive split filesystem and
network policies, but the bubblewrap mount builder still reconstructed
filesystem access from the legacy `SandboxPolicy`.

That loses explicit unreadable carveouts under writable roots, and it
also mishandles `Root` read access paired with explicit deny carveouts.
In those cases bubblewrap could still expose paths that the split
filesystem policy intentionally blocked.

## What changed

- switched bubblewrap mount generation to consume
`FileSystemSandboxPolicy` directly at the implementation boundary;
legacy `SandboxPolicy` configs still flow through the existing
`FileSystemSandboxPolicy::from(&sandbox_policy)` bridge before reaching
bwrap
- kept the Linux helper and preflight path on the split filesystem
policy all the way into bwrap
- re-applied explicit unreadable carveouts after readable and writable
mounts so blocked subpaths still win under bubblewrap
- masked denied directories with `--tmpfs` plus `--remount-ro` and
denied files with `--ro-bind-data`, preserving the backing fd until exec
- added comments in the unreadable-root masking block to explain why the
mount order and directory/file split are intentional
- updated Linux helper call sites and tests for the split-policy bwrap
path

## Verification

- added protocol coverage for root carveouts staying scoped
- added core coverage that root-write plus deny carveouts still requires
a platform sandbox
- added bwrap unit coverage for reapplying blocked carveouts after
writable binds
- added Linux integration coverage for explicit split-policy carveouts
under bubblewrap
- validated the final branch state with `cargo test -p
codex-linux-sandbox`, `cargo clippy -p codex-linux-sandbox --all-targets
-- -D warnings`, and the PR CI reruns

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13453).
* __->__ #13453
* #13452
* #13451
* #13449
* #13448
* #13445
* #13440
* #13439

---------

Co-authored-by: viyatb-oai <viyatb@openai.com>
2026-03-07 23:46:52 -08:00
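The masking order described above (deny carveouts re-applied after readable and writable mounts) can be sketched as follows. `mask_denied_path` and the argv shapes are illustrative stand-ins, not the actual `codex-linux-sandbox` internals:

```rust
// Hypothetical sketch of the deny-carveout masking from the PR above.
fn mask_denied_path(args: &mut Vec<String>, path: &str, is_dir: bool, null_fd: i32) {
    if is_dir {
        // bubblewrap cannot bind /dev/null over a directory, so overmount
        // an empty tmpfs and then remount it read-only.
        for flag in ["--perms", "000", "--tmpfs", path, "--remount-ro", path] {
            args.push(flag.to_string());
        }
    } else {
        // For files, bind empty data (backed by an open /dev/null fd that
        // must stay alive until exec) over the original path.
        let fd = null_fd.to_string();
        for flag in ["--ro-bind-data", fd.as_str(), path] {
            args.push(flag.to_string());
        }
    }
}

fn main() {
    let mut args = Vec::new();
    // Masks are appended after readable/writable binds so denies win.
    mask_denied_path(&mut args, "/home/user/secrets", true, 3);
    mask_denied_path(&mut args, "/home/user/token", false, 3);
    println!("{}", args.join(" "));
}
```

The ordering matters because later bubblewrap mounts shadow earlier ones; appending the masks last is what makes blocked subpaths win under writable roots.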
Ahmed Ibrahim
dc19e78962 Stabilize abort task follow-up handling (#13874)
- production logic plus tests; cancel running tasks before clearing
pending turn state
- suppress follow-up model requests after cancellation and assert on
stabilized request counts instead of fixed sleeps
2026-03-07 22:56:00 -08:00
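The cancellation behavior above pairs with the approval-channel change visible in the diffs further down: a dropped approval channel now maps to an abort rather than the `Denied` default. A minimal sketch, using `std::sync::mpsc` and stand-in types rather than the real `Session` API:

```rust
use std::sync::mpsc;

// Stand-in for the real ReviewDecision; Denied is the Default variant.
#[derive(Debug, PartialEq, Clone, Copy, Default)]
enum ReviewDecision {
    #[default]
    Denied,
    Abort,
}

fn await_decision(rx: mpsc::Receiver<ReviewDecision>) -> ReviewDecision {
    // Previously `rx.recv().unwrap_or_default()` turned a cleared pending
    // approval into a synthetic Denied and let the turn keep running;
    // treating the dropped channel as Abort stops the interrupted turn.
    rx.recv().unwrap_or(ReviewDecision::Abort)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    drop(tx); // simulate the pending approval being cleared mid-turn
    assert_eq!(await_decision(rx), ReviewDecision::Abort);
}
```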
Michael Bolin
3b5fe5ca35 protocol: keep root carveouts sandboxed (#13452)
## Why

A restricted filesystem policy that grants `:root` read or write access
but also carries explicit deny entries should still behave like scoped
access with carveouts, not like unrestricted disk access.

Without that distinction, later platform backends cannot preserve
blocked subpaths under root-level permissions because the protocol layer
reports the policy as fully unrestricted.

## What changed

- taught `FileSystemSandboxPolicy` to treat root access plus explicit
deny entries as scoped access rather than full-disk access
- derived readable and writable roots from the filesystem root when root
access is combined with carveouts, while preserving the denied paths as
read-only subpaths
- added protocol coverage for root-write policies with carveouts and a
core sandboxing regression so those policies still require platform
sandboxing

## Verification

- added protocol coverage in `protocol/src/permissions.rs` and
`protocol/src/protocol.rs` for root access with explicit carveouts
- added platform-sandbox regression coverage in
`core/src/sandboxing/mod.rs`
- verified the current PR state with `just clippy`




---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13452).
* #13453
* __->__ #13452
* #13451
* #13449
* #13448
* #13445
* #13440
* #13439

---------

Co-authored-by: viyatb-oai <viyatb@openai.com>
2026-03-07 21:15:47 -08:00
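The rule this PR teaches the protocol layer can be modeled as below. `Entry` and `has_full_disk_write_access` are simplified stand-ins for the real `FileSystemSandboxPolicy` types, shown only to illustrate the decision:

```rust
#[derive(PartialEq, Clone, Copy)]
enum Access { Write, Deny }

struct Entry {
    is_root: bool, // true for the special `:root` path
    access: Access,
}

// Root write access only counts as full-disk access when no explicit deny
// carveouts accompany it; otherwise it is scoped access with carveouts.
fn has_full_disk_write_access(entries: &[Entry]) -> bool {
    let root_write = entries
        .iter()
        .any(|e| e.is_root && e.access == Access::Write);
    let has_deny = entries.iter().any(|e| e.access == Access::Deny);
    root_write && !has_deny
}

fn main() {
    let plain_root = [Entry { is_root: true, access: Access::Write }];
    let with_carveout = [
        Entry { is_root: true, access: Access::Write },
        Entry { is_root: false, access: Access::Deny },
    ];
    assert!(has_full_disk_write_access(&plain_root));
    assert!(!has_full_disk_write_access(&with_carveout)); // still sandboxed
}
```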
Michael Bolin
46b8d127cf sandboxing: preserve denied paths when widening permissions (#13451)
## Why

After the split-policy plumbing landed, additional-permissions widening
still rebuilt filesystem access through the legacy projection in a few
places.

That can erase explicit deny entries and make the runtime treat a policy
as fully writable even when it still has blocked subpaths, which in turn
can skip the platform sandbox when it is still needed.

## What changed

- preserved explicit deny entries when merging additional read and write
permissions into `FileSystemSandboxPolicy`
- switched platform-sandbox selection to rely on
`FileSystemSandboxPolicy::has_full_disk_write_access()` instead of ad
hoc root-write checks
- kept the widened policy path in `core/src/exec.rs` and
`core/src/sandboxing/mod.rs` aligned so denied subpaths survive both
policy merging and sandbox selection
- added regression coverage for root-write policies that still carry
carveouts

## Verification

- added regression coverage in `core/src/sandboxing/mod.rs` showing that
root write plus carveouts still requires the platform sandbox
- verified the current PR state with `just clippy`




---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13451).
* #13453
* #13452
* __->__ #13451
* #13449
* #13448
* #13445
* #13440
* #13439

---------

Co-authored-by: viyatb-oai <viyatb@openai.com>
2026-03-08 04:29:35 +00:00
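The widening invariant above (merging additional permissions must extend the policy rather than rebuild it through a projection that drops deny entries) can be sketched with simplified stand-in types:

```rust
// Hedged sketch of the permission-widening rule; the tuple representation
// is hypothetical, not the real FileSystemSandboxPolicy layout.
#[derive(Debug, PartialEq, Clone)]
enum Access { Write, Deny }

fn merge_additional_writes(
    mut entries: Vec<(String, Access)>,
    extra_writes: Vec<String>,
) -> Vec<(String, Access)> {
    // Existing deny entries ride along untouched; only new writes append.
    entries.extend(extra_writes.into_iter().map(|p| (p, Access::Write)));
    entries
}

fn main() {
    let policy = vec![("/blocked".to_string(), Access::Deny)];
    let widened = merge_additional_writes(policy, vec!["/workspace".to_string()]);
    // The deny carveout survives, so sandbox selection still sees it and
    // does not misclassify the policy as fully writable.
    assert!(widened.contains(&("/blocked".to_string(), Access::Deny)));
    assert!(widened.contains(&("/workspace".to_string(), Access::Write)));
}
```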
Michael Bolin
07a30da3fb linux-sandbox: plumb split sandbox policies through helper (#13449)
## Why

The Linux sandbox helper still only accepted the legacy `SandboxPolicy`
payload.

That meant the runtime could compute split filesystem and network
policies, but the helper would immediately collapse them back to the
compatibility projection before applying seccomp or staging the
bubblewrap inner command.

## What changed

- added hidden `--file-system-sandbox-policy` and
`--network-sandbox-policy` flags alongside the legacy `--sandbox-policy`
flag so the helper can migrate incrementally
- updated the core-side Landlock wrapper to pass the split policies
explicitly when launching `codex-linux-sandbox`
- added helper-side resolution logic that accepts either the legacy
policy alone or a complete split-policy pair and normalizes that into
one effective configuration
- switched Linux helper network decisions to use `NetworkSandboxPolicy`
directly
- added `FromStr` support for the split policy types so the helper can
parse them from CLI JSON

## Verification

- added helper coverage in `linux-sandbox/src/linux_run_main_tests.rs`
for split-policy flags and policy resolution
- added CLI argument coverage in `core/src/landlock.rs`
- verified the current PR state with `just clippy`




---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13449).
* #13453
* #13452
* #13451
* __->__ #13449
* #13448
* #13445
* #13440
* #13439

---------

Co-authored-by: viyatb-oai <viyatb@openai.com>
2026-03-07 19:40:10 -08:00
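The helper-side resolution logic described above (legacy policy alone, or a complete split pair, but never a partial pair) might look roughly like this. The function name and string payloads are hypothetical; the real helper parses policy JSON via `FromStr`:

```rust
// Sketch of incremental-migration flag resolution for the sandbox helper.
fn resolve_policies<'a>(
    legacy: Option<&'a str>,
    fs_policy: Option<&'a str>,
    net_policy: Option<&'a str>,
) -> Result<(String, String), String> {
    match (fs_policy, net_policy) {
        // Complete split pair: use it directly.
        (Some(fs), Some(net)) => Ok((fs.to_string(), net.to_string())),
        // No split flags: derive both policies from the legacy payload
        // (stand-in for the From impls mentioned in the PR).
        (None, None) => {
            let legacy = legacy.ok_or("no sandbox policy flags provided")?;
            Ok((format!("fs-from:{legacy}"), format!("net-from:{legacy}")))
        }
        // A partial pair is rejected so policy sources never mix.
        _ => Err("--file-system-sandbox-policy and --network-sandbox-policy \
                  must be passed together"
            .to_string()),
    }
}

fn main() {
    assert!(resolve_policies(Some("legacy"), None, None).is_ok());
    assert!(resolve_policies(None, Some("fs"), Some("net")).is_ok());
    assert!(resolve_policies(Some("legacy"), Some("fs"), None).is_err());
}
```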
174 changed files with 20646 additions and 215 deletions

View File

@@ -0,0 +1,23 @@
use anyhow::Result;
use anyhow::anyhow;
use std::env;
use std::path::PathBuf;

fn main() -> Result<()> {
    let mut args = env::args_os().skip(1);
    let output_path = PathBuf::from(
        args.next()
            .ok_or_else(|| anyhow!("missing output path argument"))?,
    );
    let payload = args
        .next()
        .ok_or_else(|| anyhow!("missing payload argument"))?
        .into_string()
        .map_err(|_| anyhow!("payload must be valid UTF-8"))?;
    let temp_path = output_path.with_extension("json.tmp");
    std::fs::write(&temp_path, payload)?;
    std::fs::rename(&temp_path, &output_path)?;
    Ok(())
}

View File

@@ -14,6 +14,7 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_utils_cargo_bin::cargo_bin;
use core_test_support::fs_wait;
use pretty_assertions::assert_eq;
use serde_json::Value;
@@ -191,29 +192,22 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let notify_script = codex_home.path().join("notify.py");
std::fs::write(
&notify_script,
r#"from pathlib import Path
import sys
payload_path = Path(__file__).with_name("notify.json")
tmp_path = payload_path.with_suffix(".json.tmp")
tmp_path.write_text(sys.argv[-1], encoding="utf-8")
tmp_path.replace(payload_path)
"#,
)?;
let notify_file = codex_home.path().join("notify.json");
let notify_script = notify_script
let notify_capture = cargo_bin("test_notify_capture")?;
let notify_capture = notify_capture
.to_str()
.expect("notify script path should be valid UTF-8");
.expect("notify capture path should be valid UTF-8");
let notify_file = notify_file
.to_str()
.expect("notify output path should be valid UTF-8");
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
"never",
&format!(
"notify = [\"python3\", {}]",
toml_basic_string(notify_script)
"notify = [{}, {}]",
toml_basic_string(notify_capture),
toml_basic_string(notify_file)
),
)?;
@@ -261,8 +255,9 @@ tmp_path.replace(payload_path)
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let notify_file = Path::new(notify_file);
fs_wait::wait_for_path_exists(notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
assert_eq!(payload["client"], "xcode");

View File

@@ -2702,8 +2702,9 @@ impl Session {
/// Emit an exec approval request event and await the user's decision.
///
/// The request is keyed by `call_id` + `approval_id` so matching responses
/// are delivered to the correct in-flight turn. If the task is aborted,
/// this returns the default `ReviewDecision` (`Denied`).
/// are delivered to the correct in-flight turn. If the pending approval is
/// cleared before a response arrives, treat it as an abort so interrupted
/// turns do not continue on a synthetic denial.
///
/// Note that if `available_decisions` is `None`, then the other fields will
/// be used to derive the available decisions via
@@ -2777,7 +2778,7 @@ impl Session {
parsed_cmd,
});
self.send_event(turn_context, event).await;
rx_approve.await.unwrap_or_default()
rx_approve.await.unwrap_or(ReviewDecision::Abort)
}
pub async fn request_patch_approval(
@@ -6859,6 +6860,10 @@ async fn try_run_sampling_request(
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if cancellation_token.is_cancelled() {
return Err(CodexErr::TurnAborted);
}
if should_emit_turn_diff {
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;

View File

@@ -3,6 +3,7 @@ use crate::spawn::SpawnChildRequest;
use crate::spawn::StdioPolicy;
use crate::spawn::spawn_child_async;
use codex_network_proxy::NetworkProxy;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use std::collections::HashMap;
use std::path::Path;
@@ -14,9 +15,9 @@ use tokio::process::Child;
/// isolation plus seccomp for network restrictions.
///
/// Unlike macOS Seatbelt where we directly embed the policy text, the Linux
/// helper accepts a list of `--sandbox-permission`/`-s` flags mirroring the
/// public CLI. We convert the internal [`SandboxPolicy`] representation into
/// the equivalent CLI options.
/// helper is a separate executable. We pass the legacy [`SandboxPolicy`] plus
/// split filesystem/network policies as JSON so the helper can migrate
/// incrementally without breaking older call sites.
#[allow(clippy::too_many_arguments)]
pub async fn spawn_command_under_linux_sandbox<P>(
codex_linux_sandbox_exe: P,
@@ -32,9 +33,13 @@ pub async fn spawn_command_under_linux_sandbox<P>(
where
P: AsRef<Path>,
{
let args = create_linux_sandbox_command_args(
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(sandbox_policy);
let network_sandbox_policy = NetworkSandboxPolicy::from(sandbox_policy);
let args = create_linux_sandbox_command_args_for_policies(
command,
sandbox_policy,
&file_system_sandbox_policy,
network_sandbox_policy,
sandbox_policy_cwd,
use_bwrap_sandbox,
allow_network_for_proxy(false),
@@ -45,7 +50,7 @@ where
args,
arg0,
cwd: command_cwd,
network_sandbox_policy: NetworkSandboxPolicy::from(sandbox_policy),
network_sandbox_policy,
network,
stdio_policy,
env,
@@ -60,32 +65,43 @@ pub(crate) fn allow_network_for_proxy(enforce_managed_network: bool) -> bool {
enforce_managed_network
}
/// Converts the sandbox policy into the CLI invocation for `codex-linux-sandbox`.
/// Converts the sandbox policies into the CLI invocation for
/// `codex-linux-sandbox`.
///
/// The helper performs the actual sandboxing (bubblewrap + seccomp) after
/// parsing these arguments. See `docs/linux_sandbox.md` for the Linux semantics.
pub(crate) fn create_linux_sandbox_command_args(
/// parsing these arguments. Policy JSON flags are emitted before helper feature
/// flags so the argv order matches the helper's CLI shape. See
/// `docs/linux_sandbox.md` for the Linux semantics.
#[allow(clippy::too_many_arguments)]
pub(crate) fn create_linux_sandbox_command_args_for_policies(
command: Vec<String>,
sandbox_policy: &SandboxPolicy,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
sandbox_policy_cwd: &Path,
use_bwrap_sandbox: bool,
allow_network_for_proxy: bool,
) -> Vec<String> {
#[expect(clippy::expect_used)]
let sandbox_policy_json = serde_json::to_string(sandbox_policy)
.unwrap_or_else(|err| panic!("failed to serialize sandbox policy: {err}"));
let file_system_policy_json = serde_json::to_string(file_system_sandbox_policy)
.unwrap_or_else(|err| panic!("failed to serialize filesystem sandbox policy: {err}"));
let network_policy_json = serde_json::to_string(&network_sandbox_policy)
.unwrap_or_else(|err| panic!("failed to serialize network sandbox policy: {err}"));
let sandbox_policy_cwd = sandbox_policy_cwd
.to_str()
.expect("cwd must be valid UTF-8")
.unwrap_or_else(|| panic!("cwd must be valid UTF-8"))
.to_string();
#[expect(clippy::expect_used)]
let sandbox_policy_json =
serde_json::to_string(sandbox_policy).expect("Failed to serialize SandboxPolicy to JSON");
let mut linux_cmd: Vec<String> = vec![
"--sandbox-policy-cwd".to_string(),
sandbox_policy_cwd,
"--sandbox-policy".to_string(),
sandbox_policy_json,
"--file-system-sandbox-policy".to_string(),
file_system_policy_json,
"--network-sandbox-policy".to_string(),
network_policy_json,
];
if use_bwrap_sandbox {
linux_cmd.push("--use-bwrap-sandbox".to_string());
@@ -93,6 +109,32 @@ pub(crate) fn create_linux_sandbox_command_args(
if allow_network_for_proxy {
linux_cmd.push("--allow-network-for-proxy".to_string());
}
linux_cmd.push("--".to_string());
linux_cmd.extend(command);
linux_cmd
}
/// Converts the sandbox cwd and execution options into the CLI invocation for
/// `codex-linux-sandbox`.
#[cfg(test)]
pub(crate) fn create_linux_sandbox_command_args(
command: Vec<String>,
sandbox_policy_cwd: &Path,
use_bwrap_sandbox: bool,
allow_network_for_proxy: bool,
) -> Vec<String> {
let sandbox_policy_cwd = sandbox_policy_cwd
.to_str()
.unwrap_or_else(|| panic!("cwd must be valid UTF-8"))
.to_string();
let mut linux_cmd: Vec<String> = vec!["--sandbox-policy-cwd".to_string(), sandbox_policy_cwd];
if use_bwrap_sandbox {
linux_cmd.push("--use-bwrap-sandbox".to_string());
}
if allow_network_for_proxy {
linux_cmd.push("--allow-network-for-proxy".to_string());
}
// Separator so that command arguments starting with `-` are not parsed as
// options of the helper itself.
@@ -113,16 +155,14 @@ mod tests {
fn bwrap_flags_are_feature_gated() {
let command = vec!["/bin/true".to_string()];
let cwd = Path::new("/tmp");
let policy = SandboxPolicy::new_read_only_policy();
let with_bwrap =
create_linux_sandbox_command_args(command.clone(), &policy, cwd, true, false);
let with_bwrap = create_linux_sandbox_command_args(command.clone(), cwd, true, false);
assert_eq!(
with_bwrap.contains(&"--use-bwrap-sandbox".to_string()),
true
);
let without_bwrap = create_linux_sandbox_command_args(command, &policy, cwd, false, false);
let without_bwrap = create_linux_sandbox_command_args(command, cwd, false, false);
assert_eq!(
without_bwrap.contains(&"--use-bwrap-sandbox".to_string()),
false
@@ -133,15 +173,46 @@ mod tests {
fn proxy_flag_is_included_when_requested() {
let command = vec!["/bin/true".to_string()];
let cwd = Path::new("/tmp");
let policy = SandboxPolicy::new_read_only_policy();
let args = create_linux_sandbox_command_args(command, &policy, cwd, true, true);
let args = create_linux_sandbox_command_args(command, cwd, true, true);
assert_eq!(
args.contains(&"--allow-network-for-proxy".to_string()),
true
);
}
#[test]
fn split_policy_flags_are_included() {
let command = vec!["/bin/true".to_string()];
let cwd = Path::new("/tmp");
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy);
let args = create_linux_sandbox_command_args_for_policies(
command,
&sandbox_policy,
&file_system_sandbox_policy,
network_sandbox_policy,
cwd,
true,
false,
);
assert_eq!(
args.windows(2).any(|window| {
window[0] == "--file-system-sandbox-policy" && !window[1].is_empty()
}),
true
);
assert_eq!(
args.windows(2)
.any(|window| window[0] == "--network-sandbox-policy"
&& window[1] == "\"restricted\""),
true
);
}
#[test]
fn proxy_network_requires_managed_requirements() {
assert_eq!(allow_network_for_proxy(false), false);

View File

@@ -14,7 +14,7 @@ use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::execute_exec_request;
use crate::landlock::allow_network_for_proxy;
use crate::landlock::create_linux_sandbox_command_args;
use crate::landlock::create_linux_sandbox_command_args_for_policies;
use crate::protocol::SandboxPolicy;
#[cfg(target_os = "macos")]
use crate::seatbelt::MACOS_PATH_TO_SEATBELT_EXECUTABLE;
@@ -35,7 +35,6 @@ use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSandboxKind;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::FileSystemSpecialPath;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_protocol::protocol::NetworkAccess;
use codex_protocol::protocol::ReadOnlyAccess;
@@ -215,7 +214,6 @@ fn additional_permission_roots(
)
}
#[cfg_attr(not(test), allow(dead_code))]
fn merge_file_system_policy_with_additional_permissions(
file_system_policy: &FileSystemSandboxPolicy,
extra_reads: Vec<AbsolutePathBuf>,
@@ -369,14 +367,7 @@ pub(crate) fn should_require_platform_sandbox(
}
match file_system_policy.kind {
FileSystemSandboxKind::Restricted => !file_system_policy.entries.iter().any(|entry| {
entry.access == FileSystemAccessMode::Write
&& matches!(
&entry.path,
FileSystemPath::Special { value }
if matches!(value, FileSystemSpecialPath::Root)
)
}),
FileSystemSandboxKind::Restricted => !file_system_policy.has_full_disk_write_access(),
FileSystemSandboxKind::Unrestricted | FileSystemSandboxKind::ExternalSandbox => false,
}
}
@@ -516,9 +507,11 @@ impl SandboxManager {
let exe = codex_linux_sandbox_exe
.ok_or(SandboxTransformError::MissingLinuxSandboxExecutable)?;
let allow_proxy_network = allow_network_for_proxy(enforce_managed_network);
let mut args = create_linux_sandbox_command_args(
let mut args = create_linux_sandbox_command_args_for_policies(
command.clone(),
&effective_policy,
&effective_file_system_policy,
effective_network_policy,
sandbox_policy_cwd,
use_linux_sandbox_bwrap,
allow_proxy_network,
@@ -680,6 +673,32 @@ mod tests {
);
}
#[test]
fn root_write_policy_with_carveouts_still_uses_platform_sandbox() {
let blocked = AbsolutePathBuf::resolve_path_against_base(
"blocked",
std::env::current_dir().expect("current dir"),
)
.expect("blocked path");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Root,
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path { path: blocked },
access: FileSystemAccessMode::None,
},
]);
assert_eq!(
should_require_platform_sandbox(&policy, NetworkSandboxPolicy::Enabled, false),
true
);
}
#[test]
fn full_access_restricted_policy_still_uses_platform_sandbox_for_restricted_network() {
let policy = FileSystemSandboxPolicy::restricted(vec![FileSystemSandboxEntry {

View File

@@ -10,12 +10,14 @@
//! - seccomp + `PR_SET_NO_NEW_PRIVS` applied in-process, and
//! - bubblewrap used to construct the filesystem view before exec.
use std::collections::BTreeSet;
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use codex_core::error::CodexErr;
use codex_core::error::Result;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::WritableRoot;
/// Linux "platform defaults" that keep common system binaries and dynamic
@@ -76,6 +78,12 @@ impl BwrapNetworkMode {
}
}
#[derive(Debug)]
pub(crate) struct BwrapArgs {
pub args: Vec<String>,
pub preserved_files: Vec<File>,
}
/// Wrap a command with bubblewrap so the filesystem is read-only by default,
/// with explicit writable roots and read-only subpaths layered afterward.
///
@@ -85,22 +93,25 @@ impl BwrapNetworkMode {
/// namespace restrictions apply while preserving full filesystem access.
pub(crate) fn create_bwrap_command_args(
command: Vec<String>,
sandbox_policy: &SandboxPolicy,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
cwd: &Path,
options: BwrapOptions,
) -> Result<Vec<String>> {
if sandbox_policy.has_full_disk_write_access() {
) -> Result<BwrapArgs> {
if file_system_sandbox_policy.has_full_disk_write_access() {
return if options.network_mode == BwrapNetworkMode::FullAccess {
Ok(command)
Ok(BwrapArgs {
args: command,
preserved_files: Vec::new(),
})
} else {
Ok(create_bwrap_flags_full_filesystem(command, options))
};
}
create_bwrap_flags(command, sandbox_policy, cwd, options)
create_bwrap_flags(command, file_system_sandbox_policy, cwd, options)
}
fn create_bwrap_flags_full_filesystem(command: Vec<String>, options: BwrapOptions) -> Vec<String> {
fn create_bwrap_flags_full_filesystem(command: Vec<String>, options: BwrapOptions) -> BwrapArgs {
let mut args = vec![
"--new-session".to_string(),
"--die-with-parent".to_string(),
@@ -121,20 +132,27 @@ fn create_bwrap_flags_full_filesystem(command: Vec<String>, options: BwrapOption
}
args.push("--".to_string());
args.extend(command);
args
BwrapArgs {
args,
preserved_files: Vec::new(),
}
}
/// Build the bubblewrap flags (everything after `argv[0]`).
fn create_bwrap_flags(
command: Vec<String>,
sandbox_policy: &SandboxPolicy,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
cwd: &Path,
options: BwrapOptions,
) -> Result<Vec<String>> {
) -> Result<BwrapArgs> {
let BwrapArgs {
args: filesystem_args,
preserved_files,
} = create_filesystem_args(file_system_sandbox_policy, cwd)?;
let mut args = Vec::new();
args.push("--new-session".to_string());
args.push("--die-with-parent".to_string());
args.extend(create_filesystem_args(sandbox_policy, cwd)?);
args.extend(filesystem_args);
// Request a user namespace explicitly rather than relying on bubblewrap's
// auto-enable behavior, which is skipped when the caller runs as uid 0.
args.push("--unshare-user".to_string());
@@ -150,25 +168,35 @@ fn create_bwrap_flags(
}
args.push("--".to_string());
args.extend(command);
Ok(args)
Ok(BwrapArgs {
args,
preserved_files,
})
}
/// Build the bubblewrap filesystem mounts for a given sandbox policy.
/// Build the bubblewrap filesystem mounts for a given filesystem policy.
///
/// The mount order is important:
/// 1. Full-read policies use `--ro-bind / /`; restricted-read policies start
/// from `--tmpfs /` and layer scoped `--ro-bind` mounts.
/// 1. Full-read policies, and restricted policies that explicitly read `/`,
/// use `--ro-bind / /`; other restricted-read policies start from
/// `--tmpfs /` and layer scoped `--ro-bind` mounts.
/// 2. `--dev /dev` mounts a minimal writable `/dev` with standard device nodes
/// (including `/dev/urandom`) even under a read-only root.
/// 3. `--bind <root> <root>` re-enables writes for allowed roots, including
/// writable subpaths under `/dev` (for example, `/dev/shm`).
/// 4. `--ro-bind <subpath> <subpath>` re-applies read-only protections under
/// those writable roots so protected subpaths win.
fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<Vec<String>> {
let writable_roots = sandbox_policy.get_writable_roots_with_cwd(cwd);
/// 5. Explicit unreadable roots are masked last so deny carveouts still win
/// even when the readable baseline includes `/`.
fn create_filesystem_args(
file_system_sandbox_policy: &FileSystemSandboxPolicy,
cwd: &Path,
) -> Result<BwrapArgs> {
let writable_roots = file_system_sandbox_policy.get_writable_roots_with_cwd(cwd);
let unreadable_roots = file_system_sandbox_policy.get_unreadable_roots_with_cwd(cwd);
ensure_mount_targets_exist(&writable_roots)?;
let mut args = if sandbox_policy.has_full_disk_read_access() {
let mut args = if file_system_sandbox_policy.has_full_disk_read_access() {
// Read-only root, then mount a minimal device tree.
// In bubblewrap (`bubblewrap.c`, `SETUP_MOUNT_DEV`), `--dev /dev`
// creates the standard minimal nodes: null, zero, full, random,
@@ -191,12 +219,12 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
"/dev".to_string(),
];
let mut readable_roots: BTreeSet<PathBuf> = sandbox_policy
let mut readable_roots: BTreeSet<PathBuf> = file_system_sandbox_policy
.get_readable_roots_with_cwd(cwd)
.into_iter()
.map(PathBuf::from)
.collect();
if file_system_sandbox_policy.include_platform_defaults() {
readable_roots.extend(
LINUX_PLATFORM_DEFAULT_READ_ROOTS
.iter()
@@ -206,7 +234,8 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
}
// A restricted policy can still explicitly request `/`, which is
// semantically equivalent to the broad read baseline. Explicit
// unreadable carveouts are re-applied later.
if readable_roots.iter().any(|root| root == Path::new("/")) {
args = vec![
"--ro-bind".to_string(),
@@ -228,6 +257,7 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
args
};
let mut preserved_files = Vec::new();
for writable_root in &writable_roots {
let root = writable_root.root.as_path();
@@ -271,7 +301,44 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
}
}
if !unreadable_roots.is_empty() {
// Apply explicit deny carveouts after all readable and writable mounts
// so they win even when the broader baseline includes `/` or a writable
// parent path.
let null_file = File::open("/dev/null")?;
let null_fd = null_file.as_raw_fd().to_string();
for unreadable_root in unreadable_roots {
let unreadable_root = unreadable_root.as_path();
if unreadable_root.is_dir() {
// Bubblewrap cannot bind `/dev/null` over a directory, so mask
// denied directories by overmounting them with an empty tmpfs
// and then remounting that tmpfs read-only.
args.push("--perms".to_string());
args.push("000".to_string());
args.push("--tmpfs".to_string());
args.push(path_to_string(unreadable_root));
args.push("--remount-ro".to_string());
args.push(path_to_string(unreadable_root));
continue;
}
// For files, bind a stable null-file payload over the original path
// so later reads do not expose host contents. `--ro-bind-data`
// expects a live fd number, so keep the backing file open until we
// exec bubblewrap below.
args.push("--perms".to_string());
args.push("000".to_string());
args.push("--ro-bind-data".to_string());
args.push(null_fd.clone());
args.push(path_to_string(unreadable_root));
}
preserved_files.push(null_file);
}
Ok(BwrapArgs {
args,
preserved_files,
})
}
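The mount ordering above can be sketched with a toy builder. This is a minimal, self-contained illustration, not the crate's implementation: plain path strings stand in for `WritableRoot`/`AbsolutePathBuf`, and only the directory-masking carveout form is shown.

```rust
// Toy sketch of the bubblewrap mount layering: read-only root first, then
// writable binds, then read-only subpath re-protection, then deny masks last
// so they always win.
fn layered_mount_args(
    writable_roots: &[(&str, &[&str])],
    unreadable_dirs: &[&str],
) -> Vec<String> {
    // 1-2. Read-only root plus a minimal writable /dev.
    let mut args: Vec<String> = ["--ro-bind", "/", "/", "--dev", "/dev"]
        .map(String::from)
        .to_vec();
    for &(root, ro_subpaths) in writable_roots {
        // 3. Re-enable writes for the allowed root...
        args.extend(["--bind", root, root].map(String::from));
        // 4. ...then re-apply read-only protection under it.
        for &sub in ro_subpaths {
            args.extend(["--ro-bind", sub, sub].map(String::from));
        }
    }
    // 5. Deny carveouts are masked last with an empty read-only tmpfs.
    for &dir in unreadable_dirs {
        args.extend(["--perms", "000", "--tmpfs", dir, "--remount-ro", dir].map(String::from));
    }
    args
}

fn main() {
    let args = layered_mount_args(&[("/work", &["/work/.git"])], &["/work/secrets"]);
    let pos = |needle: &str| args.iter().position(|a| a == needle).unwrap();
    // Writable bind comes after the read-only root, and the deny mask last.
    assert!(pos("--bind") > pos("--dev"));
    assert!(pos("--tmpfs") > pos("--bind"));
    println!("{args:?}");
}
```

The "last mount wins" property is what makes steps 4 and 5 meaningful: a later `--ro-bind` or tmpfs overmount shadows whatever an earlier, broader bind exposed.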
/// Collect unique read-only subpaths across all writable roots.
@@ -386,6 +453,11 @@ fn find_first_non_existent_component(target_path: &Path) -> Option<PathBuf> {
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::protocol::FileSystemAccessMode;
use codex_protocol::protocol::FileSystemPath;
use codex_protocol::protocol::FileSystemSandboxEntry;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::FileSystemSpecialPath;
use codex_protocol::protocol::ReadOnlyAccess;
use codex_protocol::protocol::SandboxPolicy;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -397,7 +469,7 @@ mod tests {
let command = vec!["/bin/true".to_string()];
let args = create_bwrap_command_args(
command.clone(),
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
Path::new("/"),
BwrapOptions {
mount_proc: true,
@@ -406,7 +478,7 @@ mod tests {
)
.expect("create bwrap args");
assert_eq!(args.args, command);
}
#[test]
@@ -414,7 +486,7 @@ mod tests {
let command = vec!["/bin/true".to_string()];
let args = create_bwrap_command_args(
command,
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
Path::new("/"),
BwrapOptions {
mount_proc: true,
@@ -424,7 +496,7 @@ mod tests {
.expect("create bwrap args");
assert_eq!(
args.args,
vec![
"--new-session".to_string(),
"--die-with-parent".to_string(),
@@ -452,9 +524,13 @@ mod tests {
exclude_slash_tmp: true,
};
let args = create_filesystem_args(
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
)
.expect("bwrap fs args");
assert_eq!(
args.args,
vec![
"--ro-bind".to_string(),
"/".to_string(),
@@ -462,11 +538,11 @@ mod tests {
"--dev".to_string(),
"/dev".to_string(),
"--bind".to_string(),
"/".to_string(),
"/".to_string(),
"--bind".to_string(),
"/dev".to_string(),
"/dev".to_string(),
]
);
}
@@ -488,12 +564,13 @@ mod tests {
network_access: false,
};
let args = create_filesystem_args(&FileSystemSandboxPolicy::from(&policy), temp_dir.path())
.expect("filesystem args");
assert_eq!(args.args[0..4], ["--tmpfs", "/", "--dev", "/dev"]);
let readable_root_str = path_to_string(&readable_root);
assert!(args.args.windows(3).any(|window| {
window
== [
"--ro-bind",
@@ -517,15 +594,138 @@ mod tests {
// `ReadOnlyAccess::Restricted` always includes `cwd` as a readable
// root. Using `"/"` here would intentionally collapse to broad read
// access, so use a non-root cwd to exercise the restricted path.
let args = create_filesystem_args(&FileSystemSandboxPolicy::from(&policy), temp_dir.path())
.expect("filesystem args");
assert!(
args.args
.starts_with(&["--tmpfs".to_string(), "/".to_string()])
);
if Path::new("/usr").exists() {
assert!(
args.args
.windows(3)
.any(|window| window == ["--ro-bind", "/usr", "/usr"])
);
}
}
#[test]
fn split_policy_reapplies_unreadable_carveouts_after_writable_binds() {
let temp_dir = TempDir::new().expect("temp dir");
let writable_root = temp_dir.path().join("workspace");
let blocked = writable_root.join("blocked");
std::fs::create_dir_all(&blocked).expect("create blocked dir");
let writable_root =
AbsolutePathBuf::from_absolute_path(&writable_root).expect("absolute writable root");
let blocked = AbsolutePathBuf::from_absolute_path(&blocked).expect("absolute blocked dir");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: writable_root.clone(),
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: blocked.clone(),
},
access: FileSystemAccessMode::None,
},
]);
let args = create_filesystem_args(&policy, temp_dir.path()).expect("filesystem args");
let writable_root_str = path_to_string(writable_root.as_path());
let blocked_str = path_to_string(blocked.as_path());
assert!(args.args.windows(3).any(|window| {
window
== [
"--bind",
writable_root_str.as_str(),
writable_root_str.as_str(),
]
}));
assert!(
args.args.windows(3).any(|window| {
window == ["--ro-bind", blocked_str.as_str(), blocked_str.as_str()]
})
);
}
#[test]
fn split_policy_masks_root_read_directory_carveouts() {
let temp_dir = TempDir::new().expect("temp dir");
let blocked = temp_dir.path().join("blocked");
std::fs::create_dir_all(&blocked).expect("create blocked dir");
let blocked = AbsolutePathBuf::from_absolute_path(&blocked).expect("absolute blocked dir");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Root,
},
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: blocked.clone(),
},
access: FileSystemAccessMode::None,
},
]);
let args = create_filesystem_args(&policy, temp_dir.path()).expect("filesystem args");
let blocked_str = path_to_string(blocked.as_path());
assert!(
args.args
.windows(3)
.any(|window| window == ["--ro-bind", "/", "/"])
);
assert!(
args.args
.windows(4)
.any(|window| { window == ["--perms", "000", "--tmpfs", blocked_str.as_str()] })
);
assert!(
args.args
.windows(2)
.any(|window| window == ["--remount-ro", blocked_str.as_str()])
);
}
#[test]
fn split_policy_masks_root_read_file_carveouts() {
let temp_dir = TempDir::new().expect("temp dir");
let blocked_file = temp_dir.path().join("blocked.txt");
std::fs::write(&blocked_file, "secret").expect("create blocked file");
let blocked_file =
AbsolutePathBuf::from_absolute_path(&blocked_file).expect("absolute blocked file");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Root,
},
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: blocked_file.clone(),
},
access: FileSystemAccessMode::None,
},
]);
let args = create_filesystem_args(&policy, temp_dir.path()).expect("filesystem args");
let blocked_file_str = path_to_string(blocked_file.as_path());
assert_eq!(args.preserved_files.len(), 1);
assert!(args.args.windows(5).any(|window| {
window[0] == "--perms"
&& window[1] == "000"
&& window[2] == "--ro-bind-data"
&& window[4] == blocked_file_str
}));
}
}


@@ -8,6 +8,7 @@ use std::path::Path;
use codex_core::error::CodexErr;
use codex_core::error::Result;
use codex_core::error::SandboxErr;
use codex_protocol::protocol::NetworkSandboxPolicy;
use codex_protocol::protocol::SandboxPolicy;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -40,13 +41,14 @@ use seccompiler::apply_filter;
/// Filesystem restrictions are intentionally handled by bubblewrap.
pub(crate) fn apply_sandbox_policy_to_current_thread(
network_sandbox_policy: NetworkSandboxPolicy,
cwd: &Path,
apply_landlock_fs: bool,
allow_network_for_proxy: bool,
proxy_routed_network: bool,
) -> Result<()> {
let network_seccomp_mode = network_seccomp_mode(
network_sandbox_policy,
allow_network_for_proxy,
proxy_routed_network,
);
@@ -91,20 +93,20 @@ enum NetworkSeccompMode {
}
fn should_install_network_seccomp(
network_sandbox_policy: NetworkSandboxPolicy,
allow_network_for_proxy: bool,
) -> bool {
// Managed-network sessions should remain fail-closed even for policies that
// would normally grant full network access (for example, DangerFullAccess).
!network_sandbox_policy.is_enabled() || allow_network_for_proxy
}
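The fail-closed rule above is a two-input predicate, which can be sketched in isolation. A hypothetical `network_enabled` boolean stands in for `NetworkSandboxPolicy::is_enabled()`:

```rust
// Install the network seccomp filter unless the policy grants full network
// access AND no managed proxy is in play. Managed-proxy sessions stay
// fail-closed even under fully open network policies.
fn install_network_seccomp(network_enabled: bool, allow_network_for_proxy: bool) -> bool {
    !network_enabled || allow_network_for_proxy
}

fn main() {
    // Open network, no managed proxy: the only case that skips the filter.
    assert!(!install_network_seccomp(true, false));
    // A managed proxy forces the filter even with an open network policy.
    assert!(install_network_seccomp(true, true));
    // A restricted network policy always installs it.
    assert!(install_network_seccomp(false, false));
    assert!(install_network_seccomp(false, true));
    println!("ok");
}
```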
fn network_seccomp_mode(
network_sandbox_policy: NetworkSandboxPolicy,
allow_network_for_proxy: bool,
proxy_routed_network: bool,
) -> Option<NetworkSeccompMode> {
if !should_install_network_seccomp(network_sandbox_policy, allow_network_for_proxy) {
None
} else if proxy_routed_network {
Some(NetworkSeccompMode::ProxyRouted)
@@ -266,13 +268,13 @@ mod tests {
use super::NetworkSeccompMode;
use super::network_seccomp_mode;
use super::should_install_network_seccomp;
use codex_protocol::protocol::NetworkSandboxPolicy;
use pretty_assertions::assert_eq;
#[test]
fn managed_network_enforces_seccomp_even_for_full_network_policy() {
assert_eq!(
should_install_network_seccomp(NetworkSandboxPolicy::Enabled, true),
true
);
}
@@ -280,7 +282,7 @@ mod tests {
#[test]
fn full_network_policy_without_managed_network_skips_seccomp() {
assert_eq!(
should_install_network_seccomp(NetworkSandboxPolicy::Enabled, false),
false
);
}
@@ -288,11 +290,11 @@ mod tests {
#[test]
fn restricted_network_policy_always_installs_seccomp() {
assert!(should_install_network_seccomp(
NetworkSandboxPolicy::Restricted,
false
));
assert!(should_install_network_seccomp(
NetworkSandboxPolicy::Restricted,
true
));
}
@@ -300,7 +302,7 @@ mod tests {
#[test]
fn managed_proxy_routes_use_proxy_routed_seccomp_mode() {
assert_eq!(
network_seccomp_mode(NetworkSandboxPolicy::Enabled, true, true),
Some(NetworkSeccompMode::ProxyRouted)
);
}
@@ -308,7 +310,7 @@ mod tests {
#[test]
fn restricted_network_without_proxy_routing_uses_restricted_mode() {
assert_eq!(
network_seccomp_mode(NetworkSandboxPolicy::Restricted, false, false),
Some(NetworkSeccompMode::Restricted)
);
}
@@ -316,7 +318,7 @@ mod tests {
#[test]
fn full_network_without_managed_proxy_skips_network_seccomp_mode() {
assert_eq!(
network_seccomp_mode(NetworkSandboxPolicy::Enabled, false, false),
None
);
}


@@ -14,6 +14,9 @@ use crate::proxy_routing::activate_proxy_routes_in_netns;
use crate::proxy_routing::prepare_host_proxy_route_spec;
use crate::vendored_bwrap::exec_vendored_bwrap;
use crate::vendored_bwrap::run_vendored_bwrap_main;
use codex_protocol::protocol::FileSystemSandboxPolicy;
use codex_protocol::protocol::NetworkSandboxPolicy;
use codex_protocol::protocol::SandboxPolicy;
#[derive(Debug, Parser)]
/// CLI surface for the Linux sandbox helper.
@@ -26,8 +29,18 @@ pub struct LandlockCommand {
#[arg(long = "sandbox-policy-cwd")]
pub sandbox_policy_cwd: PathBuf,
#[arg(long = "sandbox-policy")]
/// Legacy compatibility policy.
///
/// Newer callers pass split filesystem/network policies as well so the
/// helper can migrate incrementally without breaking older invocations.
#[arg(long = "sandbox-policy", hide = true)]
pub sandbox_policy: Option<SandboxPolicy>,
#[arg(long = "file-system-sandbox-policy", hide = true)]
pub file_system_sandbox_policy: Option<FileSystemSandboxPolicy>,
#[arg(long = "network-sandbox-policy", hide = true)]
pub network_sandbox_policy: Option<NetworkSandboxPolicy>,
/// Opt-in: use the bubblewrap-based Linux sandbox pipeline.
///
@@ -77,6 +90,8 @@ pub fn run_main() -> ! {
let LandlockCommand {
sandbox_policy_cwd,
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
use_bwrap_sandbox,
apply_seccomp_then_exec,
allow_network_for_proxy,
@@ -89,6 +104,16 @@ pub fn run_main() -> ! {
panic!("No command specified to execute.");
}
ensure_inner_stage_mode_is_valid(apply_seccomp_then_exec, use_bwrap_sandbox);
let EffectiveSandboxPolicies {
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
} = resolve_sandbox_policies(
sandbox_policy_cwd.as_path(),
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
);
// Inner stage: apply seccomp/no_new_privs after bubblewrap has already
// established the filesystem view.
@@ -104,6 +129,7 @@ pub fn run_main() -> ! {
let proxy_routing_active = allow_network_for_proxy;
if let Err(e) = apply_sandbox_policy_to_current_thread(
&sandbox_policy,
network_sandbox_policy,
&sandbox_policy_cwd,
false,
allow_network_for_proxy,
@@ -114,9 +140,10 @@ pub fn run_main() -> ! {
exec_or_panic(command);
}
if file_system_sandbox_policy.has_full_disk_write_access() && !allow_network_for_proxy {
if let Err(e) = apply_sandbox_policy_to_current_thread(
&sandbox_policy,
network_sandbox_policy,
&sandbox_policy_cwd,
false,
allow_network_for_proxy,
@@ -139,17 +166,20 @@ pub fn run_main() -> ! {
} else {
None
};
let inner = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: &sandbox_policy_cwd,
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &file_system_sandbox_policy,
network_sandbox_policy,
use_bwrap_sandbox,
allow_network_for_proxy,
proxy_route_spec,
command,
});
run_bwrap_with_proc_fallback(
&sandbox_policy_cwd,
&file_system_sandbox_policy,
network_sandbox_policy,
inner,
!no_proc,
allow_network_for_proxy,
@@ -159,6 +189,7 @@ pub fn run_main() -> ! {
// Legacy path: Landlock enforcement only, when bwrap sandboxing is not enabled.
if let Err(e) = apply_sandbox_policy_to_current_thread(
&sandbox_policy,
network_sandbox_policy,
&sandbox_policy_cwd,
true,
allow_network_for_proxy,
@@ -169,6 +200,59 @@ pub fn run_main() -> ! {
exec_or_panic(command);
}
#[derive(Debug, Clone)]
struct EffectiveSandboxPolicies {
sandbox_policy: SandboxPolicy,
file_system_sandbox_policy: FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
}
fn resolve_sandbox_policies(
sandbox_policy_cwd: &Path,
sandbox_policy: Option<SandboxPolicy>,
file_system_sandbox_policy: Option<FileSystemSandboxPolicy>,
network_sandbox_policy: Option<NetworkSandboxPolicy>,
) -> EffectiveSandboxPolicies {
// Accept either a fully legacy policy, a fully split policy pair, or all
// three views together. Reject partial split-policy input so the helper
// never runs with mismatched filesystem/network state.
let split_policies = match (file_system_sandbox_policy, network_sandbox_policy) {
(Some(file_system_sandbox_policy), Some(network_sandbox_policy)) => {
Some((file_system_sandbox_policy, network_sandbox_policy))
}
(None, None) => None,
_ => panic!("file-system and network sandbox policies must be provided together"),
};
match (sandbox_policy, split_policies) {
(Some(sandbox_policy), Some((file_system_sandbox_policy, network_sandbox_policy))) => {
EffectiveSandboxPolicies {
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
}
}
(Some(sandbox_policy), None) => EffectiveSandboxPolicies {
file_system_sandbox_policy: FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::from(&sandbox_policy),
sandbox_policy,
},
(None, Some((file_system_sandbox_policy, network_sandbox_policy))) => {
let sandbox_policy = file_system_sandbox_policy
.to_legacy_sandbox_policy(network_sandbox_policy, sandbox_policy_cwd)
.unwrap_or_else(|err| {
panic!("failed to derive legacy sandbox policy from split policies: {err}")
});
EffectiveSandboxPolicies {
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
}
}
(None, None) => panic!("missing sandbox policy configuration"),
}
}
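The accepted input combinations can be sketched with simplified types. Strings stand in for the real policy structs, and the `fs(...)`/`net(...)`/`legacy(...)` derivations are placeholders for `FileSystemSandboxPolicy::from`, `NetworkSandboxPolicy::from`, and `to_legacy_sandbox_policy`:

```rust
// Sketch of the policy resolution: accept a legacy policy, a full split
// pair, or both; reject partial split input so the helper never runs with
// mismatched filesystem/network state.
fn resolve(
    legacy: Option<&str>,
    fs: Option<&str>,
    net: Option<&str>,
) -> Result<(String, String, String), &'static str> {
    let split = match (fs, net) {
        (Some(fs), Some(net)) => Some((fs, net)),
        (None, None) => None,
        _ => return Err("file-system and network sandbox policies must be provided together"),
    };
    match (legacy, split) {
        (Some(legacy), Some((fs, net))) => Ok((legacy.into(), fs.into(), net.into())),
        // Derive the split view from the legacy policy.
        (Some(legacy), None) => Ok((
            legacy.into(),
            format!("fs({legacy})"),
            format!("net({legacy})"),
        )),
        // Derive the legacy view from the split pair.
        (None, Some((fs, net))) => Ok((format!("legacy({fs},{net})"), fs.into(), net.into())),
        (None, None) => Err("missing sandbox policy configuration"),
    }
}

fn main() {
    assert!(resolve(Some("read-only"), None, None).is_ok());
    assert!(resolve(None, Some("restricted"), Some("enabled")).is_ok());
    // Partial split input is an error, never silently defaulted.
    assert!(resolve(None, Some("restricted"), None).is_err());
    assert!(resolve(None, None, None).is_err());
    println!("ok");
}
```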
fn ensure_inner_stage_mode_is_valid(apply_seccomp_then_exec: bool, use_bwrap_sandbox: bool) {
if apply_seccomp_then_exec && !use_bwrap_sandbox {
panic!("--apply-seccomp-then-exec requires --use-bwrap-sandbox");
@@ -177,15 +261,21 @@ fn ensure_inner_stage_mode_is_valid(apply_seccomp_then_exec: bool, use_bwrap_san
fn run_bwrap_with_proc_fallback(
sandbox_policy_cwd: &Path,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
inner: Vec<String>,
mount_proc: bool,
allow_network_for_proxy: bool,
) -> ! {
let network_mode = bwrap_network_mode(network_sandbox_policy, allow_network_for_proxy);
let mut mount_proc = mount_proc;
if mount_proc
&& !preflight_proc_mount_support(
sandbox_policy_cwd,
file_system_sandbox_policy,
network_mode,
)
{
eprintln!("codex-linux-sandbox: bwrap could not mount /proc; retrying with --no-proc");
mount_proc = false;
@@ -195,17 +285,22 @@ fn run_bwrap_with_proc_fallback(
mount_proc,
network_mode,
};
let bwrap_args = build_bwrap_argv(
inner,
file_system_sandbox_policy,
sandbox_policy_cwd,
options,
);
exec_vendored_bwrap(bwrap_args.args, bwrap_args.preserved_files);
}
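The /proc fallback above reduces to a small decision: honor the caller's request unless the preflight probe's stderr matches a known mount failure. A minimal sketch, assuming (as the surrounding code suggests, not verbatim) that failure detection is substring matching on bubblewrap's error text:

```rust
// Sketch of the proc-mount fallback decision. `preflight_stderr` is the
// captured output of the one-shot probe run before the real sandbox exec.
fn effective_mount_proc(requested: bool, preflight_stderr: &str) -> bool {
    // Assumed failure signature: a proc-mount complaint plus a kernel errno
    // message, mirroring the is_proc_mount_failure check in this file.
    let proc_mount_failed = preflight_stderr.contains("Can't mount proc")
        && (preflight_stderr.contains("Invalid argument")
            || preflight_stderr.contains("Permission denied"));
    requested && !proc_mount_failed
}

fn main() {
    // Clean preflight: keep /proc mounted as requested.
    assert!(effective_mount_proc(true, ""));
    // Known failure text: retry without /proc instead of failing hard.
    assert!(!effective_mount_proc(
        true,
        "bwrap: Can't mount proc on /proc: Permission denied"
    ));
    // Caller opted out of /proc entirely.
    assert!(!effective_mount_proc(false, ""));
    println!("ok");
}
```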
fn bwrap_network_mode(
network_sandbox_policy: NetworkSandboxPolicy,
allow_network_for_proxy: bool,
) -> BwrapNetworkMode {
if allow_network_for_proxy {
BwrapNetworkMode::ProxyOnly
} else if network_sandbox_policy.is_enabled() {
BwrapNetworkMode::FullAccess
} else {
BwrapNetworkMode::Isolated
@@ -214,47 +309,56 @@ fn bwrap_network_mode(
fn build_bwrap_argv(
inner: Vec<String>,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
sandbox_policy_cwd: &Path,
options: BwrapOptions,
) -> crate::bwrap::BwrapArgs {
let mut bwrap_args = create_bwrap_command_args(
inner,
file_system_sandbox_policy,
sandbox_policy_cwd,
options,
)
.unwrap_or_else(|err| panic!("error building bubblewrap command: {err:?}"));
let command_separator_index = bwrap_args
.args
.iter()
.position(|arg| arg == "--")
.unwrap_or_else(|| panic!("bubblewrap argv is missing command separator '--'"));
bwrap_args.args.splice(
command_separator_index..command_separator_index,
["--argv0".to_string(), "codex-linux-sandbox".to_string()],
);
let mut argv = vec!["bwrap".to_string()];
argv.extend(bwrap_args.args);
crate::bwrap::BwrapArgs {
args: argv,
preserved_files: bwrap_args.preserved_files,
}
}
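The `--argv0` splice above can be shown in isolation: find the `--` separator and insert two elements in front of it. A self-contained sketch using plain `Vec<String>`:

```rust
// Sketch of inserting `--argv0 <name>` just before the `--` command
// separator in an already-built bubblewrap argv.
fn insert_argv0(mut args: Vec<String>, argv0: &str) -> Vec<String> {
    let sep = args
        .iter()
        .position(|a| a == "--")
        .expect("bubblewrap argv is missing command separator '--'");
    // splice with an empty range inserts without removing anything.
    args.splice(sep..sep, ["--argv0".to_string(), argv0.to_string()]);
    args
}

fn main() {
    let args: Vec<String> = ["--ro-bind", "/", "/", "--", "/bin/true"]
        .map(String::from)
        .to_vec();
    let args = insert_argv0(args, "codex-linux-sandbox");
    assert_eq!(args[3], "--argv0");
    assert_eq!(args[4], "codex-linux-sandbox");
    assert_eq!(args[5], "--");
    println!("{args:?}");
}
```

Inserting before `--` matters because everything after the separator is the sandboxed command itself, which must pass through untouched.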
fn preflight_proc_mount_support(
sandbox_policy_cwd: &Path,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_mode: BwrapNetworkMode,
) -> bool {
let preflight_argv =
build_preflight_bwrap_argv(sandbox_policy_cwd, file_system_sandbox_policy, network_mode);
let stderr = run_bwrap_in_child_capture_stderr(preflight_argv);
!is_proc_mount_failure(stderr.as_str())
}
fn build_preflight_bwrap_argv(
sandbox_policy_cwd: &Path,
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_mode: BwrapNetworkMode,
) -> crate::bwrap::BwrapArgs {
let preflight_command = vec![resolve_true_command()];
build_bwrap_argv(
preflight_command,
file_system_sandbox_policy,
sandbox_policy_cwd,
BwrapOptions {
mount_proc: true,
@@ -283,7 +387,7 @@ fn resolve_true_command() -> String {
/// - We capture stderr from that preflight to match known mount-failure text.
/// We do not stream it because this is a one-shot probe with a trivial
/// command, and reads are bounded to a fixed max size.
fn run_bwrap_in_child_capture_stderr(bwrap_args: crate::bwrap::BwrapArgs) -> String {
const MAX_PREFLIGHT_STDERR_BYTES: u64 = 64 * 1024;
let mut pipe_fds = [0; 2];
@@ -312,7 +416,7 @@ fn run_bwrap_in_child_capture_stderr(argv: Vec<String>) -> String {
close_fd_or_panic(write_fd, "close write end in bubblewrap child");
}
let exit_code = run_vendored_bwrap_main(&bwrap_args.args, &bwrap_args.preserved_files);
std::process::exit(exit_code);
}
@@ -358,15 +462,29 @@ fn is_proc_mount_failure(stderr: &str) -> bool {
|| stderr.contains("Permission denied"))
}
struct InnerSeccompCommandArgs<'a> {
sandbox_policy_cwd: &'a Path,
sandbox_policy: &'a SandboxPolicy,
file_system_sandbox_policy: &'a FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
use_bwrap_sandbox: bool,
allow_network_for_proxy: bool,
proxy_route_spec: Option<String>,
command: Vec<String>,
}
/// Build the inner command that applies seccomp after bubblewrap.
fn build_inner_seccomp_command(args: InnerSeccompCommandArgs<'_>) -> Vec<String> {
let InnerSeccompCommandArgs {
sandbox_policy_cwd,
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
use_bwrap_sandbox,
allow_network_for_proxy,
proxy_route_spec,
command,
} = args;
let current_exe = match std::env::current_exe() {
Ok(path) => path,
Err(err) => panic!("failed to resolve current executable path: {err}"),
@@ -375,6 +493,14 @@ fn build_inner_seccomp_command(
Ok(json) => json,
Err(err) => panic!("failed to serialize sandbox policy: {err}"),
};
let file_system_policy_json = match serde_json::to_string(file_system_sandbox_policy) {
Ok(json) => json,
Err(err) => panic!("failed to serialize filesystem sandbox policy: {err}"),
};
let network_policy_json = match serde_json::to_string(&network_sandbox_policy) {
Ok(json) => json,
Err(err) => panic!("failed to serialize network sandbox policy: {err}"),
};
let mut inner = vec![
current_exe.to_string_lossy().to_string(),
@@ -382,6 +508,10 @@ fn build_inner_seccomp_command(
sandbox_policy_cwd.to_string_lossy().to_string(),
"--sandbox-policy".to_string(),
policy_json,
"--file-system-sandbox-policy".to_string(),
file_system_policy_json,
"--network-sandbox-policy".to_string(),
network_policy_json,
];
if use_bwrap_sandbox {
inner.push("--use-bwrap-sandbox".to_string());


@@ -1,7 +1,13 @@
#[cfg(test)]
use super::*;
#[cfg(test)]
use codex_protocol::protocol::FileSystemSandboxPolicy;
#[cfg(test)]
use codex_protocol::protocol::NetworkSandboxPolicy;
#[cfg(test)]
use codex_protocol::protocol::SandboxPolicy;
#[cfg(test)]
use pretty_assertions::assert_eq;
#[test]
fn detects_proc_mount_invalid_argument_failure() {
@@ -29,15 +35,17 @@ fn ignores_non_proc_mount_errors() {
#[test]
fn inserts_bwrap_argv0_before_command_separator() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let argv = build_bwrap_argv(
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::FullAccess,
},
)
.args;
assert_eq!(
argv,
vec![
@@ -63,70 +71,103 @@ fn inserts_bwrap_argv0_before_command_separator() {
#[test]
fn inserts_unshare_net_when_network_isolation_requested() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let argv = build_bwrap_argv(
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::Isolated,
},
)
.args;
assert!(argv.contains(&"--unshare-net".to_string()));
}
#[test]
fn inserts_unshare_net_when_proxy_only_network_mode_requested() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let argv = build_bwrap_argv(
vec!["/bin/true".to_string()],
&FileSystemSandboxPolicy::from(&sandbox_policy),
Path::new("/"),
BwrapOptions {
mount_proc: true,
network_mode: BwrapNetworkMode::ProxyOnly,
},
)
.args;
assert!(argv.contains(&"--unshare-net".to_string()));
}
#[test]
fn proxy_only_mode_takes_precedence_over_full_network_policy() {
let mode = bwrap_network_mode(NetworkSandboxPolicy::Enabled, true);
assert_eq!(mode, BwrapNetworkMode::ProxyOnly);
}
#[test]
fn managed_proxy_preflight_argv_is_wrapped_for_full_access_policy() {
let mode = bwrap_network_mode(NetworkSandboxPolicy::Enabled, true);
let argv = build_preflight_bwrap_argv(
Path::new("/"),
&FileSystemSandboxPolicy::from(&SandboxPolicy::DangerFullAccess),
mode,
)
.args;
assert!(argv.iter().any(|arg| arg == "--"));
}
#[test]
fn managed_proxy_inner_command_includes_route_spec() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
use_bwrap_sandbox: true,
allow_network_for_proxy: true,
proxy_route_spec: Some("{\"routes\":[]}".to_string()),
command: vec!["/bin/true".to_string()],
});
assert!(args.iter().any(|arg| arg == "--proxy-route-spec"));
assert!(args.iter().any(|arg| arg == "{\"routes\":[]}"));
}
#[test]
fn inner_command_includes_split_policy_flags() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
use_bwrap_sandbox: true,
allow_network_for_proxy: false,
proxy_route_spec: None,
command: vec!["/bin/true".to_string()],
});
assert!(args.iter().any(|arg| arg == "--file-system-sandbox-policy"));
assert!(args.iter().any(|arg| arg == "--network-sandbox-policy"));
}
#[test]
fn non_managed_inner_command_omits_route_spec() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let args = build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
use_bwrap_sandbox: true,
allow_network_for_proxy: false,
proxy_route_spec: None,
command: vec!["/bin/true".to_string()],
});
assert!(!args.iter().any(|arg| arg == "--proxy-route-spec"));
}
@@ -134,15 +175,71 @@ fn non_managed_inner_command_omits_route_spec() {
#[test]
fn managed_proxy_inner_command_requires_route_spec() {
let result = std::panic::catch_unwind(|| {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
build_inner_seccomp_command(InnerSeccompCommandArgs {
sandbox_policy_cwd: Path::new("/tmp"),
sandbox_policy: &sandbox_policy,
file_system_sandbox_policy: &FileSystemSandboxPolicy::from(&sandbox_policy),
network_sandbox_policy: NetworkSandboxPolicy::Restricted,
use_bwrap_sandbox: true,
allow_network_for_proxy: true,
proxy_route_spec: None,
command: vec!["/bin/true".to_string()],
})
});
assert!(result.is_err());
}
#[test]
fn resolve_sandbox_policies_derives_split_policies_from_legacy_policy() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let resolved =
resolve_sandbox_policies(Path::new("/tmp"), Some(sandbox_policy.clone()), None, None);
assert_eq!(resolved.sandbox_policy, sandbox_policy);
assert_eq!(
resolved.file_system_sandbox_policy,
FileSystemSandboxPolicy::from(&sandbox_policy)
);
assert_eq!(
resolved.network_sandbox_policy,
NetworkSandboxPolicy::from(&sandbox_policy)
);
}
#[test]
fn resolve_sandbox_policies_derives_legacy_policy_from_split_policies() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy);
let resolved = resolve_sandbox_policies(
Path::new("/tmp"),
None,
Some(file_system_sandbox_policy.clone()),
Some(network_sandbox_policy),
);
assert_eq!(resolved.sandbox_policy, sandbox_policy);
assert_eq!(
resolved.file_system_sandbox_policy,
file_system_sandbox_policy
);
assert_eq!(resolved.network_sandbox_policy, network_sandbox_policy);
}
#[test]
fn resolve_sandbox_policies_rejects_partial_split_policies() {
let result = std::panic::catch_unwind(|| {
resolve_sandbox_policies(
Path::new("/tmp"),
Some(SandboxPolicy::new_read_only_policy()),
Some(FileSystemSandboxPolicy::default()),
None,
vec!["/bin/true".to_string()],
)
});
assert!(result.is_err());
}


@@ -6,6 +6,7 @@
#[cfg(vendored_bwrap_available)]
mod imp {
use std::ffi::CString;
use std::fs::File;
use std::os::raw::c_char;
unsafe extern "C" {
@@ -27,7 +28,10 @@ mod imp {
///
/// On success, bubblewrap will `execve` into the target program and this
/// function will never return. A return value therefore implies failure.
pub(crate) fn run_vendored_bwrap_main(argv: &[String]) -> libc::c_int {
pub(crate) fn run_vendored_bwrap_main(
argv: &[String],
_preserved_files: &[File],
) -> libc::c_int {
let cstrings = argv_to_cstrings(argv);
let mut argv_ptrs: Vec<*const c_char> = cstrings.iter().map(|arg| arg.as_ptr()).collect();
@@ -39,16 +43,21 @@ mod imp {
}
/// Execute the build-time bubblewrap `main` function with the given argv.
pub(crate) fn exec_vendored_bwrap(argv: Vec<String>) -> ! {
let exit_code = run_vendored_bwrap_main(&argv);
pub(crate) fn exec_vendored_bwrap(argv: Vec<String>, preserved_files: Vec<File>) -> ! {
let exit_code = run_vendored_bwrap_main(&argv, &preserved_files);
std::process::exit(exit_code);
}
}
#[cfg(not(vendored_bwrap_available))]
mod imp {
use std::fs::File;
/// Panics with a clear error when the build-time bwrap path is not enabled.
pub(crate) fn run_vendored_bwrap_main(_argv: &[String]) -> libc::c_int {
pub(crate) fn run_vendored_bwrap_main(
_argv: &[String],
_preserved_files: &[File],
) -> libc::c_int {
panic!(
r#"build-time bubblewrap is not available in this build.
codex-linux-sandbox should always compile vendored bubblewrap on Linux targets.
@@ -60,8 +69,8 @@ Notes:
}
/// Panics with a clear error when the build-time bwrap path is not enabled.
pub(crate) fn exec_vendored_bwrap(_argv: Vec<String>) -> ! {
let _ = run_vendored_bwrap_main(&[]);
pub(crate) fn exec_vendored_bwrap(_argv: Vec<String>, _preserved_files: Vec<File>) -> ! {
let _ = run_vendored_bwrap_main(&[], &[]);
unreachable!("run_vendored_bwrap_main should always panic in this configuration")
}
}


@@ -9,8 +9,13 @@ use codex_core::exec::process_exec_tool_call;
use codex_core::exec_env::create_env;
use codex_core::sandboxing::SandboxPermissions;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::permissions::FileSystemAccessMode;
use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::FileSystemSpecialPath;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_protocol::protocol::ReadOnlyAccess;
use codex_protocol::protocol::SandboxPolicy;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
@@ -63,13 +68,47 @@ async fn run_cmd_output(
.expect("sandboxed command should execute")
}
#[expect(clippy::expect_used)]
async fn run_cmd_result_with_writable_roots(
cmd: &[&str],
writable_roots: &[PathBuf],
timeout_ms: u64,
use_bwrap_sandbox: bool,
network_access: bool,
) -> Result<codex_core::exec::ExecToolCallOutput> {
let sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots: writable_roots
.iter()
.map(|p| AbsolutePathBuf::try_from(p.as_path()).unwrap())
.collect(),
read_only_access: Default::default(),
network_access,
// Exclude tmp-related folders from writable roots because we need a
// folder that is writable by tests but that we intentionally disallow
// writing to in the sandbox.
exclude_tmpdir_env_var: true,
exclude_slash_tmp: true,
};
let file_system_sandbox_policy = FileSystemSandboxPolicy::from(&sandbox_policy);
let network_sandbox_policy = NetworkSandboxPolicy::from(&sandbox_policy);
run_cmd_result_with_policies(
cmd,
sandbox_policy,
file_system_sandbox_policy,
network_sandbox_policy,
timeout_ms,
use_bwrap_sandbox,
)
.await
}
#[expect(clippy::expect_used)]
async fn run_cmd_result_with_policies(
cmd: &[&str],
sandbox_policy: SandboxPolicy,
file_system_sandbox_policy: FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
timeout_ms: u64,
use_bwrap_sandbox: bool,
) -> Result<codex_core::exec::ExecToolCallOutput> {
let cwd = std::env::current_dir().expect("cwd should exist");
let sandbox_cwd = cwd.clone();
@@ -84,28 +123,14 @@ async fn run_cmd_result_with_writable_roots(
justification: None,
arg0: None,
};
let sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots: writable_roots
.iter()
.map(|p| AbsolutePathBuf::try_from(p.as_path()).unwrap())
.collect(),
read_only_access: Default::default(),
network_access,
// Exclude tmp-related folders from writable roots because we need a
// folder that is writable by tests but that we intentionally disallow
// writing to in the sandbox.
exclude_tmpdir_env_var: true,
exclude_slash_tmp: true,
};
let sandbox_program = env!("CARGO_BIN_EXE_codex-linux-sandbox");
let codex_linux_sandbox_exe = Some(PathBuf::from(sandbox_program));
process_exec_tool_call(
params,
&sandbox_policy,
&FileSystemSandboxPolicy::from(&sandbox_policy),
NetworkSandboxPolicy::from(&sandbox_policy),
&file_system_sandbox_policy,
network_sandbox_policy,
sandbox_cwd.as_path(),
&codex_linux_sandbox_exe,
use_bwrap_sandbox,
@@ -479,6 +504,110 @@ async fn sandbox_blocks_codex_symlink_replacement_attack() {
assert_ne!(codex_output.exit_code, 0);
}
#[tokio::test]
async fn sandbox_blocks_explicit_split_policy_carveouts_under_bwrap() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let tmpdir = tempfile::tempdir().expect("tempdir");
let blocked = tmpdir.path().join("blocked");
std::fs::create_dir_all(&blocked).expect("create blocked dir");
let blocked_target = blocked.join("secret.txt");
let sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots: vec![AbsolutePathBuf::try_from(tmpdir.path()).expect("absolute tempdir")],
read_only_access: Default::default(),
network_access: true,
exclude_tmpdir_env_var: true,
exclude_slash_tmp: true,
};
let file_system_sandbox_policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: AbsolutePathBuf::try_from(tmpdir.path()).expect("absolute tempdir"),
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: AbsolutePathBuf::try_from(blocked.as_path()).expect("absolute blocked dir"),
},
access: FileSystemAccessMode::None,
},
]);
let output = expect_denied(
run_cmd_result_with_policies(
&[
"bash",
"-lc",
&format!("echo denied > {}", blocked_target.to_string_lossy()),
],
sandbox_policy,
file_system_sandbox_policy,
NetworkSandboxPolicy::Enabled,
LONG_TIMEOUT_MS,
true,
)
.await,
"explicit split-policy carveout should be denied under bubblewrap",
);
assert_ne!(output.exit_code, 0);
}
#[tokio::test]
async fn sandbox_blocks_root_read_carveouts_under_bwrap() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let tmpdir = tempfile::tempdir().expect("tempdir");
let blocked = tmpdir.path().join("blocked");
std::fs::create_dir_all(&blocked).expect("create blocked dir");
let blocked_target = blocked.join("secret.txt");
std::fs::write(&blocked_target, "secret").expect("seed blocked file");
let sandbox_policy = SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: true,
};
let file_system_sandbox_policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Root,
},
access: FileSystemAccessMode::Read,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: AbsolutePathBuf::try_from(blocked.as_path()).expect("absolute blocked dir"),
},
access: FileSystemAccessMode::None,
},
]);
let output = expect_denied(
run_cmd_result_with_policies(
&[
"bash",
"-lc",
&format!("cat {}", blocked_target.to_string_lossy()),
],
sandbox_policy,
file_system_sandbox_policy,
NetworkSandboxPolicy::Enabled,
LONG_TIMEOUT_MS,
true,
)
.await,
"root-read carveout should be denied under bubblewrap",
);
assert_ne!(output.exit_code, 0);
}
#[tokio::test]
async fn sandbox_blocks_ssh() {
// Force ssh to attempt a real TCP connection but fail quickly. `BatchMode`


@@ -123,6 +123,25 @@ impl Default for FileSystemSandboxPolicy {
}
impl FileSystemSandboxPolicy {
fn has_root_access(&self, predicate: impl Fn(FileSystemAccessMode) -> bool) -> bool {
matches!(self.kind, FileSystemSandboxKind::Restricted)
&& self.entries.iter().any(|entry| {
matches!(
&entry.path,
FileSystemPath::Special { value }
if matches!(value, FileSystemSpecialPath::Root) && predicate(entry.access)
)
})
}
fn has_explicit_deny_entries(&self) -> bool {
matches!(self.kind, FileSystemSandboxKind::Restricted)
&& self
.entries
.iter()
.any(|entry| entry.access == FileSystemAccessMode::None)
}
pub fn unrestricted() -> Self {
Self {
kind: FileSystemSandboxKind::Unrestricted,
@@ -148,13 +167,10 @@ impl FileSystemSandboxPolicy {
pub fn has_full_disk_read_access(&self) -> bool {
match self.kind {
FileSystemSandboxKind::Unrestricted | FileSystemSandboxKind::ExternalSandbox => true,
FileSystemSandboxKind::Restricted => self.entries.iter().any(|entry| {
matches!(
&entry.path,
FileSystemPath::Special { value }
if matches!(value, FileSystemSpecialPath::Root) && entry.access.can_read()
)
}),
FileSystemSandboxKind::Restricted => {
self.has_root_access(FileSystemAccessMode::can_read)
&& !self.has_explicit_deny_entries()
}
}
}
@@ -162,14 +178,10 @@ impl FileSystemSandboxPolicy {
pub fn has_full_disk_write_access(&self) -> bool {
match self.kind {
FileSystemSandboxKind::Unrestricted | FileSystemSandboxKind::ExternalSandbox => true,
FileSystemSandboxKind::Restricted => self.entries.iter().any(|entry| {
matches!(
&entry.path,
FileSystemPath::Special { value }
if matches!(value, FileSystemSpecialPath::Root)
&& entry.access.can_write()
)
}),
FileSystemSandboxKind::Restricted => {
self.has_root_access(FileSystemAccessMode::can_write)
&& !self.has_explicit_deny_entries()
}
}
}
@@ -194,11 +206,24 @@ impl FileSystemSandboxPolicy {
}
let cwd_absolute = AbsolutePathBuf::from_absolute_path(cwd).ok();
let mut readable_roots = Vec::new();
if self.has_root_access(FileSystemAccessMode::can_read)
&& let Some(cwd_absolute) = cwd_absolute.as_ref()
{
readable_roots.push(absolute_root_path_for_cwd(cwd_absolute));
}
dedup_absolute_paths(
self.entries
.iter()
.filter(|entry| entry.access.can_read())
.filter_map(|entry| resolve_file_system_path(&entry.path, cwd_absolute.as_ref()))
readable_roots
.into_iter()
.chain(
self.entries
.iter()
.filter(|entry| entry.access.can_read())
.filter_map(|entry| {
resolve_file_system_path(&entry.path, cwd_absolute.as_ref())
}),
)
.collect(),
)
}
@@ -212,11 +237,24 @@ impl FileSystemSandboxPolicy {
let cwd_absolute = AbsolutePathBuf::from_absolute_path(cwd).ok();
let unreadable_roots = self.get_unreadable_roots_with_cwd(cwd);
let mut writable_roots = Vec::new();
if self.has_root_access(FileSystemAccessMode::can_write)
&& let Some(cwd_absolute) = cwd_absolute.as_ref()
{
writable_roots.push(absolute_root_path_for_cwd(cwd_absolute));
}
dedup_absolute_paths(
self.entries
.iter()
.filter(|entry| entry.access.can_write())
.filter_map(|entry| resolve_file_system_path(&entry.path, cwd_absolute.as_ref()))
writable_roots
.into_iter()
.chain(
self.entries
.iter()
.filter(|entry| entry.access.can_write())
.filter_map(|entry| {
resolve_file_system_path(&entry.path, cwd_absolute.as_ref())
}),
)
.collect(),
)
.into_iter()
@@ -543,6 +581,16 @@ fn resolve_file_system_path(
}
}
fn absolute_root_path_for_cwd(cwd: &AbsolutePathBuf) -> AbsolutePathBuf {
let root = cwd
.as_path()
.ancestors()
.last()
.unwrap_or_else(|| panic!("cwd must have a filesystem root"));
AbsolutePathBuf::from_absolute_path(root)
.unwrap_or_else(|err| panic!("cwd root must be an absolute path: {err}"))
}
fn resolve_file_system_special_path(
value: &FileSystemSpecialPath,
cwd: Option<&AbsolutePathBuf>,


@@ -727,6 +727,22 @@ impl FromStr for SandboxPolicy {
}
}
impl FromStr for FileSystemSandboxPolicy {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
impl FromStr for NetworkSandboxPolicy {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
impl SandboxPolicy {
/// Returns a policy with read-only disk access and no network.
pub fn new_read_only_policy() -> Self {
@@ -3177,6 +3193,7 @@ mod tests {
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
use tempfile::NamedTempFile;
use tempfile::TempDir;
@@ -3335,6 +3352,56 @@ mod tests {
assert!(writable.has_full_disk_write_access());
}
#[test]
fn restricted_file_system_policy_treats_root_with_carveouts_as_scoped_access() {
let cwd = TempDir::new().expect("tempdir");
let cwd_absolute =
AbsolutePathBuf::from_absolute_path(cwd.path()).expect("absolute tempdir");
let root = cwd_absolute
.as_path()
.ancestors()
.last()
.and_then(|path| AbsolutePathBuf::from_absolute_path(path).ok())
.expect("filesystem root");
let blocked = AbsolutePathBuf::resolve_path_against_base("blocked", cwd.path())
.expect("resolve blocked");
let policy = FileSystemSandboxPolicy::restricted(vec![
FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::Root,
},
access: FileSystemAccessMode::Write,
},
FileSystemSandboxEntry {
path: FileSystemPath::Path {
path: blocked.clone(),
},
access: FileSystemAccessMode::None,
},
]);
assert!(!policy.has_full_disk_read_access());
assert!(!policy.has_full_disk_write_access());
assert_eq!(
policy.get_readable_roots_with_cwd(cwd.path()),
vec![root.clone()]
);
assert_eq!(
policy.get_unreadable_roots_with_cwd(cwd.path()),
vec![blocked.clone()]
);
let writable_roots = policy.get_writable_roots_with_cwd(cwd.path());
assert_eq!(writable_roots.len(), 1);
assert_eq!(writable_roots[0].root, root);
assert!(
writable_roots[0]
.read_only_subpaths
.iter()
.any(|path| path.as_path() == blocked.as_path())
);
}
#[test]
fn restricted_file_system_policy_derives_effective_paths() {
let cwd = TempDir::new().expect("tempdir");

sdk/python/README.md

@@ -0,0 +1,145 @@
# Codex App Server Python SDK
Experimental Python SDK for `codex app-server` JSON-RPC v2.
It gives you a small typed API for:
- starting or resuming threads
- creating turns from Python
- streaming events or waiting for a final `TurnResult`
- using the same shape in sync and async code
## Experimental
This SDK is still experimental.
- it is not published yet
- API details may still change before the first release
- packaging and release workflow are still evolving
Use it for local development, dogfooding, and iteration inside this repo. Do not treat it as a stable public package yet.
## What You Need
- Python `>=3.10`
- local Codex auth/session already configured
- this repo checked out locally
## Install From Source
```bash
cd sdk/python
python -m pip install -e .
```
The package includes bundled Codex runtime binaries and automatically selects the binary for the current platform through `AppServerConfig().codex_bin`.
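The platform key the selection turns on can be previewed up front. The mapping below is only an illustrative sketch of how an OS/arch key might be derived — the real resolution lives in `AppServerConfig().codex_bin`, and the key format here is an assumption:

```python
import platform

def guess_platform_key() -> str:
    # Illustrative only: derive an OS/arch key such as "linux-x86_64".
    # The SDK's actual binary lookup is handled by AppServerConfig().codex_bin.
    system = platform.system().lower()    # e.g. "linux", "darwin", "windows"
    machine = platform.machine().lower()  # e.g. "x86_64", "arm64"
    return f"{system}-{machine}"
```
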
## Core Model
The public API is intentionally small:
- `Codex` / `AsyncCodex`: session entrypoint
- `Thread` / `AsyncThread`: a conversation thread
- `Turn` / `AsyncTurn`: one user turn within a thread
- `TurnResult`: final status, text, items, and usage
Typical flow:
1. create a `Codex` client
2. start or resume a thread
3. create a turn from input
4. call `run()` or iterate `stream()`
## Quickstart
### Sync
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("status:", result.status)
print("text:", result.text)
```
### Async
```python
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(
model="gpt-5",
config={"model_reasoning_effort": "high"},
)
turn = await thread.turn(TextInput("Say hello in one sentence."))
result = await turn.run()
print("status:", result.status)
print("text:", result.text)
asyncio.run(main())
```
## Current Limitations
- Only one active `Turn.stream()` or `Turn.run()` consumer is supported per client instance.
- Starting a second active turn consumer on the same `Codex` or `AsyncCodex` raises `RuntimeError`.
- `Codex()` is eager and performs startup plus `initialize` in the constructor.
## Behavior Notes
- `AsyncCodex` is intended to be used with `async with AsyncCodex() as codex:`.
- `TurnResult.text` prefers streamed assistant deltas and falls back to completed raw response items when no deltas are emitted.
- For transient overload handling, use `retry_on_overload(...)`.
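As a rough sketch of the pattern `retry_on_overload(...)` packages up — the helper name, attempt count, and exact delays below are illustrative, not the SDK's internals:

```python
import random
import time

def retry_with_backoff(fn, *, attempts=4, base_delay=0.5, is_retryable=lambda exc: True):
    # Sketch of exponential backoff with jitter for transient failures.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:
            if attempt == attempts - 1 or not is_retryable(exc):
                raise
            # Delays grow as base, 2*base, 4*base, ... with random jitter.
            time.sleep(base_delay * (2 ** attempt) * (0.5 + random.random()))
```
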
## Learn By Example
Runnable examples:
```bash
cd sdk/python
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```
More docs:
- Getting started: `docs/getting-started.md`
- API reference: `docs/api-reference.md`
- FAQ and pitfalls: `docs/faq.md`
- Examples index: `examples/README.md`
- Notebook walkthrough: `notebooks/sdk_walkthrough.ipynb`
## Maintainer Workflow
Refresh bundled binaries and generated artifacts with:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
```
or:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel alpha --bundle-all-platforms
```
## Compatibility
- Package name: `codex-app-server-sdk`
- SDK version in this repo: `0.2.0`
- Target protocol: Codex `app-server` JSON-RPC v2


@@ -0,0 +1,180 @@
# Codex App Server SDK — API Reference
Public surface of `codex_app_server` for app-server v2.
This SDK surface is experimental. The current implementation intentionally allows only one active `Turn.stream()` or `Turn.run()` consumer per client instance at a time.
## Package Entry
```python
from codex_app_server import (
Codex,
AsyncCodex,
Thread,
AsyncThread,
Turn,
AsyncTurn,
TurnResult,
InitializeResult,
Input,
InputItem,
TextInput,
ImageInput,
LocalImageInput,
SkillInput,
MentionInput,
ThreadItem,
TurnStatus,
)
```
- Version: `codex_app_server.__version__`
- Requires Python >= 3.10
## Codex (sync)
```python
Codex(config: AppServerConfig | None = None)
```
Properties/methods:
- `metadata -> InitializeResult`
- `close() -> None`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> ThreadListResponse`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Thread`
- `thread_archive(thread_id: str) -> ThreadArchiveResponse`
- `thread_unarchive(thread_id: str) -> Thread`
- `models(*, include_hidden: bool = False) -> ModelListResponse`
Context manager:
```python
with Codex() as codex:
...
```
## AsyncCodex (async parity)
```python
AsyncCodex(config: AppServerConfig | None = None)
```
Properties/methods:
- `metadata -> InitializeResult`
- `close() -> Awaitable[None]`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> Awaitable[ThreadListResponse]`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_archive(thread_id: str) -> Awaitable[ThreadArchiveResponse]`
- `thread_unarchive(thread_id: str) -> Awaitable[AsyncThread]`
- `models(*, include_hidden: bool = False) -> Awaitable[ModelListResponse]`
Async context manager:
```python
async with AsyncCodex() as codex:
...
```
## Thread / AsyncThread
`Thread` and `AsyncThread` share the same shape and intent.
### Thread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Turn`
- `read(*, include_turns: bool = False) -> ThreadReadResponse`
- `set_name(name: str) -> ThreadSetNameResponse`
- `compact() -> ThreadCompactStartResponse`
### AsyncThread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurn]`
- `read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]`
- `set_name(name: str) -> Awaitable[ThreadSetNameResponse]`
- `compact() -> Awaitable[ThreadCompactStartResponse]`
## Turn / AsyncTurn
### Turn
- `steer(input: Input) -> TurnSteerResponse`
- `interrupt() -> TurnInterruptResponse`
- `stream() -> Iterator[Notification]`
- `run() -> TurnResult`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `Codex` instance raises `RuntimeError`
### AsyncTurn
- `steer(input: Input) -> Awaitable[TurnSteerResponse]`
- `interrupt() -> Awaitable[TurnInterruptResponse]`
- `stream() -> AsyncIterator[Notification]`
- `run() -> Awaitable[TurnResult]`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `AsyncCodex` instance raises `RuntimeError`
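The exclusivity rule can be pictured as a non-blocking lock around the event stream. This standalone sketch is not the SDK's implementation — it only shows the shape of a guard that raises `RuntimeError` on a second consumer:

```python
import threading

class TurnConsumerGuard:
    # Illustrative: mirrors the documented one-active-consumer-per-client rule.
    def __init__(self) -> None:
        self._lock = threading.Lock()

    def acquire(self) -> None:
        # blocking=False: fail immediately instead of waiting for the first consumer.
        if not self._lock.acquire(blocking=False):
            raise RuntimeError("a turn consumer is already active on this client")

    def release(self) -> None:
        self._lock.release()
```
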
## TurnResult
```python
@dataclass
class TurnResult:
thread_id: str
turn_id: str
status: TurnStatus
error: TurnError | None
text: str
items: list[ThreadItem]
usage: ThreadTokenUsageUpdatedNotification | None
```
## Inputs
```python
@dataclass
class TextInput: text: str
@dataclass
class ImageInput: url: str
@dataclass
class LocalImageInput: path: str
@dataclass
class SkillInput: name: str; path: str
@dataclass
class MentionInput: name: str; path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
```
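Because `Input` accepts either a single item or a list, a caller-side helper along these lines (hypothetical, not part of the SDK) shows the normalization the union implies:

```python
def normalize_input(value):
    # A single InputItem becomes a one-element list; lists pass through unchanged.
    return value if isinstance(value, list) else [value]
```
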
## Retry + errors
```python
from codex_app_server import (
retry_on_overload,
JsonRpcError,
MethodNotFoundError,
InvalidParamsError,
ServerBusyError,
is_retryable_error,
)
```
- `retry_on_overload(...)` retries transient overload errors with exponential backoff + jitter.
- `is_retryable_error(exc)` checks if an exception is transient/overload-like.
## Example
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print(result.text)
```

sdk/python/docs/faq.md

@@ -0,0 +1,83 @@
# FAQ
## Thread vs turn
- A `Thread` is conversation state.
- A `Turn` is one model execution inside that thread.
- Multi-turn chat means multiple turns on the same `Thread`.
## `run()` vs `stream()`
- `Turn.run()` / `AsyncTurn.run()` is the easiest path. It consumes events until completion and returns `TurnResult`.
- `Turn.stream()` / `AsyncTurn.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
Choose `run()` for most apps. Choose `stream()` for progress UIs, custom timeout logic, or custom parsing.
## Sync vs async clients
- `Codex` is the sync public API.
- `AsyncCodex` is an async replica of the same public API shape.
If your app is not already async, stay with `Codex`.
## Public kwargs are snake_case
Public API keyword names are snake_case; the SDK maps them to camelCase on the wire under the hood.
If you are migrating older code, update these names:
- `approvalPolicy` -> `approval_policy`
- `baseInstructions` -> `base_instructions`
- `developerInstructions` -> `developer_instructions`
- `modelProvider` -> `model_provider`
- `modelProviders` -> `model_providers`
- `sortKey` -> `sort_key`
- `sourceKinds` -> `source_kinds`
- `outputSchema` -> `output_schema`
- `sandboxPolicy` -> `sandbox_policy`
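The table above is mechanical: plain snake_case to camelCase, as in this standalone sketch (the SDK's own converter may differ in details):

```python
def to_camel(name: str) -> str:
    # "approval_policy" -> "approvalPolicy"; single words pass through unchanged.
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)
```
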
## Why only `thread_start(...)` and `thread_resume(...)`?
The public API keeps only explicit lifecycle calls:
- `thread_start(...)` to create new threads
- `thread_resume(thread_id, ...)` to continue existing threads
This avoids duplicate ways to do the same operation and keeps behavior explicit.
## Why does the constructor fail?
`Codex()` is eager: it starts transport and calls `initialize` in `__init__`.
Common causes:
- bundled runtime binary missing for your OS/arch under `src/codex_app_server/bin/*`
- local auth/session is missing
- incompatible/old app-server
Maintainers can refresh bundled binaries with:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py --channel stable --bundle-all-platforms
```
## Why does a turn "hang"?
A turn is complete only when `turn/completed` arrives for that turn ID.
- `run()` waits for this automatically.
- With `stream()`, keep consuming notifications until completion.
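With `stream()`, the consuming loop looks roughly like this; the dict-shaped events below are a stand-in for the SDK's `Notification` objects:

```python
def drain_until_completed(events):
    # Keep consuming until the turn/completed notification for this turn arrives.
    seen = []
    for event in events:
        seen.append(event)
        if event.get("method") == "turn/completed":
            break
    return seen
```
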
## How do I retry safely?
Use `retry_on_overload(...)` for transient overload failures (`ServerBusyError`).
Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundError`, fix inputs/version compatibility instead.
## Common pitfalls
- Starting a new thread for every prompt when you wanted continuity.
- Forgetting to `close()` (or not using context managers).
- Ignoring `TurnResult.status` and `TurnResult.error`.
- Mixing SDK input classes with raw dicts incorrectly.


@@ -0,0 +1,96 @@
# Getting Started
This is the fastest path from install to a multi-turn thread using the public SDK surface.
The SDK is experimental. Treat the API, bundled runtime strategy, and packaging details as unstable until the first public release.
## 1) Install
From repo root:
```bash
cd sdk/python
python -m pip install -e .
```
Requirements:
- Python `>=3.10`
- bundled runtime binary for your platform (shipped in package)
- local Codex auth/session configured
## 2) Run your first turn (sync)
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Thread:", result.thread_id)
print("Turn:", result.turn_id)
print("Status:", result.status)
print("Text:", result.text)
```
What happened:
- `Codex()` started and initialized `codex app-server`.
- `thread_start(...)` created a thread.
- `turn(...).run()` consumed events until `turn/completed` and returned a `TurnResult`.
Note: one client can have only one active `Turn.stream()` / `Turn.run()` consumer at a time in the current experimental build.
## 3) Continue the same thread (multi-turn)
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
first = thread.turn(TextInput("Summarize Rust ownership in 2 bullets.")).run()
second = thread.turn(TextInput("Now explain it to a Python developer.")).run()
print("first:", first.text)
print("second:", second.text)
```
## 4) Async parity
```python
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Continue where we left off."))
result = await turn.run()
print(result.text)
asyncio.run(main())
```
## 5) Resume an existing thread
```python
from codex_app_server import Codex, TextInput
THREAD_ID = "thr_123" # replace with a real id
with Codex() as codex:
thread = codex.thread_resume(THREAD_ID)
result = thread.turn(TextInput("Continue where we left off.")).run()
print(result.text)
```
## 6) Next stops
- API surface and signatures: `docs/api-reference.md`
- Common decisions/pitfalls: `docs/faq.md`
- End-to-end runnable examples: `examples/README.md`


@@ -0,0 +1,30 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Say hello in one sentence."))
result = await turn.run()
print("Status:", result.status)
print("Text:", result.text)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,20 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
with Codex() as codex:
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Status:", result.status)
print("Text:", result.text)


@@ -0,0 +1,37 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Give 3 bullets about SIMD."))
result = await turn.run()
print("thread_id:", result.thread_id)
print("turn_id:", result.turn_id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", result.text)
print("items.count:", len(result.items))
if result.usage is None:
raise RuntimeError("missing usage for completed turn")
print("usage.thread_id:", result.usage.threadId)
print("usage.turn_id:", result.usage.turnId)
if __name__ == "__main__":
asyncio.run(main())


@@ -0,0 +1,28 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    result = thread.turn(TextInput("Give 3 bullets about SIMD.")).run()
    print("thread_id:", result.thread_id)
    print("turn_id:", result.turn_id)
    print("status:", result.status)
    if result.error is not None:
        print("error:", result.error)
    print("text:", result.text)
    print("items.count:", len(result.items))
    if result.usage is None:
        raise RuntimeError("missing usage for completed turn")
    print("usage.thread_id:", result.usage.threadId)
    print("usage.turn_id:", result.usage.turnId)

View File

@@ -0,0 +1,44 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))

        # Best-effort controls: models can finish quickly, so races are expected.
        try:
            _ = await turn.steer(TextInput("Keep it brief and stop after 20 numbers."))
            print("steer: sent")
        except Exception as exc:
            print("steer: skipped", type(exc).__name__)

        try:
            _ = await turn.interrupt()
            print("interrupt: sent")
        except Exception as exc:
            print("interrupt: skipped", type(exc).__name__)

        event_count = 0
        async for event in turn.stream():
            event_count += 1
            print(event.method, event.payload)
        print("events.count:", event_count)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,36 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    turn = thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))

    # Best-effort controls: models can finish quickly, so races are expected.
    try:
        _ = turn.steer(TextInput("Keep it brief and stop after 20 numbers."))
        print("steer: sent")
    except Exception as exc:
        print("steer: skipped", type(exc).__name__)

    try:
        _ = turn.interrupt()
        print("interrupt: sent")
    except Exception as exc:
        print("interrupt: skipped", type(exc).__name__)

    event_count = 0
    for event in turn.stream():
        event_count += 1
        print(event.method, event.payload)
    print("events.count:", event_count)

View File

@@ -0,0 +1,28 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex


async def main() -> None:
    async with AsyncCodex() as codex:
        print("metadata:", codex.metadata)
        models = await codex.models(include_hidden=True)
        print("models.count:", len(models.data))
        if models.data:
            print("first model id:", models.data[0].id)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,20 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex

with Codex() as codex:
    print("metadata:", codex.metadata)
    models = codex.models()
    print("models.count:", len(models.data))
    if models.data:
        print("first model id:", models.data[0].id)

View File

@@ -0,0 +1,32 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    async with AsyncCodex() as codex:
        original = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        first_turn = await original.turn(TextInput("Tell me one fact about Saturn."))
        first = await first_turn.run()
        print("Created thread:", first.thread_id)
        resumed = await codex.thread_resume(first.thread_id)
        second_turn = await resumed.turn(TextInput("Continue with one more fact."))
        second = await second_turn.run()
        print(second.text)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,23 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex() as codex:
    # Create an initial thread and turn so we have a real thread to resume.
    original = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    first = original.turn(TextInput("Tell me one fact about Saturn.")).run()
    print("Created thread:", first.thread_id)

    # Resume the existing thread by ID.
    resumed = codex.thread_resume(first.thread_id)
    second = resumed.turn(TextInput("Continue with one more fact.")).run()
    print(second.text)

View File

@@ -0,0 +1,70 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        first = await (await thread.turn(TextInput("One sentence about structured planning."))).run()
        second = await (await thread.turn(TextInput("Now restate it for a junior engineer."))).run()

        reopened = await codex.thread_resume(thread.id)
        listing_active = await codex.thread_list(limit=20, archived=False)
        reading = await reopened.read(include_turns=True)
        _ = await reopened.set_name("sdk-lifecycle-demo")
        _ = await codex.thread_archive(reopened.id)
        listing_archived = await codex.thread_list(limit=20, archived=True)
        unarchived = await codex.thread_unarchive(reopened.id)

        resumed_info = "n/a"
        try:
            resumed = await codex.thread_resume(
                unarchived.id,
                model="gpt-5",
                config={"model_reasoning_effort": "high"},
            )
            resumed_result = await (await resumed.turn(TextInput("Continue in one short sentence."))).run()
            resumed_info = f"{resumed_result.turn_id} {resumed_result.status}"
        except Exception as exc:
            resumed_info = f"skipped({type(exc).__name__})"

        forked_info = "n/a"
        try:
            forked = await codex.thread_fork(unarchived.id, model="gpt-5")
            forked_result = await (await forked.turn(TextInput("Take a different angle in one short sentence."))).run()
            forked_info = f"{forked_result.turn_id} {forked_result.status}"
        except Exception as exc:
            forked_info = f"skipped({type(exc).__name__})"

        compact_info = "sent"
        try:
            _ = await unarchived.compact()
        except Exception as exc:
            compact_info = f"skipped({type(exc).__name__})"

        print("Lifecycle OK:", thread.id)
        print("first:", first.turn_id, first.status)
        print("second:", second.turn_id, second.status)
        print("read.turns:", len(reading.thread.turns or []))
        print("list.active:", len(listing_active.data))
        print("list.archived:", len(listing_archived.data))
        print("resumed:", resumed_info)
        print("forked:", forked_info)
        print("compact:", compact_info)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,63 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    first = thread.turn(TextInput("One sentence about structured planning.")).run()
    second = thread.turn(TextInput("Now restate it for a junior engineer.")).run()

    reopened = codex.thread_resume(thread.id)
    listing_active = codex.thread_list(limit=20, archived=False)
    reading = reopened.read(include_turns=True)
    _ = reopened.set_name("sdk-lifecycle-demo")
    _ = codex.thread_archive(reopened.id)
    listing_archived = codex.thread_list(limit=20, archived=True)
    unarchived = codex.thread_unarchive(reopened.id)

    resumed_info = "n/a"
    try:
        resumed = codex.thread_resume(
            unarchived.id,
            model="gpt-5",
            config={"model_reasoning_effort": "high"},
        )
        resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
        resumed_info = f"{resumed_result.turn_id} {resumed_result.status}"
    except Exception as exc:
        resumed_info = f"skipped({type(exc).__name__})"

    forked_info = "n/a"
    try:
        forked = codex.thread_fork(unarchived.id, model="gpt-5")
        forked_result = forked.turn(TextInput("Take a different angle in one short sentence.")).run()
        forked_info = f"{forked_result.turn_id} {forked_result.status}"
    except Exception as exc:
        forked_info = f"skipped({type(exc).__name__})"

    compact_info = "sent"
    try:
        _ = unarchived.compact()
    except Exception as exc:
        compact_info = f"skipped({type(exc).__name__})"

    print("Lifecycle OK:", thread.id)
    print("first:", first.turn_id, first.status)
    print("second:", second.turn_id, second.status)
    print("read.turns:", len(reading.thread.turns or []))
    print("list.active:", len(listing_active.data))
    print("list.archived:", len(listing_archived.data))
    print("resumed:", resumed_info)
    print("forked:", forked_info)
    print("compact:", compact_info)

View File

@@ -0,0 +1,35 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, ImageInput, TextInput

REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(
            [
                TextInput("What is in this image? Give 3 bullets."),
                ImageInput(REMOTE_IMAGE_URL),
            ]
        )
        result = await turn.run()
        print("Status:", result.status)
        print(result.text)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,26 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, ImageInput, TextInput

REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    result = thread.turn(
        [
            TextInput("What is in this image? Give 3 bullets."),
            ImageInput(REMOTE_IMAGE_URL),
        ]
    ).run()
    print("Status:", result.status)
    print(result.text)

View File

@@ -0,0 +1,38 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, LocalImageInput, TextInput

IMAGE_PATH = Path(__file__).resolve().parents[1] / "assets" / "sample_scene.png"
if not IMAGE_PATH.exists():
    raise FileNotFoundError(f"Missing bundled image: {IMAGE_PATH}")


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(
            [
                TextInput("Read this local image and summarize what you see in 2 bullets."),
                LocalImageInput(str(IMAGE_PATH.resolve())),
            ]
        )
        result = await turn.run()
        print("Status:", result.status)
        print(result.text)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,29 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, LocalImageInput, TextInput

IMAGE_PATH = Path(__file__).resolve().parents[1] / "assets" / "sample_scene.png"
if not IMAGE_PATH.exists():
    raise FileNotFoundError(f"Missing bundled image: {IMAGE_PATH}")

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    result = thread.turn(
        [
            TextInput("Read this local image and summarize what you see in 2 bullets."),
            LocalImageInput(str(IMAGE_PATH.resolve())),
        ]
    ).run()
    print("Status:", result.status)
    print(result.text)

View File

@@ -0,0 +1,23 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex() as codex:
    print("Server:", codex.metadata.server_name, codex.metadata.server_version)
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    turn = thread.turn(TextInput("Say hello in one sentence."))
    result = turn.run()
    print("Thread:", result.thread_id)
    print("Turn:", result.turn_id)
    print("Text:", result.text.strip())

View File

@@ -0,0 +1,91 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

from codex_app_server import (
    AsyncCodex,
    JsonRpcError,
    ServerBusyError,
    TextInput,
    TurnStatus,
    is_retryable_error,
)

ResultT = TypeVar("ResultT")


async def retry_on_overload_async(
    op: Callable[[], Awaitable[ResultT]],
    *,
    max_attempts: int = 3,
    initial_delay_s: float = 0.25,
    max_delay_s: float = 2.0,
    jitter_ratio: float = 0.2,
) -> ResultT:
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    delay = initial_delay_s
    attempt = 0
    while True:
        attempt += 1
        try:
            return await op()
        except Exception as exc:  # noqa: BLE001
            if attempt >= max_attempts or not is_retryable_error(exc):
                raise
            jitter = delay * jitter_ratio
            sleep_for = min(max_delay_s, delay) + random.uniform(-jitter, jitter)
            if sleep_for > 0:
                await asyncio.sleep(sleep_for)
            delay = min(max_delay_s, delay * 2)


def _run_turn(thread, prompt: str):
    async def _inner():
        turn = await thread.turn(TextInput(prompt))
        return await turn.run()

    return _inner


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        try:
            result = await retry_on_overload_async(
                _run_turn(thread, "Summarize retry best practices in 3 bullets."),
                max_attempts=3,
                initial_delay_s=0.25,
                max_delay_s=2.0,
            )
        except ServerBusyError as exc:
            print("Server overloaded after retries:", exc.message)
            return
        except JsonRpcError as exc:
            print(f"JSON-RPC error {exc.code}: {exc.message}")
            return
        if result.status == TurnStatus.failed:
            print("Turn failed:", result.error)
        print("Text:", result.text)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,40 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import (
    Codex,
    JsonRpcError,
    ServerBusyError,
    TextInput,
    TurnStatus,
    retry_on_overload,
)

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    try:
        result = retry_on_overload(
            lambda: thread.turn(TextInput("Summarize retry best practices in 3 bullets.")).run(),
            max_attempts=3,
            initial_delay_s=0.25,
            max_delay_s=2.0,
        )
    except ServerBusyError as exc:
        print("Server overloaded after retries:", exc.message)
    except JsonRpcError as exc:
        print(f"JSON-RPC error {exc.code}: {exc.message}")
    else:
        if result.status == TurnStatus.failed:
            print("Turn failed:", result.error)
        print("Text:", result.text)

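The overload-retry examples above use capped exponential backoff with jitter. The schedule itself can be sketched independently of the SDK; `backoff_delays` below is a hypothetical helper, not part of `codex_app_server`, but it follows the same arithmetic as `retry_on_overload_async`:

```python
import random


def backoff_delays(
    max_attempts: int,
    initial_delay_s: float = 0.25,
    max_delay_s: float = 2.0,
    jitter_ratio: float = 0.2,
) -> list[float]:
    """Return the sleep durations between retry attempts.

    Each base delay doubles, is capped at max_delay_s, and gets a random
    +/- jitter_ratio adjustment; there is no sleep after the final attempt.
    """
    delays = []
    delay = initial_delay_s
    for _ in range(max_attempts - 1):
        jitter = delay * jitter_ratio
        delays.append(min(max_delay_s, delay) + random.uniform(-jitter, jitter))
        delay = min(max_delay_s, delay * 2)
    return delays


# Three sleeps for four attempts, roughly 0.25s, 0.5s, 1.0s before jitter.
print(backoff_delays(4))
```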
View File

@@ -0,0 +1,96 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import (
    AsyncCodex,
    TextInput,
    ThreadTokenUsageUpdatedNotification,
    TurnCompletedNotificationPayload,
)


def _status_value(status: object | None) -> str:
    return str(getattr(status, "value", status))


def _format_usage(usage: object | None) -> str:
    if usage is None:
        return "usage> (none)"
    last = getattr(usage, "last", None)
    total = getattr(usage, "total", None)
    if last is None or total is None:
        return f"usage> {usage}"
    return (
        "usage>\n"
        f"  last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
        f"  total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
    )


async def main() -> None:
    print("Codex async mini CLI. Type /exit to quit.")
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        print("Thread:", thread.id)
        while True:
            try:
                user_input = (await asyncio.to_thread(input, "you> ")).strip()
            except EOFError:
                break
            if not user_input:
                continue
            if user_input in {"/exit", "/quit"}:
                break

            turn = await thread.turn(TextInput(user_input))
            usage = None
            status = None
            error = None
            printed_delta = False
            print("assistant> ", end="", flush=True)
            async for event in turn.stream():
                payload = event.payload
                if event.method == "item/agentMessage/delta":
                    delta = getattr(payload, "delta", "")
                    if delta:
                        print(delta, end="", flush=True)
                        printed_delta = True
                    continue
                if isinstance(payload, ThreadTokenUsageUpdatedNotification):
                    usage = payload.tokenUsage
                    continue
                if isinstance(payload, TurnCompletedNotificationPayload):
                    status = payload.turn.status
                    error = payload.turn.error

            if printed_delta:
                print()
            else:
                print("[no text]")
            status_text = _status_value(status)
            print(f"assistant.status> {status_text}")
            if status_text == "failed":
                print("assistant.error>", error)
            print(_format_usage(usage))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,89 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import (
    Codex,
    TextInput,
    ThreadTokenUsageUpdatedNotification,
    TurnCompletedNotificationPayload,
)

print("Codex mini CLI. Type /exit to quit.")


def _status_value(status: object | None) -> str:
    return str(getattr(status, "value", status))


def _format_usage(usage: object | None) -> str:
    if usage is None:
        return "usage> (none)"
    last = getattr(usage, "last", None)
    total = getattr(usage, "total", None)
    if last is None or total is None:
        return f"usage> {usage}"
    return (
        "usage>\n"
        f"  last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
        f"  total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
    )


with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    print("Thread:", thread.id)
    while True:
        try:
            user_input = input("you> ").strip()
        except EOFError:
            break
        if not user_input:
            continue
        if user_input in {"/exit", "/quit"}:
            break

        turn = thread.turn(TextInput(user_input))
        usage = None
        status = None
        error = None
        printed_delta = False
        print("assistant> ", end="", flush=True)
        for event in turn.stream():
            payload = event.payload
            if event.method == "item/agentMessage/delta":
                delta = getattr(payload, "delta", "")
                if delta:
                    print(delta, end="", flush=True)
                    printed_delta = True
                continue
            if isinstance(payload, ThreadTokenUsageUpdatedNotification):
                usage = payload.tokenUsage
                continue
            if isinstance(payload, TurnCompletedNotificationPayload):
                status = payload.turn.status
                error = payload.turn.error

        if printed_delta:
            print()
        else:
            print("[no text]")
        status_text = _status_value(status)
        print(f"assistant.status> {status_text}")
        if status_text == "failed":
            print("assistant.error>", error)
        print(_format_usage(usage))

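The mini-CLI loops above accumulate agent-message deltas while skipping other notifications. That dispatch can be exercised without a live server; `Event` below is a stand-in dataclass, not one of the SDK notification types, and only the `item/agentMessage/delta` method string is taken from the example:

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for a streamed notification: a method name plus payload."""

    method: str
    payload: object


def collect_text(events) -> str:
    """Join agent-message deltas in order, ignoring every other event,
    the way the CLI loop builds the visible assistant reply."""
    chunks = []
    for event in events:
        if event.method == "item/agentMessage/delta":
            chunks.append(str(event.payload))
    return "".join(chunks)


demo = [
    Event("item/agentMessage/delta", "Hel"),
    Event("thread/tokenUsage/updated", None),
    Event("item/agentMessage/delta", "lo"),
]
print(collect_text(demo))  # Hello
```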
View File

@@ -0,0 +1,74 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import (
    AsyncCodex,
    TextInput,
    TurnAskForApproval,
    TurnPersonality,
    TurnReasoningEffort,
    TurnReasoningSummary,
    TurnSandboxPolicy,
)

OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}

SANDBOX_POLICY = TurnSandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)

SUMMARY = TurnReasoningSummary.model_validate("concise")

PROMPT = (
    "Analyze a safe rollout plan for enabling a feature flag in production. "
    "Return JSON matching the requested schema."
)


async def main() -> None:
    async with AsyncCodex() as codex:
        thread = await codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(
            TextInput(PROMPT),
            approval_policy=TurnAskForApproval.never,
            cwd=str(Path.cwd()),
            effort=TurnReasoningEffort.medium,
            model="gpt-5",
            output_schema=OUTPUT_SCHEMA,
            personality=TurnPersonality.pragmatic,
            sandbox_policy=SANDBOX_POLICY,
            summary=SUMMARY,
        )
        result = await turn.run()
        print("Status:", result.status)
        print("Text:", result.text)
        print("Usage:", result.usage)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,66 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import (
    Codex,
    TextInput,
    TurnAskForApproval,
    TurnPersonality,
    TurnReasoningEffort,
    TurnReasoningSummary,
    TurnSandboxPolicy,
)

OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}

SANDBOX_POLICY = TurnSandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)

SUMMARY = TurnReasoningSummary.model_validate("concise")

PROMPT = (
    "Analyze a safe rollout plan for enabling a feature flag in production. "
    "Return JSON matching the requested schema."
)

with Codex() as codex:
    thread = codex.thread_start(model="gpt-5", config={"model_reasoning_effort": "high"})
    turn = thread.turn(
        TextInput(PROMPT),
        approval_policy=TurnAskForApproval.never,
        cwd=str(Path.cwd()),
        effort=TurnReasoningEffort.medium,
        model="gpt-5",
        output_schema=OUTPUT_SCHEMA,
        personality=TurnPersonality.pragmatic,
        sandbox_policy=SANDBOX_POLICY,
        summary=SUMMARY,
    )
    result = turn.run()
    print("Status:", result.status)
    print("Text:", result.text)
    print("Usage:", result.usage)

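The output schema in the kitchen-sink examples requires exactly a `summary` string and an `actions` array of strings, with no extra keys. A dependency-free shape check of a candidate payload against that schema can be sketched as follows (`matches_rollout_schema` is illustrative, not a full JSON Schema validator and not an SDK export):

```python
def matches_rollout_schema(data: object) -> bool:
    """Check a decoded JSON value against the example's OUTPUT_SCHEMA shape:
    an object with exactly the required keys, correctly typed."""
    if not isinstance(data, dict):
        return False
    # "required" plus "additionalProperties": false means exactly these keys.
    if set(data) != {"summary", "actions"}:
        return False
    return isinstance(data["summary"], str) and (
        isinstance(data["actions"], list)
        and all(isinstance(item, str) for item in data["actions"])
    )


print(matches_rollout_schema({"summary": "ok", "actions": ["flag on for 1%"]}))  # True
print(matches_rollout_schema({"summary": "ok"}))  # False
```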
View File

@@ -0,0 +1,116 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

import asyncio

from codex_app_server import (
    AsyncCodex,
    TextInput,
    TurnAskForApproval,
    TurnPersonality,
    TurnReasoningEffort,
    TurnReasoningSummary,
    TurnSandboxPolicy,
)

REASONING_RANK = {
    "none": 0,
    "minimal": 1,
    "low": 2,
    "medium": 3,
    "high": 4,
    "xhigh": 5,
}


def _pick_highest_model(models):
    visible = [m for m in models if not m.hidden] or models
    known_names = {m.id for m in visible} | {m.model for m in visible}
    top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
    pool = top_candidates or visible
    return max(pool, key=lambda m: (m.model, m.id))


def _pick_highest_turn_effort(model) -> TurnReasoningEffort:
    if not model.supportedReasoningEfforts:
        return TurnReasoningEffort.medium
    best = max(
        model.supportedReasoningEfforts,
        key=lambda option: REASONING_RANK.get(option.reasoningEffort.value, -1),
    )
    return TurnReasoningEffort(best.reasoningEffort.value)


OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}

SANDBOX_POLICY = TurnSandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)


async def main() -> None:
    async with AsyncCodex() as codex:
        models = await codex.models(include_hidden=True)
        selected_model = _pick_highest_model(models.data)
        selected_effort = _pick_highest_turn_effort(selected_model)
        print("selected.model:", selected_model.model)
        print("selected.effort:", selected_effort.value)

        thread = await codex.thread_start(
            model=selected_model.model,
            config={"model_reasoning_effort": selected_effort.value},
        )

        first_turn = await thread.turn(
            TextInput("Give one short sentence about reliable production releases."),
            model=selected_model.model,
            effort=selected_effort,
        )
        first = await first_turn.run()
        print("agent.message:", first.text)
        print("usage:", first.usage)

        second_turn = await thread.turn(
            TextInput("Return JSON for a safe feature-flag rollout plan."),
            approval_policy=TurnAskForApproval.never,
            cwd=str(Path.cwd()),
            effort=selected_effort,
            model=selected_model.model,
            output_schema=OUTPUT_SCHEMA,
            personality=TurnPersonality.pragmatic,
            sandbox_policy=SANDBOX_POLICY,
            summary=TurnReasoningSummary.model_validate("concise"),
        )
        second = await second_turn.run()
        print("agent.message.params:", second.text)
        print("usage.params:", second.usage)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,107 @@
import sys
from pathlib import Path

_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src

ensure_local_sdk_src()

from codex_app_server import (
    Codex,
    TextInput,
    TurnAskForApproval,
    TurnPersonality,
    TurnReasoningEffort,
    TurnReasoningSummary,
    TurnSandboxPolicy,
)

REASONING_RANK = {
    "none": 0,
    "minimal": 1,
    "low": 2,
    "medium": 3,
    "high": 4,
    "xhigh": 5,
}


def _pick_highest_model(models):
    visible = [m for m in models if not m.hidden] or models
    known_names = {m.id for m in visible} | {m.model for m in visible}
    top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
    pool = top_candidates or visible
    return max(pool, key=lambda m: (m.model, m.id))


def _pick_highest_turn_effort(model) -> TurnReasoningEffort:
    if not model.supportedReasoningEfforts:
        return TurnReasoningEffort.medium
    best = max(
        model.supportedReasoningEfforts,
        key=lambda option: REASONING_RANK.get(option.reasoningEffort.value, -1),
    )
    return TurnReasoningEffort(best.reasoningEffort.value)


OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}

SANDBOX_POLICY = TurnSandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)

with Codex() as codex:
    models = codex.models(include_hidden=True)
    selected_model = _pick_highest_model(models.data)
    selected_effort = _pick_highest_turn_effort(selected_model)
    print("selected.model:", selected_model.model)
    print("selected.effort:", selected_effort.value)

    thread = codex.thread_start(
        model=selected_model.model,
        config={"model_reasoning_effort": selected_effort.value},
    )

    first = thread.turn(
        TextInput("Give one short sentence about reliable production releases."),
        model=selected_model.model,
        effort=selected_effort,
    ).run()
    print("agent.message:", first.text)
    print("usage:", first.usage)

    second = thread.turn(
        TextInput("Return JSON for a safe feature-flag rollout plan."),
        approval_policy=TurnAskForApproval.never,
        cwd=str(Path.cwd()),
        effort=selected_effort,
        model=selected_model.model,
        output_schema=OUTPUT_SCHEMA,
        personality=TurnPersonality.pragmatic,
        sandbox_policy=SANDBOX_POLICY,
        summary=TurnReasoningSummary.model_validate("concise"),
    ).run()
    print("agent.message.params:", second.text)
    print("usage.params:", second.usage)

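The effort selection in the model-select examples boils down to ranking effort names. The same idea can be exercised without the SDK's enum types; `pick_highest_effort` is an illustrative helper operating on plain strings, with the ranking table mirrored from the example:

```python
REASONING_RANK = {"none": 0, "minimal": 1, "low": 2, "medium": 3, "high": 4, "xhigh": 5}


def pick_highest_effort(supported: list[str], default: str = "medium") -> str:
    """Pick the highest-ranked effort name; unknown names rank below all
    known ones, and an empty list falls back to the default."""
    if not supported:
        return default
    return max(supported, key=lambda name: REASONING_RANK.get(name, -1))


print(pick_highest_effort(["low", "xhigh", "medium"]))  # xhigh
print(pick_highest_effort([]))  # medium
```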
View File

@@ -0,0 +1,70 @@
# Python SDK Examples

Each example folder contains runnable versions:

- `sync.py` (public sync surface: `Codex`)
- `async.py` (public async surface: `AsyncCodex`)

All examples intentionally use only public SDK exports from `codex_app_server`.

## Prerequisites

- Python `>=3.10`
- SDK dependencies installed for the same Python interpreter you will use to run the examples

Recommended setup (from `sdk/python`):

```bash
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
python -m pip install -e .
```

## Run examples

From `sdk/python`:

```bash
python examples/<example-folder>/sync.py
python examples/<example-folder>/async.py
```

The examples bootstrap local imports from `sdk/python/src` automatically, so no `pip install -e .` step is required to run them from this repository checkout. The only required install step is the SDK's dependencies for your active interpreter.

## Recommended first run

```bash
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```

## Index

- `01_quickstart_constructor/`
  - first run / sanity check
- `02_turn_run/`
  - inspect full turn output fields
- `03_turn_stream_events/`
  - stream and print raw notifications
- `04_models_and_metadata/`
  - read server metadata and the model list
- `05_existing_thread/`
  - resume a real existing thread (created in-script)
- `06_thread_lifecycle_and_controls/`
  - thread lifecycle + control calls
- `07_image_and_text/`
  - remote image URL + text multimodal turn
- `08_local_image_and_text/`
  - local image + text multimodal turn using the bundled sample image
- `09_async_parity/`
  - parity-style sync flow (see the async variants in other examples)
- `10_error_handling_and_retry/`
  - overload retry pattern + typed error handling structure
- `11_cli_mini_app/`
  - interactive chat loop
- `12_turn_params_kitchen_sink/`
  - one turn using most optional `turn(...)` params (sync + async)
- `13_model_select_and_turn_params/`
  - list models, pick the highest model + highest supported reasoning effort, run turns, print message and usage

View File

@@ -0,0 +1,35 @@
from __future__ import annotations

import importlib.util
import sys
from pathlib import Path


def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None:
    if importlib.util.find_spec("pydantic") is not None:
        return
    python = sys.executable
    raise RuntimeError(
        "Missing required dependency: pydantic.\n"
        f"Interpreter: {python}\n"
        "Install dependencies with the same interpreter used to run this example:\n"
        f" {python} -m pip install -e {sdk_python_dir}\n"
        "If you installed with `pip` from another Python, reinstall using the command above."
    )


def ensure_local_sdk_src() -> Path:
    """Add sdk/python/src to sys.path so examples run without installing the package."""
    sdk_python_dir = Path(__file__).resolve().parents[1]
    src_dir = sdk_python_dir / "src"
    package_dir = src_dir / "codex_app_server"
    if not package_dir.exists():
        raise RuntimeError(f"Could not locate local SDK package at {package_dir}")
    _ensure_runtime_dependencies(sdk_python_dir)
    src_str = str(src_dir)
    if src_str not in sys.path:
        sys.path.insert(0, src_str)
    return src_dir
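`ensure_local_sdk_src` above prepends the local `src/` directory to `sys.path` so the package imports without installation. A runnable standalone sketch of that pattern against a throwaway package (`demo_pkg` is illustrative, not part of the SDK):

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Recreate the layout ensure_local_sdk_src() expects: <dir>/src/<package>/.
# demo_pkg is an illustrative throwaway package, not part of the SDK.
with tempfile.TemporaryDirectory() as tmp:
    src_dir = Path(tmp) / "src"
    pkg_dir = src_dir / "demo_pkg"
    pkg_dir.mkdir(parents=True)
    (pkg_dir / "__init__.py").write_text("VALUE = 42\n")

    src_str = str(src_dir)
    if src_str not in sys.path:
        sys.path.insert(0, src_str)
    importlib.invalidate_caches()  # ensure the new sys.path entry is rescanned
    try:
        import demo_pkg

        value = demo_pkg.VALUE
    finally:
        sys.path.remove(src_str)
        sys.modules.pop("demo_pkg", None)

print(value)
```

The same mechanism is why the notebook's Cell 1 also pops stale `codex_app_server` modules: a path insert only affects future imports, not modules already cached in `sys.modules`.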

Binary file not shown.

After

Size: 3.6 KiB

View File

@@ -0,0 +1,535 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Codex Python SDK Walkthrough\n",
"\n",
"Public SDK surface only (`codex_app_server` root exports)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 1: bootstrap local SDK imports (no installation required)\n",
"import os\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
"if sys.version_info < (3, 10):\n",
" raise RuntimeError(\n",
" f'Notebook requires Python 3.10+; current interpreter is {sys.version.split()[0]}.'\n",
" )\n",
"\n",
"try:\n",
" _ = os.getcwd()\n",
"except FileNotFoundError:\n",
" os.chdir(str(Path.home()))\n",
"\n",
"\n",
"def _is_sdk_python_dir(path: Path) -> bool:\n",
" return (path / 'pyproject.toml').exists() and (path / 'src' / 'codex_app_server').exists()\n",
"\n",
"\n",
"def _iter_home_fallback_candidates(home: Path):\n",
" # bounded depth scan under home to support launching notebooks from unrelated cwd values\n",
" patterns = ('sdk/python', '*/sdk/python', '*/*/sdk/python', '*/*/*/sdk/python')\n",
" for pattern in patterns:\n",
" yield from home.glob(pattern)\n",
"\n",
"\n",
"def _find_sdk_python_dir(start: Path) -> Path | None:\n",
" checked = set()\n",
"\n",
" def _consider(candidate: Path) -> Path | None:\n",
" resolved = candidate.resolve()\n",
" if resolved in checked:\n",
" return None\n",
" checked.add(resolved)\n",
" if _is_sdk_python_dir(resolved):\n",
" return resolved\n",
" return None\n",
"\n",
" for candidate in [start, *start.parents]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" for candidate in [start / 'sdk' / 'python', *(parent / 'sdk' / 'python' for parent in start.parents)]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" env_dir = os.environ.get('CODEX_PYTHON_SDK_DIR')\n",
" if env_dir:\n",
" found = _consider(Path(env_dir).expanduser())\n",
" if found is not None:\n",
" return found\n",
"\n",
" for entry in sys.path:\n",
" if not entry:\n",
" continue\n",
" entry_path = Path(entry).expanduser()\n",
" for candidate in (entry_path, entry_path / 'sdk' / 'python'):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" home = Path.home()\n",
" for candidate in _iter_home_fallback_candidates(home):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" return None\n",
"\n",
"\n",
"repo_python_dir = _find_sdk_python_dir(Path.cwd())\n",
"if repo_python_dir is None:\n",
" raise RuntimeError('Could not locate sdk/python. Set CODEX_PYTHON_SDK_DIR to your sdk/python path.')\n",
"\n",
"src_dir = repo_python_dir / 'src'\n",
"if str(src_dir) not in sys.path:\n",
" sys.path.insert(0, str(src_dir))\n",
"\n",
"# Force fresh imports after SDK upgrades in the same notebook kernel.\n",
"for module_name in list(sys.modules):\n",
" if module_name == 'codex_app_server' or module_name.startswith('codex_app_server.'):\n",
" sys.modules.pop(module_name, None)\n",
"\n",
"print('Kernel:', sys.executable)\n",
"print('SDK source:', src_dir)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 2: imports (public only)\n",
"from codex_app_server import (\n",
" AsyncCodex,\n",
" Codex,\n",
" ImageInput,\n",
" LocalImageInput,\n",
" TextInput,\n",
" retry_on_overload,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 3: simple sync conversation\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(TextInput('Explain gradient descent in 3 bullets.'))\n",
" result = turn.run()\n",
"\n",
" print('server:', codex.metadata)\n",
" print('status:', result.status)\n",
" print(result.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 4: multi-turn continuity in same thread\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
"\n",
" first = thread.turn(TextInput('Give a short summary of transformers.')).run()\n",
" second = thread.turn(TextInput('Now explain that to a high-school student.')).run()\n",
"\n",
" print('first status:', first.status)\n",
" print('second status:', second.status)\n",
" print('second text:', second.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5: full thread lifecycle and branching (sync)\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" first = thread.turn(TextInput('One sentence about structured planning.')).run()\n",
" second = thread.turn(TextInput('Now restate it for a junior engineer.')).run()\n",
"\n",
" reopened = codex.thread_resume(thread.id)\n",
" listing_active = codex.thread_list(limit=20, archived=False)\n",
" reading = reopened.read(include_turns=True)\n",
"\n",
" _ = reopened.set_name('sdk-lifecycle-demo')\n",
" _ = codex.thread_archive(reopened.id)\n",
" listing_archived = codex.thread_list(limit=20, archived=True)\n",
" unarchived = codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = resumed.turn(TextInput('Continue in one short sentence.')).run()\n",
" resumed_info = f'{resumed_result.turn_id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = codex.thread_fork(unarchived.id, model='gpt-5')\n",
" forked_result = forked.turn(TextInput('Take a different angle in one short sentence.')).run()\n",
" forked_info = f'{forked_result.turn_id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.turn_id, first.status)\n",
" print('second:', second.turn_id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5b: one turn with most optional turn params\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" TurnAskForApproval,\n",
" TurnPersonality,\n",
" TurnReasoningEffort,\n",
" TurnReasoningSummary,\n",
" TurnSandboxPolicy,\n",
")\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"\n",
"sandbox_policy = TurnSandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"summary = TurnReasoningSummary.model_validate('concise')\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(\n",
" TextInput('Propose a safe production feature-flag rollout. Return JSON matching the schema.'),\n",
" approval_policy=TurnAskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=TurnReasoningEffort.medium,\n",
" model='gpt-5',\n",
" output_schema=output_schema,\n",
" personality=TurnPersonality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=summary,\n",
" )\n",
" result = turn.run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5c: choose highest model + highest supported reasoning, then run turns\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" TurnAskForApproval,\n",
" TurnPersonality,\n",
" TurnReasoningEffort,\n",
" TurnReasoningSummary,\n",
" TurnSandboxPolicy,\n",
")\n",
"\n",
"reasoning_rank = {\n",
" 'none': 0,\n",
" 'minimal': 1,\n",
" 'low': 2,\n",
" 'medium': 3,\n",
" 'high': 4,\n",
" 'xhigh': 5,\n",
"}\n",
"\n",
"\n",
"def pick_highest_model(models):\n",
" visible = [m for m in models if not m.hidden] or models\n",
" known_names = {m.id for m in visible} | {m.model for m in visible}\n",
" top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]\n",
" pool = top_candidates or visible\n",
" return max(pool, key=lambda m: (m.model, m.id))\n",
"\n",
"\n",
"def pick_highest_turn_effort(model) -> TurnReasoningEffort:\n",
" if not model.supportedReasoningEfforts:\n",
" return TurnReasoningEffort.medium\n",
" best = max(model.supportedReasoningEfforts, key=lambda opt: reasoning_rank.get(opt.reasoningEffort.value, -1))\n",
" return TurnReasoningEffort(best.reasoningEffort.value)\n",
"\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"sandbox_policy = TurnSandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"\n",
"with Codex() as codex:\n",
" models = codex.models(include_hidden=True)\n",
" selected_model = pick_highest_model(models.data)\n",
" selected_effort = pick_highest_turn_effort(selected_model)\n",
"\n",
" print('selected.model:', selected_model.model)\n",
" print('selected.effort:', selected_effort.value)\n",
"\n",
" thread = codex.thread_start(model=selected_model.model, config={'model_reasoning_effort': selected_effort.value})\n",
"\n",
" first = thread.turn(\n",
" TextInput('Give one short sentence about reliable production releases.'),\n",
" model=selected_model.model,\n",
" effort=selected_effort,\n",
" ).run()\n",
" print('agent.message:', first.text)\n",
" print('usage:', first.usage)\n",
"\n",
" second = thread.turn(\n",
" TextInput('Return JSON for a safe feature-flag rollout plan.'),\n",
" approval_policy=TurnAskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=selected_effort,\n",
" model=selected_model.model,\n",
" output_schema=output_schema,\n",
" personality=TurnPersonality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=TurnReasoningSummary.model_validate('concise'),\n",
" ).run()\n",
" print('agent.message.params:', second.text)\n",
" print('usage.params:', second.usage)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 6: multimodal with remote image\n",
"remote_image_url = 'https://raw.githubusercontent.com/github/explore/main/topics/python/python.png'\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('What do you see in this image? 3 bullets.'),\n",
" ImageInput(remote_image_url),\n",
" ]).run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 7: multimodal with local image (bundled asset)\n",
"local_image_path = repo_python_dir / 'examples' / 'assets' / 'sample_scene.png'\n",
"if not local_image_path.exists():\n",
" raise FileNotFoundError(f'Missing bundled image: {local_image_path}')\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('Describe this local image in 2 bullets.'),\n",
" LocalImageInput(str(local_image_path.resolve())),\n",
" ]).run()\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 8: retry-on-overload pattern\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
"\n",
" result = retry_on_overload(\n",
" lambda: thread.turn(TextInput('List 5 failure modes in distributed systems.')).run(),\n",
" max_attempts=3,\n",
" initial_delay_s=0.25,\n",
" max_delay_s=2.0,\n",
" )\n",
"\n",
" print('status:', result.status)\n",
" print(result.text)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 9: full thread lifecycle and branching (async)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_lifecycle_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" first = await (await thread.turn(TextInput('One sentence about structured planning.'))).run()\n",
" second = await (await thread.turn(TextInput('Now restate it for a junior engineer.'))).run()\n",
"\n",
" reopened = await codex.thread_resume(thread.id)\n",
" listing_active = await codex.thread_list(limit=20, archived=False)\n",
" reading = await reopened.read(include_turns=True)\n",
"\n",
" _ = await reopened.set_name('sdk-lifecycle-demo')\n",
" _ = await codex.thread_archive(reopened.id)\n",
" listing_archived = await codex.thread_list(limit=20, archived=True)\n",
" unarchived = await codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = await codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = await (await resumed.turn(TextInput('Continue in one short sentence.'))).run()\n",
" resumed_info = f'{resumed_result.turn_id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = await codex.thread_fork(unarchived.id, model='gpt-5')\n",
" forked_result = await (await forked.turn(TextInput('Take a different angle in one short sentence.'))).run()\n",
" forked_info = f'{forked_result.turn_id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = await unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.turn_id, first.status)\n",
" print('second:', second.turn_id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
"\n",
"\n",
"await async_lifecycle_demo()\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 10: async stream + steer + interrupt (best effort)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_stream_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5', config={'model_reasoning_effort': 'high'})\n",
" turn = await thread.turn(TextInput('Count from 1 to 200 with commas, then one summary sentence.'))\n",
"\n",
" try:\n",
" _ = await turn.steer(TextInput('Keep it brief and stop after 20 numbers.'))\n",
" print('steer: sent')\n",
" except Exception as e:\n",
" print('steer: skipped', type(e).__name__)\n",
"\n",
" try:\n",
" _ = await turn.interrupt()\n",
" print('interrupt: sent')\n",
" except Exception as e:\n",
" print('interrupt: skipped', type(e).__name__)\n",
"\n",
" event_count = 0\n",
" async for event in turn.stream():\n",
" event_count += 1\n",
" print(event.method, event.payload)\n",
"\n",
" print('events.count:', event_count)\n",
"\n",
"\n",
"await async_stream_demo()\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10+"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
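The notebook's Cell 5c picks the highest supported reasoning effort by ranking effort names and taking the max. A self-contained sketch of that ranking step, with plain strings standing in for the SDK's typed enum values:

```python
# Effort ranking as used by the notebook's pick_highest_turn_effort helper;
# plain strings stand in for the SDK's TurnReasoningEffort enum values.
reasoning_rank = {
    "none": 0,
    "minimal": 1,
    "low": 2,
    "medium": 3,
    "high": 4,
    "xhigh": 5,
}


def pick_highest_effort(supported: list[str], default: str = "medium") -> str:
    # Fall back to a sensible default when the model reports no supported efforts.
    if not supported:
        return default
    return max(supported, key=lambda effort: reasoning_rank.get(effort, -1))


print(pick_highest_effort(["low", "medium", "high"]))  # high
print(pick_highest_effort([]))  # medium
```

Unknown effort names rank below every known one (`-1`), so they are only ever selected when nothing recognizable is offered.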

sdk/python/pyproject.toml Normal file (63 lines)
View File

@@ -0,0 +1,63 @@
[build-system]
requires = ["hatchling>=1.24.0"]
build-backend = "hatchling.build"
[project]
name = "codex-app-server-sdk"
version = "0.2.0"
description = "Python SDK for Codex app-server v2"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }
authors = [{ name = "OpenClaw Assistant" }]
keywords = ["codex", "json-rpc", "sdk", "llm", "app-server"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = ["pydantic>=2.12"]
[project.urls]
Homepage = "https://github.com/openai/codex"
Repository = "https://github.com/openai/codex"
Issues = "https://github.com/openai/codex/issues"
[project.optional-dependencies]
dev = ["pytest>=8.0", "datamodel-code-generator==0.31.2"]
[tool.hatch.build]
exclude = [
".venv/**",
".venv2/**",
".pytest_cache/**",
"dist/**",
"build/**",
]
[tool.hatch.build.targets.wheel]
packages = ["src/codex_app_server"]
include = [
"src/codex_app_server/bin/**",
"src/codex_app_server/py.typed",
]
[tool.hatch.build.targets.sdist]
include = [
"src/codex_app_server/**",
"README.md",
"CHANGELOG.md",
"CONTRIBUTING.md",
"RELEASE_CHECKLIST.md",
"pyproject.toml",
]
[tool.pytest.ini_options]
addopts = "-q"
testpaths = ["tests"]

File diff suppressed because it is too large.

View File

@@ -0,0 +1,103 @@
from .client import AppServerConfig
from .errors import (
AppServerError,
AppServerRpcError,
InvalidParamsError,
InvalidRequestError,
InternalRpcError,
JsonRpcError,
MethodNotFoundError,
ParseError,
RetryLimitExceededError,
ServerBusyError,
TransportClosedError,
is_retryable_error,
)
from .generated.codex_event_types import CodexEventNotification, CodexEventType
from .generated.v2_types import (
ThreadItem,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotificationPayload,
)
from .public_api import (
AsyncCodex,
AsyncThread,
AsyncTurn,
Codex,
ImageInput,
InitializeResult,
Input,
InputItem,
LocalImageInput,
MentionInput,
SkillInput,
TextInput,
Thread,
Turn,
TurnResult,
)
from .public_types import (
TurnAskForApproval,
TurnPersonality,
TurnReasoningEffort,
TurnReasoningSummary,
TurnSandboxPolicy,
TurnStatus,
ThreadForkParams,
ThreadListParams,
ThreadResumeParams,
ThreadStartParams,
TurnStartParams,
)
from .retry import retry_on_overload
__version__ = "0.2.0"
__all__ = [
"__version__",
"AppServerConfig",
"Codex",
"AsyncCodex",
"Thread",
"AsyncThread",
"Turn",
"AsyncTurn",
"TurnResult",
"InitializeResult",
"Input",
"InputItem",
"TextInput",
"ImageInput",
"LocalImageInput",
"SkillInput",
"MentionInput",
"ThreadItem",
"ThreadTokenUsageUpdatedNotification",
"TurnCompletedNotificationPayload",
"CodexEventNotification",
"CodexEventType",
"ThreadStartParams",
"ThreadResumeParams",
"ThreadListParams",
"ThreadForkParams",
"TurnAskForApproval",
"TurnReasoningEffort",
"TurnPersonality",
"TurnSandboxPolicy",
"TurnReasoningSummary",
"TurnStatus",
"TurnStartParams",
"retry_on_overload",
"AppServerError",
"TransportClosedError",
"JsonRpcError",
"AppServerRpcError",
"ParseError",
"InvalidRequestError",
"MethodNotFoundError",
"InvalidParamsError",
"InternalRpcError",
"ServerBusyError",
"RetryLimitExceededError",
"is_retryable_error",
]
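`retry_on_overload` is re-exported above; the shape it exposes is bounded attempts with exponential backoff capped at `max_delay_s`. A minimal standalone sketch of that shape — `OverloadedError` and the function body here are illustrative, not the SDK's actual implementation:

```python
import time


class OverloadedError(Exception):
    """Illustrative stand-in for the SDK's retryable overload error."""


def retry_on_overload_sketch(fn, max_attempts=3, initial_delay_s=0.25, max_delay_s=2.0):
    # Bounded attempts with exponential backoff capped at max_delay_s;
    # the final failure is re-raised to the caller.
    delay = initial_delay_s
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except OverloadedError:
            if attempt == max_attempts:
                raise
            time.sleep(delay)
            delay = min(delay * 2, max_delay_s)


calls = {"n": 0}


def flaky():
    # Fails twice with an overload error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OverloadedError()
    return "ok"


print(retry_on_overload_sketch(flaky, initial_delay_s=0.01))  # ok
```

The real helper is driven by `is_retryable_error`, so only errors the server marks as transient (e.g. `ServerBusyError`) trigger another attempt.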

View File

@@ -0,0 +1,250 @@
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from typing import AsyncIterator, Callable, Iterable, ParamSpec, TypeVar
from pydantic import BaseModel
from .client import AppServerClient, AppServerConfig
from .generated.v2_all.AgentMessageDeltaNotification import AgentMessageDeltaNotification
from .generated.v2_all.ModelListResponse import ModelListResponse
from .generated.v2_all.ThreadArchiveResponse import ThreadArchiveResponse
from .generated.v2_all.ThreadCompactStartResponse import ThreadCompactStartResponse
from .generated.v2_all.ThreadForkParams import ThreadForkParams as V2ThreadForkParams
from .generated.v2_all.ThreadForkResponse import ThreadForkResponse
from .generated.v2_all.ThreadListParams import ThreadListParams as V2ThreadListParams
from .generated.v2_all.ThreadListResponse import ThreadListResponse
from .generated.v2_all.ThreadReadResponse import ThreadReadResponse
from .generated.v2_all.ThreadResumeParams import ThreadResumeParams as V2ThreadResumeParams
from .generated.v2_all.ThreadResumeResponse import ThreadResumeResponse
from .generated.v2_all.ThreadSetNameResponse import ThreadSetNameResponse
from .generated.v2_all.ThreadStartParams import ThreadStartParams as V2ThreadStartParams
from .generated.v2_all.ThreadStartResponse import ThreadStartResponse
from .generated.v2_all.ThreadUnarchiveResponse import ThreadUnarchiveResponse
from .generated.v2_all.TurnCompletedNotification import TurnCompletedNotification
from .generated.v2_all.TurnInterruptResponse import TurnInterruptResponse
from .generated.v2_all.TurnStartParams import TurnStartParams as V2TurnStartParams
from .generated.v2_all.TurnStartResponse import TurnStartResponse
from .generated.v2_all.TurnSteerResponse import TurnSteerResponse
from .models import InitializeResponse, JsonObject, Notification, TextTurnResult
ModelT = TypeVar("ModelT", bound=BaseModel)
ParamsT = ParamSpec("ParamsT")
ReturnT = TypeVar("ReturnT")
class AsyncAppServerClient:
"""Async wrapper around AppServerClient using thread offloading."""
def __init__(self, config: AppServerConfig | None = None) -> None:
self._sync = AppServerClient(config=config)
# Single stdio transport cannot be read safely from multiple threads.
self._transport_lock = asyncio.Lock()
async def __aenter__(self) -> "AsyncAppServerClient":
await self.start()
return self
async def __aexit__(self, _exc_type, _exc, _tb) -> None:
await self.close()
async def _call_sync(
self,
fn: Callable[ParamsT, ReturnT],
/,
*args: ParamsT.args,
**kwargs: ParamsT.kwargs,
) -> ReturnT:
async with self._transport_lock:
return await asyncio.to_thread(fn, *args, **kwargs)
@staticmethod
def _next_from_iterator(
iterator: Iterator[AgentMessageDeltaNotification],
) -> tuple[bool, AgentMessageDeltaNotification | None]:
try:
return True, next(iterator)
except StopIteration:
return False, None
async def start(self) -> None:
await self._call_sync(self._sync.start)
async def close(self) -> None:
await self._call_sync(self._sync.close)
async def initialize(self) -> InitializeResponse:
return await self._call_sync(self._sync.initialize)
def acquire_turn_consumer(self, turn_id: str) -> None:
self._sync.acquire_turn_consumer(turn_id)
def release_turn_consumer(self, turn_id: str) -> None:
self._sync.release_turn_consumer(turn_id)
async def request(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
) -> ModelT:
return await self._call_sync(
self._sync.request,
method,
params,
response_model=response_model,
)
async def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
return await self._call_sync(self._sync.thread_start, params)
async def thread_resume(
self,
thread_id: str,
params: V2ThreadResumeParams | JsonObject | None = None,
) -> ThreadResumeResponse:
return await self._call_sync(self._sync.thread_resume, thread_id, params)
async def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
return await self._call_sync(self._sync.thread_list, params)
async def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
return await self._call_sync(self._sync.thread_read, thread_id, include_turns)
async def thread_fork(
self,
thread_id: str,
params: V2ThreadForkParams | JsonObject | None = None,
) -> ThreadForkResponse:
return await self._call_sync(self._sync.thread_fork, thread_id, params)
async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return await self._call_sync(self._sync.thread_archive, thread_id)
async def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
return await self._call_sync(self._sync.thread_unarchive, thread_id)
async def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
return await self._call_sync(self._sync.thread_set_name, thread_id, name)
async def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
return await self._call_sync(self._sync.thread_compact, thread_id)
async def turn_start(
self,
thread_id: str,
input_items: list[JsonObject] | JsonObject | str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
return await self._call_sync(self._sync.turn_start, thread_id, input_items, params)
async def turn_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
return await self._call_sync(self._sync.turn_text, thread_id, text, params)
async def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
return await self._call_sync(self._sync.turn_interrupt, thread_id, turn_id)
async def turn_steer(
self,
thread_id: str,
expected_turn_id: str,
input_items: list[JsonObject] | JsonObject | str,
) -> TurnSteerResponse:
return await self._call_sync(
self._sync.turn_steer,
thread_id,
expected_turn_id,
input_items,
)
async def model_list(self, include_hidden: bool = False) -> ModelListResponse:
return await self._call_sync(self._sync.model_list, include_hidden)
async def request_with_retry_on_overload(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
) -> ModelT:
return await self._call_sync(
self._sync.request_with_retry_on_overload,
method,
params,
response_model=response_model,
max_attempts=max_attempts,
initial_delay_s=initial_delay_s,
max_delay_s=max_delay_s,
)
async def next_notification(self) -> Notification:
return await self._call_sync(self._sync.next_notification)
async def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
return await self._call_sync(self._sync.wait_for_turn_completed, turn_id)
async def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
return await self._call_sync(self._sync.stream_until_methods, methods)
async def run_text_turn(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TextTurnResult:
return await self._call_sync(self._sync.run_text_turn, thread_id, text, params)
async def ask_result(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
return await self._call_sync(
self._sync.ask_result,
text,
model=model,
thread_id=thread_id,
)
async def ask(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
return await self._call_sync(
self._sync.ask,
text,
model=model,
thread_id=thread_id,
)
async def stream_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> AsyncIterator[AgentMessageDeltaNotification]:
async with self._transport_lock:
iterator = self._sync.stream_text(thread_id, text, params)
while True:
has_value, chunk = await asyncio.to_thread(
self._next_from_iterator,
iterator,
)
if not has_value:
break
yield chunk
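`stream_text` above bridges a blocking iterator into an async generator by offloading each `next()` call with `asyncio.to_thread`, using `_next_from_iterator` so `StopIteration` never escapes into the event loop. The same bridge, reduced to a runnable sketch (`sync_chunks` is an illustrative stand-in for the SDK's blocking notification iterator):

```python
import asyncio


def sync_chunks():
    # Illustrative stand-in for the SDK's blocking notification iterator.
    yield from ("alpha", "beta", "gamma")


def _next_from_iterator(iterator):
    # Same shape as AsyncAppServerClient._next_from_iterator: wrap next()
    # so StopIteration never propagates out of a coroutine.
    try:
        return True, next(iterator)
    except StopIteration:
        return False, None


async def stream(iterator):
    # Offload each blocking next() call so the event loop stays responsive.
    while True:
        has_value, chunk = await asyncio.to_thread(_next_from_iterator, iterator)
        if not has_value:
            break
        yield chunk


async def collect():
    return [chunk async for chunk in stream(sync_chunks())]


chunks = asyncio.run(collect())
print(chunks)
```

The wrapper additionally holds `_transport_lock` while creating the iterator, which serializes stream setup against other requests on the single stdio transport.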

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,582 @@
from __future__ import annotations
import json
import os
import subprocess
import threading
import uuid
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Iterator, TypeVar
from pydantic import BaseModel
from .errors import AppServerError, TransportClosedError, map_jsonrpc_error
from .generated.v2_all.AgentMessageDeltaNotification import AgentMessageDeltaNotification
from .generated.v2_all.ModelListResponse import ModelListResponse
from .generated.v2_all.ThreadArchiveResponse import ThreadArchiveResponse
from .generated.v2_all.ThreadCompactStartResponse import ThreadCompactStartResponse
from .generated.v2_all.ThreadForkParams import ThreadForkParams as V2ThreadForkParams
from .generated.v2_all.ThreadForkResponse import ThreadForkResponse
from .generated.v2_all.ThreadListParams import ThreadListParams as V2ThreadListParams
from .generated.v2_all.ThreadListResponse import ThreadListResponse
from .generated.v2_all.ThreadReadResponse import ThreadReadResponse
from .generated.v2_all.ThreadResumeParams import ThreadResumeParams as V2ThreadResumeParams
from .generated.v2_all.ThreadResumeResponse import ThreadResumeResponse
from .generated.v2_all.ThreadSetNameResponse import ThreadSetNameResponse
from .generated.v2_all.ThreadStartParams import ThreadStartParams as V2ThreadStartParams
from .generated.v2_all.ThreadStartResponse import ThreadStartResponse
from .generated.v2_all.ThreadUnarchiveResponse import ThreadUnarchiveResponse
from .generated.v2_all.TurnCompletedNotification import TurnCompletedNotification
from .generated.v2_all.TurnInterruptResponse import TurnInterruptResponse
from .generated.v2_all.TurnStartParams import TurnStartParams as V2TurnStartParams
from .generated.v2_all.TurnStartResponse import TurnStartResponse
from .generated.v2_all.TurnSteerResponse import TurnSteerResponse
from .generated.codex_event_types import CodexEventNotification
from .generated.notification_registry import (
NOTIFICATION_METHOD_ALIASES,
NOTIFICATION_MODELS,
)
from .models import (
InitializeResponse,
JsonObject,
JsonValue,
Notification,
TextTurnResult,
UnknownNotification,
)
from .retry import retry_on_overload
ModelT = TypeVar("ModelT", bound=BaseModel)
ApprovalHandler = Callable[[str, JsonObject | None], JsonObject]
def _params_dict(params: V2ThreadStartParams | V2ThreadResumeParams | V2ThreadListParams | V2ThreadForkParams | V2TurnStartParams | JsonObject | None) -> JsonObject:
if params is None:
return {}
if hasattr(params, "model_dump"):
dumped = params.model_dump(exclude_none=True, mode="json")
if not isinstance(dumped, dict):
raise TypeError("Expected model_dump() to return dict")
return dumped
if isinstance(params, dict):
return params
raise TypeError(f"Expected generated params model or dict, got {type(params).__name__}")
def _bundled_codex_path() -> Path:
import platform
sys_name = platform.system().lower()
machine = platform.machine().lower()
if sys_name.startswith("darwin"):
platform_dir = "darwin-arm64" if machine in {"arm64", "aarch64"} else "darwin-x64"
exe = "codex"
elif sys_name.startswith("linux"):
platform_dir = "linux-arm64" if machine in {"arm64", "aarch64"} else "linux-x64"
exe = "codex"
elif sys_name.startswith("windows") or os.name == "nt":
platform_dir = "windows-arm64" if machine in {"arm64", "aarch64"} else "windows-x64"
exe = "codex.exe"
else:
raise RuntimeError(f"Unsupported OS for bundled codex binary: {sys_name}/{machine}")
return Path(__file__).resolve().parent / "bin" / platform_dir / exe
@dataclass(slots=True)
class AppServerConfig:
codex_bin: str = str(_bundled_codex_path())
launch_args_override: tuple[str, ...] | None = None
config_overrides: tuple[str, ...] = ()
cwd: str | None = None
env: dict[str, str] | None = None
client_name: str = "codex_python_sdk"
client_title: str = "Codex Python SDK"
client_version: str = "0.2.0"
experimental_api: bool = True
class AppServerClient:
"""Synchronous typed JSON-RPC client for `codex app-server` over stdio."""
def __init__(
self,
config: AppServerConfig | None = None,
approval_handler: ApprovalHandler | None = None,
) -> None:
self.config = config or AppServerConfig()
self._approval_handler = approval_handler or self._default_approval_handler
self._proc: subprocess.Popen[str] | None = None
self._lock = threading.Lock()
self._turn_consumer_lock = threading.Lock()
self._active_turn_consumer: str | None = None
self._pending_notifications: deque[Notification] = deque()
self._stderr_lines: deque[str] = deque(maxlen=400)
self._stderr_thread: threading.Thread | None = None
def __enter__(self) -> "AppServerClient":
self.start()
return self
def __exit__(self, _exc_type, _exc, _tb) -> None:
self.close()
def start(self) -> None:
if self._proc is not None:
return
if self.config.launch_args_override is not None:
args = list(self.config.launch_args_override)
else:
codex_bin = Path(self.config.codex_bin)
if not codex_bin.exists():
raise FileNotFoundError(
f"Pinned codex binary not found at {codex_bin}. Run `python scripts/update_sdk_artifacts.py --channel stable` from sdk/python."
)
args = [str(codex_bin)]
for kv in self.config.config_overrides:
args.extend(["--config", kv])
args.extend(["app-server", "--listen", "stdio://"])
env = os.environ.copy()
if self.config.env:
env.update(self.config.env)
self._proc = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=self.config.cwd,
env=env,
bufsize=1,
)
self._start_stderr_drain_thread()
def close(self) -> None:
if self._proc is None:
return
proc = self._proc
self._proc = None
self._active_turn_consumer = None
if proc.stdin:
proc.stdin.close()
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
proc.kill()
try:
# Reap the killed process so it does not linger as a zombie.
proc.wait(timeout=2)
except Exception:
pass
if self._stderr_thread and self._stderr_thread.is_alive():
self._stderr_thread.join(timeout=0.5)
def initialize(self) -> InitializeResponse:
result = self.request(
"initialize",
{
"clientInfo": {
"name": self.config.client_name,
"title": self.config.client_title,
"version": self.config.client_version,
},
"capabilities": {
"experimentalApi": self.config.experimental_api,
},
},
response_model=InitializeResponse,
)
self.notify("initialized", None)
return result
def request(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
) -> ModelT:
result = self._request_raw(method, params)
if not isinstance(result, dict):
raise AppServerError(f"{method} response must be a JSON object")
return response_model.model_validate(result)
def _request_raw(self, method: str, params: JsonObject | None = None) -> JsonValue:
request_id = str(uuid.uuid4())
self._write_message({"id": request_id, "method": method, "params": params or {}})
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
self._pending_notifications.append(self._coerce_notification(msg["method"], msg.get("params")))
continue
if msg.get("id") != request_id:
continue
if "error" in msg:
err = msg["error"]
if isinstance(err, dict):
raise map_jsonrpc_error(
int(err.get("code", -32000)),
str(err.get("message", "unknown")),
err.get("data"),
)
raise AppServerError("Malformed JSON-RPC error response")
return msg.get("result")
def notify(self, method: str, params: JsonObject | None = None) -> None:
self._write_message({"method": method, "params": params or {}})
def next_notification(self) -> Notification:
if self._pending_notifications:
return self._pending_notifications.popleft()
while True:
msg = self._read_message()
if "method" in msg and "id" in msg:
response = self._handle_server_request(msg)
self._write_message({"id": msg["id"], "result": response})
continue
if "method" in msg and "id" not in msg:
return self._coerce_notification(msg["method"], msg.get("params"))
def acquire_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer is not None:
raise RuntimeError(
"Concurrent turn consumers are not yet supported in the experimental SDK. "
f"Client is already streaming turn {self._active_turn_consumer!r}; "
f"cannot start turn {turn_id!r} until the active consumer finishes."
)
self._active_turn_consumer = turn_id
def release_turn_consumer(self, turn_id: str) -> None:
with self._turn_consumer_lock:
if self._active_turn_consumer == turn_id:
self._active_turn_consumer = None
def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
return self.request("thread/start", _params_dict(params), response_model=ThreadStartResponse)
def thread_resume(
self,
thread_id: str,
params: V2ThreadResumeParams | JsonObject | None = None,
) -> ThreadResumeResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/resume", payload, response_model=ThreadResumeResponse)
def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
return self.request("thread/list", _params_dict(params), response_model=ThreadListResponse)
def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
return self.request(
"thread/read",
{"threadId": thread_id, "includeTurns": include_turns},
response_model=ThreadReadResponse,
)
def thread_fork(
self,
thread_id: str,
params: V2ThreadForkParams | JsonObject | None = None,
) -> ThreadForkResponse:
payload = {"threadId": thread_id, **_params_dict(params)}
return self.request("thread/fork", payload, response_model=ThreadForkResponse)
def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
return self.request("thread/archive", {"threadId": thread_id}, response_model=ThreadArchiveResponse)
def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
return self.request("thread/unarchive", {"threadId": thread_id}, response_model=ThreadUnarchiveResponse)
def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
return self.request(
"thread/name/set",
{"threadId": thread_id, "name": name},
response_model=ThreadSetNameResponse,
)
def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
return self.request(
"thread/compact/start",
{"threadId": thread_id},
response_model=ThreadCompactStartResponse,
)
def turn_start(
self,
thread_id: str,
input_items: list[JsonObject] | JsonObject | str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
payload = {
**_params_dict(params),
"threadId": thread_id,
"input": self._normalize_input_items(input_items),
}
return self.request("turn/start", payload, response_model=TurnStartResponse)
def turn_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TurnStartResponse:
return self.turn_start(thread_id, text, params=params)
def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
return self.request(
"turn/interrupt",
{"threadId": thread_id, "turnId": turn_id},
response_model=TurnInterruptResponse,
)
def turn_steer(
self,
thread_id: str,
expected_turn_id: str,
input_items: list[JsonObject] | JsonObject | str,
) -> TurnSteerResponse:
return self.request(
"turn/steer",
{
"threadId": thread_id,
"expectedTurnId": expected_turn_id,
"input": self._normalize_input_items(input_items),
},
response_model=TurnSteerResponse,
)
def model_list(self, include_hidden: bool = False) -> ModelListResponse:
return self.request(
"model/list",
{"includeHidden": include_hidden},
response_model=ModelListResponse,
)
def request_with_retry_on_overload(
self,
method: str,
params: JsonObject | None,
*,
response_model: type[ModelT],
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
) -> ModelT:
return retry_on_overload(
lambda: self.request(method, params, response_model=response_model),
max_attempts=max_attempts,
initial_delay_s=initial_delay_s,
max_delay_s=max_delay_s,
)
def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
while True:
notification = self.next_notification()
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
return notification.payload
def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
target_methods = {methods} if isinstance(methods, str) else set(methods)
out: list[Notification] = []
while True:
notification = self.next_notification()
out.append(notification)
if notification.method in target_methods:
return out
def run_text_turn(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> TextTurnResult:
started = self.turn_text(thread_id, text, params=params)
turn_id = started.turn.id
deltas: list[AgentMessageDeltaNotification] = []
completed: TurnCompletedNotification | None = None
while True:
notification = self.next_notification()
if (
notification.method == "item/agentMessage/delta"
and isinstance(notification.payload, AgentMessageDeltaNotification)
and notification.payload.turnId == turn_id
):
deltas.append(notification.payload)
continue
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
completed = notification.payload
break
if completed is None:
raise AppServerError("turn/completed notification not received")
return TextTurnResult(
thread_id=thread_id,
turn_id=turn_id,
deltas=deltas,
completed=completed,
)
def ask_result(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
active_thread_id = thread_id
if active_thread_id is None:
start_params = V2ThreadStartParams(model=model) if model else None
started = self.thread_start(start_params)
active_thread_id = started.thread.id
return self.run_text_turn(active_thread_id, text)
def ask(
self,
text: str,
*,
model: str | None = None,
thread_id: str | None = None,
) -> TextTurnResult:
return self.ask_result(text, model=model, thread_id=thread_id)
def stream_text(
self,
thread_id: str,
text: str,
params: V2TurnStartParams | JsonObject | None = None,
) -> Iterator[AgentMessageDeltaNotification]:
started = self.turn_text(thread_id, text, params=params)
turn_id = started.turn.id
while True:
notification = self.next_notification()
if (
notification.method == "item/agentMessage/delta"
and isinstance(notification.payload, AgentMessageDeltaNotification)
and notification.payload.turnId == turn_id
):
yield notification.payload
continue
if (
notification.method == "turn/completed"
and isinstance(notification.payload, TurnCompletedNotification)
and notification.payload.turn.id == turn_id
):
break
def _coerce_notification(self, method: str, params: object) -> Notification:
params_dict = params if isinstance(params, dict) else {}
if method.startswith("codex/event/"):
event_params = dict(params_dict)
for key in ("id", "conversationId"):
value = event_params.get(key)
if isinstance(value, str) and value.strip() == "":
event_params[key] = None
try:
payload = CodexEventNotification.model_validate(event_params)
except Exception: # noqa: BLE001
return Notification(method=method, payload=UnknownNotification(params=params_dict))
return Notification(method=method, payload=payload)
canonical_method = NOTIFICATION_METHOD_ALIASES.get(method, method)
model = NOTIFICATION_MODELS.get(canonical_method)
if model is None:
# Accept newer server notifications without breaking current SDK flows.
return Notification(method=canonical_method, payload=UnknownNotification(params=params_dict))
try:
payload = model.model_validate(params_dict)
except Exception: # noqa: BLE001
return Notification(method=canonical_method, payload=UnknownNotification(params=params_dict))
return Notification(method=canonical_method, payload=payload)
def _normalize_input_items(
self, input_items: list[JsonObject] | JsonObject | str
) -> list[JsonObject]:
if isinstance(input_items, str):
return [{"type": "text", "text": input_items}]
if isinstance(input_items, dict):
return [input_items]
return input_items
def _default_approval_handler(self, method: str, params: JsonObject | None) -> JsonObject:
if method == "item/commandExecution/requestApproval":
return {"decision": "accept"}
if method == "item/fileChange/requestApproval":
return {"decision": "accept"}
return {}
def _start_stderr_drain_thread(self) -> None:
if self._proc is None or self._proc.stderr is None:
return
stderr = self._proc.stderr
def _drain() -> None:
# Bind the pipe before the thread starts so a concurrent close()
# clearing self._proc cannot race the reader.
for line in stderr:
self._stderr_lines.append(line.rstrip("\n"))
self._stderr_thread = threading.Thread(target=_drain, daemon=True)
self._stderr_thread.start()
def _stderr_tail(self, limit: int = 40) -> str:
return "\n".join(list(self._stderr_lines)[-limit:])
def _handle_server_request(self, msg: JsonObject) -> JsonObject:
method = msg["method"]
params = msg.get("params")
request_params = params if isinstance(params, dict) else None
return self._approval_handler(str(method), request_params)
def _write_message(self, payload: JsonObject) -> None:
if self._proc is None or self._proc.stdin is None:
raise TransportClosedError("app-server is not running")
with self._lock:
self._proc.stdin.write(json.dumps(payload) + "\n")
self._proc.stdin.flush()
def _read_message(self) -> JsonObject:
if self._proc is None or self._proc.stdout is None:
raise TransportClosedError("app-server is not running")
line = self._proc.stdout.readline()
if not line:
raise TransportClosedError(
f"app-server closed stdout. stderr_tail={self._stderr_tail()[:2000]}"
)
try:
payload = json.loads(line)
except json.JSONDecodeError as exc:
raise AppServerError(f"Invalid JSON-RPC line: {line!r}") from exc
if not isinstance(payload, dict):
raise AppServerError(f"Expected JSON object message, got: {type(payload).__name__}")
return payload
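The transport above is newline-delimited JSON-RPC over the child process's stdio pipes: `_write_message` emits one JSON object per line, and `_read_message` reads and validates one line at a time. A minimal standalone sketch of that framing (using `io.StringIO` in place of the real pipes, which is an assumption for illustration):

```python
import io
import json
import uuid


def write_message(stream, payload: dict) -> None:
    # One JSON object per line, mirroring _write_message.
    stream.write(json.dumps(payload) + "\n")


def read_message(stream) -> dict:
    # Mirrors _read_message: EOF and non-object lines are protocol errors.
    line = stream.readline()
    if not line:
        raise RuntimeError("transport closed")
    msg = json.loads(line)
    if not isinstance(msg, dict):
        raise RuntimeError("expected a JSON object per line")
    return msg


pipe = io.StringIO()
request_id = str(uuid.uuid4())
write_message(pipe, {"id": request_id, "method": "thread/start", "params": {}})
pipe.seek(0)
msg = read_message(pipe)
assert msg["id"] == request_id
assert msg["method"] == "thread/start"
```

Because responses, server-initiated requests, and notifications all arrive on the same line stream, `_request_raw` has to dispatch on the presence of `id` and `method` before it can match its own request id.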


@@ -0,0 +1,126 @@
from __future__ import annotations
from typing import Any
class AppServerError(Exception):
"""Base exception for SDK errors."""
class JsonRpcError(AppServerError):
"""Raw JSON-RPC error wrapper from the server."""
def __init__(self, code: int, message: str, data: Any = None):
super().__init__(f"JSON-RPC error {code}: {message}")
self.code = code
self.message = message
self.data = data
class TransportClosedError(AppServerError):
"""Raised when the app-server transport closes unexpectedly."""
class AppServerRpcError(JsonRpcError):
"""Base typed error for JSON-RPC failures."""
class ParseError(AppServerRpcError):
pass
class InvalidRequestError(AppServerRpcError):
pass
class MethodNotFoundError(AppServerRpcError):
pass
class InvalidParamsError(AppServerRpcError):
pass
class InternalRpcError(AppServerRpcError):
pass
class ServerBusyError(AppServerRpcError):
"""Server is overloaded / unavailable and caller should retry."""
class RetryLimitExceededError(ServerBusyError):
"""Server exhausted internal retry budget for a retryable operation."""
def _contains_retry_limit_text(message: str) -> bool:
lowered = message.lower()
return "retry limit" in lowered or "too many failed attempts" in lowered
def _is_server_overloaded(data: Any) -> bool:
if data is None:
return False
if isinstance(data, str):
return data.lower() == "server_overloaded"
if isinstance(data, dict):
direct = (
data.get("codex_error_info")
or data.get("codexErrorInfo")
or data.get("errorInfo")
)
if isinstance(direct, str) and direct.lower() == "server_overloaded":
return True
if isinstance(direct, dict):
for v in direct.values():
if isinstance(v, str) and v.lower() == "server_overloaded":
return True
for v in data.values():
if _is_server_overloaded(v):
return True
if isinstance(data, list):
return any(_is_server_overloaded(v) for v in data)
return False
def map_jsonrpc_error(code: int, message: str, data: Any = None) -> JsonRpcError:
"""Map a raw JSON-RPC error into a richer SDK exception class."""
if code == -32700:
return ParseError(code, message, data)
if code == -32600:
return InvalidRequestError(code, message, data)
if code == -32601:
return MethodNotFoundError(code, message, data)
if code == -32602:
return InvalidParamsError(code, message, data)
if code == -32603:
return InternalRpcError(code, message, data)
# Reserved server-error range in JSON-RPC 2.0.
if -32099 <= code <= -32000:
if _is_server_overloaded(data):
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return ServerBusyError(code, message, data)
if _contains_retry_limit_text(message):
return RetryLimitExceededError(code, message, data)
return AppServerRpcError(code, message, data)
return JsonRpcError(code, message, data)
def is_retryable_error(exc: BaseException) -> bool:
"""True if the exception is a transient overload-style error."""
if isinstance(exc, ServerBusyError):
return True
if isinstance(exc, JsonRpcError):
return _is_server_overloaded(exc.data)
return False
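The overload detection in `_is_server_overloaded` walks the JSON-RPC error `data` payload recursively, accepting the `server_overloaded` marker under any of the known key spellings. A simplified standalone sketch of that walk (it omits the nested-dict check on the direct value that the full implementation also performs):

```python
from typing import Any


def is_server_overloaded(data: Any) -> bool:
    # Recursive search for the "server_overloaded" marker in an error payload.
    if isinstance(data, str):
        return data.lower() == "server_overloaded"
    if isinstance(data, dict):
        direct = (
            data.get("codex_error_info")
            or data.get("codexErrorInfo")
            or data.get("errorInfo")
        )
        if isinstance(direct, str) and direct.lower() == "server_overloaded":
            return True
        return any(is_server_overloaded(v) for v in data.values())
    if isinstance(data, list):
        return any(is_server_overloaded(v) for v in data)
    return False


assert is_server_overloaded({"codexErrorInfo": "SERVER_OVERLOADED"})
assert is_server_overloaded({"detail": [{"errorInfo": "server_overloaded"}]})
assert not is_server_overloaded("rate_limited")
```

This is what lets `is_retryable_error` treat any `JsonRpcError` carrying the marker as transient, even when the error code falls outside the reserved server-error range.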


@@ -0,0 +1 @@
"""Auto-generated protocol/schema Python types."""


@@ -0,0 +1,21 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
CodexEventType = Literal['agent_message', 'agent_message_content_delta', 'agent_message_delta', 'agent_reasoning', 'agent_reasoning_delta', 'agent_reasoning_raw_content', 'agent_reasoning_raw_content_delta', 'agent_reasoning_section_break', 'apply_patch_approval_request', 'background_event', 'collab_agent_interaction_begin', 'collab_agent_interaction_end', 'collab_agent_spawn_begin', 'collab_agent_spawn_end', 'collab_close_begin', 'collab_close_end', 'collab_resume_begin', 'collab_resume_end', 'collab_waiting_begin', 'collab_waiting_end', 'context_compacted', 'deprecation_notice', 'dynamic_tool_call_request', 'elicitation_request', 'entered_review_mode', 'error', 'exec_approval_request', 'exec_command_begin', 'exec_command_end', 'exec_command_output_delta', 'exited_review_mode', 'get_history_entry_response', 'item_completed', 'item_started', 'list_custom_prompts_response', 'list_remote_skills_response', 'list_skills_response', 'mcp_list_tools_response', 'mcp_startup_complete', 'mcp_startup_update', 'mcp_tool_call_begin', 'mcp_tool_call_end', 'patch_apply_begin', 'patch_apply_end', 'plan_delta', 'plan_update', 'raw_response_item', 'reasoning_content_delta', 'reasoning_raw_content_delta', 'remote_skill_downloaded', 'request_user_input', 'session_configured', 'shutdown_complete', 'skills_update_available', 'stream_error', 'task_complete', 'task_started', 'terminal_interaction', 'thread_name_updated', 'thread_rolled_back', 'token_count', 'turn_aborted', 'turn_diff', 'undo_completed', 'undo_started', 'user_message', 'view_image_tool_call', 'warning', 'web_search_begin', 'web_search_end']
class CodexEventMessage(BaseModel):
model_config = ConfigDict(extra="allow")
type: CodexEventType | str
class CodexEventNotification(BaseModel):
id: str | None = None
conversationId: str | None = None
msg: CodexEventMessage | dict[str, Any]
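Both `id` and `conversationId` on `CodexEventNotification` are optional strings, which is why `_coerce_notification` normalizes blank values to `None` before validation. A standalone sketch of that normalization step:

```python
def normalize_event_params(params: dict) -> dict:
    # Blank or whitespace-only ids are coerced to None so the optional
    # fields on CodexEventNotification validate cleanly.
    out = dict(params)
    for key in ("id", "conversationId"):
        value = out.get(key)
        if isinstance(value, str) and value.strip() == "":
            out[key] = None
    return out


assert normalize_event_params({"id": "  ", "msg": {"type": "warning"}})["id"] is None
assert normalize_event_params({"id": "e1"})["id"] == "e1"
```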


@@ -0,0 +1,89 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from pydantic import BaseModel
from .v2_all.AccountLoginCompletedNotification import AccountLoginCompletedNotification
from .v2_all.AccountRateLimitsUpdatedNotification import AccountRateLimitsUpdatedNotification
from .v2_all.AccountUpdatedNotification import AccountUpdatedNotification
from .v2_all.AgentMessageDeltaNotification import AgentMessageDeltaNotification
from .v2_all.AppListUpdatedNotification import AppListUpdatedNotification
from .v2_all.CommandExecutionOutputDeltaNotification import CommandExecutionOutputDeltaNotification
from .v2_all.ConfigWarningNotification import ConfigWarningNotification
from .v2_all.ContextCompactedNotification import ContextCompactedNotification
from .v2_all.DeprecationNoticeNotification import DeprecationNoticeNotification
from .v2_all.ErrorNotification import ErrorNotification
from .v2_all.FileChangeOutputDeltaNotification import FileChangeOutputDeltaNotification
from .v2_all.ItemCompletedNotification import ItemCompletedNotification
from .v2_all.ItemStartedNotification import ItemStartedNotification
from .v2_all.McpServerOauthLoginCompletedNotification import McpServerOauthLoginCompletedNotification
from .v2_all.McpToolCallProgressNotification import McpToolCallProgressNotification
from .v2_all.PlanDeltaNotification import PlanDeltaNotification
from .v2_all.RawResponseItemCompletedNotification import RawResponseItemCompletedNotification
from .v2_all.ReasoningSummaryPartAddedNotification import ReasoningSummaryPartAddedNotification
from .v2_all.ReasoningSummaryTextDeltaNotification import ReasoningSummaryTextDeltaNotification
from .v2_all.ReasoningTextDeltaNotification import ReasoningTextDeltaNotification
from .v2_all.TerminalInteractionNotification import TerminalInteractionNotification
from .v2_all.ThreadNameUpdatedNotification import ThreadNameUpdatedNotification
from .v2_all.ThreadStartedNotification import ThreadStartedNotification
from .v2_all.ThreadTokenUsageUpdatedNotification import ThreadTokenUsageUpdatedNotification
from .v2_all.TurnCompletedNotification import TurnCompletedNotification
from .v2_all.TurnDiffUpdatedNotification import TurnDiffUpdatedNotification
from .v2_all.TurnPlanUpdatedNotification import TurnPlanUpdatedNotification
from .v2_all.TurnStartedNotification import TurnStartedNotification
from .v2_all.WindowsWorldWritableWarningNotification import WindowsWorldWritableWarningNotification
NOTIFICATION_MODELS: dict[str, type[BaseModel]] = {
"account/login/completed": AccountLoginCompletedNotification,
"account/rateLimits/updated": AccountRateLimitsUpdatedNotification,
"account/updated": AccountUpdatedNotification,
"app/list/updated": AppListUpdatedNotification,
"configWarning": ConfigWarningNotification,
"deprecationNotice": DeprecationNoticeNotification,
"error": ErrorNotification,
"item/agentMessage/delta": AgentMessageDeltaNotification,
"item/commandExecution/outputDelta": CommandExecutionOutputDeltaNotification,
"item/commandExecution/terminalInteraction": TerminalInteractionNotification,
"item/completed": ItemCompletedNotification,
"item/fileChange/outputDelta": FileChangeOutputDeltaNotification,
"item/mcpToolCall/progress": McpToolCallProgressNotification,
"item/plan/delta": PlanDeltaNotification,
"item/reasoning/summaryPartAdded": ReasoningSummaryPartAddedNotification,
"item/reasoning/summaryTextDelta": ReasoningSummaryTextDeltaNotification,
"item/reasoning/textDelta": ReasoningTextDeltaNotification,
"item/started": ItemStartedNotification,
"mcpServer/oauthLogin/completed": McpServerOauthLoginCompletedNotification,
"rawResponseItem/completed": RawResponseItemCompletedNotification,
"thread/compacted": ContextCompactedNotification,
"thread/name/updated": ThreadNameUpdatedNotification,
"thread/started": ThreadStartedNotification,
"thread/tokenUsage/updated": ThreadTokenUsageUpdatedNotification,
"turn/completed": TurnCompletedNotification,
"turn/diff/updated": TurnDiffUpdatedNotification,
"turn/plan/updated": TurnPlanUpdatedNotification,
"turn/started": TurnStartedNotification,
"windows/worldWritableWarning": WindowsWorldWritableWarningNotification,
}
NOTIFICATION_METHOD_ALIASES: dict[str, str] = {
"account/loginCompleted": "account/login/completed",
"account/rateLimitsUpdated": "account/rateLimits/updated",
"app/listUpdated": "app/list/updated",
"commandExecution/outputDelta": "item/commandExecution/outputDelta",
"config/warning": "configWarning",
"context/compacted": "thread/compacted",
"fileChange/outputDelta": "item/fileChange/outputDelta",
"mcp/serverOauthLoginCompleted": "mcpServer/oauthLogin/completed",
"mcp/toolCallProgress": "item/mcpToolCall/progress",
"plan/delta": "item/plan/delta",
"reasoning/summaryPartAdded": "item/reasoning/summaryPartAdded",
"reasoning/summaryTextDelta": "item/reasoning/summaryTextDelta",
"reasoning/textDelta": "item/reasoning/textDelta",
"terminal/interaction": "item/commandExecution/terminalInteraction",
"thread/nameUpdated": "thread/name/updated",
"thread/tokenUsageUpdated": "thread/tokenUsage/updated",
"turn/diffUpdated": "turn/diff/updated",
"turn/planUpdated": "turn/plan/updated",
}


@@ -0,0 +1,46 @@
from __future__ import annotations
from typing import Any, NotRequired, TypedDict
# Generated by scripts/update_sdk_artifacts.py
class ThreadObject(TypedDict):
cliVersion: str
createdAt: int
cwd: str
gitInfo: NotRequired[dict[str, Any] | None]
id: str
modelProvider: str
path: NotRequired[dict[str, Any]]
preview: str
source: dict[str, Any]
turns: list[TurnObject]
updatedAt: int
class TurnObject(TypedDict):
error: NotRequired[dict[str, Any] | None]
id: str
items: list[dict[str, Any]]
status: dict[str, Any]
class ThreadTokenUsage(TypedDict):
last: dict[str, Any]
modelContextWindow: NotRequired[dict[str, Any]]
total: dict[str, Any]
class ThreadStartResponse(TypedDict):
approvalPolicy: dict[str, Any]
cwd: str
model: str
modelProvider: str
reasoningEffort: NotRequired[dict[str, Any] | None]
sandbox: dict[str, Any]
thread: ThreadObject
class TurnStartResponse(TypedDict):
turn: TurnObject
class ThreadTokenUsageUpdatedNotificationParams(TypedDict):
threadId: str
tokenUsage: ThreadTokenUsage
turnId: str


@@ -0,0 +1,172 @@
# Auto-generated by scripts/update_sdk_artifacts.py
# DO NOT EDIT MANUALLY.
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, TypedDict
class ThreadDict(TypedDict, total=False):
cliVersion: str
createdAt: int
cwd: str
gitInfo: Any
id: str
modelProvider: str
path: Any
preview: str
source: Any
turns: list[Turn]
updatedAt: int
@dataclass(slots=True, kw_only=True)
class Thread:
cliVersion: str
createdAt: int
cwd: str
gitInfo: Any = None
id: str
modelProvider: str
path: Any = None
preview: str
source: Any
turns: list[Turn]
updatedAt: int
class TurnDict(TypedDict, total=False):
error: Any
id: str
items: list[Any]
status: str
@dataclass(slots=True, kw_only=True)
class Turn:
error: Any = None
id: str
items: list[Any]
status: str
class ThreadStartResponseDict(TypedDict, total=False):
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any
sandbox: Any
thread: Thread
@dataclass(slots=True, kw_only=True)
class ThreadStartResponse:
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any = None
sandbox: Any
thread: Thread
class ThreadResumeResponseDict(TypedDict, total=False):
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any
sandbox: Any
thread: Thread
@dataclass(slots=True, kw_only=True)
class ThreadResumeResponse:
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any = None
sandbox: Any
thread: Thread
class ThreadReadResponseDict(TypedDict, total=False):
thread: Thread
@dataclass(slots=True, kw_only=True)
class ThreadReadResponse:
thread: Thread
class ThreadListResponseDict(TypedDict, total=False):
data: list[Thread]
nextCursor: Any
@dataclass(slots=True, kw_only=True)
class ThreadListResponse:
data: list[Thread]
nextCursor: Any = None
class ThreadForkResponseDict(TypedDict, total=False):
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any
sandbox: Any
thread: Thread
@dataclass(slots=True, kw_only=True)
class ThreadForkResponse:
approvalPolicy: str
cwd: str
model: str
modelProvider: str
reasoningEffort: Any = None
sandbox: Any
thread: Thread
class ThreadArchiveResponseDict(TypedDict, total=False):
pass
@dataclass(slots=True, kw_only=True)
class ThreadArchiveResponse:
pass
class ThreadUnarchiveResponseDict(TypedDict, total=False):
thread: Thread
@dataclass(slots=True, kw_only=True)
class ThreadUnarchiveResponse:
thread: Thread
class ThreadSetNameResponseDict(TypedDict, total=False):
pass
@dataclass(slots=True, kw_only=True)
class ThreadSetNameResponse:
pass
class ThreadCompactStartResponseDict(TypedDict, total=False):
pass
@dataclass(slots=True, kw_only=True)
class ThreadCompactStartResponse:
pass
class TurnStartResponseDict(TypedDict, total=False):
turn: Turn
@dataclass(slots=True, kw_only=True)
class TurnStartResponse:
turn: Turn
class TurnSteerResponseDict(TypedDict, total=False):
turnId: str
@dataclass(slots=True, kw_only=True)
class TurnSteerResponse:
turnId: str
class ModelListResponseDict(TypedDict, total=False):
data: list[Any]
nextCursor: Any
@dataclass(slots=True, kw_only=True)
class ModelListResponse:
data: list[Any]
nextCursor: Any = None


@@ -0,0 +1,15 @@
# generated by datamodel-codegen:
# filename: AccountLoginCompletedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class AccountLoginCompletedNotification(BaseModel):
error: Optional[str] = None
loginId: Optional[str] = None
success: bool


@@ -0,0 +1,47 @@
# generated by datamodel-codegen:
# filename: AccountRateLimitsUpdatedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Optional
from pydantic import BaseModel
class CreditsSnapshot(BaseModel):
balance: Optional[str] = None
hasCredits: bool
unlimited: bool
class PlanType(Enum):
free = "free"
go = "go"
plus = "plus"
pro = "pro"
team = "team"
business = "business"
enterprise = "enterprise"
edu = "edu"
unknown = "unknown"
class RateLimitWindow(BaseModel):
resetsAt: Optional[int] = None
usedPercent: int
windowDurationMins: Optional[int] = None
class RateLimitSnapshot(BaseModel):
credits: Optional[CreditsSnapshot] = None
limitId: Optional[str] = None
limitName: Optional[str] = None
planType: Optional[PlanType] = None
primary: Optional[RateLimitWindow] = None
secondary: Optional[RateLimitWindow] = None
class AccountRateLimitsUpdatedNotification(BaseModel):
rateLimits: RateLimitSnapshot


@@ -0,0 +1,32 @@
# generated by datamodel-codegen:
# filename: AccountUpdatedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Optional, Union
from pydantic import BaseModel, Field, RootModel
class AuthMode1(Enum):
apikey = "apikey"
class AuthMode2(Enum):
chatgpt = "chatgpt"
class AuthMode3(Enum):
chatgptAuthTokens = "chatgptAuthTokens"
class AuthMode(RootModel[Union[AuthMode1, AuthMode2, AuthMode3]]):
root: Union[AuthMode1, AuthMode2, AuthMode3] = Field(
..., description="Authentication mode for OpenAI-backed providers."
)
class AccountUpdatedNotification(BaseModel):
authMode: Optional[AuthMode] = None


@@ -0,0 +1,14 @@
# generated by datamodel-codegen:
# filename: AgentMessageDeltaNotification.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class AgentMessageDeltaNotification(BaseModel):
delta: str
itemId: str
threadId: str
turnId: str


@@ -0,0 +1,28 @@
# generated by datamodel-codegen:
# filename: AppListUpdatedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
class AppInfo(BaseModel):
description: Optional[str] = None
distributionChannel: Optional[str] = None
id: str
installUrl: Optional[str] = None
isAccessible: Optional[bool] = False
isEnabled: Optional[bool] = Field(
True,
description="Whether this app is enabled in config.toml. Example: ```toml [apps.bad_app] enabled = false ```",
)
logoUrl: Optional[str] = None
logoUrlDark: Optional[str] = None
name: str
class AppListUpdatedNotification(BaseModel):
data: List[AppInfo]


@@ -0,0 +1,27 @@
# generated by datamodel-codegen:
# filename: AppsListParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field, conint
class AppsListParams(BaseModel):
cursor: Optional[str] = Field(
None, description="Opaque pagination cursor returned by a previous call."
)
forceRefetch: Optional[bool] = Field(
None,
description="When true, bypass app caches and fetch the latest data from sources.",
)
limit: Optional[conint(ge=0)] = Field(
None,
description="Optional page size; defaults to a reasonable server-side value.",
)
threadId: Optional[str] = Field(
None,
description="Optional thread id used to evaluate app feature gating from that thread's config.",
)

View File

@@ -0,0 +1,32 @@
# generated by datamodel-codegen:
# filename: AppsListResponse.json
# timestamp: <normalized>
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Field
class AppInfo(BaseModel):
description: Optional[str] = None
distributionChannel: Optional[str] = None
id: str
installUrl: Optional[str] = None
isAccessible: Optional[bool] = False
isEnabled: Optional[bool] = Field(
True,
description="Whether this app is enabled in config.toml. Example: ```toml [apps.bad_app] enabled = false ```",
)
logoUrl: Optional[str] = None
logoUrlDark: Optional[str] = None
name: str
class AppsListResponse(BaseModel):
data: List[AppInfo]
nextCursor: Optional[str] = Field(
None,
description="Opaque cursor to pass to the next call to continue after the last item. If None, there are no more items to return.",
)
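The `cursor`/`nextCursor` pair above implies standard opaque-cursor pagination: keep passing back `nextCursor` until it comes back as `None`. A minimal sketch of that loop, using plain dicts shaped like `AppsListParams`/`AppsListResponse` and a hypothetical `rpc` callable standing in for the real transport:

```python
def list_all_apps(rpc):
    """Collect every AppInfo page by following nextCursor until exhausted."""
    apps = []
    cursor = None
    while True:
        params = {"limit": 2}  # page size is illustrative
        if cursor is not None:
            params["cursor"] = cursor
        page = rpc(params)
        apps.extend(page["data"])
        cursor = page.get("nextCursor")
        if cursor is None:  # per the schema: no more items to return
            return apps

# Fake two-page transport for demonstration; real calls go over the SDK.
_pages = {
    None: {"data": [{"id": "a", "name": "A"}], "nextCursor": "c1"},
    "c1": {"data": [{"id": "b", "name": "B"}], "nextCursor": None},
}

def fake_rpc(params):
    return _pages[params.get("cursor")]

print([app["id"] for app in list_all_apps(fake_rpc)])  # ['a', 'b']
```

The same cursor convention recurs in `ExperimentalFeatureListParams`/`ExperimentalFeatureListResponse` below, so one pagination helper can serve both endpoints.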

View File

@@ -0,0 +1,11 @@
# generated by datamodel-codegen:
# filename: CancelLoginAccountParams.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class CancelLoginAccountParams(BaseModel):
loginId: str

View File

@@ -0,0 +1,18 @@
# generated by datamodel-codegen:
# filename: CancelLoginAccountResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel
class CancelLoginAccountStatus(Enum):
canceled = "canceled"
notFound = "notFound"
class CancelLoginAccountResponse(BaseModel):
status: CancelLoginAccountStatus

View File

@@ -0,0 +1,100 @@
# generated by datamodel-codegen:
# filename: CommandExecParams.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import List, Optional, Union
from pydantic import BaseModel, Field, RootModel
class AbsolutePathBuf(RootModel[str]):
root: str = Field(
...,
description="A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
)
class NetworkAccess(Enum):
restricted = "restricted"
enabled = "enabled"
class Type(Enum):
restricted = "restricted"
class ReadOnlyAccess1(BaseModel):
includePlatformDefaults: Optional[bool] = True
readableRoots: Optional[List[AbsolutePathBuf]] = []
type: Type = Field(..., title="RestrictedReadOnlyAccessType")
class Type1(Enum):
fullAccess = "fullAccess"
class ReadOnlyAccess2(BaseModel):
type: Type1 = Field(..., title="FullAccessReadOnlyAccessType")
class ReadOnlyAccess(RootModel[Union[ReadOnlyAccess1, ReadOnlyAccess2]]):
root: Union[ReadOnlyAccess1, ReadOnlyAccess2]
class Type2(Enum):
dangerFullAccess = "dangerFullAccess"
class SandboxPolicy1(BaseModel):
type: Type2 = Field(..., title="DangerFullAccessSandboxPolicyType")
class Type3(Enum):
readOnly = "readOnly"
class SandboxPolicy2(BaseModel):
access: Optional[ReadOnlyAccess] = Field(
default_factory=lambda: ReadOnlyAccess.model_validate({"type": "fullAccess"})
)
type: Type3 = Field(..., title="ReadOnlySandboxPolicyType")
class Type4(Enum):
externalSandbox = "externalSandbox"
class SandboxPolicy3(BaseModel):
networkAccess: Optional[NetworkAccess] = "restricted"
type: Type4 = Field(..., title="ExternalSandboxSandboxPolicyType")
class Type5(Enum):
workspaceWrite = "workspaceWrite"
class SandboxPolicy4(BaseModel):
excludeSlashTmp: Optional[bool] = False
excludeTmpdirEnvVar: Optional[bool] = False
networkAccess: Optional[bool] = False
readOnlyAccess: Optional[ReadOnlyAccess] = Field(
default_factory=lambda: ReadOnlyAccess.model_validate({"type": "fullAccess"})
)
type: Type5 = Field(..., title="WorkspaceWriteSandboxPolicyType")
writableRoots: Optional[List[AbsolutePathBuf]] = []
class SandboxPolicy(
RootModel[Union[SandboxPolicy1, SandboxPolicy2, SandboxPolicy3, SandboxPolicy4]]
):
root: Union[SandboxPolicy1, SandboxPolicy2, SandboxPolicy3, SandboxPolicy4]
class CommandExecParams(BaseModel):
command: List[str]
cwd: Optional[str] = None
sandboxPolicy: Optional[SandboxPolicy] = None
timeoutMs: Optional[int] = None
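The `SandboxPolicy` union above is discriminated by its `type` field, so a wire payload selects a variant with a single string. A sketch of a `CommandExecParams` payload using the `workspaceWrite` policy; the field names mirror the generated models, while the command, paths, and timeout are illustrative values:

```python
import json

# Hypothetical exec request with a workspaceWrite sandbox policy.
params = {
    "command": ["ls", "-la"],
    "cwd": "/workspace/project",
    "timeoutMs": 30_000,
    "sandboxPolicy": {
        "type": "workspaceWrite",  # discriminator selects SandboxPolicy4
        "networkAccess": False,
        "writableRoots": ["/workspace/project"],
        "readOnlyAccess": {"type": "fullAccess"},  # matches the default_factory
    },
}
wire = json.dumps(params)
assert json.loads(wire)["sandboxPolicy"]["type"] == "workspaceWrite"
```

Note that `writableRoots` entries are `AbsolutePathBuf` strings, so relative paths would be rejected at deserialization unless a base path has been set on the receiving side.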

View File

@@ -0,0 +1,13 @@
# generated by datamodel-codegen:
# filename: CommandExecResponse.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class CommandExecResponse(BaseModel):
exitCode: int
stderr: str
stdout: str

View File

@@ -0,0 +1,14 @@
# generated by datamodel-codegen:
# filename: CommandExecutionOutputDeltaNotification.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class CommandExecutionOutputDeltaNotification(BaseModel):
delta: str
itemId: str
threadId: str
turnId: str

View File

@@ -0,0 +1,30 @@
# generated by datamodel-codegen:
# filename: ConfigBatchWriteParams.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, List, Optional
from pydantic import BaseModel, Field
class MergeStrategy(Enum):
replace = "replace"
upsert = "upsert"
class ConfigEdit(BaseModel):
keyPath: str
mergeStrategy: MergeStrategy
value: Any
class ConfigBatchWriteParams(BaseModel):
edits: List[ConfigEdit]
expectedVersion: Optional[str] = None
filePath: Optional[str] = Field(
None,
description="Path to the config file to write; defaults to the user's `config.toml` when omitted.",
)
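A sketch of a `ConfigBatchWriteParams` payload combining both merge strategies. The key paths and values are illustrative, and the replace-vs-merge semantics are assumed from the strategy names; `expectedVersion` reads as an optimistic-concurrency check against the version returned by a prior read or write:

```python
# Hypothetical batch edit against the user's config.toml (filePath omitted).
batch = {
    "edits": [
        # "replace" overwrites the value at keyPath wholesale.
        {"keyPath": "model", "mergeStrategy": "replace", "value": "some-model"},
        # "upsert" merges into the existing table at keyPath (assumed).
        {
            "keyPath": "profiles.dev",
            "mergeStrategy": "upsert",
            "value": {"approval_policy": "never"},
        },
    ],
    "expectedVersion": "v1",  # illustrative version token
}
assert all(e["mergeStrategy"] in ("replace", "upsert") for e in batch["edits"])
```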

View File

@@ -0,0 +1,17 @@
# generated by datamodel-codegen:
# filename: ConfigReadParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
class ConfigReadParams(BaseModel):
cwd: Optional[str] = Field(
None,
description="Optional working directory to resolve project config layers. If specified, return the effective config as seen from that directory (i.e., including any project layers between `cwd` and the project/repo root).",
)
includeLayers: Optional[bool] = False

View File

@@ -0,0 +1,261 @@
# generated by datamodel-codegen:
# filename: ConfigReadResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, RootModel
class AbsolutePathBuf(RootModel[str]):
root: str = Field(
...,
description="A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
)
class AnalyticsConfig(BaseModel):
model_config = ConfigDict(
extra="allow",
)
enabled: Optional[bool] = None
class AppDisabledReason(Enum):
unknown = "unknown"
user = "user"
class AppsConfig(BaseModel):
pass
class AskForApproval(Enum):
untrusted = "untrusted"
on_failure = "on-failure"
on_request = "on-request"
never = "never"
class Type(Enum):
mdm = "mdm"
class ConfigLayerSource1(BaseModel):
domain: str
key: str
type: Type = Field(..., title="MdmConfigLayerSourceType")
class Type7(Enum):
system = "system"
class ConfigLayerSource2(BaseModel):
file: AbsolutePathBuf = Field(
...,
description="This is the path to the system config.toml file, though it is not guaranteed to exist.",
)
type: Type7 = Field(..., title="SystemConfigLayerSourceType")
class Type8(Enum):
user = "user"
class ConfigLayerSource3(BaseModel):
file: AbsolutePathBuf = Field(
...,
description="This is the path to the user's config.toml file, though it is not guaranteed to exist.",
)
type: Type8 = Field(..., title="UserConfigLayerSourceType")
class Type9(Enum):
project = "project"
class ConfigLayerSource4(BaseModel):
dotCodexFolder: AbsolutePathBuf
type: Type9 = Field(..., title="ProjectConfigLayerSourceType")
class Type10(Enum):
sessionFlags = "sessionFlags"
class ConfigLayerSource5(BaseModel):
type: Type10 = Field(..., title="SessionFlagsConfigLayerSourceType")
class Type11(Enum):
legacyManagedConfigTomlFromFile = "legacyManagedConfigTomlFromFile"
class ConfigLayerSource6(BaseModel):
file: AbsolutePathBuf
type: Type11 = Field(
..., title="LegacyManagedConfigTomlFromFileConfigLayerSourceType"
)
class Type12(Enum):
legacyManagedConfigTomlFromMdm = "legacyManagedConfigTomlFromMdm"
class ConfigLayerSource7(BaseModel):
type: Type12 = Field(
..., title="LegacyManagedConfigTomlFromMdmConfigLayerSourceType"
)
class ConfigLayerSource(
RootModel[
Union[
ConfigLayerSource1,
ConfigLayerSource2,
ConfigLayerSource3,
ConfigLayerSource4,
ConfigLayerSource5,
ConfigLayerSource6,
ConfigLayerSource7,
]
]
):
root: Union[
ConfigLayerSource1,
ConfigLayerSource2,
ConfigLayerSource3,
ConfigLayerSource4,
ConfigLayerSource5,
ConfigLayerSource6,
ConfigLayerSource7,
]
class ForcedLoginMethod(Enum):
chatgpt = "chatgpt"
api = "api"
class ReasoningEffort(Enum):
none = "none"
minimal = "minimal"
low = "low"
medium = "medium"
high = "high"
xhigh = "xhigh"
class ReasoningSummary1(Enum):
auto = "auto"
concise = "concise"
detailed = "detailed"
class ReasoningSummary2(Enum):
none = "none"
class ReasoningSummary(RootModel[Union[ReasoningSummary1, ReasoningSummary2]]):
root: Union[ReasoningSummary1, ReasoningSummary2] = Field(
...,
description="A summary of the reasoning performed by the model. This can be useful for debugging and understanding the model's reasoning process. See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#reasoning-summaries",
)
class SandboxMode(Enum):
read_only = "read-only"
workspace_write = "workspace-write"
danger_full_access = "danger-full-access"
class SandboxWorkspaceWrite(BaseModel):
exclude_slash_tmp: Optional[bool] = False
exclude_tmpdir_env_var: Optional[bool] = False
network_access: Optional[bool] = False
writable_roots: Optional[List[str]] = []
class ToolsV2(BaseModel):
view_image: Optional[bool] = None
web_search: Optional[bool] = None
class Verbosity(Enum):
low = "low"
medium = "medium"
high = "high"
class WebSearchMode(Enum):
disabled = "disabled"
cached = "cached"
live = "live"
class AppConfig(BaseModel):
disabled_reason: Optional[AppDisabledReason] = None
enabled: Optional[bool] = True
class ConfigLayer(BaseModel):
config: Any
disabledReason: Optional[str] = None
name: ConfigLayerSource
version: str
class ConfigLayerMetadata(BaseModel):
name: ConfigLayerSource
version: str
class ProfileV2(BaseModel):
model_config = ConfigDict(
extra="allow",
)
approval_policy: Optional[AskForApproval] = None
chatgpt_base_url: Optional[str] = None
model: Optional[str] = None
model_provider: Optional[str] = None
model_reasoning_effort: Optional[ReasoningEffort] = None
model_reasoning_summary: Optional[ReasoningSummary] = None
model_verbosity: Optional[Verbosity] = None
web_search: Optional[WebSearchMode] = None
class Config(BaseModel):
model_config = ConfigDict(
extra="allow",
)
analytics: Optional[AnalyticsConfig] = None
approval_policy: Optional[AskForApproval] = None
compact_prompt: Optional[str] = None
developer_instructions: Optional[str] = None
forced_chatgpt_workspace_id: Optional[str] = None
forced_login_method: Optional[ForcedLoginMethod] = None
instructions: Optional[str] = None
model: Optional[str] = None
model_auto_compact_token_limit: Optional[int] = None
model_context_window: Optional[int] = None
model_provider: Optional[str] = None
model_reasoning_effort: Optional[ReasoningEffort] = None
model_reasoning_summary: Optional[ReasoningSummary] = None
model_verbosity: Optional[Verbosity] = None
profile: Optional[str] = None
profiles: Optional[Dict[str, ProfileV2]] = {}
review_model: Optional[str] = None
sandbox_mode: Optional[SandboxMode] = None
sandbox_workspace_write: Optional[SandboxWorkspaceWrite] = None
tools: Optional[ToolsV2] = None
web_search: Optional[WebSearchMode] = None
class ConfigReadResponse(BaseModel):
config: Config
layers: Optional[List[ConfigLayer]] = None
origins: Dict[str, ConfigLayerMetadata]

View File

@@ -0,0 +1,60 @@
# generated by datamodel-codegen:
# filename: ConfigRequirementsReadResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Field, conint
class AskForApproval(Enum):
untrusted = "untrusted"
on_failure = "on-failure"
on_request = "on-request"
never = "never"
class NetworkRequirements(BaseModel):
allowLocalBinding: Optional[bool] = None
allowUnixSockets: Optional[List[str]] = None
allowUpstreamProxy: Optional[bool] = None
allowedDomains: Optional[List[str]] = None
dangerouslyAllowNonLoopbackAdmin: Optional[bool] = None
dangerouslyAllowNonLoopbackProxy: Optional[bool] = None
deniedDomains: Optional[List[str]] = None
enabled: Optional[bool] = None
httpPort: Optional[conint(ge=0)] = None
socksPort: Optional[conint(ge=0)] = None
class ResidencyRequirement(Enum):
us = "us"
class SandboxMode(Enum):
read_only = "read-only"
workspace_write = "workspace-write"
danger_full_access = "danger-full-access"
class WebSearchMode(Enum):
disabled = "disabled"
cached = "cached"
live = "live"
class ConfigRequirements(BaseModel):
allowedApprovalPolicies: Optional[List[AskForApproval]] = None
allowedSandboxModes: Optional[List[SandboxMode]] = None
allowedWebSearchModes: Optional[List[WebSearchMode]] = None
enforceResidency: Optional[ResidencyRequirement] = None
class ConfigRequirementsReadResponse(BaseModel):
requirements: Optional[ConfigRequirements] = Field(
None,
description="Null if no requirements are configured (e.g. no requirements.toml/MDM entries).",
)

View File

@@ -0,0 +1,26 @@
# generated by datamodel-codegen:
# filename: ConfigValueWriteParams.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
class MergeStrategy(Enum):
replace = "replace"
upsert = "upsert"
class ConfigValueWriteParams(BaseModel):
expectedVersion: Optional[str] = None
filePath: Optional[str] = Field(
None,
description="Path to the config file to write; defaults to the user's `config.toml` when omitted.",
)
keyPath: str
mergeStrategy: MergeStrategy
value: Any

View File

@@ -0,0 +1,35 @@
# generated by datamodel-codegen:
# filename: ConfigWarningNotification.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field, conint
class TextPosition(BaseModel):
column: conint(ge=0) = Field(
..., description="1-based column number (in Unicode scalar values)."
)
line: conint(ge=0) = Field(..., description="1-based line number.")
class TextRange(BaseModel):
end: TextPosition
start: TextPosition
class ConfigWarningNotification(BaseModel):
details: Optional[str] = Field(
None, description="Optional extra guidance or error details."
)
path: Optional[str] = Field(
None, description="Optional path to the config file that triggered the warning."
)
range: Optional[TextRange] = Field(
None,
description="Optional range for the error location inside the config file.",
)
summary: str = Field(..., description="Concise summary of the warning.")

View File

@@ -0,0 +1,138 @@
# generated by datamodel-codegen:
# filename: ConfigWriteResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Optional, Union
from pydantic import BaseModel, Field, RootModel
class AbsolutePathBuf(RootModel[str]):
root: str = Field(
...,
description="A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
)
class Type(Enum):
mdm = "mdm"
class ConfigLayerSource8(BaseModel):
domain: str
key: str
type: Type = Field(..., title="MdmConfigLayerSourceType")
class Type14(Enum):
system = "system"
class ConfigLayerSource9(BaseModel):
file: AbsolutePathBuf = Field(
...,
description="This is the path to the system config.toml file, though it is not guaranteed to exist.",
)
type: Type14 = Field(..., title="SystemConfigLayerSourceType")
class Type15(Enum):
user = "user"
class ConfigLayerSource10(BaseModel):
file: AbsolutePathBuf = Field(
...,
description="This is the path to the user's config.toml file, though it is not guaranteed to exist.",
)
type: Type15 = Field(..., title="UserConfigLayerSourceType")
class Type16(Enum):
project = "project"
class ConfigLayerSource11(BaseModel):
dotCodexFolder: AbsolutePathBuf
type: Type16 = Field(..., title="ProjectConfigLayerSourceType")
class Type17(Enum):
sessionFlags = "sessionFlags"
class ConfigLayerSource12(BaseModel):
type: Type17 = Field(..., title="SessionFlagsConfigLayerSourceType")
class Type18(Enum):
legacyManagedConfigTomlFromFile = "legacyManagedConfigTomlFromFile"
class ConfigLayerSource13(BaseModel):
file: AbsolutePathBuf
type: Type18 = Field(
..., title="LegacyManagedConfigTomlFromFileConfigLayerSourceType"
)
class Type19(Enum):
legacyManagedConfigTomlFromMdm = "legacyManagedConfigTomlFromMdm"
class ConfigLayerSource14(BaseModel):
type: Type19 = Field(
..., title="LegacyManagedConfigTomlFromMdmConfigLayerSourceType"
)
class ConfigLayerSource(
RootModel[
Union[
ConfigLayerSource8,
ConfigLayerSource9,
ConfigLayerSource10,
ConfigLayerSource11,
ConfigLayerSource12,
ConfigLayerSource13,
ConfigLayerSource14,
]
]
):
root: Union[
ConfigLayerSource8,
ConfigLayerSource9,
ConfigLayerSource10,
ConfigLayerSource11,
ConfigLayerSource12,
ConfigLayerSource13,
ConfigLayerSource14,
]
class WriteStatus(Enum):
ok = "ok"
okOverridden = "okOverridden"
class ConfigLayerMetadata(BaseModel):
name: ConfigLayerSource
version: str
class OverriddenMetadata(BaseModel):
effectiveValue: Any
message: str
overridingLayer: ConfigLayerMetadata
class ConfigWriteResponse(BaseModel):
filePath: AbsolutePathBuf = Field(
..., description="Canonical path to the config file that was written."
)
overriddenMetadata: Optional[OverriddenMetadata] = None
status: WriteStatus
version: str

View File

@@ -0,0 +1,12 @@
# generated by datamodel-codegen:
# filename: ContextCompactedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class ContextCompactedNotification(BaseModel):
threadId: str
turnId: str

View File

@@ -0,0 +1,17 @@
# generated by datamodel-codegen:
# filename: DeprecationNoticeNotification.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
class DeprecationNoticeNotification(BaseModel):
details: Optional[str] = Field(
None,
description="Optional extra guidance, such as migration steps or rationale.",
)
summary: str = Field(..., description="Concise summary of what is deprecated.")

View File

@@ -0,0 +1,102 @@
# generated by datamodel-codegen:
# filename: ErrorNotification.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Optional, Union
from pydantic import BaseModel, ConfigDict, Field, RootModel, conint
class CodexErrorInfo1(Enum):
contextWindowExceeded = "contextWindowExceeded"
usageLimitExceeded = "usageLimitExceeded"
serverOverloaded = "serverOverloaded"
internalServerError = "internalServerError"
unauthorized = "unauthorized"
badRequest = "badRequest"
threadRollbackFailed = "threadRollbackFailed"
sandboxError = "sandboxError"
other = "other"
class HttpConnectionFailed(BaseModel):
httpStatusCode: Optional[conint(ge=0)] = None
class CodexErrorInfo2(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
httpConnectionFailed: HttpConnectionFailed
class ResponseStreamConnectionFailed(BaseModel):
httpStatusCode: Optional[conint(ge=0)] = None
class CodexErrorInfo3(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
responseStreamConnectionFailed: ResponseStreamConnectionFailed
class ResponseStreamDisconnected(BaseModel):
httpStatusCode: Optional[conint(ge=0)] = None
class CodexErrorInfo4(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
responseStreamDisconnected: ResponseStreamDisconnected
class ResponseTooManyFailedAttempts(BaseModel):
httpStatusCode: Optional[conint(ge=0)] = None
class CodexErrorInfo5(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
responseTooManyFailedAttempts: ResponseTooManyFailedAttempts
class CodexErrorInfo(
RootModel[
Union[
CodexErrorInfo1,
CodexErrorInfo2,
CodexErrorInfo3,
CodexErrorInfo4,
CodexErrorInfo5,
]
]
):
root: Union[
CodexErrorInfo1,
CodexErrorInfo2,
CodexErrorInfo3,
CodexErrorInfo4,
CodexErrorInfo5,
] = Field(
...,
description="This translation layer makes sure that we expose codex error codes in camelCase.\n\nWhen an upstream HTTP status is available (for example, from the Responses API or a provider), it is forwarded in `httpStatusCode` on the relevant `codexErrorInfo` variant.",
)
class TurnError(BaseModel):
additionalDetails: Optional[str] = None
codexErrorInfo: Optional[CodexErrorInfo] = None
message: str
class ErrorNotification(BaseModel):
error: TurnError
threadId: str
turnId: str
willRetry: bool
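On the wire, the `CodexErrorInfo` union mixes two shapes: the known error codes (`CodexErrorInfo1`) arrive as bare strings, while the connection-style failures arrive as single-key objects whose value may carry an `httpStatusCode`. A sketch of dispatching on the raw JSON value, with a hypothetical `describe_error` helper:

```python
def describe_error(info):
    """Render a raw codexErrorInfo value (str or single-key dict) for logs."""
    if isinstance(info, str):
        # Enum-style variant, e.g. "usageLimitExceeded" or "sandboxError".
        return f"codex error: {info}"
    # Object variant: exactly one key naming the failure kind.
    (kind, payload), = info.items()
    status = payload.get("httpStatusCode")
    return f"{kind} (http status {status})"

print(describe_error("usageLimitExceeded"))
# codex error: usageLimitExceeded
print(describe_error({"httpConnectionFailed": {"httpStatusCode": 502}}))
# httpConnectionFailed (http status 502)
```

In practice the generated `CodexErrorInfo` RootModel handles this discrimination during validation; the helper above only illustrates the underlying wire shapes.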

View File

@@ -0,0 +1,19 @@
# generated by datamodel-codegen:
# filename: ExperimentalFeatureListParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field, conint
class ExperimentalFeatureListParams(BaseModel):
cursor: Optional[str] = Field(
None, description="Opaque pagination cursor returned by a previous call."
)
limit: Optional[conint(ge=0)] = Field(
None,
description="Optional page size; defaults to a reasonable server-side value.",
)

View File

@@ -0,0 +1,86 @@
# generated by datamodel-codegen:
# filename: ExperimentalFeatureListResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import List, Optional, Union
from pydantic import BaseModel, Field, RootModel
class ExperimentalFeatureStage1(Enum):
beta = "beta"
class ExperimentalFeatureStage2(Enum):
underDevelopment = "underDevelopment"
class ExperimentalFeatureStage3(Enum):
stable = "stable"
class ExperimentalFeatureStage4(Enum):
deprecated = "deprecated"
class ExperimentalFeatureStage5(Enum):
removed = "removed"
class ExperimentalFeatureStage(
RootModel[
Union[
ExperimentalFeatureStage1,
ExperimentalFeatureStage2,
ExperimentalFeatureStage3,
ExperimentalFeatureStage4,
ExperimentalFeatureStage5,
]
]
):
root: Union[
ExperimentalFeatureStage1,
ExperimentalFeatureStage2,
ExperimentalFeatureStage3,
ExperimentalFeatureStage4,
ExperimentalFeatureStage5,
]
class ExperimentalFeature(BaseModel):
announcement: Optional[str] = Field(
None,
description="Announcement copy shown to users when the feature is introduced. Null when this feature is not in beta.",
)
defaultEnabled: bool = Field(
..., description="Whether this feature is enabled by default."
)
description: Optional[str] = Field(
None,
description="Short summary describing what the feature does. Null when this feature is not in beta.",
)
displayName: Optional[str] = Field(
None,
description="User-facing display name shown in the experimental features UI. Null when this feature is not in beta.",
)
enabled: bool = Field(
...,
description="Whether this feature is currently enabled in the loaded config.",
)
name: str = Field(
..., description="Stable key used in config.toml and CLI flag toggles."
)
stage: ExperimentalFeatureStage = Field(
..., description="Lifecycle stage of this feature flag."
)
class ExperimentalFeatureListResponse(BaseModel):
data: List[ExperimentalFeature]
nextCursor: Optional[str] = Field(
None,
description="Opaque cursor to pass to the next call to continue after the last item. If None, there are no more items to return.",
)

View File

@@ -0,0 +1,16 @@
# generated by datamodel-codegen:
# filename: FeedbackUploadParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class FeedbackUploadParams(BaseModel):
classification: str
includeLogs: bool
reason: Optional[str] = None
threadId: Optional[str] = None

View File

@@ -0,0 +1,11 @@
# generated by datamodel-codegen:
# filename: FeedbackUploadResponse.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class FeedbackUploadResponse(BaseModel):
threadId: str

View File

@@ -0,0 +1,14 @@
# generated by datamodel-codegen:
# filename: FileChangeOutputDeltaNotification.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class FileChangeOutputDeltaNotification(BaseModel):
delta: str
itemId: str
threadId: str
turnId: str

View File

@@ -0,0 +1,16 @@
# generated by datamodel-codegen:
# filename: GetAccountParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
class GetAccountParams(BaseModel):
refreshToken: Optional[bool] = Field(
False,
description="When `true`, requests a proactive token refresh before returning.\n\nIn managed auth mode this triggers the normal refresh-token flow. In external auth mode this flag is ignored. Clients should refresh tokens themselves and call `account/login/start` with `chatgptAuthTokens`.",
)

View File

@@ -0,0 +1,54 @@
# generated by datamodel-codegen:
# filename: GetAccountRateLimitsResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class CreditsSnapshot(BaseModel):
balance: Optional[str] = None
hasCredits: bool
unlimited: bool
class PlanType(Enum):
free = "free"
go = "go"
plus = "plus"
pro = "pro"
team = "team"
business = "business"
enterprise = "enterprise"
edu = "edu"
unknown = "unknown"
class RateLimitWindow(BaseModel):
resetsAt: Optional[int] = None
usedPercent: int
windowDurationMins: Optional[int] = None
class RateLimitSnapshot(BaseModel):
credits: Optional[CreditsSnapshot] = None
limitId: Optional[str] = None
limitName: Optional[str] = None
planType: Optional[PlanType] = None
primary: Optional[RateLimitWindow] = None
secondary: Optional[RateLimitWindow] = None
class GetAccountRateLimitsResponse(BaseModel):
rateLimits: RateLimitSnapshot = Field(
...,
description="Backward-compatible single-bucket view; mirrors the historical payload.",
)
rateLimitsByLimitId: Optional[Dict[str, Any]] = Field(
None,
description="Multi-bucket view keyed by metered `limit_id` (for example, `codex`).",
)

View File

@@ -0,0 +1,49 @@
# generated by datamodel-codegen:
# filename: GetAccountResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Optional, Union
from pydantic import BaseModel, Field, RootModel
class Type(Enum):
apiKey = "apiKey"
class Account1(BaseModel):
type: Type = Field(..., title="ApiKeyAccountType")
class Type21(Enum):
chatgpt = "chatgpt"
class PlanType(Enum):
free = "free"
go = "go"
plus = "plus"
pro = "pro"
team = "team"
business = "business"
enterprise = "enterprise"
edu = "edu"
unknown = "unknown"
class Account2(BaseModel):
email: str
planType: PlanType
type: Type21 = Field(..., title="ChatgptAccountType")
class Account(RootModel[Union[Account1, Account2]]):
root: Union[Account1, Account2]
class GetAccountResponse(BaseModel):
account: Optional[Account] = None
requiresOpenaiAuth: bool

View File

@@ -0,0 +1,486 @@
# generated by datamodel-codegen:
# filename: ItemCompletedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, RootModel, conint
class ByteRange(BaseModel):
end: conint(ge=0)
start: conint(ge=0)
class CollabAgentStatus(Enum):
pendingInit = "pendingInit"
running = "running"
completed = "completed"
errored = "errored"
shutdown = "shutdown"
notFound = "notFound"
class CollabAgentTool(Enum):
spawnAgent = "spawnAgent"
sendInput = "sendInput"
resumeAgent = "resumeAgent"
wait = "wait"
closeAgent = "closeAgent"
class CollabAgentToolCallStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
class Type(Enum):
read = "read"
class CommandAction1(BaseModel):
command: str
name: str
path: str
type: Type = Field(..., title="ReadCommandActionType")
class Type23(Enum):
listFiles = "listFiles"
class CommandAction2(BaseModel):
command: str
path: Optional[str] = None
type: Type23 = Field(..., title="ListFilesCommandActionType")
class Type24(Enum):
search = "search"
class CommandAction3(BaseModel):
command: str
path: Optional[str] = None
query: Optional[str] = None
type: Type24 = Field(..., title="SearchCommandActionType")
class Type25(Enum):
unknown = "unknown"
class CommandAction4(BaseModel):
command: str
type: Type25 = Field(..., title="UnknownCommandActionType")
class CommandAction(
RootModel[Union[CommandAction1, CommandAction2, CommandAction3, CommandAction4]]
):
root: Union[CommandAction1, CommandAction2, CommandAction3, CommandAction4]
class CommandExecutionStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
declined = "declined"
class McpToolCallError(BaseModel):
message: str
class McpToolCallResult(BaseModel):
content: List
structuredContent: Optional[Any] = None
class McpToolCallStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
class PatchApplyStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
declined = "declined"
class Type26(Enum):
add = "add"
class PatchChangeKind1(BaseModel):
type: Type26 = Field(..., title="AddPatchChangeKindType")
class Type27(Enum):
delete = "delete"
class PatchChangeKind2(BaseModel):
type: Type27 = Field(..., title="DeletePatchChangeKindType")
class Type28(Enum):
update = "update"
class PatchChangeKind3(BaseModel):
move_path: Optional[str] = None
type: Type28 = Field(..., title="UpdatePatchChangeKindType")
class PatchChangeKind(
RootModel[Union[PatchChangeKind1, PatchChangeKind2, PatchChangeKind3]]
):
root: Union[PatchChangeKind1, PatchChangeKind2, PatchChangeKind3]
class TextElement(BaseModel):
byteRange: ByteRange = Field(
...,
description="Byte range in the parent `text` buffer that this element occupies.",
)
placeholder: Optional[str] = Field(
None,
description="Optional human-readable placeholder for the element, displayed in the UI.",
)
class Type29(Enum):
userMessage = "userMessage"
class Type30(Enum):
agentMessage = "agentMessage"
class ThreadItem2(BaseModel):
id: str
text: str
type: Type30 = Field(..., title="AgentMessageThreadItemType")
class Type31(Enum):
plan = "plan"
class ThreadItem3(BaseModel):
id: str
text: str
type: Type31 = Field(..., title="PlanThreadItemType")
class Type32(Enum):
reasoning = "reasoning"
class ThreadItem4(BaseModel):
content: Optional[List[str]] = []
id: str
summary: Optional[List[str]] = []
type: Type32 = Field(..., title="ReasoningThreadItemType")
class Type33(Enum):
commandExecution = "commandExecution"
class ThreadItem5(BaseModel):
aggregatedOutput: Optional[str] = Field(
None, description="The command's output, aggregated from stdout and stderr."
)
command: str = Field(..., description="The command to be executed.")
commandActions: List[CommandAction] = Field(
...,
description="A best-effort parsing of the command to understand the action(s) it will perform. This returns a list of CommandAction objects because a single shell command may be composed of many commands piped together.",
)
cwd: str = Field(..., description="The command's working directory.")
durationMs: Optional[int] = Field(
None, description="The duration of the command execution in milliseconds."
)
exitCode: Optional[int] = Field(None, description="The command's exit code.")
id: str
processId: Optional[str] = Field(
None, description="Identifier for the underlying PTY process (when available)."
)
status: CommandExecutionStatus
type: Type33 = Field(..., title="CommandExecutionThreadItemType")
class Type34(Enum):
fileChange = "fileChange"
class Type35(Enum):
mcpToolCall = "mcpToolCall"
class ThreadItem7(BaseModel):
arguments: Any
durationMs: Optional[int] = Field(
None, description="The duration of the MCP tool call in milliseconds."
)
error: Optional[McpToolCallError] = None
id: str
result: Optional[McpToolCallResult] = None
server: str
status: McpToolCallStatus
tool: str
type: Type35 = Field(..., title="McpToolCallThreadItemType")
class Type36(Enum):
collabAgentToolCall = "collabAgentToolCall"
class Type37(Enum):
webSearch = "webSearch"
class Type38(Enum):
imageView = "imageView"
class ThreadItem10(BaseModel):
id: str
path: str
type: Type38 = Field(..., title="ImageViewThreadItemType")
class Type39(Enum):
enteredReviewMode = "enteredReviewMode"
class ThreadItem11(BaseModel):
id: str
review: str
type: Type39 = Field(..., title="EnteredReviewModeThreadItemType")
class Type40(Enum):
exitedReviewMode = "exitedReviewMode"
class ThreadItem12(BaseModel):
id: str
review: str
type: Type40 = Field(..., title="ExitedReviewModeThreadItemType")
class Type41(Enum):
contextCompaction = "contextCompaction"
class ThreadItem13(BaseModel):
id: str
type: Type41 = Field(..., title="ContextCompactionThreadItemType")
class Type42(Enum):
text = "text"
class UserInput1(BaseModel):
text: str
text_elements: Optional[List[TextElement]] = Field(
[],
description="UI-defined spans within `text` used to render or persist special elements.",
)
type: Type42 = Field(..., title="TextUserInputType")
class Type43(Enum):
image = "image"
class UserInput2(BaseModel):
type: Type43 = Field(..., title="ImageUserInputType")
url: str
class Type44(Enum):
localImage = "localImage"
class UserInput3(BaseModel):
path: str
type: Type44 = Field(..., title="LocalImageUserInputType")
class Type45(Enum):
skill = "skill"
class UserInput4(BaseModel):
name: str
path: str
type: Type45 = Field(..., title="SkillUserInputType")
class Type46(Enum):
mention = "mention"
class UserInput5(BaseModel):
name: str
path: str
type: Type46 = Field(..., title="MentionUserInputType")
class UserInput(
RootModel[Union[UserInput1, UserInput2, UserInput3, UserInput4, UserInput5]]
):
root: Union[UserInput1, UserInput2, UserInput3, UserInput4, UserInput5]
class Type47(Enum):
search = "search"
class WebSearchAction1(BaseModel):
queries: Optional[List[str]] = None
query: Optional[str] = None
type: Type47 = Field(..., title="SearchWebSearchActionType")
class Type48(Enum):
openPage = "openPage"
class WebSearchAction2(BaseModel):
type: Type48 = Field(..., title="OpenPageWebSearchActionType")
url: Optional[str] = None
class Type49(Enum):
findInPage = "findInPage"
class WebSearchAction3(BaseModel):
pattern: Optional[str] = None
type: Type49 = Field(..., title="FindInPageWebSearchActionType")
url: Optional[str] = None
class Type50(Enum):
other = "other"
class WebSearchAction4(BaseModel):
type: Type50 = Field(..., title="OtherWebSearchActionType")
class WebSearchAction(
RootModel[
Union[WebSearchAction1, WebSearchAction2, WebSearchAction3, WebSearchAction4]
]
):
root: Union[WebSearchAction1, WebSearchAction2, WebSearchAction3, WebSearchAction4]
class CollabAgentState(BaseModel):
message: Optional[str] = None
status: CollabAgentStatus
class FileUpdateChange(BaseModel):
diff: str
kind: PatchChangeKind
path: str
class ThreadItem1(BaseModel):
content: List[UserInput]
id: str
type: Type29 = Field(..., title="UserMessageThreadItemType")
class ThreadItem6(BaseModel):
changes: List[FileUpdateChange]
id: str
status: PatchApplyStatus
type: Type34 = Field(..., title="FileChangeThreadItemType")
class ThreadItem8(BaseModel):
agentsStates: Dict[str, CollabAgentState] = Field(
..., description="Last known status of the target agents, when available."
)
id: str = Field(..., description="Unique identifier for this collab tool call.")
prompt: Optional[str] = Field(
None,
description="Prompt text sent as part of the collab tool call, when available.",
)
receiverThreadIds: List[str] = Field(
...,
description="Thread ID of the receiving agent, when applicable. In case of spawn operation, this corresponds to the newly spawned agent.",
)
senderThreadId: str = Field(
..., description="Thread ID of the agent issuing the collab request."
)
status: CollabAgentToolCallStatus = Field(
..., description="Current status of the collab tool call."
)
tool: CollabAgentTool = Field(
..., description="Name of the collab tool that was invoked."
)
type: Type36 = Field(..., title="CollabAgentToolCallThreadItemType")
class ThreadItem9(BaseModel):
action: Optional[WebSearchAction] = None
id: str
query: str
type: Type37 = Field(..., title="WebSearchThreadItemType")
class ThreadItem(
RootModel[
Union[
ThreadItem1,
ThreadItem2,
ThreadItem3,
ThreadItem4,
ThreadItem5,
ThreadItem6,
ThreadItem7,
ThreadItem8,
ThreadItem9,
ThreadItem10,
ThreadItem11,
ThreadItem12,
ThreadItem13,
]
]
):
root: Union[
ThreadItem1,
ThreadItem2,
ThreadItem3,
ThreadItem4,
ThreadItem5,
ThreadItem6,
ThreadItem7,
ThreadItem8,
ThreadItem9,
ThreadItem10,
ThreadItem11,
ThreadItem12,
ThreadItem13,
]
class ItemCompletedNotification(BaseModel):
item: ThreadItem
threadId: str
turnId: str
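The generated `ThreadItem` variants above share the same field shapes and differ only in their single-value `type` enum, so pydantic's union validation dispatches on that tag. A minimal sketch of how a notification payload resolves to the right variant, assuming pydantic v2; the trimmed class names here are illustrative stand-ins, not the generated names:

```python
from enum import Enum
from typing import Union
from pydantic import BaseModel, RootModel

# Trimmed mirror of the generated pattern: two variants, each with a
# single-value enum "type" field acting as the discriminator.
class AgentMessageType(Enum):
    agentMessage = "agentMessage"

class AgentMessageItem(BaseModel):
    id: str
    text: str
    type: AgentMessageType

class PlanType(Enum):
    plan = "plan"

class PlanItem(BaseModel):
    id: str
    text: str
    type: PlanType

class ThreadItem(RootModel[Union[AgentMessageItem, PlanItem]]):
    root: Union[AgentMessageItem, PlanItem]

class ItemCompletedNotification(BaseModel):
    item: ThreadItem
    threadId: str
    turnId: str

payload = {
    "item": {"id": "i1", "text": "done", "type": "agentMessage"},
    "threadId": "t1",
    "turnId": "u1",
}
note = ItemCompletedNotification.model_validate(payload)
# PlanItem fails on its enum, so the union resolves to AgentMessageItem.
assert isinstance(note.item.root, AgentMessageItem)
```

Consumers then branch on `note.item.root` (or its `type` field) to handle each item kind.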


@@ -0,0 +1,486 @@
# generated by datamodel-codegen:
# filename: ItemStartedNotification.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, RootModel, conint
class ByteRange(BaseModel):
end: conint(ge=0)
start: conint(ge=0)
class CollabAgentStatus(Enum):
pendingInit = "pendingInit"
running = "running"
completed = "completed"
errored = "errored"
shutdown = "shutdown"
notFound = "notFound"
class CollabAgentTool(Enum):
spawnAgent = "spawnAgent"
sendInput = "sendInput"
resumeAgent = "resumeAgent"
wait = "wait"
closeAgent = "closeAgent"
class CollabAgentToolCallStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
class Type(Enum):
read = "read"
class CommandAction(BaseModel):
command: str
name: str
path: str
type: Type = Field(..., title="ReadCommandActionType")
class Type52(Enum):
listFiles = "listFiles"
class CommandAction7(BaseModel):
command: str
path: Optional[str] = None
type: Type52 = Field(..., title="ListFilesCommandActionType")
class Type53(Enum):
search = "search"
class CommandAction8(BaseModel):
command: str
path: Optional[str] = None
query: Optional[str] = None
type: Type53 = Field(..., title="SearchCommandActionType")
class Type54(Enum):
unknown = "unknown"
class CommandAction9(BaseModel):
command: str
type: Type54 = Field(..., title="UnknownCommandActionType")
class CommandAction5(
RootModel[Union[CommandAction, CommandAction7, CommandAction8, CommandAction9]]
):
root: Union[CommandAction, CommandAction7, CommandAction8, CommandAction9]
class CommandExecutionStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
declined = "declined"
class McpToolCallError(BaseModel):
message: str
class McpToolCallResult(BaseModel):
content: List
structuredContent: Optional[Any] = None
class McpToolCallStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
class PatchApplyStatus(Enum):
inProgress = "inProgress"
completed = "completed"
failed = "failed"
declined = "declined"
class Type55(Enum):
add = "add"
class PatchChangeKind4(BaseModel):
type: Type55 = Field(..., title="AddPatchChangeKindType")
class Type56(Enum):
delete = "delete"
class PatchChangeKind5(BaseModel):
type: Type56 = Field(..., title="DeletePatchChangeKindType")
class Type57(Enum):
update = "update"
class PatchChangeKind6(BaseModel):
move_path: Optional[str] = None
type: Type57 = Field(..., title="UpdatePatchChangeKindType")
class PatchChangeKind(
RootModel[Union[PatchChangeKind4, PatchChangeKind5, PatchChangeKind6]]
):
root: Union[PatchChangeKind4, PatchChangeKind5, PatchChangeKind6]
class TextElement(BaseModel):
byteRange: ByteRange = Field(
...,
description="Byte range in the parent `text` buffer that this element occupies.",
)
placeholder: Optional[str] = Field(
None,
description="Optional human-readable placeholder for the element, displayed in the UI.",
)
class Type58(Enum):
userMessage = "userMessage"
class Type59(Enum):
agentMessage = "agentMessage"
class ThreadItem15(BaseModel):
id: str
text: str
type: Type59 = Field(..., title="AgentMessageThreadItemType")
class Type60(Enum):
plan = "plan"
class ThreadItem16(BaseModel):
id: str
text: str
type: Type60 = Field(..., title="PlanThreadItemType")
class Type61(Enum):
reasoning = "reasoning"
class ThreadItem17(BaseModel):
content: Optional[List[str]] = []
id: str
summary: Optional[List[str]] = []
type: Type61 = Field(..., title="ReasoningThreadItemType")
class Type62(Enum):
commandExecution = "commandExecution"
class ThreadItem18(BaseModel):
aggregatedOutput: Optional[str] = Field(
None, description="The command's output, aggregated from stdout and stderr."
)
command: str = Field(..., description="The command to be executed.")
commandActions: List[CommandAction5] = Field(
...,
description="A best-effort parsing of the command to understand the action(s) it will perform. This returns a list of CommandAction objects because a single shell command may be composed of many commands piped together.",
)
cwd: str = Field(..., description="The command's working directory.")
durationMs: Optional[int] = Field(
None, description="The duration of the command execution in milliseconds."
)
exitCode: Optional[int] = Field(None, description="The command's exit code.")
id: str
processId: Optional[str] = Field(
None, description="Identifier for the underlying PTY process (when available)."
)
status: CommandExecutionStatus
type: Type62 = Field(..., title="CommandExecutionThreadItemType")
class Type63(Enum):
fileChange = "fileChange"
class Type64(Enum):
mcpToolCall = "mcpToolCall"
class ThreadItem20(BaseModel):
arguments: Any
durationMs: Optional[int] = Field(
None, description="The duration of the MCP tool call in milliseconds."
)
error: Optional[McpToolCallError] = None
id: str
result: Optional[McpToolCallResult] = None
server: str
status: McpToolCallStatus
tool: str
type: Type64 = Field(..., title="McpToolCallThreadItemType")
class Type65(Enum):
collabAgentToolCall = "collabAgentToolCall"
class Type66(Enum):
webSearch = "webSearch"
class Type67(Enum):
imageView = "imageView"
class ThreadItem23(BaseModel):
id: str
path: str
type: Type67 = Field(..., title="ImageViewThreadItemType")
class Type68(Enum):
enteredReviewMode = "enteredReviewMode"
class ThreadItem24(BaseModel):
id: str
review: str
type: Type68 = Field(..., title="EnteredReviewModeThreadItemType")
class Type69(Enum):
exitedReviewMode = "exitedReviewMode"
class ThreadItem25(BaseModel):
id: str
review: str
type: Type69 = Field(..., title="ExitedReviewModeThreadItemType")
class Type70(Enum):
contextCompaction = "contextCompaction"
class ThreadItem26(BaseModel):
id: str
type: Type70 = Field(..., title="ContextCompactionThreadItemType")
class Type71(Enum):
text = "text"
class UserInput6(BaseModel):
text: str
text_elements: Optional[List[TextElement]] = Field(
[],
description="UI-defined spans within `text` used to render or persist special elements.",
)
type: Type71 = Field(..., title="TextUserInputType")
class Type72(Enum):
image = "image"
class UserInput7(BaseModel):
type: Type72 = Field(..., title="ImageUserInputType")
url: str
class Type73(Enum):
localImage = "localImage"
class UserInput8(BaseModel):
path: str
type: Type73 = Field(..., title="LocalImageUserInputType")
class Type74(Enum):
skill = "skill"
class UserInput9(BaseModel):
name: str
path: str
type: Type74 = Field(..., title="SkillUserInputType")
class Type75(Enum):
mention = "mention"
class UserInput10(BaseModel):
name: str
path: str
type: Type75 = Field(..., title="MentionUserInputType")
class UserInput(
RootModel[Union[UserInput6, UserInput7, UserInput8, UserInput9, UserInput10]]
):
root: Union[UserInput6, UserInput7, UserInput8, UserInput9, UserInput10]
class Type76(Enum):
search = "search"
class WebSearchAction5(BaseModel):
queries: Optional[List[str]] = None
query: Optional[str] = None
type: Type76 = Field(..., title="SearchWebSearchActionType")
class Type77(Enum):
openPage = "openPage"
class WebSearchAction6(BaseModel):
type: Type77 = Field(..., title="OpenPageWebSearchActionType")
url: Optional[str] = None
class Type78(Enum):
findInPage = "findInPage"
class WebSearchAction7(BaseModel):
pattern: Optional[str] = None
type: Type78 = Field(..., title="FindInPageWebSearchActionType")
url: Optional[str] = None
class Type79(Enum):
other = "other"
class WebSearchAction8(BaseModel):
type: Type79 = Field(..., title="OtherWebSearchActionType")
class WebSearchAction(
RootModel[
Union[WebSearchAction5, WebSearchAction6, WebSearchAction7, WebSearchAction8]
]
):
root: Union[WebSearchAction5, WebSearchAction6, WebSearchAction7, WebSearchAction8]
class CollabAgentState(BaseModel):
message: Optional[str] = None
status: CollabAgentStatus
class FileUpdateChange(BaseModel):
diff: str
kind: PatchChangeKind
path: str
class ThreadItem14(BaseModel):
content: List[UserInput]
id: str
type: Type58 = Field(..., title="UserMessageThreadItemType")
class ThreadItem19(BaseModel):
changes: List[FileUpdateChange]
id: str
status: PatchApplyStatus
type: Type63 = Field(..., title="FileChangeThreadItemType")
class ThreadItem21(BaseModel):
agentsStates: Dict[str, CollabAgentState] = Field(
..., description="Last known status of the target agents, when available."
)
id: str = Field(..., description="Unique identifier for this collab tool call.")
prompt: Optional[str] = Field(
None,
description="Prompt text sent as part of the collab tool call, when available.",
)
receiverThreadIds: List[str] = Field(
...,
description="Thread ID of the receiving agent, when applicable. In case of spawn operation, this corresponds to the newly spawned agent.",
)
senderThreadId: str = Field(
..., description="Thread ID of the agent issuing the collab request."
)
status: CollabAgentToolCallStatus = Field(
..., description="Current status of the collab tool call."
)
tool: CollabAgentTool = Field(
..., description="Name of the collab tool that was invoked."
)
type: Type65 = Field(..., title="CollabAgentToolCallThreadItemType")
class ThreadItem22(BaseModel):
action: Optional[WebSearchAction] = None
id: str
query: str
type: Type66 = Field(..., title="WebSearchThreadItemType")
class ThreadItem(
RootModel[
Union[
ThreadItem14,
ThreadItem15,
ThreadItem16,
ThreadItem17,
ThreadItem18,
ThreadItem19,
ThreadItem20,
ThreadItem21,
ThreadItem22,
ThreadItem23,
ThreadItem24,
ThreadItem25,
ThreadItem26,
]
]
):
root: Union[
ThreadItem14,
ThreadItem15,
ThreadItem16,
ThreadItem17,
ThreadItem18,
ThreadItem19,
ThreadItem20,
ThreadItem21,
ThreadItem22,
ThreadItem23,
ThreadItem24,
ThreadItem25,
ThreadItem26,
]
class ItemStartedNotification(BaseModel):
item: ThreadItem
threadId: str
turnId: str


@@ -0,0 +1,18 @@
# generated by datamodel-codegen:
# filename: ListMcpServerStatusParams.json
# timestamp: <normalized>
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field, conint
class ListMcpServerStatusParams(BaseModel):
cursor: Optional[str] = Field(
None, description="Opaque pagination cursor returned by a previous call."
)
limit: Optional[conint(ge=0)] = Field(
None, description="Optional page size; defaults to a server-defined value."
)


@@ -0,0 +1,65 @@
# generated by datamodel-codegen:
# filename: ListMcpServerStatusResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class McpAuthStatus(Enum):
unsupported = "unsupported"
notLoggedIn = "notLoggedIn"
bearerToken = "bearerToken"
oAuth = "oAuth"
class Resource(BaseModel):
field_meta: Optional[Any] = Field(None, alias="_meta")
annotations: Optional[Any] = None
description: Optional[str] = None
icons: Optional[List] = None
mimeType: Optional[str] = None
name: str
size: Optional[int] = None
title: Optional[str] = None
uri: str
class ResourceTemplate(BaseModel):
annotations: Optional[Any] = None
description: Optional[str] = None
mimeType: Optional[str] = None
name: str
title: Optional[str] = None
uriTemplate: str
class Tool(BaseModel):
field_meta: Optional[Any] = Field(None, alias="_meta")
annotations: Optional[Any] = None
description: Optional[str] = None
icons: Optional[List] = None
inputSchema: Any
name: str
outputSchema: Optional[Any] = None
title: Optional[str] = None
class McpServerStatus(BaseModel):
authStatus: McpAuthStatus
name: str
resourceTemplates: List[ResourceTemplate]
resources: List[Resource]
tools: Dict[str, Tool]
class ListMcpServerStatusResponse(BaseModel):
data: List[McpServerStatus]
nextCursor: Optional[str] = Field(
None,
description="Opaque cursor to pass to the next call to continue after the last item. If None, there are no more items to return.",
)


@@ -0,0 +1,54 @@
# generated by datamodel-codegen:
# filename: LoginAccountParams.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Optional, Union
from pydantic import BaseModel, Field, RootModel
class Type(Enum):
apiKey = "apiKey"
class LoginAccountParams1(BaseModel):
apiKey: str
type: Type = Field(..., title="ApiKeyv2::LoginAccountParamsType")
class Type81(Enum):
chatgpt = "chatgpt"
class LoginAccountParams2(BaseModel):
type: Type81 = Field(..., title="Chatgptv2::LoginAccountParamsType")
class Type82(Enum):
chatgptAuthTokens = "chatgptAuthTokens"
class LoginAccountParams3(BaseModel):
accessToken: str = Field(
...,
description="Access token (JWT) supplied by the client. This token is used for backend API requests and email extraction.",
)
chatgptAccountId: str = Field(
..., description="Workspace/account identifier supplied by the client."
)
chatgptPlanType: Optional[str] = Field(
None,
description="Optional plan type supplied by the client.\n\nWhen `null`, Codex attempts to derive the plan type from access-token claims. If unavailable, the plan defaults to `unknown`.",
)
type: Type82 = Field(..., title="ChatgptAuthTokensv2::LoginAccountParamsType")
class LoginAccountParams(
RootModel[Union[LoginAccountParams1, LoginAccountParams2, LoginAccountParams3]]
):
root: Union[LoginAccountParams1, LoginAccountParams2, LoginAccountParams3] = Field(
..., title="LoginAccountParams"
)
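As with the thread items, the three `LoginAccountParams` variants are distinguished by their single-value `type` enum, so a raw dict validates to exactly one variant. A minimal sketch assuming pydantic v2; the shortened class names and the sample key are hypothetical:

```python
from enum import Enum
from typing import Union
from pydantic import BaseModel, RootModel

# Trimmed mirror of two of the three generated login variants.
class ApiKeyType(Enum):
    apiKey = "apiKey"

class ApiKeyLogin(BaseModel):
    apiKey: str
    type: ApiKeyType

class ChatgptType(Enum):
    chatgpt = "chatgpt"

class ChatgptLogin(BaseModel):
    type: ChatgptType

class LoginAccountParams(RootModel[Union[ApiKeyLogin, ChatgptLogin]]):
    root: Union[ApiKeyLogin, ChatgptLogin]

# "sk-test" is a placeholder value, not a real credential format.
params = LoginAccountParams.model_validate({"type": "apiKey", "apiKey": "sk-test"})
assert isinstance(params.root, ApiKeyLogin)
```

The same validate-then-branch pattern applies to the full generated union, including the `chatgptAuthTokens` variant.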


@@ -0,0 +1,49 @@
# generated by datamodel-codegen:
# filename: LoginAccountResponse.json
# timestamp: <normalized>
from __future__ import annotations
from enum import Enum
from typing import Union
from pydantic import BaseModel, Field, RootModel
class Type(Enum):
apiKey = "apiKey"
class LoginAccountResponse1(BaseModel):
type: Type = Field(..., title="ApiKeyv2::LoginAccountResponseType")
class Type84(Enum):
chatgpt = "chatgpt"
class LoginAccountResponse2(BaseModel):
authUrl: str = Field(
...,
description="URL the client should open in a browser to initiate the OAuth flow.",
)
loginId: str
type: Type84 = Field(..., title="Chatgptv2::LoginAccountResponseType")
class Type85(Enum):
chatgptAuthTokens = "chatgptAuthTokens"
class LoginAccountResponse3(BaseModel):
type: Type85 = Field(..., title="ChatgptAuthTokensv2::LoginAccountResponseType")
class LoginAccountResponse(
RootModel[
Union[LoginAccountResponse1, LoginAccountResponse2, LoginAccountResponse3]
]
):
root: Union[LoginAccountResponse1, LoginAccountResponse2, LoginAccountResponse3] = (
Field(..., title="LoginAccountResponse")
)


@@ -0,0 +1,11 @@
# generated by datamodel-codegen:
# filename: LogoutAccountResponse.json
# timestamp: <normalized>
from __future__ import annotations
from pydantic import BaseModel
class LogoutAccountResponse(BaseModel):
pass

Some files were not shown because too many files have changed in this diff.