mirror of
https://github.com/openai/codex.git
synced 2026-03-03 05:03:20 +00:00
Compare commits
3 Commits
fix/notify
...
owen/app_s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d29f364f23 | ||
|
|
7709bf32a3 | ||
|
|
3241c1c6cc |
6
.github/workflows/shell-tool-mcp.yml
vendored
6
.github/workflows/shell-tool-mcp.yml
vendored
@@ -146,9 +146,8 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
set -euo pipefail
|
||||
git clone --depth 1 https://github.com/bolinfest/bash /tmp/bash
|
||||
git clone https://git.savannah.gnu.org/git/bash /tmp/bash
|
||||
cd /tmp/bash
|
||||
git fetch --depth 1 origin a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
|
||||
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
|
||||
git apply "${GITHUB_WORKSPACE}/shell-tool-mcp/patches/bash-exec-wrapper.patch"
|
||||
./configure --without-bash-malloc
|
||||
@@ -188,9 +187,8 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
set -euo pipefail
|
||||
git clone --depth 1 https://github.com/bolinfest/bash /tmp/bash
|
||||
git clone https://git.savannah.gnu.org/git/bash /tmp/bash
|
||||
cd /tmp/bash
|
||||
git fetch --depth 1 origin a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
|
||||
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
|
||||
git apply "${GITHUB_WORKSPACE}/shell-tool-mcp/patches/bash-exec-wrapper.patch"
|
||||
./configure --without-bash-malloc
|
||||
|
||||
811
app_server_tracing_design.md
Normal file
811
app_server_tracing_design.md
Normal file
@@ -0,0 +1,811 @@
|
||||
# App-server v2 tracing design
|
||||
|
||||
This document proposes a simple, staged tracing design for
|
||||
`codex-rs/app-server` with these goals:
|
||||
|
||||
- support distributed tracing from client-initiated app-server work into
|
||||
app-server and `codex-core`
|
||||
- keep tracing consistent across the app-server v2 surface area
|
||||
- minimize tracing boilerplate in request handlers
|
||||
- avoid introducing tracing-owned lifecycle state that duplicates existing
|
||||
app-server runtime state
|
||||
|
||||
This design explicitly avoids a `RequestKind` taxonomy and avoids
|
||||
app-server-owned long-lived lifecycle span registries.
|
||||
|
||||
## Summary
|
||||
|
||||
The design has four major pieces:
|
||||
|
||||
1. A transport-level W3C trace carrier on inbound JSON-RPC request envelopes.
|
||||
2. A centralized app-server request tracing layer that wraps every inbound
|
||||
request in the same request span.
|
||||
3. An internal trace-context handoff through `codex_protocol::Submission` so
|
||||
work that continues in `codex-core` inherits the inbound app-server request
|
||||
ancestry.
|
||||
4. A core-owned long-lived turn span for turn-producing operations such as
|
||||
`turn/start` and `review/start`.
|
||||
|
||||
Every inbound JSON-RPC request gets a standardized request span.
|
||||
|
||||
When an app-server request submits work into core, the current span context is
|
||||
captured into `Submission.trace` when the `Submission` is created. Core then
|
||||
creates a short-lived dispatch span parented from that carrier and, for
|
||||
turn-producing operations, creates a long-lived turn span beneath it before
|
||||
continuing into its existing task and model request tracing.
|
||||
|
||||
Important:
|
||||
|
||||
- request spans stay short-lived
|
||||
- long-lived turn spans are owned by core, not app-server
|
||||
- the design does not add app-server-owned long-lived thread or realtime spans
|
||||
|
||||
## Design goals
|
||||
|
||||
- **Distributed tracing first**
|
||||
- Clients should be able to send trace context to app-server.
|
||||
- App-server should preserve that trace ancestry across the async handoff into
|
||||
core.
|
||||
- Existing core model request tracing should continue to inherit from the
|
||||
active core span once the handoff occurs.
|
||||
|
||||
- **Consistent request instrumentation**
|
||||
- Every inbound request should produce the same request span with the same
|
||||
base attributes.
|
||||
- Request tracing should be wired at the transport boundary, not repeated in
|
||||
individual handlers.
|
||||
|
||||
- **Minimal boilerplate**
|
||||
- Request handlers should not manually parse carriers or build request spans.
|
||||
- Existing calls to `thread.submit(...)` and similar APIs should pick up trace
|
||||
propagation automatically because `Submission` creation captures the active
|
||||
span context.
|
||||
|
||||
- **Minimal business logic pollution**
|
||||
- W3C parsing, OTEL conversion, and span-parenting rules should live in
|
||||
tracing-specific modules.
|
||||
- App-server business logic should stay focused on request handling, not span
|
||||
management.
|
||||
|
||||
- **Incremental rollout**
|
||||
- The first rollout should prove inbound request tracing and app-server ->
|
||||
core propagation.
|
||||
- Once propagation is in place, core should add a long-lived turn span so a
|
||||
single span covers the actual duration of a turn.
|
||||
- Thread and realtime lifecycle tracing should wait until there is a concrete
|
||||
need.
|
||||
|
||||
## Non-goals
|
||||
|
||||
- This design does not attempt to make every loaded thread or realtime session
|
||||
correspond to a long-lived tracing span.
|
||||
- This design does not add tracing-owned thread or realtime state stores in the
|
||||
initial design.
|
||||
- This design does not require every app-server v2 `*Params` type to carry
|
||||
trace metadata.
|
||||
- This design does not require outbound JSON-RPC trace propagation in the
|
||||
initial rollout.
|
||||
|
||||
## Why not `RequestKind`
|
||||
|
||||
An earlier direction considered a central `RequestKind` taxonomy such as
|
||||
`Unary`, `TurnLifecycle`, or `RealtimeLifecycle`.
|
||||
|
||||
That is workable, but it makes tracing depend on a classification that can
|
||||
drift from runtime behavior. The simpler design instead treats tracing as two
|
||||
generic mechanics:
|
||||
|
||||
- every inbound request gets the same request span
|
||||
- any async work that crosses from app-server into core gets the current span
|
||||
context attached to `Submission`
|
||||
|
||||
This keeps the initial implementation small and avoids turning tracing into a
|
||||
taxonomy maintenance problem.
|
||||
|
||||
## Terminology
|
||||
|
||||
- **Request span**
|
||||
- A short-lived span for one inbound JSON-RPC request to app-server.
|
||||
|
||||
- **W3C trace context**
|
||||
- A serializable representation of distributed trace context based on
|
||||
`traceparent` and `tracestate`.
|
||||
|
||||
- **Submission trace handoff**
|
||||
- The optional serialized trace context attached to
|
||||
`codex_protocol::Submission` so core can restore parentage after the
|
||||
app-server request handler returns.
|
||||
|
||||
- **Dispatch span**
|
||||
- A short-lived core span created when the submission loop receives a
|
||||
`Submission` with trace context.
|
||||
|
||||
- **Turn span**
|
||||
- A long-lived core-owned span representing the actual runtime of a turn from
|
||||
turn start until completion, interruption, or failure.
|
||||
|
||||
## High-level tracing model
|
||||
|
||||
### 1. Inbound request
|
||||
|
||||
For every inbound JSON-RPC request:
|
||||
|
||||
1. parse an optional W3C trace carrier from the JSON-RPC envelope
|
||||
2. create a standardized request span
|
||||
3. parent that span from the incoming carrier when present
|
||||
4. process the request inside that span
|
||||
|
||||
This is true for every request, regardless of whether the API is unary or
|
||||
starts work that continues later.
|
||||
|
||||
### 2. Async handoff into core
|
||||
|
||||
Some app-server requests submit work that continues in core after the original
|
||||
request returns. The critical example is `turn/start`, but the mechanism should
|
||||
be generic.
|
||||
|
||||
To preserve trace ancestry:
|
||||
|
||||
- add an optional `W3cTraceContext` to `codex_protocol::Submission`
|
||||
- capture the current span context into that field when constructing a
|
||||
`Submission` in core submission APIs such as `Codex::submit()` and
|
||||
`Codex::submit_with_id()`
|
||||
- have `codex-core` create a per-submission dispatch span parented from that
|
||||
carrier
|
||||
|
||||
This gives a clean causal chain:
|
||||
|
||||
- client span
|
||||
- app-server request span
|
||||
- core dispatch span
|
||||
- core turn span for turn-producing operations
|
||||
- existing core spans such as `run_turn`, sampling, and model request spans
|
||||
|
||||
### 3. Core-owned turn spans
|
||||
|
||||
For turn-producing operations such as `turn/start` and `review/start`:
|
||||
|
||||
- app-server creates the inbound request span
|
||||
- app-server propagates that request context through `Submission.trace`
|
||||
- core creates a dispatch span when it receives the submission
|
||||
- core then creates a long-lived turn span beneath that dispatch span
|
||||
- existing core work such as `run_turn` and model request tracing runs beneath
|
||||
the turn span
|
||||
|
||||
This keeps long-lived span ownership with the layer that actually owns turn
|
||||
execution and completion.
|
||||
|
||||
### 4. Defer thread and realtime lifecycle-heavy tracing
|
||||
|
||||
The design should not add:
|
||||
|
||||
- app-server-owned thread residency stores
|
||||
- app-server-owned realtime session stores
|
||||
|
||||
App-server already maintains thread subscription and runtime state in existing
|
||||
structures. If later tracing work needs thread loaded-duration or realtime
|
||||
duration metrics, that data should extend those existing structures rather than
|
||||
introducing a parallel tracing-only state machine.
|
||||
|
||||
## Span model by API shape
|
||||
|
||||
The initial implementation keeps the app-server side uniform.
|
||||
|
||||
### Unary request/response APIs
|
||||
|
||||
Examples:
|
||||
|
||||
- `thread/list`
|
||||
- `thread/read`
|
||||
- `model/list`
|
||||
- `config/read`
|
||||
- `skills/list`
|
||||
- `app/list`
|
||||
|
||||
Behavior:
|
||||
|
||||
- create request span
|
||||
- return response
|
||||
- no additional app-server span state
|
||||
|
||||
### Turn-producing APIs
|
||||
|
||||
Examples:
|
||||
|
||||
- `turn/start`
|
||||
- `review/start`
|
||||
- `thread/compact/start` when it executes as a normal turn lifecycle
|
||||
|
||||
Behavior:
|
||||
|
||||
- create request span
|
||||
- submit work under that request span
|
||||
- capture the current span context into `Submission.trace` when the
|
||||
`Submission` is created
|
||||
- let core create a dispatch span and then a long-lived turn span
|
||||
- let the turn span remain open until the real core turn lifecycle ends
|
||||
|
||||
Important: request spans should not stay open until eventual streamed
|
||||
completion. The request span ends quickly; the core-owned turn span carries the
|
||||
long-running work.
|
||||
|
||||
### Other APIs that submit work into core
|
||||
|
||||
Examples:
|
||||
|
||||
- `thread/realtime/start`
|
||||
- `thread/realtime/appendAudio`
|
||||
- `thread/realtime/appendText`
|
||||
- `thread/realtime/stop`
|
||||
|
||||
Behavior:
|
||||
|
||||
- create request span
|
||||
- submit work under that request span
|
||||
- capture the current span context into `Submission.trace` when the
|
||||
`Submission` is created
|
||||
- let core continue tracing from there
|
||||
|
||||
These APIs do not automatically imply a long-lived app-server or core lifecycle
|
||||
span in the initial design.
|
||||
|
||||
### Thread lifecycle APIs
|
||||
|
||||
Examples:
|
||||
|
||||
- `thread/start`
|
||||
- `thread/resume`
|
||||
- `thread/fork`
|
||||
- `thread/unsubscribe`
|
||||
|
||||
Behavior in the initial design:
|
||||
|
||||
- create request span
|
||||
- annotate with `thread.id` when known
|
||||
- do not introduce separate app-server lifecycle spans or tracing-only state
|
||||
|
||||
If later work needs thread loaded/unloaded metrics, it should reuse the existing
|
||||
thread runtime state already maintained by app-server.
|
||||
|
||||
## Where the code should live
|
||||
|
||||
### `codex-rs/protocol`
|
||||
|
||||
Add a small shared `W3cTraceContext` type to
|
||||
[`codex-rs/protocol/src/protocol.rs`](/Users/owen/repos/codex3/codex-rs/protocol/src/protocol.rs).
|
||||
|
||||
Responsibilities:
|
||||
|
||||
- define a serializable W3C trace context type
|
||||
- avoid direct dependence on OTEL runtime types
|
||||
- be usable from both protocol crates and runtime crates
|
||||
|
||||
Suggested contents:
|
||||
|
||||
- `W3cTraceContext`
|
||||
- `traceparent: Option<String>`
|
||||
- `tracestate: Option<String>`
|
||||
|
||||
Suggested `Submission` change:
|
||||
|
||||
- `Submission { id, op, trace: Option<W3cTraceContext> }`
|
||||
|
||||
This is the only new internal async handoff needed for the initial rollout.
|
||||
|
||||
### `codex-rs/otel`
|
||||
|
||||
Add a small helper module or extend existing tracing helpers so OTEL-specific
|
||||
logic stays centralized.
|
||||
|
||||
Responsibilities:
|
||||
|
||||
- convert `W3cTraceContext` -> OTEL `Context`
|
||||
- convert the current tracing span context -> `W3cTraceContext`
|
||||
- parent a tracing span from an explicit carrier when present
|
||||
- keep env `TRACEPARENT` / `TRACESTATE` lookup as an explicit helper used at
|
||||
defined entrypoints, not as an implicit side effect of generic OTEL provider
|
||||
initialization for app-server
|
||||
- apply precedence rules:
|
||||
- app-server inbound request spans: parent from request `trace` when present,
|
||||
else from env `TRACEPARENT` / `TRACESTATE` during migration, otherwise
|
||||
create a new root span
|
||||
- app-server submission dispatch spans: parent from `Submission.trace` when
|
||||
present, otherwise inherit naturally from the current span or create a root
|
||||
span
|
||||
- env `TRACEPARENT` / `TRACESTATE` fallback is not used for app-server
|
||||
submission dispatch spans or deeper app-server/core spans
|
||||
- env `TRACEPARENT` / `TRACESTATE` remains available for non-server
|
||||
entrypoints or process-level startup tracing
|
||||
|
||||
Temporary compatibility during rollout:
|
||||
|
||||
- Codex Cloud currently injects `TRACEPARENT` /
|
||||
`TRACESTATE` into the app-server process environment
|
||||
- that env-based propagation should be treated as legacy compatibility while
|
||||
clients migrate to sending request-level `trace` carriers on every JSON-RPC
|
||||
request
|
||||
- during that migration, only inbound app-server request span creation may fall
|
||||
back to env when request `trace` is absent
|
||||
- `codex-otel` should not keep app-server under ambient env parentage by
|
||||
automatically attaching `TRACEPARENT` / `TRACESTATE` during provider
|
||||
initialization; app-server request tracing should opt into env fallback only
|
||||
when building the inbound request span
|
||||
- once request-level carriers are in use for app-server clients, remove the
|
||||
env-based request propagation path rather than keeping both mechanisms active
|
||||
|
||||
Required migration rule:
|
||||
|
||||
- app-server submission dispatch spans and downstream spans must not consult env
|
||||
`TRACEPARENT` / `TRACESTATE` when determining span parentage
|
||||
- once app-server inbound request spans or `Submission.trace` provide an
|
||||
explicit parent, downstream spans must inherit from the current span and must
|
||||
not re-parent themselves from env `TRACEPARENT` / `TRACESTATE`
|
||||
- update both categories of existing env-based parenting so they do not
|
||||
override explicit request/submission ancestry:
|
||||
- lower-level callsites such as `apply_traceparent_parent` on `run_turn`
|
||||
- provider-init behavior in `codex-otel` that eagerly attaches env trace
|
||||
context for the process/thread
|
||||
|
||||
Important:
|
||||
|
||||
- keep this focused on carrier parsing and span parenting
|
||||
- do not move app-server runtime state into `codex-otel`
|
||||
- do not overload `OtelManager` with app-server lifecycle ownership in the
|
||||
initial design
|
||||
|
||||
### `codex-rs/app-server-protocol`
|
||||
|
||||
Extend inbound JSON-RPC request envelopes in
|
||||
[`codex-rs/app-server-protocol/src/jsonrpc_lite.rs`](/Users/owen/repos/codex3/codex-rs/app-server-protocol/src/jsonrpc_lite.rs)
|
||||
with a dedicated optional trace carrier field.
|
||||
|
||||
Suggested shape:
|
||||
|
||||
- `JSONRPCRequest { id, method, params, trace }`
|
||||
|
||||
Where:
|
||||
|
||||
- `trace: Option<W3cTraceContext>`
|
||||
|
||||
Important:
|
||||
|
||||
- use a dedicated tracing field, not a generic `meta` bag
|
||||
- keep tracing transport-level and method-agnostic
|
||||
- do not add trace fields to individual `*Params` business payloads
|
||||
|
||||
### `codex-rs/core`
|
||||
|
||||
Make small changes in the submission path in
|
||||
[`codex-rs/core/src/codex.rs`](/Users/owen/repos/codex3/codex-rs/core/src/codex.rs).
|
||||
|
||||
Responsibilities:
|
||||
|
||||
- capture the current span context when a `Submission` is created in
|
||||
`Codex::submit()` / `Codex::submit_with_id()`
|
||||
- read `Submission.trace`
|
||||
- create a per-submission dispatch span parented from that carrier
|
||||
- run existing submission handling under that span
|
||||
- do not use env `TRACEPARENT` / `TRACESTATE` fallback in app-server submission
|
||||
handling or deeper core spans created from app-server work
|
||||
- replace lower-level env-based re-parenting where needed so explicit
|
||||
`Submission.trace` ancestry remains authoritative
|
||||
|
||||
This is enough for existing core tracing to inherit the correct ancestry, and
|
||||
it is the right place to add the long-lived turn span required for turn
|
||||
lifecycles.
|
||||
|
||||
For turn-producing operations, core responsibilities should include:
|
||||
|
||||
- read `Submission.trace`
|
||||
- create a per-submission dispatch span parented from that carrier
|
||||
- create a long-lived turn span beneath the dispatch span when the operation
|
||||
actually starts a turn
|
||||
- finish that turn span when the real core turn lifecycle completes,
|
||||
interrupts, or fails
|
||||
|
||||
### `codex-rs/app-server`
|
||||
|
||||
Add a small dedicated tracing module rather than spreading request tracing logic
|
||||
across handlers. A likely shape is:
|
||||
|
||||
- `app_server_tracing/mod.rs`
|
||||
- `app_server_tracing/request_spans.rs`
|
||||
- `app_server_tracing/incoming.rs`
|
||||
|
||||
Responsibilities:
|
||||
|
||||
- extract incoming W3C trace carriers from JSON-RPC requests
|
||||
- build standardized request spans
|
||||
- provide a small API that wraps request handling in the correct span
|
||||
|
||||
Non-responsibilities in the initial design:
|
||||
|
||||
- no thread residency registry
|
||||
- no realtime session registry
|
||||
|
||||
## Standardized request spans
|
||||
|
||||
Every inbound request should use the same request-span builder.
|
||||
|
||||
Suggested name:
|
||||
|
||||
- `app_server.request`
|
||||
|
||||
Suggested attributes:
|
||||
|
||||
- `rpc.system = "jsonrpc"`
|
||||
- `rpc.service = "codex-app-server"`
|
||||
- `rpc.method`
|
||||
- `rpc.transport`
|
||||
- `stdio`
|
||||
- `websocket`
|
||||
- `rpc.request_id`
|
||||
- `app_server.connection_id`
|
||||
- `app_server.api_version = "v2"` when applicable
|
||||
- `app_server.client_name` when known from initialize
|
||||
- `app_server.client_version` when known
|
||||
|
||||
Optional useful attributes:
|
||||
|
||||
- `thread.id` when already known from params
|
||||
- `turn.id` when already known from params
|
||||
|
||||
Important:
|
||||
|
||||
- the span factory should be the only place that assembles these fields
|
||||
- handlers should not manually construct request-span attributes
|
||||
- for the `initialize` request itself, read `clientInfo.name` and
|
||||
`clientInfo.version` directly from the request params when present
|
||||
- for later requests on the same connection, read client metadata from
|
||||
per-connection session state populated during `initialize`
|
||||
|
||||
## No app-server tracing registries
|
||||
|
||||
The design should not introduce app-server-owned tracing registries for turns,
|
||||
threads, or realtime sessions.
|
||||
|
||||
Why:
|
||||
|
||||
- app-server already has thread subscription and runtime state
|
||||
- core already owns the real task and turn lifecycle
|
||||
- a second tracing-specific state machine adds more code and more ways for
|
||||
lifecycle tracking to drift
|
||||
|
||||
Future guidance:
|
||||
|
||||
- if thread loaded/unloaded metrics become important, extend existing app-server
|
||||
thread state
|
||||
- keep long-lived turn spans in core
|
||||
- if realtime lifecycle metrics become important, extend the existing realtime
|
||||
runtime path rather than creating a parallel tracing store
|
||||
|
||||
## No direct span construction in handlers
|
||||
|
||||
Request handlers should not call `info_span!`, `trace_span!`, `set_parent`, or
|
||||
OTEL APIs directly for app-server request tracing.
|
||||
|
||||
Instead:
|
||||
|
||||
- `message_processor` should wrap inbound request handling through the
|
||||
centralized request-span helper
|
||||
- `Codex::submit()` / `Codex::submit_with_id()` should capture the current span
|
||||
context into `Submission.trace` when constructing the `Submission`
|
||||
|
||||
That keeps request tracing transport-level and largely invisible to business
|
||||
handlers.
|
||||
|
||||
## Layering
|
||||
|
||||
The intended call graph is:
|
||||
|
||||
- `message_processor` -> `app_server_tracing`
|
||||
- create and enter the standardized inbound request span
|
||||
- `Codex::submit()` / `Codex::submit_with_id()` -> `codex-otel` trace-context
|
||||
helper
|
||||
- snapshot the current span context into `Submission.trace` when the
|
||||
`Submission` is created
|
||||
- `codex-core` submission loop -> `codex-otel` trace-context helper
|
||||
- create a dispatch span parented from `Submission.trace`
|
||||
- create a long-lived turn span for turn-producing operations
|
||||
|
||||
Important:
|
||||
|
||||
- app-server owns inbound request tracing
|
||||
- core owns execution after the async handoff
|
||||
- core owns long-lived turn spans
|
||||
- the design does not add app-server-owned long-lived thread or realtime spans
|
||||
|
||||
## Inbound flow in app-server
|
||||
|
||||
The inbound request path should work like this:
|
||||
|
||||
1. Parse the JSON-RPC request envelope, including `trace`.
|
||||
2. Use the tracing module to create a request span parented from that request
|
||||
carrier when present, otherwise fall back to env `TRACEPARENT` /
|
||||
`TRACESTATE` during migration, otherwise create a new root request span.
|
||||
3. Process the request inside that span.
|
||||
4. If the request submits work into core, capture the active span context into
|
||||
`Submission.trace` when the `Submission` is created.
|
||||
|
||||
Integration point:
|
||||
|
||||
- [`codex-rs/app-server/src/message_processor.rs`](/Users/owen/repos/codex3/codex-rs/app-server/src/message_processor.rs)
|
||||
|
||||
## Core handoff flow
|
||||
|
||||
The `turn/start` and similar flows cross an async boundary:
|
||||
|
||||
- app-server handler submits work
|
||||
- core submission loop receives `Submission`
|
||||
- actual work continues later on different tasks
|
||||
|
||||
To preserve parentage:
|
||||
|
||||
1. app-server request handling runs inside `app_server.request`
|
||||
2. `Codex::submit()` / `Codex::submit_with_id()` capture that active context
|
||||
into `Submission.trace` when constructing the `Submission`
|
||||
3. core submission loop creates a dispatch span parented from `Submission.trace`
|
||||
without consulting env `TRACEPARENT` / `TRACESTATE`
|
||||
4. if the submission starts a turn, core creates a long-lived turn span beneath
|
||||
that dispatch span
|
||||
5. existing core spans naturally nest under the turn span
|
||||
|
||||
This lets:
|
||||
|
||||
- submission handling
|
||||
- a single long-lived turn span for turn-producing APIs
|
||||
- `run_turn`
|
||||
- model client request tracing
|
||||
|
||||
inherit the app-server request trace without broad tracing changes across core.
|
||||
|
||||
## Behavior for key v2 APIs
|
||||
|
||||
### `thread/start`
|
||||
|
||||
- create request span
|
||||
- annotate with `thread.id` once known
|
||||
- send response and `thread/started`
|
||||
- no separate thread lifecycle span in the initial design
|
||||
|
||||
### `thread/resume`
|
||||
|
||||
- create request span
|
||||
- annotate with `thread.id` when known
|
||||
- no separate lifecycle span
|
||||
|
||||
### `thread/fork`
|
||||
|
||||
- create request span
|
||||
- annotate with the new `thread.id`
|
||||
- no separate lifecycle span
|
||||
|
||||
### `thread/unsubscribe`
|
||||
|
||||
- create request span
|
||||
- no separate unload span
|
||||
- if later thread unload metrics are needed, reuse existing thread state rather
|
||||
than adding a tracing-only registry
|
||||
|
||||
### `turn/start`
|
||||
|
||||
- create request span
|
||||
- submit work into core under that request span
|
||||
- propagate the active span context through `Submission.trace` when the
|
||||
`Submission` is created
|
||||
- let core create a dispatch span and then a long-lived turn span
|
||||
- let that turn span cover the full duration until completion, interruption, or
|
||||
failure
|
||||
|
||||
### `turn/steer`
|
||||
|
||||
- create request span
|
||||
- if the request submits core work, propagate via `Submission.trace`
|
||||
- otherwise request span only
|
||||
|
||||
### `turn/interrupt`
|
||||
|
||||
- create request span
|
||||
- request span only unless core submission is involved
|
||||
|
||||
### `review/start`
|
||||
|
||||
- treat like `turn/start`
|
||||
- let core create the same kind of long-lived turn span
|
||||
|
||||
### `thread/realtime/start`, `appendAudio`, `appendText`, `stop`
|
||||
|
||||
- create request span
|
||||
- if the API submits work into core, propagate via `Submission.trace` when the
|
||||
`Submission` is created
|
||||
- do not introduce separate realtime lifecycle spans in the initial design
|
||||
|
||||
### Unary methods such as `thread/list`
|
||||
|
||||
- create request span only
|
||||
|
||||
## Runtime checks
|
||||
|
||||
Keep runtime checks narrowly scoped in the initial rollout:
|
||||
|
||||
- warn when an inbound trace carrier is present but invalid
|
||||
- test that `Submission.trace` is set when work is submitted from a traced
|
||||
request
|
||||
|
||||
Do not add lifecycle consistency checks for tracing registries that do not
|
||||
exist yet.
|
||||
|
||||
## Tests
|
||||
|
||||
Add tests for the initial mechanics:
|
||||
|
||||
- inbound request tracing accepts a valid W3C carrier
|
||||
- invalid carriers are ignored cleanly
|
||||
- unary methods create request spans without needing any extra handler changes
|
||||
- `turn/start` propagates request ancestry through `Submission.trace` into core
|
||||
- `turn/start` creates a long-lived core-owned turn span
|
||||
- the turn span closes on completion, interruption, or failure
|
||||
- existing core spans inherit from the propagated parent
|
||||
- inbound app-server request spans prefer request `trace` and may temporarily
|
||||
fall back to env `TRACEPARENT` / `TRACESTATE` during client migration
|
||||
- app-server submission and downstream spans do not use env `TRACEPARENT` /
|
||||
`TRACESTATE` fallback
|
||||
- explicit transport or `Submission.trace` parents are not overwritten by
|
||||
lower-level env-based re-parenting
|
||||
|
||||
The goal is to verify the centralized propagation behavior, not to exhaustively
|
||||
test OTEL internals.
|
||||
|
||||
## Suggested PR sequence
|
||||
|
||||
### PR 1: Foundation plus inbound request spans
|
||||
|
||||
Scope:
|
||||
|
||||
1. Introduce a shared `W3cTraceContext` type in `codex-protocol`.
|
||||
2. Add `trace` to inbound JSON-RPC request envelopes in app-server protocol.
|
||||
3. Add focused trace-context helpers in `codex-rs/otel`.
|
||||
4. Add the centralized app-server request tracing module.
|
||||
5. Wrap inbound request handling in `message_processor.rs`.
|
||||
|
||||
Why this PR:
|
||||
|
||||
- proves the transport and request-span shape with minimal scope
|
||||
- gives all inbound app-server APIs consistent request tracing immediately
|
||||
- avoids mixing lifecycle questions into the initial plumbing review
|
||||
|
||||
### PR 2: Async handoff into core via `Submission`
|
||||
|
||||
Scope:
|
||||
|
||||
1. Add `trace` to `Submission`.
|
||||
2. Capture the current span context automatically when constructing
|
||||
`Submission` values in `Codex::submit()` / `Codex::submit_with_id()`.
|
||||
3. Have the core submission loop restore parentage with a dispatch span.
|
||||
4. Remove or gate both lower-level env-based re-parenting and provider-init
|
||||
env attachment in `codex-otel` so `Submission.trace` remains the
|
||||
authoritative parent when present.
|
||||
5. Validate the flow with `turn/start`.
|
||||
|
||||
Why this PR:
|
||||
|
||||
- validates the critical async handoff from app-server into core
|
||||
- proves that existing core tracing can inherit the app-server request ancestry
|
||||
- keeps the behavior change focused on one boundary
|
||||
|
||||
### PR 3: Core-owned long-lived turn spans
|
||||
|
||||
Scope:
|
||||
|
||||
1. Add a long-lived turn span in core for `turn/start`.
|
||||
2. Reuse the same turn-span pattern for `review/start`.
|
||||
3. Ensure the span closes on completion, interruption, or failure.
|
||||
|
||||
Why this PR:
|
||||
|
||||
- completes the minimum useful tracing story for turn lifecycles
|
||||
- keeps long-lived span ownership in the layer that actually owns the turn
|
||||
- still builds on the simpler propagation model from PR 2 instead of mixing
|
||||
everything into one change
|
||||
|
||||
### PR 4: Use request-level trace instead of env var in Python
|
||||
|
||||
Scope:
|
||||
|
||||
1. Update the Python app-server launcher/client to send `trace` on each
|
||||
JSON-RPC request.
|
||||
2. Reuse the existing upstream trace context in request envelopes during the
|
||||
initial migration so end-to-end parentage is preserved before env fallback is
|
||||
removed.
|
||||
3. Continue preferring request-level `trace` over env once both exist.
|
||||
|
||||
Why this PR:
|
||||
|
||||
- preserves end-to-end parentage while the Rust side is migrating away from
|
||||
env-based request propagation
|
||||
- validates the transport-level tracing design with a real client
|
||||
- moves trace propagation from process scope toward request scope without
|
||||
waiting for the final cleanup
|
||||
|
||||
### PR 5: Remove support for `TRACEPARENT` / `TRACESTATE` when using codex app-server
|
||||
|
||||
Scope:
|
||||
|
||||
1. Stop consulting env `TRACEPARENT` / `TRACESTATE` for app-server inbound
|
||||
request span creation.
|
||||
2. Remove app-server-specific env injection from launchers once request-level
|
||||
`trace` carriers are in use.
|
||||
3. Keep env-based propagation only for non-server entrypoints or process-level
|
||||
startup tracing if still needed, and require those entrypoints to opt in
|
||||
explicitly rather than inheriting env parentage from generic provider init.
|
||||
4. Validate that app-server requests still preserve parentage through explicit
|
||||
request carriers.
|
||||
|
||||
Why this PR:
|
||||
|
||||
- completes the migration from process-scoped to request-scoped tracing for
|
||||
app-server
|
||||
- removes duplicated propagation mechanisms and future ambiguity
|
||||
- aligns the implementation with the long-term design boundary
|
||||
|
||||
### PR 6: Optional follow-ups
|
||||
|
||||
Possible follow-ups:
|
||||
|
||||
1. Reuse existing app-server thread state to add thread loaded/unloaded duration
|
||||
metrics if needed.
|
||||
2. Reuse existing realtime runtime state to add realtime duration metrics if
|
||||
needed.
|
||||
3. Add outbound JSON-RPC trace propagation only if there is a concrete
|
||||
client-side tracing use case.
|
||||
|
||||
## Rollout guidance
|
||||
|
||||
Start with:
|
||||
|
||||
- inbound request spans for all app-server requests
|
||||
- `turn/start` request -> core propagation
|
||||
- a core-owned long-lived turn span for `turn/start`
|
||||
|
||||
Migration note:
|
||||
|
||||
- the Rust implementation should treat request-level `trace` carriers as the
|
||||
real app-server tracing path immediately
|
||||
- env `TRACEPARENT` / `TRACESTATE` should remain only as temporary compatibility
|
||||
for existing launchers, and only at the inbound request boundary, while
|
||||
clients are updated to send `trace` on every JSON-RPC request
|
||||
- after client rollout, remove the env-based app-server request propagation
|
||||
path instead of keeping it as a long-term fallback
|
||||
|
||||
Those pieces exercise the important mechanics:
|
||||
|
||||
- inbound carrier extraction
|
||||
- request span creation
|
||||
- async handoff into core
|
||||
- inherited core tracing beneath the propagated parent
|
||||
- a single span covering the full duration of a turn
|
||||
|
||||
After that, only add more lifecycle-specific tracing if a real debugging or
|
||||
observability gap remains.
|
||||
|
||||
## Bottom line
|
||||
|
||||
The recommended initial design is:
|
||||
|
||||
- trace context on inbound JSON-RPC request envelopes
|
||||
- one standardized request span for every inbound request
|
||||
- automatic propagation through `Submission` into core
|
||||
- core-owned long-lived turn spans for turn-producing APIs
|
||||
- OTEL conversion and carrier logic centralized in `codex-otel`
|
||||
- env `TRACEPARENT` / `TRACESTATE` kept only as temporary compatibility during
|
||||
launcher migration, not as the steady-state app-server request propagation
|
||||
mechanism
|
||||
- no app-server-owned tracing registries for turns, threads, or realtime
|
||||
sessions in the initial implementation
|
||||
|
||||
This gives app-server distributed tracing that is:
|
||||
|
||||
- consistent
|
||||
- low-boilerplate
|
||||
- modular
|
||||
- aligned with the existing ownership boundaries in app-server and core
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1428,6 +1428,7 @@ dependencies = [
|
||||
"codex-feedback",
|
||||
"codex-file-search",
|
||||
"codex-login",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"codex-rmcp-client",
|
||||
"codex-shell-command",
|
||||
|
||||
@@ -70,7 +70,17 @@
|
||||
"method": {
|
||||
"type": "string"
|
||||
},
|
||||
"params": true
|
||||
"params": true,
|
||||
"trace": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/W3cTraceContext"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
@@ -102,6 +112,23 @@
|
||||
"type": "integer"
|
||||
}
|
||||
]
|
||||
},
|
||||
"W3cTraceContext": {
|
||||
"properties": {
|
||||
"traceparent": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"tracestate": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"type": "object"
|
||||
}
|
||||
},
|
||||
"description": "Refers to any valid JSON-RPC object that can be decoded off the wire, or encoded to be sent.",
|
||||
|
||||
@@ -11,6 +11,23 @@
|
||||
"type": "integer"
|
||||
}
|
||||
]
|
||||
},
|
||||
"W3cTraceContext": {
|
||||
"properties": {
|
||||
"traceparent": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"tracestate": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"type": "object"
|
||||
}
|
||||
},
|
||||
"description": "A request that expects a response.",
|
||||
@@ -21,7 +38,17 @@
|
||||
"method": {
|
||||
"type": "string"
|
||||
},
|
||||
"params": true
|
||||
"params": true,
|
||||
"trace": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/W3cTraceContext"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
|
||||
@@ -5016,7 +5016,17 @@
|
||||
"method": {
|
||||
"type": "string"
|
||||
},
|
||||
"params": true
|
||||
"params": true,
|
||||
"trace": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/W3cTraceContext"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
@@ -7220,6 +7230,23 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"W3cTraceContext": {
|
||||
"properties": {
|
||||
"traceparent": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"tracestate": {
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"type": "object"
|
||||
},
|
||||
"v2": {
|
||||
"AbsolutePathBuf": {
|
||||
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! We do not do true JSON-RPC 2.0, as we neither send nor expect the
|
||||
//! "jsonrpc": "2.0" field.
|
||||
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -38,6 +39,11 @@ pub struct JSONRPCRequest {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub params: Option<serde_json::Value>,
|
||||
/// Optional W3C Trace Context carrier for distributed tracing.
|
||||
/// Uses `traceparent` and `tracestate` at the JSON-RPC envelope level.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub trace: Option<W3cTraceContext>,
|
||||
}
|
||||
|
||||
/// A notification which does not expect a response.
|
||||
|
||||
@@ -15,6 +15,8 @@ use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
@@ -71,6 +73,7 @@ use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
@@ -104,6 +107,10 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
|
||||
"item/reasoning/summaryTextDelta",
|
||||
"item/reasoning/textDelta",
|
||||
];
|
||||
const APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const DATADOG_TRACE_BASE_URL: &str = "https://openai.datadoghq.com/apm/traces";
|
||||
const DATADOG_TRACE_WINDOW_MS: u128 = 15 * 60 * 1000;
|
||||
|
||||
/// Minimal launcher that initializes the Codex app-server and logs the handshake.
|
||||
#[derive(Parser)]
|
||||
@@ -499,24 +506,28 @@ fn send_message(
|
||||
user_message: String,
|
||||
) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let conversation = client.start_thread()?;
|
||||
println!("< newConversation response: {conversation:?}");
|
||||
|
||||
let conversation = client.start_thread()?;
|
||||
println!("< newConversation response: {conversation:?}");
|
||||
let subscription = client.add_conversation_listener(&conversation.conversation_id)?;
|
||||
println!("< addConversationListener response: {subscription:?}");
|
||||
|
||||
let subscription = client.add_conversation_listener(&conversation.conversation_id)?;
|
||||
println!("< addConversationListener response: {subscription:?}");
|
||||
let send_response =
|
||||
client.send_user_message(&conversation.conversation_id, &user_message)?;
|
||||
println!("< sendUserMessage response: {send_response:?}");
|
||||
|
||||
let send_response = client.send_user_message(&conversation.conversation_id, &user_message)?;
|
||||
println!("< sendUserMessage response: {send_response:?}");
|
||||
client.stream_conversation(&conversation.conversation_id)?;
|
||||
|
||||
client.stream_conversation(&conversation.conversation_id)?;
|
||||
client.remove_thread_listener(subscription.subscription_id)?;
|
||||
|
||||
client.remove_thread_listener(subscription.subscription_id)?;
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
pub fn send_message_v2(
|
||||
@@ -575,81 +586,87 @@ fn trigger_zsh_fork_multi_cmd_approval(
|
||||
let message = user_message.unwrap_or_else(|| default_prompt.to_string());
|
||||
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
|
||||
client.command_approval_behavior = match abort_on {
|
||||
Some(index) => CommandApprovalBehavior::AbortOn(index),
|
||||
None => CommandApprovalBehavior::AlwaysAccept,
|
||||
};
|
||||
client.command_approval_count = 0;
|
||||
client.command_approval_item_ids.clear();
|
||||
client.command_execution_statuses.clear();
|
||||
client.last_turn_status = None;
|
||||
client.command_approval_behavior = match abort_on {
|
||||
Some(index) => CommandApprovalBehavior::AbortOn(index),
|
||||
None => CommandApprovalBehavior::AlwaysAccept,
|
||||
};
|
||||
client.command_approval_count = 0;
|
||||
client.command_approval_item_ids.clear();
|
||||
client.command_execution_statuses.clear();
|
||||
client.last_turn_status = None;
|
||||
|
||||
let mut turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: message,
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
turn_params.approval_policy = Some(AskForApproval::OnRequest);
|
||||
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
|
||||
access: ReadOnlyAccess::FullAccess,
|
||||
});
|
||||
let mut turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: message,
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
turn_params.approval_policy = Some(AskForApproval::OnRequest);
|
||||
turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly {
|
||||
access: ReadOnlyAccess::FullAccess,
|
||||
});
|
||||
|
||||
let turn_response = client.turn_start(turn_params)?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
|
||||
let turn_response = client.turn_start(turn_params)?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
|
||||
|
||||
if client.command_approval_count < min_approvals {
|
||||
bail!(
|
||||
"expected at least {min_approvals} command approvals, got {}",
|
||||
client.command_approval_count
|
||||
);
|
||||
}
|
||||
let mut approvals_per_item = std::collections::BTreeMap::new();
|
||||
for item_id in &client.command_approval_item_ids {
|
||||
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
|
||||
}
|
||||
let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0);
|
||||
if max_approvals_for_one_item < min_approvals {
|
||||
bail!(
|
||||
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
|
||||
);
|
||||
}
|
||||
|
||||
let last_command_status = client.command_execution_statuses.last();
|
||||
if abort_on.is_none() {
|
||||
if last_command_status != Some(&CommandExecutionStatus::Completed) {
|
||||
bail!("expected completed command execution, got {last_command_status:?}");
|
||||
}
|
||||
if client.last_turn_status != Some(TurnStatus::Completed) {
|
||||
if client.command_approval_count < min_approvals {
|
||||
bail!(
|
||||
"expected completed turn in all-accept flow, got {:?}",
|
||||
client.last_turn_status
|
||||
"expected at least {min_approvals} command approvals, got {}",
|
||||
client.command_approval_count
|
||||
);
|
||||
}
|
||||
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
|
||||
bail!(
|
||||
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
|
||||
let mut approvals_per_item = std::collections::BTreeMap::new();
|
||||
for item_id in &client.command_approval_item_ids {
|
||||
*approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1;
|
||||
}
|
||||
let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0);
|
||||
if max_approvals_for_one_item < min_approvals {
|
||||
bail!(
|
||||
"expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}"
|
||||
);
|
||||
}
|
||||
|
||||
let last_command_status = client.command_execution_statuses.last();
|
||||
if abort_on.is_none() {
|
||||
if last_command_status != Some(&CommandExecutionStatus::Completed) {
|
||||
bail!("expected completed command execution, got {last_command_status:?}");
|
||||
}
|
||||
if client.last_turn_status != Some(TurnStatus::Completed) {
|
||||
bail!(
|
||||
"expected completed turn in all-accept flow, got {:?}",
|
||||
client.last_turn_status
|
||||
);
|
||||
}
|
||||
} else if last_command_status == Some(&CommandExecutionStatus::Completed) {
|
||||
bail!(
|
||||
"expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}"
|
||||
);
|
||||
}
|
||||
|
||||
println!(
|
||||
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
|
||||
client.command_approval_count,
|
||||
client.command_execution_statuses,
|
||||
client.last_turn_status
|
||||
);
|
||||
}
|
||||
|
||||
println!(
|
||||
"[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}",
|
||||
client.command_approval_count, client.command_execution_statuses, client.last_turn_status
|
||||
);
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn resume_message_v2(
|
||||
@@ -662,29 +679,32 @@ fn resume_message_v2(
|
||||
ensure_dynamic_tools_unused(dynamic_tools, "resume-message-v2")?;
|
||||
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let resume_response = client.thread_resume(ThreadResumeParams {
|
||||
thread_id,
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/resume response: {resume_response:?}");
|
||||
|
||||
let resume_response = client.thread_resume(ThreadResumeParams {
|
||||
thread_id,
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/resume response: {resume_response:?}");
|
||||
let turn_response = client.turn_start(TurnStartParams {
|
||||
thread_id: resume_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: user_message,
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
|
||||
let turn_response = client.turn_start(TurnStartParams {
|
||||
thread_id: resume_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: user_message,
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
client.stream_turn(&resume_response.thread.id, &turn_response.turn.id)?;
|
||||
|
||||
client.stream_turn(&resume_response.thread.id, &turn_response.turn.id)?;
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn thread_resume_follow(
|
||||
@@ -788,33 +808,36 @@ fn send_message_v2_with_policies(
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize_with_experimental_api(experimental_api)?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize_with_experimental_api(experimental_api)?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
let mut turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: user_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
turn_params.approval_policy = approval_policy;
|
||||
turn_params.sandbox_policy = sandbox_policy;
|
||||
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
let mut turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: user_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
turn_params.approval_policy = approval_policy;
|
||||
turn_params.sandbox_policy = sandbox_policy;
|
||||
let turn_response = client.turn_start(turn_params)?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
|
||||
let turn_response = client.turn_start(turn_params)?;
|
||||
println!("< turn/start response: {turn_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
|
||||
|
||||
client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?;
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn send_follow_up_v2(
|
||||
@@ -825,118 +848,133 @@ fn send_follow_up_v2(
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
let first_turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: first_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
let first_turn_response = client.turn_start(first_turn_params)?;
|
||||
println!("< turn/start response (initial): {first_turn_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?;
|
||||
|
||||
let first_turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: first_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
let first_turn_response = client.turn_start(first_turn_params)?;
|
||||
println!("< turn/start response (initial): {first_turn_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?;
|
||||
let follow_up_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: follow_up_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
let follow_up_response = client.turn_start(follow_up_params)?;
|
||||
println!("< turn/start response (follow-up): {follow_up_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?;
|
||||
|
||||
let follow_up_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: follow_up_message,
|
||||
// Test client sends plain text without UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
};
|
||||
let follow_up_response = client.turn_start(follow_up_params)?;
|
||||
println!("< turn/start response (follow-up): {follow_up_response:?}");
|
||||
client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?;
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn test_login(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let login_response = client.login_chat_gpt()?;
|
||||
println!("< loginChatGpt response: {login_response:?}");
|
||||
println!(
|
||||
"Open the following URL in your browser to continue:\n{}",
|
||||
login_response.auth_url
|
||||
);
|
||||
|
||||
let completion = client.wait_for_login_completion(&login_response.login_id)?;
|
||||
println!("< loginChatGptComplete notification: {completion:?}");
|
||||
|
||||
if completion.success {
|
||||
println!("Login succeeded.");
|
||||
Ok(())
|
||||
} else {
|
||||
bail!(
|
||||
"login failed: {}",
|
||||
completion
|
||||
.error
|
||||
.as_deref()
|
||||
.unwrap_or("unknown error from loginChatGptComplete")
|
||||
let login_response = client.login_chat_gpt()?;
|
||||
println!("< loginChatGpt response: {login_response:?}");
|
||||
println!(
|
||||
"Open the following URL in your browser to continue:\n{}",
|
||||
login_response.auth_url
|
||||
);
|
||||
}
|
||||
|
||||
let completion = client.wait_for_login_completion(&login_response.login_id)?;
|
||||
println!("< loginChatGptComplete notification: {completion:?}");
|
||||
|
||||
if completion.success {
|
||||
println!("Login succeeded.");
|
||||
Ok(())
|
||||
} else {
|
||||
bail!(
|
||||
"login failed: {}",
|
||||
completion
|
||||
.error
|
||||
.as_deref()
|
||||
.unwrap_or("unknown error from loginChatGptComplete")
|
||||
);
|
||||
}
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn get_account_rate_limits(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let response = client.get_account_rate_limits()?;
|
||||
println!("< account/rateLimits/read response: {response:?}");
|
||||
|
||||
let response = client.get_account_rate_limits()?;
|
||||
println!("< account/rateLimits/read response: {response:?}");
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn model_list(endpoint: &Endpoint, config_overrides: &[String]) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let response = client.model_list(ModelListParams::default())?;
|
||||
println!("< model/list response: {response:?}");
|
||||
|
||||
let response = client.model_list(ModelListParams::default())?;
|
||||
println!("< model/list response: {response:?}");
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u32) -> Result<()> {
|
||||
let mut client = CodexClient::connect(endpoint, config_overrides)?;
|
||||
let result = (|| -> Result<()> {
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
let response = client.thread_list(ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(limit),
|
||||
sort_key: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
})?;
|
||||
println!("< thread/list response: {response:?}");
|
||||
|
||||
let response = client.thread_list(ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(limit),
|
||||
sort_key: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
})?;
|
||||
println!("< thread/list response: {response:?}");
|
||||
|
||||
Ok(())
|
||||
Ok(())
|
||||
})();
|
||||
client.print_trace_summary();
|
||||
result
|
||||
}
|
||||
|
||||
fn ensure_dynamic_tools_unused(
|
||||
@@ -993,6 +1031,14 @@ struct CodexClient {
|
||||
command_approval_item_ids: Vec<String>,
|
||||
command_execution_statuses: Vec<CommandExecutionStatus>,
|
||||
last_turn_status: Option<TurnStatus>,
|
||||
trace_id: String,
|
||||
trace_requests: Vec<TraceRequestSummary>,
|
||||
}
|
||||
|
||||
struct TraceRequestSummary {
|
||||
method: String,
|
||||
request_id: String,
|
||||
parent_span_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -1052,6 +1098,8 @@ impl CodexClient {
|
||||
command_approval_item_ids: Vec::new(),
|
||||
command_execution_statuses: Vec::new(),
|
||||
last_turn_status: None,
|
||||
trace_id: generate_trace_id(),
|
||||
trace_requests: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1073,6 +1121,8 @@ impl CodexClient {
|
||||
command_approval_item_ids: Vec::new(),
|
||||
command_execution_statuses: Vec::new(),
|
||||
last_turn_status: None,
|
||||
trace_id: generate_trace_id(),
|
||||
trace_requests: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1438,12 +1488,67 @@ impl CodexClient {
|
||||
}
|
||||
|
||||
fn write_request(&mut self, request: &ClientRequest) -> Result<()> {
|
||||
let request_json = serde_json::to_string(request)?;
|
||||
let request_pretty = serde_json::to_string_pretty(request)?;
|
||||
let request = self.jsonrpc_request_with_trace(request)?;
|
||||
self.record_request_trace_info(&request);
|
||||
let request_json = serde_json::to_string(&request)?;
|
||||
let request_pretty = serde_json::to_string_pretty(&request)?;
|
||||
print_multiline_with_prefix("> ", &request_pretty);
|
||||
self.write_payload(&request_json)
|
||||
}
|
||||
|
||||
fn jsonrpc_request_with_trace(&self, request: &ClientRequest) -> Result<JSONRPCRequest> {
|
||||
let request_value = serde_json::to_value(request)?;
|
||||
let mut request: JSONRPCRequest = serde_json::from_value(request_value)
|
||||
.context("client request was not a valid JSON-RPC request")?;
|
||||
request.trace = Some(W3cTraceContext {
|
||||
traceparent: Some(format!(
|
||||
"00-{}-{}-01",
|
||||
self.trace_id,
|
||||
generate_parent_span_id()
|
||||
)),
|
||||
tracestate: None,
|
||||
});
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
fn record_request_trace_info(&mut self, request: &JSONRPCRequest) {
|
||||
let Some(trace) = request.trace.as_ref() else {
|
||||
return;
|
||||
};
|
||||
let Some(traceparent) = trace.traceparent.as_deref() else {
|
||||
return;
|
||||
};
|
||||
let Some((trace_id, parent_span_id)) = parse_traceparent(traceparent) else {
|
||||
return;
|
||||
};
|
||||
|
||||
debug_assert_eq!(trace_id, self.trace_id);
|
||||
self.trace_requests.push(TraceRequestSummary {
|
||||
method: request.method.clone(),
|
||||
request_id: format_request_id(&request.id),
|
||||
parent_span_id: parent_span_id.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
fn print_trace_summary(&self) {
|
||||
if self.trace_requests.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
println!("\n[trace summary]");
|
||||
println!("trace_id: {}", self.trace_id);
|
||||
println!(
|
||||
"datadog_url: {}",
|
||||
datadog_trace_url(&self.trace_id, "codex_app_server")
|
||||
);
|
||||
println!("requests:");
|
||||
for request in &self.trace_requests {
|
||||
println!(" method: {}", request.method);
|
||||
println!(" request_id: {}", request.request_id);
|
||||
println!(" parent_span_id: {}", request.parent_span_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn wait_for_response<T>(&mut self, request_id: RequestId, method: &str) -> Result<T>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
@@ -1709,6 +1814,50 @@ impl CodexClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_trace_id() -> String {
|
||||
Uuid::new_v4().simple().to_string()
|
||||
}
|
||||
|
||||
fn generate_parent_span_id() -> String {
|
||||
let uuid = Uuid::new_v4().simple().to_string();
|
||||
uuid[..16].to_string()
|
||||
}
|
||||
|
||||
fn parse_traceparent(traceparent: &str) -> Option<(&str, &str)> {
|
||||
let mut parts = traceparent.split('-');
|
||||
let _version = parts.next()?;
|
||||
let trace_id = parts.next()?;
|
||||
let parent_span_id = parts.next()?;
|
||||
let _flags = parts.next()?;
|
||||
if parts.next().is_some() {
|
||||
return None;
|
||||
}
|
||||
Some((trace_id, parent_span_id))
|
||||
}
|
||||
|
||||
fn format_request_id(request_id: &RequestId) -> String {
|
||||
match request_id {
|
||||
RequestId::String(value) => value.clone(),
|
||||
RequestId::Integer(value) => value.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn datadog_trace_url(trace_id: &str, service_name: &str) -> String {
|
||||
let now_ms = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map(|duration| duration.as_millis())
|
||||
.unwrap_or(0);
|
||||
let start = now_ms.saturating_sub(DATADOG_TRACE_WINDOW_MS);
|
||||
let end = now_ms.saturating_add(DATADOG_TRACE_WINDOW_MS);
|
||||
|
||||
let query = format!("service:{service_name} trace_id:{trace_id}");
|
||||
let encoded_query: String = url::form_urlencoded::byte_serialize(query.as_bytes()).collect();
|
||||
|
||||
format!(
|
||||
"{DATADOG_TRACE_BASE_URL}?query={encoded_query}&agg_m=count&agg_m_source=base&agg_t=count&fromUser=false&graphType=flamegraph&historicalData=false&messageDisplay=inline&query_translation_version=v0&shouldShowLegend=true&sort=desc&spanType=all&storage=hot&target-span=a&tq_query_translation_version=v0&traceQuery=a&view=spans&viz=stream&start={start}&end={end}&paused=false"
|
||||
)
|
||||
}
|
||||
|
||||
fn print_multiline_with_prefix(prefix: &str, payload: &str) {
|
||||
for line in payload.lines() {
|
||||
println!("{prefix}{line}");
|
||||
@@ -1728,11 +1877,18 @@ impl Drop for CodexClient {
|
||||
return;
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
let deadline = SystemTime::now() + APP_SERVER_GRACEFUL_SHUTDOWN_TIMEOUT;
|
||||
loop {
|
||||
if let Ok(Some(status)) = child.try_wait() {
|
||||
println!("[codex app-server exited: {status}]");
|
||||
return;
|
||||
}
|
||||
|
||||
if let Ok(Some(status)) = child.try_wait() {
|
||||
println!("[codex app-server exited: {status}]");
|
||||
return;
|
||||
if SystemTime::now() >= deadline {
|
||||
break;
|
||||
}
|
||||
|
||||
thread::sleep(APP_SERVER_GRACEFUL_SHUTDOWN_POLL_INTERVAL);
|
||||
}
|
||||
|
||||
let _ = child.kill();
|
||||
|
||||
@@ -21,6 +21,7 @@ async-trait = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-cloud-requirements = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-shell-command = { workspace = true }
|
||||
codex-utils-cli = { workspace = true }
|
||||
codex-backend-client = { workspace = true }
|
||||
|
||||
@@ -28,6 +28,10 @@ Supported transports:
|
||||
|
||||
Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.
|
||||
|
||||
Inbound JSON-RPC requests may include an optional top-level `trace` object with
|
||||
W3C Trace Context fields (`traceparent` and optional `tracestate`). This keeps
|
||||
trace propagation transport-level and independent of method-specific params.
|
||||
|
||||
Tracing/log output:
|
||||
|
||||
- `RUST_LOG` controls log filtering/verbosity.
|
||||
@@ -85,6 +89,9 @@ Example (from OpenAI's official VSCode extension):
|
||||
{
|
||||
"method": "initialize",
|
||||
"id": 0,
|
||||
"trace": {
|
||||
"traceparent": "00-00000000000000000000000000000001-0000000000000002-01"
|
||||
},
|
||||
"params": {
|
||||
"clientInfo": {
|
||||
"name": "codex_vscode",
|
||||
|
||||
89
codex-rs/app-server/src/app_server_tracing.rs
Normal file
89
codex-rs/app-server/src/app_server_tracing.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
use crate::message_processor::ConnectionSessionState;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::transport::AppServerTransport;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_otel::set_parent_from_context;
|
||||
use codex_otel::set_parent_from_w3c_trace_context;
|
||||
use codex_otel::traceparent_context_from_env;
|
||||
use tracing::Span;
|
||||
use tracing::field;
|
||||
use tracing::info_span;
|
||||
|
||||
pub(crate) fn request_span(
|
||||
request: &JSONRPCRequest,
|
||||
transport: AppServerTransport,
|
||||
connection_id: ConnectionId,
|
||||
session: &ConnectionSessionState,
|
||||
) -> Span {
|
||||
let span = info_span!(
|
||||
"app_server.request",
|
||||
otel.kind = "server",
|
||||
otel.name = request.method.as_str(),
|
||||
rpc.system = "jsonrpc",
|
||||
rpc.method = request.method.as_str(),
|
||||
rpc.transport = transport_name(transport),
|
||||
rpc.request_id = ?request.id,
|
||||
app_server.connection_id = ?connection_id,
|
||||
app_server.api_version = "v2",
|
||||
app_server.client_name = field::Empty,
|
||||
app_server.client_version = field::Empty,
|
||||
);
|
||||
|
||||
let initialize_client_info = initialize_client_info(request);
|
||||
if let Some(client_name) = client_name(initialize_client_info.as_ref(), session) {
|
||||
span.record("app_server.client_name", client_name);
|
||||
}
|
||||
if let Some(client_version) = client_version(initialize_client_info.as_ref(), session) {
|
||||
span.record("app_server.client_version", client_version);
|
||||
}
|
||||
|
||||
if let Some(trace) = &request.trace {
|
||||
if !set_parent_from_w3c_trace_context(&span, trace) {
|
||||
tracing::warn!(
|
||||
rpc_method = request.method.as_str(),
|
||||
rpc_request_id = ?request.id,
|
||||
"ignoring invalid inbound request trace carrier"
|
||||
);
|
||||
}
|
||||
} else if let Some(context) = traceparent_context_from_env() {
|
||||
set_parent_from_context(&span, context);
|
||||
}
|
||||
|
||||
span
|
||||
}
|
||||
|
||||
fn transport_name(transport: AppServerTransport) -> &'static str {
|
||||
match transport {
|
||||
AppServerTransport::Stdio => "stdio",
|
||||
AppServerTransport::WebSocket { .. } => "websocket",
|
||||
}
|
||||
}
|
||||
|
||||
fn client_name<'a>(
|
||||
initialize_client_info: Option<&'a InitializeParams>,
|
||||
session: &'a ConnectionSessionState,
|
||||
) -> Option<&'a str> {
|
||||
if let Some(params) = initialize_client_info {
|
||||
return Some(params.client_info.name.as_str());
|
||||
}
|
||||
session.app_server_client_name.as_deref()
|
||||
}
|
||||
|
||||
fn client_version<'a>(
|
||||
initialize_client_info: Option<&'a InitializeParams>,
|
||||
session: &'a ConnectionSessionState,
|
||||
) -> Option<&'a str> {
|
||||
if let Some(params) = initialize_client_info {
|
||||
return Some(params.client_info.version.as_str());
|
||||
}
|
||||
session.client_version.as_deref()
|
||||
}
|
||||
|
||||
fn initialize_client_info(request: &JSONRPCRequest) -> Option<InitializeParams> {
|
||||
if request.method != "initialize" {
|
||||
return None;
|
||||
}
|
||||
let params = request.params.clone()?;
|
||||
serde_json::from_value(params).ok()
|
||||
}
|
||||
@@ -52,6 +52,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::Registry;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
mod app_server_tracing;
|
||||
mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
mod config_api;
|
||||
@@ -447,7 +448,7 @@ pub async fn run_main_with_transport(
|
||||
let otel = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some("codex_app_server"),
|
||||
Some("codex-app-server"),
|
||||
default_analytics_enabled,
|
||||
)
|
||||
.map_err(|e| {
|
||||
@@ -675,6 +676,7 @@ pub async fn run_main_with_transport(
|
||||
.process_request(
|
||||
connection_id,
|
||||
request,
|
||||
transport,
|
||||
&mut connection_state.session,
|
||||
&connection_state.outbound_initialized,
|
||||
)
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::external_agent_config_api::ExternalAgentConfigApi;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::ConnectionRequestId;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::AppServerTransport;
|
||||
use async_trait::async_trait;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
|
||||
@@ -59,6 +60,7 @@ use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::Instrument;
|
||||
|
||||
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
@@ -141,6 +143,7 @@ pub(crate) struct ConnectionSessionState {
|
||||
pub(crate) experimental_api_enabled: bool,
|
||||
pub(crate) opted_out_notification_methods: HashSet<String>,
|
||||
pub(crate) app_server_client_name: Option<String>,
|
||||
pub(crate) client_version: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) struct MessageProcessorArgs {
|
||||
@@ -224,46 +227,50 @@ impl MessageProcessor {
|
||||
&mut self,
|
||||
connection_id: ConnectionId,
|
||||
request: JSONRPCRequest,
|
||||
transport: AppServerTransport,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
) {
|
||||
let request_method = request.method.as_str();
|
||||
tracing::trace!(
|
||||
?connection_id,
|
||||
request_id = ?request.id,
|
||||
"app-server request: {request_method}"
|
||||
);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id: request.id.clone(),
|
||||
};
|
||||
let request_json = match serde_json::to_value(&request) {
|
||||
Ok(request_json) => request_json,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("Invalid request: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let request_span =
|
||||
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
|
||||
async {
|
||||
let request_method = request.method.as_str();
|
||||
tracing::trace!(
|
||||
?connection_id,
|
||||
request_id = ?request.id,
|
||||
"app-server request: {request_method}"
|
||||
);
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id: request.id.clone(),
|
||||
};
|
||||
let request_json = match serde_json::to_value(&request) {
|
||||
Ok(request_json) => request_json,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("Invalid request: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
|
||||
Ok(codex_request) => codex_request,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("Invalid request: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let codex_request = match serde_json::from_value::<ClientRequest>(request_json) {
|
||||
Ok(codex_request) => codex_request,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("Invalid request: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match codex_request {
|
||||
match codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
// itself with the `initialized` bool.
|
||||
ClientRequest::Initialize { request_id, params } => {
|
||||
@@ -304,6 +311,8 @@ impl MessageProcessor {
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
session.app_server_client_name = Some(name.clone());
|
||||
session.client_version = Some(version.clone());
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
@@ -330,7 +339,6 @@ impl MessageProcessor {
|
||||
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
|
||||
*suffix = Some(user_agent_suffix);
|
||||
}
|
||||
session.app_server_client_name = Some(name.clone());
|
||||
|
||||
let user_agent = get_codex_user_agent();
|
||||
let response = InitializeResponse { user_agent };
|
||||
@@ -355,91 +363,97 @@ impl MessageProcessor {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(reason) = codex_request.experimental_reason()
|
||||
&& !session.experimental_api_enabled
|
||||
{
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: experimental_required_message(reason),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(reason) = codex_request.experimental_reason()
|
||||
&& !session.experimental_api_enabled
|
||||
{
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: experimental_required_message(reason),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
match codex_request {
|
||||
ClientRequest::ConfigRead { request_id, params } => {
|
||||
self.handle_config_read(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
|
||||
self.handle_external_agent_config_detect(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
|
||||
self.handle_external_agent_config_import(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigValueWrite { request_id, params } => {
|
||||
self.handle_config_value_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigBatchWrite { request_id, params } => {
|
||||
self.handle_config_batch_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigRequirementsRead {
|
||||
request_id,
|
||||
params: _,
|
||||
} => {
|
||||
self.handle_config_requirements_read(ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
// Box the delegated future so this wrapper's async state machine does not
|
||||
// inline the full `CodexMessageProcessor::process_request` future, which
|
||||
// can otherwise push worker-thread stack usage over the edge.
|
||||
self.codex_message_processor
|
||||
.process_request(connection_id, other, session.app_server_client_name.clone())
|
||||
.boxed()
|
||||
match codex_request {
|
||||
ClientRequest::ConfigRead { request_id, params } => {
|
||||
self.handle_config_read(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
|
||||
self.handle_external_agent_config_detect(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
|
||||
self.handle_external_agent_config_import(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigValueWrite { request_id, params } => {
|
||||
self.handle_config_value_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigBatchWrite { request_id, params } => {
|
||||
self.handle_config_batch_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigRequirementsRead {
|
||||
request_id,
|
||||
params: _,
|
||||
} => {
|
||||
self.handle_config_requirements_read(ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
// Box the delegated future so this wrapper's async state machine does not
|
||||
// inline the full `CodexMessageProcessor::process_request` future, which
|
||||
// can otherwise push worker-thread stack usage over the edge.
|
||||
self.codex_message_processor
|
||||
.process_request(
|
||||
connection_id,
|
||||
other,
|
||||
session.app_server_client_name.clone(),
|
||||
)
|
||||
.boxed()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
|
||||
|
||||
@@ -744,6 +744,7 @@ mod tests {
|
||||
id: codex_app_server_protocol::RequestId::Integer(7),
|
||||
method: "config/read".to_string(),
|
||||
params: Some(json!({ "includeLayers": false })),
|
||||
trace: None,
|
||||
});
|
||||
assert!(
|
||||
enqueue_incoming_message(&transport_event_tx, &writer_tx, connection_id, request).await
|
||||
@@ -885,6 +886,7 @@ mod tests {
|
||||
id: codex_app_server_protocol::RequestId::Integer(7),
|
||||
method: "config/read".to_string(),
|
||||
params: Some(json!({ "includeLayers": false })),
|
||||
trace: None,
|
||||
});
|
||||
|
||||
let enqueue_result = tokio::time::timeout(
|
||||
|
||||
@@ -891,6 +891,7 @@ impl McpProcess {
|
||||
id: RequestId::Integer(request_id),
|
||||
method: method.to_string(),
|
||||
params,
|
||||
trace: None,
|
||||
});
|
||||
self.send_jsonrpc_message(message).await?;
|
||||
Ok(request_id)
|
||||
|
||||
@@ -30,7 +30,7 @@ async fn app_server_default_analytics_disabled_without_flag() -> Result<()> {
|
||||
let provider = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
SERVICE_VERSION,
|
||||
Some("codex_app_server"),
|
||||
Some("codex-app-server"),
|
||||
false,
|
||||
)
|
||||
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
|
||||
@@ -55,7 +55,7 @@ async fn app_server_default_analytics_enabled_with_flag() -> Result<()> {
|
||||
let provider = codex_core::otel_init::build_provider(
|
||||
&config,
|
||||
SERVICE_VERSION,
|
||||
Some("codex_app_server"),
|
||||
Some("codex-app-server"),
|
||||
true,
|
||||
)
|
||||
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
|
||||
|
||||
@@ -174,6 +174,7 @@ pub(super) async fn send_request(
|
||||
id: RequestId::Integer(id),
|
||||
method: method.to_string(),
|
||||
params,
|
||||
trace: None,
|
||||
});
|
||||
send_jsonrpc(stream, message).await
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ mod macos;
|
||||
mod tests;
|
||||
|
||||
use crate::config::ConfigToml;
|
||||
use crate::config::deserialize_config_toml_with_base;
|
||||
use crate::config_loader::layer_io::LoadedConfigLayers;
|
||||
use crate::git_info::resolve_root_git_project_for_trust;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
@@ -576,6 +575,11 @@ struct ProjectTrustContext {
|
||||
user_config_file: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ProjectTrustConfigToml {
|
||||
projects: Option<std::collections::HashMap<String, crate::config::ProjectConfig>>,
|
||||
}
|
||||
|
||||
struct ProjectTrustDecision {
|
||||
trust_level: Option<TrustLevel>,
|
||||
trust_key: String,
|
||||
@@ -666,10 +670,16 @@ async fn project_trust_context(
|
||||
config_base_dir: &Path,
|
||||
user_config_file: &AbsolutePathBuf,
|
||||
) -> io::Result<ProjectTrustContext> {
|
||||
let config_toml = deserialize_config_toml_with_base(merged_config.clone(), config_base_dir)?;
|
||||
let project_trust_config: ProjectTrustConfigToml = {
|
||||
let _guard = AbsolutePathBufGuard::new(config_base_dir);
|
||||
merged_config
|
||||
.clone()
|
||||
.try_into()
|
||||
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?
|
||||
};
|
||||
|
||||
let project_root = find_project_root(cwd, project_root_markers).await?;
|
||||
let projects = config_toml.projects.unwrap_or_default();
|
||||
let projects = project_trust_config.projects.unwrap_or_default();
|
||||
|
||||
let project_root_key = project_root.as_path().to_string_lossy().to_string();
|
||||
let repo_root = resolve_root_git_project_for_trust(cwd.as_path());
|
||||
|
||||
@@ -1114,6 +1114,91 @@ async fn project_layers_disabled_when_untrusted_or_unknown() -> std::io::Result<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cli_override_can_update_project_local_mcp_server_when_project_is_trusted()
|
||||
-> std::io::Result<()> {
|
||||
let tmp = tempdir()?;
|
||||
let project_root = tmp.path().join("project");
|
||||
let nested = project_root.join("child");
|
||||
let dot_codex = project_root.join(".codex");
|
||||
let codex_home = tmp.path().join("home");
|
||||
tokio::fs::create_dir_all(&nested).await?;
|
||||
tokio::fs::create_dir_all(&dot_codex).await?;
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
tokio::fs::write(project_root.join(".git"), "gitdir: here").await?;
|
||||
tokio::fs::write(
|
||||
dot_codex.join(CONFIG_TOML_FILE),
|
||||
r#"
|
||||
[mcp_servers.sentry]
|
||||
url = "https://mcp.sentry.dev/mcp"
|
||||
enabled = false
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
make_config_for_test(&codex_home, &project_root, TrustLevel::Trusted, None).await?;
|
||||
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(codex_home)
|
||||
.cli_overrides(vec![(
|
||||
"mcp_servers.sentry.enabled".to_string(),
|
||||
TomlValue::Boolean(true),
|
||||
)])
|
||||
.fallback_cwd(Some(nested))
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
let server = config
|
||||
.mcp_servers
|
||||
.get()
|
||||
.get("sentry")
|
||||
.expect("trusted project MCP server should load");
|
||||
assert!(server.enabled);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cli_override_for_disabled_project_local_mcp_server_returns_invalid_transport()
|
||||
-> std::io::Result<()> {
|
||||
let tmp = tempdir()?;
|
||||
let project_root = tmp.path().join("project");
|
||||
let nested = project_root.join("child");
|
||||
let dot_codex = project_root.join(".codex");
|
||||
let codex_home = tmp.path().join("home");
|
||||
tokio::fs::create_dir_all(&nested).await?;
|
||||
tokio::fs::create_dir_all(&dot_codex).await?;
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
tokio::fs::write(project_root.join(".git"), "gitdir: here").await?;
|
||||
tokio::fs::write(
|
||||
dot_codex.join(CONFIG_TOML_FILE),
|
||||
r#"
|
||||
[mcp_servers.sentry]
|
||||
url = "https://mcp.sentry.dev/mcp"
|
||||
enabled = false
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let err = ConfigBuilder::default()
|
||||
.codex_home(codex_home)
|
||||
.cli_overrides(vec![(
|
||||
"mcp_servers.sentry.enabled".to_string(),
|
||||
TomlValue::Boolean(true),
|
||||
)])
|
||||
.fallback_cwd(Some(nested))
|
||||
.build()
|
||||
.await
|
||||
.expect_err("untrusted project layer should not provide MCP transport");
|
||||
|
||||
assert!(
|
||||
err.to_string().contains("invalid transport")
|
||||
&& err.to_string().contains("mcp_servers.sentry"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalid_project_config_ignored_when_untrusted_or_unknown() -> std::io::Result<()> {
|
||||
let tmp = tempdir()?;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod config;
|
||||
pub mod metrics;
|
||||
pub mod otel_provider;
|
||||
pub mod trace_context;
|
||||
pub mod traces;
|
||||
|
||||
mod otlp;
|
||||
@@ -23,6 +24,11 @@ use tracing::debug;
|
||||
|
||||
pub use crate::metrics::runtime_metrics::RuntimeMetricTotals;
|
||||
pub use crate::metrics::runtime_metrics::RuntimeMetricsSummary;
|
||||
pub use crate::otel_provider::traceparent_context_from_env;
|
||||
pub use crate::trace_context::context_from_w3c_trace_context;
|
||||
pub use crate::trace_context::current_span_w3c_trace_context;
|
||||
pub use crate::trace_context::set_parent_from_context;
|
||||
pub use crate::trace_context::set_parent_from_w3c_trace_context;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
||||
@@ -3,13 +3,12 @@ use crate::config::OtelHttpProtocol;
|
||||
use crate::config::OtelSettings;
|
||||
use crate::metrics::MetricsClient;
|
||||
use crate::metrics::MetricsConfig;
|
||||
use crate::trace_context::context_from_trace_headers;
|
||||
use gethostname::gethostname;
|
||||
use opentelemetry::Context;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::context::ContextGuard;
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::propagation::TextMapPropagator;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use opentelemetry_otlp::LogExporter;
|
||||
@@ -30,7 +29,6 @@ use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::trace::Tracer;
|
||||
use opentelemetry_semantic_conventions as semconv;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::sync::OnceLock;
|
||||
@@ -172,7 +170,7 @@ impl Drop for OtelProvider {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn traceparent_context_from_env() -> Option<Context> {
|
||||
pub fn traceparent_context_from_env() -> Option<Context> {
|
||||
TRACEPARENT_CONTEXT
|
||||
.get_or_init(load_traceparent_context)
|
||||
.clone()
|
||||
@@ -194,7 +192,7 @@ fn load_traceparent_context() -> Option<Context> {
|
||||
let traceparent = env::var(TRACEPARENT_ENV_VAR).ok()?;
|
||||
let tracestate = env::var(TRACESTATE_ENV_VAR).ok();
|
||||
|
||||
match extract_traceparent_context(traceparent, tracestate) {
|
||||
match context_from_trace_headers(Some(&traceparent), tracestate.as_deref()) {
|
||||
Some(context) => {
|
||||
debug!("TRACEPARENT detected; continuing trace from parent context");
|
||||
Some(context)
|
||||
@@ -206,22 +204,6 @@ fn load_traceparent_context() -> Option<Context> {
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_traceparent_context(traceparent: String, tracestate: Option<String>) -> Option<Context> {
|
||||
let mut headers = HashMap::new();
|
||||
headers.insert("traceparent".to_string(), traceparent);
|
||||
if let Some(tracestate) = tracestate {
|
||||
headers.insert("tracestate".to_string(), tracestate);
|
||||
}
|
||||
|
||||
let context = TraceContextPropagator::new().extract(&headers);
|
||||
let span = context.span();
|
||||
let span_context = span.span_context();
|
||||
if !span_context.is_valid() {
|
||||
return None;
|
||||
}
|
||||
Some(context)
|
||||
}
|
||||
|
||||
fn make_resource(settings: &OtelSettings) -> Resource {
|
||||
Resource::builder()
|
||||
.with_service_name(settings.service_name.clone())
|
||||
@@ -407,8 +389,9 @@ mod tests {
|
||||
fn parses_valid_traceparent() {
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let span_id = "0000000000000002";
|
||||
let context = extract_traceparent_context(format!("00-{trace_id}-{span_id}-01"), None)
|
||||
.expect("trace context");
|
||||
let context =
|
||||
context_from_trace_headers(Some(&format!("00-{trace_id}-{span_id}-01")), None)
|
||||
.expect("trace context");
|
||||
let span = context.span();
|
||||
let span_context = span.span_context();
|
||||
assert_eq!(
|
||||
@@ -421,7 +404,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn invalid_traceparent_returns_none() {
|
||||
assert!(extract_traceparent_context("not-a-traceparent".to_string(), None).is_none());
|
||||
assert!(context_from_trace_headers(Some("not-a-traceparent"), None).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
106
codex-rs/otel/src/trace_context.rs
Normal file
106
codex-rs/otel/src/trace_context.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use opentelemetry::Context;
|
||||
use opentelemetry::propagation::TextMapPropagator;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
use tracing::Span;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
pub fn current_span_w3c_trace_context() -> Option<W3cTraceContext> {
|
||||
let context = Span::current().context();
|
||||
if !context.span().span_context().is_valid() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut headers = HashMap::new();
|
||||
TraceContextPropagator::new().inject_context(&context, &mut headers);
|
||||
|
||||
Some(W3cTraceContext {
|
||||
traceparent: headers.remove("traceparent"),
|
||||
tracestate: headers.remove("tracestate"),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn context_from_w3c_trace_context(trace: &W3cTraceContext) -> Option<Context> {
|
||||
context_from_trace_headers(trace.traceparent.as_deref(), trace.tracestate.as_deref())
|
||||
}
|
||||
|
||||
pub fn set_parent_from_w3c_trace_context(span: &Span, trace: &W3cTraceContext) -> bool {
|
||||
if let Some(context) = context_from_w3c_trace_context(trace) {
|
||||
set_parent_from_context(span, context);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_parent_from_context(span: &Span, context: Context) {
|
||||
let _ = span.set_parent(context);
|
||||
}
|
||||
|
||||
pub(crate) fn context_from_trace_headers(
|
||||
traceparent: Option<&str>,
|
||||
tracestate: Option<&str>,
|
||||
) -> Option<Context> {
|
||||
let traceparent = traceparent?;
|
||||
let mut headers = HashMap::new();
|
||||
headers.insert("traceparent".to_string(), traceparent.to_string());
|
||||
if let Some(tracestate) = tracestate {
|
||||
headers.insert("tracestate".to_string(), tracestate.to_string());
|
||||
}
|
||||
|
||||
let context = TraceContextPropagator::new().extract(&headers);
|
||||
if !context.span().span_context().is_valid() {
|
||||
return None;
|
||||
}
|
||||
Some(context)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::context_from_trace_headers;
|
||||
use super::context_from_w3c_trace_context;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use opentelemetry::trace::SpanId;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::trace::TraceId;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn parses_valid_w3c_trace_context() {
|
||||
let trace_id = "00000000000000000000000000000001";
|
||||
let span_id = "0000000000000002";
|
||||
let context = context_from_w3c_trace_context(&W3cTraceContext {
|
||||
traceparent: Some(format!("00-{trace_id}-{span_id}-01")),
|
||||
tracestate: None,
|
||||
})
|
||||
.expect("trace context");
|
||||
|
||||
let span = context.span();
|
||||
let span_context = span.span_context();
|
||||
assert_eq!(
|
||||
span_context.trace_id(),
|
||||
TraceId::from_hex(trace_id).unwrap()
|
||||
);
|
||||
assert_eq!(span_context.span_id(), SpanId::from_hex(span_id).unwrap());
|
||||
assert!(span_context.is_remote());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_traceparent_returns_none() {
|
||||
assert!(context_from_trace_headers(Some("not-a-traceparent"), None).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn missing_traceparent_returns_none() {
|
||||
assert!(
|
||||
context_from_w3c_trace_context(&W3cTraceContext {
|
||||
traceparent: None,
|
||||
tracestate: Some("vendor=value".to_string()),
|
||||
})
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -83,6 +83,16 @@ pub struct Submission {
|
||||
pub op: Op,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct W3cTraceContext {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub traceparent: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub tracestate: Option<String>,
|
||||
}
|
||||
|
||||
/// Config payload for refreshing MCP servers.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)]
|
||||
pub struct McpServerRefreshConfig {
|
||||
|
||||
@@ -20,7 +20,7 @@ decision to the shell-escalation protocol over a shared file descriptor (specifi
|
||||
We carry a small patch to `execute_cmd.c` (see `patches/bash-exec-wrapper.patch`) that adds support for `EXEC_WRAPPER`. The original commit message is “add support for BASH_EXEC_WRAPPER” and the patch applies cleanly to `a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b` from https://github.com/bminor/bash. To rebuild manually:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/bminor/bash
|
||||
git clone https://git.savannah.gnu.org/git/bash
|
||||
git checkout a8a1c2fac029404d3f42cd39f5a20f24b6e4fe4b
|
||||
git apply /path/to/patches/bash-exec-wrapper.patch
|
||||
./configure --without-bash-malloc
|
||||
|
||||
Reference in New Issue
Block a user