Compare commits

3 Commits

Author SHA1 Message Date
Owen Lin
d29f364f23 feat(app-server): tracing pt. 1 2026-03-02 14:56:24 -08:00
Eric Traut
7709bf32a3 Fix project trust config parsing so CLI overrides work (#13090)
Fixes #13076

This PR fixes a bug that causes command-line config overrides for MCP
subtables to not be merged correctly.

Summary
- make project trust loading go through the dedicated struct so CLI
overrides can update trusted project-local MCP transports

---------

Co-authored-by: jif-oai <jif@openai.com>
2026-03-02 11:10:38 -07:00
Michael Bolin
3241c1c6cc fix: use https://git.savannah.gnu.org/git/bash instead of https://github.com/bolinfest/bash (#13057)
Historically, we cloned the Bash repo from
https://github.com/bminor/bash, but for whatever reason, it was removed
at some point.

I had a local clone of it, so I pushed it to
https://github.com/bolinfest/bash so that we could continue running our
CI job. I did this in https://github.com/openai/codex/pull/9563, and as
you can see, I did not tamper with the commit hash we used as the basis
of this build.

Using a personal fork is not great, so this PR changes the CI job to use
what appears to be considered the source of truth for Bash, which is
https://git.savannah.gnu.org/git/bash.git.

Though in testing this out, it appears this Git server does not support
the combination of `git clone --depth 1
https://git.savannah.gnu.org/git/bash` and `git fetch --depth 1 origin
a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b`, as it fails with the
following error:

```
error: Server does not allow request for unadvertised object a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
```

so unfortunately this means that we have to do a full clone instead of a
shallow clone in our CI jobs, which will be a bit slower.

Also updated `codex-rs/shell-escalation/README.md` to reflect this
change.
2026-03-02 09:09:54 -08:00
24 changed files with 1734 additions and 364 deletions


@@ -146,9 +146,8 @@ jobs:
shell: bash
run: |
set -euo pipefail
- git clone --depth 1 https://github.com/bolinfest/bash /tmp/bash
+ git clone https://git.savannah.gnu.org/git/bash /tmp/bash
cd /tmp/bash
- git fetch --depth 1 origin a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
git apply "${GITHUB_WORKSPACE}/shell-tool-mcp/patches/bash-exec-wrapper.patch"
./configure --without-bash-malloc
@@ -188,9 +187,8 @@ jobs:
shell: bash
run: |
set -euo pipefail
- git clone --depth 1 https://github.com/bolinfest/bash /tmp/bash
+ git clone https://git.savannah.gnu.org/git/bash /tmp/bash
cd /tmp/bash
- git fetch --depth 1 origin a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
git apply "${GITHUB_WORKSPACE}/shell-tool-mcp/patches/bash-exec-wrapper.patch"
./configure --without-bash-malloc


@@ -0,0 +1,811 @@
# App-server v2 tracing design
This document proposes a simple, staged tracing design for
`codex-rs/app-server` with these goals:
- support distributed tracing from client-initiated app-server work into
app-server and `codex-core`
- keep tracing consistent across the app-server v2 surface area
- minimize tracing boilerplate in request handlers
- avoid introducing tracing-owned lifecycle state that duplicates existing
app-server runtime state
This design explicitly avoids a `RequestKind` taxonomy and avoids
app-server-owned long-lived lifecycle span registries.
## Summary
The design has four major pieces:
1. A transport-level W3C trace carrier on inbound JSON-RPC request envelopes.
2. A centralized app-server request tracing layer that wraps every inbound
request in the same request span.
3. An internal trace-context handoff through `codex_protocol::Submission` so
work that continues in `codex-core` inherits the inbound app-server request
ancestry.
4. A core-owned long-lived turn span for turn-producing operations such as
`turn/start` and `review/start`.
Every inbound JSON-RPC request gets a standardized request span.
When an app-server request submits work into core, the current span context is
captured into `Submission.trace` when the `Submission` is created. Core then
creates a short-lived dispatch span parented from that carrier and, for
turn-producing operations, creates a long-lived turn span beneath it before
continuing into its existing task and model request tracing.
Important:
- request spans stay short-lived
- long-lived turn spans are owned by core, not app-server
- the design does not add app-server-owned long-lived thread or realtime spans
## Design goals
- **Distributed tracing first**
  - Clients should be able to send trace context to app-server.
  - App-server should preserve that trace ancestry across the async handoff into
    core.
  - Existing core model request tracing should continue to inherit from the
    active core span once the handoff occurs.
- **Consistent request instrumentation**
  - Every inbound request should produce the same request span with the same
    base attributes.
  - Request tracing should be wired at the transport boundary, not repeated in
    individual handlers.
- **Minimal boilerplate**
  - Request handlers should not manually parse carriers or build request spans.
  - Existing calls to `thread.submit(...)` and similar APIs should pick up trace
    propagation automatically because `Submission` creation captures the active
    span context.
- **Minimal business logic pollution**
  - W3C parsing, OTEL conversion, and span-parenting rules should live in
    tracing-specific modules.
  - App-server business logic should stay focused on request handling, not span
    management.
- **Incremental rollout**
  - The first rollout should prove inbound request tracing and app-server ->
    core propagation.
  - Once propagation is in place, core should add a long-lived turn span so a
    single span covers the actual duration of a turn.
  - Thread and realtime lifecycle tracing should wait until there is a concrete
    need.
## Non-goals
- This design does not attempt to make every loaded thread or realtime session
correspond to a long-lived tracing span.
- This design does not add tracing-owned thread or realtime state stores in the
initial design.
- This design does not require every app-server v2 `*Params` type to carry
trace metadata.
- This design does not require outbound JSON-RPC trace propagation in the
initial rollout.
## Why not `RequestKind`
An earlier direction considered a central `RequestKind` taxonomy such as
`Unary`, `TurnLifecycle`, or `RealtimeLifecycle`.
That is workable, but it makes tracing depend on a classification that can
drift from runtime behavior. The simpler design instead treats tracing as two
generic mechanics:
- every inbound request gets the same request span
- any async work that crosses from app-server into core gets the current span
context attached to `Submission`
This keeps the initial implementation small and avoids turning tracing into a
taxonomy maintenance problem.
## Terminology
- **Request span**
  - A short-lived span for one inbound JSON-RPC request to app-server.
- **W3C trace context**
  - A serializable representation of distributed trace context based on
    `traceparent` and `tracestate`.
- **Submission trace handoff**
  - The optional serialized trace context attached to
    `codex_protocol::Submission` so core can restore parentage after the
    app-server request handler returns.
- **Dispatch span**
  - A short-lived core span created when the submission loop receives a
    `Submission` with trace context.
- **Turn span**
  - A long-lived core-owned span representing the actual runtime of a turn from
    turn start until completion, interruption, or failure.
## High-level tracing model
### 1. Inbound request
For every inbound JSON-RPC request:
1. parse an optional W3C trace carrier from the JSON-RPC envelope
2. create a standardized request span
3. parent that span from the incoming carrier when present
4. process the request inside that span
This is true for every request, regardless of whether the API is unary or
starts work that continues later.
### 2. Async handoff into core
Some app-server requests submit work that continues in core after the original
request returns. The critical example is `turn/start`, but the mechanism should
be generic.
To preserve trace ancestry:
- add an optional `W3cTraceContext` to `codex_protocol::Submission`
- capture the current span context into that field when constructing a
`Submission` in core submission APIs such as `Codex::submit()` and
`Codex::submit_with_id()`
- have `codex-core` create a per-submission dispatch span parented from that
carrier
This gives a clean causal chain:
- client span
  - app-server request span
    - core dispatch span
      - core turn span for turn-producing operations
        - existing core spans such as `run_turn`, sampling, and model request spans
### 3. Core-owned turn spans
For turn-producing operations such as `turn/start` and `review/start`:
- app-server creates the inbound request span
- app-server propagates that request context through `Submission.trace`
- core creates a dispatch span when it receives the submission
- core then creates a long-lived turn span beneath that dispatch span
- existing core work such as `run_turn` and model request tracing runs beneath
the turn span
This keeps long-lived span ownership with the layer that actually owns turn
execution and completion.
### 4. Defer thread and realtime lifecycle-heavy tracing
The design should not add:
- app-server-owned thread residency stores
- app-server-owned realtime session stores
App-server already maintains thread subscription and runtime state in existing
structures. If later tracing work needs thread loaded-duration or realtime
duration metrics, that data should extend those existing structures rather than
introducing a parallel tracing-only state machine.
## Span model by API shape
The initial implementation keeps the app-server side uniform.
### Unary request/response APIs
Examples:
- `thread/list`
- `thread/read`
- `model/list`
- `config/read`
- `skills/list`
- `app/list`
Behavior:
- create request span
- return response
- no additional app-server span state
### Turn-producing APIs
Examples:
- `turn/start`
- `review/start`
- `thread/compact/start` when it executes as a normal turn lifecycle
Behavior:
- create request span
- submit work under that request span
- capture the current span context into `Submission.trace` when the
`Submission` is created
- let core create a dispatch span and then a long-lived turn span
- let the turn span remain open until the real core turn lifecycle ends
Important: request spans should not stay open until eventual streamed
completion. The request span ends quickly; the core-owned turn span carries the
long-running work.
### Other APIs that submit work into core
Examples:
- `thread/realtime/start`
- `thread/realtime/appendAudio`
- `thread/realtime/appendText`
- `thread/realtime/stop`
Behavior:
- create request span
- submit work under that request span
- capture the current span context into `Submission.trace` when the
`Submission` is created
- let core continue tracing from there
These APIs do not automatically imply a long-lived app-server or core lifecycle
span in the initial design.
### Thread lifecycle APIs
Examples:
- `thread/start`
- `thread/resume`
- `thread/fork`
- `thread/unsubscribe`
Behavior in the initial design:
- create request span
- annotate with `thread.id` when known
- do not introduce separate app-server lifecycle spans or tracing-only state
If later work needs thread loaded/unloaded metrics, it should reuse the existing
thread runtime state already maintained by app-server.
## Where the code should live
### `codex-rs/protocol`
Add a small shared `W3cTraceContext` type to
`codex-rs/protocol/src/protocol.rs`.
Responsibilities:
- define a serializable W3C trace context type
- avoid direct dependence on OTEL runtime types
- be usable from both protocol crates and runtime crates
Suggested contents:
- `W3cTraceContext`
  - `traceparent: Option<String>`
  - `tracestate: Option<String>`
Suggested `Submission` change:
- `Submission { id, op, trace: Option<W3cTraceContext> }`
This is the only new internal async handoff needed for the initial rollout.
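
As a rough illustration of the suggested shapes, here is a minimal std-only sketch. The `Op` enum and the `Submission::new` constructor are hypothetical stand-ins invented for this example; the real types would also carry serde derives and the actual operation enum.

```rust
/// Serializable W3C trace context carrier with no OTEL runtime types.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct W3cTraceContext {
    pub traceparent: Option<String>,
    pub tracestate: Option<String>,
}

/// Hypothetical placeholder for the real operation enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Op {
    UserTurn { text: String },
    Interrupt,
}

/// Submission with the optional trace handoff field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Submission {
    pub id: String,
    pub op: Op,
    pub trace: Option<W3cTraceContext>,
}

impl Submission {
    /// Hypothetical constructor: attaches whatever trace context the caller
    /// captured from the active span, or `None` when there is none.
    pub fn new(id: impl Into<String>, op: Op, active: Option<W3cTraceContext>) -> Self {
        Self {
            id: id.into(),
            op,
            trace: active,
        }
    }
}
```

Keeping `trace` optional means submissions created outside any traced request stay valid and simply start a new root on the core side.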
### `codex-rs/otel`
Add a small helper module or extend existing tracing helpers so OTEL-specific
logic stays centralized.
Responsibilities:
- convert `W3cTraceContext` -> OTEL `Context`
- convert the current tracing span context -> `W3cTraceContext`
- parent a tracing span from an explicit carrier when present
- keep env `TRACEPARENT` / `TRACESTATE` lookup as an explicit helper used at
defined entrypoints, not as an implicit side effect of generic OTEL provider
initialization for app-server
- apply precedence rules:
  - app-server inbound request spans: parent from request `trace` when present,
    else from env `TRACEPARENT` / `TRACESTATE` during migration, otherwise
    create a new root span
  - app-server submission dispatch spans: parent from `Submission.trace` when
    present, otherwise inherit naturally from the current span or create a root
    span
  - env `TRACEPARENT` / `TRACESTATE` fallback is not used for app-server
    submission dispatch spans or deeper app-server/core spans
  - env `TRACEPARENT` / `TRACESTATE` remains available for non-server
    entrypoints or process-level startup tracing
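
The precedence rules above could be expressed as two small pure functions. This is only a sketch under stated assumptions: `SpanParent`, `inbound_request_parent`, `dispatch_parent`, and the `env_fallback_enabled` flag are hypothetical names for illustration, and a carrier without a `traceparent` is treated as absent.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct W3cTraceContext {
    pub traceparent: Option<String>,
    pub tracestate: Option<String>,
}

/// Hypothetical description of where a new span takes its parent from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanParent {
    Request(W3cTraceContext),
    Env(W3cTraceContext),
    Submission(W3cTraceContext),
    CurrentOrRoot,
    Root,
}

/// Assumption: a carrier with no `traceparent` cannot act as a parent.
fn usable(carrier: Option<W3cTraceContext>) -> Option<W3cTraceContext> {
    carrier.filter(|c| c.traceparent.is_some())
}

/// Inbound request spans: request `trace`, else env (migration only), else root.
pub fn inbound_request_parent(
    request: Option<W3cTraceContext>,
    env: Option<W3cTraceContext>,
    env_fallback_enabled: bool,
) -> SpanParent {
    if let Some(carrier) = usable(request) {
        return SpanParent::Request(carrier);
    }
    if env_fallback_enabled {
        if let Some(carrier) = usable(env) {
            return SpanParent::Env(carrier);
        }
    }
    SpanParent::Root
}

/// Dispatch spans: `Submission.trace`, else the current span or a root.
/// Env values are deliberately not an input here.
pub fn dispatch_parent(submission_trace: Option<W3cTraceContext>) -> SpanParent {
    match usable(submission_trace) {
        Some(carrier) => SpanParent::Submission(carrier),
        None => SpanParent::CurrentOrRoot,
    }
}
```

Because `dispatch_parent` has no env parameter, the rule that dispatch spans never consult `TRACEPARENT` / `TRACESTATE` is enforced by the signature rather than by convention.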
Temporary compatibility during rollout:
- Codex Cloud currently injects `TRACEPARENT` /
`TRACESTATE` into the app-server process environment
- that env-based propagation should be treated as legacy compatibility while
clients migrate to sending request-level `trace` carriers on every JSON-RPC
request
- during that migration, only inbound app-server request span creation may fall
back to env when request `trace` is absent
- `codex-otel` should not keep app-server under ambient env parentage by
automatically attaching `TRACEPARENT` / `TRACESTATE` during provider
initialization; app-server request tracing should opt into env fallback only
when building the inbound request span
- once request-level carriers are in use for app-server clients, remove the
env-based request propagation path rather than keeping both mechanisms active
Required migration rule:
- app-server submission dispatch spans and downstream spans must not consult env
`TRACEPARENT` / `TRACESTATE` when determining span parentage
- once app-server inbound request spans or `Submission.trace` provide an
explicit parent, downstream spans must inherit from the current span and must
not re-parent themselves from env `TRACEPARENT` / `TRACESTATE`
- update both categories of existing env-based parenting so they do not
  override explicit request/submission ancestry:
  - lower-level callsites such as `apply_traceparent_parent` on `run_turn`
  - provider-init behavior in `codex-otel` that eagerly attaches env trace
    context for the process/thread
Important:
- keep this focused on carrier parsing and span parenting
- do not move app-server runtime state into `codex-otel`
- do not overload `OtelManager` with app-server lifecycle ownership in the
initial design
### `codex-rs/app-server-protocol`
Extend inbound JSON-RPC request envelopes in
`codex-rs/app-server-protocol/src/jsonrpc_lite.rs`
with a dedicated optional trace carrier field.
Suggested shape:
- `JSONRPCRequest { id, method, params, trace }`
Where:
- `trace: Option<W3cTraceContext>`
Important:
- use a dedicated tracing field, not a generic `meta` bag
- keep tracing transport-level and method-agnostic
- do not add trace fields to individual `*Params` business payloads
### `codex-rs/core`
Make small changes in the submission path in
`codex-rs/core/src/codex.rs`.
Responsibilities:
- capture the current span context when a `Submission` is created in
`Codex::submit()` / `Codex::submit_with_id()`
- read `Submission.trace`
- create a per-submission dispatch span parented from that carrier
- run existing submission handling under that span
- do not use env `TRACEPARENT` / `TRACESTATE` fallback in app-server submission
handling or deeper core spans created from app-server work
- replace lower-level env-based re-parenting where needed so explicit
`Submission.trace` ancestry remains authoritative
This is enough for existing core tracing to inherit the correct ancestry, and
it is the right place to add the long-lived turn span required for turn
lifecycles.
For turn-producing operations, core responsibilities should include:
- read `Submission.trace`
- create a per-submission dispatch span parented from that carrier
- create a long-lived turn span beneath the dispatch span when the operation
actually starts a turn
- finish that turn span when the real core turn lifecycle completes,
interrupts, or fails
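
One way to make sure a turn span is always finished is an RAII guard. The sketch below is std-only and hypothetical: `TurnSpan`, `TurnOutcome`, and the shared `sink` stand in for a real tracing span and exporter, and recording `Failed` when the guard is dropped without an explicit outcome is an assumption of this example, not something the design specifies.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnOutcome {
    Completed,
    Interrupted,
    Failed,
}

/// Hypothetical sink that records finished turn spans.
pub type TurnSink = Arc<Mutex<Vec<(String, TurnOutcome)>>>;

/// Guard representing a long-lived turn span owned by core.
pub struct TurnSpan {
    turn_id: String,
    outcome: Option<TurnOutcome>,
    sink: TurnSink,
}

impl TurnSpan {
    pub fn start(turn_id: &str, sink: TurnSink) -> Self {
        Self {
            turn_id: turn_id.to_string(),
            outcome: None,
            sink,
        }
    }

    /// Records the outcome; the span is closed when `self` drops at the end
    /// of this call.
    pub fn finish(mut self, outcome: TurnOutcome) {
        self.outcome = Some(outcome);
    }
}

impl Drop for TurnSpan {
    fn drop(&mut self) {
        // Assumption: a guard dropped without an explicit outcome is a failure.
        let outcome = self.outcome.unwrap_or(TurnOutcome::Failed);
        if let Ok(mut sink) = self.sink.lock() {
            sink.push((self.turn_id.clone(), outcome));
        }
    }
}
```

With this shape every exit path, including early returns and task cancellation, closes the span exactly once.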
### `codex-rs/app-server`
Add a small dedicated tracing module rather than spreading request tracing logic
across handlers. A likely shape is:
- `app_server_tracing/mod.rs`
- `app_server_tracing/request_spans.rs`
- `app_server_tracing/incoming.rs`
Responsibilities:
- extract incoming W3C trace carriers from JSON-RPC requests
- build standardized request spans
- provide a small API that wraps request handling in the correct span
Non-responsibilities in the initial design:
- no thread residency registry
- no realtime session registry
## Standardized request spans
Every inbound request should use the same request-span builder.
Suggested name:
- `app_server.request`
Suggested attributes:
- `rpc.system = "jsonrpc"`
- `rpc.service = "codex-app-server"`
- `rpc.method`
- `rpc.transport`
  - `stdio`
  - `websocket`
- `rpc.request_id`
- `app_server.connection_id`
- `app_server.api_version = "v2"` when applicable
- `app_server.client_name` when known from initialize
- `app_server.client_version` when known
Optional useful attributes:
- `thread.id` when already known from params
- `turn.id` when already known from params
Important:
- the span factory should be the only place that assembles these fields
- handlers should not manually construct request-span attributes
- for the `initialize` request itself, read `clientInfo.name` and
`clientInfo.version` directly from the request params when present
- for later requests on the same connection, read client metadata from
per-connection session state populated during `initialize`
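
A single span factory along these lines might look like the following sketch. It only assembles attribute key/value pairs with std types; `RequestSpanInput`, `ClientInfo`, `Transport`, and `request_span_attributes` are hypothetical names, and a real implementation would hand these fields to the tracing layer instead of returning a `Vec`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
    Stdio,
    Websocket,
}

impl Transport {
    fn as_str(self) -> &'static str {
        match self {
            Transport::Stdio => "stdio",
            Transport::Websocket => "websocket",
        }
    }
}

/// Client metadata captured from `initialize` (hypothetical shape).
#[derive(Debug, Clone)]
pub struct ClientInfo {
    pub name: String,
    pub version: String,
}

/// Everything the factory needs to know about one inbound request.
pub struct RequestSpanInput<'a> {
    pub method: &'a str,
    pub transport: Transport,
    pub request_id: &'a str,
    pub connection_id: u64,
    pub client: Option<&'a ClientInfo>,
    pub thread_id: Option<&'a str>,
}

/// The only place that assembles request-span attributes.
pub fn request_span_attributes(input: &RequestSpanInput<'_>) -> Vec<(&'static str, String)> {
    let mut attrs = vec![
        ("rpc.system", "jsonrpc".to_string()),
        ("rpc.service", "codex-app-server".to_string()),
        ("rpc.method", input.method.to_string()),
        ("rpc.transport", input.transport.as_str().to_string()),
        ("rpc.request_id", input.request_id.to_string()),
        ("app_server.connection_id", input.connection_id.to_string()),
    ];
    // Optional attributes are added only when the value is actually known.
    if let Some(client) = input.client {
        attrs.push(("app_server.client_name", client.name.clone()));
        attrs.push(("app_server.client_version", client.version.clone()));
    }
    if let Some(thread_id) = input.thread_id {
        attrs.push(("thread.id", thread_id.to_string()));
    }
    attrs
}
```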
## No app-server tracing registries
The design should not introduce app-server-owned tracing registries for turns,
threads, or realtime sessions.
Why:
- app-server already has thread subscription and runtime state
- core already owns the real task and turn lifecycle
- a second tracing-specific state machine adds more code and more ways for
lifecycle tracking to drift
Future guidance:
- if thread loaded/unloaded metrics become important, extend existing app-server
thread state
- keep long-lived turn spans in core
- if realtime lifecycle metrics become important, extend the existing realtime
runtime path rather than creating a parallel tracing store
## No direct span construction in handlers
Request handlers should not call `info_span!`, `trace_span!`, `set_parent`, or
OTEL APIs directly for app-server request tracing.
Instead:
- `message_processor` should wrap inbound request handling through the
centralized request-span helper
- `Codex::submit()` / `Codex::submit_with_id()` should capture the current span
context into `Submission.trace` when constructing the `Submission`
That keeps request tracing transport-level and largely invisible to business
handlers.
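
To make the capture step concrete, here is a std-only sketch of turning the identifiers of an active span into a W3C `traceparent` value. `ActiveSpan`, `format_traceparent`, and `capture_current` are hypothetical names for this example; in practice the identifiers would come from the tracing/OTEL context rather than a plain struct.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct W3cTraceContext {
    pub traceparent: Option<String>,
    pub tracestate: Option<String>,
}

/// Hypothetical stand-in for the identifiers of the currently active span.
#[derive(Debug, Clone, Copy)]
pub struct ActiveSpan {
    pub trace_id: u128,
    pub span_id: u64,
    pub sampled: bool,
}

/// Formats `version-traceid-parentid-flags` as lowercase, zero-padded hex,
/// using version `00` of the W3C Trace Context format.
pub fn format_traceparent(span: ActiveSpan) -> String {
    let flags: u8 = if span.sampled { 0x01 } else { 0x00 };
    format!("00-{:032x}-{:016x}-{:02x}", span.trace_id, span.span_id, flags)
}

/// Snapshots the active span, if any, into a serializable carrier.
/// All-zero ids are invalid in W3C Trace Context, so they yield `None`.
pub fn capture_current(active: Option<ActiveSpan>) -> Option<W3cTraceContext> {
    let span = active.filter(|s| s.trace_id != 0 && s.span_id != 0)?;
    Some(W3cTraceContext {
        traceparent: Some(format_traceparent(span)),
        tracestate: None,
    })
}
```

A helper with this shape can be called from inside the submit path, so handlers never touch carriers directly.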
## Layering
The intended call graph is:
- `message_processor` -> `app_server_tracing`
  - create and enter the standardized inbound request span
- `Codex::submit()` / `Codex::submit_with_id()` -> `codex-otel` trace-context
  helper
  - snapshot the current span context into `Submission.trace` when the
    `Submission` is created
- `codex-core` submission loop -> `codex-otel` trace-context helper
  - create a dispatch span parented from `Submission.trace`
  - create a long-lived turn span for turn-producing operations
Important:
- app-server owns inbound request tracing
- core owns execution after the async handoff
- core owns long-lived turn spans
- the design does not add app-server-owned long-lived thread or realtime spans
## Inbound flow in app-server
The inbound request path should work like this:
1. Parse the JSON-RPC request envelope, including `trace`.
2. Use the tracing module to create a request span parented from that request
carrier when present, otherwise fall back to env `TRACEPARENT` /
`TRACESTATE` during migration, otherwise create a new root request span.
3. Process the request inside that span.
4. If the request submits work into core, capture the active span context into
`Submission.trace` when the `Submission` is created.
Integration point:
- `codex-rs/app-server/src/message_processor.rs`
## Core handoff flow
The `turn/start` and similar flows cross an async boundary:
- app-server handler submits work
- core submission loop receives `Submission`
- actual work continues later on different tasks
To preserve parentage:
1. app-server request handling runs inside `app_server.request`
2. `Codex::submit()` / `Codex::submit_with_id()` capture that active context
into `Submission.trace` when constructing the `Submission`
3. core submission loop creates a dispatch span parented from `Submission.trace`
without consulting env `TRACEPARENT` / `TRACESTATE`
4. if the submission starts a turn, core creates a long-lived turn span beneath
that dispatch span
5. existing core spans naturally nest under the turn span
This lets:
- submission handling
- a single long-lived turn span for turn-producing APIs
- `run_turn`
- model client request tracing
inherit the app-server request trace without broad tracing changes across core.
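
The handoff across the async boundary can be simulated with a plain channel and a worker thread. This is a sketch under stated assumptions, not the actual core submission loop: `submit`, `run_submission_loop`, `DispatchSpan`, and `handoff_demo` are hypothetical, and a `DispatchSpan` here only records which parent it was given.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct W3cTraceContext {
    pub traceparent: Option<String>,
    pub tracestate: Option<String>,
}

pub struct Submission {
    pub id: String,
    pub trace: Option<W3cTraceContext>,
}

/// Stand-in for a dispatch span: records only what it was parented from.
#[derive(Debug, PartialEq, Eq)]
pub struct DispatchSpan {
    pub submission_id: String,
    pub parent: Option<String>,
}

/// Request side: snapshot the active context when the submission is built.
pub fn submit(tx: &mpsc::Sender<Submission>, id: &str, active: Option<&W3cTraceContext>) {
    let submission = Submission {
        id: id.to_string(),
        trace: active.cloned(),
    };
    tx.send(submission).expect("submission loop is running");
}

/// Core side: restore parentage from the carrier only, never from env.
pub fn run_submission_loop(rx: mpsc::Receiver<Submission>) -> Vec<DispatchSpan> {
    rx.into_iter()
        .map(|s| DispatchSpan {
            submission_id: s.id,
            parent: s.trace.and_then(|t| t.traceparent),
        })
        .collect()
}

/// Runs one traced and one untraced submission through the loop.
pub fn handoff_demo(traceparent: &str) -> Vec<DispatchSpan> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_submission_loop(rx));
    let active = W3cTraceContext {
        traceparent: Some(traceparent.to_string()),
        tracestate: None,
    };
    submit(&tx, "sub-1", Some(&active));
    submit(&tx, "sub-2", None);
    drop(tx); // closing the channel lets the loop finish
    worker.join().expect("worker thread panicked")
}
```

The request handler can return as soon as `submit` does; the parent survives because it travels inside the `Submission` rather than in thread-local span state.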
## Behavior for key v2 APIs
### `thread/start`
- create request span
- annotate with `thread.id` once known
- send response and `thread/started`
- no separate thread lifecycle span in the initial design
### `thread/resume`
- create request span
- annotate with `thread.id` when known
- no separate lifecycle span
### `thread/fork`
- create request span
- annotate with the new `thread.id`
- no separate lifecycle span
### `thread/unsubscribe`
- create request span
- no separate unload span
- if later thread unload metrics are needed, reuse existing thread state rather
than adding a tracing-only registry
### `turn/start`
- create request span
- submit work into core under that request span
- propagate the active span context through `Submission.trace` when the
`Submission` is created
- let core create a dispatch span and then a long-lived turn span
- let that turn span cover the full duration until completion, interruption, or
failure
### `turn/steer`
- create request span
- if the request submits core work, propagate via `Submission.trace`
- otherwise request span only
### `turn/interrupt`
- create request span
- request span only unless core submission is involved
### `review/start`
- treat like `turn/start`
- let core create the same kind of long-lived turn span
### `thread/realtime/start`, `appendAudio`, `appendText`, `stop`
- create request span
- if the API submits work into core, propagate via `Submission.trace` when the
`Submission` is created
- do not introduce separate realtime lifecycle spans in the initial design
### Unary methods such as `thread/list`
- create request span only
## Runtime checks
Keep runtime checks narrowly scoped in the initial rollout:
- warn when an inbound trace carrier is present but invalid
- test that `Submission.trace` is set when work is submitted from a traced
request
Do not add lifecycle consistency checks for tracing registries that do not
exist yet.
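
The "present but invalid" check could be sketched as a small validator for the `traceparent` field, following the version `00` layout from the W3C Trace Context specification. `parse_traceparent` and `ParsedTraceparent` are hypothetical names; a real implementation would more likely rely on the OTEL propagator and log a warning when it returns nothing.

```rust
#[derive(Debug, PartialEq, Eq)]
pub struct ParsedTraceparent {
    pub trace_id: String,
    pub parent_id: String,
    pub sampled: bool,
}

/// True when `s` is exactly `len` lowercase hex characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parses `version-traceid-parentid-flags`; returns `None` for invalid input
/// so the caller can warn and fall back to the next parent source.
pub fn parse_traceparent(value: &str) -> Option<ParsedTraceparent> {
    let mut parts = value.trim().split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let parent_id = parts.next()?;
    let flags = parts.next()?;

    // Version `ff` is forbidden, and version `00` has exactly four fields.
    if !is_lower_hex(version, 2) || version == "ff" {
        return None;
    }
    if version == "00" && parts.next().is_some() {
        return None;
    }
    // All-zero trace and parent ids are invalid.
    if !is_lower_hex(trace_id, 32) || trace_id.bytes().all(|b| b == b'0') {
        return None;
    }
    if !is_lower_hex(parent_id, 16) || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    if !is_lower_hex(flags, 2) {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;

    Some(ParsedTraceparent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: (flags & 0x01) == 0x01,
    })
}
```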
## Tests
Add tests for the initial mechanics:
- inbound request tracing accepts a valid W3C carrier
- invalid carriers are ignored cleanly
- unary methods create request spans without needing any extra handler changes
- `turn/start` propagates request ancestry through `Submission.trace` into core
- `turn/start` creates a long-lived core-owned turn span
- the turn span closes on completion, interruption, or failure
- existing core spans inherit from the propagated parent
- inbound app-server request spans prefer request `trace` and may temporarily
fall back to env `TRACEPARENT` / `TRACESTATE` during client migration
- app-server submission and downstream spans do not use env `TRACEPARENT` /
`TRACESTATE` fallback
- explicit transport or `Submission.trace` parents are not overwritten by
lower-level env-based re-parenting
The goal is to verify the centralized propagation behavior, not to exhaustively
test OTEL internals.
## Suggested PR sequence
### PR 1: Foundation plus inbound request spans
Scope:
1. Introduce a shared `W3cTraceContext` type in `codex-protocol`.
2. Add `trace` to inbound JSON-RPC request envelopes in app-server protocol.
3. Add focused trace-context helpers in `codex-rs/otel`.
4. Add the centralized app-server request tracing module.
5. Wrap inbound request handling in `message_processor.rs`.
Why this PR:
- proves the transport and request-span shape with minimal scope
- gives all inbound app-server APIs consistent request tracing immediately
- avoids mixing lifecycle questions into the initial plumbing review
### PR 2: Async handoff into core via `Submission`
Scope:
1. Add `trace` to `Submission`.
2. Capture the current span context automatically when constructing
`Submission` values in `Codex::submit()` / `Codex::submit_with_id()`.
3. Have the core submission loop restore parentage with a dispatch span.
4. Remove or gate both lower-level env-based re-parenting and provider-init
env attachment in `codex-otel` so `Submission.trace` remains the
authoritative parent when present.
5. Validate the flow with `turn/start`.
Why this PR:
- validates the critical async handoff from app-server into core
- proves that existing core tracing can inherit the app-server request ancestry
- keeps the behavior change focused on one boundary
### PR 3: Core-owned long-lived turn spans
Scope:
1. Add a long-lived turn span in core for `turn/start`.
2. Reuse the same turn-span pattern for `review/start`.
3. Ensure the span closes on completion, interruption, or failure.
Why this PR:
- completes the minimum useful tracing story for turn lifecycles
- keeps long-lived span ownership in the layer that actually owns the turn
- still builds on the simpler propagation model from PR 2 instead of mixing
everything into one change
### PR 4: Use request-level trace instead of env var in Python
Scope:
1. Update the Python app-server launcher/client to send `trace` on each
JSON-RPC request.
2. Reuse the existing upstream trace context in request envelopes during the
initial migration so end-to-end parentage is preserved before env fallback is
removed.
3. Continue preferring request-level `trace` over env once both exist.
Why this PR:
- preserves end-to-end parentage while the Rust side is migrating away from
env-based request propagation
- validates the transport-level tracing design with a real client
- moves trace propagation from process scope toward request scope without
waiting for the final cleanup
### PR 5: Remove support for `TRACEPARENT` / `TRACESTATE` when using codex app-server
Scope:
1. Stop consulting env `TRACEPARENT` / `TRACESTATE` for app-server inbound
request span creation.
2. Remove app-server-specific env injection from launchers once request-level
`trace` carriers are in use.
3. Keep env-based propagation only for non-server entrypoints or process-level
startup tracing if still needed, and require those entrypoints to opt in
explicitly rather than inheriting env parentage from generic provider init.
4. Validate that app-server requests still preserve parentage through explicit
request carriers.
Why this PR:
- completes the migration from process-scoped to request-scoped tracing for
app-server
- removes duplicated propagation mechanisms and future ambiguity
- aligns the implementation with the long-term design boundary
### PR 6: Optional follow-ups
Possible follow-ups:
1. Reuse existing app-server thread state to add thread loaded/unloaded duration
metrics if needed.
2. Reuse existing realtime runtime state to add realtime duration metrics if
needed.
3. Add outbound JSON-RPC trace propagation only if there is a concrete
client-side tracing use case.
## Rollout guidance
Start with:
- inbound request spans for all app-server requests
- `turn/start` request -> core propagation
- a core-owned long-lived turn span for `turn/start`
Migration note:
- the Rust implementation should treat request-level `trace` carriers as the
real app-server tracing path immediately
- env `TRACEPARENT` / `TRACESTATE` should remain only as temporary compatibility
for existing launchers, and only at the inbound request boundary, while
clients are updated to send `trace` on every JSON-RPC request
- after client rollout, remove the env-based app-server request propagation
path instead of keeping it as a long-term fallback
Those pieces exercise the important mechanics:
- inbound carrier extraction
- request span creation
- async handoff into core
- inherited core tracing beneath the propagated parent
- a single span covering the full duration of a turn
After that, only add more lifecycle-specific tracing if a real debugging or
observability gap remains.
## Bottom line
The recommended initial design is:
- trace context on inbound JSON-RPC request envelopes
- one standardized request span for every inbound request
- automatic propagation through `Submission` into core
- core-owned long-lived turn spans for turn-producing APIs
- OTEL conversion and carrier logic centralized in `codex-otel`
- env `TRACEPARENT` / `TRACESTATE` kept only as temporary compatibility during
launcher migration, not as the steady-state app-server request propagation
mechanism
- no app-server-owned tracing registries for turns, threads, or realtime
sessions in the initial implementation
This gives app-server distributed tracing that is:
- consistent
- low-boilerplate
- modular
- aligned with the existing ownership boundaries in app-server and core

1
codex-rs/Cargo.lock generated

@@ -1428,6 +1428,7 @@ dependencies = [
"codex-feedback",
"codex-file-search",
"codex-login",
"codex-otel",
"codex-protocol",
"codex-rmcp-client",
"codex-shell-command",


@@ -70,7 +70,17 @@
"method": {
"type": "string"
},
- "params": true
+ "params": true,
+ "trace": {
+ "anyOf": [
+ {
+ "$ref": "#/definitions/W3cTraceContext"
+ },
+ {
+ "type": "null"
+ }
+ ]
+ }
},
"required": [
"id",
@@ -102,6 +112,23 @@
"type": "integer"
}
]
},
"W3cTraceContext": {
"properties": {
"traceparent": {
"type": [
"string",
"null"
]
},
"tracestate": {
"type": [
"string",
"null"
]
}
},
"type": "object"
}
},
"description": "Refers to any valid JSON-RPC object that can be decoded off the wire, or encoded to be sent.",


@@ -11,6 +11,23 @@
"type": "integer"
}
]
},
"W3cTraceContext": {
"properties": {
"traceparent": {
"type": [
"string",
"null"
]
},
"tracestate": {
"type": [
"string",
"null"
]
}
},
"type": "object"
}
},
"description": "A request that expects a response.",
@@ -21,7 +38,17 @@
"method": {
"type": "string"
},
- "params": true
+ "params": true,
+ "trace": {
+ "anyOf": [
+ {
+ "$ref": "#/definitions/W3cTraceContext"
+ },
+ {
+ "type": "null"
+ }
+ ]
+ }
},
"required": [
"id",


@@ -5016,7 +5016,17 @@
"method": {
"type": "string"
},
- "params": true
+ "params": true,
+ "trace": {
+ "anyOf": [
+ {
+ "$ref": "#/definitions/W3cTraceContext"
+ },
+ {
+ "type": "null"
+ }
+ ]
+ }
},
"required": [
"id",
@@ -7220,6 +7230,23 @@
}
]
},
"W3cTraceContext": {
"properties": {
"traceparent": {
"type": [
"string",
"null"
]
},
"tracestate": {
"type": [
"string",
"null"
]
}
},
"type": "object"
},
"v2": {
"AbsolutePathBuf": {
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",

View File

@@ -1,6 +1,7 @@
//! We do not do true JSON-RPC 2.0, as we neither send nor expect the
//! "jsonrpc": "2.0" field.
use codex_protocol::protocol::W3cTraceContext;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
@@ -38,6 +39,11 @@ pub struct JSONRPCRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub params: Option<serde_json::Value>,
/// Optional W3C Trace Context carrier for distributed tracing.
/// Uses `traceparent` and `tracestate` at the JSON-RPC envelope level.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub trace: Option<W3cTraceContext>,
}
/// A notification which does not expect a response.

View File

@@ -15,6 +15,8 @@ use std::process::Command;
use std::process::Stdio;
use std::thread;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use anyhow::Result;
@@ -71,6 +73,7 @@ use codex_app_server_protocol::UserInput as V2UserInput;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::W3cTraceContext;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
@@ -104,6 +107,10 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
"item/reasoning/summaryTextDelta",
"item/reasoning/textDelta",
];
const APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
const APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
const DATADOG_TRACE_BASE_URL: &str = "https://openai.datadoghq.com/apm/traces";
const DATADOG_TRACE_WINDOW_MS: u128 = 15 * 60 * 1000;
/// Minimal launcher that initializes the Codex app-server and logs the handshake.
#[derive(Parser)]
@@ -499,24 +506,28 @@ fn send_message(
user_message: String,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let conversation = client.start_thread()?;
println!("< newConversation response: {conversation:?}");
let conversation = client.start_thread()?;
println!("< newConversation response: {conversation:?}");
let subscription = client.add_conversation_listener(&conversation.conversation_id)?;
println!("< addConversationListener response: {subscription:?}");
let subscription = client.add_conversation_listener(&conversation.conversation_id)?;
println!("< addConversationListener response: {subscription:?}");
let send_response =
client.send_user_message(&conversation.conversation_id, &user_message)?;
println!("< sendUserMessage response: {send_response:?}");
let send_response = client.send_user_message(&conversation.conversation_id, &user_message)?;
println!("< sendUserMessage response: {send_response:?}");
client.stream_conversation(&conversation.conversation_id)?;
client.stream_conversation(&conversation.conversation_id)?;
client.remove_thread_listener(subscription.subscription_id)?;
client.remove_thread_listener(subscription.subscription_id)?;
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
pub fn send_message_v2(
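Note on the pattern in this hunk: the command body is wrapped in an immediately invoked closure, so a `?` inside it returns from the closure rather than from the enclosing function. That lets `client.print_trace_summary()` run on both the success and the failure path. Below is a minimal, std-only sketch of the same idea. `Recorder`, `step`, `summary`, and `run_steps` are hypothetical stand-ins for illustration and are not names from this change.

```rust
/// Hypothetical stand-in for the client: it only records what happened.
#[derive(Default)]
struct Recorder {
    log: Vec<String>,
}

impl Recorder {
    /// A step that fails when `ok` is false, mimicking a fallible RPC call.
    fn step(&mut self, name: &str, ok: bool) -> Result<(), String> {
        self.log.push(format!("step:{name}"));
        if ok {
            Ok(())
        } else {
            Err(format!("{name} failed"))
        }
    }

    /// Stand-in for `print_trace_summary`: it should run on every exit path.
    fn summary(&mut self) {
        self.log.push("summary".to_string());
    }
}

fn run_steps(recorder: &mut Recorder, second_ok: bool) -> Result<(), String> {
    // `?` inside the closure exits the closure only, so control always
    // reaches the `summary()` call below.
    let result = (|| -> Result<(), String> {
        recorder.step("initialize", true)?;
        recorder.step("turn", second_ok)?;
        recorder.step("stream", true)?;
        Ok(())
    })();
    recorder.summary();
    result
}
```

When the second step fails, the third step is skipped but the summary is still recorded, and the original error is what the caller receives.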
@@ -575,81 +586,87 @@ fn trigger_zsh_fork_multi_cmd_approval(
let message = user_message.unwrap_or_else(|| default_prompt.to_string());
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
client.command_approval_behavior = match abort_on {
Some(index) => CommandApprovalBehavior::AbortOn(index),
None => CommandApprovalBehavior::AlwaysAccept,
};
client.command_approval_count = 0;
client.command_approval_item_ids.clear();
client.command_execution_statuses.clear();
client.last_turn_status = None;
client.command_approval_behavior = match abort_on {
Some(index) => CommandApprovalBehavior::AbortOn(index),
None => CommandApprovalBehavior::AlwaysAccept,
};
client.command_approval_count = 0;
client.command_approval_item_ids.clear();
client.command_execution_statuses.clear();
client.last_turn_status = None;
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: message,
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = Some(AskForApproval::OnRequest);
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
});
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: message,
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = Some(AskForApproval::OnRequest);
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
});
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
if client.command_approval_count < min_approvals {
bail!(
"expected at least {min_approvals} command approvals, got {}",
client.command_approval_count
);
}
let mut approvals_per_item = std::collections::BTreeMap::new();
for item_id in &client.command_approval_item_ids {
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
}
let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0);
if max_approvals_for_one_item < min_approvals {
bail!(
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
);
}
let last_command_status = client.command_execution_statuses.last();
if abort_on.is_none() {
if last_command_status != Some(&CommandExecutionStatus::Completed) {
bail!("expected completed command execution, got {last_command_status:?}");
}
if client.last_turn_status != Some(TurnStatus::Completed) {
if client.command_approval_count < min_approvals {
bail!(
"expected completed turn in all-accept flow, got {:?}",
client.last_turn_status
"expected at least {min_approvals} command approvals, got {}",
client.command_approval_count
);
}
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
bail!(
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
let mut approvals_per_item = std::collections::BTreeMap::new();
for item_id in &client.command_approval_item_ids {
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
}
let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0);
if max_approvals_for_one_item < min_approvals {
bail!(
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
);
}
let last_command_status = client.command_execution_statuses.last();
if abort_on.is_none() {
if last_command_status != Some(&CommandExecutionStatus::Completed) {
bail!("expected completed command execution, got {last_command_status:?}");
}
if client.last_turn_status != Some(TurnStatus::Completed) {
bail!(
"expected completed turn in all-accept flow, got {:?}",
client.last_turn_status
);
}
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
bail!(
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
);
}
println!(
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
client.command_approval_count,
client.command_execution_statuses,
client.last_turn_status
);
}
println!(
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
client.command_approval_count, client.command_execution_statuses, client.last_turn_status
);
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn resume_message_v2(
@@ -662,29 +679,32 @@ fn resume_message_v2(
ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?;
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let resume_response = client.thread_resume(ThreadResumeParams {
thread_id,
..Default::default()
})?;
println!("< thread/resume response: {resume_response:?}");
let resume_response = client.thread_resume(ThreadResumeParams {
thread_id,
..Default::default()
})?;
println!("< thread/resume response: {resume_response:?}");
let turn_response = client.turn_start(TurnStartParams {
thread_id: resume_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
text_elements: Vec::new(),
}],
..Default::default()
})?;
println!("< turn/start response: {turn_response:?}");
let turn_response = client.turn_start(TurnStartParams {
thread_id: resume_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
text_elements: Vec::new(),
}],
..Default::default()
})?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&resume_response.thread.id, &turn_response.turn.id)?;
client.stream_turn(&resume_response.thread.id, &turn_response.turn.id)?;
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn thread_resume_follow(
@@ -788,33 +808,36 @@ fn send_message_v2_with_policies(
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize_with_experimental_api(experimental_api)?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize_with_experimental_api(experimental_api)?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = approval_policy;
turn_params.sandbox_policy = sandbox_policy;
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: user_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
turn_params.approval_policy = approval_policy;
turn_params.sandbox_policy = sandbox_policy;
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
let turn_response = client.turn_start(turn_params)?;
println!("< turn/start response: {turn_response:?}");
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn send_follow_up_v2(
@@ -825,118 +848,133 @@ fn send_follow_up_v2(
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let first_turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: first_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
let first_turn_response = client.turn_start(first_turn_params)?;
println!("< turn/start response (initial): {first_turn_response:?}");
client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?;
let first_turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: first_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
let first_turn_response = client.turn_start(first_turn_params)?;
println!("< turn/start response (initial): {first_turn_response:?}");
client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?;
let follow_up_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: follow_up_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
let follow_up_response = client.turn_start(follow_up_params)?;
println!("< turn/start response (follow-up): {follow_up_response:?}");
client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?;
let follow_up_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
input: vec![V2UserInput::Text {
text: follow_up_message,
// Test client sends plain text without UI element ranges.
text_elements: Vec::new(),
}],
..Default::default()
};
let follow_up_response = client.turn_start(follow_up_params)?;
println!("< turn/start response (follow-up): {follow_up_response:?}");
client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?;
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let login_response = client.login_chat_gpt()?;
println!("< loginChatGpt response: {login_response:?}");
println!(
"Open the following URL in your browser to continue:\n{}",
login_response.auth_url
);
let completion = client.wait_for_login_completion(&login_response.login_id)?;
println!("< loginChatGptComplete notification: {completion:?}");
if completion.success {
println!("Login succeeded.");
Ok(())
} else {
bail!(
"login failed: {}",
completion
.error
.as_deref()
.unwrap_or("unknown error from loginChatGptComplete")
let login_response = client.login_chat_gpt()?;
println!("< loginChatGpt response: {login_response:?}");
println!(
"Open the following URL in your browser to continue:\n{}",
login_response.auth_url
);
}
let completion = client.wait_for_login_completion(&login_response.login_id)?;
println!("< loginChatGptComplete notification: {completion:?}");
if completion.success {
println!("Login succeeded.");
Ok(())
} else {
bail!(
"login failed: {}",
completion
.error
.as_deref()
.unwrap_or("unknown error from loginChatGptComplete")
);
}
})();
client.print_trace_summary();
result
}
fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response = client.get_account_rate_limits()?;
println!("< account/rateLimits/read response: {response:?}");
let response = client.get_account_rate_limits()?;
println!("< account/rateLimits/read response: {response:?}");
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response = client.model_list(ModelListParams::default())?;
println!("< model/list response: {response:?}");
let response = client.model_list(ModelListParams::default())?;
println!("< model/list response: {response:?}");
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> {
let mut client = CodexClient::connect(endpoint, config_overrides)?;
let result = (|| -> Result<()> {
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response = client.thread_list(ThreadListParams {
cursor: None,
limit: Some(limit),
sort_key: None,
model_providers: None,
source_kinds: None,
archived: None,
cwd: None,
search_term: None,
})?;
println!("< thread/list response: {response:?}");
let response = client.thread_list(ThreadListParams {
cursor: None,
limit: Some(limit),
sort_key: None,
model_providers: None,
source_kinds: None,
archived: None,
cwd: None,
search_term: None,
})?;
println!("< thread/list response: {response:?}");
Ok(())
Ok(())
})();
client.print_trace_summary();
result
}
fn ensure_dynamic_tools_unused(
@@ -993,6 +1031,14 @@ struct CodexClient {
command_approval_item_ids: Vec<String>,
command_execution_statuses: Vec<CommandExecutionStatus>,
last_turn_status: Option<TurnStatus>,
trace_id: String,
trace_requests: Vec<TraceRequestSummary>,
}
struct TraceRequestSummary {
method: String,
request_id: String,
parent_span_id: String,
}
#[derive(Debug, Clone, Copy)]
@@ -1052,6 +1098,8 @@ impl CodexClient {
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
trace_id: generate_trace_id(),
trace_requests: Vec::new(),
})
}
@@ -1073,6 +1121,8 @@ impl CodexClient {
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
last_turn_status: None,
trace_id: generate_trace_id(),
trace_requests: Vec::new(),
})
}
@@ -1438,12 +1488,67 @@ impl CodexClient {
}
fn write_request(&mut self, request: &ClientRequest) -> Result<()> {
let request_json = serde_json::to_string(request)?;
let request_pretty = serde_json::to_string_pretty(request)?;
let request = self.jsonrpc_request_with_trace(request)?;
self.record_request_trace_info(&request);
let request_json = serde_json::to_string(&request)?;
let request_pretty = serde_json::to_string_pretty(&request)?;
print_multiline_with_prefix("> ", &request_pretty);
self.write_payload(&request_json)
}
fn jsonrpc_request_with_trace(&self, request: &ClientRequest) -> Result<JSONRPCRequest> {
let request_value = serde_json::to_value(request)?;
let mut request: JSONRPCRequest = serde_json::from_value(request_value)
.context("client request was not a valid JSON-RPC request")?;
request.trace = Some(W3cTraceContext {
traceparent: Some(format!(
"00-{}-{}-01",
self.trace_id,
generate_parent_span_id()
)),
tracestate: None,
});
Ok(request)
}
fn record_request_trace_info(&mut self, request: &JSONRPCRequest) {
let Some(trace) = request.trace.as_ref() else {
return;
};
let Some(traceparent) = trace.traceparent.as_deref() else {
return;
};
let Some((trace_id, parent_span_id)) = parse_traceparent(traceparent) else {
return;
};
debug_assert_eq!(trace_id, self.trace_id);
self.trace_requests.push(TraceRequestSummary {
method: request.method.clone(),
request_id: format_request_id(&request.id),
parent_span_id: parent_span_id.to_string(),
});
}
fn print_trace_summary(&self) {
if self.trace_requests.is_empty() {
return;
}
println!("\n[trace summary]");
println!("trace_id: {}", self.trace_id);
println!(
"datadog_url: {}",
datadog_trace_url(&self.trace_id, "codex_app_server")
);
println!("requests:");
for request in &self.trace_requests {
println!(" method: {}", request.method);
println!(" request_id: {}", request.request_id);
println!(" parent_span_id: {}", request.parent_span_id);
}
}
fn wait_for_response<T>(&mut self, request_id: RequestId, method: &str) -> Result<T>
where
T: DeserializeOwned,
@@ -1709,6 +1814,50 @@ impl CodexClient {
}
}
fn generate_trace_id() -> String {
Uuid::new_v4().simple().to_string()
}
fn generate_parent_span_id() -> String {
let uuid = Uuid::new_v4().simple().to_string();
uuid[..16].to_string()
}
fn parse_traceparent(traceparent: &str) -> Option<(&str, &str)> {
let mut parts = traceparent.split('-');
let _version = parts.next()?;
let trace_id = parts.next()?;
let parent_span_id = parts.next()?;
let _flags = parts.next()?;
if parts.next().is_some() {
return None;
}
Some((trace_id, parent_span_id))
}
fn format_request_id(request_id: &RequestId) -> String {
match request_id {
RequestId::String(value) => value.clone(),
RequestId::Integer(value) => value.to_string(),
}
}
fn datadog_trace_url(trace_id: &str, service_name: &str) -> String {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis())
.unwrap_or(0);
let start = now_ms.saturating_sub(DATADOG_TRACE_WINDOW_MS);
let end = now_ms.saturating_add(DATADOG_TRACE_WINDOW_MS);
let query = format!("service:{service_name} trace_id:{trace_id}");
let encoded_query: String = url::form_urlencoded::byte_serialize(query.as_bytes()).collect();
format!(
"{DATADOG_TRACE_BASE_URL}?query={encoded_query}&agg_m=count&agg_m_source=base&agg_t=count&fromUser=false&graphType=flamegraph&historicalData=false&messageDisplay=inline&query_translation_version=v0&shouldShowLegend=true&sort=desc&spanType=all&storage=hot&target-span=a&tq_query_translation_version=v0&traceQuery=a&view=spans&viz=stream&start={start}&end={end}&paused=false"
)
}
fn print_multiline_with_prefix(prefix: &str, payload: &str) {
for line in payload.lines() {
println!("{prefix}{line}");
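Note on `parse_traceparent` in this hunk: it only checks that the value splits into exactly four `-`-separated fields, which is enough for the test client because it parses values it generated itself. For untrusted input, a stricter check along the lines of the W3C Trace Context rules could look like the sketch below. It is an illustration only: `TraceParent`, `is_lower_hex`, and `parse_traceparent_strict` are hypothetical names, and accepting only version `00` is a simplifying assumption.

```rust
/// Fields of a version-00 `traceparent` value, borrowed from the input.
#[derive(Debug, PartialEq)]
struct TraceParent<'a> {
    trace_id: &'a str,
    parent_id: &'a str,
    sampled: bool,
}

/// True when `s` is exactly `len` lowercase hexadecimal characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Hypothetical strict parser; this sketch rejects every version except "00".
fn parse_traceparent_strict(value: &str) -> Option<TraceParent<'_>> {
    let mut parts = value.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let parent_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() || version != "00" {
        return None;
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero trace and parent ids are defined as invalid.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flag_bits = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id,
        parent_id,
        // Bit 0 of the flags byte is the "sampled" flag.
        sampled: (flag_bits & 0x01) == 0x01,
    })
}
```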
@@ -1728,11 +1877,18 @@ impl Drop for CodexClient {
return;
}
thread::sleep(Duration::from_millis(100));
let deadline = SystemTime::now() + APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT;
loop {
if let Ok(Some(status)) = child.try_wait() {
println!("[codex app-server exited: {status}]");
return;
}
if let Ok(Some(status)) = child.try_wait() {
println!("[codex app-server exited: {status}]");
return;
if SystemTime::now() >= deadline {
break;
}
thread::sleep(APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL);
}
let _ = child.kill();
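Note on the shutdown loop in this hunk: the deadline is computed from `SystemTime::now()`, which follows the wall clock and is not guaranteed to be monotonic. A possible alternative, not what this change does, is to base the deadline on `std::time::Instant`. The sketch below shows the same poll-until-deadline shape as a generic helper; `poll_until` and its parameters are hypothetical names chosen for illustration.

```rust
use std::thread;
use std::time::Duration;
use std::time::Instant;

/// Calls `check` until it yields a value or `timeout` has elapsed,
/// sleeping `interval` between attempts. `check` always runs at least once.
fn poll_until<T>(
    timeout: Duration,
    interval: Duration,
    mut check: impl FnMut() -> Option<T>,
) -> Option<T> {
    // `Instant` is monotonic, so wall-clock adjustments do not move the deadline.
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(value) = check() {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
        thread::sleep(interval);
    }
}
```

In the `Drop` implementation, `check` would correspond to `child.try_wait()` returning an exit status, and a `None` result would correspond to falling through to `child.kill()`.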

View File

@@ -21,6 +21,7 @@ async-trait = { workspace = true }
codex-arg0 = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-core = { workspace = true }
codex-otel = { workspace = true }
codex-shell-command = { workspace = true }
codex-utils-cli = { workspace = true }
codex-backend-client = { workspace = true }

View File

@@ -28,6 +28,10 @@ Supported transports:
Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.
Inbound JSON-RPC requests may include an optional top-level `trace` object
carrying W3C Trace Context fields (`traceparent` and, optionally, `tracestate`).
Placing it on the request envelope keeps trace propagation independent of
method-specific params.
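As an illustration of the `traceparent` value a client would place in `trace`, here is a minimal, std-only sketch that formats one from numeric ids. The layout is `version-traceid-parentid-flags` in lowercase hex, with a 32-character trace id and a 16-character parent id. `format_traceparent` is a hypothetical helper for this example and is not part of the app-server API.

```rust
/// Formats a version-00 `traceparent` value.
/// Returns `None` for all-zero ids, which are not valid.
fn format_traceparent(trace_id: u128, parent_id: u64, sampled: bool) -> Option<String> {
    if trace_id == 0 || parent_id == 0 {
        return None;
    }
    // Bit 0 of the flags byte marks the trace as sampled.
    let flags: u8 = if sampled { 0x01 } else { 0x00 };
    Some(format!("00-{trace_id:032x}-{parent_id:016x}-{flags:02x}"))
}
```

For example, `format_traceparent(1, 2, true)` yields a value of the same shape as the one shown in the `initialize` example in this README.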
Tracing/log output:
- `RUST_LOG` controls log filtering/verbosity.
@@ -85,6 +89,9 @@ Example (from OpenAI's official VSCode extension):
{
"method": "initialize",
"id": 0,
"trace": {
"traceparent": "00-00000000000000000000000000000001-0000000000000002-01"
},
"params": {
"clientInfo": {
"name": "codex_vscode",

View File

@@ -0,0 +1,89 @@
use crate::message_processor::ConnectionSessionState;
use crate::outgoing_message::ConnectionId;
use crate::transport::AppServerTransport;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCRequest;
use codex_otel::set_parent_from_context;
use codex_otel::set_parent_from_w3c_trace_context;
use codex_otel::traceparent_context_from_env;
use tracing::Span;
use tracing::field;
use tracing::info_span;
pub(crate) fn request_span(
request: &JSONRPCRequest,
transport: AppServerTransport,
connection_id: ConnectionId,
session: &ConnectionSessionState,
) -> Span {
let span = info_span!(
"app_server.request",
otel.kind = "server",
otel.name = request.method.as_str(),
rpc.system = "jsonrpc",
rpc.method = request.method.as_str(),
rpc.transport = transport_name(transport),
rpc.request_id = ?request.id,
app_server.connection_id = ?connection_id,
app_server.api_version = "v2",
app_server.client_name = field::Empty,
app_server.client_version = field::Empty,
);
let initialize_client_info = initialize_client_info(request);
if let Some(client_name) = client_name(initialize_client_info.as_ref(), session) {
span.record("app_server.client_name", client_name);
}
if let Some(client_version) = client_version(initialize_client_info.as_ref(), session) {
span.record("app_server.client_version", client_version);
}
if let Some(trace) = &request.trace {
if !set_parent_from_w3c_trace_context(&span, trace) {
tracing::warn!(
rpc_method = request.method.as_str(),
rpc_request_id = ?request.id,
"ignoring invalid inbound request trace carrier"
);
}
} else if let Some(context) = traceparent_context_from_env() {
set_parent_from_context(&span, context);
}
span
}
fn transport_name(transport: AppServerTransport) -> &'static str {
match transport {
AppServerTransport::Stdio => "stdio",
AppServerTransport::WebSocket { .. } => "websocket",
}
}
fn client_name<'a>(
initialize_client_info: Option<&'a InitializeParams>,
session: &'a ConnectionSessionState,
) -> Option<&'a str> {
if let Some(params) = initialize_client_info {
return Some(params.client_info.name.as_str());
}
session.app_server_client_name.as_deref()
}
fn client_version<'a>(
initialize_client_info: Option<&'a InitializeParams>,
session: &'a ConnectionSessionState,
) -> Option<&'a str> {
if let Some(params) = initialize_client_info {
return Some(params.client_info.version.as_str());
}
session.client_version.as_deref()
}
fn initialize_client_info(request: &JSONRPCRequest) -> Option<InitializeParams> {
if request.method != "initialize" {
return None;
}
let params = request.params.clone()?;
serde_json::from_value(params).ok()
}
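Note on parent selection in `request_span`: as written, a request-level `trace` carrier takes precedence over the environment-derived context. The environment is consulted only when the request has no carrier at all, so an invalid carrier produces a warning and no fallback. The std-only sketch below models that decision. `ParentChoice`, `choose_parent`, and the `is_valid` callback are hypothetical names, and representing each carrier as a plain `&str` is a simplifying assumption.

```rust
/// Which parent, if any, the request span ends up with.
#[derive(Debug, PartialEq)]
enum ParentChoice<'a> {
    /// The request carried a valid `traceparent`.
    Request(&'a str),
    /// The request carried a carrier that could not be used; no fallback.
    InvalidRequestCarrier,
    /// No request carrier; the process environment supplied one.
    Environment(&'a str),
    /// No parent at all: the span starts a new trace.
    Root,
}

fn choose_parent<'a>(
    request_traceparent: Option<&'a str>,
    env_traceparent: Option<&'a str>,
    is_valid: impl Fn(&str) -> bool,
) -> ParentChoice<'a> {
    match (request_traceparent, env_traceparent) {
        (Some(value), _) if is_valid(value) => ParentChoice::Request(value),
        // Mirrors the `if let ... else if` shape above: once a request
        // carrier is present, the environment is never consulted.
        (Some(_), _) => ParentChoice::InvalidRequestCarrier,
        (None, Some(value)) => ParentChoice::Environment(value),
        (None, None) => ParentChoice::Root,
    }
}
```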

View File

@@ -52,6 +52,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::Registry;
use tracing_subscriber::util::SubscriberInitExt;
mod app_server_tracing;
mod bespoke_event_handling;
mod codex_message_processor;
mod config_api;
@@ -447,7 +448,7 @@ pub async fn run_main_with_transport(
let otel = codex_core::otel_init::build_provider(
&config,
env!("CARGO_PKG_VERSION"),
Some("codex_app_server"),
Some("codex-app-server"),
default_analytics_enabled,
)
.map_err(|e| {
@@ -675,6 +676,7 @@ pub async fn run_main_with_transport(
.process_request(
connection_id,
request,
transport,
&mut connection_state.session,
&connection_state.outbound_initialized,
)

View File

@@ -12,6 +12,7 @@ use crate::external_agent_config_api::ExternalAgentConfigApi;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::AppServerTransport;
use async_trait::async_trait;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
@@ -59,6 +60,7 @@ use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::Instrument;
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
@@ -141,6 +143,7 @@ pub(crate) struct ConnectionSessionState {
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) client_version: Option<String>,
}
pub(crate) struct MessageProcessorArgs {
@@ -224,46 +227,50 @@ impl MessageProcessor {
&mut self,
connection_id: ConnectionId,
request: JSONRPCRequest,
transport: AppServerTransport,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
async {
let request_method = request.method.as_str();
tracing::trace!(
?connection_id,
request_id = ?request.id,
"app-server request: {request_method}"
);
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
Ok(codex_request) => codex_request,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("Invalid request: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match codex_request {
match codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
@@ -304,6 +311,8 @@ impl MessageProcessor {
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
@@ -330,7 +339,6 @@ impl MessageProcessor {
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
session.app_server_client_name = Some(name.clone());
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
@@ -355,91 +363,97 @@ impl MessageProcessor {
return;
}
}
}
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
)
.boxed()
.await;
}
}
}
.instrument(request_span)
.await;
}
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
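
The comment on the `other => { ... }` arm says the delegated future is boxed so that the wrapper's async state machine does not inline it. The following is a minimal std-only sketch of that effect, not the project's code: `large_inner`, `wrapper_inline`, `wrapper_boxed`, and `future_sizes` are hypothetical names, and `Box::pin` stands in for the `.boxed()` combinator used in the diff.

```rust
use std::future::Future;
use std::mem::size_of_val;
use std::pin::Pin;

// Hypothetical stand-in for a large delegated future: the buffer is read after
// the await point, so it has to be stored inside the future's state machine.
async fn large_inner() -> usize {
    let buf = [1u8; 4096];
    std::future::ready(()).await;
    buf.iter().map(|b| usize::from(*b)).sum()
}

// Awaiting inline embeds the inner future's state in the wrapper's own state.
async fn wrapper_inline() -> usize {
    large_inner().await
}

// Boxing moves the inner state to the heap; the wrapper only stores a pointer.
async fn wrapper_boxed() -> usize {
    let inner: Pin<Box<dyn Future<Output = usize>>> = Box::pin(large_inner());
    inner.await
}

// Returns the sizes in bytes of the (inline, boxed) wrapper futures.
fn future_sizes() -> (usize, usize) {
    (size_of_val(&wrapper_inline()), size_of_val(&wrapper_boxed()))
}

fn main() {
    let (inline, boxed) = future_sizes();
    println!("inline wrapper: {inline} bytes, boxed wrapper: {boxed} bytes");
}
```

The trade-off is one heap allocation per delegated request in exchange for a smaller wrapper future, which is the concern the comment raises about worker-thread stack usage.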

@@ -744,6 +744,7 @@ mod tests {
id: codex_app_server_protocol::RequestId::Integer(7),
method: "config/read".to_string(),
params: Some(json!({ "includeLayers": false })),
trace: None,
});
assert!(
enqueue_incoming_message(&transport_event_tx, &writer_tx, connection_id, request).await
@@ -885,6 +886,7 @@ mod tests {
id: codex_app_server_protocol::RequestId::Integer(7),
method: "config/read".to_string(),
params: Some(json!({ "includeLayers": false })),
trace: None,
});
let enqueue_result = tokio::time::timeout(

@@ -891,6 +891,7 @@ impl McpProcess {
id: RequestId::Integer(request_id),
method: method.to_string(),
params,
trace: None,
});
self.send_jsonrpc_message(message).await?;
Ok(request_id)

@@ -30,7 +30,7 @@ async fn app_server_default_analytics_disabled_without_flag() -> Result<()> {
let provider = codex_core::otel_init::build_provider(
&config,
SERVICE_VERSION,
Some("codex_app_server"),
Some("codex-app-server"),
false,
)
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
@@ -55,7 +55,7 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> {
let provider = codex_core::otel_init::build_provider(
&config,
SERVICE_VERSION,
Some("codex_app_server"),
Some("codex-app-server"),
true,
)
.map_err(|err| anyhow::anyhow!(err.to_string()))?;

@@ -174,6 +174,7 @@ pub(super) async fn send_request(
id: RequestId::Integer(id),
method: method.to_string(),
params,
trace: None,
});
send_jsonrpc(stream, message).await
}

@@ -6,7 +6,6 @@ mod macos;
mod tests;
use crate::config::ConfigToml;
use crate::config::deserialize_config_toml_with_base;
use crate::config_loader::layer_io::LoadedConfigLayers;
use crate::git_info::resolve_root_git_project_for_trust;
use codex_app_server_protocol::ConfigLayerSource;
@@ -576,6 +575,11 @@ struct ProjectTrustContext {
user_config_file: AbsolutePathBuf,
}
#[derive(Deserialize)]
struct ProjectTrustConfigToml {
projects: Option<std::collections::HashMap<String, crate::config::ProjectConfig>>,
}
struct ProjectTrustDecision {
trust_level: Option<TrustLevel>,
trust_key: String,
@@ -666,10 +670,16 @@ async fn project_trust_context(
config_base_dir: &Path,
user_config_file: &AbsolutePathBuf,
) -> io::Result<ProjectTrustContext> {
let config_toml = deserialize_config_toml_with_base(merged_config.clone(), config_base_dir)?;
let project_trust_config: ProjectTrustConfigToml = {
let _guard = AbsolutePathBufGuard::new(config_base_dir);
merged_config
.clone()
.try_into()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?
};
let project_root = find_project_root(cwd, project_root_markers).await?;
let projects = config_toml.projects.unwrap_or_default();
let projects = project_trust_config.projects.unwrap_or_default();
let project_root_key = project_root.as_path().to_string_lossy().to_string();
let repo_root = resolve_root_git_project_for_trust(cwd.as_path());

@@ -1114,6 +1114,91 @@ async fn project_layers_disabled_when_untrusted_or_unknown() -> std::io::Result<
Ok(())
}
#[tokio::test]
async fn cli_override_can_update_project_local_mcp_server_when_project_is_trusted()
-> std::io::Result<()> {
let tmp = tempdir()?;
let project_root = tmp.path().join("project");
let nested = project_root.join("child");
let dot_codex = project_root.join(".codex");
let codex_home = tmp.path().join("home");
tokio::fs::create_dir_all(&nested).await?;
tokio::fs::create_dir_all(&dot_codex).await?;
tokio::fs::create_dir_all(&codex_home).await?;
tokio::fs::write(project_root.join(".git"), "gitdir: here").await?;
tokio::fs::write(
dot_codex.join(CONFIG_TOML_FILE),
r#"
[mcp_servers.sentry]
url = "https://mcp.sentry.dev/mcp"
enabled = false
"#,
)
.await?;
make_config_for_test(&codex_home, &project_root, TrustLevel::Trusted, None).await?;
let config = ConfigBuilder::default()
.codex_home(codex_home)
.cli_overrides(vec![(
"mcp_servers.sentry.enabled".to_string(),
TomlValue::Boolean(true),
)])
.fallback_cwd(Some(nested))
.build()
.await?;
let server = config
.mcp_servers
.get()
.get("sentry")
.expect("trusted project MCP server should load");
assert!(server.enabled);
Ok(())
}
#[tokio::test]
async fn cli_override_for_disabled_project_local_mcp_server_returns_invalid_transport()
-> std::io::Result<()> {
let tmp = tempdir()?;
let project_root = tmp.path().join("project");
let nested = project_root.join("child");
let dot_codex = project_root.join(".codex");
let codex_home = tmp.path().join("home");
tokio::fs::create_dir_all(&nested).await?;
tokio::fs::create_dir_all(&dot_codex).await?;
tokio::fs::create_dir_all(&codex_home).await?;
tokio::fs::write(project_root.join(".git"), "gitdir: here").await?;
tokio::fs::write(
dot_codex.join(CONFIG_TOML_FILE),
r#"
[mcp_servers.sentry]
url = "https://mcp.sentry.dev/mcp"
enabled = false
"#,
)
.await?;
let err = ConfigBuilder::default()
.codex_home(codex_home)
.cli_overrides(vec![(
"mcp_servers.sentry.enabled".to_string(),
TomlValue::Boolean(true),
)])
.fallback_cwd(Some(nested))
.build()
.await
.expect_err("untrusted project layer should not provide MCP transport");
assert!(
err.to_string().contains("invalid transport")
&& err.to_string().contains("mcp_servers.sentry"),
"unexpected error: {err}"
);
Ok(())
}
#[tokio::test]
async fn invalid_project_config_ignored_when_untrusted_or_unknown() -> std::io::Result<()> {
let tmp = tempdir()?;

@@ -1,6 +1,7 @@
pub mod config;
pub mod metrics;
pub mod otel_provider;
pub mod trace_context;
pub mod traces;
mod otlp;
@@ -23,6 +24,11 @@ use tracing::debug;
pub use crate::metrics::runtime_metrics::RuntimeMetricTotals;
pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
pub use crate::otel_provider::traceparent_context_from_env;
pub use crate::trace_context::context_from_w3c_trace_context;
pub use crate::trace_context::current_span_w3c_trace_context;
pub use crate::trace_context::set_parent_from_context;
pub use crate::trace_context::set_parent_from_w3c_trace_context;
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]

@@ -3,13 +3,12 @@ use crate::config::OtelHttpProtocol;
use crate::config::OtelSettings;
use crate::metrics::MetricsClient;
use crate::metrics::MetricsConfig;
use crate::trace_context::context_from_trace_headers;
use gethostname::gethostname;
use opentelemetry::Context;
use opentelemetry::KeyValue;
use opentelemetry::context::ContextGuard;
use opentelemetry::global;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::LogExporter;
@@ -30,7 +29,6 @@ use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::trace::Tracer;
use opentelemetry_semantic_conventions as semconv;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::sync::OnceLock;
@@ -172,7 +170,7 @@ impl Drop for OtelProvider {
}
}
pub(crate) fn traceparent_context_from_env() -> Option<Context> {
pub fn traceparent_context_from_env() -> Option<Context> {
TRACEPARENT_CONTEXT
.get_or_init(load_traceparent_context)
.clone()
@@ -194,7 +192,7 @@ fn load_traceparent_context() -> Option<Context> {
let traceparent = env::var(TRACEPARENT_ENV_VAR).ok()?;
let tracestate = env::var(TRACESTATE_ENV_VAR).ok();
match extract_traceparent_context(traceparent, tracestate) {
match context_from_trace_headers(Some(&traceparent), tracestate.as_deref()) {
Some(context) => {
debug!("TRACEPARENT detected; continuing trace from parent context");
Some(context)
@@ -206,22 +204,6 @@ fn load_traceparent_context() -> Option<Context> {
}
}
fn extract_traceparent_context(traceparent: String, tracestate: Option<String>) -> Option<Context> {
let mut headers = HashMap::new();
headers.insert("traceparent".to_string(), traceparent);
if let Some(tracestate) = tracestate {
headers.insert("tracestate".to_string(), tracestate);
}
let context = TraceContextPropagator::new().extract(&headers);
let span = context.span();
let span_context = span.span_context();
if !span_context.is_valid() {
return None;
}
Some(context)
}
fn make_resource(settings: &OtelSettings) -> Resource {
Resource::builder()
.with_service_name(settings.service_name.clone())
@@ -407,8 +389,9 @@ mod tests {
fn parses_valid_traceparent() {
let trace_id = "00000000000000000000000000000001";
let span_id = "0000000000000002";
let context = extract_traceparent_context(format!("00-{trace_id}-{span_id}-01"), None)
.expect("trace context");
let context =
context_from_trace_headers(Some(&format!("00-{trace_id}-{span_id}-01")), None)
.expect("trace context");
let span = context.span();
let span_context = span.span_context();
assert_eq!(
@@ -421,7 +404,7 @@ mod tests {
#[test]
fn invalid_traceparent_returns_none() {
assert!(extract_traceparent_context("not-a-traceparent".to_string(), None).is_none());
assert!(context_from_trace_headers(Some("not-a-traceparent"), None).is_none());
}
#[test]

@@ -0,0 +1,106 @@
use std::collections::HashMap;
use codex_protocol::protocol::W3cTraceContext;
use opentelemetry::Context;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::TraceContextExt;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub fn current_span_w3c_trace_context() -> Option<W3cTraceContext> {
let context = Span::current().context();
if !context.span().span_context().is_valid() {
return None;
}
let mut headers = HashMap::new();
TraceContextPropagator::new().inject_context(&context, &mut headers);
Some(W3cTraceContext {
traceparent: headers.remove("traceparent"),
tracestate: headers.remove("tracestate"),
})
}
pub fn context_from_w3c_trace_context(trace: &W3cTraceContext) -> Option<Context> {
context_from_trace_headers(trace.traceparent.as_deref(), trace.tracestate.as_deref())
}
pub fn set_parent_from_w3c_trace_context(span: &Span, trace: &W3cTraceContext) -> bool {
if let Some(context) = context_from_w3c_trace_context(trace) {
set_parent_from_context(span, context);
true
} else {
false
}
}
pub fn set_parent_from_context(span: &Span, context: Context) {
let _ = span.set_parent(context);
}
pub(crate) fn context_from_trace_headers(
traceparent: Option<&str>,
tracestate: Option<&str>,
) -> Option<Context> {
let traceparent = traceparent?;
let mut headers = HashMap::new();
headers.insert("traceparent".to_string(), traceparent.to_string());
if let Some(tracestate) = tracestate {
headers.insert("tracestate".to_string(), tracestate.to_string());
}
let context = TraceContextPropagator::new().extract(&headers);
if !context.span().span_context().is_valid() {
return None;
}
Some(context)
}
#[cfg(test)]
mod tests {
use super::context_from_trace_headers;
use super::context_from_w3c_trace_context;
use codex_protocol::protocol::W3cTraceContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceId;
use pretty_assertions::assert_eq;
#[test]
fn parses_valid_w3c_trace_context() {
let trace_id = "00000000000000000000000000000001";
let span_id = "0000000000000002";
let context = context_from_w3c_trace_context(&W3cTraceContext {
traceparent: Some(format!("00-{trace_id}-{span_id}-01")),
tracestate: None,
})
.expect("trace context");
let span = context.span();
let span_context = span.span_context();
assert_eq!(
span_context.trace_id(),
TraceId::from_hex(trace_id).unwrap()
);
assert_eq!(span_context.span_id(), SpanId::from_hex(span_id).unwrap());
assert!(span_context.is_remote());
}
#[test]
fn invalid_traceparent_returns_none() {
assert!(context_from_trace_headers(Some("not-a-traceparent"), None).is_none());
}
#[test]
fn missing_traceparent_returns_none() {
assert!(
context_from_w3c_trace_context(&W3cTraceContext {
traceparent: None,
tracestate: Some("vendor=value".to_string()),
})
.is_none()
);
}
}
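
The tests above rely on `TraceContextPropagator` rejecting malformed or missing `traceparent` values. As a rough illustration of what a valid value looks like, here is a std-only sketch of parsing the version `00` layout (`00-<32 hex trace id>-<16 hex span id>-<2 hex flags>`). It is not the OpenTelemetry implementation; `TraceParent`, `is_lower_hex`, and `parse_traceparent` are hypothetical names, and only version `00` is handled.

```rust
/// Parsed fields of a W3C `traceparent` header (version 00 layout only).
#[derive(Debug, PartialEq, Eq)]
struct TraceParent {
    trace_id: u128,
    span_id: u64,
    sampled: bool,
}

// True when `s` is exactly `len` lowercase hexadecimal characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

// Hypothetical helper: returns None for anything that is not a well-formed
// version-00 header, including all-zero trace or span ids.
fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let mut parts = header.trim().split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    // Version 00 has exactly four dash-separated fields.
    if parts.next().is_some() || version != "00" {
        return None;
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(span_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    let trace_id = u128::from_str_radix(trace_id, 16).ok()?;
    let span_id = u64::from_str_radix(span_id, 16).ok()?;
    let flags = u8::from_str_radix(flags, 16).ok()?;
    // All-zero ids are defined as invalid by the Trace Context specification.
    if trace_id == 0 || span_id == 0 {
        return None;
    }
    Some(TraceParent {
        trace_id,
        span_id,
        sampled: flags & 0x01 == 0x01,
    })
}

fn main() {
    let header = "00-00000000000000000000000000000001-0000000000000002-01";
    println!("{:?}", parse_traceparent(header));
    println!("{:?}", parse_traceparent("not-a-traceparent"));
}
```

The sketch uses the same inputs as the tests in this file: the well-formed header yields trace id 1 and span id 2 with the sampled flag set, and `"not-a-traceparent"` is rejected.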

@@ -83,6 +83,16 @@ pub struct Submission {
pub op: Op,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
pub struct W3cTraceContext {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub traceparent: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub tracestate: Option<String>,
}
/// Config payload for refreshing MCP servers.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)]
pub struct McpServerRefreshConfig {

@@ -20,7 +20,7 @@ decision to the shell-escalation protocol over a shared file descriptor (specifi
We carry a small patch to `execute_cmd.c` (see `patches/bash-exec-wrapper.patch`) that adds support for `EXEC_WRAPPER`. The original commit message is “add support for BASH_EXEC_WRAPPER” and the patch applies cleanly to `a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b` from https://github.com/bminor/bash. To rebuild manually:
```bash
git clone https://github.com/bminor/bash
git clone https://git.savannah.gnu.org/git/bash
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
git apply /path/to/patches/bash-exec-wrapper.patch
./configure --without-bash-malloc