Compare commits

...

1 Commit

Author SHA1 Message Date
Tom Wiltzius
43c6030612 core: add optional datadog trace headers for responses calls 2026-02-01 16:27:50 -08:00
13 changed files with 261 additions and 1 deletion

View File

@@ -412,6 +412,10 @@
"description": "Value to use with `Authorization: Bearer <token>` header. Use of this config is discouraged in favor of `env_key` for security reasons, but this may be necessary when using this programmatically.",
"type": "string"
},
"force_datadog_tracing": {
"description": "Whether to include Datadog tracing headers on Responses API calls.",
"type": "boolean"
},
"http_headers": {
"additionalProperties": {
"type": "string"

View File

@@ -50,6 +50,7 @@ use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
use tracing::warn;
use uuid::Uuid;
use crate::AuthManager;
use crate::auth::CodexAuth;
@@ -72,6 +73,10 @@ use crate::transport_manager::TransportManager;
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
const DATADOG_SAMPLING_PRIORITY: i32 = 2;
#[derive(Debug)]
struct ModelClientState {
@@ -380,7 +385,11 @@ impl ModelClientSession {
store_override: None,
conversation_id: Some(conversation_id),
session_source: Some(self.state.session_source.clone()),
extra_headers: build_responses_headers(&self.state.config, Some(&self.turn_state)),
extra_headers: build_responses_headers(
&self.state.config,
&self.state.provider,
Some(&self.turn_state),
),
compression,
turn_state: Some(Arc::clone(&self.turn_state)),
}
@@ -712,6 +721,7 @@ fn experimental_feature_headers(config: &Config) -> ApiHeaderMap {
fn build_responses_headers(
config: &Config,
provider: &ModelProviderInfo,
turn_state: Option<&Arc<OnceLock<String>>>,
) -> ApiHeaderMap {
let mut headers = experimental_feature_headers(config);
@@ -725,6 +735,7 @@ fn build_responses_headers(
},
),
);
apply_responses_trace_headers(&mut headers, provider.force_datadog_tracing);
if let Some(turn_state) = turn_state
&& let Some(state) = turn_state.get()
&& let Ok(header_value) = HeaderValue::from_str(state)
@@ -734,6 +745,38 @@ fn build_responses_headers(
headers
}
/// Attach Datadog trace headers to a Responses API request when the
/// provider has opted in via `force_datadog_tracing`; otherwise the
/// header map is left untouched.
fn apply_responses_trace_headers(headers: &mut ApiHeaderMap, force_datadog_tracing: bool) {
    if force_datadog_tracing {
        apply_datadog_headers(headers);
    }
}
/// Insert Datadog distributed-tracing headers into `headers`:
/// a freshly generated non-zero trace id, a non-zero parent span id,
/// and the fixed sampling priority `DATADOG_SAMPLING_PRIORITY` (2,
/// which Datadog documents as USER_KEEP, i.e. force retention of the
/// trace).
fn apply_datadog_headers(headers: &mut ApiHeaderMap) {
    // Decimal renderings of integers are always valid header values, so
    // use the infallible `From<u64>`/`From<i32>` conversions provided by
    // `http::HeaderValue` instead of `from_str` with a silently-dropped
    // error path that can never actually fire.
    let _ = headers.insert(
        X_DATADOG_TRACE_ID_HEADER,
        HeaderValue::from(random_datadog_id()),
    );
    let _ = headers.insert(
        X_DATADOG_PARENT_ID_HEADER,
        HeaderValue::from(random_datadog_id()),
    );
    let _ = headers.insert(
        X_DATADOG_SAMPLING_PRIORITY_HEADER,
        HeaderValue::from(DATADOG_SAMPLING_PRIORITY),
    );
}
/// Produce a random, non-zero 64-bit id for use as a Datadog trace or
/// parent span id. The low 64 bits of a v4 UUID supply the randomness;
/// a value of 0 is re-rolled because the callers require non-zero ids.
fn random_datadog_id() -> u64 {
    // `as u64` truncates the u128 to its low 64 bits, identical to
    // masking with `u64::MAX` before casting.
    std::iter::repeat_with(|| Uuid::new_v4().as_u128() as u64)
        .find(|&id| id != 0)
        .expect("infinite iterator always yields a value")
}
fn map_response_stream<S>(api_stream: S, otel_manager: OtelManager) -> ResponseStream
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>

View File

@@ -3722,6 +3722,7 @@ model_verbosity = "high"
stream_idle_timeout_ms: Some(300_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let model_provider_map = {
let mut model_provider_map = built_in_model_providers();

View File

@@ -106,6 +106,10 @@ pub struct ModelProviderInfo {
/// Whether this provider supports the Responses API WebSocket transport.
#[serde(default)]
pub supports_websockets: bool,
/// Whether to include Datadog tracing headers on Responses API calls.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub force_datadog_tracing: bool,
}
impl ModelProviderInfo {
@@ -264,6 +268,7 @@ impl ModelProviderInfo {
stream_idle_timeout_ms: None,
requires_openai_auth: true,
supports_websockets: true,
force_datadog_tracing: false,
}
}
@@ -343,6 +348,7 @@ pub fn create_oss_provider_with_base_url(base_url: &str, wire_api: WireApi) -> M
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
}
}
@@ -372,6 +378,7 @@ base_url = "http://localhost:11434/v1"
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -403,6 +410,7 @@ query_params = { api-version = "2025-04-01-preview" }
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();
@@ -437,6 +445,7 @@ env_http_headers = { "X-Example-Env-Header" = "EXAMPLE_ENV_VAR" }
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let provider: ModelProviderInfo = toml::from_str(azure_provider_toml).unwrap();

View File

@@ -437,6 +437,7 @@ mod tests {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
}
}
@@ -497,6 +498,42 @@ mod tests {
);
}
#[tokio::test]
async fn refresh_available_models_never_adds_datadog_trace_headers() {
    // Datadog tracing is scoped to Responses API calls: even with
    // `force_datadog_tracing` enabled on the provider, the models-listing
    // endpoint must not receive any Datadog trace headers.
    let mock_server = MockServer::start().await;
    let recorded =
        mount_models_once(&mock_server, ModelsResponse { models: Vec::new() }).await;

    let codex_home = tempdir().expect("temp dir");
    let mut config = ConfigBuilder::default()
        .codex_home(codex_home.path().to_path_buf())
        .build()
        .await
        .expect("load default test config");
    config.features.enable(Feature::RemoteModels);

    let auth_manager =
        AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
    let mut provider = provider_for(mock_server.uri());
    provider.force_datadog_tracing = true;
    let manager =
        ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);

    manager
        .refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
        .await
        .expect("refresh succeeds");

    let requests = recorded.requests();
    assert_eq!(requests.len(), 1);
    for header in [
        "x-datadog-trace-id",
        "x-datadog-parent-id",
        "x-datadog-sampling-priority",
    ] {
        assert!(
            requests[0].headers.get(header).is_none(),
            "unexpected {header} header on models request"
        );
    }
}
#[tokio::test]
async fn refresh_available_models_uses_cache_when_fresh() {
let server = MockServer::start().await;

View File

@@ -61,6 +61,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = match TempDir::new() {

View File

@@ -60,6 +60,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = match TempDir::new() {

View File

@@ -26,6 +26,10 @@ use futures::StreamExt;
use tempfile::TempDir;
use wiremock::matchers::header;
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
#[tokio::test]
async fn responses_stream_includes_subagent_header_on_review() {
core_test_support::skip_if_no_network!();
@@ -58,6 +62,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");
@@ -156,6 +161,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");
@@ -283,6 +289,110 @@ async fn responses_stream_includes_web_search_eligible_header_false_when_disable
);
}
// End-to-end check that a provider with `force_datadog_tracing = true`
// causes the Responses API HTTP request to carry Datadog trace headers:
// non-zero u64 trace/parent ids and a sampling priority of "2".
#[tokio::test]
async fn responses_stream_includes_datadog_trace_headers_when_enabled() {
    core_test_support::skip_if_no_network!();
    // Mock server that answers one SSE stream: created + completed.
    let server = responses::start_mock_server().await;
    let response_body = responses::sse(vec![
        responses::ev_response_created("resp-1"),
        responses::ev_completed("resp-1"),
    ]);
    let request_recorder = responses::mount_sse_once(&server, response_body).await;
    // Minimal Responses-wire provider pointed at the mock server; the
    // only non-default knob under test is `force_datadog_tracing`.
    let provider = ModelProviderInfo {
        name: "mock".into(),
        base_url: Some(format!("{}/v1", server.uri())),
        env_key: None,
        env_key_instructions: None,
        experimental_bearer_token: None,
        wire_api: WireApi::Responses,
        query_params: None,
        http_headers: None,
        env_http_headers: None,
        request_max_retries: Some(0),
        stream_max_retries: Some(0),
        stream_idle_timeout_ms: Some(5_000),
        requires_openai_auth: false,
        supports_websockets: false,
        force_datadog_tracing: true,
    };
    let codex_home = TempDir::new().expect("failed to create TempDir");
    let mut config = load_default_config_for_test(&codex_home).await;
    config.model_provider_id = provider.name.clone();
    config.model_provider = provider.clone();
    let effort = config.model_reasoning_effort;
    let summary = config.model_reasoning_summary;
    let model = ModelsManager::get_model_offline(config.model.as_deref());
    config.model = Some(model.clone());
    let config = Arc::new(config);
    let conversation_id = ThreadId::new();
    let auth_mode = AuthMode::Chatgpt;
    let session_source = SessionSource::SubAgent(SubAgentSource::Review);
    let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
    let otel_manager = OtelManager::new(
        conversation_id,
        model.as_str(),
        model_info.slug.as_str(),
        None,
        Some("test@test.com".to_string()),
        Some(auth_mode),
        false,
        "test".to_string(),
        session_source.clone(),
    );
    let mut client_session = ModelClient::new(
        Arc::clone(&config),
        None,
        model_info,
        otel_manager,
        provider,
        effort,
        summary,
        conversation_id,
        session_source,
        TransportManager::new(),
    )
    .new_session();
    // Single user turn; content is irrelevant to the headers under test.
    let mut prompt = Prompt::default();
    prompt.input = vec![ResponseItem::Message {
        id: None,
        role: "user".into(),
        content: vec![ContentItem::InputText {
            text: "hello".into(),
        }],
        end_turn: None,
    }];
    // Drain the stream until the turn completes so the request is sent.
    let mut stream = client_session.stream(&prompt).await.expect("stream failed");
    while let Some(event) = stream.next().await {
        if matches!(event, Ok(ResponseEvent::Completed { .. })) {
            break;
        }
    }
    // Inspect the recorded HTTP request: trace and parent ids must be
    // present, parse as u64, and be non-zero; priority is exactly "2".
    let request = request_recorder.single_request();
    let trace_id = request
        .header(X_DATADOG_TRACE_ID_HEADER)
        .expect("trace id header");
    let parent_id = request
        .header(X_DATADOG_PARENT_ID_HEADER)
        .expect("parent id header");
    assert!(trace_id.parse::<u64>().is_ok_and(|id| id != 0));
    assert!(parent_id.parse::<u64>().is_ok_and(|id| id != 0));
    assert_eq!(
        request
            .header(X_DATADOG_SAMPLING_PRIORITY_HEADER)
            .as_deref(),
        Some("2")
    );
}
#[tokio::test]
async fn responses_respects_model_info_overrides_from_config() {
core_test_support::skip_if_no_network!();
@@ -310,6 +420,7 @@ async fn responses_respects_model_info_overrides_from_config() {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");

View File

@@ -1152,6 +1152,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let codex_home = TempDir::new().unwrap();
@@ -1674,6 +1675,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
// Init session
@@ -1755,6 +1757,7 @@ async fn env_var_overrides_loaded_auth() {
stream_idle_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
// Init session

View File

@@ -35,6 +35,9 @@ use tempfile::TempDir;
use tracing_test::traced_test;
const MODEL: &str = "gpt-5.2-codex";
const X_DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
const X_DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
const X_DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
struct WebsocketTestHarness {
_codex_home: TempDir,
@@ -136,6 +139,42 @@ async fn responses_websocket_emits_reasoning_included_event() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_includes_datadog_trace_headers_when_enabled() {
    skip_if_no_network!();
    // One turn's worth of events served over the websocket transport.
    let ws_server = start_websocket_server(vec![vec![vec![
        ev_response_created("resp-1"),
        ev_completed("resp-1"),
    ]]])
    .await;

    let mut provider = websocket_provider(&ws_server);
    provider.force_datadog_tracing = true;
    let harness = websocket_harness_with_provider(provider).await;

    let mut session = harness.client.new_session();
    let prompt = prompt_with_input(vec![message_item("hello")]);
    stream_until_complete(&mut session, &prompt).await;

    // For the websocket transport the Datadog headers ride on the
    // handshake request rather than on a per-turn HTTP request.
    let handshake = ws_server.single_handshake();
    for id_header in [X_DATADOG_TRACE_ID_HEADER, X_DATADOG_PARENT_ID_HEADER] {
        let value = handshake
            .header(id_header)
            .unwrap_or_else(|| panic!("missing {id_header} header"));
        assert!(
            value.parse::<u64>().is_ok_and(|id| id != 0),
            "{id_header} must parse as a non-zero u64"
        );
    }
    assert_eq!(
        handshake
            .header(X_DATADOG_SAMPLING_PRIORITY_HEADER)
            .as_deref(),
        Some("2")
    );
    ws_server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_appends_on_prefix() {
skip_if_no_network!();
@@ -236,11 +275,16 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: true,
force_datadog_tracing: false,
}
}
/// Build a test harness backed by the default websocket provider for
/// `server`.
async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness {
    websocket_harness_with_provider(websocket_provider(server)).await
}
async fn websocket_harness_with_provider(provider: ModelProviderInfo) -> WebsocketTestHarness {
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home).await;
config.model = Some(MODEL.to_string());

View File

@@ -34,6 +34,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
config.model_provider.force_datadog_tracing = true;
}),
)
.await?;
@@ -108,6 +109,9 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
compact_request.header("authorization").as_deref(),
Some("Bearer Access Token")
);
assert_eq!(compact_request.header("x-datadog-trace-id"), None);
assert_eq!(compact_request.header("x-datadog-parent-id"), None);
assert_eq!(compact_request.header("x-datadog-sampling-priority"), None);
let compact_body = compact_request.body_json();
assert_eq!(
compact_body.get("model").and_then(|v| v.as_str()),

View File

@@ -74,6 +74,7 @@ async fn continue_after_stream_error() {
stream_idle_timeout_ms: Some(2_000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let TestCodex { codex, .. } = test_codex()

View File

@@ -82,6 +82,7 @@ async fn retries_on_early_close() {
stream_idle_timeout_ms: Some(2000),
requires_openai_auth: false,
supports_websockets: false,
force_datadog_tracing: false,
};
let TestCodex { codex, .. } = test_codex()