mirror of
https://github.com/openai/codex.git
synced 2026-02-02 15:03:38 +00:00
Compare commits
18 Commits
centralize
...
otel-traci
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d958b66179 | ||
|
|
a51cf9266d | ||
|
|
6bddf0369e | ||
|
|
b14804a560 | ||
|
|
544024478a | ||
|
|
d5ec436539 | ||
|
|
dbe7529cd4 | ||
|
|
6b6ccd6223 | ||
|
|
d1ea58e87e | ||
|
|
089e367814 | ||
|
|
4ac336bbc6 | ||
|
|
4e3c4277a4 | ||
|
|
2d200792d3 | ||
|
|
b58de1494c | ||
|
|
148dda758f | ||
|
|
ad78dce1d9 | ||
|
|
bc19cf26d3 | ||
|
|
4c9c63d725 |
472
codex-rs/Cargo.lock
generated
472
codex-rs/Cargo.lock
generated
@@ -78,12 +78,6 @@ version = "0.2.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
|
||||
|
||||
[[package]]
|
||||
name = "android-tzdata"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
|
||||
|
||||
[[package]]
|
||||
name = "android_system_properties"
|
||||
version = "0.1.5"
|
||||
@@ -316,6 +310,17 @@ dependencies = [
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
@@ -328,6 +333,51 @@ version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5"
|
||||
dependencies = [
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"itoa",
|
||||
"matchit",
|
||||
"memchr",
|
||||
"mime",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"serde",
|
||||
"sync_wrapper",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-core"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"sync_wrapper",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.75"
|
||||
@@ -484,18 +534,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.41"
|
||||
name = "cfg_aliases"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
|
||||
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.42"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
|
||||
dependencies = [
|
||||
"android-tzdata",
|
||||
"iana-time-zone",
|
||||
"js-sys",
|
||||
"num-traits",
|
||||
"serde",
|
||||
"wasm-bindgen",
|
||||
"windows-link",
|
||||
"windows-link 0.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -672,6 +727,7 @@ dependencies = [
|
||||
"codex-apply-patch",
|
||||
"codex-file-search",
|
||||
"codex-mcp-client",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"core_test_support",
|
||||
"dirs",
|
||||
@@ -706,6 +762,7 @@ dependencies = [
|
||||
"toml",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-test",
|
||||
"tree-sitter",
|
||||
"tree-sitter-bash",
|
||||
"uuid",
|
||||
@@ -730,6 +787,7 @@ dependencies = [
|
||||
"codex-protocol",
|
||||
"core_test_support",
|
||||
"libc",
|
||||
"opentelemetry-appender-tracing",
|
||||
"owo-colors",
|
||||
"predicates",
|
||||
"serde_json",
|
||||
@@ -867,6 +925,23 @@ dependencies = [
|
||||
"wiremock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-otel"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"codex-protocol",
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry_sdk",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"strum_macros 0.27.2",
|
||||
"tonic",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-protocol"
|
||||
version = "0.0.0"
|
||||
@@ -929,6 +1004,7 @@ dependencies = [
|
||||
"libc",
|
||||
"mcp-types",
|
||||
"once_cell",
|
||||
"opentelemetry-appender-tracing",
|
||||
"owo-colors",
|
||||
"path-clean",
|
||||
"pathdiff",
|
||||
@@ -1881,8 +1957,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"wasi 0.11.1+wasi-snapshot-preview1",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1892,9 +1970,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"libc",
|
||||
"r-efi",
|
||||
"wasi 0.14.2+wasi-0.2.4",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2087,12 +2167,26 @@ dependencies = [
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
|
||||
dependencies = [
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-tls"
|
||||
version = "0.6.0"
|
||||
@@ -2127,7 +2221,7 @@ dependencies = [
|
||||
"libc",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"socket2 0.6.0",
|
||||
"system-configuration",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
@@ -2691,6 +2785,12 @@ dependencies = [
|
||||
"hashbrown 0.15.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru-slab"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
|
||||
|
||||
[[package]]
|
||||
name = "lsp-types"
|
||||
version = "0.94.1"
|
||||
@@ -2719,6 +2819,12 @@ dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||
|
||||
[[package]]
|
||||
name = "mcp-types"
|
||||
version = "0.0.0"
|
||||
@@ -2836,7 +2942,7 @@ dependencies = [
|
||||
"openssl-probe",
|
||||
"openssl-sys",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
"security-framework 2.11.1",
|
||||
"security-framework-sys",
|
||||
"tempfile",
|
||||
]
|
||||
@@ -2870,7 +2976,7 @@ checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
|
||||
dependencies = [
|
||||
"bitflags 2.9.1",
|
||||
"cfg-if",
|
||||
"cfg_aliases",
|
||||
"cfg_aliases 0.1.1",
|
||||
"libc",
|
||||
]
|
||||
|
||||
@@ -3110,6 +3216,104 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"js-sys",
|
||||
"pin-project-lite",
|
||||
"thiserror 2.0.16",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-appender-tracing"
|
||||
version = "0.30.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e68f63eca5fad47e570e00e893094fc17be959c80c79a7d6ec1abdd5ae6ffc16"
|
||||
dependencies = [
|
||||
"opentelemetry",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-http"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"http",
|
||||
"opentelemetry",
|
||||
"reqwest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-otlp"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
|
||||
dependencies = [
|
||||
"http",
|
||||
"opentelemetry",
|
||||
"opentelemetry-http",
|
||||
"opentelemetry-proto",
|
||||
"opentelemetry_sdk",
|
||||
"prost",
|
||||
"reqwest",
|
||||
"serde_json",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"hex",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"prost",
|
||||
"serde",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-semantic-conventions"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry_sdk"
|
||||
version = "0.30.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"opentelemetry",
|
||||
"percent-encoding",
|
||||
"rand 0.9.2",
|
||||
"serde_json",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "option-ext"
|
||||
version = "0.2.0"
|
||||
@@ -3224,6 +3428,26 @@ dependencies = [
|
||||
"siphasher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.16"
|
||||
@@ -3384,6 +3608,29 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"prost-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prost-derive"
|
||||
version = "0.13.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.14.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pulldown-cmark"
|
||||
version = "0.10.3"
|
||||
@@ -3427,6 +3674,61 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.11.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cfg_aliases 0.2.1",
|
||||
"pin-project-lite",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
"socket2 0.5.10",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.11.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"getrandom 0.3.3",
|
||||
"lru-slab",
|
||||
"rand 0.9.2",
|
||||
"ring",
|
||||
"rustc-hash",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
"slab",
|
||||
"thiserror 2.0.16",
|
||||
"tinyvec",
|
||||
"tracing",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.5.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970"
|
||||
dependencies = [
|
||||
"cfg_aliases 0.2.1",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"socket2 0.5.10",
|
||||
"tracing",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.40"
|
||||
@@ -3649,6 +3951,9 @@ dependencies = [
|
||||
"native-tls",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"quinn",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-pki-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3656,6 +3961,7 @@ dependencies = [
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tokio-native-tls",
|
||||
"tokio-rustls",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower-http",
|
||||
@@ -3687,6 +3993,12 @@ version = "0.1.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.38.44"
|
||||
@@ -3720,18 +4032,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pki-types",
|
||||
"schannel",
|
||||
"security-framework 3.3.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
|
||||
dependencies = [
|
||||
"web-time",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
@@ -3916,6 +4242,19 @@ dependencies = [
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "3.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c"
|
||||
dependencies = [
|
||||
"bitflags 2.9.1",
|
||||
"core-foundation 0.10.1",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "2.14.0"
|
||||
@@ -4181,6 +4520,16 @@ version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b7c388c1b5e93756d0c740965c41e8822f866621d41acbdf6336a6a168f8840c"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.6.0"
|
||||
@@ -4646,6 +4995,21 @@ dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec"
|
||||
version = "1.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa"
|
||||
dependencies = [
|
||||
"tinyvec_macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec_macros"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.47.1"
|
||||
@@ -4661,7 +5025,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"slab",
|
||||
"socket2",
|
||||
"socket2 0.6.0",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
@@ -4786,6 +5150,35 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fcc842091f2def52017664b53082ecbbeb5c7731092bad69d2c63050401dfd64"
|
||||
|
||||
[[package]]
|
||||
name = "tonic"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64",
|
||||
"bytes",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
"hyper",
|
||||
"hyper-timeout",
|
||||
"hyper-util",
|
||||
"percent-encoding",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"socket2 0.5.10",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.5.2"
|
||||
@@ -4794,11 +5187,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"indexmap 2.10.0",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4915,6 +5312,27 @@ dependencies = [
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-test"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
|
||||
dependencies = [
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
"tracing-test-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-test-macro"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"syn 2.0.104",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tree-sitter"
|
||||
version = "0.25.9"
|
||||
@@ -5258,6 +5676,16 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "web-time"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
|
||||
dependencies = [
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webbrowser"
|
||||
version = "1.0.5"
|
||||
@@ -5337,7 +5765,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
|
||||
dependencies = [
|
||||
"windows-implement",
|
||||
"windows-interface",
|
||||
"windows-link",
|
||||
"windows-link 0.1.3",
|
||||
"windows-result",
|
||||
"windows-strings",
|
||||
]
|
||||
@@ -5370,13 +5798,19 @@ version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
|
||||
|
||||
[[package]]
|
||||
name = "windows-link"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65"
|
||||
|
||||
[[package]]
|
||||
name = "windows-registry"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
"windows-link 0.1.3",
|
||||
"windows-result",
|
||||
"windows-strings",
|
||||
]
|
||||
@@ -5387,7 +5821,7 @@ version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
"windows-link 0.1.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5396,7 +5830,7 @@ version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
|
||||
dependencies = [
|
||||
"windows-link",
|
||||
"windows-link 0.1.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -17,6 +17,7 @@ members = [
|
||||
"ollama",
|
||||
"protocol",
|
||||
"protocol-ts",
|
||||
"otel",
|
||||
"tui",
|
||||
]
|
||||
resolver = "2"
|
||||
|
||||
@@ -22,6 +22,7 @@ codex-apply-patch = { path = "../apply-patch" }
|
||||
codex-file-search = { path = "../file-search" }
|
||||
codex-mcp-client = { path = "../mcp-client" }
|
||||
codex-protocol = { path = "../protocol" }
|
||||
codex-otel = { path = "../otel", features = ["otel"] }
|
||||
dirs = "6"
|
||||
env-flags = "0.1.1"
|
||||
eventsource-stream = "0.2.3"
|
||||
@@ -87,6 +88,7 @@ tempfile = "3"
|
||||
tokio-test = "0.4"
|
||||
walkdir = "2.5.0"
|
||||
wiremock = "0.6"
|
||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||
|
||||
[package.metadata.cargo-shear]
|
||||
ignored = ["openssl-sys"]
|
||||
|
||||
@@ -52,12 +52,13 @@ pub(crate) async fn apply_patch(
|
||||
&turn_context.sandbox_policy,
|
||||
&turn_context.cwd,
|
||||
) {
|
||||
SafetyCheck::AutoApprove { .. } => {
|
||||
InternalApplyPatchInvocation::DelegateToExec(ApplyPatchExec {
|
||||
action,
|
||||
user_explicitly_approved_this_action: false,
|
||||
})
|
||||
}
|
||||
SafetyCheck::AutoApprove {
|
||||
user_explicitly_approved,
|
||||
..
|
||||
} => InternalApplyPatchInvocation::DelegateToExec(ApplyPatchExec {
|
||||
action,
|
||||
user_explicitly_approved_this_action: user_explicitly_approved,
|
||||
}),
|
||||
SafetyCheck::AskUser => {
|
||||
// Compute a readable summary of path changes to include in the
|
||||
// approval request so the user can make an informed decision.
|
||||
|
||||
@@ -1,6 +1,20 @@
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
use crate::client_common::ResponseStream;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::openai_tools::create_tools_json_for_chat_completions_api;
|
||||
use crate::util::backoff;
|
||||
use bytes::Bytes;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemContent;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use eventsource_stream::Eventsource;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
@@ -15,25 +29,13 @@ use tokio::time::timeout;
|
||||
use tracing::debug;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
use crate::client_common::ResponseStream;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::openai_tools::create_tools_json_for_chat_completions_api;
|
||||
use crate::util::backoff;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemContent;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
/// Implementation for the classic Chat Completions API.
|
||||
pub(crate) async fn stream_chat_completions(
|
||||
prompt: &Prompt,
|
||||
model_family: &ModelFamily,
|
||||
client: &reqwest::Client,
|
||||
provider: &ModelProviderInfo,
|
||||
otel_event_manager: &OtelEventManager,
|
||||
) -> Result<ResponseStream> {
|
||||
// Build messages array
|
||||
let mut messages = Vec::<serde_json::Value>::new();
|
||||
@@ -288,12 +290,30 @@ pub(crate) async fn stream_chat_completions(
|
||||
|
||||
let req_builder = provider.create_request_builder(client, &None).await?;
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
let res = req_builder
|
||||
.header(reqwest::header::ACCEPT, "text/event-stream")
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let cf_ray = if let Ok(resp) = &res {
|
||||
let cf_ray = resp
|
||||
.headers()
|
||||
.get("cf-ray")
|
||||
.map(|v| v.to_str().unwrap_or_default())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
trace!("Response status: {}, cf-ray: {}", resp.status(), cf_ray);
|
||||
Some(cf_ray)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
otel_event_manager.request(cf_ray, attempt, start.elapsed(), &res);
|
||||
|
||||
match res {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
||||
@@ -302,6 +322,7 @@ pub(crate) async fn stream_chat_completions(
|
||||
stream,
|
||||
tx_event,
|
||||
provider.stream_idle_timeout(),
|
||||
otel_event_manager.clone(),
|
||||
));
|
||||
return Ok(ResponseStream { rx_event });
|
||||
}
|
||||
@@ -345,6 +366,7 @@ async fn process_chat_sse<S>(
|
||||
stream: S,
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent>>,
|
||||
idle_timeout: Duration,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) where
|
||||
S: Stream<Item = Result<Bytes>> + Unpin,
|
||||
{
|
||||
@@ -368,12 +390,13 @@ async fn process_chat_sse<S>(
|
||||
let mut reasoning_text = String::new();
|
||||
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
let sse = match timeout(idle_timeout, stream.next()).await {
|
||||
Ok(Some(Ok(ev))) => ev,
|
||||
Ok(Some(Err(e))) => {
|
||||
let _ = tx_event
|
||||
.send(Err(CodexErr::Stream(e.to_string(), None)))
|
||||
.await;
|
||||
let error = e.to_string();
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
|
||||
let _ = tx_event.send(Err(CodexErr::Stream(error, None))).await;
|
||||
return;
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -387,11 +410,10 @@ async fn process_chat_sse<S>(
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
let error = "idle timeout waiting for SSE";
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error);
|
||||
let _ = tx_event
|
||||
.send(Err(CodexErr::Stream(
|
||||
"idle timeout waiting for SSE".into(),
|
||||
None,
|
||||
)))
|
||||
.send(Err(CodexErr::Stream(error.into(), None)))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
@@ -399,6 +421,7 @@ async fn process_chat_sse<S>(
|
||||
|
||||
// OpenAI Chat streaming sends a literal string "[DONE]" when finished.
|
||||
if sse.data.trim() == "[DONE]" {
|
||||
otel_event_manager.sse_event(&sse.event, start.elapsed());
|
||||
// Emit any finalized items before closing so downstream consumers receive
|
||||
// terminal events for both assistant content and raw reasoning.
|
||||
if !assistant_text.is_empty() {
|
||||
@@ -436,9 +459,14 @@ async fn process_chat_sse<S>(
|
||||
// Parse JSON chunk
|
||||
let chunk: serde_json::Value = match serde_json::from_str(&sse.data) {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
Err(e) => {
|
||||
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
trace!("chat_completions received SSE chunk: {chunk:?}");
|
||||
otel_event_manager.sse_event(&sse.event, start.elapsed());
|
||||
|
||||
let choice_opt = chunk.get("choices").and_then(|c| c.get(0));
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::io::BufRead;
|
||||
use std::path::Path;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::AuthManager;
|
||||
use bytes::Bytes;
|
||||
@@ -43,6 +44,7 @@ use crate::openai_tools::create_tools_json_for_responses_api;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::token_data::PlanType;
|
||||
use crate::util::backoff;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -69,6 +71,7 @@ struct Error {
|
||||
pub struct ModelClient {
|
||||
config: Arc<Config>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
otel_event_manager: OtelEventManager,
|
||||
client: reqwest::Client,
|
||||
provider: ModelProviderInfo,
|
||||
conversation_id: ConversationId,
|
||||
@@ -80,6 +83,7 @@ impl ModelClient {
|
||||
pub fn new(
|
||||
config: Arc<Config>,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
otel_event_manager: OtelEventManager,
|
||||
provider: ModelProviderInfo,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
@@ -90,6 +94,7 @@ impl ModelClient {
|
||||
Self {
|
||||
config,
|
||||
auth_manager,
|
||||
otel_event_manager,
|
||||
client,
|
||||
provider,
|
||||
conversation_id,
|
||||
@@ -123,6 +128,7 @@ impl ModelClient {
|
||||
&self.config.model_family,
|
||||
&self.client,
|
||||
&self.provider,
|
||||
&self.otel_event_manager,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -159,7 +165,12 @@ impl ModelClient {
|
||||
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
|
||||
// short circuit for tests
|
||||
warn!(path, "Streaming from fixture");
|
||||
return stream_from_fixture(path, self.provider.clone()).await;
|
||||
return stream_from_fixture(
|
||||
path,
|
||||
self.provider.clone(),
|
||||
self.otel_event_manager.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let auth_manager = self.auth_manager.clone();
|
||||
@@ -258,17 +269,25 @@ impl ModelClient {
|
||||
req_builder = req_builder.header("chatgpt-account-id", account_id);
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
let res = req_builder.send().await;
|
||||
if let Ok(resp) = &res {
|
||||
trace!(
|
||||
"Response status: {}, cf-ray: {}",
|
||||
resp.status(),
|
||||
resp.headers()
|
||||
.get("cf-ray")
|
||||
.map(|v| v.to_str().unwrap_or_default())
|
||||
.unwrap_or_default()
|
||||
);
|
||||
}
|
||||
|
||||
let cf_ray = if let Ok(resp) = &res {
|
||||
let cf_ray = resp
|
||||
.headers()
|
||||
.get("cf-ray")
|
||||
.map(|v| v.to_str().unwrap_or_default())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
trace!("Response status: {}, cf-ray: {}", resp.status(), cf_ray);
|
||||
Some(cf_ray)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.otel_event_manager
|
||||
.request(cf_ray, attempt, start.elapsed(), &res);
|
||||
|
||||
match res {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
@@ -280,6 +299,7 @@ impl ModelClient {
|
||||
stream,
|
||||
tx_event,
|
||||
self.provider.stream_idle_timeout(),
|
||||
self.otel_event_manager.clone(),
|
||||
));
|
||||
|
||||
return Ok(ResponseStream { rx_event });
|
||||
@@ -366,6 +386,10 @@ impl ModelClient {
|
||||
self.provider.clone()
|
||||
}
|
||||
|
||||
pub fn get_otel_event_manager(&self) -> OtelEventManager {
|
||||
self.otel_event_manager.clone()
|
||||
}
|
||||
|
||||
/// Returns the currently configured model slug.
|
||||
pub fn get_model(&self) -> String {
|
||||
self.config.model.clone()
|
||||
@@ -389,6 +413,10 @@ impl ModelClient {
|
||||
pub fn get_auth_manager(&self) -> Option<Arc<AuthManager>> {
|
||||
self.auth_manager.clone()
|
||||
}
|
||||
|
||||
pub fn get_conversation_id(&self) -> ConversationId {
|
||||
self.conversation_id
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
@@ -477,6 +505,7 @@ async fn process_sse<S>(
|
||||
stream: S,
|
||||
tx_event: mpsc::Sender<Result<ResponseEvent>>,
|
||||
idle_timeout: Duration,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) where
|
||||
S: Stream<Item = Result<Bytes>> + Unpin,
|
||||
{
|
||||
@@ -488,11 +517,14 @@ async fn process_sse<S>(
|
||||
let mut response_error: Option<CodexErr> = None;
|
||||
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
let sse = match timeout(idle_timeout, stream.next()).await {
|
||||
Ok(Some(Ok(sse))) => sse,
|
||||
Ok(Some(Err(e))) => {
|
||||
debug!("SSE Error: {e:#}");
|
||||
let event = CodexErr::Stream(e.to_string(), None);
|
||||
let error = e.to_string();
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
|
||||
let event = CodexErr::Stream(error, None);
|
||||
let _ = tx_event.send(Err(event)).await;
|
||||
return;
|
||||
}
|
||||
@@ -502,6 +534,22 @@ async fn process_sse<S>(
|
||||
id: response_id,
|
||||
usage,
|
||||
}) => {
|
||||
if let Some(token_usage) = &usage {
|
||||
otel_event_manager.sse_event_completed(
|
||||
start.elapsed(),
|
||||
token_usage.input_tokens,
|
||||
token_usage.output_tokens,
|
||||
token_usage
|
||||
.input_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.cached_tokens),
|
||||
token_usage
|
||||
.output_tokens_details
|
||||
.as_ref()
|
||||
.map(|d| d.reasoning_tokens),
|
||||
token_usage.total_tokens,
|
||||
);
|
||||
}
|
||||
let event = ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage: usage.map(Into::into),
|
||||
@@ -509,22 +557,31 @@ async fn process_sse<S>(
|
||||
let _ = tx_event.send(Ok(event)).await;
|
||||
}
|
||||
None => {
|
||||
let _ = tx_event
|
||||
.send(Err(response_error.unwrap_or(CodexErr::Stream(
|
||||
"stream closed before response.completed".into(),
|
||||
let error = response_error.unwrap_or(CodexErr::Stream(
|
||||
"stream closed before response.completed".into(),
|
||||
None,
|
||||
));
|
||||
|
||||
if let CodexErr::Stream(message, _) = &error {
|
||||
otel_event_manager.sse_event_failed(
|
||||
None,
|
||||
))))
|
||||
.await;
|
||||
start.elapsed(),
|
||||
message.as_str(),
|
||||
);
|
||||
}
|
||||
|
||||
let _ = tx_event.send(Err(error)).await;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
Err(_) => {
|
||||
let error = "idle timeout waiting for SSE";
|
||||
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error);
|
||||
|
||||
let _ = tx_event
|
||||
.send(Err(CodexErr::Stream(
|
||||
"idle timeout waiting for SSE".into(),
|
||||
None,
|
||||
)))
|
||||
.send(Err(CodexErr::Stream(error.into(), None)))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
@@ -536,11 +593,15 @@ async fn process_sse<S>(
|
||||
let event: SseEvent = match serde_json::from_str(&sse.data) {
|
||||
Ok(event) => event,
|
||||
Err(e) => {
|
||||
debug!("Failed to parse SSE event: {e}, data: {}", &sse.data);
|
||||
let error = format!("Failed to parse SSE event: {e}, data: {}", &sse.data);
|
||||
otel_event_manager.sse_event_failed(None, start.elapsed(), error.as_str());
|
||||
debug!(error);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
otel_event_manager.sse_event(event.kind.as_str(), start.elapsed());
|
||||
|
||||
match event.kind.as_str() {
|
||||
// Individual output item finalised. Forward immediately so the
|
||||
// rest of the agent can stream assistant text/functions *live*
|
||||
@@ -561,9 +622,13 @@ async fn process_sse<S>(
|
||||
// The fix is to forward the incremental events *as they come* and
|
||||
// drop the duplicated list inside `response.completed`.
|
||||
"response.output_item.done" => {
|
||||
let Some(item_val) = event.item else { continue };
|
||||
let Some(item_val) = event.item else {
|
||||
continue
|
||||
};
|
||||
let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) else {
|
||||
debug!("failed to parse ResponseItem from output_item.done");
|
||||
let error = "failed to parse ResponseItem from output_item.done";
|
||||
debug!(error);
|
||||
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error);
|
||||
continue;
|
||||
};
|
||||
|
||||
@@ -603,8 +668,9 @@ async fn process_sse<S>(
|
||||
}
|
||||
"response.failed" => {
|
||||
if let Some(resp_val) = event.response {
|
||||
let error_message = "response.failed event received";
|
||||
response_error = Some(CodexErr::Stream(
|
||||
"response.failed event received".to_string(),
|
||||
error_message.to_string(),
|
||||
None,
|
||||
));
|
||||
|
||||
@@ -615,12 +681,17 @@ async fn process_sse<S>(
|
||||
Ok(error) => {
|
||||
let delay = try_parse_retry_after(&error);
|
||||
let message = error.message.unwrap_or_default();
|
||||
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), message.as_str());
|
||||
response_error = Some(CodexErr::Stream(message, delay));
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("failed to parse ErrorResponse: {e}");
|
||||
let error_message = format!("failed to parse ErrorResponse: {e}");
|
||||
debug!(error_message);
|
||||
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message.as_str());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -632,7 +703,9 @@ async fn process_sse<S>(
|
||||
response_completed = Some(r);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("failed to parse ResponseCompleted: {e}");
|
||||
let error_message = format!("failed to parse ResponseCompleted: {e}");
|
||||
debug!(error_message);
|
||||
otel_event_manager.sse_event_failed(Some(event.kind), start.elapsed(), error_message.as_str());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -669,8 +742,10 @@ async fn process_sse<S>(
|
||||
return;
|
||||
}
|
||||
}
|
||||
"response.reasoning_summary_text.done" => {}
|
||||
_ => {}
|
||||
"response.reasoning_summary_text.done" => {
|
||||
}
|
||||
_ => {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -679,6 +754,7 @@ async fn process_sse<S>(
|
||||
async fn stream_from_fixture(
|
||||
path: impl AsRef<Path>,
|
||||
provider: ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) -> Result<ResponseStream> {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
||||
let f = std::fs::File::open(path.as_ref())?;
|
||||
@@ -697,6 +773,7 @@ async fn stream_from_fixture(
|
||||
stream,
|
||||
tx_event,
|
||||
provider.stream_idle_timeout(),
|
||||
otel_event_manager,
|
||||
));
|
||||
Ok(ResponseStream { rx_event })
|
||||
}
|
||||
@@ -742,7 +819,7 @@ mod tests {
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_test::io::Builder as IoBuilder;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use tracing_test::traced_test;
|
||||
// ────────────────────────────
|
||||
// Helpers
|
||||
// ────────────────────────────
|
||||
@@ -752,6 +829,7 @@ mod tests {
|
||||
async fn collect_events(
|
||||
chunks: &[&[u8]],
|
||||
provider: ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) -> Vec<Result<ResponseEvent>> {
|
||||
let mut builder = IoBuilder::new();
|
||||
for chunk in chunks {
|
||||
@@ -761,7 +839,12 @@ mod tests {
|
||||
let reader = builder.build();
|
||||
let stream = ReaderStream::new(reader).map_err(CodexErr::Io);
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(16);
|
||||
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
|
||||
tokio::spawn(process_sse(
|
||||
stream,
|
||||
tx,
|
||||
provider.stream_idle_timeout(),
|
||||
otel_event_manager,
|
||||
));
|
||||
|
||||
let mut events = Vec::new();
|
||||
while let Some(ev) = rx.recv().await {
|
||||
@@ -770,11 +853,35 @@ mod tests {
|
||||
events
|
||||
}
|
||||
|
||||
async fn run_sse_allow_errors(
|
||||
events: Vec<serde_json::Value>,
|
||||
provider: ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) {
|
||||
let chunks: Vec<Vec<u8>> = events
|
||||
.into_iter()
|
||||
.map(|event| {
|
||||
let kind = event
|
||||
.get("type")
|
||||
.and_then(|v| v.as_str())
|
||||
.expect("fixture event missing type");
|
||||
if event.as_object().map(|o| o.len() == 1).unwrap_or(false) {
|
||||
format!("event: {kind}\n\n").into_bytes()
|
||||
} else {
|
||||
format!("event: {kind}\ndata: {event}\n\n").into_bytes()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let chunk_refs: Vec<&[u8]> = chunks.iter().map(Vec::as_slice).collect();
|
||||
let _ = collect_events(&chunk_refs, provider, otel_event_manager).await;
|
||||
}
|
||||
|
||||
/// Builds an in-memory SSE stream from JSON fixtures and returns only the
|
||||
/// successfully parsed events (panics on internal channel errors).
|
||||
async fn run_sse(
|
||||
events: Vec<serde_json::Value>,
|
||||
provider: ModelProviderInfo,
|
||||
otel_event_manager: OtelEventManager,
|
||||
) -> Vec<ResponseEvent> {
|
||||
let mut body = String::new();
|
||||
for e in events {
|
||||
@@ -791,7 +898,12 @@ mod tests {
|
||||
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(8);
|
||||
let stream = ReaderStream::new(std::io::Cursor::new(body)).map_err(CodexErr::Io);
|
||||
tokio::spawn(process_sse(stream, tx, provider.stream_idle_timeout()));
|
||||
tokio::spawn(process_sse(
|
||||
stream,
|
||||
tx,
|
||||
provider.stream_idle_timeout(),
|
||||
otel_event_manager,
|
||||
));
|
||||
|
||||
let mut out = Vec::new();
|
||||
while let Some(ev) = rx.recv().await {
|
||||
@@ -800,6 +912,378 @@ mod tests {
|
||||
out
|
||||
}
|
||||
|
||||
fn otel_event_manager() -> OtelEventManager {
|
||||
OtelEventManager::new(
|
||||
ConversationId::new(),
|
||||
"test-model",
|
||||
"test-slug",
|
||||
None,
|
||||
Some(AuthMode::ChatGPT),
|
||||
true,
|
||||
"test-terminal".to_string(),
|
||||
)
|
||||
}
|
||||
|
||||
fn model_provider() -> ModelProviderInfo {
|
||||
ModelProviderInfo {
|
||||
name: "test".to_string(),
|
||||
base_url: Some("https://example.com".to_string()),
|
||||
env_key: Some("TEST_KEY".to_string()),
|
||||
env_key_instructions: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(1_000),
|
||||
requires_openai_auth: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_emits_tracing_for_output_item() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let _ = run_sse(
|
||||
vec![
|
||||
json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{ "type": "output_text", "text": "hi" }]
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.completed",
|
||||
"response": { "id": "resp1" }
|
||||
}),
|
||||
],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.output_item.done")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing response.output_item.done event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_emits_failed_event_on_parse_error() {
|
||||
let chunk = "data: not-json\n\n";
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let _ = collect_events(&[chunk.as_bytes()], provider, otel_event_manager).await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("Failed to parse SSE event")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_stream_error_records_failed_event() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(1);
|
||||
|
||||
let stream = stream::iter(vec![Err::<Bytes, CodexErr>(
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "boom").into(),
|
||||
)]);
|
||||
|
||||
let handle = tokio::spawn(process_sse(
|
||||
stream,
|
||||
tx,
|
||||
provider.stream_idle_timeout(),
|
||||
otel_event_manager,
|
||||
));
|
||||
|
||||
while rx.recv().await.is_some() {}
|
||||
handle.await.expect("process_sse task");
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("boom")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_idle_timeout_records_failed_event() {
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent>>(1);
|
||||
let stream = stream::pending::<std::result::Result<Bytes, CodexErr>>();
|
||||
let idle_timeout = std::time::Duration::from_millis(1);
|
||||
|
||||
let handle = tokio::spawn(process_sse(stream, tx, idle_timeout, otel_event_manager));
|
||||
let _ = rx.recv().await;
|
||||
handle.await.expect("process_sse task");
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("idle timeout")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_records_failed_event_when_stream_closes_without_completed() {
|
||||
let item = json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "output_text", "text": "hi"}]
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
let sse = format!("event: response.output_item.done\ndata: {item}\n\n");
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let _ = collect_events(&[sse.as_bytes()], provider, otel_event_manager).await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("stream closed before response.completed")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_output_item_parse_failure_logs_event() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": "unexpected"
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.output_item.done")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("failed to parse ResponseItem")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_failed_event_records_response_error_message() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.failed",
|
||||
"response": {
|
||||
"error": {
|
||||
"message": "boom",
|
||||
"code": "bad"
|
||||
}
|
||||
}
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.failed")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("boom")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_failed_event_logs_parse_error() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.failed",
|
||||
"response": {
|
||||
"error": "not-an-object"
|
||||
}
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.failed")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("failed to parse ErrorResponse")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_failed_event_logs_missing_error() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.failed",
|
||||
"response": {}
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.failed")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("response.failed event received")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_failed_event_logs_response_completed_parse_error() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.completed",
|
||||
"response": {}
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.completed")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("failed to parse ResponseCompleted")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing codex.sse_event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn process_sse_emits_completed_telemetry() {
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
let _ = run_sse_allow_errors(
|
||||
vec![json!({
|
||||
"type": "response.completed",
|
||||
"response": {
|
||||
"id": "resp1",
|
||||
"usage": {
|
||||
"input_tokens": 3,
|
||||
"input_tokens_details": { "cached_tokens": 1 },
|
||||
"output_tokens": 5,
|
||||
"output_tokens_details": { "reasoning_tokens": 2 },
|
||||
"total_tokens": 9
|
||||
}
|
||||
}
|
||||
})],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("event.kind=response.completed")
|
||||
&& line.contains("input_token_count=3")
|
||||
&& line.contains("output_token_count=5")
|
||||
&& line.contains("cached_token_count=1")
|
||||
&& line.contains("reasoning_token_count=2")
|
||||
&& line.contains("tool_token_count=9")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("missing response.completed telemetry".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
// ────────────────────────────
|
||||
// Tests from `implement-test-for-responses-api-sse-parser`
|
||||
// ────────────────────────────
|
||||
@@ -836,24 +1320,13 @@ mod tests {
|
||||
let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n");
|
||||
let sse3 = format!("event: response.completed\ndata: {completed}\n\n");
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "test".to_string(),
|
||||
base_url: Some("https://test.com".to_string()),
|
||||
env_key: Some("TEST_API_KEY".to_string()),
|
||||
env_key_instructions: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(1000),
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let events = collect_events(
|
||||
&[sse1.as_bytes(), sse2.as_bytes(), sse3.as_bytes()],
|
||||
provider,
|
||||
otel_event_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -896,22 +1369,10 @@ mod tests {
|
||||
.to_string();
|
||||
|
||||
let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n");
|
||||
let provider = ModelProviderInfo {
|
||||
name: "test".to_string(),
|
||||
base_url: Some("https://test.com".to_string()),
|
||||
env_key: Some("TEST_API_KEY".to_string()),
|
||||
env_key_instructions: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(1000),
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes()], provider).await;
|
||||
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
|
||||
|
||||
assert_eq!(events.len(), 2);
|
||||
|
||||
@@ -930,22 +1391,10 @@ mod tests {
|
||||
let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;
|
||||
|
||||
let sse1 = format!("event: response.failed\ndata: {raw_error}\n\n");
|
||||
let provider = ModelProviderInfo {
|
||||
name: "test".to_string(),
|
||||
base_url: Some("https://test.com".to_string()),
|
||||
env_key: Some("TEST_API_KEY".to_string()),
|
||||
env_key_instructions: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(1000),
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes()], provider).await;
|
||||
let events = collect_events(&[sse1.as_bytes()], provider, otel_event_manager).await;
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
|
||||
@@ -1035,22 +1484,10 @@ mod tests {
|
||||
let mut evs = vec![case.event];
|
||||
evs.push(completed.clone());
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "test".to_string(),
|
||||
base_url: Some("https://test.com".to_string()),
|
||||
env_key: Some("TEST_API_KEY".to_string()),
|
||||
env_key_instructions: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(1000),
|
||||
requires_openai_auth: false,
|
||||
};
|
||||
let provider = model_provider();
|
||||
let otel_event_manager = otel_event_manager();
|
||||
|
||||
let out = run_sse(evs, provider).await;
|
||||
let out = run_sse(evs, provider, otel_event_manager).await;
|
||||
assert_eq!(out.len(), case.expected_len, "case {}", case.name);
|
||||
assert!(
|
||||
(case.expect_first)(&out[0]),
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -48,6 +48,8 @@ pub(crate) const PROJECT_DOC_MAX_BYTES: usize = 32 * 1024; // 32 KiB
|
||||
|
||||
pub(crate) const CONFIG_TOML_FILE: &str = "config.toml";
|
||||
|
||||
const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
|
||||
|
||||
/// Application configuration loaded from disk and merged with overrides.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Config {
|
||||
@@ -194,6 +196,9 @@ pub struct Config {
|
||||
/// All characters are inserted as they are received, and no buffering
|
||||
/// or placeholder replacement will occur for fast keypress bursts.
|
||||
pub disable_paste_burst: bool,
|
||||
|
||||
/// OTEL configuration (exporter type, endpoint, headers, etc.).
|
||||
pub otel: crate::config_types::OtelConfig,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
@@ -705,6 +710,9 @@ pub struct ConfigToml {
|
||||
/// All characters are inserted as they are received, and no buffering
|
||||
/// or placeholder replacement will occur for fast keypress bursts.
|
||||
pub disable_paste_burst: Option<bool>,
|
||||
|
||||
/// OTEL configuration.
|
||||
pub otel: Option<crate::config_types::OtelConfigToml>,
|
||||
}
|
||||
|
||||
impl From<ConfigToml> for UserSavedConfig {
|
||||
@@ -1053,6 +1061,22 @@ impl Config {
|
||||
.as_ref()
|
||||
.map(|t| t.notifications.clone())
|
||||
.unwrap_or_default(),
|
||||
otel: {
|
||||
use crate::config_types::OtelConfig;
|
||||
use crate::config_types::OtelConfigToml;
|
||||
use crate::config_types::OtelExporterKind;
|
||||
let t: OtelConfigToml = cfg.otel.unwrap_or_default();
|
||||
let log_user_prompt = t.log_user_prompt.unwrap_or(false);
|
||||
let environment = t
|
||||
.environment
|
||||
.unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string());
|
||||
let exporter = t.exporter.unwrap_or(OtelExporterKind::None);
|
||||
OtelConfig {
|
||||
log_user_prompt,
|
||||
environment,
|
||||
exporter,
|
||||
}
|
||||
},
|
||||
};
|
||||
Ok(config)
|
||||
}
|
||||
@@ -1631,6 +1655,11 @@ model_verbosity = "high"
|
||||
active_profile: Some("o3".to_string()),
|
||||
disable_paste_burst: false,
|
||||
tui_notifications: Default::default(),
|
||||
otel: crate::config_types::OtelConfig {
|
||||
log_user_prompt: false,
|
||||
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
|
||||
exporter: crate::config_types::OtelExporterKind::None,
|
||||
},
|
||||
},
|
||||
o3_profile_config
|
||||
);
|
||||
@@ -1689,6 +1718,11 @@ model_verbosity = "high"
|
||||
active_profile: Some("gpt3".to_string()),
|
||||
disable_paste_burst: false,
|
||||
tui_notifications: Default::default(),
|
||||
otel: crate::config_types::OtelConfig {
|
||||
log_user_prompt: false,
|
||||
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
|
||||
exporter: crate::config_types::OtelExporterKind::None,
|
||||
},
|
||||
};
|
||||
|
||||
assert_eq!(expected_gpt3_profile_config, gpt3_profile_config);
|
||||
@@ -1762,6 +1796,11 @@ model_verbosity = "high"
|
||||
active_profile: Some("zdr".to_string()),
|
||||
disable_paste_burst: false,
|
||||
tui_notifications: Default::default(),
|
||||
otel: crate::config_types::OtelConfig {
|
||||
log_user_prompt: false,
|
||||
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
|
||||
exporter: crate::config_types::OtelExporterKind::None,
|
||||
},
|
||||
};
|
||||
|
||||
assert_eq!(expected_zdr_profile_config, zdr_profile_config);
|
||||
@@ -1821,6 +1860,11 @@ model_verbosity = "high"
|
||||
active_profile: Some("gpt5".to_string()),
|
||||
disable_paste_burst: false,
|
||||
tui_notifications: Default::default(),
|
||||
otel: crate::config_types::OtelConfig {
|
||||
log_user_prompt: false,
|
||||
environment: DEFAULT_OTEL_ENVIRONMENT.to_string(),
|
||||
exporter: crate::config_types::OtelExporterKind::None,
|
||||
},
|
||||
};
|
||||
|
||||
assert_eq!(expected_gpt5_profile_config, gpt5_profile_config);
|
||||
|
||||
@@ -76,6 +76,54 @@ pub enum HistoryPersistence {
|
||||
None,
|
||||
}
|
||||
|
||||
// ===== OTEL configuration =====
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelHttpProtocol {
|
||||
/// Binary payload
|
||||
Binary,
|
||||
/// JSON payload
|
||||
Json,
|
||||
}
|
||||
|
||||
/// Which OTEL exporter to use.
|
||||
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum OtelExporterKind {
|
||||
None,
|
||||
OtlpHttp {
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
protocol: OtelHttpProtocol,
|
||||
},
|
||||
OtlpGrpc {
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
/// OTEL settings loaded from config.toml. Fields are optional so we can apply defaults.
|
||||
#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
|
||||
pub struct OtelConfigToml {
|
||||
/// Log user prompt in traces
|
||||
pub log_user_prompt: Option<bool>,
|
||||
|
||||
/// Mark traces with environment (dev, staging, prod, test). Defaults to dev.
|
||||
pub environment: Option<String>,
|
||||
|
||||
/// Exporter to use. Defaults to `otlp-file`.
|
||||
pub exporter: Option<OtelExporterKind>,
|
||||
}
|
||||
|
||||
/// Effective OTEL settings after defaults are applied.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct OtelConfig {
|
||||
pub log_user_prompt: bool,
|
||||
pub environment: String,
|
||||
pub exporter: OtelExporterKind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum Notifications {
|
||||
|
||||
@@ -100,3 +100,5 @@ pub use codex_protocol::models::LocalShellExecAction;
|
||||
pub use codex_protocol::models::LocalShellStatus;
|
||||
pub use codex_protocol::models::ReasoningItemContent;
|
||||
pub use codex_protocol::models::ResponseItem;
|
||||
|
||||
pub mod otel_init;
|
||||
|
||||
61
codex-rs/core/src/otel_init.rs
Normal file
61
codex-rs/core/src/otel_init.rs
Normal file
@@ -0,0 +1,61 @@
|
||||
use crate::config::Config;
|
||||
use crate::config_types::OtelExporterKind as Kind;
|
||||
use crate::config_types::OtelHttpProtocol as Protocol;
|
||||
use crate::default_client::ORIGINATOR;
|
||||
use codex_otel::config::OtelExporter;
|
||||
use codex_otel::config::OtelHttpProtocol;
|
||||
use codex_otel::config::OtelSettings;
|
||||
use codex_otel::otel_provider::OtelProvider;
|
||||
use std::error::Error;
|
||||
|
||||
/// Build an OpenTelemetry provider from the app Config.
|
||||
///
|
||||
/// Returns `None` when OTEL export is disabled.
|
||||
pub fn build_provider(
|
||||
config: &Config,
|
||||
service_version: &str,
|
||||
) -> Result<Option<OtelProvider>, Box<dyn Error>> {
|
||||
let exporter = match &config.otel.exporter {
|
||||
Kind::None => OtelExporter::None,
|
||||
Kind::OtlpHttp {
|
||||
endpoint,
|
||||
headers,
|
||||
protocol,
|
||||
} => {
|
||||
let protocol = match protocol {
|
||||
Protocol::Json => OtelHttpProtocol::Json,
|
||||
Protocol::Binary => OtelHttpProtocol::Binary,
|
||||
};
|
||||
|
||||
OtelExporter::OtlpHttp {
|
||||
endpoint: endpoint.clone(),
|
||||
headers: headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect(),
|
||||
protocol,
|
||||
}
|
||||
}
|
||||
Kind::OtlpGrpc { endpoint, headers } => OtelExporter::OtlpGrpc {
|
||||
endpoint: endpoint.clone(),
|
||||
headers: headers
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect(),
|
||||
},
|
||||
};
|
||||
|
||||
OtelProvider::from(&OtelSettings {
|
||||
service_name: ORIGINATOR.value.to_owned(),
|
||||
service_version: service_version.to_string(),
|
||||
codex_home: config.codex_home.clone(),
|
||||
environment: config.otel.environment.to_string(),
|
||||
exporter,
|
||||
})
|
||||
}
|
||||
|
||||
/// Filter predicate for exporting only Codex-owned events via OTEL.
|
||||
/// Keeps events that originated from codex_otel module
|
||||
pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool {
|
||||
meta.target().starts_with("codex_otel")
|
||||
}
|
||||
@@ -13,9 +13,14 @@ use crate::protocol::SandboxPolicy;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum SafetyCheck {
|
||||
AutoApprove { sandbox_type: SandboxType },
|
||||
AutoApprove {
|
||||
sandbox_type: SandboxType,
|
||||
user_explicitly_approved: bool,
|
||||
},
|
||||
AskUser,
|
||||
Reject { reason: String },
|
||||
Reject {
|
||||
reason: String,
|
||||
},
|
||||
}
|
||||
|
||||
pub fn assess_patch_safety(
|
||||
@@ -52,12 +57,16 @@ pub fn assess_patch_safety(
|
||||
// fall back to asking the user because the patch may touch arbitrary
|
||||
// paths outside the project.
|
||||
match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type },
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
None if sandbox_policy == &SandboxPolicy::DangerFullAccess => {
|
||||
// If the user has explicitly requested DangerFullAccess, then
|
||||
// we can auto-approve even without a sandbox.
|
||||
SafetyCheck::AutoApprove {
|
||||
sandbox_type: SandboxType::None,
|
||||
user_explicitly_approved: false,
|
||||
}
|
||||
}
|
||||
None => SafetyCheck::AskUser,
|
||||
@@ -101,6 +110,7 @@ pub fn assess_command_safety(
|
||||
if is_known_safe_command(command) || approved.contains(command) {
|
||||
return SafetyCheck::AutoApprove {
|
||||
sandbox_type: SandboxType::None,
|
||||
user_explicitly_approved: false,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -126,13 +136,17 @@ pub(crate) fn assess_safety_for_untrusted_command(
|
||||
| (Never, DangerFullAccess)
|
||||
| (OnRequest, DangerFullAccess) => SafetyCheck::AutoApprove {
|
||||
sandbox_type: SandboxType::None,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
(OnRequest, ReadOnly) | (OnRequest, WorkspaceWrite { .. }) => {
|
||||
if with_escalated_permissions {
|
||||
SafetyCheck::AskUser
|
||||
} else {
|
||||
match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type },
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
// Fall back to asking since the command is untrusted and
|
||||
// we do not have a sandbox available
|
||||
None => SafetyCheck::AskUser,
|
||||
@@ -144,7 +158,10 @@ pub(crate) fn assess_safety_for_untrusted_command(
|
||||
| (OnFailure, ReadOnly)
|
||||
| (OnFailure, WorkspaceWrite { .. }) => {
|
||||
match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type },
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
None => {
|
||||
if matches!(approval_policy, OnFailure) {
|
||||
// Since the command is not trusted, even though the
|
||||
@@ -342,7 +359,10 @@ mod tests {
|
||||
);
|
||||
|
||||
let expected = match get_platform_sandbox() {
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove { sandbox_type },
|
||||
Some(sandbox_type) => SafetyCheck::AutoApprove {
|
||||
sandbox_type,
|
||||
user_explicitly_approved: false,
|
||||
},
|
||||
None => SafetyCheck::AskUser,
|
||||
};
|
||||
assert_eq!(safety_check, expected);
|
||||
|
||||
@@ -11,6 +11,8 @@ use codex_core::ReasoningItemContent;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use futures::StreamExt;
|
||||
@@ -70,13 +72,26 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
|
||||
let summary = config.model_reasoning_summary;
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
let otel_event_manager = OtelEventManager::new(
|
||||
conversation_id,
|
||||
config.model.as_str(),
|
||||
config.model_family.slug.as_str(),
|
||||
None,
|
||||
Some(AuthMode::ChatGPT),
|
||||
false,
|
||||
"test".to_string(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
None,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
effort,
|
||||
summary,
|
||||
ConversationId::new(),
|
||||
conversation_id,
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
use codex_core::ContentItem;
|
||||
use codex_core::ModelClient;
|
||||
@@ -8,6 +9,8 @@ use codex_core::ResponseEvent;
|
||||
use codex_core::ResponseItem;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use futures::StreamExt;
|
||||
@@ -23,11 +26,15 @@ fn network_disabled() -> bool {
|
||||
}
|
||||
|
||||
async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
|
||||
run_stream_with_bytes(sse_body.as_bytes()).await
|
||||
}
|
||||
|
||||
async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
let template = ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(sse_body.to_string(), "text/event-stream");
|
||||
.set_body_bytes(sse_body.to_vec());
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/chat/completions"))
|
||||
@@ -63,13 +70,26 @@ async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
|
||||
let summary = config.model_reasoning_summary;
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
let otel_event_manager = OtelEventManager::new(
|
||||
conversation_id,
|
||||
config.model.as_str(),
|
||||
config.model_family.slug.as_str(),
|
||||
None,
|
||||
Some(AuthMode::ChatGPT),
|
||||
false,
|
||||
"test".to_string(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
None,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
effort,
|
||||
summary,
|
||||
ConversationId::new(),
|
||||
conversation_id,
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
@@ -89,7 +109,8 @@ async fn run_stream(sse_body: &str) -> Vec<ResponseEvent> {
|
||||
while let Some(event) = stream.next().await {
|
||||
match event {
|
||||
Ok(ev) => events.push(ev),
|
||||
Err(e) => panic!("stream event error: {e}"),
|
||||
// We still collect the error to exercise telemetry and complete the task.
|
||||
Err(_e) => break,
|
||||
}
|
||||
}
|
||||
events
|
||||
@@ -318,3 +339,88 @@ async fn streams_reasoning_before_tool_call() {
|
||||
|
||||
assert!(matches!(events[3], ResponseEvent::Completed { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn chat_sse_emits_failed_on_parse_error() {
|
||||
if network_disabled() {
|
||||
println!(
|
||||
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let sse_body = concat!("data: not-json\n\n", "data: [DONE]\n\n");
|
||||
|
||||
let _ = run_stream(sse_body).await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.api_request") && line.contains("http.response.status_code=200")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("cannot find codex.api_request event".to_string()))
|
||||
});
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("Failed to parse SSE event")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("cannot find SSE event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn chat_sse_done_chunk_emits_event() {
|
||||
if network_disabled() {
|
||||
println!(
|
||||
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let sse_body = concat!("data: [DONE]\n\n",);
|
||||
|
||||
let _ = run_stream(sse_body).await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| line.contains("codex.sse_event") && line.contains("event.kind=message"))
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("cannot find SSE event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn chat_sse_emits_error_on_invalid_utf8() {
|
||||
if network_disabled() {
|
||||
println!(
|
||||
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = run_stream_with_bytes(b"data: \x80\x80\n\n").await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| {
|
||||
line.contains("codex.sse_event")
|
||||
&& line.contains("error.message")
|
||||
&& line.contains("UTF8 error: invalid utf-8 sequence of 1 bytes from index 0")
|
||||
})
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or(Err("cannot find SSE event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
@@ -28,6 +30,7 @@ use serde_json::json;
|
||||
use std::io::Write;
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use tracing_test::traced_test;
|
||||
use uuid::Uuid;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
@@ -685,13 +688,26 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
let summary = config.model_reasoning_summary;
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ConversationId::new();
|
||||
|
||||
let otel_event_manager = OtelEventManager::new(
|
||||
conversation_id,
|
||||
config.model.as_str(),
|
||||
config.model_family.slug.as_str(),
|
||||
None,
|
||||
Some(AuthMode::ChatGPT),
|
||||
false,
|
||||
"test".to_string(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
None,
|
||||
otel_event_manager,
|
||||
provider,
|
||||
effort,
|
||||
summary,
|
||||
ConversationId::new(),
|
||||
conversation_id,
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
@@ -1074,3 +1090,57 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
|
||||
"request 3 tail mismatch",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn responses_api_emits_api_request_event() {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
let first = ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(sse_completed("resp1"), "text/event-stream");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses"))
|
||||
.respond_with(first)
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let model_provider = ModelProviderInfo {
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
..built_in_model_providers()["openai"].clone()
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&codex_home);
|
||||
config.model_provider = model_provider;
|
||||
config.user_instructions = Some("be nice".to_string());
|
||||
|
||||
let conversation_manager =
|
||||
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
|
||||
let codex = conversation_manager
|
||||
.new_conversation(config)
|
||||
.await
|
||||
.expect("create new conversation")
|
||||
.conversation;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
logs_assert(|lines: &[&str]| {
|
||||
lines
|
||||
.iter()
|
||||
.find(|line| line.contains("codex.api_request"))
|
||||
.map(|_| Ok(()))
|
||||
.unwrap_or_else(|| Err("expected codex.api_request event".to_string()))
|
||||
});
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ tokio = { version = "1", features = [
|
||||
] }
|
||||
tracing = { version = "0.1.41", features = ["log"] }
|
||||
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
|
||||
opentelemetry-appender-tracing = "0.30.1"
|
||||
|
||||
[dev-dependencies]
|
||||
assert_cmd = "2"
|
||||
|
||||
@@ -3,10 +3,6 @@ mod event_processor;
|
||||
mod event_processor_with_human_output;
|
||||
mod event_processor_with_json_output;
|
||||
|
||||
use std::io::IsTerminal;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub use cli::Cli;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID;
|
||||
@@ -25,10 +21,15 @@ use codex_ollama::DEFAULT_OSS_MODEL;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use event_processor_with_human_output::EventProcessorWithHumanOutput;
|
||||
use event_processor_with_json_output::EventProcessorWithJsonOutput;
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use std::io::IsTerminal;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
use crate::cli::Command as ExecCommand;
|
||||
use crate::event_processor::CodexStatus;
|
||||
@@ -105,19 +106,18 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
),
|
||||
};
|
||||
|
||||
// TODO(mbolin): Take a more thoughtful approach to logging.
|
||||
// Build fmt layer (existing logging) to compose with OTEL layer.
|
||||
let default_level = "error";
|
||||
let _ = tracing_subscriber::fmt()
|
||||
// Fallback to the `default_level` log filter if the environment
|
||||
// variable is not set _or_ contains an invalid value
|
||||
.with_env_filter(
|
||||
EnvFilter::try_from_default_env()
|
||||
.or_else(|_| EnvFilter::try_new(default_level))
|
||||
.unwrap_or_else(|_| EnvFilter::new(default_level)),
|
||||
)
|
||||
|
||||
// Build env_filter separately and attach via with_filter.
|
||||
let env_filter = EnvFilter::try_from_default_env()
|
||||
.or_else(|_| EnvFilter::try_new(default_level))
|
||||
.unwrap_or_else(|_| EnvFilter::new(default_level));
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_ansi(stderr_with_ansi)
|
||||
.with_writer(std::io::stderr)
|
||||
.try_init();
|
||||
.with_filter(env_filter);
|
||||
|
||||
let sandbox_mode = if full_auto {
|
||||
Some(SandboxMode::WorkspaceWrite)
|
||||
@@ -173,6 +173,31 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
};
|
||||
|
||||
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
|
||||
|
||||
let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION"));
|
||||
|
||||
#[allow(clippy::print_stderr)]
|
||||
let otel = match otel {
|
||||
Ok(otel) => otel,
|
||||
Err(e) => {
|
||||
eprintln!("Could not create otel exporter: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(provider) = otel {
|
||||
let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
|
||||
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
|
||||
);
|
||||
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(otel_layer)
|
||||
.try_init();
|
||||
} else {
|
||||
let _ = tracing_subscriber::registry().with(fmt_layer).try_init();
|
||||
}
|
||||
|
||||
let mut event_processor: Box<dyn EventProcessor> = if json_mode {
|
||||
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
|
||||
} else {
|
||||
|
||||
37
codex-rs/otel/Cargo.toml
Normal file
37
codex-rs/otel/Cargo.toml
Normal file
@@ -0,0 +1,37 @@
|
||||
[package]
|
||||
edition = "2024"
|
||||
name = "codex-otel"
|
||||
version = { workspace = true }
|
||||
|
||||
[lib]
|
||||
name = "codex_otel"
|
||||
path = "src/lib.rs"
|
||||
doctest = false
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
# Compile-time gate for OTLP support; disabled by default.
|
||||
# Downstream crates can enable via `features = ["otel"]`.
|
||||
default = []
|
||||
otel = [
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"opentelemetry-otlp",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
# Optional to keep build lean unless feature enabled
|
||||
codex-protocol = { path = "../protocol" }
|
||||
chrono = "0.4.42"
|
||||
tracing = { version = "0.1" }
|
||||
opentelemetry = { version = "0.30.0", features = ["logs"], optional = true }
|
||||
opentelemetry_sdk = { version = "0.30.0", features = ["logs", "rt-tokio"], optional = true }
|
||||
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "http-proto", "http-json", "reqwest", "reqwest-rustls"], optional = true }
|
||||
tonic = { version = "0.13.1", optional = true }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
strum_macros = "0.27.2"
|
||||
opentelemetry-semantic-conventions = "0.30.0"
|
||||
reqwest = "0.12.23"
|
||||
33
codex-rs/otel/src/config.rs
Normal file
33
codex-rs/otel/src/config.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct OtelSettings {
|
||||
pub environment: String,
|
||||
pub service_name: String,
|
||||
pub service_version: String,
|
||||
pub codex_home: PathBuf,
|
||||
pub exporter: OtelExporter,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OtelHttpProtocol {
|
||||
/// HTTP protocol with binary protobuf
|
||||
Binary,
|
||||
/// HTTP protocol with JSON payload
|
||||
Json,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OtelExporter {
|
||||
None,
|
||||
OtlpGrpc {
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
},
|
||||
OtlpHttp {
|
||||
endpoint: String,
|
||||
headers: HashMap<String, String>,
|
||||
protocol: OtelHttpProtocol,
|
||||
},
|
||||
}
|
||||
26
codex-rs/otel/src/lib.rs
Normal file
26
codex-rs/otel/src/lib.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
pub mod config;
|
||||
|
||||
pub mod otel_event_manager;
|
||||
#[cfg(feature = "otel")]
|
||||
pub mod otel_provider;
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
mod imp {
|
||||
use reqwest::header::HeaderMap;
|
||||
use tracing::Span;
|
||||
|
||||
pub struct OtelProvider;
|
||||
|
||||
impl OtelProvider {
|
||||
pub fn from(_settings: &crate::config::OtelSettings) -> Option<Self> {
|
||||
None
|
||||
}
|
||||
|
||||
pub fn headers(_span: &Span) -> HeaderMap {
|
||||
HeaderMap::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
pub use imp::OtelProvider;
|
||||
312
codex-rs/otel/src/otel_event_manager.rs
Normal file
312
codex-rs/otel/src/otel_event_manager.rs
Normal file
@@ -0,0 +1,312 @@
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::config_types::ReasoningEffort;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::InputItem;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use reqwest::Error;
|
||||
use reqwest::Response;
|
||||
use serde::Serialize;
|
||||
use std::time::Duration;
|
||||
use strum_macros::Display;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ToolDecisionSource {
|
||||
Config,
|
||||
User,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OtelEventMetadata {
|
||||
conversation_id: ConversationId,
|
||||
auth_mode: Option<String>,
|
||||
account_id: Option<String>,
|
||||
model: String,
|
||||
slug: String,
|
||||
log_user_prompts: bool,
|
||||
app_version: &'static str,
|
||||
terminal_type: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OtelEventManager {
|
||||
metadata: OtelEventMetadata,
|
||||
}
|
||||
|
||||
impl OtelEventManager {
|
||||
pub fn new(
|
||||
conversation_id: ConversationId,
|
||||
model: &str,
|
||||
slug: &str,
|
||||
account_id: Option<String>,
|
||||
auth_mode: Option<AuthMode>,
|
||||
log_user_prompts: bool,
|
||||
terminal_type: String,
|
||||
) -> OtelEventManager {
|
||||
Self {
|
||||
metadata: OtelEventMetadata {
|
||||
conversation_id,
|
||||
auth_mode: auth_mode.map(|m| m.to_string()),
|
||||
account_id,
|
||||
model: model.to_owned(),
|
||||
slug: slug.to_owned(),
|
||||
log_user_prompts,
|
||||
app_version: env!("CARGO_PKG_VERSION"),
|
||||
terminal_type,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_model(&self, model: &str, slug: &str) -> Self {
|
||||
let mut manager = self.clone();
|
||||
manager.metadata.model = model.to_owned();
|
||||
manager.metadata.slug = slug.to_owned();
|
||||
manager
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn conversation_starts(
|
||||
&self,
|
||||
provider_name: &str,
|
||||
reasoning_effort: Option<ReasoningEffort>,
|
||||
reasoning_summary: ReasoningSummary,
|
||||
context_window: Option<u64>,
|
||||
max_output_tokens: Option<u64>,
|
||||
auto_compact_token_limit: Option<i64>,
|
||||
approval_policy: AskForApproval,
|
||||
sandbox_policy: SandboxPolicy,
|
||||
mcp_servers: Vec<&str>,
|
||||
active_profile: Option<String>,
|
||||
) {
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.conversation_starts",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
provider_name = %provider_name,
|
||||
reasoning_effort = reasoning_effort.map(|e| e.to_string()),
|
||||
reasoning_summary = %reasoning_summary,
|
||||
context_window = context_window,
|
||||
max_output_tokens = max_output_tokens,
|
||||
auto_compact_token_limit = auto_compact_token_limit,
|
||||
approval_policy = %approval_policy,
|
||||
sandbox_policy = %sandbox_policy,
|
||||
mcp_servers = mcp_servers.join(", "),
|
||||
active_profile = active_profile,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn request(
|
||||
&self,
|
||||
cf_ray: Option<String>,
|
||||
attempt: u64,
|
||||
duration: Duration,
|
||||
response: &Result<Response, Error>,
|
||||
) {
|
||||
let (status, error) = match response {
|
||||
Ok(response) => (Some(response.status().as_u16()), None),
|
||||
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
|
||||
};
|
||||
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
cf_ray = cf_ray,
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
error.message = error,
|
||||
attempt = attempt,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn sse_event(&self, kind: &str, duration: Duration) {
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.sse_event",
|
||||
event.timestamp = %timestamp(),
|
||||
event.kind = %kind,
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn sse_event_failed(&self, kind: Option<String>, duration: Duration, error: &str) {
|
||||
match kind {
|
||||
Some(kind) => tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.sse_event",
|
||||
event.timestamp = %timestamp(),
|
||||
event.kind = %kind,
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
error.message = %error,
|
||||
),
|
||||
None => tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.sse_event",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
error.message = %error,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sse_event_completed(
|
||||
&self,
|
||||
duration: Duration,
|
||||
input_token_count: u64,
|
||||
output_token_count: u64,
|
||||
cached_token_count: Option<u64>,
|
||||
reasoning_token_count: Option<u64>,
|
||||
tool_token_count: u64,
|
||||
) {
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.sse_event",
|
||||
event.timestamp = %timestamp(),
|
||||
event.kind = %"response.completed",
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
duration_ms = %duration.as_millis(),
|
||||
input_token_count = %input_token_count,
|
||||
output_token_count = %output_token_count,
|
||||
cached_token_count = cached_token_count,
|
||||
reasoning_token_count = reasoning_token_count,
|
||||
tool_token_count = %tool_token_count,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn user_prompt(&self, items: &[InputItem]) {
|
||||
let prompt = items
|
||||
.iter()
|
||||
.flat_map(|item| match item {
|
||||
InputItem::Text { text } => Some(text.as_str()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<String>();
|
||||
|
||||
let prompt_to_log = if self.metadata.log_user_prompts {
|
||||
prompt.as_str()
|
||||
} else {
|
||||
"[REDACTED]"
|
||||
};
|
||||
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.user_prompt",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
prompt_length = %prompt.chars().count(),
|
||||
prompt = %prompt_to_log,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn tool_decision(
|
||||
&self,
|
||||
tool_name: &str,
|
||||
call_id: &str,
|
||||
decision: ReviewDecision,
|
||||
source: ToolDecisionSource,
|
||||
) {
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.tool_decision",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
tool_name = %tool_name,
|
||||
call_id = %call_id,
|
||||
decision = %decision.to_string().to_lowercase(),
|
||||
source = %source.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn tool_result(
|
||||
&self,
|
||||
tool_name: &str,
|
||||
call_id: &str,
|
||||
arguments: &str,
|
||||
duration: Duration,
|
||||
success: bool,
|
||||
output: String,
|
||||
) {
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.tool_result",
|
||||
event.timestamp = %timestamp(),
|
||||
conversation.id = %self.metadata.conversation_id,
|
||||
app.version = %self.metadata.app_version,
|
||||
auth_mode = self.metadata.auth_mode,
|
||||
user.account_id = self.metadata.account_id,
|
||||
terminal.type = %self.metadata.terminal_type,
|
||||
model = %self.metadata.model,
|
||||
slug = %self.metadata.slug,
|
||||
tool_name = %tool_name,
|
||||
call_id = %call_id,
|
||||
arguments = %arguments,
|
||||
duration_ms = %duration.as_millis(),
|
||||
success = %success_str,
|
||||
output = %output,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp() -> String {
|
||||
Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
|
||||
}
|
||||
103
codex-rs/otel/src/otel_provider.rs
Normal file
103
codex-rs/otel/src/otel_provider.rs
Normal file
@@ -0,0 +1,103 @@
|
||||
use crate::config::OtelExporter;
|
||||
use crate::config::OtelHttpProtocol;
|
||||
use crate::config::OtelSettings;
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_otlp::LogExporter;
|
||||
use opentelemetry_otlp::Protocol;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_otlp::WithHttpConfig;
|
||||
use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_sdk::Resource;
|
||||
use opentelemetry_sdk::logs::SdkLoggerProvider;
|
||||
use opentelemetry_semantic_conventions as semconv;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
use std::error::Error;
|
||||
use tonic::metadata::MetadataMap;
|
||||
use tracing::debug;
|
||||
|
||||
const ENV_ATTRIBUTE: &str = "env";
|
||||
|
||||
pub struct OtelProvider {
|
||||
pub logger: SdkLoggerProvider,
|
||||
}
|
||||
|
||||
impl OtelProvider {
|
||||
pub fn shutdown(&self) {
|
||||
let _ = self.logger.shutdown();
|
||||
}
|
||||
|
||||
pub fn from(settings: &OtelSettings) -> Result<Option<Self>, Box<dyn Error>> {
|
||||
let resource = Resource::builder()
|
||||
.with_service_name(settings.service_name.clone())
|
||||
.with_attributes(vec![
|
||||
KeyValue::new(
|
||||
semconv::attribute::SERVICE_VERSION,
|
||||
settings.service_version.clone(),
|
||||
),
|
||||
KeyValue::new(ENV_ATTRIBUTE, settings.environment.clone()),
|
||||
])
|
||||
.build();
|
||||
|
||||
let mut builder = SdkLoggerProvider::builder().with_resource(resource);
|
||||
|
||||
match &settings.exporter {
|
||||
OtelExporter::None => {
|
||||
debug!("No exporter enabled in OTLP settings.");
|
||||
return Ok(None);
|
||||
}
|
||||
OtelExporter::OtlpGrpc { endpoint, headers } => {
|
||||
debug!("Using OTLP Grpc exporter: {}", endpoint);
|
||||
|
||||
let mut header_map = HeaderMap::new();
|
||||
for (key, value) in headers {
|
||||
if let Ok(name) = HeaderName::from_bytes(key.as_bytes())
|
||||
&& let Ok(val) = HeaderValue::from_str(value)
|
||||
{
|
||||
header_map.insert(name, val);
|
||||
}
|
||||
}
|
||||
|
||||
let exporter = LogExporter::builder()
|
||||
.with_tonic()
|
||||
.with_endpoint(endpoint)
|
||||
.with_metadata(MetadataMap::from_headers(header_map))
|
||||
.build()?;
|
||||
|
||||
builder = builder.with_batch_exporter(exporter);
|
||||
}
|
||||
OtelExporter::OtlpHttp {
|
||||
endpoint,
|
||||
headers,
|
||||
protocol,
|
||||
} => {
|
||||
debug!("Using OTLP Http exporter: {}", endpoint);
|
||||
|
||||
let protocol = match protocol {
|
||||
OtelHttpProtocol::Binary => Protocol::HttpBinary,
|
||||
OtelHttpProtocol::Json => Protocol::HttpJson,
|
||||
};
|
||||
|
||||
let exporter = LogExporter::builder()
|
||||
.with_http()
|
||||
.with_endpoint(endpoint)
|
||||
.with_protocol(protocol)
|
||||
.with_headers(headers.clone())
|
||||
.build()?;
|
||||
|
||||
builder = builder.with_batch_exporter(exporter);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(Self {
|
||||
logger: builder.build(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for OtelProvider {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.logger.shutdown();
|
||||
}
|
||||
}
|
||||
@@ -81,7 +81,7 @@ impl GitSha {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, TS)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Display, TS)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum AuthMode {
|
||||
ApiKey,
|
||||
|
||||
@@ -1186,7 +1186,7 @@ pub struct SessionConfiguredEvent {
|
||||
}
|
||||
|
||||
/// User's decision in response to an ExecApprovalRequest.
|
||||
#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, TS)]
|
||||
#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Display, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ReviewDecision {
|
||||
/// User has approved this command and the agent should execute it.
|
||||
|
||||
@@ -81,6 +81,7 @@ tracing = { version = "0.1.41", features = ["log"] }
|
||||
tracing-appender = "0.2.3"
|
||||
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
|
||||
pulldown-cmark = "0.10"
|
||||
opentelemetry-appender-tracing = "0.30.1"
|
||||
unicode-segmentation = "1.12.0"
|
||||
unicode-width = "0.1"
|
||||
url = "2"
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_core::protocol::SandboxPolicy;
|
||||
use codex_ollama::DEFAULT_OSS_MODEL;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::mcp_protocol::AuthMode;
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use std::fs::OpenOptions;
|
||||
use std::path::PathBuf;
|
||||
use tracing::error;
|
||||
@@ -239,9 +240,31 @@ pub async fn run_main(
|
||||
.map_err(|e| std::io::Error::other(format!("OSS setup failed: {e}")))?;
|
||||
}
|
||||
|
||||
let _ = tracing_subscriber::registry().with(file_layer).try_init();
|
||||
let otel = codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION"));
|
||||
|
||||
run_ratatui_app(
|
||||
#[allow(clippy::print_stderr)]
|
||||
let otel = match otel {
|
||||
Ok(otel) => otel,
|
||||
Err(e) => {
|
||||
eprintln!("Could not create otel exporter: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(provider) = otel.as_ref() {
|
||||
let otel_layer = OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
|
||||
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
|
||||
);
|
||||
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(file_layer)
|
||||
.with(otel_layer)
|
||||
.try_init();
|
||||
} else {
|
||||
let _ = tracing_subscriber::registry().with(file_layer).try_init();
|
||||
};
|
||||
|
||||
let app_result = run_ratatui_app(
|
||||
cli,
|
||||
config,
|
||||
internal_storage,
|
||||
@@ -249,7 +272,13 @@ pub async fn run_main(
|
||||
should_show_trust_screen,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| std::io::Error::other(err.to_string()))
|
||||
.map_err(|err| std::io::Error::other(err.to_string()));
|
||||
|
||||
if let Some(provider) = otel {
|
||||
provider.shutdown();
|
||||
}
|
||||
|
||||
app_result
|
||||
}
|
||||
|
||||
async fn run_ratatui_app(
|
||||
|
||||
112
docs/config.md
112
docs/config.md
@@ -432,6 +432,118 @@ set = { PATH = "/usr/bin", MY_FLAG = "1" }
|
||||
|
||||
Currently, `CODEX_SANDBOX_NETWORK_DISABLED=1` is also added to the environment, assuming network is disabled. This is not configurable.
|
||||
|
||||
## otel
|
||||
|
||||
Codex can emit [OpenTelemetry](https://opentelemetry.io/) **log events** that
|
||||
describe each run: outbound API requests, streamed responses, user input,
|
||||
tool-approval decisions, and the result of every tool invocation. Export is
|
||||
**disabled by default** so local runs remain self-contained. Opt in by adding an
|
||||
`[otel]` table and choosing an exporter.
|
||||
|
||||
```toml
|
||||
[otel]
|
||||
environment = "staging" # defaults to "dev"
|
||||
exporter = "none" # defaults to "none"; set to otlp-http or otlp-grpc to send events
|
||||
log_user_prompt = false # defaults to false; redact prompt text unless explicitly enabled
|
||||
```
|
||||
|
||||
Codex tags every exported event with `service.name = $ORIGINATOR` (the same
|
||||
value sent in the `originator` header, `codex_cli_rs` by default), the CLI
|
||||
version, and an `env` attribute so downstream collectors can distinguish
|
||||
dev/staging/prod traffic. Only telemetry produced inside the `codex_otel`
|
||||
crate—the events listed below—is forwarded to the exporter.
|
||||
|
||||
### Event catalog
|
||||
|
||||
Every event shares a common set of metadata fields: `event.timestamp`,
|
||||
`conversation.id`, `app.version`, `auth_mode` (when available),
|
||||
`user.account_id` (when available), `terminal.type`, `model`, and `slug`.
|
||||
|
||||
With OTEL enabled Codex emits the following event types (in addition to the
|
||||
metadata above):
|
||||
|
||||
- `codex.conversation_starts`
|
||||
- `provider_name`
|
||||
- `reasoning_effort` (optional)
|
||||
- `reasoning_summary`
|
||||
- `context_window` (optional)
|
||||
- `max_output_tokens` (optional)
|
||||
- `auto_compact_token_limit` (optional)
|
||||
- `approval_policy`
|
||||
- `sandbox_policy`
|
||||
- `mcp_servers` (comma-separated list)
|
||||
- `active_profile` (optional)
|
||||
- `codex.api_request`
|
||||
- `cf_ray` (optional)
|
||||
- `attempt`
|
||||
- `duration_ms`
|
||||
- `http.response.status_code` (optional)
|
||||
- `error.message` (failures)
|
||||
- `codex.sse_event`
|
||||
- `event.kind`
|
||||
- `duration_ms`
|
||||
- `error.message` (failures)
|
||||
- `input_token_count` (completion only)
|
||||
- `output_token_count` (completion only)
|
||||
- `cached_token_count` (completion only, optional)
|
||||
- `reasoning_token_count` (completion only, optional)
|
||||
- `tool_token_count` (completion only)
|
||||
- `codex.user_prompt`
|
||||
- `prompt_length`
|
||||
- `prompt` (redacted unless `log_user_prompt = true`)
|
||||
- `codex.tool_decision`
|
||||
- `tool_name`
|
||||
- `call_id`
|
||||
- `decision` (`approved`, `approved_for_session`, `denied`, or `abort`)
|
||||
- `source` (`config` or `user`)
|
||||
- `codex.tool_result`
|
||||
- `tool_name`
|
||||
- `call_id`
|
||||
- `arguments`
|
||||
- `duration_ms` (execution time for the tool)
|
||||
- `success` (`"true"` or `"false"`)
|
||||
- `output`
|
||||
|
||||
These event shapes may change as we iterate.
|
||||
|
||||
### Choosing an exporter
|
||||
|
||||
Set `otel.exporter` to control where events go:
|
||||
|
||||
- `none` – leaves instrumentation active but skips exporting. This is the
|
||||
default.
|
||||
- `otlp-http` – posts OTLP log records to an OTLP/HTTP collector. Specify the
|
||||
endpoint, protocol, and headers your collector expects:
|
||||
|
||||
```toml
|
||||
[otel]
|
||||
exporter = { otlp-http = {
|
||||
endpoint = "https://otel.example.com/v1/logs",
|
||||
protocol = "binary",
|
||||
headers = { "x-otlp-api-key" = "${OTLP_TOKEN}" }
|
||||
}}
|
||||
```
|
||||
|
||||
- `otlp-grpc` – streams OTLP log records over gRPC. Provide the endpoint and any
|
||||
metadata headers:
|
||||
|
||||
```toml
|
||||
[otel]
|
||||
exporter = { otlp-grpc = {
|
||||
endpoint = "https://otel.example.com:4317",
|
||||
headers = { "x-otlp-meta" = "abc123" }
|
||||
}}
|
||||
```
|
||||
|
||||
If the exporter is `none` nothing is written anywhere; otherwise you must run or point to your
|
||||
own collector. All exporters run on a background batch worker that is flushed on
|
||||
shutdown.
|
||||
|
||||
If you build Codex from source the OTEL crate is still behind an `otel` feature
|
||||
flag; the official prebuilt binaries ship with the feature enabled. When the
|
||||
feature is disabled the telemetry hooks become no-ops so the CLI continues to
|
||||
function without the extra dependencies.
|
||||
|
||||
## notify
|
||||
|
||||
Specify a program that will be executed to get notified about events generated by Codex. Note that the program will receive the notification argument as a string of JSON, e.g.:
|
||||
|
||||
Reference in New Issue
Block a user