mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
54 Commits
tibo/parse
...
fix/otlp-l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b26719958 | ||
|
|
6a57d7980b | ||
|
|
198289934f | ||
|
|
6709ad8975 | ||
|
|
cf515142b0 | ||
|
|
74b2238931 | ||
|
|
cc0b5e8504 | ||
|
|
8e49a2c0d1 | ||
|
|
af1ed2685e | ||
|
|
1a0e2e612b | ||
|
|
acfd94f625 | ||
|
|
cabf85aa18 | ||
|
|
bc284669c2 | ||
|
|
fbe883318d | ||
|
|
2a06d64bc9 | ||
|
|
7daaabc795 | ||
|
|
1aed01e99f | ||
|
|
ed64804cb5 | ||
|
|
5c380d5b1e | ||
|
|
46b0c4acbb | ||
|
|
5b5a5b92b5 | ||
|
|
ea56186c2b | ||
|
|
cacdae8c05 | ||
|
|
bc92dc5cf0 | ||
|
|
7e5b3e069e | ||
|
|
e2e3f4490e | ||
|
|
225614d7fb | ||
|
|
16c66c37eb | ||
|
|
e9c548c65e | ||
|
|
fceae86581 | ||
|
|
568b938c80 | ||
|
|
24d6e0114f | ||
|
|
d3ff668f68 | ||
|
|
81caee3400 | ||
|
|
51dd5af807 | ||
|
|
6372ba9d5f | ||
|
|
bdfdebcfa1 | ||
|
|
62a73b6d58 | ||
|
|
be4364bb80 | ||
|
|
0d3e673019 | ||
|
|
41a317321d | ||
|
|
051bf81df9 | ||
|
|
a70f5b0b3c | ||
|
|
224c4867dd | ||
|
|
c9c6560685 | ||
|
|
634764ece9 | ||
|
|
5bc3e325a6 | ||
|
|
4156060416 | ||
|
|
98122cbad0 | ||
|
|
7b21b443bb | ||
|
|
93dec9045e | ||
|
|
69898e3dba | ||
|
|
c4af304c77 | ||
|
|
5b7707dfb1 |
3
.bazelignore
Normal file
3
.bazelignore
Normal file
@@ -0,0 +1,3 @@
|
||||
# Without this, Bazel will consider BUILD.bazel files in
|
||||
# .git/sl/origbackups (which can be populated by Sapling SCM).
|
||||
.git
|
||||
45
.bazelrc
Normal file
45
.bazelrc
Normal file
@@ -0,0 +1,45 @@
|
||||
common --repo_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1
|
||||
common --repo_env=BAZEL_NO_APPLE_CPP_TOOLCHAIN=1
|
||||
|
||||
common --disk_cache=~/.cache/bazel-disk-cache
|
||||
common --repo_contents_cache=~/.cache/bazel-repo-contents-cache
|
||||
common --repository_cache=~/.cache/bazel-repo-cache
|
||||
|
||||
common --experimental_platform_in_output_dir
|
||||
|
||||
common --enable_platform_specific_config
|
||||
# TODO(zbarsky): We need to untangle these libc constraints to get linux remote builds working.
|
||||
common:linux --host_platform=//:local
|
||||
common --@rules_cc//cc/toolchains/args/archiver_flags:use_libtool_on_macos=False
|
||||
common --@toolchains_llvm_bootstrapped//config:experimental_stub_libgcc_s
|
||||
|
||||
# We need to use the sh toolchain on windows so we don't send host bash paths to the linux executor.
|
||||
common:windows --@rules_rust//rust/settings:experimental_use_sh_toolchain_for_bootstrap_process_wrapper
|
||||
|
||||
# TODO(zbarsky): rules_rust doesn't implement this flag properly with remote exec...
|
||||
# common --@rules_rust//rust/settings:pipelined_compilation
|
||||
|
||||
common --incompatible_strict_action_env
|
||||
# Not ideal, but We need to allow dotslash to be found
|
||||
common --test_env=PATH=/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
|
||||
|
||||
common --test_output=errors
|
||||
common --bes_results_url=https://app.buildbuddy.io/invocation/
|
||||
common --bes_backend=grpcs://remote.buildbuddy.io
|
||||
common --remote_cache=grpcs://remote.buildbuddy.io
|
||||
common --remote_download_toplevel
|
||||
common --nobuild_runfile_links
|
||||
common --remote_timeout=3600
|
||||
common --noexperimental_throttle_remote_action_building
|
||||
common --experimental_remote_execution_keepalive
|
||||
common --grpc_keepalive_time=30s
|
||||
|
||||
# This limits both in-flight executions and concurrent downloads. Even with high number
|
||||
# of jobs execution will still be limited by CPU cores, so this just pays a bit of
|
||||
# memory in exchange for higher download concurrency.
|
||||
common --jobs=30
|
||||
|
||||
common:remote --extra_execution_platforms=//:rbe
|
||||
common:remote --remote_executor=grpcs://remote.buildbuddy.io
|
||||
common:remote --jobs=800
|
||||
|
||||
20
.github/workflows/Dockerfile.bazel
vendored
Normal file
20
.github/workflows/Dockerfile.bazel
vendored
Normal file
@@ -0,0 +1,20 @@
|
||||
FROM ubuntu:24.04
|
||||
|
||||
# TODO(mbolin): Published to docker.io/mbolin491/codex-bazel:latest for
|
||||
# initial debugging, but we should publish to a more proper location.
|
||||
#
|
||||
# docker buildx create --use
|
||||
# docker buildx build --platform linux/amd64,linux/arm64 -f .github/workflows/Dockerfile.bazel -t mbolin491/codex-bazel:latest --push .
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
curl git python3 ca-certificates && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install dotslash.
|
||||
RUN curl -LSfs "https://github.com/facebook/dotslash/releases/download/v0.5.8/dotslash-ubuntu-22.04.$(uname -m).tar.gz" | tar fxz - -C /usr/local/bin
|
||||
|
||||
# Ubuntu 24.04 ships with user 'ubuntu' already created with UID 1000.
|
||||
USER ubuntu
|
||||
|
||||
WORKDIR /workspace
|
||||
110
.github/workflows/bazel.yml
vendored
Normal file
110
.github/workflows/bazel.yml
vendored
Normal file
@@ -0,0 +1,110 @@
|
||||
name: Bazel (experimental)
|
||||
|
||||
# Note this workflow was originally derived from:
|
||||
# https://github.com/cerisier/toolchains_llvm_bootstrapped/blob/main/.github/workflows/ci.yaml
|
||||
|
||||
on:
|
||||
pull_request: {}
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
# Cancel previous actions from the same PR or branch except 'main' branch.
|
||||
# See https://docs.github.com/en/actions/using-jobs/using-concurrency and https://docs.github.com/en/actions/learn-github-actions/contexts for more info.
|
||||
group: concurrency-group::${{ github.workflow }}::${{ github.event.pull_request.number > 0 && format('pr-{0}', github.event.pull_request.number) || github.ref_name }}${{ github.ref_name == 'main' && format('::{0}', github.run_id) || ''}}
|
||||
cancel-in-progress: ${{ github.ref_name != 'main' }}
|
||||
jobs:
|
||||
test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
# macOS
|
||||
- os: macos-15-xlarge
|
||||
target: aarch64-apple-darwin
|
||||
- os: macos-15-xlarge
|
||||
target: x86_64-apple-darwin
|
||||
|
||||
# Linux
|
||||
- os: ubuntu-24.04-arm
|
||||
target: aarch64-unknown-linux-gnu
|
||||
- os: ubuntu-24.04
|
||||
target: x86_64-unknown-linux-gnu
|
||||
- os: ubuntu-24.04-arm
|
||||
target: aarch64-unknown-linux-musl
|
||||
- os: ubuntu-24.04
|
||||
target: x86_64-unknown-linux-musl
|
||||
# TODO: Enable Windows once we fix the toolchain issues there.
|
||||
#- os: windows-latest
|
||||
# target: x86_64-pc-windows-gnullvm
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
# Configure a human readable name for each job
|
||||
name: Local Bazel build on ${{ matrix.os }} for ${{ matrix.target }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
# Some integration tests rely on DotSlash being installed.
|
||||
# See https://github.com/openai/codex/pull/7617.
|
||||
- name: Install DotSlash
|
||||
uses: facebook/install-dotslash@v2
|
||||
|
||||
- name: Make DotSlash available in PATH (Unix)
|
||||
if: runner.os != 'Windows'
|
||||
run: cp "$(which dotslash)" /usr/local/bin
|
||||
|
||||
- name: Make DotSlash available in PATH (Windows)
|
||||
if: runner.os == 'Windows'
|
||||
shell: pwsh
|
||||
run: Copy-Item (Get-Command dotslash).Source -Destination "$env:LOCALAPPDATA\Microsoft\WindowsApps\dotslash.exe"
|
||||
|
||||
# Install Bazel via Bazelisk
|
||||
- name: Set up Bazel
|
||||
uses: bazelbuild/setup-bazelisk@v3
|
||||
|
||||
# TODO(mbolin): Bring this back once we have caching working. Currently,
|
||||
# we never seem to get a cache hit but we still end up paying the cost of
|
||||
# uploading at the end of the build, which takes over a minute!
|
||||
#
|
||||
# Cache build and external artifacts so that the next ci build is incremental.
|
||||
# Because github action caches cannot be updated after a build, we need to
|
||||
# store the contents of each build in a unique cache key, then fall back to loading
|
||||
# it on the next ci run. We use hashFiles(...) in the key and restore-keys- with
|
||||
# the prefix to load the most recent cache for the branch on a cache miss. You
|
||||
# should customize the contents of hashFiles to capture any bazel input sources,
|
||||
# although this doesn't need to be perfect. If none of the input sources change
|
||||
# then a cache hit will load an existing cache and bazel won't have to do any work.
|
||||
# In the case of a cache miss, you want the fallback cache to contain most of the
|
||||
# previously built artifacts to minimize build time. The more precise you are with
|
||||
# hashFiles sources the less work bazel will have to do.
|
||||
# - name: Mount bazel caches
|
||||
# uses: actions/cache@v4
|
||||
# with:
|
||||
# path: |
|
||||
# ~/.cache/bazel-repo-cache
|
||||
# ~/.cache/bazel-repo-contents-cache
|
||||
# key: bazel-cache-${{ matrix.os }}-${{ hashFiles('**/BUILD.bazel', '**/*.bzl', 'MODULE.bazel') }}
|
||||
# restore-keys: |
|
||||
# bazel-cache-${{ matrix.os }}
|
||||
|
||||
- name: Configure Bazel startup args (Windows)
|
||||
if: runner.os == 'Windows'
|
||||
shell: pwsh
|
||||
run: |
|
||||
# Use a very short path to reduce argv/path length issues.
|
||||
"BAZEL_STARTUP_ARGS=--output_user_root=C:\" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
|
||||
|
||||
- name: bazel test //...
|
||||
env:
|
||||
BUILDBUDDY_API_KEY: ${{ secrets.BUILDBUDDY_API_KEY }}
|
||||
shell: bash
|
||||
run: |
|
||||
bazel $BAZEL_STARTUP_ARGS --bazelrc=.github/workflows/ci.bazelrc test //... \
|
||||
--build_metadata=REPO_URL=https://github.com/openai/codex.git \
|
||||
--build_metadata=COMMIT_SHA=$(git rev-parse HEAD) \
|
||||
--build_metadata=ROLE=CI \
|
||||
--build_metadata=VISIBILITY=PUBLIC \
|
||||
"--remote_header=x-buildbuddy-api-key=$BUILDBUDDY_API_KEY"
|
||||
20
.github/workflows/ci.bazelrc
vendored
Normal file
20
.github/workflows/ci.bazelrc
vendored
Normal file
@@ -0,0 +1,20 @@
|
||||
common --remote_download_minimal
|
||||
common --nobuild_runfile_links
|
||||
common --keep_going
|
||||
|
||||
# We prefer to run the build actions entirely remotely so we can dial up the concurrency.
|
||||
# We have platform-specific tests, so we want to execute the tests on all platforms using the strongest sandboxing available on each platform.
|
||||
|
||||
# On linux, we can do a full remote build/test, by targeting the right (x86/arm) runners, so we have coverage of both.
|
||||
# Linux crossbuilds don't work until we untangle the libc constraint mess.
|
||||
common:linux --config=remote
|
||||
common:linux --strategy=remote
|
||||
common:linux --platforms=//:rbe
|
||||
|
||||
# On mac, we can run all the build actions remotely but test actions locally.
|
||||
common:macos --config=remote
|
||||
common:macos --strategy=remote
|
||||
common:macos --strategy=TestRunner=darwin-sandbox,local
|
||||
|
||||
common:windows --strategy=TestRunner=local
|
||||
|
||||
19
BUILD.bazel
Normal file
19
BUILD.bazel
Normal file
@@ -0,0 +1,19 @@
|
||||
# We mark the local platform as glibc-compatible so that rust can grab a toolchain for us.
|
||||
# TODO(zbarsky): Upstream a better libc constraint into rules_rust.
|
||||
# We only enable this on linux though for sanity, and because it breaks remote execution.
|
||||
platform(
|
||||
name = "local",
|
||||
constraint_values = [
|
||||
"@toolchains_llvm_bootstrapped//constraints/libc:gnu.2.28",
|
||||
],
|
||||
parents = [
|
||||
"@platforms//host",
|
||||
],
|
||||
)
|
||||
|
||||
alias(
|
||||
name = "rbe",
|
||||
actual = "@rbe_platform",
|
||||
)
|
||||
|
||||
exports_files(["AGENTS.md"])
|
||||
128
MODULE.bazel
Normal file
128
MODULE.bazel
Normal file
@@ -0,0 +1,128 @@
|
||||
bazel_dep(name = "platforms", version = "1.0.0")
|
||||
bazel_dep(name = "toolchains_llvm_bootstrapped", version = "0.3.1")
|
||||
archive_override(
|
||||
module_name = "toolchains_llvm_bootstrapped",
|
||||
integrity = "sha256-9ks21bgEqbQWmwUIvqeLA64+Jk6o4ZVjC8KxjVa2Vw8=",
|
||||
strip_prefix = "toolchains_llvm_bootstrapped-e3775e66a7b6d287c705ca0cd24497ef4a77c503",
|
||||
urls = ["https://github.com/cerisier/toolchains_llvm_bootstrapped/archive/e3775e66a7b6d287c705ca0cd24497ef4a77c503/master.tar.gz"],
|
||||
patch_strip = 1,
|
||||
patches = [
|
||||
"//patches:llvm_toolchain_archive_params.patch",
|
||||
],
|
||||
)
|
||||
|
||||
osx = use_extension("@toolchains_llvm_bootstrapped//toolchain/extension:osx.bzl", "osx")
|
||||
osx.framework(name = "ApplicationServices")
|
||||
osx.framework(name = "AppKit")
|
||||
osx.framework(name = "ColorSync")
|
||||
osx.framework(name = "CoreFoundation")
|
||||
osx.framework(name = "CoreGraphics")
|
||||
osx.framework(name = "CoreServices")
|
||||
osx.framework(name = "CoreText")
|
||||
osx.framework(name = "CFNetwork")
|
||||
osx.framework(name = "Foundation")
|
||||
osx.framework(name = "ImageIO")
|
||||
osx.framework(name = "Kernel")
|
||||
osx.framework(name = "OSLog")
|
||||
osx.framework(name = "Security")
|
||||
osx.framework(name = "SystemConfiguration")
|
||||
|
||||
register_toolchains(
|
||||
"@toolchains_llvm_bootstrapped//toolchain:all",
|
||||
)
|
||||
|
||||
bazel_dep(name = "rules_cc", version = "0.2.16")
|
||||
bazel_dep(name = "rules_platform", version = "0.1.0")
|
||||
bazel_dep(name = "rules_rust", version = "0.68.1")
|
||||
single_version_override(
|
||||
module_name = "rules_rust",
|
||||
patch_strip = 1,
|
||||
patches = [
|
||||
"//patches:rules_rust.patch",
|
||||
"//patches:rules_rust_windows_gnu.patch",
|
||||
"//patches:rules_rust_musl.patch",
|
||||
],
|
||||
)
|
||||
|
||||
RUST_TRIPLES = [
|
||||
"aarch64-unknown-linux-musl",
|
||||
"aarch64-apple-darwin",
|
||||
"aarch64-pc-windows-gnullvm",
|
||||
"x86_64-unknown-linux-musl",
|
||||
"x86_64-apple-darwin",
|
||||
"x86_64-pc-windows-gnullvm",
|
||||
]
|
||||
|
||||
rust = use_extension("@rules_rust//rust:extensions.bzl", "rust")
|
||||
rust.toolchain(
|
||||
edition = "2024",
|
||||
extra_target_triples = RUST_TRIPLES,
|
||||
versions = ["1.90.0"],
|
||||
)
|
||||
use_repo(rust, "rust_toolchains")
|
||||
|
||||
register_toolchains("@rust_toolchains//:all")
|
||||
|
||||
bazel_dep(name = "rules_rs", version = "0.0.23")
|
||||
|
||||
crate = use_extension("@rules_rs//rs:extensions.bzl", "crate")
|
||||
crate.from_cargo(
|
||||
cargo_lock = "//codex-rs:Cargo.lock",
|
||||
cargo_toml = "//codex-rs:Cargo.toml",
|
||||
platform_triples = RUST_TRIPLES,
|
||||
)
|
||||
|
||||
bazel_dep(name = "openssl", version = "3.5.4.bcr.0")
|
||||
|
||||
crate.annotation(
|
||||
build_script_data = [
|
||||
"@openssl//:gen_dir",
|
||||
],
|
||||
build_script_env = {
|
||||
"OPENSSL_DIR": "$(execpath @openssl//:gen_dir)",
|
||||
"OPENSSL_NO_VENDOR": "1",
|
||||
"OPENSSL_STATIC": "1",
|
||||
},
|
||||
crate = "openssl-sys",
|
||||
data = ["@openssl//:gen_dir"],
|
||||
)
|
||||
|
||||
inject_repo(crate, "openssl")
|
||||
|
||||
# Fix readme inclusions
|
||||
crate.annotation(
|
||||
crate = "windows-link",
|
||||
patch_args = ["-p1"],
|
||||
patches = [
|
||||
"//patches:windows-link.patch"
|
||||
],
|
||||
)
|
||||
|
||||
WINDOWS_IMPORT_LIB = """
|
||||
load("@rules_cc//cc:defs.bzl", "cc_import")
|
||||
|
||||
cc_import(
|
||||
name = "windows_import_lib",
|
||||
static_library = glob(["lib/*.a"])[0],
|
||||
)
|
||||
"""
|
||||
|
||||
crate.annotation(
|
||||
additive_build_file_content = WINDOWS_IMPORT_LIB,
|
||||
crate = "windows_x86_64_gnullvm",
|
||||
gen_build_script = "off",
|
||||
deps = [":windows_import_lib"],
|
||||
)
|
||||
crate.annotation(
|
||||
additive_build_file_content = WINDOWS_IMPORT_LIB,
|
||||
crate = "windows_aarch64_gnullvm",
|
||||
gen_build_script = "off",
|
||||
deps = [":windows_import_lib"],
|
||||
)
|
||||
use_repo(crate, "crates")
|
||||
|
||||
rbe_platform_repository = use_repo_rule("//:rbe.bzl", "rbe_platform_repository")
|
||||
|
||||
rbe_platform_repository(
|
||||
name = "rbe_platform",
|
||||
)
|
||||
1097
MODULE.bazel.lock
generated
Normal file
1097
MODULE.bazel.lock
generated
Normal file
File diff suppressed because one or more lines are too long
@@ -10,6 +10,7 @@ from_date = "2024-10-01"
|
||||
to_date = "2024-10-15"
|
||||
target_app = "cli"
|
||||
|
||||
# Test announcement only for local build version until 2026-01-10 excluded (past)
|
||||
[[announcements]]
|
||||
content = "This is a test announcement"
|
||||
version_regex = "^0\\.0\\.0$"
|
||||
|
||||
@@ -95,7 +95,6 @@ function detectPackageManager() {
|
||||
return "bun";
|
||||
}
|
||||
|
||||
|
||||
if (
|
||||
__dirname.includes(".bun/install/global") ||
|
||||
__dirname.includes(".bun\\install\\global")
|
||||
|
||||
1
codex-rs/BUILD.bazel
Normal file
1
codex-rs/BUILD.bazel
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -1156,7 +1156,6 @@ dependencies = [
|
||||
"codex-execpolicy",
|
||||
"codex-login",
|
||||
"codex-mcp-server",
|
||||
"codex-process-hardening",
|
||||
"codex-protocol",
|
||||
"codex-responses-api-proxy",
|
||||
"codex-rmcp-client",
|
||||
@@ -1166,7 +1165,6 @@ dependencies = [
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-windows-sandbox",
|
||||
"ctor 0.5.0",
|
||||
"libc",
|
||||
"owo-colors",
|
||||
"predicates",
|
||||
|
||||
6
codex-rs/ansi-escape/BUILD.bazel
Normal file
6
codex-rs/ansi-escape/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "ansi-escape",
|
||||
crate_name = "codex_ansi_escape",
|
||||
)
|
||||
6
codex-rs/app-server-protocol/BUILD.bazel
Normal file
6
codex-rs/app-server-protocol/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "app-server-protocol",
|
||||
crate_name = "codex_app_server_protocol",
|
||||
)
|
||||
@@ -109,6 +109,10 @@ client_request_definitions! {
|
||||
params: v2::ThreadResumeParams,
|
||||
response: v2::ThreadResumeResponse,
|
||||
},
|
||||
ThreadFork => "thread/fork" {
|
||||
params: v2::ThreadForkParams,
|
||||
response: v2::ThreadForkResponse,
|
||||
},
|
||||
ThreadArchive => "thread/archive" {
|
||||
params: v2::ThreadArchiveParams,
|
||||
response: v2::ThreadArchiveResponse,
|
||||
@@ -121,6 +125,10 @@ client_request_definitions! {
|
||||
params: v2::ThreadListParams,
|
||||
response: v2::ThreadListResponse,
|
||||
},
|
||||
ThreadLoadedList => "thread/loaded/list" {
|
||||
params: v2::ThreadLoadedListParams,
|
||||
response: v2::ThreadLoadedListResponse,
|
||||
},
|
||||
SkillsList => "skills/list" {
|
||||
params: v2::SkillsListParams,
|
||||
response: v2::SkillsListResponse,
|
||||
@@ -226,6 +234,11 @@ client_request_definitions! {
|
||||
params: v1::ResumeConversationParams,
|
||||
response: v1::ResumeConversationResponse,
|
||||
},
|
||||
/// Fork a recorded Codex conversation into a new session.
|
||||
ForkConversation {
|
||||
params: v1::ForkConversationParams,
|
||||
response: v1::ForkConversationResponse,
|
||||
},
|
||||
ArchiveConversation {
|
||||
params: v1::ArchiveConversationParams,
|
||||
response: v1::ArchiveConversationResponse,
|
||||
|
||||
@@ -83,6 +83,15 @@ pub struct ResumeConversationResponse {
|
||||
pub rollout_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ForkConversationResponse {
|
||||
pub conversation_id: ThreadId,
|
||||
pub model: String,
|
||||
pub initial_messages: Option<Vec<EventMsg>>,
|
||||
pub rollout_path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(untagged)]
|
||||
pub enum GetConversationSummaryParams {
|
||||
@@ -148,6 +157,14 @@ pub struct ResumeConversationParams {
|
||||
pub overrides: Option<NewConversationParams>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ForkConversationParams {
|
||||
pub path: Option<PathBuf>,
|
||||
pub conversation_id: Option<ThreadId>,
|
||||
pub overrides: Option<NewConversationParams>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AddConversationSubscriptionResponse {
|
||||
|
||||
@@ -1080,6 +1080,47 @@ pub struct ThreadResumeResponse {
|
||||
pub reasoning_effort: Option<ReasoningEffort>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
/// There are two ways to fork a thread:
|
||||
/// 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread.
|
||||
/// 2. By path: load the thread from disk by path and fork it into a new thread.
|
||||
///
|
||||
/// If using path, the thread_id param will be ignored.
|
||||
///
|
||||
/// Prefer using thread_id whenever possible.
|
||||
pub struct ThreadForkParams {
|
||||
pub thread_id: String,
|
||||
|
||||
/// [UNSTABLE] Specify the rollout path to fork from.
|
||||
/// If specified, the thread_id param will be ignored.
|
||||
pub path: Option<PathBuf>,
|
||||
|
||||
/// Configuration overrides for the forked thread, if any.
|
||||
pub model: Option<String>,
|
||||
pub model_provider: Option<String>,
|
||||
pub cwd: Option<String>,
|
||||
pub approval_policy: Option<AskForApproval>,
|
||||
pub sandbox: Option<SandboxMode>,
|
||||
pub config: Option<HashMap<String, serde_json::Value>>,
|
||||
pub base_instructions: Option<String>,
|
||||
pub developer_instructions: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadForkResponse {
|
||||
pub thread: Thread,
|
||||
pub model: String,
|
||||
pub model_provider: String,
|
||||
pub cwd: PathBuf,
|
||||
pub approval_policy: AskForApproval,
|
||||
pub sandbox: SandboxPolicy,
|
||||
pub reasoning_effort: Option<ReasoningEffort>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1139,6 +1180,27 @@ pub struct ThreadListResponse {
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadLoadedListParams {
|
||||
/// Opaque pagination cursor returned by a previous call.
|
||||
pub cursor: Option<String>,
|
||||
/// Optional page size; defaults to no limit.
|
||||
pub limit: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadLoadedListResponse {
|
||||
/// Thread ids for sessions currently loaded in memory.
|
||||
pub data: Vec<String>,
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
/// if None, there are no more items to return.
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1254,7 +1316,7 @@ pub struct Thread {
|
||||
pub source: SessionSource,
|
||||
/// Optional Git metadata captured when the thread was created.
|
||||
pub git_info: Option<GitInfo>,
|
||||
/// Only populated on `thread/resume` and `thread/rollback` responses.
|
||||
/// Only populated on `thread/resume`, `thread/rollback`, `thread/fork` responses.
|
||||
/// For all other responses and notifications returning a Thread,
|
||||
/// the turns field will be an empty list.
|
||||
pub turns: Vec<Turn>,
|
||||
@@ -1330,7 +1392,7 @@ impl From<CoreTokenUsage> for TokenUsageBreakdown {
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct Turn {
|
||||
pub id: String,
|
||||
/// Only populated on a `thread/resume` response.
|
||||
/// Only populated on a `thread/resume` or `thread/fork` response.
|
||||
/// For all other responses and notifications returning a Turn,
|
||||
/// the items field will be an empty list.
|
||||
pub items: Vec<ThreadItem>,
|
||||
|
||||
6
codex-rs/app-server-test-client/BUILD.bazel
Normal file
6
codex-rs/app-server-test-client/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "codex-app-server-test-client",
|
||||
crate_name = "codex_app_server_test_client",
|
||||
)
|
||||
@@ -545,7 +545,7 @@ impl CodexClient {
|
||||
print!("{}", event.delta);
|
||||
std::io::stdout().flush().ok();
|
||||
}
|
||||
EventMsg::TaskComplete(event) => {
|
||||
EventMsg::TurnComplete(event) => {
|
||||
println!("\n[task complete: {event:?}]");
|
||||
break;
|
||||
}
|
||||
|
||||
8
codex-rs/app-server/BUILD.bazel
Normal file
8
codex-rs/app-server/BUILD.bazel
Normal file
@@ -0,0 +1,8 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "app-server",
|
||||
crate_name = "codex_app_server",
|
||||
integration_deps_extra = ["//codex-rs/app-server/tests/common:common"],
|
||||
test_tags = ["no-sandbox"],
|
||||
)
|
||||
@@ -41,7 +41,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
|
||||
## Lifecycle Overview
|
||||
|
||||
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
|
||||
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead.
|
||||
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
|
||||
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
|
||||
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
|
||||
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
|
||||
@@ -52,6 +52,10 @@ Clients must send a single `initialize` request before invoking any other method
|
||||
|
||||
Applications building on top of `codex app-server` should identify themselves via the `clientInfo` parameter.
|
||||
|
||||
**Important**: `clientInfo.name` is used to identify the client for the OpenAI Compliance Logs Platform. If
|
||||
you are developing a new Codex integration that is intended for enterprise use, please contact us to get it
|
||||
added to a known clients list. For more context: https://chatgpt.com/admin/api-reference#tag/Logs:-Codex
|
||||
|
||||
Example (from OpenAI's official VSCode extension):
|
||||
|
||||
```json
|
||||
@@ -60,7 +64,7 @@ Example (from OpenAI's official VSCode extension):
|
||||
"id": 0,
|
||||
"params": {
|
||||
"clientInfo": {
|
||||
"name": "codex-vscode",
|
||||
"name": "codex_vscode",
|
||||
"title": "Codex VS Code Extension",
|
||||
"version": "0.1.0"
|
||||
}
|
||||
@@ -72,7 +76,9 @@ Example (from OpenAI's official VSCode extension):
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success.
|
||||
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
|
||||
@@ -121,6 +127,14 @@ To continue a stored session, call `thread/resume` with the `thread.id` you prev
|
||||
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
|
||||
```
|
||||
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it:
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123" } }
|
||||
{ "id": 12, "result": { "thread": { "id": "thr_456", … } } }
|
||||
{ "method": "thread/started", "params": { "thread": { … } } }
|
||||
```
|
||||
|
||||
### Example: List threads (with pagination & filters)
|
||||
|
||||
`thread/list` lets you render a history UI. Pass any combination of:
|
||||
@@ -147,6 +161,17 @@ Example:
|
||||
|
||||
When `nextCursor` is `null`, you’ve reached the final page.
|
||||
|
||||
### Example: List loaded threads
|
||||
|
||||
`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.
|
||||
|
||||
```json
|
||||
{ "method": "thread/loaded/list", "id": 21 }
|
||||
{ "id": 21, "result": {
|
||||
"data": ["thr_123", "thr_456"]
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: Archive a thread
|
||||
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
|
||||
|
||||
@@ -106,7 +106,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
msg,
|
||||
} = event;
|
||||
match msg {
|
||||
EventMsg::TaskComplete(_ev) => {
|
||||
EventMsg::TurnComplete(_ev) => {
|
||||
handle_turn_complete(
|
||||
conversation_id,
|
||||
event_turn_id,
|
||||
|
||||
@@ -28,6 +28,8 @@ use codex_app_server_protocol::ConversationSummary;
|
||||
use codex_app_server_protocol::ExecOneOffCommandResponse;
|
||||
use codex_app_server_protocol::FeedbackUploadParams;
|
||||
use codex_app_server_protocol::FeedbackUploadResponse;
|
||||
use codex_app_server_protocol::ForkConversationParams;
|
||||
use codex_app_server_protocol::ForkConversationResponse;
|
||||
use codex_app_server_protocol::FuzzyFileSearchParams;
|
||||
use codex_app_server_protocol::FuzzyFileSearchResponse;
|
||||
use codex_app_server_protocol::GetAccountParams;
|
||||
@@ -86,9 +88,13 @@ use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
use codex_app_server_protocol::ThreadLoadedListParams;
|
||||
use codex_app_server_protocol::ThreadLoadedListResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
@@ -124,6 +130,7 @@ use codex_core::config::ConfigService;
|
||||
use codex_core::config::edit::ConfigEditsBuilder;
|
||||
use codex_core::config::types::McpServerTransportConfig;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::error::CodexErr;
|
||||
use codex_core::exec::ExecParams;
|
||||
use codex_core::exec_env::create_env;
|
||||
use codex_core::features::Feature;
|
||||
@@ -367,6 +374,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ThreadResume { request_id, params } => {
|
||||
self.thread_resume(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadFork { request_id, params } => {
|
||||
self.thread_fork(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadArchive { request_id, params } => {
|
||||
self.thread_archive(request_id, params).await;
|
||||
}
|
||||
@@ -376,6 +386,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ThreadList { request_id, params } => {
|
||||
self.thread_list(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadLoadedList { request_id, params } => {
|
||||
self.thread_loaded_list(request_id, params).await;
|
||||
}
|
||||
ClientRequest::SkillsList { request_id, params } => {
|
||||
self.skills_list(request_id, params).await;
|
||||
}
|
||||
@@ -433,6 +446,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ResumeConversation { request_id, params } => {
|
||||
self.handle_resume_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ForkConversation { request_id, params } => {
|
||||
self.handle_fork_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ArchiveConversation { request_id, params } => {
|
||||
self.archive_conversation(request_id, params).await;
|
||||
}
|
||||
@@ -585,7 +601,7 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
|
||||
let payload = AuthStatusChangeNotification {
|
||||
auth_method: self.auth_manager.auth().map(|auth| auth.mode),
|
||||
auth_method: self.auth_manager.auth_cached().map(|auth| auth.mode),
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::AuthStatusChange(payload))
|
||||
@@ -615,7 +631,7 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
|
||||
let payload_v2 = AccountUpdatedNotification {
|
||||
auth_mode: self.auth_manager.auth().map(|auth| auth.mode),
|
||||
auth_mode: self.auth_manager.auth_cached().map(|auth| auth.mode),
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::AccountUpdated(payload_v2))
|
||||
@@ -707,7 +723,7 @@ impl CodexMessageProcessor {
|
||||
auth_manager.reload();
|
||||
|
||||
// Notify clients with the actual current auth mode.
|
||||
let current_auth_method = auth_manager.auth().map(|a| a.mode);
|
||||
let current_auth_method = auth_manager.auth_cached().map(|a| a.mode);
|
||||
let payload = AuthStatusChangeNotification {
|
||||
auth_method: current_auth_method,
|
||||
};
|
||||
@@ -797,7 +813,7 @@ impl CodexMessageProcessor {
|
||||
auth_manager.reload();
|
||||
|
||||
// Notify clients with the actual current auth mode.
|
||||
let current_auth_method = auth_manager.auth().map(|a| a.mode);
|
||||
let current_auth_method = auth_manager.auth_cached().map(|a| a.mode);
|
||||
let payload_v2 = AccountUpdatedNotification {
|
||||
auth_mode: current_auth_method,
|
||||
};
|
||||
@@ -909,7 +925,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
// Reflect the current auth method after logout (likely None).
|
||||
Ok(self.auth_manager.auth().map(|auth| auth.mode))
|
||||
Ok(self.auth_manager.auth_cached().map(|auth| auth.mode))
|
||||
}
|
||||
|
||||
async fn logout_v1(&mut self, request_id: RequestId) {
|
||||
@@ -976,10 +992,10 @@ impl CodexMessageProcessor {
|
||||
requires_openai_auth: Some(false),
|
||||
}
|
||||
} else {
|
||||
match self.auth_manager.auth() {
|
||||
match self.auth_manager.auth().await {
|
||||
Some(auth) => {
|
||||
let auth_mode = auth.mode;
|
||||
let (reported_auth_method, token_opt) = match auth.get_token().await {
|
||||
let (reported_auth_method, token_opt) = match auth.get_token() {
|
||||
Ok(token) if !token.is_empty() => {
|
||||
let tok = if include_token { Some(token) } else { None };
|
||||
(Some(auth_mode), tok)
|
||||
@@ -1024,7 +1040,7 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
let account = match self.auth_manager.auth() {
|
||||
let account = match self.auth_manager.auth_cached() {
|
||||
Some(auth) => Some(match auth.mode {
|
||||
AuthMode::ApiKey => Account::ApiKey {},
|
||||
AuthMode::ChatGPT => {
|
||||
@@ -1078,7 +1094,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
async fn fetch_account_rate_limits(&self) -> Result<CoreRateLimitSnapshot, JSONRPCErrorError> {
|
||||
let Some(auth) = self.auth_manager.auth() else {
|
||||
let Some(auth) = self.auth_manager.auth().await else {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "codex account authentication required to read rate limits".to_string(),
|
||||
@@ -1095,7 +1111,6 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let client = BackendClient::from_auth(self.config.chatgpt_base_url.clone(), &auth)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to construct backend client: {err}"),
|
||||
@@ -1135,7 +1150,10 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn get_user_info(&self, request_id: RequestId) {
|
||||
// Read alleged user email from cached auth (best-effort; not verified).
|
||||
let alleged_user_email = self.auth_manager.auth().and_then(|a| a.get_account_email());
|
||||
let alleged_user_email = self
|
||||
.auth_manager
|
||||
.auth_cached()
|
||||
.and_then(|a| a.get_account_email());
|
||||
|
||||
let response = UserInfoResponse { alleged_user_email };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
@@ -1591,6 +1609,61 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_loaded_list(&self, request_id: RequestId, params: ThreadLoadedListParams) {
|
||||
let ThreadLoadedListParams { cursor, limit } = params;
|
||||
let mut data = self
|
||||
.thread_manager
|
||||
.list_thread_ids()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|thread_id| thread_id.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if data.is_empty() {
|
||||
let response = ThreadLoadedListResponse {
|
||||
data,
|
||||
next_cursor: None,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
return;
|
||||
}
|
||||
|
||||
data.sort();
|
||||
let total = data.len();
|
||||
let start = match cursor {
|
||||
Some(cursor) => {
|
||||
let cursor = match ThreadId::from_string(&cursor) {
|
||||
Ok(id) => id.to_string(),
|
||||
Err(_) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
match data.binary_search(&cursor) {
|
||||
Ok(idx) => idx + 1,
|
||||
Err(idx) => idx,
|
||||
}
|
||||
}
|
||||
None => 0,
|
||||
};
|
||||
|
||||
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
|
||||
let end = start.saturating_add(effective_limit).min(total);
|
||||
let page = data[start..end].to_vec();
|
||||
let next_cursor = page.last().filter(|_| end < total).cloned();
|
||||
|
||||
let response = ThreadLoadedListResponse {
|
||||
data: page,
|
||||
next_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) {
|
||||
let ThreadResumeParams {
|
||||
thread_id,
|
||||
@@ -1796,6 +1869,198 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_fork(&mut self, request_id: RequestId, params: ThreadForkParams) {
|
||||
let ThreadForkParams {
|
||||
thread_id,
|
||||
path,
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox,
|
||||
config: cli_overrides,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
} = params;
|
||||
|
||||
let overrides_requested = model.is_some()
|
||||
|| model_provider.is_some()
|
||||
|| cwd.is_some()
|
||||
|| approval_policy.is_some()
|
||||
|| sandbox.is_some()
|
||||
|| cli_overrides.is_some()
|
||||
|| base_instructions.is_some()
|
||||
|| developer_instructions.is_some();
|
||||
|
||||
let config = if overrides_requested {
|
||||
let overrides = self.build_thread_config_overrides(
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
);
|
||||
|
||||
// Persist windows sandbox feature.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
if cfg!(windows) && self.config.features.enabled(Feature::WindowsSandbox) {
|
||||
cli_overrides.insert(
|
||||
"features.experimental_windows_sandbox".to_string(),
|
||||
serde_json::json!(true),
|
||||
);
|
||||
}
|
||||
|
||||
match derive_config_from_params(&self.cli_overrides, Some(cli_overrides), overrides)
|
||||
.await
|
||||
{
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("error deriving config: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.config.as_ref().clone()
|
||||
};
|
||||
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path
|
||||
} else {
|
||||
let existing_thread_id = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid thread id: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for thread id {existing_thread_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {existing_thread_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let fallback_model_provider = config.model_provider_id.clone();
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
CodexErr::Io(_) | CodexErr::Json(_) => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
|
||||
_ => (INTERNAL_ERROR_CODE, format!("error forking thread: {err}")),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let SessionConfiguredEvent {
|
||||
rollout_path,
|
||||
initial_messages,
|
||||
..
|
||||
} = session_configured;
|
||||
// Auto-attach a conversation listener when forking a thread.
|
||||
if let Err(err) = self
|
||||
.attach_conversation_listener(thread_id, false, ApiVersion::V2)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
"failed to attach listener for thread {}: {}",
|
||||
thread_id,
|
||||
err.message
|
||||
);
|
||||
}
|
||||
|
||||
let mut thread = match read_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_id}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
|
||||
let response = ThreadForkResponse {
|
||||
thread: thread.clone(),
|
||||
model: session_configured.model,
|
||||
model_provider: session_configured.model_provider_id,
|
||||
cwd: session_configured.cwd,
|
||||
approval_policy: session_configured.approval_policy.into(),
|
||||
sandbox: session_configured.sandbox_policy.into(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
};
|
||||
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_thread_summary(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
@@ -2419,6 +2684,166 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_fork_conversation(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
params: ForkConversationParams,
|
||||
) {
|
||||
let ForkConversationParams {
|
||||
path,
|
||||
conversation_id,
|
||||
overrides,
|
||||
} = params;
|
||||
|
||||
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
|
||||
let config = match overrides {
|
||||
Some(overrides) => {
|
||||
let NewConversationParams {
|
||||
model,
|
||||
model_provider,
|
||||
profile,
|
||||
cwd,
|
||||
approval_policy,
|
||||
sandbox: sandbox_mode,
|
||||
config: cli_overrides,
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
compact_prompt,
|
||||
include_apply_patch_tool,
|
||||
} = overrides;
|
||||
|
||||
// Persist windows sandbox feature.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
if cfg!(windows) && self.config.features.enabled(Feature::WindowsSandbox) {
|
||||
cli_overrides.insert(
|
||||
"features.experimental_windows_sandbox".to_string(),
|
||||
serde_json::json!(true),
|
||||
);
|
||||
}
|
||||
|
||||
let overrides = ConfigOverrides {
|
||||
model,
|
||||
config_profile: profile,
|
||||
cwd: cwd.map(PathBuf::from),
|
||||
approval_policy,
|
||||
sandbox_mode,
|
||||
model_provider,
|
||||
codex_linux_sandbox_exe: self.codex_linux_sandbox_exe.clone(),
|
||||
base_instructions,
|
||||
developer_instructions,
|
||||
compact_prompt,
|
||||
include_apply_patch_tool,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
derive_config_from_params(&self.cli_overrides, Some(cli_overrides), overrides).await
|
||||
}
|
||||
None => Ok(self.config.as_ref().clone()),
|
||||
};
|
||||
let config = match config {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("error deriving config: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path
|
||||
} else if let Some(conversation_id) = conversation_id {
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(Some(found_path)) => found_path,
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("no rollout found for conversation id {conversation_id}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate conversation id {conversation_id}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"either path or conversation id must be provided".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
CodexErr::Io(_) | CodexErr::Json(_) => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
|
||||
_ => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
format!("error forking conversation: {err}"),
|
||||
),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::SessionConfigured(
|
||||
SessionConfiguredNotification {
|
||||
session_id: session_configured.session_id,
|
||||
model: session_configured.model.clone(),
|
||||
reasoning_effort: session_configured.reasoning_effort,
|
||||
history_log_id: session_configured.history_log_id,
|
||||
history_entry_count: session_configured.history_entry_count,
|
||||
initial_messages: session_configured.initial_messages.clone(),
|
||||
rollout_path: session_configured.rollout_path.clone(),
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let initial_messages = session_configured
|
||||
.initial_messages
|
||||
.map(|msgs| msgs.into_iter().collect());
|
||||
|
||||
// Reply with conversation id + model and initial messages (when present)
|
||||
let response = ForkConversationResponse {
|
||||
conversation_id: thread_id,
|
||||
model: session_configured.model.clone(),
|
||||
initial_messages,
|
||||
rollout_path: session_configured.rollout_path.clone(),
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn send_invalid_request_error(&self, request_id: RequestId, message: String) {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
@@ -3148,7 +3573,11 @@ impl CodexMessageProcessor {
|
||||
// JSON-serializing the `Event` as-is, but these should
|
||||
// be migrated to be variants of `ServerNotification`
|
||||
// instead.
|
||||
let method = format!("codex/event/{}", event.msg);
|
||||
let event_formatted = match &event.msg {
|
||||
EventMsg::TurnStarted(_) => "task_started",
|
||||
EventMsg::TurnComplete(_) => "task_complete",
|
||||
_ => &event.msg.to_string(),
|
||||
};
|
||||
let mut params = match serde_json::to_value(event.clone()) {
|
||||
Ok(serde_json::Value::Object(map)) => map,
|
||||
Ok(_) => {
|
||||
@@ -3167,7 +3596,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
outgoing_for_task
|
||||
.send_notification(OutgoingNotification {
|
||||
method,
|
||||
method: format!("codex/event/{event_formatted}"),
|
||||
params: Some(params.into()),
|
||||
})
|
||||
.await;
|
||||
@@ -3253,6 +3682,16 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) {
|
||||
if !self.config.feedback_enabled {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "sending feedback is disabled by configuration".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let FeedbackUploadParams {
|
||||
classification,
|
||||
reason,
|
||||
|
||||
@@ -92,13 +92,18 @@ pub async fn run_main(
|
||||
|
||||
let feedback = CodexFeedback::new();
|
||||
|
||||
let otel =
|
||||
codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("error loading otel config: {e}"),
|
||||
)
|
||||
})?;
|
||||
let otel = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some("codex_app_server"),
|
||||
false,
|
||||
)
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("error loading otel config: {e}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Install a simple subscriber so `tracing` output is visible. Users can
|
||||
// control the log level with `RUST_LOG`.
|
||||
|
||||
@@ -21,8 +21,10 @@ use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::default_client::SetOriginatorError;
|
||||
use codex_core::default_client::USER_AGENT_SUFFIX;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::default_client::set_default_originator;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use toml::Value as TomlValue;
|
||||
@@ -121,6 +123,27 @@ impl MessageProcessor {
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
SetOriginatorError::AlreadyInitialized => {
|
||||
// No-op. This is expected to happen if the originator is already set via env var.
|
||||
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
|
||||
// this will be an unexpected state and we can return a JSON-RPC error indicating
|
||||
// internal server error.
|
||||
}
|
||||
}
|
||||
}
|
||||
let user_agent_suffix = format!("{name}; {version}");
|
||||
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
|
||||
*suffix = Some(user_agent_suffix);
|
||||
|
||||
7
codex-rs/app-server/tests/common/BUILD.bazel
Normal file
7
codex-rs/app-server/tests/common/BUILD.bazel
Normal file
@@ -0,0 +1,7 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "common",
|
||||
crate_name = "app_test_support",
|
||||
crate_srcs = glob(["*.rs"]),
|
||||
)
|
||||
@@ -17,9 +17,11 @@ pub use core_test_support::format_with_current_shell_non_login;
|
||||
pub use core_test_support::test_path_buf_with_windows;
|
||||
pub use core_test_support::test_tmp_path;
|
||||
pub use core_test_support::test_tmp_path_buf;
|
||||
pub use mcp_process::DEFAULT_CLIENT_NAME;
|
||||
pub use mcp_process::McpProcess;
|
||||
pub use mock_model_server::create_mock_chat_completions_server;
|
||||
pub use mock_model_server::create_mock_chat_completions_server_unchecked;
|
||||
pub use mock_model_server::create_mock_responses_server_repeating_assistant;
|
||||
pub use mock_model_server::create_mock_responses_server_sequence;
|
||||
pub use mock_model_server::create_mock_responses_server_sequence_unchecked;
|
||||
pub use models_cache::write_models_cache;
|
||||
pub use models_cache::write_models_cache_with_models;
|
||||
pub use responses::create_apply_patch_sse_response;
|
||||
|
||||
@@ -21,6 +21,7 @@ use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
use codex_app_server_protocol::ConfigValueWriteParams;
|
||||
use codex_app_server_protocol::FeedbackUploadParams;
|
||||
use codex_app_server_protocol::ForkConversationParams;
|
||||
use codex_app_server_protocol::GetAccountParams;
|
||||
use codex_app_server_protocol::GetAuthStatusParams;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
@@ -43,7 +44,9 @@ use codex_app_server_protocol::SendUserTurnParams;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::SetDefaultModelParams;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadLoadedListParams;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
@@ -60,9 +63,11 @@ pub struct McpProcess {
|
||||
process: Child,
|
||||
stdin: ChildStdin,
|
||||
stdout: BufReader<ChildStdout>,
|
||||
pending_user_messages: VecDeque<JSONRPCNotification>,
|
||||
pending_messages: VecDeque<JSONRPCMessage>,
|
||||
}
|
||||
|
||||
pub const DEFAULT_CLIENT_NAME: &str = "codex-app-server-tests";
|
||||
|
||||
impl McpProcess {
|
||||
pub async fn new(codex_home: &Path) -> anyhow::Result<Self> {
|
||||
Self::new_with_env(codex_home, &[]).await
|
||||
@@ -127,39 +132,68 @@ impl McpProcess {
|
||||
process,
|
||||
stdin,
|
||||
stdout,
|
||||
pending_user_messages: VecDeque::new(),
|
||||
pending_messages: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Performs the initialization handshake with the MCP server.
|
||||
pub async fn initialize(&mut self) -> anyhow::Result<()> {
|
||||
let params = Some(serde_json::to_value(InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-app-server-tests".to_string(),
|
||||
let initialized = self
|
||||
.initialize_with_client_info(ClientInfo {
|
||||
name: DEFAULT_CLIENT_NAME.to_string(),
|
||||
title: None,
|
||||
version: "0.1.0".to_string(),
|
||||
},
|
||||
})?);
|
||||
let req_id = self.send_request("initialize", params).await?;
|
||||
let initialized = self.read_jsonrpc_message().await?;
|
||||
let JSONRPCMessage::Response(response) = initialized else {
|
||||
})
|
||||
.await?;
|
||||
let JSONRPCMessage::Response(_) = initialized else {
|
||||
unreachable!("expected JSONRPCMessage::Response for initialize, got {initialized:?}");
|
||||
};
|
||||
if response.id != RequestId::Integer(req_id) {
|
||||
anyhow::bail!(
|
||||
"initialize response id mismatch: expected {}, got {:?}",
|
||||
req_id,
|
||||
response.id
|
||||
);
|
||||
}
|
||||
|
||||
// Send notifications/initialized to ack the response.
|
||||
self.send_notification(ClientNotification::Initialized)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends initialize with the provided client info and returns the response/error message.
|
||||
pub async fn initialize_with_client_info(
|
||||
&mut self,
|
||||
client_info: ClientInfo,
|
||||
) -> anyhow::Result<JSONRPCMessage> {
|
||||
let params = Some(serde_json::to_value(InitializeParams { client_info })?);
|
||||
let request_id = self.send_request("initialize", params).await?;
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
match message {
|
||||
JSONRPCMessage::Response(response) => {
|
||||
if response.id != RequestId::Integer(request_id) {
|
||||
anyhow::bail!(
|
||||
"initialize response id mismatch: expected {}, got {:?}",
|
||||
request_id,
|
||||
response.id
|
||||
);
|
||||
}
|
||||
|
||||
// Send notifications/initialized to ack the response.
|
||||
self.send_notification(ClientNotification::Initialized)
|
||||
.await?;
|
||||
|
||||
Ok(JSONRPCMessage::Response(response))
|
||||
}
|
||||
JSONRPCMessage::Error(error) => {
|
||||
if error.id != RequestId::Integer(request_id) {
|
||||
anyhow::bail!(
|
||||
"initialize error id mismatch: expected {}, got {:?}",
|
||||
request_id,
|
||||
error.id
|
||||
);
|
||||
}
|
||||
Ok(JSONRPCMessage::Error(error))
|
||||
}
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Notification: {notification:?}");
|
||||
}
|
||||
JSONRPCMessage::Request(request) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {request:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a `newConversation` JSON-RPC request.
|
||||
pub async fn send_new_conversation_request(
|
||||
&mut self,
|
||||
@@ -308,6 +342,15 @@ impl McpProcess {
|
||||
self.send_request("thread/resume", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/fork` JSON-RPC request.
|
||||
pub async fn send_thread_fork_request(
|
||||
&mut self,
|
||||
params: ThreadForkParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/fork", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/archive` JSON-RPC request.
|
||||
pub async fn send_thread_archive_request(
|
||||
&mut self,
|
||||
@@ -335,6 +378,15 @@ impl McpProcess {
|
||||
self.send_request("thread/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/loaded/list` JSON-RPC request.
|
||||
pub async fn send_thread_loaded_list_request(
|
||||
&mut self,
|
||||
params: ThreadLoadedListParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/loaded/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `model/list` JSON-RPC request.
|
||||
pub async fn send_list_models_request(
|
||||
&mut self,
|
||||
@@ -353,6 +405,15 @@ impl McpProcess {
|
||||
self.send_request("resumeConversation", params).await
|
||||
}
|
||||
|
||||
/// Send a `forkConversation` JSON-RPC request.
|
||||
pub async fn send_fork_conversation_request(
|
||||
&mut self,
|
||||
params: ForkConversationParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("forkConversation", params).await
|
||||
}
|
||||
|
||||
/// Send a `loginApiKey` JSON-RPC request.
|
||||
pub async fn send_login_api_key_request(
|
||||
&mut self,
|
||||
@@ -544,27 +605,16 @@ impl McpProcess {
|
||||
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<ServerRequest> {
|
||||
eprintln!("in read_stream_until_request_message()");
|
||||
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
let message = self
|
||||
.read_stream_until_message(|message| matches!(message, JSONRPCMessage::Request(_)))
|
||||
.await?;
|
||||
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(jsonrpc_request) => {
|
||||
return jsonrpc_request.try_into().with_context(
|
||||
|| "failed to deserialize ServerRequest from JSONRPCRequest",
|
||||
);
|
||||
}
|
||||
JSONRPCMessage::Error(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Response(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
let JSONRPCMessage::Request(jsonrpc_request) = message else {
|
||||
unreachable!("expected JSONRPCMessage::Request, got {message:?}");
|
||||
};
|
||||
jsonrpc_request
|
||||
.try_into()
|
||||
.with_context(|| "failed to deserialize ServerRequest from JSONRPCRequest")
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_response_message(
|
||||
@@ -573,52 +623,32 @@ impl McpProcess {
|
||||
) -> anyhow::Result<JSONRPCResponse> {
|
||||
eprintln!("in read_stream_until_response_message({request_id:?})");
|
||||
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Error(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Response(jsonrpc_response) => {
|
||||
if jsonrpc_response.id == request_id {
|
||||
return Ok(jsonrpc_response);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let message = self
|
||||
.read_stream_until_message(|message| {
|
||||
Self::message_request_id(message) == Some(&request_id)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let JSONRPCMessage::Response(response) = message else {
|
||||
unreachable!("expected JSONRPCMessage::Response, got {message:?}");
|
||||
};
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_error_message(
|
||||
&mut self,
|
||||
request_id: RequestId,
|
||||
) -> anyhow::Result<JSONRPCError> {
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Response(_) => {
|
||||
// Keep scanning; we're waiting for an error with matching id.
|
||||
}
|
||||
JSONRPCMessage::Error(err) => {
|
||||
if err.id == request_id {
|
||||
return Ok(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let message = self
|
||||
.read_stream_until_message(|message| {
|
||||
Self::message_request_id(message) == Some(&request_id)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let JSONRPCMessage::Error(err) = message else {
|
||||
unreachable!("expected JSONRPCMessage::Error, got {message:?}");
|
||||
};
|
||||
Ok(err)
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_notification_message(
|
||||
@@ -627,46 +657,64 @@ impl McpProcess {
|
||||
) -> anyhow::Result<JSONRPCNotification> {
|
||||
eprintln!("in read_stream_until_notification_message({method})");
|
||||
|
||||
if let Some(notification) = self.take_pending_notification_by_method(method) {
|
||||
return Ok(notification);
|
||||
let message = self
|
||||
.read_stream_until_message(|message| {
|
||||
matches!(
|
||||
message,
|
||||
JSONRPCMessage::Notification(notification) if notification.method == method
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
unreachable!("expected JSONRPCMessage::Notification, got {message:?}");
|
||||
};
|
||||
Ok(notification)
|
||||
}
|
||||
|
||||
/// Clears any buffered messages so future reads only consider new stream items.
|
||||
///
|
||||
/// We call this when e.g. we want to validate against the next turn and no longer care about
|
||||
/// messages buffered from the prior turn.
|
||||
pub fn clear_message_buffer(&mut self) {
|
||||
self.pending_messages.clear();
|
||||
}
|
||||
|
||||
/// Reads the stream until a message matches `predicate`, buffering any non-matching messages
|
||||
/// for later reads.
|
||||
async fn read_stream_until_message<F>(&mut self, predicate: F) -> anyhow::Result<JSONRPCMessage>
|
||||
where
|
||||
F: Fn(&JSONRPCMessage) -> bool,
|
||||
{
|
||||
if let Some(message) = self.take_pending_message(&predicate) {
|
||||
return Ok(message);
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
if notification.method == method {
|
||||
return Ok(notification);
|
||||
}
|
||||
self.enqueue_user_message(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Error(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Response(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
|
||||
}
|
||||
if predicate(&message) {
|
||||
return Ok(message);
|
||||
}
|
||||
self.pending_messages.push_back(message);
|
||||
}
|
||||
}
|
||||
|
||||
fn take_pending_notification_by_method(&mut self, method: &str) -> Option<JSONRPCNotification> {
|
||||
if let Some(pos) = self
|
||||
.pending_user_messages
|
||||
.iter()
|
||||
.position(|notification| notification.method == method)
|
||||
{
|
||||
return self.pending_user_messages.remove(pos);
|
||||
fn take_pending_message<F>(&mut self, predicate: &F) -> Option<JSONRPCMessage>
|
||||
where
|
||||
F: Fn(&JSONRPCMessage) -> bool,
|
||||
{
|
||||
if let Some(pos) = self.pending_messages.iter().position(predicate) {
|
||||
return self.pending_messages.remove(pos);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn enqueue_user_message(&mut self, notification: JSONRPCNotification) {
|
||||
if notification.method == "codex/event/user_message" {
|
||||
self.pending_user_messages.push_back(notification);
|
||||
fn message_request_id(message: &JSONRPCMessage) -> Option<&RequestId> {
|
||||
match message {
|
||||
JSONRPCMessage::Request(request) => Some(&request.id),
|
||||
JSONRPCMessage::Response(response) => Some(&response.id),
|
||||
JSONRPCMessage::Error(err) => Some(&err.id),
|
||||
JSONRPCMessage::Notification(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use core_test_support::responses;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
/// Create a mock server that will provide the responses, in order, for
|
||||
/// requests to the `/v1/chat/completions` endpoint.
|
||||
pub async fn create_mock_chat_completions_server(responses: Vec<String>) -> MockServer {
|
||||
let server = MockServer::start().await;
|
||||
/// requests to the `/v1/responses` endpoint.
|
||||
pub async fn create_mock_responses_server_sequence(responses: Vec<String>) -> MockServer {
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let num_calls = responses.len();
|
||||
let seq_responder = SeqResponder {
|
||||
@@ -20,7 +21,7 @@ pub async fn create_mock_chat_completions_server(responses: Vec<String>) -> Mock
|
||||
};
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat/completions"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(seq_responder)
|
||||
.expect(num_calls as u64)
|
||||
.mount(&server)
|
||||
@@ -29,10 +30,10 @@ pub async fn create_mock_chat_completions_server(responses: Vec<String>) -> Mock
|
||||
server
|
||||
}
|
||||
|
||||
/// Same as `create_mock_chat_completions_server` but does not enforce an
|
||||
/// Same as `create_mock_responses_server_sequence` but does not enforce an
|
||||
/// expectation on the number of calls.
|
||||
pub async fn create_mock_chat_completions_server_unchecked(responses: Vec<String>) -> MockServer {
|
||||
let server = MockServer::start().await;
|
||||
pub async fn create_mock_responses_server_sequence_unchecked(responses: Vec<String>) -> MockServer {
|
||||
let server = responses::start_mock_server().await;
|
||||
|
||||
let seq_responder = SeqResponder {
|
||||
num_calls: AtomicUsize::new(0),
|
||||
@@ -40,7 +41,7 @@ pub async fn create_mock_chat_completions_server_unchecked(responses: Vec<String
|
||||
};
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat/completions"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(seq_responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
@@ -57,10 +58,24 @@ impl Respond for SeqResponder {
|
||||
fn respond(&self, _: &wiremock::Request) -> ResponseTemplate {
|
||||
let call_num = self.num_calls.fetch_add(1, Ordering::SeqCst);
|
||||
match self.responses.get(call_num) {
|
||||
Some(response) => ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(response.clone(), "text/event-stream"),
|
||||
Some(response) => responses::sse_response(response.clone()),
|
||||
None => panic!("no response for {call_num}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a mock responses API server that returns the same assistant message for every request.
|
||||
pub async fn create_mock_responses_server_repeating_assistant(message: &str) -> MockServer {
|
||||
let server = responses::start_mock_server().await;
|
||||
let body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", message),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responses::sse_response(body))
|
||||
.mount(&server)
|
||||
.await;
|
||||
server
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use core_test_support::responses;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -14,85 +15,30 @@ pub fn create_shell_command_sse_response(
|
||||
"workdir": workdir.map(|w| w.to_string_lossy()),
|
||||
"timeout_ms": timeout_ms
|
||||
}))?;
|
||||
let tool_call = json!({
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": call_id,
|
||||
"function": {
|
||||
"name": "shell_command",
|
||||
"arguments": tool_call_arguments
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"finish_reason": "tool_calls"
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
let sse = format!(
|
||||
"data: {}\n\ndata: DONE\n\n",
|
||||
serde_json::to_string(&tool_call)?
|
||||
);
|
||||
Ok(sse)
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, "shell_command", &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
pub fn create_final_assistant_message_sse_response(message: &str) -> anyhow::Result<String> {
|
||||
let assistant_message = json!({
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"content": message
|
||||
},
|
||||
"finish_reason": "stop"
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
let sse = format!(
|
||||
"data: {}\n\ndata: DONE\n\n",
|
||||
serde_json::to_string(&assistant_message)?
|
||||
);
|
||||
Ok(sse)
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", message),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
pub fn create_apply_patch_sse_response(
|
||||
patch_content: &str,
|
||||
call_id: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
// Use shell_command to call apply_patch with heredoc format
|
||||
let command = format!("apply_patch <<'EOF'\n{patch_content}\nEOF");
|
||||
let tool_call_arguments = serde_json::to_string(&json!({
|
||||
"command": command
|
||||
}))?;
|
||||
|
||||
let tool_call = json!({
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": call_id,
|
||||
"function": {
|
||||
"name": "shell_command",
|
||||
"arguments": tool_call_arguments
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"finish_reason": "tool_calls"
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
let sse = format!(
|
||||
"data: {}\n\ndata: DONE\n\n",
|
||||
serde_json::to_string(&tool_call)?
|
||||
);
|
||||
Ok(sse)
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_apply_patch_shell_command_call_via_heredoc(call_id, patch_content),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
pub fn create_exec_command_sse_response(call_id: &str) -> anyhow::Result<String> {
|
||||
@@ -108,28 +54,9 @@ pub fn create_exec_command_sse_response(call_id: &str) -> anyhow::Result<String>
|
||||
"cmd": command.join(" "),
|
||||
"yield_time_ms": 500
|
||||
}))?;
|
||||
let tool_call = json!({
|
||||
"choices": [
|
||||
{
|
||||
"delta": {
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": call_id,
|
||||
"function": {
|
||||
"name": "exec_command",
|
||||
"arguments": tool_call_arguments
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"finish_reason": "tool_calls"
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
let sse = format!(
|
||||
"data: {}\n\ndata: DONE\n\n",
|
||||
serde_json::to_string(&tool_call)?
|
||||
);
|
||||
Ok(sse)
|
||||
Ok(responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, "exec_command", &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]))
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:0/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
{requires_line}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::format_with_current_shell;
|
||||
use app_test_support::to_response;
|
||||
@@ -65,7 +65,7 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> {
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("Enjoy your new git repo!")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
// Start MCP server and initialize.
|
||||
@@ -197,7 +197,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> {
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done 2")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
// Start MCP server and initialize.
|
||||
@@ -283,7 +283,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Wait for first TaskComplete
|
||||
// Wait for first TurnComplete
|
||||
let _ = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
@@ -363,7 +363,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<(
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done second")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
@@ -430,6 +430,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<(
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
)
|
||||
.await??;
|
||||
mcp.clear_message_buffer();
|
||||
|
||||
let second_turn_id = mcp
|
||||
.send_send_user_turn_request(SendUserTurnParams {
|
||||
@@ -499,7 +500,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::AddConversationListenerParams;
|
||||
use codex_app_server_protocol::AddConversationSubscriptionResponse;
|
||||
@@ -12,6 +11,7 @@ use codex_app_server_protocol::NewConversationResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SendUserMessageParams;
|
||||
use codex_app_server_protocol::SendUserMessageResponse;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
@@ -23,8 +23,9 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_conversation_create_and_send_message_ok() -> Result<()> {
|
||||
// Mock server – we won't strictly rely on it, but provide one to satisfy any model wiring.
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let response_body = create_final_assistant_message_sse_response("Done")?;
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_sequence(&server, vec![response_body]).await;
|
||||
|
||||
// Temporary Codex home with config pointing at the mock server.
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -86,32 +87,30 @@ async fn test_conversation_create_and_send_message_ok() -> Result<()> {
|
||||
.await??;
|
||||
let _ok: SendUserMessageResponse = to_response::<SendUserMessageResponse>(send_resp)?;
|
||||
|
||||
// avoid race condition by waiting for the mock server to receive the chat.completions request
|
||||
// Avoid race condition by waiting for the mock server to receive the responses request.
|
||||
let deadline = std::time::Instant::now() + DEFAULT_READ_TIMEOUT;
|
||||
let requests = loop {
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
let requests = response_mock.requests();
|
||||
if !requests.is_empty() {
|
||||
break requests;
|
||||
}
|
||||
if std::time::Instant::now() >= deadline {
|
||||
panic!("mock server did not receive the chat.completions request in time");
|
||||
panic!("mock server did not receive the responses request in time");
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
};
|
||||
|
||||
// Verify the outbound request body matches expectations for Chat Completions.
|
||||
// Verify the outbound request body matches expectations for Responses.
|
||||
let request = requests
|
||||
.first()
|
||||
.expect("mock server should have received at least one request");
|
||||
let body = request.body_json::<serde_json::Value>()?;
|
||||
let body = request.body_json();
|
||||
assert_eq!(body["model"], json!("o3"));
|
||||
assert!(body["stream"].as_bool().unwrap_or(false));
|
||||
let messages = body["messages"]
|
||||
.as_array()
|
||||
.expect("messages should be array");
|
||||
let last = messages.last().expect("at least one message");
|
||||
assert_eq!(last["role"], json!("user"));
|
||||
assert_eq!(last["content"], json!("Hello"));
|
||||
let user_texts = request.message_input_texts("user");
|
||||
assert!(
|
||||
user_texts.iter().any(|text| text == "Hello"),
|
||||
"expected user input to include Hello, got {user_texts:?}"
|
||||
);
|
||||
|
||||
drop(server);
|
||||
Ok(())
|
||||
@@ -133,7 +132,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
140
codex-rs/app-server/tests/suite/fork_thread.rs
Normal file
140
codex-rs/app-server/tests/suite/fork_thread.rs
Normal file
@@ -0,0 +1,140 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ForkConversationParams;
|
||||
use codex_app_server_protocol::ForkConversationResponse;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::NewConversationParams; // reused for overrides shape
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionConfiguredNotification;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn fork_conversation_creates_new_rollout() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
|
||||
let preview = "Hello A";
|
||||
let conversation_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-02T12-00-00",
|
||||
"2025-01-02T12:00:00Z",
|
||||
preview,
|
||||
Some("openai"),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let original_path = codex_home
|
||||
.path()
|
||||
.join("sessions")
|
||||
.join("2025")
|
||||
.join("01")
|
||||
.join("02")
|
||||
.join(format!(
|
||||
"rollout-2025-01-02T12-00-00-{conversation_id}.jsonl"
|
||||
));
|
||||
assert!(
|
||||
original_path.exists(),
|
||||
"expected original rollout to exist at {}",
|
||||
original_path.display()
|
||||
);
|
||||
let original_contents = std::fs::read_to_string(&original_path)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_req_id = mcp
|
||||
.send_fork_conversation_request(ForkConversationParams {
|
||||
path: Some(original_path.clone()),
|
||||
conversation_id: None,
|
||||
overrides: Some(NewConversationParams {
|
||||
model: Some("o3".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Expect a sessionConfigured notification for the forked session.
|
||||
let notification: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("sessionConfigured"),
|
||||
)
|
||||
.await??;
|
||||
let session_configured: ServerNotification = notification.try_into()?;
|
||||
let ServerNotification::SessionConfigured(SessionConfiguredNotification {
|
||||
model,
|
||||
session_id,
|
||||
rollout_path,
|
||||
initial_messages: session_initial_messages,
|
||||
..
|
||||
}) = session_configured
|
||||
else {
|
||||
unreachable!("expected sessionConfigured notification");
|
||||
};
|
||||
|
||||
assert_eq!(model, "o3");
|
||||
assert_ne!(
|
||||
session_id.to_string(),
|
||||
conversation_id,
|
||||
"expected a new conversation id when forking"
|
||||
);
|
||||
assert_ne!(
|
||||
rollout_path, original_path,
|
||||
"expected a new rollout path when forking"
|
||||
);
|
||||
assert!(
|
||||
rollout_path.exists(),
|
||||
"expected forked rollout to exist at {}",
|
||||
rollout_path.display()
|
||||
);
|
||||
|
||||
let session_initial_messages =
|
||||
session_initial_messages.expect("expected initial messages when forking from rollout");
|
||||
match session_initial_messages.as_slice() {
|
||||
[EventMsg::UserMessage(message)] => {
|
||||
assert_eq!(message.message, preview);
|
||||
}
|
||||
other => panic!("unexpected initial messages from rollout fork: {other:#?}"),
|
||||
}
|
||||
|
||||
// Then the response for forkConversation.
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_req_id)),
|
||||
)
|
||||
.await??;
|
||||
let ForkConversationResponse {
|
||||
conversation_id: forked_id,
|
||||
model: forked_model,
|
||||
initial_messages: response_initial_messages,
|
||||
rollout_path: response_rollout_path,
|
||||
} = to_response::<ForkConversationResponse>(fork_resp)?;
|
||||
|
||||
assert_eq!(forked_model, "o3");
|
||||
assert_eq!(response_rollout_path, rollout_path);
|
||||
assert_ne!(forked_id.to_string(), conversation_id);
|
||||
|
||||
let response_initial_messages =
|
||||
response_initial_messages.expect("expected initial messages in fork response");
|
||||
match response_initial_messages.as_slice() {
|
||||
[EventMsg::UserMessage(message)] => {
|
||||
assert_eq!(message.message, preview);
|
||||
}
|
||||
other => panic!("unexpected initial messages in fork response: {other:#?}"),
|
||||
}
|
||||
|
||||
let after_contents = std::fs::read_to_string(&original_path)?;
|
||||
assert_eq!(
|
||||
after_contents, original_contents,
|
||||
"fork should not mutate the original rollout file"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -18,7 +18,7 @@ use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
|
||||
@@ -56,7 +56,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
std::fs::create_dir(&working_directory)?;
|
||||
|
||||
// Create mock server with a single SSE response: the long sleep command
|
||||
let server = create_mock_chat_completions_server(vec![create_shell_command_sse_response(
|
||||
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000), // 10 seconds timeout in ms
|
||||
@@ -153,7 +153,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -32,7 +32,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:0/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#,
|
||||
|
||||
@@ -3,6 +3,7 @@ mod auth;
|
||||
mod codex_message_processor_flow;
|
||||
mod config;
|
||||
mod create_thread;
|
||||
mod fork_thread;
|
||||
mod fuzzy_file_search;
|
||||
mod interrupt;
|
||||
mod list_resume;
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::AddConversationListenerParams;
|
||||
use codex_app_server_protocol::AddConversationSubscriptionResponse;
|
||||
@@ -17,6 +15,7 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -26,13 +25,21 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_send_message_success() -> Result<()> {
|
||||
// Spin up a mock completions server that immediately ends the Codex turn.
|
||||
// Spin up a mock responses server that immediately ends the Codex turn.
|
||||
// Two Codex turns hit the mock model (session start + send-user-message). Provide two SSE responses.
|
||||
let responses = vec![
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = responses::start_mock_server().await;
|
||||
let body1 = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let body2 = responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "Done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]);
|
||||
let _response_mock1 = responses::mount_sse_once(&server, body1).await;
|
||||
let _response_mock2 = responses::mount_sse_once(&server, body2).await;
|
||||
|
||||
// Create a temporary Codex home with config pointing at the mock server.
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -135,8 +142,13 @@ async fn send_message(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_send_message_raw_notifications_opt_in() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = responses::start_mock_server().await;
|
||||
let body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let _response_mock = responses::mount_sse_once(&server, body).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -259,7 +271,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
@@ -269,6 +281,7 @@ stream_max_retries = 0
|
||||
|
||||
#[expect(clippy::expect_used)]
|
||||
async fn read_raw_response_item(mcp: &mut McpProcess, conversation_id: ThreadId) -> ResponseItem {
|
||||
// TODO: Switch to rawResponseItem/completed once we migrate to app server v2 in codex web.
|
||||
loop {
|
||||
let raw_notification: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::DEFAULT_CLIENT_NAME;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::GetUserAgentResponse;
|
||||
@@ -25,13 +26,13 @@ async fn get_user_agent_returns_current_codex_user_agent() -> Result<()> {
|
||||
.await??;
|
||||
|
||||
let os_info = os_info::get();
|
||||
let originator = codex_core::default_client::originator().value.as_str();
|
||||
let originator = DEFAULT_CLIENT_NAME;
|
||||
let os_type = os_info.os_type();
|
||||
let os_version = os_info.version();
|
||||
let architecture = os_info.architecture().unwrap_or("unknown");
|
||||
let terminal_ua = codex_core::terminal::user_agent();
|
||||
let user_agent = format!(
|
||||
"{originator}/0.0.0 ({os_type} {os_version}; {architecture}) {terminal_ua} (codex-app-server-tests; 0.1.0)"
|
||||
"{originator}/0.0.0 ({os_type} {os_version}; {architecture}) {terminal_ua} ({DEFAULT_CLIENT_NAME}; 0.1.0)"
|
||||
);
|
||||
|
||||
let received: GetUserAgentResponse = to_response(response)?;
|
||||
|
||||
@@ -67,7 +67,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:0/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
{requires_line}
|
||||
|
||||
137
codex-rs/app-server/tests/suite/v2/initialize.rs
Normal file
137
codex-rs/app-server/tests/suite/v2/initialize.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::InitializeResponse;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_uses_client_info_name_as_originator() -> Result<()> {
|
||||
let responses = Vec::new();
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
|
||||
let message = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.initialize_with_client_info(ClientInfo {
|
||||
name: "codex_vscode".to_string(),
|
||||
title: Some("Codex VS Code Extension".to_string()),
|
||||
version: "0.1.0".to_string(),
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let JSONRPCMessage::Response(response) = message else {
|
||||
anyhow::bail!("expected initialize response, got {message:?}");
|
||||
};
|
||||
let InitializeResponse { user_agent } = to_response::<InitializeResponse>(response)?;
|
||||
|
||||
assert!(user_agent.starts_with("codex_vscode/"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_respects_originator_override_env_var() -> Result<()> {
|
||||
let responses = Vec::new();
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let mut mcp = McpProcess::new_with_env(
|
||||
codex_home.path(),
|
||||
&[(
|
||||
"CODEX_INTERNAL_ORIGINATOR_OVERRIDE",
|
||||
Some("codex_originator_via_env_var"),
|
||||
)],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.initialize_with_client_info(ClientInfo {
|
||||
name: "codex_vscode".to_string(),
|
||||
title: Some("Codex VS Code Extension".to_string()),
|
||||
version: "0.1.0".to_string(),
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let JSONRPCMessage::Response(response) = message else {
|
||||
anyhow::bail!("expected initialize response, got {message:?}");
|
||||
};
|
||||
let InitializeResponse { user_agent } = to_response::<InitializeResponse>(response)?;
|
||||
|
||||
assert!(user_agent.starts_with("codex_originator_via_env_var/"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn initialize_rejects_invalid_client_name() -> Result<()> {
|
||||
let responses = Vec::new();
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let mut mcp = McpProcess::new_with_env(
|
||||
codex_home.path(),
|
||||
&[("CODEX_INTERNAL_ORIGINATOR_OVERRIDE", None)],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let message = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.initialize_with_client_info(ClientInfo {
|
||||
name: "bad\rname".to_string(),
|
||||
title: Some("Bad Client".to_string()),
|
||||
version: "0.1.0".to_string(),
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let JSONRPCMessage::Error(error) = message else {
|
||||
anyhow::bail!("expected initialize error, got {message:?}");
|
||||
};
|
||||
|
||||
assert_eq!(error.error.code, -32600);
|
||||
assert_eq!(
|
||||
error.error.message,
|
||||
"Invalid clientInfo.name: 'bad\rname'. Must be a valid HTTP header value."
|
||||
);
|
||||
assert_eq!(error.error.data, None);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
approval_policy: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "{approval_policy}"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -1,11 +1,14 @@
|
||||
mod account;
|
||||
mod config_rpc;
|
||||
mod initialize;
|
||||
mod model_list;
|
||||
mod output_schema;
|
||||
mod rate_limits;
|
||||
mod review;
|
||||
mod thread_archive;
|
||||
mod thread_fork;
|
||||
mod thread_list;
|
||||
mod thread_loaded_list;
|
||||
mod thread_resume;
|
||||
mod thread_rollback;
|
||||
mod thread_start;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server_unchecked;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
@@ -44,10 +43,7 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
|
||||
"overall_confidence_score": 0.75
|
||||
})
|
||||
.to_string();
|
||||
let responses = vec![create_final_assistant_message_sse_response(
|
||||
&review_payload,
|
||||
)?];
|
||||
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
||||
let server = create_mock_responses_server_repeating_assistant(&review_payload).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -135,7 +131,7 @@ async fn review_start_runs_review_turn_and_emits_code_review_item() -> Result<()
|
||||
|
||||
#[tokio::test]
|
||||
async fn review_start_rejects_empty_base_branch() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server_unchecked(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -176,10 +172,7 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<(
|
||||
"overall_confidence_score": 0.5
|
||||
})
|
||||
.to_string();
|
||||
let responses = vec![create_final_assistant_message_sse_response(
|
||||
&review_payload,
|
||||
)?];
|
||||
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
||||
let server = create_mock_responses_server_repeating_assistant(&review_payload).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -219,7 +212,7 @@ async fn review_start_with_detached_delivery_returns_new_thread_id() -> Result<(
|
||||
|
||||
#[tokio::test]
|
||||
async fn review_start_rejects_empty_commit_sha() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server_unchecked(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -254,7 +247,7 @@ async fn review_start_rejects_empty_commit_sha() -> Result<()> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn review_start_rejects_empty_custom_instructions() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server_unchecked(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -320,7 +313,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
140
codex-rs/app-server/tests/suite/v2/thread_fork.rs
Normal file
140
codex-rs/app-server/tests/suite/v2/thread_fork.rs
Normal file
@@ -0,0 +1,140 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let preview = "Saved user message";
|
||||
let conversation_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
preview,
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let original_path = codex_home
|
||||
.path()
|
||||
.join("sessions")
|
||||
.join("2025")
|
||||
.join("01")
|
||||
.join("05")
|
||||
.join(format!(
|
||||
"rollout-2025-01-05T12-00-00-{conversation_id}.jsonl"
|
||||
));
|
||||
assert!(
|
||||
original_path.exists(),
|
||||
"expected original rollout to exist at {}",
|
||||
original_path.display()
|
||||
);
|
||||
let original_contents = std::fs::read_to_string(&original_path)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
let after_contents = std::fs::read_to_string(&original_path)?;
|
||||
assert_eq!(
|
||||
after_contents, original_contents,
|
||||
"fork should not mutate the original rollout file"
|
||||
);
|
||||
|
||||
assert_ne!(thread.id, conversation_id);
|
||||
assert_eq!(thread.preview, preview);
|
||||
assert_eq!(thread.model_provider, "mock_provider");
|
||||
assert!(thread.path.is_absolute());
|
||||
assert_ne!(thread.path, original_path);
|
||||
assert!(thread.cwd.is_absolute());
|
||||
assert_eq!(thread.source, SessionSource::VsCode);
|
||||
|
||||
assert_eq!(
|
||||
thread.turns.len(),
|
||||
1,
|
||||
"expected forked thread to include one turn"
|
||||
);
|
||||
let turn = &thread.turns[0];
|
||||
assert_eq!(turn.status, TurnStatus::Completed);
|
||||
assert_eq!(turn.items.len(), 1, "expected user message item");
|
||||
match &turn.items[0] {
|
||||
ThreadItem::UserMessage { content, .. } => {
|
||||
assert_eq!(
|
||||
content,
|
||||
&vec![UserInput::Text {
|
||||
text: preview.to_string()
|
||||
}]
|
||||
);
|
||||
}
|
||||
other => panic!("expected user message item, got {other:?}"),
|
||||
}
|
||||
|
||||
// A corresponding thread/started notification should arrive.
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/started"),
|
||||
)
|
||||
.await??;
|
||||
let started: ThreadStartedNotification =
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
assert_eq!(started.thread, thread);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
139
codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs
Normal file
139
codex-rs/app-server/tests/suite/v2/thread_loaded_list.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadLoadedListParams;
|
||||
use codex_app_server_protocol::ThreadLoadedListResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_loaded_list_returns_loaded_thread_ids() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_id = start_thread(&mut mcp).await?;
|
||||
|
||||
let list_id = mcp
|
||||
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadLoadedListResponse {
|
||||
mut data,
|
||||
next_cursor,
|
||||
} = to_response::<ThreadLoadedListResponse>(resp)?;
|
||||
data.sort();
|
||||
assert_eq!(data, vec![thread_id]);
|
||||
assert_eq!(next_cursor, None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_loaded_list_paginates() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let first = start_thread(&mut mcp).await?;
|
||||
let second = start_thread(&mut mcp).await?;
|
||||
|
||||
let mut expected = [first, second];
|
||||
expected.sort();
|
||||
|
||||
let list_id = mcp
|
||||
.send_thread_loaded_list_request(ThreadLoadedListParams {
|
||||
cursor: None,
|
||||
limit: Some(1),
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadLoadedListResponse {
|
||||
data: first_page,
|
||||
next_cursor,
|
||||
} = to_response::<ThreadLoadedListResponse>(resp)?;
|
||||
assert_eq!(first_page, vec![expected[0].clone()]);
|
||||
assert_eq!(next_cursor, Some(expected[0].clone()));
|
||||
|
||||
let list_id = mcp
|
||||
.send_thread_loaded_list_request(ThreadLoadedListParams {
|
||||
cursor: next_cursor,
|
||||
limit: Some(1),
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadLoadedListResponse {
|
||||
data: second_page,
|
||||
next_cursor,
|
||||
} = to_response::<ThreadLoadedListResponse>(resp)?;
|
||||
assert_eq!(second_page, vec![expected[1].clone()]);
|
||||
assert_eq!(next_cursor, None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
|
||||
let req_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("gpt-5.1".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
|
||||
Ok(thread.id)
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -23,7 +23,7 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_returns_original_thread() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -130,7 +130,7 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -174,7 +174,7 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_supports_history_and_overrides() -> Result<()> {
|
||||
let server = create_mock_chat_completions_server(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -247,7 +247,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server_unchecked;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -28,7 +28,7 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -168,7 +168,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
@@ -17,7 +17,7 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test]
|
||||
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
// Provide a mock server and config so model wiring is valid.
|
||||
let server = create_mock_chat_completions_server(vec![]).await;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -85,7 +85,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
@@ -41,7 +41,7 @@ async fn turn_interrupt_aborts_running_turn() -> Result<()> {
|
||||
std::fs::create_dir(&working_directory)?;
|
||||
|
||||
// Mock server: long-running shell command then (after abort) nothing else needed.
|
||||
let server = create_mock_chat_completions_server(vec![create_shell_command_sse_response(
|
||||
let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response(
|
||||
shell_command.clone(),
|
||||
Some(&working_directory),
|
||||
Some(10_000),
|
||||
@@ -135,7 +135,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
@@ -3,11 +3,12 @@ use app_test_support::McpProcess;
|
||||
use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_exec_command_sse_response;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_chat_completions_server;
|
||||
use app_test_support::create_mock_chat_completions_server_unchecked;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::format_with_current_shell_display;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
@@ -40,6 +41,76 @@ use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const TEST_ORIGINATOR: &str = "codex_vscode";
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_sends_originator_header() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.initialize_with_client_info(ClientInfo {
|
||||
name: TEST_ORIGINATOR.to_string(),
|
||||
title: Some("Codex VS Code Extension".to_string()),
|
||||
version: "0.1.0".to_string(),
|
||||
}),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("failed to fetch received requests");
|
||||
assert!(!requests.is_empty());
|
||||
for request in requests {
|
||||
let originator = request
|
||||
.headers
|
||||
.get("originator")
|
||||
.expect("originator header missing");
|
||||
assert_eq!(originator.to_str()?, TEST_ORIGINATOR);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> {
|
||||
@@ -50,7 +121,7 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
create_final_assistant_message_sse_response("Done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
@@ -157,7 +228,7 @@ async fn turn_start_accepts_local_image_input() -> Result<()> {
|
||||
];
|
||||
// Use the unchecked variant because the request payload includes a LocalImage
|
||||
// which the strict matcher does not currently cover.
|
||||
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
@@ -233,7 +304,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done 2")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
// Default approval is untrusted to force elicitation on first turn.
|
||||
create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?;
|
||||
|
||||
@@ -357,7 +428,7 @@ async fn turn_start_exec_approval_decline_v2() -> Result<()> {
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
|
||||
@@ -503,7 +574,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done second")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
@@ -554,6 +625,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
||||
)
|
||||
.await??;
|
||||
mcp.clear_message_buffer();
|
||||
|
||||
// second turn with workspace-write and second_cwd, ensure exec begins in second_cwd
|
||||
let second_turn = mcp
|
||||
@@ -636,7 +708,7 @@ async fn turn_start_file_change_approval_v2() -> Result<()> {
|
||||
create_apply_patch_sse_response(patch, "patch-call")?,
|
||||
create_final_assistant_message_sse_response("patch applied")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
@@ -812,7 +884,7 @@ async fn turn_start_file_change_approval_accept_for_session_persists_v2() -> Res
|
||||
create_apply_patch_sse_response(patch_2, "patch-call-2")?,
|
||||
create_final_assistant_message_sse_response("patch 2 applied")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
@@ -986,7 +1058,7 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
|
||||
create_apply_patch_sse_response(patch, "patch-call")?,
|
||||
create_final_assistant_message_sse_response("patch declined")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
||||
|
||||
let mut mcp = McpProcess::new(&codex_home).await?;
|
||||
@@ -1124,7 +1196,7 @@ async fn command_execution_notifications_include_process_id() -> Result<()> {
|
||||
create_exec_command_sse_response("uexec-1")?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let config_toml = codex_home.path().join("config.toml");
|
||||
@@ -1263,7 +1335,7 @@ model_provider = "mock_provider"
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
|
||||
11
codex-rs/apply-patch/BUILD.bazel
Normal file
11
codex-rs/apply-patch/BUILD.bazel
Normal file
@@ -0,0 +1,11 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
exports_files(["apply_patch_tool_instructions.md"])
|
||||
|
||||
codex_rust_crate(
|
||||
name = "apply-patch",
|
||||
crate_name = "codex_apply_patch",
|
||||
compile_data = [
|
||||
"apply_patch_tool_instructions.md",
|
||||
],
|
||||
)
|
||||
6
codex-rs/arg0/BUILD.bazel
Normal file
6
codex-rs/arg0/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "arg0",
|
||||
crate_name = "codex_arg0",
|
||||
)
|
||||
@@ -145,11 +145,41 @@ where
|
||||
/// that `apply_patch` can be on the PATH without requiring the user to
|
||||
/// install a separate `apply_patch` executable, simplifying the deployment of
|
||||
/// Codex CLI.
|
||||
/// Note: In debug builds the temp-dir guard is disabled to ease local testing.
|
||||
///
|
||||
/// IMPORTANT: This function modifies the PATH environment variable, so it MUST
|
||||
/// be called before multiple threads are spawned.
|
||||
pub fn prepend_path_entry_for_codex_aliases() -> std::io::Result<TempDir> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let codex_home = codex_core::config::find_codex_home()?;
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
// Guard against placing helpers in system temp directories outside debug builds.
|
||||
let temp_root = std::env::temp_dir();
|
||||
if codex_home.starts_with(&temp_root) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!(
|
||||
"Refusing to create helper binaries under temporary dir {temp_root:?} (codex_home: {codex_home:?})"
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
std::fs::create_dir_all(&codex_home)?;
|
||||
// Use a CODEX_HOME-scoped temp root to avoid cluttering the top-level directory.
|
||||
let temp_root = codex_home.join("tmp").join("path");
|
||||
std::fs::create_dir_all(&temp_root)?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
// Ensure only the current user can access the temp directory.
|
||||
std::fs::set_permissions(&temp_root, std::fs::Permissions::from_mode(0o700))?;
|
||||
}
|
||||
|
||||
let temp_dir = tempfile::Builder::new()
|
||||
.prefix("codex-arg0")
|
||||
.tempdir_in(&temp_root)?;
|
||||
let path = temp_dir.path();
|
||||
|
||||
for filename in &[
|
||||
|
||||
6
codex-rs/async-utils/BUILD.bazel
Normal file
6
codex-rs/async-utils/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "async-utils",
|
||||
crate_name = "codex_async_utils",
|
||||
)
|
||||
7
codex-rs/backend-client/BUILD.bazel
Normal file
7
codex-rs/backend-client/BUILD.bazel
Normal file
@@ -0,0 +1,7 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "backend-client",
|
||||
crate_name = "codex_backend_client",
|
||||
compile_data = glob(["tests/fixtures/**"]),
|
||||
)
|
||||
@@ -73,8 +73,8 @@ impl Client {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn from_auth(base_url: impl Into<String>, auth: &CodexAuth) -> Result<Self> {
|
||||
let token = auth.get_token().await.map_err(anyhow::Error::from)?;
|
||||
pub fn from_auth(base_url: impl Into<String>, auth: &CodexAuth) -> Result<Self> {
|
||||
let token = auth.get_token().map_err(anyhow::Error::from)?;
|
||||
let mut client = Self::new(base_url)?
|
||||
.with_user_agent(get_codex_user_agent())
|
||||
.with_bearer_token(token);
|
||||
|
||||
6
codex-rs/chatgpt/BUILD.bazel
Normal file
6
codex-rs/chatgpt/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "chatgpt",
|
||||
crate_name = "codex_chatgpt",
|
||||
)
|
||||
@@ -1,4 +1,4 @@
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::AuthManager;
|
||||
use std::path::Path;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::RwLock;
|
||||
@@ -23,9 +23,10 @@ pub async fn init_chatgpt_token_from_auth(
|
||||
codex_home: &Path,
|
||||
auth_credentials_store_mode: AuthCredentialsStoreMode,
|
||||
) -> std::io::Result<()> {
|
||||
let auth = CodexAuth::from_auth_storage(codex_home, auth_credentials_store_mode)?;
|
||||
if let Some(auth) = auth {
|
||||
let token_data = auth.get_token_data().await?;
|
||||
let auth_manager =
|
||||
AuthManager::new(codex_home.to_path_buf(), false, auth_credentials_store_mode);
|
||||
if let Some(auth) = auth_manager.auth().await {
|
||||
let token_data = auth.get_token_data()?;
|
||||
set_chatgpt_token_data(token_data);
|
||||
}
|
||||
Ok(())
|
||||
|
||||
10
codex-rs/cli/BUILD.bazel
Normal file
10
codex-rs/cli/BUILD.bazel
Normal file
@@ -0,0 +1,10 @@
|
||||
load("//:defs.bzl", "codex_rust_crate", "multiplatform_binaries")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "cli",
|
||||
crate_name = "codex_cli",
|
||||
)
|
||||
|
||||
multiplatform_binaries(
|
||||
name = "codex",
|
||||
)
|
||||
@@ -30,7 +30,6 @@ codex-exec = { workspace = true }
|
||||
codex-execpolicy = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-mcp-server = { workspace = true }
|
||||
codex-process-hardening = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-responses-api-proxy = { workspace = true }
|
||||
codex-rmcp-client = { workspace = true }
|
||||
@@ -38,7 +37,6 @@ codex-stdio-to-uds = { workspace = true }
|
||||
codex-tui = { workspace = true }
|
||||
codex-tui2 = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
ctor = { workspace = true }
|
||||
libc = { workspace = true }
|
||||
owo-colors = { workspace = true }
|
||||
regex-lite = { workspace = true }
|
||||
|
||||
@@ -14,6 +14,18 @@ use std::io::IsTerminal;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
|
||||
const CHATGPT_LOGIN_DISABLED_MESSAGE: &str =
|
||||
"ChatGPT login is disabled. Use API key login instead.";
|
||||
const API_KEY_LOGIN_DISABLED_MESSAGE: &str =
|
||||
"API key login is disabled. Use ChatGPT login instead.";
|
||||
const LOGIN_SUCCESS_MESSAGE: &str = "Successfully logged in";
|
||||
|
||||
fn print_login_server_start(actual_port: u16, auth_url: &str) {
|
||||
eprintln!(
|
||||
"Starting local login server on http://localhost:{actual_port}.\nIf your browser did not open, navigate to this URL to authenticate:\n\n{auth_url}"
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn login_with_chatgpt(
|
||||
codex_home: PathBuf,
|
||||
forced_chatgpt_workspace_id: Option<String>,
|
||||
@@ -27,10 +39,7 @@ pub async fn login_with_chatgpt(
|
||||
);
|
||||
let server = run_login_server(opts)?;
|
||||
|
||||
eprintln!(
|
||||
"Starting local login server on http://localhost:{}.\nIf your browser did not open, navigate to this URL to authenticate:\n\n{}",
|
||||
server.actual_port, server.auth_url,
|
||||
);
|
||||
print_login_server_start(server.actual_port, &server.auth_url);
|
||||
|
||||
server.block_until_done().await
|
||||
}
|
||||
@@ -39,7 +48,7 @@ pub async fn run_login_with_chatgpt(cli_config_overrides: CliConfigOverrides) ->
|
||||
let config = load_config_or_exit(cli_config_overrides).await;
|
||||
|
||||
if matches!(config.forced_login_method, Some(ForcedLoginMethod::Api)) {
|
||||
eprintln!("ChatGPT login is disabled. Use API key login instead.");
|
||||
eprintln!("{CHATGPT_LOGIN_DISABLED_MESSAGE}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -53,7 +62,7 @@ pub async fn run_login_with_chatgpt(cli_config_overrides: CliConfigOverrides) ->
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
eprintln!("Successfully logged in");
|
||||
eprintln!("{LOGIN_SUCCESS_MESSAGE}");
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -70,7 +79,7 @@ pub async fn run_login_with_api_key(
|
||||
let config = load_config_or_exit(cli_config_overrides).await;
|
||||
|
||||
if matches!(config.forced_login_method, Some(ForcedLoginMethod::Chatgpt)) {
|
||||
eprintln!("API key login is disabled. Use ChatGPT login instead.");
|
||||
eprintln!("{API_KEY_LOGIN_DISABLED_MESSAGE}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -80,7 +89,7 @@ pub async fn run_login_with_api_key(
|
||||
config.cli_auth_credentials_store_mode,
|
||||
) {
|
||||
Ok(_) => {
|
||||
eprintln!("Successfully logged in");
|
||||
eprintln!("{LOGIN_SUCCESS_MESSAGE}");
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -125,7 +134,7 @@ pub async fn run_login_with_device_code(
|
||||
) -> ! {
|
||||
let config = load_config_or_exit(cli_config_overrides).await;
|
||||
if matches!(config.forced_login_method, Some(ForcedLoginMethod::Api)) {
|
||||
eprintln!("ChatGPT login is disabled. Use API key login instead.");
|
||||
eprintln!("{CHATGPT_LOGIN_DISABLED_MESSAGE}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
let forced_chatgpt_workspace_id = config.forced_chatgpt_workspace_id.clone();
|
||||
@@ -140,7 +149,7 @@ pub async fn run_login_with_device_code(
|
||||
}
|
||||
match run_device_code_login(opts).await {
|
||||
Ok(()) => {
|
||||
eprintln!("Successfully logged in");
|
||||
eprintln!("{LOGIN_SUCCESS_MESSAGE}");
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -150,12 +159,74 @@ pub async fn run_login_with_device_code(
|
||||
}
|
||||
}
|
||||
|
||||
/// Prefers device-code login (with `open_browser = false`) when headless environment is detected, but keeps
|
||||
/// `codex login` working in environments where device-code may be disabled/feature-gated.
|
||||
/// If `run_device_code_login` returns `ErrorKind::NotFound` ("device-code unsupported"), this
|
||||
/// falls back to starting the local browser login server.
|
||||
pub async fn run_login_with_device_code_fallback_to_browser(
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
issuer_base_url: Option<String>,
|
||||
client_id: Option<String>,
|
||||
) -> ! {
|
||||
let config = load_config_or_exit(cli_config_overrides).await;
|
||||
if matches!(config.forced_login_method, Some(ForcedLoginMethod::Api)) {
|
||||
eprintln!("{CHATGPT_LOGIN_DISABLED_MESSAGE}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let forced_chatgpt_workspace_id = config.forced_chatgpt_workspace_id.clone();
|
||||
let mut opts = ServerOptions::new(
|
||||
config.codex_home,
|
||||
client_id.unwrap_or(CLIENT_ID.to_string()),
|
||||
forced_chatgpt_workspace_id,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
if let Some(iss) = issuer_base_url {
|
||||
opts.issuer = iss;
|
||||
}
|
||||
opts.open_browser = false;
|
||||
|
||||
match run_device_code_login(opts.clone()).await {
|
||||
Ok(()) => {
|
||||
eprintln!("{LOGIN_SUCCESS_MESSAGE}");
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
eprintln!("Device code login is not enabled; falling back to browser login.");
|
||||
match run_login_server(opts) {
|
||||
Ok(server) => {
|
||||
print_login_server_start(server.actual_port, &server.auth_url);
|
||||
match server.block_until_done().await {
|
||||
Ok(()) => {
|
||||
eprintln!("{LOGIN_SUCCESS_MESSAGE}");
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error logging in: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error logging in: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
eprintln!("Error logging in with device code: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_login_status(cli_config_overrides: CliConfigOverrides) -> ! {
|
||||
let config = load_config_or_exit(cli_config_overrides).await;
|
||||
|
||||
match CodexAuth::from_auth_storage(&config.codex_home, config.cli_auth_credentials_store_mode) {
|
||||
Ok(Some(auth)) => match auth.mode {
|
||||
AuthMode::ApiKey => match auth.get_token().await {
|
||||
AuthMode::ApiKey => match auth.get_token() {
|
||||
Ok(api_key) => {
|
||||
eprintln!("Logged in using an API key - {}", safe_format_key(&api_key));
|
||||
std::process::exit(0);
|
||||
|
||||
@@ -14,9 +14,11 @@ use codex_cli::login::run_login_status;
|
||||
use codex_cli::login::run_login_with_api_key;
|
||||
use codex_cli::login::run_login_with_chatgpt;
|
||||
use codex_cli::login::run_login_with_device_code;
|
||||
use codex_cli::login::run_login_with_device_code_fallback_to_browser;
|
||||
use codex_cli::login::run_logout;
|
||||
use codex_cloud_tasks::Cli as CloudTasksCli;
|
||||
use codex_common::CliConfigOverrides;
|
||||
use codex_core::env::is_headless_environment;
|
||||
use codex_exec::Cli as ExecCli;
|
||||
use codex_exec::Command as ExecCommand;
|
||||
use codex_exec::ReviewArgs;
|
||||
@@ -418,14 +420,6 @@ fn stage_str(stage: codex_core::features::Stage) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
/// As early as possible in the process lifecycle, apply hardening measures. We
|
||||
/// skip this in debug builds to avoid interfering with debugging.
|
||||
#[ctor::ctor]
|
||||
#[cfg(not(debug_assertions))]
|
||||
fn pre_main_hardening() {
|
||||
codex_process_hardening::pre_main_hardening();
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move {
|
||||
cli_main(codex_linux_sandbox_exe).await?;
|
||||
@@ -539,6 +533,13 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
|
||||
} else if login_cli.with_api_key {
|
||||
let api_key = read_api_key_from_stdin();
|
||||
run_login_with_api_key(login_cli.config_overrides, api_key).await;
|
||||
} else if is_headless_environment() {
|
||||
run_login_with_device_code_fallback_to_browser(
|
||||
login_cli.config_overrides,
|
||||
login_cli.issuer_base_url,
|
||||
login_cli.client_id,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
run_login_with_chatgpt(login_cli.config_overrides).await;
|
||||
}
|
||||
|
||||
10
codex-rs/cloud-tasks-client/BUILD.bazel
Normal file
10
codex-rs/cloud-tasks-client/BUILD.bazel
Normal file
@@ -0,0 +1,10 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "cloud-tasks-client",
|
||||
crate_name = "codex_cloud_tasks_client",
|
||||
crate_features = [
|
||||
"mock",
|
||||
"online",
|
||||
],
|
||||
)
|
||||
6
codex-rs/cloud-tasks/BUILD.bazel
Normal file
6
codex-rs/cloud-tasks/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "cloud-tasks",
|
||||
crate_name = "codex_cloud_tasks",
|
||||
)
|
||||
@@ -10,7 +10,6 @@ pub use cli::Cli;
|
||||
use anyhow::anyhow;
|
||||
use chrono::Utc;
|
||||
use codex_cloud_tasks_client::TaskStatus;
|
||||
use codex_login::AuthManager;
|
||||
use owo_colors::OwoColorize;
|
||||
use owo_colors::Stream;
|
||||
use std::cmp::Ordering;
|
||||
@@ -65,7 +64,11 @@ async fn init_backend(user_agent_suffix: &str) -> anyhow::Result<BackendContext>
|
||||
append_error_log(format!("startup: base_url={base_url} path_style={style}"));
|
||||
|
||||
let auth_manager = util::load_auth_manager().await;
|
||||
let auth = match auth_manager.as_ref().and_then(AuthManager::auth) {
|
||||
let auth = match auth_manager.as_ref() {
|
||||
Some(manager) => manager.auth().await,
|
||||
None => None,
|
||||
};
|
||||
let auth = match auth {
|
||||
Some(auth) => auth,
|
||||
None => {
|
||||
eprintln!(
|
||||
@@ -79,7 +82,7 @@ async fn init_backend(user_agent_suffix: &str) -> anyhow::Result<BackendContext>
|
||||
append_error_log(format!("auth: mode=ChatGPT account_id={acc}"));
|
||||
}
|
||||
|
||||
let token = match auth.get_token().await {
|
||||
let token = match auth.get_token() {
|
||||
Ok(t) if !t.is_empty() => t,
|
||||
_ => {
|
||||
eprintln!(
|
||||
|
||||
@@ -85,8 +85,8 @@ pub async fn build_chatgpt_headers() -> HeaderMap {
|
||||
HeaderValue::from_str(&ua).unwrap_or(HeaderValue::from_static("codex-cli")),
|
||||
);
|
||||
if let Some(am) = load_auth_manager().await
|
||||
&& let Some(auth) = am.auth()
|
||||
&& let Ok(tok) = auth.get_token().await
|
||||
&& let Some(auth) = am.auth().await
|
||||
&& let Ok(tok) = auth.get_token()
|
||||
&& !tok.is_empty()
|
||||
{
|
||||
let v = format!("Bearer {tok}");
|
||||
|
||||
6
codex-rs/codex-api/BUILD.bazel
Normal file
6
codex-rs/codex-api/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "codex-api",
|
||||
crate_name = "codex_api",
|
||||
)
|
||||
@@ -301,7 +301,9 @@ pub async fn process_sse(
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
_ => {
|
||||
trace!("unhandled SSE event: {:#?}", event.kind);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
6
codex-rs/codex-backend-openapi-models/BUILD.bazel
Normal file
6
codex-rs/codex-backend-openapi-models/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "codex-backend-openapi-models",
|
||||
crate_name = "codex_backend_openapi_models",
|
||||
)
|
||||
6
codex-rs/codex-client/BUILD.bazel
Normal file
6
codex-rs/codex-client/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "codex-client",
|
||||
crate_name = "codex_client",
|
||||
)
|
||||
@@ -7,6 +7,7 @@ pub enum TransportError {
|
||||
#[error("http {status}: {body:?}")]
|
||||
Http {
|
||||
status: StatusCode,
|
||||
url: Option<String>,
|
||||
headers: Option<HeaderMap>,
|
||||
body: Option<String>,
|
||||
},
|
||||
|
||||
@@ -131,6 +131,7 @@ impl HttpTransport for ReqwestTransport {
|
||||
);
|
||||
}
|
||||
|
||||
let url = req.url.clone();
|
||||
let builder = self.build(req)?;
|
||||
let resp = builder.send().await.map_err(Self::map_error)?;
|
||||
let status = resp.status();
|
||||
@@ -140,6 +141,7 @@ impl HttpTransport for ReqwestTransport {
|
||||
let body = String::from_utf8(bytes.to_vec()).ok();
|
||||
return Err(TransportError::Http {
|
||||
status,
|
||||
url: Some(url),
|
||||
headers: Some(headers),
|
||||
body,
|
||||
});
|
||||
@@ -161,6 +163,7 @@ impl HttpTransport for ReqwestTransport {
|
||||
);
|
||||
}
|
||||
|
||||
let url = req.url.clone();
|
||||
let builder = self.build(req)?;
|
||||
let resp = builder.send().await.map_err(Self::map_error)?;
|
||||
let status = resp.status();
|
||||
@@ -169,6 +172,7 @@ impl HttpTransport for ReqwestTransport {
|
||||
let body = resp.text().await.ok();
|
||||
return Err(TransportError::Http {
|
||||
status,
|
||||
url: Some(url),
|
||||
headers: Some(headers),
|
||||
body,
|
||||
});
|
||||
|
||||
11
codex-rs/common/BUILD.bazel
Normal file
11
codex-rs/common/BUILD.bazel
Normal file
@@ -0,0 +1,11 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "common",
|
||||
crate_name = "codex_common",
|
||||
crate_features = [
|
||||
"cli",
|
||||
"elapsed",
|
||||
"sandbox_summary",
|
||||
],
|
||||
)
|
||||
40
codex-rs/core/BUILD.bazel
Normal file
40
codex-rs/core/BUILD.bazel
Normal file
@@ -0,0 +1,40 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "core",
|
||||
crate_name = "codex_core",
|
||||
# TODO(mbolin): Eliminate the use of features in the version of the
|
||||
# rust_library() that is used by rust_binary() rules for release artifacts
|
||||
# such as the Codex CLI.
|
||||
crate_features = ["deterministic_process_ids", "test-support"],
|
||||
compile_data = glob(
|
||||
include = ["**"],
|
||||
exclude = [
|
||||
"**/* *",
|
||||
"BUILD.bazel",
|
||||
"Cargo.toml",
|
||||
],
|
||||
allow_empty = True,
|
||||
),
|
||||
integration_compile_data_extra = [
|
||||
"//codex-rs/apply-patch:apply_patch_tool_instructions.md",
|
||||
"prompt.md",
|
||||
],
|
||||
# This is a bit of a hack, but empirically, some of our integration tests
|
||||
# are relying on the presence of this file as a repo root marker. When
|
||||
# running tests locally, this "just works," but in remote execution,
|
||||
# the working directory is different and so the file is not found unless it
|
||||
# is explicitly added as test data.
|
||||
#
|
||||
# TODO(aibrahim): Update the tests so that `just bazel-remote-test` succeeds
|
||||
# without this workaround.
|
||||
test_data_extra = ["//:AGENTS.md"],
|
||||
integration_deps_extra = ["//codex-rs/core/tests/common:common"],
|
||||
test_tags = ["no-sandbox"],
|
||||
extra_binaries = [
|
||||
"//codex-rs/linux-sandbox:codex-linux-sandbox",
|
||||
"//codex-rs/rmcp-client:test_stdio_server",
|
||||
"//codex-rs/rmcp-client:test_streamable_http_server",
|
||||
"//codex-rs/cli:codex",
|
||||
],
|
||||
)
|
||||
7
codex-rs/core/hierarchical_agents_message.md
Normal file
7
codex-rs/core/hierarchical_agents_message.md
Normal file
@@ -0,0 +1,7 @@
|
||||
Files called AGENTS.md commonly appear in many places inside a container - at "/", in "~", deep within git repositories, or in any other directory; their location is not limited to version-controlled folders.
|
||||
|
||||
Their purpose is to pass along human guidance to you, the agent. Such guidance can include coding standards, explanations of the project layout, steps for building or testing, and even wording that must accompany a GitHub pull-request description produced by the agent; all of it is to be followed.
|
||||
|
||||
Each AGENTS.md governs the entire directory that contains it and every child directory beneath that point. Whenever you change a file, you have to comply with every AGENTS.md whose scope covers that file. Naming conventions, stylistic rules and similar directives are restricted to the code that falls inside that scope unless the document explicitly states otherwise.
|
||||
|
||||
When two AGENTS.md files disagree, the one located deeper in the directory structure overrides the higher-level file, while instructions given directly in the prompt by the system, developer, or user outrank any AGENTS.md content.
|
||||
File diff suppressed because one or more lines are too long
@@ -116,10 +116,10 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::TaskCompleteEvent;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -144,7 +144,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_task_started() {
|
||||
let status = agent_status_from_event(&EventMsg::TaskStarted(TaskStartedEvent {
|
||||
let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
|
||||
model_context_window: None,
|
||||
}));
|
||||
assert_eq!(status, Some(AgentStatus::Running));
|
||||
@@ -152,7 +152,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn on_event_updates_status_from_task_complete() {
|
||||
let status = agent_status_from_event(&EventMsg::TaskComplete(TaskCompleteEvent {
|
||||
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
last_agent_message: Some("done".to_string()),
|
||||
}));
|
||||
let expected = AgentStatus::Completed(Some("done".to_string()));
|
||||
|
||||
@@ -5,8 +5,8 @@ use codex_protocol::protocol::EventMsg;
|
||||
/// Returns `None` when the event does not affect status tracking.
|
||||
pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option<AgentStatus> {
|
||||
match msg {
|
||||
EventMsg::TaskStarted(_) => Some(AgentStatus::Running),
|
||||
EventMsg::TaskComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
|
||||
EventMsg::TurnStarted(_) => Some(AgentStatus::Running),
|
||||
EventMsg::TurnComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())),
|
||||
EventMsg::TurnAborted(ev) => Some(AgentStatus::Errored(format!("{:?}", ev.reason))),
|
||||
EventMsg::Error(ev) => Some(AgentStatus::Errored(ev.message.clone())),
|
||||
EventMsg::ShutdownComplete => Some(AgentStatus::Shutdown),
|
||||
|
||||
@@ -25,11 +25,13 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
ApiError::Api { status, message } => CodexErr::UnexpectedStatus(UnexpectedResponseError {
|
||||
status,
|
||||
body: message,
|
||||
url: None,
|
||||
request_id: None,
|
||||
}),
|
||||
ApiError::Transport(transport) => match transport {
|
||||
TransportError::Http {
|
||||
status,
|
||||
url,
|
||||
headers,
|
||||
body,
|
||||
} => {
|
||||
@@ -71,6 +73,7 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
CodexErr::UnexpectedStatus(UnexpectedResponseError {
|
||||
status,
|
||||
body: body_text,
|
||||
url,
|
||||
request_id: extract_request_id(headers.as_ref()),
|
||||
})
|
||||
}
|
||||
@@ -100,7 +103,7 @@ fn extract_request_id(headers: Option<&HeaderMap>) -> Option<String> {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn auth_provider_from_auth(
|
||||
pub(crate) fn auth_provider_from_auth(
|
||||
auth: Option<CodexAuth>,
|
||||
provider: &ModelProviderInfo,
|
||||
) -> crate::error::Result<CoreAuthProvider> {
|
||||
@@ -119,7 +122,7 @@ pub(crate) async fn auth_provider_from_auth(
|
||||
}
|
||||
|
||||
if let Some(auth) = auth {
|
||||
let token = auth.get_token().await?;
|
||||
let token = auth.get_token()?;
|
||||
Ok(CoreAuthProvider {
|
||||
token: Some(token),
|
||||
account_id: auth.get_account_id(),
|
||||
|
||||
@@ -8,12 +8,10 @@ use serde::Serialize;
|
||||
use serial_test::serial;
|
||||
use std::env;
|
||||
use std::fmt::Debug;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_protocol::config_types::ForcedLoginMethod;
|
||||
@@ -77,10 +75,6 @@ impl RefreshTokenError {
|
||||
Self::Transient(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn other_with_message(message: impl Into<String>) -> Self {
|
||||
Self::Transient(std::io::Error::other(message.into()))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RefreshTokenError> for std::io::Error {
|
||||
@@ -93,40 +87,6 @@ impl From<RefreshTokenError> for std::io::Error {
|
||||
}
|
||||
|
||||
impl CodexAuth {
|
||||
pub async fn refresh_token(&self) -> Result<String, RefreshTokenError> {
|
||||
tracing::info!("Refreshing token");
|
||||
|
||||
let token_data = self.get_current_token_data().ok_or_else(|| {
|
||||
RefreshTokenError::Transient(std::io::Error::other("Token data is not available."))
|
||||
})?;
|
||||
let token = token_data.refresh_token;
|
||||
|
||||
let refresh_response = try_refresh_token(token, &self.client).await?;
|
||||
|
||||
let updated = update_tokens(
|
||||
&self.storage,
|
||||
refresh_response.id_token,
|
||||
refresh_response.access_token,
|
||||
refresh_response.refresh_token,
|
||||
)
|
||||
.await
|
||||
.map_err(RefreshTokenError::from)?;
|
||||
|
||||
if let Ok(mut auth_lock) = self.auth_dot_json.lock() {
|
||||
*auth_lock = Some(updated.clone());
|
||||
}
|
||||
|
||||
let access = match updated.tokens {
|
||||
Some(t) => t.access_token,
|
||||
None => {
|
||||
return Err(RefreshTokenError::other_with_message(
|
||||
"Token data is not available after refresh.",
|
||||
));
|
||||
}
|
||||
};
|
||||
Ok(access)
|
||||
}
|
||||
|
||||
/// Loads the available auth information from auth storage.
|
||||
pub fn from_auth_storage(
|
||||
codex_home: &Path,
|
||||
@@ -135,62 +95,23 @@ impl CodexAuth {
|
||||
load_auth(codex_home, false, auth_credentials_store_mode)
|
||||
}
|
||||
|
||||
pub async fn get_token_data(&self) -> Result<TokenData, std::io::Error> {
|
||||
pub fn get_token_data(&self) -> Result<TokenData, std::io::Error> {
|
||||
let auth_dot_json: Option<AuthDotJson> = self.get_current_auth_json();
|
||||
match auth_dot_json {
|
||||
Some(AuthDotJson {
|
||||
tokens: Some(mut tokens),
|
||||
last_refresh: Some(last_refresh),
|
||||
tokens: Some(tokens),
|
||||
last_refresh: Some(_),
|
||||
..
|
||||
}) => {
|
||||
if last_refresh < Utc::now() - chrono::Duration::days(TOKEN_REFRESH_INTERVAL) {
|
||||
let refresh_result = tokio::time::timeout(
|
||||
Duration::from_secs(60),
|
||||
try_refresh_token(tokens.refresh_token.clone(), &self.client),
|
||||
)
|
||||
.await;
|
||||
let refresh_response = match refresh_result {
|
||||
Ok(Ok(response)) => response,
|
||||
Ok(Err(err)) => return Err(err.into()),
|
||||
Err(_) => {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::TimedOut,
|
||||
"timed out while refreshing OpenAI API key",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let updated_auth_dot_json = update_tokens(
|
||||
&self.storage,
|
||||
refresh_response.id_token,
|
||||
refresh_response.access_token,
|
||||
refresh_response.refresh_token,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tokens = updated_auth_dot_json
|
||||
.tokens
|
||||
.clone()
|
||||
.ok_or(std::io::Error::other(
|
||||
"Token data is not available after refresh.",
|
||||
))?;
|
||||
|
||||
#[expect(clippy::unwrap_used)]
|
||||
let mut auth_lock = self.auth_dot_json.lock().unwrap();
|
||||
*auth_lock = Some(updated_auth_dot_json);
|
||||
}
|
||||
|
||||
Ok(tokens)
|
||||
}
|
||||
}) => Ok(tokens),
|
||||
_ => Err(std::io::Error::other("Token data is not available.")),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_token(&self) -> Result<String, std::io::Error> {
|
||||
pub fn get_token(&self) -> Result<String, std::io::Error> {
|
||||
match self.mode {
|
||||
AuthMode::ApiKey => Ok(self.api_key.clone().unwrap_or_default()),
|
||||
AuthMode::ChatGPT => {
|
||||
let id_token = self.get_token_data().await?.access_token;
|
||||
let id_token = self.get_token_data()?.access_token;
|
||||
Ok(id_token)
|
||||
}
|
||||
}
|
||||
@@ -338,7 +259,7 @@ pub fn load_auth_dot_json(
|
||||
storage.load()
|
||||
}
|
||||
|
||||
pub async fn enforce_login_restrictions(config: &Config) -> std::io::Result<()> {
|
||||
pub fn enforce_login_restrictions(config: &Config) -> std::io::Result<()> {
|
||||
let Some(auth) = load_auth(
|
||||
&config.codex_home,
|
||||
true,
|
||||
@@ -376,7 +297,7 @@ pub async fn enforce_login_restrictions(config: &Config) -> std::io::Result<()>
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let token_data = match auth.get_token_data().await {
|
||||
let token_data = match auth.get_token_data() {
|
||||
Ok(data) => data,
|
||||
Err(err) => {
|
||||
return logout_with_message(
|
||||
@@ -525,6 +446,7 @@ async fn try_refresh_token(
|
||||
Ok(refresh_response)
|
||||
} else {
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
tracing::error!("Failed to refresh token: {status}: {body}");
|
||||
if status == StatusCode::UNAUTHORIZED {
|
||||
let failed = classify_refresh_token_failure(&body);
|
||||
Err(RefreshTokenError::Permanent(failed))
|
||||
@@ -623,6 +545,89 @@ struct CachedAuth {
|
||||
auth: Option<CodexAuth>,
|
||||
}
|
||||
|
||||
enum UnauthorizedRecoveryStep {
|
||||
Reload,
|
||||
RefreshToken,
|
||||
Done,
|
||||
}
|
||||
|
||||
enum ReloadOutcome {
|
||||
Reloaded,
|
||||
Skipped,
|
||||
}
|
||||
|
||||
// UnauthorizedRecovery is a state machine that handles an attempt to refresh the authentication when requests
|
||||
// to API fail with 401 status code.
|
||||
// The client calls next() every time it encounters a 401 error, one time per retry.
|
||||
// For API key based authentication, we don't do anything and let the error bubble to the user.
|
||||
// For ChatGPT based authentication, we:
|
||||
// 1. Attempt to reload the auth data from disk. We only reload if the account id matches the one the current process is running as.
|
||||
// 2. Attempt to refresh the token using OAuth token refresh flow.
|
||||
// If after both steps the server still responds with 401 we let the error bubble to the user.
|
||||
pub struct UnauthorizedRecovery {
|
||||
manager: Arc<AuthManager>,
|
||||
step: UnauthorizedRecoveryStep,
|
||||
expected_account_id: Option<String>,
|
||||
}
|
||||
|
||||
impl UnauthorizedRecovery {
|
||||
fn new(manager: Arc<AuthManager>) -> Self {
|
||||
let expected_account_id = manager
|
||||
.auth_cached()
|
||||
.as_ref()
|
||||
.and_then(CodexAuth::get_account_id);
|
||||
Self {
|
||||
manager,
|
||||
step: UnauthorizedRecoveryStep::Reload,
|
||||
expected_account_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_next(&self) -> bool {
|
||||
if !self
|
||||
.manager
|
||||
.auth_cached()
|
||||
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
!matches!(self.step, UnauthorizedRecoveryStep::Done)
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
|
||||
if !self.has_next() {
|
||||
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
"No more recovery steps available.",
|
||||
)));
|
||||
}
|
||||
|
||||
match self.step {
|
||||
UnauthorizedRecoveryStep::Reload => {
|
||||
match self
|
||||
.manager
|
||||
.reload_if_account_id_matches(self.expected_account_id.as_deref())
|
||||
{
|
||||
ReloadOutcome::Reloaded => {
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
}
|
||||
ReloadOutcome::Skipped => {
|
||||
self.manager.refresh_token().await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
}
|
||||
}
|
||||
}
|
||||
UnauthorizedRecoveryStep::RefreshToken => {
|
||||
self.manager.refresh_token().await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
}
|
||||
UnauthorizedRecoveryStep::Done => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Central manager providing a single source of truth for auth.json derived
|
||||
/// authentication data. It loads once (or on preference change) and then
|
||||
/// hands out cloned `CodexAuth` values so the rest of the program has a
|
||||
@@ -689,28 +694,53 @@ impl AuthManager {
|
||||
})
|
||||
}
|
||||
|
||||
/// Current cached auth (clone). May be `None` if not logged in or load failed.
|
||||
pub fn auth(&self) -> Option<CodexAuth> {
|
||||
/// Current cached auth (clone) without attempting a refresh.
|
||||
pub fn auth_cached(&self) -> Option<CodexAuth> {
|
||||
self.inner.read().ok().and_then(|c| c.auth.clone())
|
||||
}
|
||||
|
||||
/// Current cached auth (clone). May be `None` if not logged in or load failed.
|
||||
/// Refreshes cached ChatGPT tokens if they are stale before returning.
|
||||
pub async fn auth(&self) -> Option<CodexAuth> {
|
||||
let auth = self.auth_cached()?;
|
||||
if let Err(err) = self.refresh_if_stale(&auth).await {
|
||||
tracing::error!("Failed to refresh token: {}", err);
|
||||
return Some(auth);
|
||||
}
|
||||
self.auth_cached()
|
||||
}
|
||||
|
||||
/// Force a reload of the auth information from auth.json. Returns
|
||||
/// whether the auth value changed.
|
||||
pub fn reload(&self) -> bool {
|
||||
let new_auth = load_auth(
|
||||
&self.codex_home,
|
||||
self.enable_codex_api_key_env,
|
||||
self.auth_credentials_store_mode,
|
||||
)
|
||||
.ok()
|
||||
.flatten();
|
||||
if let Ok(mut guard) = self.inner.write() {
|
||||
let changed = !AuthManager::auths_equal(&guard.auth, &new_auth);
|
||||
guard.auth = new_auth;
|
||||
changed
|
||||
} else {
|
||||
false
|
||||
tracing::info!("Reloading auth");
|
||||
let new_auth = self.load_auth_from_storage();
|
||||
self.set_auth(new_auth)
|
||||
}
|
||||
|
||||
fn reload_if_account_id_matches(&self, expected_account_id: Option<&str>) -> ReloadOutcome {
|
||||
let expected_account_id = match expected_account_id {
|
||||
Some(account_id) => account_id,
|
||||
None => {
|
||||
tracing::info!("Skipping auth reload because no account id is available.");
|
||||
return ReloadOutcome::Skipped;
|
||||
}
|
||||
};
|
||||
|
||||
let new_auth = self.load_auth_from_storage();
|
||||
let new_account_id = new_auth.as_ref().and_then(CodexAuth::get_account_id);
|
||||
|
||||
if new_account_id.as_deref() != Some(expected_account_id) {
|
||||
let found_account_id = new_account_id.as_deref().unwrap_or("unknown");
|
||||
tracing::info!(
|
||||
"Skipping auth reload due to account id mismatch (expected: {expected_account_id}, found: {found_account_id})"
|
||||
);
|
||||
return ReloadOutcome::Skipped;
|
||||
}
|
||||
|
||||
tracing::info!("Reloading auth for account {expected_account_id}");
|
||||
self.set_auth(new_auth);
|
||||
ReloadOutcome::Reloaded
|
||||
}
|
||||
|
||||
fn auths_equal(a: &Option<CodexAuth>, b: &Option<CodexAuth>) -> bool {
|
||||
@@ -721,6 +751,27 @@ impl AuthManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_auth_from_storage(&self) -> Option<CodexAuth> {
|
||||
load_auth(
|
||||
&self.codex_home,
|
||||
self.enable_codex_api_key_env,
|
||||
self.auth_credentials_store_mode,
|
||||
)
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
fn set_auth(&self, new_auth: Option<CodexAuth>) -> bool {
|
||||
if let Ok(mut guard) = self.inner.write() {
|
||||
let changed = !AuthManager::auths_equal(&guard.auth, &new_auth);
|
||||
tracing::info!("Reloaded auth, changed: {changed}");
|
||||
guard.auth = new_auth;
|
||||
changed
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience constructor returning an `Arc` wrapper.
|
||||
pub fn shared(
|
||||
codex_home: PathBuf,
|
||||
@@ -734,26 +785,27 @@ impl AuthManager {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn unauthorized_recovery(self: &Arc<Self>) -> UnauthorizedRecovery {
|
||||
UnauthorizedRecovery::new(Arc::clone(self))
|
||||
}
|
||||
|
||||
/// Attempt to refresh the current auth token (if any). On success, reload
|
||||
/// the auth state from disk so other components observe refreshed token.
|
||||
/// If the token refresh fails in a permanent (non‑transient) way, logs out
|
||||
/// to clear invalid auth state.
|
||||
pub async fn refresh_token(&self) -> Result<Option<String>, RefreshTokenError> {
|
||||
let auth = match self.auth() {
|
||||
Some(a) => a,
|
||||
None => return Ok(None),
|
||||
/// If the token refresh fails, returns the error to the caller.
|
||||
pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> {
|
||||
tracing::info!("Refreshing token");
|
||||
|
||||
let auth = match self.auth_cached() {
|
||||
Some(auth) => auth,
|
||||
None => return Ok(()),
|
||||
};
|
||||
match auth.refresh_token().await {
|
||||
Ok(token) => {
|
||||
// Reload to pick up persisted changes.
|
||||
self.reload();
|
||||
Ok(Some(token))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to refresh token: {}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
let token_data = auth.get_current_token_data().ok_or_else(|| {
|
||||
RefreshTokenError::Transient(std::io::Error::other("Token data is not available."))
|
||||
})?;
|
||||
self.refresh_tokens(&auth, token_data.refresh_token).await?;
|
||||
// Reload to pick up persisted changes.
|
||||
self.reload();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Log out by deleting the on‑disk auth.json (if present). Returns Ok(true)
|
||||
@@ -768,7 +820,51 @@ impl AuthManager {
|
||||
}
|
||||
|
||||
pub fn get_auth_mode(&self) -> Option<AuthMode> {
|
||||
self.auth().map(|a| a.mode)
|
||||
self.auth_cached().map(|a| a.mode)
|
||||
}
|
||||
|
||||
async fn refresh_if_stale(&self, auth: &CodexAuth) -> Result<bool, RefreshTokenError> {
|
||||
if auth.mode != AuthMode::ChatGPT {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let auth_dot_json = match auth.get_current_auth_json() {
|
||||
Some(auth_dot_json) => auth_dot_json,
|
||||
None => return Ok(false),
|
||||
};
|
||||
let tokens = match auth_dot_json.tokens {
|
||||
Some(tokens) => tokens,
|
||||
None => return Ok(false),
|
||||
};
|
||||
let last_refresh = match auth_dot_json.last_refresh {
|
||||
Some(last_refresh) => last_refresh,
|
||||
None => return Ok(false),
|
||||
};
|
||||
if last_refresh >= Utc::now() - chrono::Duration::days(TOKEN_REFRESH_INTERVAL) {
|
||||
return Ok(false);
|
||||
}
|
||||
self.refresh_tokens(auth, tokens.refresh_token).await?;
|
||||
self.reload();
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn refresh_tokens(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
refresh_token: String,
|
||||
) -> Result<(), RefreshTokenError> {
|
||||
let refresh_response = try_refresh_token(refresh_token, &auth.client).await?;
|
||||
|
||||
update_tokens(
|
||||
&auth.storage,
|
||||
refresh_response.id_token,
|
||||
refresh_response.access_token,
|
||||
refresh_response.refresh_token,
|
||||
)
|
||||
.await
|
||||
.map_err(RefreshTokenError::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -930,7 +1026,7 @@ mod tests {
|
||||
assert_eq!(auth.mode, AuthMode::ApiKey);
|
||||
assert_eq!(auth.api_key, Some("sk-test-key".to_string()));
|
||||
|
||||
assert!(auth.get_token_data().await.is_err());
|
||||
assert!(auth.get_token_data().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1058,7 +1154,6 @@ mod tests {
|
||||
let config = build_config(codex_home.path(), Some(ForcedLoginMethod::Chatgpt), None).await;
|
||||
|
||||
let err = super::enforce_login_restrictions(&config)
|
||||
.await
|
||||
.expect_err("expected method mismatch to error");
|
||||
assert!(err.to_string().contains("ChatGPT login is required"));
|
||||
assert!(
|
||||
@@ -1084,7 +1179,6 @@ mod tests {
|
||||
let config = build_config(codex_home.path(), None, Some("org_mine".to_string())).await;
|
||||
|
||||
let err = super::enforce_login_restrictions(&config)
|
||||
.await
|
||||
.expect_err("expected workspace mismatch to error");
|
||||
assert!(err.to_string().contains("workspace org_mine"));
|
||||
assert!(
|
||||
@@ -1109,9 +1203,7 @@ mod tests {
|
||||
|
||||
let config = build_config(codex_home.path(), None, Some("org_mine".to_string())).await;
|
||||
|
||||
super::enforce_login_restrictions(&config)
|
||||
.await
|
||||
.expect("matching workspace should succeed");
|
||||
super::enforce_login_restrictions(&config).expect("matching workspace should succeed");
|
||||
assert!(
|
||||
codex_home.path().join("auth.json").exists(),
|
||||
"auth.json should remain when restrictions pass"
|
||||
@@ -1127,9 +1219,7 @@ mod tests {
|
||||
|
||||
let config = build_config(codex_home.path(), None, Some("org_mine".to_string())).await;
|
||||
|
||||
super::enforce_login_restrictions(&config)
|
||||
.await
|
||||
.expect("matching workspace should succeed");
|
||||
super::enforce_login_restrictions(&config).expect("matching workspace should succeed");
|
||||
assert!(
|
||||
codex_home.path().join("auth.json").exists(),
|
||||
"auth.json should remain when restrictions pass"
|
||||
@@ -1145,7 +1235,6 @@ mod tests {
|
||||
let config = build_config(codex_home.path(), Some(ForcedLoginMethod::Chatgpt), None).await;
|
||||
|
||||
let err = super::enforce_login_restrictions(&config)
|
||||
.await
|
||||
.expect_err("environment API key should not satisfy forced ChatGPT login");
|
||||
assert!(
|
||||
err.to_string()
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use crate::api_bridge::auth_provider_from_auth;
|
||||
use crate::api_bridge::map_api_error;
|
||||
use crate::auth::UnauthorizedRecovery;
|
||||
use codex_api::AggregateStreamExt;
|
||||
use codex_api::ChatClient as ApiChatClient;
|
||||
use codex_api::CompactClient as ApiCompactClient;
|
||||
@@ -20,6 +21,7 @@ use codex_api::error::ApiError;
|
||||
use codex_api::requests::responses::Compression;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_otel::OtelManager;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -155,13 +157,18 @@ impl ModelClient {
|
||||
let conversation_id = self.conversation_id.to_string();
|
||||
let session_source = self.session_source.clone();
|
||||
|
||||
let mut refreshed = false;
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
loop {
|
||||
let auth = auth_manager.as_ref().and_then(|m| m.auth());
|
||||
let auth = match auth_manager.as_ref() {
|
||||
Some(manager) => manager.auth().await,
|
||||
None => None,
|
||||
};
|
||||
let api_provider = self
|
||||
.provider
|
||||
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
|
||||
let client = ApiChatClient::new(transport, api_provider, api_auth)
|
||||
@@ -181,7 +188,7 @@ impl ModelClient {
|
||||
Err(ApiError::Transport(TransportError::Http { status, .. }))
|
||||
if status == StatusCode::UNAUTHORIZED =>
|
||||
{
|
||||
handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?;
|
||||
handle_unauthorized(status, &mut auth_recovery).await?;
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -243,13 +250,18 @@ impl ModelClient {
|
||||
let conversation_id = self.conversation_id.to_string();
|
||||
let session_source = self.session_source.clone();
|
||||
|
||||
let mut refreshed = false;
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
loop {
|
||||
let auth = auth_manager.as_ref().and_then(|m| m.auth());
|
||||
let auth = match auth_manager.as_ref() {
|
||||
Some(manager) => manager.auth().await,
|
||||
None => None,
|
||||
};
|
||||
let api_provider = self
|
||||
.provider
|
||||
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
|
||||
let compression = if self
|
||||
@@ -292,7 +304,7 @@ impl ModelClient {
|
||||
Err(ApiError::Transport(TransportError::Http { status, .. }))
|
||||
if status == StatusCode::UNAUTHORIZED =>
|
||||
{
|
||||
handle_unauthorized(status, &mut refreshed, &auth_manager, &auth).await?;
|
||||
handle_unauthorized(status, &mut auth_recovery).await?;
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -344,11 +356,14 @@ impl ModelClient {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let auth_manager = self.auth_manager.clone();
|
||||
let auth = auth_manager.as_ref().and_then(|m| m.auth());
|
||||
let auth = match auth_manager.as_ref() {
|
||||
Some(manager) => manager.auth().await,
|
||||
None => None,
|
||||
};
|
||||
let api_provider = self
|
||||
.provider
|
||||
.to_api_provider(auth.as_ref().map(|a| a.mode))?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let request_telemetry = self.build_request_telemetry();
|
||||
let client = ApiCompactClient::new(transport, api_provider, api_auth)
|
||||
@@ -500,34 +515,25 @@ where
|
||||
/// the mapped `CodexErr` is returned to the caller.
|
||||
async fn handle_unauthorized(
|
||||
status: StatusCode,
|
||||
refreshed: &mut bool,
|
||||
auth_manager: &Option<Arc<AuthManager>>,
|
||||
auth: &Option<crate::auth::CodexAuth>,
|
||||
auth_recovery: &mut Option<UnauthorizedRecovery>,
|
||||
) -> Result<()> {
|
||||
if *refreshed {
|
||||
return Err(map_unauthorized_status(status));
|
||||
}
|
||||
|
||||
if let Some(manager) = auth_manager.as_ref()
|
||||
&& let Some(auth) = auth.as_ref()
|
||||
&& auth.mode == AuthMode::ChatGPT
|
||||
if let Some(recovery) = auth_recovery
|
||||
&& recovery.has_next()
|
||||
{
|
||||
match manager.refresh_token().await {
|
||||
Ok(_) => {
|
||||
*refreshed = true;
|
||||
Ok(())
|
||||
}
|
||||
return match recovery.next().await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
|
||||
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
|
||||
}
|
||||
} else {
|
||||
Err(map_unauthorized_status(status))
|
||||
};
|
||||
}
|
||||
|
||||
Err(map_unauthorized_status(status))
|
||||
}
|
||||
|
||||
fn map_unauthorized_status(status: StatusCode) -> CodexErr {
|
||||
map_api_error(ApiError::Transport(TransportError::Http {
|
||||
status,
|
||||
url: None,
|
||||
headers: None,
|
||||
body: None,
|
||||
}))
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::CodexAuth;
|
||||
use crate::SandboxState;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::agent::AgentStatus;
|
||||
@@ -43,9 +44,9 @@ use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TaskStartedEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::prelude::*;
|
||||
@@ -484,7 +485,7 @@ pub(crate) struct SessionSettingsUpdate {
|
||||
|
||||
impl Session {
|
||||
/// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it.
|
||||
fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config {
|
||||
pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config {
|
||||
// todo(aibrahim): store this state somewhere else so we don't need to mut config
|
||||
let config = session_configuration.original_config_do_not_use.clone();
|
||||
let mut per_turn_config = (*config).clone();
|
||||
@@ -633,20 +634,22 @@ impl Session {
|
||||
}
|
||||
maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events);
|
||||
|
||||
let auth = auth_manager.auth().await;
|
||||
let auth = auth.as_ref();
|
||||
let otel_manager = OtelManager::new(
|
||||
conversation_id,
|
||||
session_configuration.model.as_str(),
|
||||
session_configuration.model.as_str(),
|
||||
auth_manager.auth().and_then(|a| a.get_account_id()),
|
||||
auth_manager.auth().and_then(|a| a.get_account_email()),
|
||||
auth_manager.auth().map(|a| a.mode),
|
||||
auth.and_then(CodexAuth::get_account_id),
|
||||
auth.and_then(CodexAuth::get_account_email),
|
||||
auth.map(|a| a.mode),
|
||||
config.otel.log_user_prompt,
|
||||
terminal::user_agent(),
|
||||
session_configuration.session_source.clone(),
|
||||
);
|
||||
config.features.emit_metrics(&otel_manager);
|
||||
otel_manager.counter(
|
||||
"codex.session.started",
|
||||
"codex.thread.started",
|
||||
1,
|
||||
&[(
|
||||
"is_git",
|
||||
@@ -1256,12 +1259,10 @@ impl Session {
|
||||
);
|
||||
}
|
||||
RolloutItem::Compacted(compacted) => {
|
||||
let snapshot = history.get_history();
|
||||
// TODO(jif) clean
|
||||
if let Some(replacement) = &compacted.replacement_history {
|
||||
history.replace(replacement.clone());
|
||||
} else {
|
||||
let user_messages = collect_user_messages(&snapshot);
|
||||
let user_messages = collect_user_messages(history.raw_items());
|
||||
let rebuilt = compact::build_compacted_history(
|
||||
self.build_initial_context(turn_context),
|
||||
&user_messages,
|
||||
@@ -1276,7 +1277,7 @@ impl Session {
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
history.get_history()
|
||||
history.raw_items().to_vec()
|
||||
}
|
||||
|
||||
/// Append ResponseItems to the in-memory conversation history only.
|
||||
@@ -1539,6 +1540,24 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the input if there was no task running to inject into
|
||||
pub async fn inject_response_items(
|
||||
&self,
|
||||
input: Vec<ResponseInputItem>,
|
||||
) -> Result<(), Vec<ResponseInputItem>> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
for item in input {
|
||||
ts.push_pending_input(item);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
None => Err(input),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
@@ -2107,7 +2126,10 @@ mod handlers {
|
||||
|
||||
let mut history = sess.clone_history().await;
|
||||
history.drop_last_n_user_turns(num_turns);
|
||||
sess.replace_history(history.get_history()).await;
|
||||
|
||||
// Replace with the raw items. We don't want to replace with a normalized
|
||||
// version of the history.
|
||||
sess.replace_history(history.raw_items().to_vec()).await;
|
||||
sess.recompute_token_usage(turn_context.as_ref()).await;
|
||||
|
||||
sess.send_event_raw_flushed(Event {
|
||||
@@ -2124,10 +2146,9 @@ mod handlers {
|
||||
.terminate_all_processes()
|
||||
.await;
|
||||
info!("Shutting down Codex instance");
|
||||
let turn_count = sess
|
||||
.clone_history()
|
||||
.await
|
||||
.get_history()
|
||||
let history = sess.clone_history().await;
|
||||
let turn_count = history
|
||||
.raw_items()
|
||||
.iter()
|
||||
.filter(|item| is_user_turn_boundary(item))
|
||||
.count();
|
||||
@@ -2321,9 +2342,9 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
|
||||
/// - If the model requests a function call, we execute it and send the output
|
||||
/// back to the model in the next turn.
|
||||
/// - If the model sends only an assistant message, we record it in the
|
||||
/// conversation history and consider the task complete.
|
||||
/// conversation history and consider the turn complete.
|
||||
///
|
||||
pub(crate) async fn run_task(
|
||||
pub(crate) async fn run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
@@ -2339,7 +2360,7 @@ pub(crate) async fn run_task(
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(&sess, &turn_context).await;
|
||||
}
|
||||
let event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
let event = EventMsg::TurnStarted(TurnStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
sess.send_event(&turn_context, event).await;
|
||||
@@ -2393,7 +2414,7 @@ pub(crate) async fn run_task(
|
||||
let turn_input: Vec<ResponseItem> = {
|
||||
sess.record_conversation_items(&turn_context, &pending_input)
|
||||
.await;
|
||||
sess.clone_history().await.get_history_for_prompt()
|
||||
sess.clone_history().await.for_prompt()
|
||||
};
|
||||
|
||||
let turn_input_messages = turn_input
|
||||
@@ -2404,7 +2425,7 @@ pub(crate) async fn run_task(
|
||||
})
|
||||
.map(|user_message| user_message.message())
|
||||
.collect::<Vec<String>>();
|
||||
match run_turn(
|
||||
match run_model_turn(
|
||||
Arc::clone(&sess),
|
||||
Arc::clone(&turn_context),
|
||||
Arc::clone(&turn_diff_tracker),
|
||||
@@ -2481,7 +2502,7 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
|
||||
cwd = %turn_context.cwd.display()
|
||||
)
|
||||
)]
|
||||
async fn run_turn(
|
||||
async fn run_model_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
@@ -2934,8 +2955,8 @@ mod tests {
|
||||
}))
|
||||
.await;
|
||||
|
||||
let actual = session.state.lock().await.clone_history().get_history();
|
||||
assert_eq!(expected, actual);
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3024,8 +3045,8 @@ mod tests {
|
||||
.record_initial_history(InitialHistory::Forked(rollout_items))
|
||||
.await;
|
||||
|
||||
let actual = session.state.lock().await.clone_history().get_history();
|
||||
assert_eq!(expected, actual);
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3081,8 +3102,8 @@ mod tests {
|
||||
expected.extend(initial_context);
|
||||
expected.extend(turn_1);
|
||||
|
||||
let actual = sess.clone_history().await.get_history();
|
||||
assert_eq!(expected, actual);
|
||||
let history = sess.clone_history().await;
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3107,8 +3128,8 @@ mod tests {
|
||||
let rollback_event = wait_for_thread_rolled_back(&rx).await;
|
||||
assert_eq!(rollback_event.num_turns, 99);
|
||||
|
||||
let actual = sess.clone_history().await.get_history();
|
||||
assert_eq!(initial_context, actual);
|
||||
let history = sess.clone_history().await;
|
||||
assert_eq!(initial_context, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3128,8 +3149,8 @@ mod tests {
|
||||
Some(CodexErrorInfo::ThreadRollbackFailed)
|
||||
);
|
||||
|
||||
let actual = sess.clone_history().await.get_history();
|
||||
assert_eq!(initial_context, actual);
|
||||
let history = sess.clone_history().await;
|
||||
assert_eq!(initial_context, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3149,8 +3170,8 @@ mod tests {
|
||||
Some(CodexErrorInfo::ThreadRollbackFailed)
|
||||
);
|
||||
|
||||
let actual = sess.clone_history().await.get_history();
|
||||
assert_eq!(initial_context, actual);
|
||||
let history = sess.clone_history().await;
|
||||
assert_eq!(initial_context, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3651,8 +3672,8 @@ mod tests {
|
||||
.record_model_warning("too many unified exec processes", &turn_context)
|
||||
.await;
|
||||
|
||||
let mut history = session.clone_history().await;
|
||||
let history_items = history.get_history();
|
||||
let history = session.clone_history().await;
|
||||
let history_items = history.raw_items();
|
||||
let last = history_items.last().expect("warning recorded");
|
||||
|
||||
match last {
|
||||
@@ -3798,8 +3819,9 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
let history = sess.clone_history().await.get_history();
|
||||
let _ = history;
|
||||
// TODO(jif) investigate what is this?
|
||||
let history = sess.clone_history().await;
|
||||
let _ = history.raw_items();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3888,7 +3910,7 @@ mod tests {
|
||||
rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));
|
||||
|
||||
let summary1 = "summary one";
|
||||
let snapshot1 = live_history.get_history();
|
||||
let snapshot1 = live_history.clone().for_prompt();
|
||||
let user_messages1 = collect_user_messages(&snapshot1);
|
||||
let rebuilt1 = compact::build_compacted_history(
|
||||
session.build_initial_context(turn_context),
|
||||
@@ -3922,7 +3944,7 @@ mod tests {
|
||||
rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));
|
||||
|
||||
let summary2 = "summary two";
|
||||
let snapshot2 = live_history.get_history();
|
||||
let snapshot2 = live_history.clone().for_prompt();
|
||||
let user_messages2 = collect_user_messages(&snapshot2);
|
||||
let rebuilt2 = compact::build_compacted_history(
|
||||
session.build_initial_context(turn_context),
|
||||
@@ -3955,7 +3977,7 @@ mod tests {
|
||||
live_history.record_items(std::iter::once(&assistant3), turn_context.truncation_policy);
|
||||
rollout_items.push(RolloutItem::ResponseItem(assistant3.clone()));
|
||||
|
||||
(rollout_items, live_history.get_history())
|
||||
(rollout_items, live_history.for_prompt())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -135,7 +135,7 @@ pub(crate) async fn run_codex_thread_one_shot(
|
||||
while let Ok(event) = io_for_bridge.next_event().await {
|
||||
let should_shutdown = matches!(
|
||||
event.msg,
|
||||
EventMsg::TaskComplete(_) | EventMsg::TurnAborted(_)
|
||||
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
|
||||
);
|
||||
let _ = tx_bridge.send(event).await;
|
||||
if should_shutdown {
|
||||
@@ -253,7 +253,7 @@ async fn shutdown_delegate(codex: &Codex) {
|
||||
while let Ok(event) = codex.next_event().await {
|
||||
if matches!(
|
||||
event.msg,
|
||||
EventMsg::TurnAborted(_) | EventMsg::TaskComplete(_)
|
||||
EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ use crate::features::Feature;
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::ContextCompactedEvent;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TaskStartedEvent;
|
||||
use crate::protocol::TurnContextItem;
|
||||
use crate::protocol::TurnStartedEvent;
|
||||
use crate::protocol::WarningEvent;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::approx_token_count;
|
||||
@@ -54,7 +54,7 @@ pub(crate) async fn run_compact_task(
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
) {
|
||||
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
sess.send_event(&turn_context, start_event).await;
|
||||
@@ -95,9 +95,11 @@ async fn run_compact_task_inner(
|
||||
sess.persist_rollout_items(&[rollout_item]).await;
|
||||
|
||||
loop {
|
||||
let turn_input = history.get_history_for_prompt();
|
||||
// Clone is required because of the loop
|
||||
let turn_input = history.clone().for_prompt();
|
||||
let turn_input_len = turn_input.len();
|
||||
let prompt = Prompt {
|
||||
input: turn_input.clone(),
|
||||
input: turn_input,
|
||||
..Default::default()
|
||||
};
|
||||
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
|
||||
@@ -119,7 +121,7 @@ async fn run_compact_task_inner(
|
||||
return;
|
||||
}
|
||||
Err(e @ CodexErr::ContextWindowExceeded) => {
|
||||
if turn_input.len() > 1 {
|
||||
if turn_input_len > 1 {
|
||||
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
|
||||
error!(
|
||||
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
|
||||
@@ -155,15 +157,15 @@ async fn run_compact_task_inner(
|
||||
}
|
||||
}
|
||||
|
||||
let history_snapshot = sess.clone_history().await.get_history();
|
||||
let summary_suffix =
|
||||
get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
||||
let history_snapshot = sess.clone_history().await;
|
||||
let history_items = history_snapshot.raw_items();
|
||||
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
|
||||
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
|
||||
let user_messages = collect_user_messages(&history_snapshot);
|
||||
let user_messages = collect_user_messages(history_items);
|
||||
|
||||
let initial_context = sess.build_initial_context(turn_context.as_ref());
|
||||
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
||||
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
|
||||
let ghost_snapshots: Vec<ResponseItem> = history_items
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned()
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::protocol::CompactedItem;
|
||||
use crate::protocol::ContextCompactedEvent;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::RolloutItem;
|
||||
use crate::protocol::TaskStartedEvent;
|
||||
use crate::protocol::TurnStartedEvent;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
pub(crate) async fn run_inline_remote_auto_compact_task(
|
||||
@@ -19,7 +19,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
|
||||
}
|
||||
|
||||
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
|
||||
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
sess.send_event(&turn_context, start_event).await;
|
||||
@@ -40,9 +40,18 @@ async fn run_remote_compact_task_inner_impl(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
) -> CodexResult<()> {
|
||||
let mut history = sess.clone_history().await;
|
||||
let history = sess.clone_history().await;
|
||||
|
||||
// Required to keep `/undo` available after compaction
|
||||
let ghost_snapshots: Vec<ResponseItem> = history
|
||||
.raw_items()
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
let prompt = Prompt {
|
||||
input: history.get_history_for_prompt(),
|
||||
input: history.for_prompt(),
|
||||
tools: vec![],
|
||||
parallel_tool_calls: false,
|
||||
base_instructions_override: turn_context.base_instructions.clone(),
|
||||
@@ -53,13 +62,6 @@ async fn run_remote_compact_task_inner_impl(
|
||||
.client
|
||||
.compact_conversation_history(&prompt)
|
||||
.await?;
|
||||
// Required to keep `/undo` available after compaction
|
||||
let ghost_snapshots: Vec<ResponseItem> = history
|
||||
.get_history()
|
||||
.iter()
|
||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if !ghost_snapshots.is_empty() {
|
||||
new_history.extend(ghost_snapshots);
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::protocol::AskForApproval;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use codex_app_server_protocol::Tools;
|
||||
use codex_app_server_protocol::UserSavedConfig;
|
||||
use codex_protocol::config_types::AltScreenMode;
|
||||
use codex_protocol::config_types::ForcedLoginMethod;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
@@ -236,6 +237,14 @@ pub struct Config {
|
||||
/// consistently to both mouse wheels and trackpads.
|
||||
pub tui_scroll_invert: bool,
|
||||
|
||||
/// Controls whether the TUI uses the terminal's alternate screen buffer.
|
||||
///
|
||||
/// This is the same `tui.alternate_screen` value from `config.toml` (see [`Tui`]).
|
||||
/// - `auto` (default): Disable alternate screen in Zellij, enable elsewhere.
|
||||
/// - `always`: Always use alternate screen (original behavior).
|
||||
/// - `never`: Never use alternate screen (inline mode, preserves scrollback).
|
||||
pub tui_alternate_screen: AltScreenMode,
|
||||
|
||||
/// The directory that should be treated as the current working directory
|
||||
/// for the session. All relative paths inside the business-logic layer are
|
||||
/// resolved against this path.
|
||||
@@ -353,8 +362,12 @@ pub struct Config {
|
||||
pub disable_paste_burst: bool,
|
||||
|
||||
/// When `false`, disables analytics across Codex product surfaces in this machine.
|
||||
/// Voluntarily left as Optional because the default value might depend on the client.
|
||||
pub analytics_enabled: Option<bool>,
|
||||
|
||||
/// When `false`, disables feedback collection across Codex product surfaces.
|
||||
/// Defaults to `true`.
|
||||
pub analytics: bool,
|
||||
pub feedback_enabled: bool,
|
||||
|
||||
/// OTEL configuration (exporter type, endpoint, headers, etc.).
|
||||
pub otel: crate::config::types::OtelConfig,
|
||||
@@ -820,6 +833,10 @@ pub struct ConfigToml {
|
||||
/// Defaults to `true`.
|
||||
pub analytics: Option<crate::config::types::AnalyticsConfigToml>,
|
||||
|
||||
/// When `false`, disables feedback collection across Codex product surfaces.
|
||||
/// Defaults to `true`.
|
||||
pub feedback: Option<crate::config::types::FeedbackConfigToml>,
|
||||
|
||||
/// OTEL configuration.
|
||||
pub otel: Option<crate::config::types::OtelConfigToml>,
|
||||
|
||||
@@ -1397,11 +1414,15 @@ impl Config {
|
||||
notices: cfg.notice.unwrap_or_default(),
|
||||
check_for_update_on_startup,
|
||||
disable_paste_burst: cfg.disable_paste_burst.unwrap_or(false),
|
||||
analytics: config_profile
|
||||
analytics_enabled: config_profile
|
||||
.analytics
|
||||
.as_ref()
|
||||
.and_then(|a| a.enabled)
|
||||
.or(cfg.analytics.as_ref().and_then(|a| a.enabled))
|
||||
.or(cfg.analytics.as_ref().and_then(|a| a.enabled)),
|
||||
feedback_enabled: cfg
|
||||
.feedback
|
||||
.as_ref()
|
||||
.and_then(|feedback| feedback.enabled)
|
||||
.unwrap_or(true),
|
||||
tui_notifications: cfg
|
||||
.tui
|
||||
@@ -1431,6 +1452,11 @@ impl Config {
|
||||
.as_ref()
|
||||
.and_then(|t| t.scroll_wheel_like_max_duration_ms),
|
||||
tui_scroll_invert: cfg.tui.as_ref().map(|t| t.scroll_invert).unwrap_or(false),
|
||||
tui_alternate_screen: cfg
|
||||
.tui
|
||||
.as_ref()
|
||||
.map(|t| t.alternate_screen)
|
||||
.unwrap_or_default(),
|
||||
otel: {
|
||||
let t: OtelConfigToml = cfg.otel.unwrap_or_default();
|
||||
let log_user_prompt = t.log_user_prompt.unwrap_or(false);
|
||||
@@ -1504,6 +1530,15 @@ impl Config {
|
||||
}
|
||||
self.forced_auto_mode_downgraded_on_windows = !value;
|
||||
}
|
||||
|
||||
pub fn set_windows_elevated_sandbox_globally(&mut self, value: bool) {
|
||||
crate::safety::set_windows_elevated_sandbox_enabled(value);
|
||||
if value {
|
||||
self.features.enable(Feature::WindowsSandboxElevated);
|
||||
} else {
|
||||
self.features.disable(Feature::WindowsSandboxElevated);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_review_model() -> String {
|
||||
@@ -1550,6 +1585,7 @@ mod tests {
|
||||
use crate::config::edit::ConfigEdit;
|
||||
use crate::config::edit::ConfigEditsBuilder;
|
||||
use crate::config::edit::apply_blocking;
|
||||
use crate::config::types::FeedbackConfigToml;
|
||||
use crate::config::types::HistoryPersistence;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
use crate::config::types::Notifications;
|
||||
@@ -1619,6 +1655,7 @@ persistence = "none"
|
||||
scroll_wheel_tick_detect_max_ms: None,
|
||||
scroll_wheel_like_max_duration_ms: None,
|
||||
scroll_invert: false,
|
||||
alternate_screen: AltScreenMode::Auto,
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -1876,6 +1913,25 @@ trust_level = "trusted"
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn feedback_enabled_defaults_to_true() -> std::io::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let cfg = ConfigToml {
|
||||
feedback: Some(FeedbackConfigToml::default()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let config = Config::load_from_base_config_with_overrides(
|
||||
cfg,
|
||||
ConfigOverrides::default(),
|
||||
codex_home.path().to_path_buf(),
|
||||
)?;
|
||||
|
||||
assert_eq!(config.feedback_enabled, true);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn profile_legacy_toggles_override_base() -> std::io::Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -3224,7 +3280,8 @@ model_verbosity = "high"
|
||||
tui_notifications: Default::default(),
|
||||
animations: true,
|
||||
show_tooltips: true,
|
||||
analytics: true,
|
||||
analytics_enabled: Some(true),
|
||||
feedback_enabled: true,
|
||||
tui_scroll_events_per_tick: None,
|
||||
tui_scroll_wheel_lines: None,
|
||||
tui_scroll_trackpad_lines: None,
|
||||
@@ -3234,6 +3291,7 @@ model_verbosity = "high"
|
||||
tui_scroll_wheel_tick_detect_max_ms: None,
|
||||
tui_scroll_wheel_like_max_duration_ms: None,
|
||||
tui_scroll_invert: false,
|
||||
tui_alternate_screen: AltScreenMode::Auto,
|
||||
otel: OtelConfig::default(),
|
||||
},
|
||||
o3_profile_config
|
||||
@@ -3308,7 +3366,8 @@ model_verbosity = "high"
|
||||
tui_notifications: Default::default(),
|
||||
animations: true,
|
||||
show_tooltips: true,
|
||||
analytics: true,
|
||||
analytics_enabled: Some(true),
|
||||
feedback_enabled: true,
|
||||
tui_scroll_events_per_tick: None,
|
||||
tui_scroll_wheel_lines: None,
|
||||
tui_scroll_trackpad_lines: None,
|
||||
@@ -3318,6 +3377,7 @@ model_verbosity = "high"
|
||||
tui_scroll_wheel_tick_detect_max_ms: None,
|
||||
tui_scroll_wheel_like_max_duration_ms: None,
|
||||
tui_scroll_invert: false,
|
||||
tui_alternate_screen: AltScreenMode::Auto,
|
||||
otel: OtelConfig::default(),
|
||||
};
|
||||
|
||||
@@ -3407,7 +3467,8 @@ model_verbosity = "high"
|
||||
tui_notifications: Default::default(),
|
||||
animations: true,
|
||||
show_tooltips: true,
|
||||
analytics: false,
|
||||
analytics_enabled: Some(false),
|
||||
feedback_enabled: true,
|
||||
tui_scroll_events_per_tick: None,
|
||||
tui_scroll_wheel_lines: None,
|
||||
tui_scroll_trackpad_lines: None,
|
||||
@@ -3417,6 +3478,7 @@ model_verbosity = "high"
|
||||
tui_scroll_wheel_tick_detect_max_ms: None,
|
||||
tui_scroll_wheel_like_max_duration_ms: None,
|
||||
tui_scroll_invert: false,
|
||||
tui_alternate_screen: AltScreenMode::Auto,
|
||||
otel: OtelConfig::default(),
|
||||
};
|
||||
|
||||
@@ -3492,7 +3554,8 @@ model_verbosity = "high"
|
||||
tui_notifications: Default::default(),
|
||||
animations: true,
|
||||
show_tooltips: true,
|
||||
analytics: true,
|
||||
analytics_enabled: Some(true),
|
||||
feedback_enabled: true,
|
||||
tui_scroll_events_per_tick: None,
|
||||
tui_scroll_wheel_lines: None,
|
||||
tui_scroll_trackpad_lines: None,
|
||||
@@ -3502,6 +3565,7 @@ model_verbosity = "high"
|
||||
tui_scroll_wheel_tick_detect_max_ms: None,
|
||||
tui_scroll_wheel_like_max_duration_ms: None,
|
||||
tui_scroll_invert: false,
|
||||
tui_alternate_screen: AltScreenMode::Auto,
|
||||
otel: OtelConfig::default(),
|
||||
};
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
// Note this file should generally be restricted to simple struct/enum
|
||||
// definitions that do not contain business logic.
|
||||
|
||||
pub use codex_protocol::config_types::AltScreenMode;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
@@ -282,6 +283,12 @@ pub struct AnalyticsConfigToml {
|
||||
pub enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
|
||||
pub struct FeedbackConfigToml {
|
||||
/// When `false`, disables the feedback flow across Codex product surfaces.
|
||||
pub enabled: Option<bool>,
|
||||
}
|
||||
|
||||
// ===== OTEL configuration =====
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
@@ -517,6 +524,17 @@ pub struct Tui {
|
||||
/// wheel and trackpad input.
|
||||
#[serde(default)]
|
||||
pub scroll_invert: bool,
|
||||
|
||||
/// Controls whether the TUI uses the terminal's alternate screen buffer.
|
||||
///
|
||||
/// - `auto` (default): Disable alternate screen in Zellij, enable elsewhere.
|
||||
/// - `always`: Always use alternate screen (original behavior).
|
||||
/// - `never`: Never use alternate screen (inline mode only, preserves scrollback).
|
||||
///
|
||||
/// Using alternate screen provides a cleaner fullscreen experience but prevents
|
||||
/// scrollback in terminal multiplexers like Zellij that follow the xterm spec.
|
||||
#[serde(default)]
|
||||
pub alternate_screen: AltScreenMode,
|
||||
}
|
||||
|
||||
const fn default_true() -> bool {
|
||||
|
||||
@@ -574,7 +574,14 @@ impl From<LegacyManagedConfigToml> for ConfigRequirementsToml {
|
||||
config_requirements_toml.allowed_approval_policies = Some(vec![approval_policy]);
|
||||
}
|
||||
if let Some(sandbox_mode) = sandbox_mode {
|
||||
config_requirements_toml.allowed_sandbox_modes = Some(vec![sandbox_mode.into()]);
|
||||
let required_mode: SandboxModeRequirement = sandbox_mode.into();
|
||||
// Allowing read-only is a requirement for Codex to function correctly.
|
||||
// So in this backfill path, we append read-only if it's not already specified.
|
||||
let mut allowed_modes = vec![SandboxModeRequirement::ReadOnly];
|
||||
if required_mode != SandboxModeRequirement::ReadOnly {
|
||||
allowed_modes.push(required_mode);
|
||||
}
|
||||
config_requirements_toml.allowed_sandbox_modes = Some(allowed_modes);
|
||||
}
|
||||
config_requirements_toml
|
||||
}
|
||||
@@ -622,4 +629,22 @@ foo = "xyzzy"
|
||||
assert_eq!(normalized_toml_value, TomlValue::Table(expected_toml_value));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn legacy_managed_config_backfill_includes_read_only_sandbox_mode() {
|
||||
let legacy = LegacyManagedConfigToml {
|
||||
approval_policy: None,
|
||||
sandbox_mode: Some(SandboxMode::WorkspaceWrite),
|
||||
};
|
||||
|
||||
let requirements = ConfigRequirementsToml::from(legacy);
|
||||
|
||||
assert_eq!(
|
||||
requirements.allowed_sandbox_modes,
|
||||
Some(vec![
|
||||
SandboxModeRequirement::ReadOnly,
|
||||
SandboxModeRequirement::WorkspaceWrite
|
||||
])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,17 +67,18 @@ impl ContextManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_history(&mut self) -> Vec<ResponseItem> {
|
||||
/// Returns the history prepared for sending to the model. This applies a proper
|
||||
/// normalization and drop un-suited items.
|
||||
pub(crate) fn for_prompt(mut self) -> Vec<ResponseItem> {
|
||||
self.normalize_history();
|
||||
self.contents()
|
||||
self.items
|
||||
.retain(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }));
|
||||
self.items
|
||||
}
|
||||
|
||||
// Returns the history prepared for sending to the model.
|
||||
// With extra response items filtered out and GhostCommits removed.
|
||||
pub(crate) fn get_history_for_prompt(&mut self) -> Vec<ResponseItem> {
|
||||
let mut history = self.get_history();
|
||||
Self::remove_ghost_snapshots(&mut history);
|
||||
history
|
||||
/// Returns raw items in the history.
|
||||
pub(crate) fn raw_items(&self) -> &[ResponseItem] {
|
||||
&self.items
|
||||
}
|
||||
|
||||
// Estimate token usage using byte-based heuristics from the truncation helpers.
|
||||
@@ -168,9 +169,7 @@ impl ContextManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// Keep behavior consistent with call sites that previously operated on `get_history()`:
|
||||
// normalize first (call/output invariants), then truncate based on the normalized view.
|
||||
let snapshot = self.get_history();
|
||||
let snapshot = self.items.clone();
|
||||
let user_positions = user_message_positions(&snapshot);
|
||||
let Some(&first_user_idx) = user_positions.first() else {
|
||||
self.replace(snapshot);
|
||||
@@ -250,15 +249,6 @@ impl ContextManager {
|
||||
normalize::remove_orphan_outputs(&mut self.items);
|
||||
}
|
||||
|
||||
/// Returns a clone of the contents in the transcript.
|
||||
fn contents(&self) -> Vec<ResponseItem> {
|
||||
self.items.clone()
|
||||
}
|
||||
|
||||
fn remove_ghost_snapshots(items: &mut Vec<ResponseItem>) {
|
||||
items.retain(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }));
|
||||
}
|
||||
|
||||
fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
|
||||
let policy_with_serialization_budget = policy.mul(1.2);
|
||||
match item {
|
||||
|
||||
@@ -101,7 +101,7 @@ fn filters_non_api_messages() {
|
||||
let a = assistant_msg("hello");
|
||||
h.record_items([&u, &a], policy);
|
||||
|
||||
let items = h.contents();
|
||||
let items = h.raw_items();
|
||||
assert_eq!(
|
||||
items,
|
||||
vec![
|
||||
@@ -160,8 +160,8 @@ fn get_history_for_prompt_drops_ghost_commits() {
|
||||
let items = vec![ResponseItem::GhostSnapshot {
|
||||
ghost_commit: GhostCommit::new("ghost-1".to_string(), None, Vec::new(), Vec::new()),
|
||||
}];
|
||||
let mut history = create_history_with_items(items);
|
||||
let filtered = history.get_history_for_prompt();
|
||||
let history = create_history_with_items(items);
|
||||
let filtered = history.for_prompt();
|
||||
assert_eq!(filtered, vec![]);
|
||||
}
|
||||
|
||||
@@ -184,7 +184,7 @@ fn remove_first_item_removes_matching_output_for_function_call() {
|
||||
];
|
||||
let mut h = create_history_with_items(items);
|
||||
h.remove_first_item();
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -206,7 +206,7 @@ fn remove_first_item_removes_matching_call_for_output() {
|
||||
];
|
||||
let mut h = create_history_with_items(items);
|
||||
h.remove_first_item();
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -234,7 +234,7 @@ fn remove_first_item_handles_local_shell_pair() {
|
||||
];
|
||||
let mut h = create_history_with_items(items);
|
||||
h.remove_first_item();
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -250,7 +250,7 @@ fn drop_last_n_user_turns_preserves_prefix() {
|
||||
let mut history = create_history_with_items(items);
|
||||
history.drop_last_n_user_turns(1);
|
||||
assert_eq!(
|
||||
history.get_history(),
|
||||
history.for_prompt(),
|
||||
vec![
|
||||
assistant_msg("session prefix item"),
|
||||
user_msg("u1"),
|
||||
@@ -267,7 +267,7 @@ fn drop_last_n_user_turns_preserves_prefix() {
|
||||
]);
|
||||
history.drop_last_n_user_turns(99);
|
||||
assert_eq!(
|
||||
history.get_history(),
|
||||
history.for_prompt(),
|
||||
vec![assistant_msg("session prefix item")]
|
||||
);
|
||||
}
|
||||
@@ -307,7 +307,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
assistant_msg("turn 1 assistant"),
|
||||
];
|
||||
|
||||
assert_eq!(history.get_history(), expected_prefix_and_first_turn);
|
||||
assert_eq!(history.for_prompt(), expected_prefix_and_first_turn);
|
||||
|
||||
let expected_prefix_only = vec![
|
||||
user_input_text_msg("<environment_context>ctx</environment_context>"),
|
||||
@@ -337,7 +337,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
assistant_msg("turn 2 assistant"),
|
||||
]);
|
||||
history.drop_last_n_user_turns(2);
|
||||
assert_eq!(history.get_history(), expected_prefix_only);
|
||||
assert_eq!(history.for_prompt(), expected_prefix_only);
|
||||
|
||||
let mut history = create_history_with_items(vec![
|
||||
user_input_text_msg("<environment_context>ctx</environment_context>"),
|
||||
@@ -355,7 +355,7 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
|
||||
assistant_msg("turn 2 assistant"),
|
||||
]);
|
||||
history.drop_last_n_user_turns(3);
|
||||
assert_eq!(history.get_history(), expected_prefix_only);
|
||||
assert_eq!(history.for_prompt(), expected_prefix_only);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -375,7 +375,7 @@ fn remove_first_item_handles_custom_tool_pair() {
|
||||
];
|
||||
let mut h = create_history_with_items(items);
|
||||
h.remove_first_item();
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -402,8 +402,8 @@ fn normalization_retains_local_shell_outputs() {
|
||||
},
|
||||
];
|
||||
|
||||
let mut history = create_history_with_items(items.clone());
|
||||
let normalized = history.get_history();
|
||||
let history = create_history_with_items(items.clone());
|
||||
let normalized = history.for_prompt();
|
||||
assert_eq!(normalized, items);
|
||||
}
|
||||
|
||||
@@ -607,7 +607,7 @@ fn normalize_adds_missing_output_for_function_call() {
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(
|
||||
h.contents(),
|
||||
h.raw_items(),
|
||||
vec![
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
@@ -641,7 +641,7 @@ fn normalize_adds_missing_output_for_custom_tool_call() {
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(
|
||||
h.contents(),
|
||||
h.raw_items(),
|
||||
vec![
|
||||
ResponseItem::CustomToolCall {
|
||||
id: None,
|
||||
@@ -678,7 +678,7 @@ fn normalize_adds_missing_output_for_local_shell_call_with_id() {
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(
|
||||
h.contents(),
|
||||
h.raw_items(),
|
||||
vec![
|
||||
ResponseItem::LocalShellCall {
|
||||
id: None,
|
||||
@@ -717,7 +717,7 @@ fn normalize_removes_orphan_function_call_output() {
|
||||
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
@@ -731,7 +731,7 @@ fn normalize_removes_orphan_custom_tool_call_output() {
|
||||
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(h.contents(), vec![]);
|
||||
assert_eq!(h.raw_items(), vec![]);
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
@@ -780,7 +780,7 @@ fn normalize_mixed_inserts_and_removals() {
|
||||
h.normalize_history();
|
||||
|
||||
assert_eq!(
|
||||
h.contents(),
|
||||
h.raw_items(),
|
||||
vec![
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
@@ -840,7 +840,7 @@ fn normalize_adds_missing_output_for_function_call_inserts_output() {
|
||||
let mut h = create_history_with_items(items);
|
||||
h.normalize_history();
|
||||
assert_eq!(
|
||||
h.contents(),
|
||||
h.raw_items(),
|
||||
vec![
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
|
||||
@@ -4,7 +4,7 @@ pub use codex_client::CodexRequestBuilder;
|
||||
use reqwest::header::HeaderValue;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::RwLock;
|
||||
|
||||
/// Set this to add a suffix to the User-Agent string.
|
||||
///
|
||||
@@ -30,7 +30,7 @@ pub struct Originator {
|
||||
pub value: String,
|
||||
pub header_value: HeaderValue,
|
||||
}
|
||||
static ORIGINATOR: OnceLock<Originator> = OnceLock::new();
|
||||
static ORIGINATOR: LazyLock<RwLock<Option<Originator>>> = LazyLock::new(|| RwLock::new(None));
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SetOriginatorError {
|
||||
@@ -60,22 +60,48 @@ fn get_originator_value(provided: Option<String>) -> Originator {
|
||||
}
|
||||
|
||||
pub fn set_default_originator(value: String) -> Result<(), SetOriginatorError> {
|
||||
if HeaderValue::from_str(&value).is_err() {
|
||||
return Err(SetOriginatorError::InvalidHeaderValue);
|
||||
}
|
||||
let originator = get_originator_value(Some(value));
|
||||
ORIGINATOR
|
||||
.set(originator)
|
||||
.map_err(|_| SetOriginatorError::AlreadyInitialized)
|
||||
let Ok(mut guard) = ORIGINATOR.write() else {
|
||||
return Err(SetOriginatorError::AlreadyInitialized);
|
||||
};
|
||||
if guard.is_some() {
|
||||
return Err(SetOriginatorError::AlreadyInitialized);
|
||||
}
|
||||
*guard = Some(originator);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn originator() -> &'static Originator {
|
||||
ORIGINATOR.get_or_init(|| get_originator_value(None))
|
||||
pub fn originator() -> Originator {
|
||||
if let Ok(guard) = ORIGINATOR.read()
|
||||
&& let Some(originator) = guard.as_ref()
|
||||
{
|
||||
return originator.clone();
|
||||
}
|
||||
|
||||
if std::env::var(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR).is_ok() {
|
||||
let originator = get_originator_value(None);
|
||||
if let Ok(mut guard) = ORIGINATOR.write() {
|
||||
match guard.as_ref() {
|
||||
Some(originator) => return originator.clone(),
|
||||
None => *guard = Some(originator.clone()),
|
||||
}
|
||||
}
|
||||
return originator;
|
||||
}
|
||||
|
||||
get_originator_value(None)
|
||||
}
|
||||
|
||||
pub fn get_codex_user_agent() -> String {
|
||||
let build_version = env!("CARGO_PKG_VERSION");
|
||||
let os_info = os_info::get();
|
||||
let originator = originator();
|
||||
let prefix = format!(
|
||||
"{}/{build_version} ({} {}; {}) {}",
|
||||
originator().value.as_str(),
|
||||
originator.value.as_str(),
|
||||
os_info.os_type(),
|
||||
os_info.version(),
|
||||
os_info.architecture().unwrap_or("unknown"),
|
||||
@@ -123,7 +149,7 @@ fn sanitize_user_agent(candidate: String, fallback: &str) -> String {
|
||||
tracing::warn!(
|
||||
"Falling back to default Codex originator because base user agent string is invalid"
|
||||
);
|
||||
originator().value.clone()
|
||||
originator().value
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,7 +163,7 @@ pub fn build_reqwest_client() -> reqwest::Client {
|
||||
use reqwest::header::HeaderMap;
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("originator", originator().header_value.clone());
|
||||
headers.insert("originator", originator().header_value);
|
||||
let ua = get_codex_user_agent();
|
||||
|
||||
let mut builder = reqwest::Client::builder()
|
||||
@@ -163,7 +189,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_get_codex_user_agent() {
|
||||
let user_agent = get_codex_user_agent();
|
||||
let originator = originator().value.as_str();
|
||||
let originator = originator().value;
|
||||
let prefix = format!("{originator}/");
|
||||
assert!(user_agent.starts_with(&prefix));
|
||||
}
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
//! Functions for environment detection that need to be shared across crates.
|
||||
|
||||
fn env_var_set(key: &str) -> bool {
|
||||
std::env::var(key).is_ok_and(|v| !v.trim().is_empty())
|
||||
}
|
||||
|
||||
/// Returns true if the current process is running under Windows Subsystem for Linux.
|
||||
pub fn is_wsl() -> bool {
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -17,3 +21,26 @@ pub fn is_wsl() -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when Codex is likely running in an environment without a usable GUI.
|
||||
///
|
||||
/// This is intentionally conservative and is used by frontends to avoid flows that would try to
|
||||
/// open a browser (e.g. device-code auth fallback).
|
||||
pub fn is_headless_environment() -> bool {
|
||||
if env_var_set("CI")
|
||||
|| env_var_set("SSH_CONNECTION")
|
||||
|| env_var_set("SSH_CLIENT")
|
||||
|| env_var_set("SSH_TTY")
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
if !env_var_set("DISPLAY") && !env_var_set("WAYLAND_DISPLAY") {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
@@ -277,6 +277,7 @@ pub enum RefreshTokenFailedReason {
|
||||
pub struct UnexpectedResponseError {
|
||||
pub status: StatusCode,
|
||||
pub body: String,
|
||||
pub url: Option<String>,
|
||||
pub request_id: Option<String>,
|
||||
}
|
||||
|
||||
@@ -293,7 +294,11 @@ impl UnexpectedResponseError {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut message = format!("{CLOUDFLARE_BLOCKED_MESSAGE} (status {})", self.status);
|
||||
let status = self.status;
|
||||
let mut message = format!("{CLOUDFLARE_BLOCKED_MESSAGE} (status {status})");
|
||||
if let Some(url) = &self.url {
|
||||
message.push_str(&format!(", url: {url}"));
|
||||
}
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
@@ -307,16 +312,16 @@ impl std::fmt::Display for UnexpectedResponseError {
|
||||
if let Some(friendly) = self.friendly_message() {
|
||||
write!(f, "{friendly}")
|
||||
} else {
|
||||
write!(
|
||||
f,
|
||||
"unexpected status {}: {}{}",
|
||||
self.status,
|
||||
self.body,
|
||||
self.request_id
|
||||
.as_ref()
|
||||
.map(|id| format!(", request id: {id}"))
|
||||
.unwrap_or_default()
|
||||
)
|
||||
let status = self.status;
|
||||
let body = &self.body;
|
||||
let mut message = format!("unexpected status {status}: {body}");
|
||||
if let Some(url) = &self.url {
|
||||
message.push_str(&format!(", url: {url}"));
|
||||
}
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
write!(f, "{message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -826,12 +831,16 @@ mod tests {
|
||||
status: StatusCode::FORBIDDEN,
|
||||
body: "<html><body>Cloudflare error: Sorry, you have been blocked</body></html>"
|
||||
.to_string(),
|
||||
url: Some("http://example.com/blocked".to_string()),
|
||||
request_id: Some("ray-id".to_string()),
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/blocked";
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!("{CLOUDFLARE_BLOCKED_MESSAGE} (status {status}), request id: ray-id")
|
||||
format!(
|
||||
"{CLOUDFLARE_BLOCKED_MESSAGE} (status {status}), url: {url}, request id: ray-id"
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@@ -840,12 +849,14 @@ mod tests {
|
||||
let err = UnexpectedResponseError {
|
||||
status: StatusCode::FORBIDDEN,
|
||||
body: "plain text error".to_string(),
|
||||
url: Some("http://example.com/plain".to_string()),
|
||||
request_id: None,
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/plain";
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!("unexpected status {status}: plain text error")
|
||||
format!("unexpected status {status}: plain text error, url: {url}")
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,10 @@ use codex_protocol::models::ReasoningItemContent;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use codex_protocol::models::is_image_close_tag_text;
|
||||
use codex_protocol::models::is_image_open_tag_text;
|
||||
use codex_protocol::models::is_local_image_close_tag_text;
|
||||
use codex_protocol::models::is_local_image_open_tag_text;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
@@ -32,9 +36,17 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
|
||||
|
||||
let mut content: Vec<UserInput> = Vec::new();
|
||||
|
||||
for content_item in message.iter() {
|
||||
for (idx, content_item) in message.iter().enumerate() {
|
||||
match content_item {
|
||||
ContentItem::InputText { text } => {
|
||||
if (is_local_image_open_tag_text(text) || is_image_open_tag_text(text))
|
||||
&& (matches!(message.get(idx + 1), Some(ContentItem::InputImage { .. })))
|
||||
|| (idx > 0
|
||||
&& (is_local_image_close_tag_text(text) || is_image_close_tag_text(text))
|
||||
&& matches!(message.get(idx - 1), Some(ContentItem::InputImage { .. })))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if is_session_prefix(text) || is_user_shell_command_text(text) {
|
||||
return None;
|
||||
}
|
||||
@@ -177,6 +189,80 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skips_local_image_label_text() {
|
||||
let image_url = "data:image/png;base64,abc".to_string();
|
||||
let label = codex_protocol::models::local_image_open_tag_text(1);
|
||||
let user_text = "Please review this image.".to_string();
|
||||
|
||||
let item = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![
|
||||
ContentItem::InputText { text: label },
|
||||
ContentItem::InputImage {
|
||||
image_url: image_url.clone(),
|
||||
},
|
||||
ContentItem::InputText {
|
||||
text: "</image>".to_string(),
|
||||
},
|
||||
ContentItem::InputText {
|
||||
text: user_text.clone(),
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
let turn_item = parse_turn_item(&item).expect("expected user message turn item");
|
||||
|
||||
match turn_item {
|
||||
TurnItem::UserMessage(user) => {
|
||||
let expected_content = vec![
|
||||
UserInput::Image { image_url },
|
||||
UserInput::Text { text: user_text },
|
||||
];
|
||||
assert_eq!(user.content, expected_content);
|
||||
}
|
||||
other => panic!("expected TurnItem::UserMessage, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skips_unnamed_image_label_text() {
|
||||
let image_url = "data:image/png;base64,abc".to_string();
|
||||
let label = codex_protocol::models::image_open_tag_text();
|
||||
let user_text = "Please review this image.".to_string();
|
||||
|
||||
let item = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![
|
||||
ContentItem::InputText { text: label },
|
||||
ContentItem::InputImage {
|
||||
image_url: image_url.clone(),
|
||||
},
|
||||
ContentItem::InputText {
|
||||
text: codex_protocol::models::image_close_tag_text(),
|
||||
},
|
||||
ContentItem::InputText {
|
||||
text: user_text.clone(),
|
||||
},
|
||||
],
|
||||
};
|
||||
|
||||
let turn_item = parse_turn_item(&item).expect("expected user message turn item");
|
||||
|
||||
match turn_item {
|
||||
TurnItem::UserMessage(user) => {
|
||||
let expected_content = vec![
|
||||
UserInput::Image { image_url },
|
||||
UserInput::Text { text: user_text },
|
||||
];
|
||||
assert_eq!(user.content, expected_content);
|
||||
}
|
||||
other => panic!("expected TurnItem::UserMessage, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn skips_user_instructions_and_env() {
|
||||
let items = vec![
|
||||
|
||||
@@ -29,9 +29,16 @@ where
|
||||
"HOME", "LOGNAME", "PATH", "SHELL", "USER", "USERNAME", "TMPDIR", "TEMP", "TMP",
|
||||
];
|
||||
let allow: HashSet<&str> = CORE_VARS.iter().copied().collect();
|
||||
vars.into_iter()
|
||||
.filter(|(k, _)| allow.contains(k.as_str()))
|
||||
.collect()
|
||||
let is_core_var = |name: &str| {
|
||||
if cfg!(target_os = "windows") {
|
||||
CORE_VARS
|
||||
.iter()
|
||||
.any(|allowed| allowed.eq_ignore_ascii_case(name))
|
||||
} else {
|
||||
allow.contains(name)
|
||||
}
|
||||
};
|
||||
vars.into_iter().filter(|(k, _)| is_core_var(k)).collect()
|
||||
}
|
||||
};
|
||||
|
||||
@@ -198,6 +205,30 @@ mod tests {
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "windows")]
|
||||
fn test_core_inherit_respects_case_insensitive_names_on_windows() {
|
||||
let vars = make_vars(&[
|
||||
("Path", "C:\\Windows\\System32"),
|
||||
("TEMP", "C:\\Temp"),
|
||||
("FOO", "bar"),
|
||||
]);
|
||||
|
||||
let policy = ShellEnvironmentPolicy {
|
||||
inherit: ShellEnvironmentPolicyInherit::Core,
|
||||
ignore_default_excludes: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let result = populate_env(vars, &policy);
|
||||
let expected: HashMap<String, String> = hashmap! {
|
||||
"Path".to_string() => "C:\\Windows\\System32".to_string(),
|
||||
"TEMP".to_string() => "C:\\Temp".to_string(),
|
||||
};
|
||||
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_inherit_none() {
|
||||
let vars = make_vars(&[("PATH", "/usr/bin"), ("HOME", "/home")]);
|
||||
|
||||
@@ -86,12 +86,16 @@ pub enum Feature {
|
||||
RemoteModels,
|
||||
/// Experimental shell snapshotting.
|
||||
ShellSnapshot,
|
||||
/// Append additional AGENTS.md guidance to user instructions.
|
||||
HierarchicalAgents,
|
||||
/// Experimental TUI v2 (viewport) implementation.
|
||||
Tui2,
|
||||
/// Enforce UTF8 output in Powershell.
|
||||
PowershellUtf8,
|
||||
/// Compress request bodies (zstd) when sending streaming requests to codex-backend.
|
||||
EnableRequestCompression,
|
||||
/// Enable collab tools.
|
||||
Collab,
|
||||
}
|
||||
|
||||
impl Feature {
|
||||
@@ -350,6 +354,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::HierarchicalAgents,
|
||||
key: "hierarchical_agents",
|
||||
stage: Stage::Experimental,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::ApplyPatchFreeform,
|
||||
key: "apply_patch_freeform",
|
||||
@@ -398,6 +408,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Experimental,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::Collab,
|
||||
key: "collab",
|
||||
stage: Stage::Experimental,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::Tui2,
|
||||
key: "tui2",
|
||||
|
||||
@@ -51,6 +51,7 @@ pub mod token_data;
|
||||
mod truncate;
|
||||
mod unified_exec;
|
||||
mod user_instructions;
|
||||
pub mod windows_sandbox;
|
||||
pub use model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY;
|
||||
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
|
||||
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
|
||||
@@ -114,6 +115,8 @@ pub use command_safety::is_safe_command;
|
||||
pub use exec_policy::ExecPolicyError;
|
||||
pub use exec_policy::load_exec_policy;
|
||||
pub use safety::get_platform_sandbox;
|
||||
pub use safety::is_windows_elevated_sandbox_enabled;
|
||||
pub use safety::set_windows_elevated_sandbox_enabled;
|
||||
pub use safety::set_windows_sandbox_enabled;
|
||||
// Re-export the protocol types from the standalone `codex-protocol` crate so existing
|
||||
// `codex_core::protocol::...` references continue to work across the workspace.
|
||||
|
||||
@@ -72,6 +72,12 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
|
||||
notify_mcp_tool_call_event(sess, turn_context, tool_call_end_event.clone()).await;
|
||||
|
||||
let status = if result.is_ok() { "ok" } else { "error" };
|
||||
turn_context
|
||||
.client
|
||||
.get_otel_manager()
|
||||
.counter("codex.mcp.call", 1, &[("status", status)]);
|
||||
|
||||
ResponseInputItem::McpToolCallOutput { call_id, result }
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::TryLockError;
|
||||
use tokio::time::timeout;
|
||||
use tracing::error;
|
||||
|
||||
use super::cache;
|
||||
@@ -21,6 +22,7 @@ use crate::api_bridge::map_api_error;
|
||||
use crate::auth::AuthManager;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CoreResult;
|
||||
use crate::features::Feature;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
@@ -29,6 +31,7 @@ use crate::models_manager::model_presets::builtin_model_presets;
|
||||
|
||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const OPENAI_DEFAULT_API_MODEL: &str = "gpt-5.1-codex-max";
|
||||
const OPENAI_DEFAULT_CHATGPT_MODEL: &str = "gpt-5.2-codex";
|
||||
const CODEX_AUTO_BALANCED_MODEL: &str = "codex-auto-balanced";
|
||||
@@ -98,17 +101,20 @@ impl ModelsManager {
|
||||
if !remote_models_feature || self.auth_manager.get_auth_mode() == Some(AuthMode::ApiKey) {
|
||||
return Ok(());
|
||||
}
|
||||
let auth = self.auth_manager.auth();
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let api_provider = self.provider.to_api_provider(Some(AuthMode::ChatGPT))?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let client = ModelsClient::new(transport, api_provider, api_auth);
|
||||
|
||||
let client_version = format_client_version_to_whole();
|
||||
let (models, etag) = client
|
||||
.list_models(&client_version, HeaderMap::new())
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
let (models, etag) = timeout(
|
||||
MODELS_REFRESH_TIMEOUT,
|
||||
client.list_models(&client_version, HeaderMap::new()),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| CodexErr::Timeout)?
|
||||
.map_err(map_api_error)?;
|
||||
|
||||
self.apply_remote_models(models.clone()).await;
|
||||
*self.etag.write().await = etag.clone();
|
||||
|
||||
@@ -6,7 +6,7 @@ use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::config::OtelSettings;
|
||||
use codex_otel::config::OtelTlsConfig as OtelTlsSettings;
|
||||
use codex_otel::traces::otel_provider::OtelProvider;
|
||||
use codex_otel::otel_provider::OtelProvider;
|
||||
use std::error::Error;
|
||||
|
||||
/// Build an OpenTelemetry provider from the app Config.
|
||||
@@ -15,6 +15,8 @@ use std::error::Error;
|
||||
pub fn build_provider(
|
||||
config: &Config,
|
||||
service_version: &str,
|
||||
service_name_override: Option<&str>,
|
||||
default_analytics_enabled: bool,
|
||||
) -> Result<Option<OtelProvider>, Box<dyn Error>> {
|
||||
let to_otel_exporter = |kind: &Kind| match kind {
|
||||
Kind::None => OtelExporter::None,
|
||||
@@ -64,14 +66,20 @@ pub fn build_provider(
|
||||
|
||||
let exporter = to_otel_exporter(&config.otel.exporter);
|
||||
let trace_exporter = to_otel_exporter(&config.otel.trace_exporter);
|
||||
let metrics_exporter = if config.analytics {
|
||||
let metrics_exporter = if config
|
||||
.analytics_enabled
|
||||
.unwrap_or(default_analytics_enabled)
|
||||
{
|
||||
to_otel_exporter(&config.otel.metrics_exporter)
|
||||
} else {
|
||||
OtelExporter::None
|
||||
};
|
||||
|
||||
let originator = originator();
|
||||
let service_name = service_name_override.unwrap_or(originator.value.as_str());
|
||||
|
||||
OtelProvider::from(&OtelSettings {
|
||||
service_name: originator().value.to_owned(),
|
||||
service_name: service_name.to_string(),
|
||||
service_version: service_version.to_string(),
|
||||
codex_home: config.codex_home.clone(),
|
||||
environment: config.otel.environment.to_string(),
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user