mirror of
https://github.com/openai/codex.git
synced 2026-03-05 14:13:21 +00:00
Compare commits
1 Commits
fix/notify
...
codex/data
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
43c6030612 |
@@ -412,6 +412,10 @@
|
||||
"description": "Value to use with `Authorization: Bearer <token>` header. Use of this config is discouraged in favor of `env_key` for security reasons, but this may be necessary when using this programmatically.",
|
||||
"type": "string"
|
||||
},
|
||||
"force_datadog_tracing": {
|
||||
"description": "Whether to include Datadog tracing headers on Responses API calls.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"http_headers": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
|
||||
@@ -50,6 +50,7 @@ use tokio::sync::mpsc;
|
||||
use tokio_tungstenite::tungstenite::Error;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::auth::CodexAuth;
|
||||
@@ -72,6 +73,10 @@ use crate::transport_manager::TransportManager;
|
||||
|
||||
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
|
||||
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
|
||||
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
|
||||
const DATADOG_SAMPLING_PRIORITY: i32 = 2;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ModelClientState {
|
||||
@@ -380,7 +385,11 @@ impl ModelClientSession {
|
||||
store_override: None,
|
||||
conversation_id: Some(conversation_id),
|
||||
session_source: Some(self.state.session_source.clone()),
|
||||
extra_headers: build_responses_headers(&self.state.config, Some(&self.turn_state)),
|
||||
extra_headers: build_responses_headers(
|
||||
&self.state.config,
|
||||
&self.state.provider,
|
||||
Some(&self.turn_state),
|
||||
),
|
||||
compression,
|
||||
turn_state: Some(Arc::clone(&self.turn_state)),
|
||||
}
|
||||
@@ -712,6 +721,7 @@ fn experimental_feature_headers(config: &Config) -> ApiHeaderMap {
|
||||
|
||||
fn build_responses_headers(
|
||||
config: &Config,
|
||||
provider: &ModelProviderInfo,
|
||||
turn_state: Option<&Arc<OnceLock<String>>>,
|
||||
) -> ApiHeaderMap {
|
||||
let mut headers = experimental_feature_headers(config);
|
||||
@@ -725,6 +735,7 @@ fn build_responses_headers(
|
||||
},
|
||||
),
|
||||
);
|
||||
apply_responses_trace_headers(&mut headers, provider.force_datadog_tracing);
|
||||
if let Some(turn_state) = turn_state
|
||||
&& let Some(state) = turn_state.get()
|
||||
&& let Ok(header_value) = HeaderValue::from_str(state)
|
||||
@@ -734,6 +745,38 @@ fn build_responses_headers(
|
||||
headers
|
||||
}
|
||||
|
||||
fn apply_responses_trace_headers(headers: &mut ApiHeaderMap, force_datadog_tracing: bool) {
|
||||
if !force_datadog_tracing {
|
||||
return;
|
||||
}
|
||||
|
||||
apply_datadog_headers(headers);
|
||||
}
|
||||
|
||||
fn apply_datadog_headers(headers: &mut ApiHeaderMap) {
|
||||
let trace_id = random_datadog_id();
|
||||
let parent_id = random_datadog_id();
|
||||
if let Ok(trace_header) = HeaderValue::from_str(&trace_id.to_string()) {
|
||||
let _ = headers.insert(X_DATADOG_TRACE_ID_HEADER, trace_header);
|
||||
}
|
||||
if let Ok(parent_header) = HeaderValue::from_str(&parent_id.to_string()) {
|
||||
let _ = headers.insert(X_DATADOG_PARENT_ID_HEADER, parent_header);
|
||||
}
|
||||
let priority_value = DATADOG_SAMPLING_PRIORITY.to_string();
|
||||
if let Ok(priority_header) = HeaderValue::from_str(&priority_value) {
|
||||
let _ = headers.insert(X_DATADOG_SAMPLING_PRIORITY_HEADER, priority_header);
|
||||
}
|
||||
}
|
||||
|
||||
fn random_datadog_id() -> u64 {
|
||||
loop {
|
||||
let value = (Uuid::new_v4().as_u128() & u128::from(u64::MAX)) as u64;
|
||||
if value != 0 {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn map_response_stream<S>(api_stream: S, otel_manager: OtelManager) -> ResponseStream
|
||||
where
|
||||
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
|
||||
|
||||
@@ -3722,6 +3722,7 @@ model_verbosity = "high"
|
||||
stream_idle_timeout_ms: Some(300_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
let model_provider_map = {
|
||||
let mut model_provider_map = built_in_model_providers();
|
||||
|
||||
@@ -106,6 +106,10 @@ pub struct ModelProviderInfo {
|
||||
/// Whether this provider supports the Responses API WebSocket transport.
|
||||
#[serde(default)]
|
||||
pub supports_websockets: bool,
|
||||
|
||||
/// Whether to include Datadog tracing headers on Responses API calls.
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub force_datadog_tracing: bool,
|
||||
}
|
||||
|
||||
impl ModelProviderInfo {
|
||||
@@ -264,6 +268,7 @@ impl ModelProviderInfo {
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: true,
|
||||
supports_websockets: true,
|
||||
force_datadog_tracing: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,6 +348,7 @@ pub fn create_oss_provider_with_base_url(base_url: &str, wire_api: WireApi) -> M
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -372,6 +378,7 @@ base_url = "http://localhost:11434/v1"
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
@@ -403,6 +410,7 @@ query_params = { api-version = "2025-04-01-preview" }
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
@@ -437,6 +445,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
|
||||
|
||||
@@ -437,6 +437,7 @@ mod tests {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -497,6 +498,42 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_available_models_never_adds_datadog_trace_headers() {
|
||||
let server = MockServer::start().await;
|
||||
let models_mock = mount_models_once(&server, ModelsResponse { models: Vec::new() }).await;
|
||||
|
||||
let codex_home = tempdir().expect("temp dir");
|
||||
let mut config = ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await
|
||||
.expect("load default test config");
|
||||
config.features.enable(Feature::RemoteModels);
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let mut provider = provider_for(server.uri());
|
||||
provider.force_datadog_tracing = true;
|
||||
let manager =
|
||||
ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);
|
||||
|
||||
manager
|
||||
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
|
||||
.await
|
||||
.expect("refresh succeeds");
|
||||
|
||||
let requests = models_mock.requests();
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert!(requests[0].headers.get("x-datadog-trace-id").is_none());
|
||||
assert!(requests[0].headers.get("x-datadog-parent-id").is_none());
|
||||
assert!(
|
||||
requests[0]
|
||||
.headers
|
||||
.get("x-datadog-sampling-priority")
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_available_models_uses_cache_when_fresh() {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
@@ -61,6 +61,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = match TempDir::new() {
|
||||
|
||||
@@ -60,6 +60,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = match TempDir::new() {
|
||||
|
||||
@@ -26,6 +26,10 @@ use futures::StreamExt;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::matchers::header;
|
||||
|
||||
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
|
||||
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
|
||||
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_includes_subagent_header_on_review() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
@@ -58,6 +62,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
@@ -156,6 +161,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
@@ -283,6 +289,110 @@ async fn responses_stream_includes_web_search_eligible_header_false_when_disable
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_includes_datadog_trace_headers_when_enabled() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
|
||||
let request_recorder = responses::mount_sse_once(&server, response_body).await;
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "mock".into(),
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: true,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model_provider_id = provider.name.clone();
|
||||
config.model_provider = provider.clone();
|
||||
let effort = config.model_reasoning_effort;
|
||||
let summary = config.model_reasoning_summary;
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
config.model = Some(model.clone());
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ThreadId::new();
|
||||
let auth_mode = AuthMode::Chatgpt;
|
||||
let session_source = SessionSource::SubAgent(SubAgentSource::Review);
|
||||
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
|
||||
let otel_manager = OtelManager::new(
|
||||
conversation_id,
|
||||
model.as_str(),
|
||||
model_info.slug.as_str(),
|
||||
None,
|
||||
Some("test@test.com".to_string()),
|
||||
Some(auth_mode),
|
||||
false,
|
||||
"test".to_string(),
|
||||
session_source.clone(),
|
||||
);
|
||||
|
||||
let mut client_session = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
None,
|
||||
model_info,
|
||||
otel_manager,
|
||||
provider,
|
||||
effort,
|
||||
summary,
|
||||
conversation_id,
|
||||
session_source,
|
||||
TransportManager::new(),
|
||||
)
|
||||
.new_session();
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
prompt.input = vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".into(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
end_turn: None,
|
||||
}];
|
||||
|
||||
let mut stream = client_session.stream(&prompt).await.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
let trace_id = request
|
||||
.header(X_DATADOG_TRACE_ID_HEADER)
|
||||
.expect("trace id header");
|
||||
let parent_id = request
|
||||
.header(X_DATADOG_PARENT_ID_HEADER)
|
||||
.expect("parent id header");
|
||||
assert!(trace_id.parse::<u64>().is_ok_and(|id| id != 0));
|
||||
assert!(parent_id.parse::<u64>().is_ok_and(|id| id != 0));
|
||||
assert_eq!(
|
||||
request
|
||||
.header(X_DATADOG_SAMPLING_PRIORITY_HEADER)
|
||||
.as_deref(),
|
||||
Some("2")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_respects_model_info_overrides_from_config() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
@@ -310,6 +420,7 @@ async fn responses_respects_model_info_overrides_from_config() {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
|
||||
@@ -1152,6 +1152,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
@@ -1674,6 +1675,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
// Init session
|
||||
@@ -1755,6 +1757,7 @@ async fn env_var_overrides_loaded_auth() {
|
||||
stream_idle_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
// Init session
|
||||
|
||||
@@ -35,6 +35,9 @@ use tempfile::TempDir;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
const MODEL: &str = "gpt-5.2-codex";
|
||||
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
|
||||
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
|
||||
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
|
||||
|
||||
struct WebsocketTestHarness {
|
||||
_codex_home: TempDir,
|
||||
@@ -136,6 +139,42 @@ async fn responses_websocket_emits_reasoning_included_event() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_includes_datadog_trace_headers_when_enabled() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let mut provider = websocket_provider(&server);
|
||||
provider.force_datadog_tracing = true;
|
||||
let harness = websocket_harness_with_provider(provider).await;
|
||||
let mut session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
stream_until_complete(&mut session, &prompt).await;
|
||||
|
||||
let handshake = server.single_handshake();
|
||||
let trace_id = handshake
|
||||
.header(X_DATADOG_TRACE_ID_HEADER)
|
||||
.expect("trace id header");
|
||||
let parent_id = handshake
|
||||
.header(X_DATADOG_PARENT_ID_HEADER)
|
||||
.expect("parent id header");
|
||||
assert!(trace_id.parse::<u64>().is_ok_and(|id| id != 0));
|
||||
assert!(parent_id.parse::<u64>().is_ok_and(|id| id != 0));
|
||||
assert_eq!(
|
||||
handshake
|
||||
.header(X_DATADOG_SAMPLING_PRIORITY_HEADER)
|
||||
.as_deref(),
|
||||
Some("2")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_appends_on_prefix() {
|
||||
skip_if_no_network!();
|
||||
@@ -236,11 +275,16 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: true,
|
||||
force_datadog_tracing: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness {
|
||||
let provider = websocket_provider(server);
|
||||
websocket_harness_with_provider(provider).await
|
||||
}
|
||||
|
||||
async fn websocket_harness_with_provider(provider: ModelProviderInfo) -> WebsocketTestHarness {
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model = Some(MODEL.to_string());
|
||||
|
||||
@@ -34,6 +34,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
config.features.enable(Feature::RemoteCompaction);
|
||||
config.model_provider.force_datadog_tracing = true;
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
@@ -108,6 +109,9 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
|
||||
compact_request.header("authorization").as_deref(),
|
||||
Some("Bearer Access Token")
|
||||
);
|
||||
assert_eq!(compact_request.header("x-datadog-trace-id"), None);
|
||||
assert_eq!(compact_request.header("x-datadog-parent-id"), None);
|
||||
assert_eq!(compact_request.header("x-datadog-sampling-priority"), None);
|
||||
let compact_body = compact_request.body_json();
|
||||
assert_eq!(
|
||||
compact_body.get("model").and_then(|v| v.as_str()),
|
||||
|
||||
@@ -74,6 +74,7 @@ async fn continue_after_stream_error() {
|
||||
stream_idle_timeout_ms: Some(2_000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let TestCodex { codex, .. } = test_codex()
|
||||
|
||||
@@ -82,6 +82,7 @@ async fn retries_on_early_close() {
|
||||
stream_idle_timeout_ms: Some(2000),
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
force_datadog_tracing: false,
|
||||
};
|
||||
|
||||
let TestCodex { codex, .. } = test_codex()
|
||||
|
||||
Reference in New Issue
Block a user