Compare commits

...

2 Commits

Author SHA1 Message Date
jif-oai
17e9542872 test: avoid redundant thread id clone
Co-authored-by: Codex <noreply@openai.com>
2026-04-24 18:24:14 +02:00
jif-oai
05444a2eb6 feat: add root thread id 2026-04-24 16:18:00 +02:00
19 changed files with 288 additions and 46 deletions

View File

@@ -14,7 +14,7 @@ The public interface of this crate is intentionally small and uniform:
- **Responses endpoint**
- Input:
- `ResponsesApiRequest` for the request body (`model`, `instructions`, `input`, `tools`, `parallel_tool_calls`, reasoning/text controls).
- `ResponsesOptions` for transport/header concerns (`conversation_id`, `session_source`, `extra_headers`, `compression`, `turn_state`).
- `ResponsesOptions` for transport/header concerns (`session_id`, `thread_id`, `session_source`, `extra_headers`, `compression`, `turn_state`).
- Output: a `ResponseStream` of `ResponseEvent` (both re-exported from `common`).
- **Compaction endpoint**

View File

@@ -6,7 +6,7 @@ use crate::error::ApiError;
use crate::provider::Provider;
use crate::requests::Compression;
use crate::requests::attach_item_ids;
use crate::requests::headers::build_conversation_headers;
use crate::requests::headers::build_session_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use crate::sse::spawn_response_stream;
@@ -30,7 +30,8 @@ pub struct ResponsesClient<T: HttpTransport> {
#[derive(Default)]
pub struct ResponsesOptions {
pub conversation_id: Option<String>,
pub session_id: Option<String>,
pub thread_id: Option<String>,
pub session_source: Option<SessionSource>,
pub extra_headers: HeaderMap,
pub compression: Compression,
@@ -72,7 +73,8 @@ impl<T: HttpTransport> ResponsesClient<T> {
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
let ResponsesOptions {
conversation_id,
session_id,
thread_id,
session_source,
extra_headers,
compression,
@@ -86,10 +88,10 @@ impl<T: HttpTransport> ResponsesClient<T> {
}
let mut headers = extra_headers;
if let Some(ref conv_id) = conversation_id {
insert_header(&mut headers, "x-client-request-id", conv_id);
if let Some(ref thread_id) = thread_id {
insert_header(&mut headers, "x-client-request-id", thread_id);
}
headers.extend(build_conversation_headers(conversation_id));
headers.extend(build_session_headers(session_id, thread_id));
if let Some(subagent) = subagent_header(&session_source) {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}

View File

@@ -10,7 +10,7 @@ pub(crate) mod requests;
pub(crate) mod sse;
pub(crate) mod telemetry;
pub use crate::requests::headers::build_conversation_headers;
pub use crate::requests::headers::build_session_headers;
pub use codex_client::RequestTelemetry;
pub use codex_client::ReqwestTransport;
pub use codex_client::TransportError;

View File

@@ -2,11 +2,14 @@ use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
use http::HeaderValue;
pub fn build_conversation_headers(conversation_id: Option<String>) -> HeaderMap {
pub fn build_session_headers(session_id: Option<String>, thread_id: Option<String>) -> HeaderMap {
let mut headers = HeaderMap::new();
if let Some(id) = conversation_id {
if let Some(id) = session_id {
insert_header(&mut headers, "session_id", &id);
}
if let Some(id) = thread_id {
insert_header(&mut headers, "thread_id", &id);
}
headers
}

View File

@@ -445,7 +445,8 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
.stream_request(
request,
ResponsesOptions {
conversation_id: Some("sess_123".into()),
session_id: Some("sess_123".into()),
thread_id: Some("thread_456".into()),
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
extra_headers,
compression: Compression::None,
@@ -462,6 +463,10 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
req.headers.get("session_id").and_then(|v| v.to_str().ok()),
Some("sess_123")
);
assert_eq!(
req.headers.get("thread_id").and_then(|v| v.to_str().ok()),
Some("thread_456")
);
assert_eq!(
req.headers
.get("x-openai-subagent")

View File

@@ -58,7 +58,7 @@ use codex_api::SseTelemetry;
use codex_api::TransportError;
use codex_api::WebsocketTelemetry;
use codex_api::auth_header_telemetry;
use codex_api::build_conversation_headers;
use codex_api::build_session_headers;
use codex_api::create_text_param_for_request;
use codex_api::response_create_client_metadata;
use codex_app_server_protocol::AuthMode;
@@ -149,7 +149,8 @@ pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
/// configuration is per turn and is passed explicitly to streaming/unary methods.
#[derive(Debug)]
struct ModelClientState {
conversation_id: ThreadId,
root_thread_id: ThreadId,
thread_id: ThreadId,
window_generation: AtomicU64,
installation_id: String,
provider: SharedModelProvider,
@@ -187,7 +188,7 @@ impl RequestRouteTelemetry {
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
/// (auth, provider selection, conversation id, and transport fallback state).
/// (auth, provider selection, root/current thread ids, and transport fallback state).
///
/// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns
/// will also use HTTP for the remainder of the session.
@@ -294,7 +295,8 @@ impl ModelClient {
/// are passed to [`ModelClientSession::stream`] (and other turn-scoped methods) explicitly.
pub fn new(
auth_manager: Option<Arc<AuthManager>>,
conversation_id: ThreadId,
root_thread_id: ThreadId,
thread_id: ThreadId,
installation_id: String,
provider_info: ModelProviderInfo,
session_source: SessionSource,
@@ -312,7 +314,8 @@ impl ModelClient {
collect_auth_env_telemetry(model_provider.info(), codex_api_key_env_enabled);
Self {
state: Arc::new(ModelClientState {
conversation_id,
root_thread_id,
thread_id,
window_generation: AtomicU64::new(0),
installation_id,
provider: model_provider,
@@ -357,9 +360,9 @@ impl ModelClient {
}
fn current_window_id(&self) -> String {
let conversation_id = self.state.conversation_id;
let thread_id = self.state.thread_id;
let window_generation = self.state.window_generation.load(Ordering::Relaxed);
format!("{conversation_id}:{window_generation}")
format!("{thread_id}:{window_generation}")
}
fn take_cached_websocket_session(&self) -> WebsocketSession {
@@ -470,9 +473,10 @@ impl ModelClient {
extra_headers.insert(X_CODEX_INSTALLATION_ID_HEADER, header_value);
}
extra_headers.extend(self.build_responses_identity_headers());
extra_headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
)));
extra_headers.extend(build_session_headers(
Some(self.state.root_thread_id.to_string()),
Some(self.state.thread_id.to_string()),
));
let trace_attempt = compaction_trace.start_attempt(&payload);
let result = client
.compact_input(&payload, extra_headers)
@@ -784,16 +788,19 @@ impl ModelClient {
turn_metadata_header: Option<&str>,
) -> ApiHeaderMap {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let conversation_id = self.state.conversation_id.to_string();
let thread_id = self.state.thread_id.to_string();
let mut headers = build_responses_headers(
self.state.beta_features_header.as_deref(),
turn_state,
turn_metadata_header.as_ref(),
);
if let Ok(header_value) = HeaderValue::from_str(&conversation_id) {
if let Ok(header_value) = HeaderValue::from_str(&thread_id) {
headers.insert("x-client-request-id", header_value);
}
headers.extend(build_conversation_headers(Some(conversation_id)));
headers.extend(build_session_headers(
Some(self.state.root_thread_id.to_string()),
Some(thread_id),
));
headers.extend(self.build_responses_identity_headers());
headers.insert(
OPENAI_BETA_HEADER,
@@ -875,7 +882,7 @@ impl ModelClientSession {
&prompt.output_schema,
prompt.output_schema_strict,
);
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
let prompt_cache_key = Some(self.client.state.thread_id.to_string());
let request = ResponsesApiRequest {
model: model_info.slug.clone(),
instructions: instructions.clone(),
@@ -913,9 +920,11 @@ impl ModelClientSession {
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
let conversation_id = self.client.state.conversation_id.to_string();
let root_thread_id = self.client.state.root_thread_id.to_string();
let thread_id = self.client.state.thread_id.to_string();
ApiResponsesOptions {
conversation_id: Some(conversation_id),
session_id: Some(root_thread_id),
thread_id: Some(thread_id),
session_source: Some(self.client.state.session_source.clone()),
extra_headers: {
let mut headers = build_responses_headers(

View File

@@ -23,7 +23,8 @@ fn test_model_client(session_source: SessionSource) -> ModelClient {
let provider = create_oss_provider_with_base_url("https://example.com/v1", WireApi::Responses);
ModelClient::new(
/*auth_manager*/ None,
ThreadId::new(),
/*root_thread_id*/ ThreadId::new(),
/*thread_id*/ ThreadId::new(),
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
provider,
session_source,
@@ -105,7 +106,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
client.advance_window_generation();
let client_metadata = client.build_ws_client_metadata(Some(r#"{"turn_id":"turn-123"}"#));
let conversation_id = client.state.conversation_id;
let thread_id = client.state.thread_id;
assert_eq!(
client_metadata,
std::collections::HashMap::from([
@@ -115,7 +116,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
),
(
X_CODEX_WINDOW_ID_HEADER.to_string(),
format!("{conversation_id}:1"),
format!("{thread_id}:1"),
),
(
X_OPENAI_SUBAGENT_HEADER.to_string(),

View File

@@ -76,6 +76,7 @@ pub(crate) async fn run_codex_thread_interactive(
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let CodexSpawnOk { codex, .. } = Box::pin(Codex::spawn(CodexSpawnArgs {
config,
root_thread_id: Some(parent_session.root_thread_id),
auth_manager,
models_manager,
environment_manager: Arc::clone(&parent_session.services.environment_manager),

View File

@@ -389,6 +389,7 @@ pub struct CodexSpawnOk {
pub(crate) struct CodexSpawnArgs {
pub(crate) config: Config,
pub(crate) root_thread_id: Option<ThreadId>,
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) models_manager: SharedModelsManager,
pub(crate) environment_manager: Arc<EnvironmentManager>,
@@ -450,6 +451,7 @@ impl Codex {
async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
let CodexSpawnArgs {
mut config,
root_thread_id,
auth_manager,
models_manager,
environment_manager,
@@ -659,6 +661,7 @@ impl Codex {
let session = Session::new(
session_configuration,
root_thread_id,
config.clone(),
auth_manager.clone(),
models_manager.clone(),

View File

@@ -90,7 +90,7 @@ pub(super) async fn spawn_review_thread(
let per_turn_config = Arc::new(per_turn_config);
let review_turn_id = sub_id.to_string();
let turn_metadata_state = Arc::new(TurnMetadataState::new(
sess.conversation_id.to_string(),
sess.root_thread_id.to_string(),
&session_source,
review_turn_id.clone(),
parent_turn_context.cwd.clone(),

View File

@@ -6,6 +6,7 @@ use tokio::sync::Semaphore;
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
pub(crate) struct Session {
pub(crate) root_thread_id: ThreadId,
pub(crate) conversation_id: ThreadId,
pub(super) tx_event: Sender<Event>,
pub(super) agent_status: watch::Sender<AgentStatus>,
@@ -251,6 +252,7 @@ impl Session {
)]
pub(crate) async fn new(
mut session_configuration: SessionConfiguration,
root_thread_id: Option<ThreadId>,
config: Arc<Config>,
auth_manager: Arc<AuthManager>,
models_manager: SharedModelsManager,
@@ -287,6 +289,7 @@ impl Session {
}
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
let root_thread_id = root_thread_id.unwrap_or(conversation_id);
let window_generation = match &initial_history {
InitialHistory::Resumed(resumed_history) => u64::try_from(
resumed_history
@@ -757,7 +760,8 @@ impl Session {
thread_store: Arc::clone(&thread_store),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
conversation_id,
/*root_thread_id*/ root_thread_id,
/*thread_id*/ conversation_id,
installation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
@@ -783,6 +787,7 @@ impl Session {
let (mailbox, mailbox_rx) = Mailbox::new();
let sess = Arc::new(Session {
root_thread_id,
conversation_id,
tx_event: tx_event.clone(),
agent_status,

View File

@@ -331,6 +331,10 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
fn test_model_client_session() -> crate::client::ModelClientSession {
crate::client::ModelClient::new(
/*auth_manager*/ None,
/*root_thread_id*/
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid"),
/*thread_id*/
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid"),
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
@@ -3147,6 +3151,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
));
let result = Session::new(
session_configuration,
/*root_thread_id*/ None,
Arc::clone(&config),
auth_manager,
models_manager,
@@ -3315,7 +3320,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
)),
model_client: ModelClient::new(
Some(auth_manager.clone()),
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
@@ -3372,6 +3378,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
let session = Session {
root_thread_id: conversation_id,
conversation_id,
tx_event,
agent_status: agent_status_tx,
@@ -3474,6 +3481,7 @@ async fn make_session_with_config_and_rx(
let session = Session::new(
session_configuration,
/*root_thread_id*/ None,
Arc::clone(&config),
auth_manager,
models_manager,
@@ -4678,7 +4686,8 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
)),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
@@ -4735,6 +4744,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
let session = Arc::new(Session {
root_thread_id: conversation_id,
conversation_id,
tx_event,
agent_status: agent_status_tx,

View File

@@ -755,6 +755,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
config,
root_thread_id: None,
auth_manager,
models_manager,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),

View File

@@ -388,7 +388,7 @@ impl Session {
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_turn_context(
conversation_id: ThreadId,
root_thread_id: ThreadId,
auth_manager: Option<Arc<AuthManager>>,
session_telemetry: &SessionTelemetry,
provider: ModelProviderInfo,
@@ -448,7 +448,7 @@ impl Session {
let per_turn_config = Arc::new(per_turn_config);
let turn_metadata_state = Arc::new(TurnMetadataState::new(
conversation_id.to_string(),
root_thread_id.to_string(),
&session_source,
sub_id.clone(),
cwd.clone(),
@@ -655,7 +655,7 @@ impl Session {
.await,
);
let mut turn_context: TurnContext = Self::make_turn_context(
self.conversation_id,
self.root_thread_id,
Some(Arc::clone(&self.services.auth_manager)),
&self.services.session_telemetry,
session_configuration.provider.clone(),

View File

@@ -1037,10 +1037,12 @@ impl ThreadManagerState {
let parent_rollout_thread_trace = self
.parent_rollout_thread_trace_for_source(&session_source, &initial_history)
.await;
let root_thread_id = self.root_thread_id_for_source(&session_source).await;
let CodexSpawnOk {
codex, thread_id, ..
} = Codex::spawn(CodexSpawnArgs {
config,
root_thread_id,
auth_manager,
models_manager: Arc::clone(&self.models_manager),
environment_manager: Arc::clone(&self.environment_manager),
@@ -1068,6 +1070,23 @@ impl ThreadManagerState {
.await
}
async fn root_thread_id_for_source(&self, session_source: &SessionSource) -> Option<ThreadId> {
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
..
}) = session_source
else {
return None;
};
if let Ok(parent_thread) = self.get_thread(*parent_thread_id).await {
return Some(parent_thread.codex.session.root_thread_id);
}
(*depth == 1).then_some(*parent_thread_id)
}
async fn finalize_thread_spawn(
&self,
codex: Codex,

View File

@@ -254,8 +254,10 @@ async fn shutdown_all_threads_bounded_submits_shutdown_to_every_thread() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.sqlite_home = temp_dir.path().join("sqlite-home").abs().to_path_buf();
config.cwd = config.codex_home.abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
std::fs::create_dir_all(&config.sqlite_home).expect("create sqlite home");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),

View File

@@ -100,7 +100,8 @@ async fn responses_stream_includes_subagent_header_on_review() {
let client = ModelClient::new(
/*auth_manager*/ None,
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
provider.clone(),
session_source,
@@ -142,6 +143,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
}
let request = request_recorder.single_request();
let expected_session_id = conversation_id.to_string();
let expected_window_id = format!("{conversation_id}:0");
assert_eq!(
request.header("x-openai-subagent").as_deref(),
@@ -151,6 +153,14 @@ async fn responses_stream_includes_subagent_header_on_review() {
request.header("x-codex-window-id").as_deref(),
Some(expected_window_id.as_str())
);
assert_eq!(
request.header("session_id").as_deref(),
Some(expected_session_id.as_str())
);
assert_eq!(
request.header("thread_id").as_deref(),
Some(expected_session_id.as_str())
);
assert_eq!(request.header("x-codex-parent-thread-id"), None);
assert_eq!(
request.body_json()["client_metadata"]["x-codex-installation-id"].as_str(),
@@ -159,6 +169,151 @@ async fn responses_stream_includes_subagent_header_on_review() {
assert_eq!(request.header("x-codex-sandbox"), None);
}
#[tokio::test]
async fn responses_stream_separates_session_id_from_thread_id_for_thread_spawn() {
core_test_support::skip_if_no_network!();
let server = responses::start_mock_server().await;
let response_body = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_completed("resp-1"),
]);
let request_recorder = responses::mount_sse_once(&server, response_body).await;
let provider = ModelProviderInfo {
name: "mock".into(),
base_url: Some(format!("{}/v1", server.uri())),
env_key: None,
env_key_instructions: None,
experimental_bearer_token: None,
auth: None,
aws: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");
let mut config = load_default_config_for_test(&codex_home).await;
config.model_provider_id = provider.name.clone();
config.model_provider = provider.clone();
let effort = config.model_reasoning_effort;
let summary = config.model_reasoning_summary;
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
config.model = Some(model.clone());
let config = Arc::new(config);
let root_thread_id = ThreadId::new();
let parent_thread_id = ThreadId::new();
let thread_id = ThreadId::new();
let auth_mode = TelemetryAuthMode::Chatgpt;
let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: None,
});
let model_info =
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
let session_telemetry = SessionTelemetry::new(
thread_id,
model.as_str(),
model_info.slug.as_str(),
/*account_id*/ None,
Some("test@test.com".to_string()),
Some(auth_mode),
"test_originator".to_string(),
/*log_user_prompts*/ false,
"test".to_string(),
session_source.clone(),
);
let client = ModelClient::new(
/*auth_manager*/ None,
root_thread_id,
thread_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
provider.clone(),
session_source,
config.model_verbosity,
/*enable_request_compression*/ false,
/*include_timing_metrics*/ false,
/*beta_features_header*/ None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
id: None,
role: "user".into(),
content: vec![ContentItem::InputText {
text: "hello".into(),
}],
end_turn: None,
phase: None,
}];
let mut stream = client_session
.stream(
&prompt,
&model_info,
&session_telemetry,
effort,
summary.unwrap_or(model_info.default_reasoning_summary),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
}
}
let request = request_recorder.single_request();
let expected_session_id = root_thread_id.to_string();
let expected_thread_id = thread_id.to_string();
let expected_parent_thread_id = parent_thread_id.to_string();
let expected_window_id = format!("{thread_id}:0");
assert_eq!(
request.header("x-openai-subagent").as_deref(),
Some("collab_spawn")
);
assert_eq!(
request.header("session_id").as_deref(),
Some(expected_session_id.as_str())
);
assert_eq!(
request.header("thread_id").as_deref(),
Some(expected_thread_id.as_str())
);
assert_eq!(
request.header("x-client-request-id").as_deref(),
Some(expected_thread_id.as_str())
);
assert_eq!(
request.header("x-codex-parent-thread-id").as_deref(),
Some(expected_parent_thread_id.as_str())
);
assert_eq!(
request.header("x-codex-window-id").as_deref(),
Some(expected_window_id.as_str())
);
}
#[tokio::test]
async fn responses_stream_includes_subagent_header_on_other() {
core_test_support::skip_if_no_network!();
@@ -227,7 +382,8 @@ async fn responses_stream_includes_subagent_header_on_other() {
let client = ModelClient::new(
/*auth_manager*/ None,
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
provider.clone(),
session_source,
@@ -269,10 +425,19 @@ async fn responses_stream_includes_subagent_header_on_other() {
}
let request = request_recorder.single_request();
let expected_session_id = conversation_id.to_string();
assert_eq!(
request.header("x-openai-subagent").as_deref(),
Some("my-task")
);
assert_eq!(
request.header("session_id").as_deref(),
Some(expected_session_id.as_str())
);
assert_eq!(
request.header("thread_id").as_deref(),
Some(expected_session_id.as_str())
);
}
#[tokio::test]
@@ -343,7 +508,8 @@ async fn responses_respects_model_info_overrides_from_config() {
let client = ModelClient::new(
/*auth_manager*/ None,
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
provider.clone(),
session_source,

View File

@@ -726,7 +726,7 @@ async fn resume_replays_image_tool_outputs_with_detail() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_conversation_id_and_model_headers_in_request() {
async fn includes_session_and_thread_headers_in_request() {
skip_if_no_network!();
// Mock server
@@ -764,6 +764,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
let request = resp_mock.single_request();
assert_eq!(request.path(), "/v1/responses");
let request_session_id = request.header("session_id").expect("session_id header");
let request_thread_id = request.header("thread_id").expect("thread_id header");
let request_authorization = request
.header("authorization")
.expect("authorization header");
@@ -774,6 +775,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
.expect("read installation id");
assert_eq!(request_session_id, session_id.to_string());
assert_eq!(request_thread_id, session_id.to_string());
assert_eq!(request_originator, originator().value);
assert_eq!(request_authorization, "Bearer Test API Key");
assert_eq!(
@@ -884,7 +886,8 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth
Some(AuthManager::from_auth_for_testing(CodexAuth::from_api_key(
"unused-api-key",
))),
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
provider,
SessionSource::Exec,
@@ -1001,7 +1004,6 @@ async fn chatgpt_auth_sends_correct_request() {
.await
.expect("create new conversation");
let codex = test.codex.clone();
let thread_id = test.session_configured.session_id;
codex
.submit(Op::UserInput {
@@ -1030,10 +1032,13 @@ async fn chatgpt_auth_sends_correct_request() {
let request_body = request.body_json();
let session_id = request.header("session_id").expect("session_id header");
let thread_id = request.header("thread_id").expect("thread_id header");
let installation_id =
std::fs::read_to_string(test.codex_home_path().join(INSTALLATION_ID_FILENAME))
.expect("read installation id");
assert_eq!(session_id, thread_id.to_string());
let expected_id = test.session_configured.session_id.to_string();
assert_eq!(session_id, expected_id);
assert_eq!(thread_id, expected_id);
assert_eq!(request_originator, originator().value);
assert_eq!(request_authorization, "Bearer Access Token");
@@ -2199,7 +2204,8 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
let client = ModelClient::new(
/*auth_manager*/ None,
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
provider.clone(),
SessionSource::Exec,

View File

@@ -126,6 +126,14 @@ async fn responses_websocket_streams_request() {
handshake.header(X_CLIENT_REQUEST_ID_HEADER),
Some(harness.conversation_id.to_string())
);
assert_eq!(
handshake.header("session_id"),
Some(harness.conversation_id.to_string())
);
assert_eq!(
handshake.header("thread_id"),
Some(harness.conversation_id.to_string())
);
assert_eq!(
body["client_metadata"]["x-codex-installation-id"].as_str(),
Some(TEST_INSTALLATION_ID)
@@ -1825,7 +1833,8 @@ async fn websocket_harness_with_provider_options(
let summary = ReasoningSummary::Auto;
let client = ModelClient::new(
/*auth_manager*/ None,
conversation_id,
/*root_thread_id*/ conversation_id,
/*thread_id*/ conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
provider.clone(),
SessionSource::Exec,