mirror of
https://github.com/openai/codex.git
synced 2026-05-06 12:26:38 +00:00
Compare commits
2 Commits
pakrym/res
...
jif/sessio
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
17e9542872 | ||
|
|
05444a2eb6 |
@@ -14,7 +14,7 @@ The public interface of this crate is intentionally small and uniform:
|
||||
- **Responses endpoint**
|
||||
- Input:
|
||||
- `ResponsesApiRequest` for the request body (`model`, `instructions`, `input`, `tools`, `parallel_tool_calls`, reasoning/text controls).
|
||||
- `ResponsesOptions` for transport/header concerns (`conversation_id`, `session_source`, `extra_headers`, `compression`, `turn_state`).
|
||||
- `ResponsesOptions` for transport/header concerns (`session_id`, `thread_id`, `session_source`, `extra_headers`, `compression`, `turn_state`).
|
||||
- Output: a `ResponseStream` of `ResponseEvent` (both re-exported from `common`).
|
||||
|
||||
- **Compaction endpoint**
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use crate::requests::Compression;
|
||||
use crate::requests::attach_item_ids;
|
||||
use crate::requests::headers::build_conversation_headers;
|
||||
use crate::requests::headers::build_session_headers;
|
||||
use crate::requests::headers::insert_header;
|
||||
use crate::requests::headers::subagent_header;
|
||||
use crate::sse::spawn_response_stream;
|
||||
@@ -30,7 +30,8 @@ pub struct ResponsesClient<T: HttpTransport> {
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ResponsesOptions {
|
||||
pub conversation_id: Option<String>,
|
||||
pub session_id: Option<String>,
|
||||
pub thread_id: Option<String>,
|
||||
pub session_source: Option<SessionSource>,
|
||||
pub extra_headers: HeaderMap,
|
||||
pub compression: Compression,
|
||||
@@ -72,7 +73,8 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
options: ResponsesOptions,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let ResponsesOptions {
|
||||
conversation_id,
|
||||
session_id,
|
||||
thread_id,
|
||||
session_source,
|
||||
extra_headers,
|
||||
compression,
|
||||
@@ -86,10 +88,10 @@ impl<T: HttpTransport> ResponsesClient<T> {
|
||||
}
|
||||
|
||||
let mut headers = extra_headers;
|
||||
if let Some(ref conv_id) = conversation_id {
|
||||
insert_header(&mut headers, "x-client-request-id", conv_id);
|
||||
if let Some(ref thread_id) = thread_id {
|
||||
insert_header(&mut headers, "x-client-request-id", thread_id);
|
||||
}
|
||||
headers.extend(build_conversation_headers(conversation_id));
|
||||
headers.extend(build_session_headers(session_id, thread_id));
|
||||
if let Some(subagent) = subagent_header(&session_source) {
|
||||
insert_header(&mut headers, "x-openai-subagent", &subagent);
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ pub(crate) mod requests;
|
||||
pub(crate) mod sse;
|
||||
pub(crate) mod telemetry;
|
||||
|
||||
pub use crate::requests::headers::build_conversation_headers;
|
||||
pub use crate::requests::headers::build_session_headers;
|
||||
pub use codex_client::RequestTelemetry;
|
||||
pub use codex_client::ReqwestTransport;
|
||||
pub use codex_client::TransportError;
|
||||
|
||||
@@ -2,11 +2,14 @@ use codex_protocol::protocol::SessionSource;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
|
||||
pub fn build_conversation_headers(conversation_id: Option<String>) -> HeaderMap {
|
||||
pub fn build_session_headers(session_id: Option<String>, thread_id: Option<String>) -> HeaderMap {
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(id) = conversation_id {
|
||||
if let Some(id) = session_id {
|
||||
insert_header(&mut headers, "session_id", &id);
|
||||
}
|
||||
if let Some(id) = thread_id {
|
||||
insert_header(&mut headers, "thread_id", &id);
|
||||
}
|
||||
headers
|
||||
}
|
||||
|
||||
|
||||
@@ -445,7 +445,8 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
|
||||
.stream_request(
|
||||
request,
|
||||
ResponsesOptions {
|
||||
conversation_id: Some("sess_123".into()),
|
||||
session_id: Some("sess_123".into()),
|
||||
thread_id: Some("thread_456".into()),
|
||||
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
|
||||
extra_headers,
|
||||
compression: Compression::None,
|
||||
@@ -462,6 +463,10 @@ async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
|
||||
req.headers.get("session_id").and_then(|v| v.to_str().ok()),
|
||||
Some("sess_123")
|
||||
);
|
||||
assert_eq!(
|
||||
req.headers.get("thread_id").and_then(|v| v.to_str().ok()),
|
||||
Some("thread_456")
|
||||
);
|
||||
assert_eq!(
|
||||
req.headers
|
||||
.get("x-openai-subagent")
|
||||
|
||||
@@ -58,7 +58,7 @@ use codex_api::SseTelemetry;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::WebsocketTelemetry;
|
||||
use codex_api::auth_header_telemetry;
|
||||
use codex_api::build_conversation_headers;
|
||||
use codex_api::build_session_headers;
|
||||
use codex_api::create_text_param_for_request;
|
||||
use codex_api::response_create_client_metadata;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
@@ -149,7 +149,8 @@ pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
|
||||
/// configuration is per turn and is passed explicitly to streaming/unary methods.
|
||||
#[derive(Debug)]
|
||||
struct ModelClientState {
|
||||
conversation_id: ThreadId,
|
||||
root_thread_id: ThreadId,
|
||||
thread_id: ThreadId,
|
||||
window_generation: AtomicU64,
|
||||
installation_id: String,
|
||||
provider: SharedModelProvider,
|
||||
@@ -187,7 +188,7 @@ impl RequestRouteTelemetry {
|
||||
/// A session-scoped client for model-provider API calls.
|
||||
///
|
||||
/// This holds configuration and state that should be shared across turns within a Codex session
|
||||
/// (auth, provider selection, conversation id, and transport fallback state).
|
||||
/// (auth, provider selection, root/current thread ids, and transport fallback state).
|
||||
///
|
||||
/// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns
|
||||
/// will also use HTTP for the remainder of the session.
|
||||
@@ -294,7 +295,8 @@ impl ModelClient {
|
||||
/// are passed to [`ModelClientSession::stream`] (and other turn-scoped methods) explicitly.
|
||||
pub fn new(
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
conversation_id: ThreadId,
|
||||
root_thread_id: ThreadId,
|
||||
thread_id: ThreadId,
|
||||
installation_id: String,
|
||||
provider_info: ModelProviderInfo,
|
||||
session_source: SessionSource,
|
||||
@@ -312,7 +314,8 @@ impl ModelClient {
|
||||
collect_auth_env_telemetry(model_provider.info(), codex_api_key_env_enabled);
|
||||
Self {
|
||||
state: Arc::new(ModelClientState {
|
||||
conversation_id,
|
||||
root_thread_id,
|
||||
thread_id,
|
||||
window_generation: AtomicU64::new(0),
|
||||
installation_id,
|
||||
provider: model_provider,
|
||||
@@ -357,9 +360,9 @@ impl ModelClient {
|
||||
}
|
||||
|
||||
fn current_window_id(&self) -> String {
|
||||
let conversation_id = self.state.conversation_id;
|
||||
let thread_id = self.state.thread_id;
|
||||
let window_generation = self.state.window_generation.load(Ordering::Relaxed);
|
||||
format!("{conversation_id}:{window_generation}")
|
||||
format!("{thread_id}:{window_generation}")
|
||||
}
|
||||
|
||||
fn take_cached_websocket_session(&self) -> WebsocketSession {
|
||||
@@ -470,9 +473,10 @@ impl ModelClient {
|
||||
extra_headers.insert(X_CODEX_INSTALLATION_ID_HEADER, header_value);
|
||||
}
|
||||
extra_headers.extend(self.build_responses_identity_headers());
|
||||
extra_headers.extend(build_conversation_headers(Some(
|
||||
self.state.conversation_id.to_string(),
|
||||
)));
|
||||
extra_headers.extend(build_session_headers(
|
||||
Some(self.state.root_thread_id.to_string()),
|
||||
Some(self.state.thread_id.to_string()),
|
||||
));
|
||||
let trace_attempt = compaction_trace.start_attempt(&payload);
|
||||
let result = client
|
||||
.compact_input(&payload, extra_headers)
|
||||
@@ -784,16 +788,19 @@ impl ModelClient {
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> ApiHeaderMap {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
let conversation_id = self.state.conversation_id.to_string();
|
||||
let thread_id = self.state.thread_id.to_string();
|
||||
let mut headers = build_responses_headers(
|
||||
self.state.beta_features_header.as_deref(),
|
||||
turn_state,
|
||||
turn_metadata_header.as_ref(),
|
||||
);
|
||||
if let Ok(header_value) = HeaderValue::from_str(&conversation_id) {
|
||||
if let Ok(header_value) = HeaderValue::from_str(&thread_id) {
|
||||
headers.insert("x-client-request-id", header_value);
|
||||
}
|
||||
headers.extend(build_conversation_headers(Some(conversation_id)));
|
||||
headers.extend(build_session_headers(
|
||||
Some(self.state.root_thread_id.to_string()),
|
||||
Some(thread_id),
|
||||
));
|
||||
headers.extend(self.build_responses_identity_headers());
|
||||
headers.insert(
|
||||
OPENAI_BETA_HEADER,
|
||||
@@ -875,7 +882,7 @@ impl ModelClientSession {
|
||||
&prompt.output_schema,
|
||||
prompt.output_schema_strict,
|
||||
);
|
||||
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
|
||||
let prompt_cache_key = Some(self.client.state.thread_id.to_string());
|
||||
let request = ResponsesApiRequest {
|
||||
model: model_info.slug.clone(),
|
||||
instructions: instructions.clone(),
|
||||
@@ -913,9 +920,11 @@ impl ModelClientSession {
|
||||
compression: Compression,
|
||||
) -> ApiResponsesOptions {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
let conversation_id = self.client.state.conversation_id.to_string();
|
||||
let root_thread_id = self.client.state.root_thread_id.to_string();
|
||||
let thread_id = self.client.state.thread_id.to_string();
|
||||
ApiResponsesOptions {
|
||||
conversation_id: Some(conversation_id),
|
||||
session_id: Some(root_thread_id),
|
||||
thread_id: Some(thread_id),
|
||||
session_source: Some(self.client.state.session_source.clone()),
|
||||
extra_headers: {
|
||||
let mut headers = build_responses_headers(
|
||||
|
||||
@@ -23,7 +23,8 @@ fn test_model_client(session_source: SessionSource) -> ModelClient {
|
||||
let provider = create_oss_provider_with_base_url("https://example.com/v1", WireApi::Responses);
|
||||
ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
ThreadId::new(),
|
||||
/*root_thread_id*/ ThreadId::new(),
|
||||
/*thread_id*/ ThreadId::new(),
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
provider,
|
||||
session_source,
|
||||
@@ -105,7 +106,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
|
||||
client.advance_window_generation();
|
||||
|
||||
let client_metadata = client.build_ws_client_metadata(Some(r#"{"turn_id":"turn-123"}"#));
|
||||
let conversation_id = client.state.conversation_id;
|
||||
let thread_id = client.state.thread_id;
|
||||
assert_eq!(
|
||||
client_metadata,
|
||||
std::collections::HashMap::from([
|
||||
@@ -115,7 +116,7 @@ fn build_ws_client_metadata_includes_window_lineage_and_turn_metadata() {
|
||||
),
|
||||
(
|
||||
X_CODEX_WINDOW_ID_HEADER.to_string(),
|
||||
format!("{conversation_id}:1"),
|
||||
format!("{thread_id}:1"),
|
||||
),
|
||||
(
|
||||
X_OPENAI_SUBAGENT_HEADER.to_string(),
|
||||
|
||||
@@ -76,6 +76,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let CodexSpawnOk { codex, .. } = Box::pin(Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
root_thread_id: Some(parent_session.root_thread_id),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager: Arc::clone(&parent_session.services.environment_manager),
|
||||
|
||||
@@ -389,6 +389,7 @@ pub struct CodexSpawnOk {
|
||||
|
||||
pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) config: Config,
|
||||
pub(crate) root_thread_id: Option<ThreadId>,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) models_manager: SharedModelsManager,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
@@ -450,6 +451,7 @@ impl Codex {
|
||||
async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
|
||||
let CodexSpawnArgs {
|
||||
mut config,
|
||||
root_thread_id,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager,
|
||||
@@ -659,6 +661,7 @@ impl Codex {
|
||||
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
root_thread_id,
|
||||
config.clone(),
|
||||
auth_manager.clone(),
|
||||
models_manager.clone(),
|
||||
|
||||
@@ -90,7 +90,7 @@ pub(super) async fn spawn_review_thread(
|
||||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let review_turn_id = sub_id.to_string();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
sess.conversation_id.to_string(),
|
||||
sess.root_thread_id.to_string(),
|
||||
&session_source,
|
||||
review_turn_id.clone(),
|
||||
parent_turn_context.cwd.clone(),
|
||||
|
||||
@@ -6,6 +6,7 @@ use tokio::sync::Semaphore;
|
||||
///
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
pub(crate) struct Session {
|
||||
pub(crate) root_thread_id: ThreadId,
|
||||
pub(crate) conversation_id: ThreadId,
|
||||
pub(super) tx_event: Sender<Event>,
|
||||
pub(super) agent_status: watch::Sender<AgentStatus>,
|
||||
@@ -251,6 +252,7 @@ impl Session {
|
||||
)]
|
||||
pub(crate) async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
root_thread_id: Option<ThreadId>,
|
||||
config: Arc<Config>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: SharedModelsManager,
|
||||
@@ -287,6 +289,7 @@ impl Session {
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
|
||||
};
|
||||
let root_thread_id = root_thread_id.unwrap_or(conversation_id);
|
||||
let window_generation = match &initial_history {
|
||||
InitialHistory::Resumed(resumed_history) => u64::try_from(
|
||||
resumed_history
|
||||
@@ -757,7 +760,8 @@ impl Session {
|
||||
thread_store: Arc::clone(&thread_store),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
conversation_id,
|
||||
/*root_thread_id*/ root_thread_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
installation_id,
|
||||
session_configuration.provider.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
@@ -783,6 +787,7 @@ impl Session {
|
||||
|
||||
let (mailbox, mailbox_rx) = Mailbox::new();
|
||||
let sess = Arc::new(Session {
|
||||
root_thread_id,
|
||||
conversation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status,
|
||||
|
||||
@@ -331,6 +331,10 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
|
||||
fn test_model_client_session() -> crate::client::ModelClientSession {
|
||||
crate::client::ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
/*root_thread_id*/
|
||||
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
|
||||
.expect("test thread id should be valid"),
|
||||
/*thread_id*/
|
||||
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
|
||||
.expect("test thread id should be valid"),
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
@@ -3147,6 +3151,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
|
||||
));
|
||||
let result = Session::new(
|
||||
session_configuration,
|
||||
/*root_thread_id*/ None,
|
||||
Arc::clone(&config),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
@@ -3315,7 +3320,8 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
)),
|
||||
model_client: ModelClient::new(
|
||||
Some(auth_manager.clone()),
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
session_configuration.provider.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
@@ -3372,6 +3378,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
|
||||
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
|
||||
let session = Session {
|
||||
root_thread_id: conversation_id,
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
@@ -3474,6 +3481,7 @@ async fn make_session_with_config_and_rx(
|
||||
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
/*root_thread_id*/ None,
|
||||
Arc::clone(&config),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
@@ -4678,7 +4686,8 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
)),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
session_configuration.provider.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
@@ -4735,6 +4744,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
|
||||
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
|
||||
let session = Arc::new(Session {
|
||||
root_thread_id: conversation_id,
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
|
||||
@@ -755,6 +755,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
root_thread_id: None,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
|
||||
@@ -388,7 +388,7 @@ impl Session {
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn make_turn_context(
|
||||
conversation_id: ThreadId,
|
||||
root_thread_id: ThreadId,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
provider: ModelProviderInfo,
|
||||
@@ -448,7 +448,7 @@ impl Session {
|
||||
|
||||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
conversation_id.to_string(),
|
||||
root_thread_id.to_string(),
|
||||
&session_source,
|
||||
sub_id.clone(),
|
||||
cwd.clone(),
|
||||
@@ -655,7 +655,7 @@ impl Session {
|
||||
.await,
|
||||
);
|
||||
let mut turn_context: TurnContext = Self::make_turn_context(
|
||||
self.conversation_id,
|
||||
self.root_thread_id,
|
||||
Some(Arc::clone(&self.services.auth_manager)),
|
||||
&self.services.session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
|
||||
@@ -1037,10 +1037,12 @@ impl ThreadManagerState {
|
||||
let parent_rollout_thread_trace = self
|
||||
.parent_rollout_thread_trace_for_source(&session_source, &initial_history)
|
||||
.await;
|
||||
let root_thread_id = self.root_thread_id_for_source(&session_source).await;
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
} = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
root_thread_id,
|
||||
auth_manager,
|
||||
models_manager: Arc::clone(&self.models_manager),
|
||||
environment_manager: Arc::clone(&self.environment_manager),
|
||||
@@ -1068,6 +1070,23 @@ impl ThreadManagerState {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn root_thread_id_for_source(&self, session_source: &SessionSource) -> Option<ThreadId> {
|
||||
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth,
|
||||
..
|
||||
}) = session_source
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
|
||||
if let Ok(parent_thread) = self.get_thread(*parent_thread_id).await {
|
||||
return Some(parent_thread.codex.session.root_thread_id);
|
||||
}
|
||||
|
||||
(*depth == 1).then_some(*parent_thread_id)
|
||||
}
|
||||
|
||||
async fn finalize_thread_spawn(
|
||||
&self,
|
||||
codex: Codex,
|
||||
|
||||
@@ -254,8 +254,10 @@ async fn shutdown_all_threads_bounded_submits_shutdown_to_every_thread() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config().await;
|
||||
config.codex_home = temp_dir.path().join("codex-home").abs();
|
||||
config.sqlite_home = temp_dir.path().join("sqlite-home").abs().to_path_buf();
|
||||
config.cwd = config.codex_home.abs();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
std::fs::create_dir_all(&config.sqlite_home).expect("create sqlite home");
|
||||
|
||||
let manager = ThreadManager::with_models_provider_and_home_for_tests(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
|
||||
@@ -100,7 +100,8 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
|
||||
provider.clone(),
|
||||
session_source,
|
||||
@@ -142,6 +143,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
let expected_session_id = conversation_id.to_string();
|
||||
let expected_window_id = format!("{conversation_id}:0");
|
||||
assert_eq!(
|
||||
request.header("x-openai-subagent").as_deref(),
|
||||
@@ -151,6 +153,14 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
request.header("x-codex-window-id").as_deref(),
|
||||
Some(expected_window_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("session_id").as_deref(),
|
||||
Some(expected_session_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("thread_id").as_deref(),
|
||||
Some(expected_session_id.as_str())
|
||||
);
|
||||
assert_eq!(request.header("x-codex-parent-thread-id"), None);
|
||||
assert_eq!(
|
||||
request.body_json()["client_metadata"]["x-codex-installation-id"].as_str(),
|
||||
@@ -159,6 +169,151 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
assert_eq!(request.header("x-codex-sandbox"), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_separates_session_id_from_thread_id_for_thread_spawn() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
|
||||
let request_recorder = responses::mount_sse_once(&server, response_body).await;
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "mock".into(),
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
auth: None,
|
||||
aws: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
websocket_connect_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model_provider_id = provider.name.clone();
|
||||
config.model_provider = provider.clone();
|
||||
let effort = config.model_reasoning_effort;
|
||||
let summary = config.model_reasoning_summary;
|
||||
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
|
||||
config.model = Some(model.clone());
|
||||
let config = Arc::new(config);
|
||||
|
||||
let root_thread_id = ThreadId::new();
|
||||
let parent_thread_id = ThreadId::new();
|
||||
let thread_id = ThreadId::new();
|
||||
let auth_mode = TelemetryAuthMode::Chatgpt;
|
||||
let session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 2,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
});
|
||||
let model_info =
|
||||
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
|
||||
|
||||
let session_telemetry = SessionTelemetry::new(
|
||||
thread_id,
|
||||
model.as_str(),
|
||||
model_info.slug.as_str(),
|
||||
/*account_id*/ None,
|
||||
Some("test@test.com".to_string()),
|
||||
Some(auth_mode),
|
||||
"test_originator".to_string(),
|
||||
/*log_user_prompts*/ false,
|
||||
"test".to_string(),
|
||||
session_source.clone(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
root_thread_id,
|
||||
thread_id,
|
||||
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
|
||||
provider.clone(),
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
/*enable_request_compression*/ false,
|
||||
/*include_timing_metrics*/ false,
|
||||
/*beta_features_header*/ None,
|
||||
);
|
||||
let mut client_session = client.new_session();
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
prompt.input = vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".into(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}];
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&session_telemetry,
|
||||
effort,
|
||||
summary.unwrap_or(model_info.default_reasoning_summary),
|
||||
/*service_tier*/ None,
|
||||
/*turn_metadata_header*/ None,
|
||||
&codex_rollout_trace::InferenceTraceContext::disabled(),
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
let expected_session_id = root_thread_id.to_string();
|
||||
let expected_thread_id = thread_id.to_string();
|
||||
let expected_parent_thread_id = parent_thread_id.to_string();
|
||||
let expected_window_id = format!("{thread_id}:0");
|
||||
|
||||
assert_eq!(
|
||||
request.header("x-openai-subagent").as_deref(),
|
||||
Some("collab_spawn")
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("session_id").as_deref(),
|
||||
Some(expected_session_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("thread_id").as_deref(),
|
||||
Some(expected_thread_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("x-client-request-id").as_deref(),
|
||||
Some(expected_thread_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("x-codex-parent-thread-id").as_deref(),
|
||||
Some(expected_parent_thread_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("x-codex-window-id").as_deref(),
|
||||
Some(expected_window_id.as_str())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_includes_subagent_header_on_other() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
@@ -227,7 +382,8 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
|
||||
provider.clone(),
|
||||
session_source,
|
||||
@@ -269,10 +425,19 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
let expected_session_id = conversation_id.to_string();
|
||||
assert_eq!(
|
||||
request.header("x-openai-subagent").as_deref(),
|
||||
Some("my-task")
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("session_id").as_deref(),
|
||||
Some(expected_session_id.as_str())
|
||||
);
|
||||
assert_eq!(
|
||||
request.header("thread_id").as_deref(),
|
||||
Some(expected_session_id.as_str())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -343,7 +508,8 @@ async fn responses_respects_model_info_overrides_from_config() {
|
||||
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
|
||||
provider.clone(),
|
||||
session_source,
|
||||
|
||||
@@ -726,7 +726,7 @@ async fn resume_replays_image_tool_outputs_with_detail() {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn includes_conversation_id_and_model_headers_in_request() {
|
||||
async fn includes_session_and_thread_headers_in_request() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Mock server
|
||||
@@ -764,6 +764,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
|
||||
let request = resp_mock.single_request();
|
||||
assert_eq!(request.path(), "/v1/responses");
|
||||
let request_session_id = request.header("session_id").expect("session_id header");
|
||||
let request_thread_id = request.header("thread_id").expect("thread_id header");
|
||||
let request_authorization = request
|
||||
.header("authorization")
|
||||
.expect("authorization header");
|
||||
@@ -774,6 +775,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
|
||||
.expect("read installation id");
|
||||
|
||||
assert_eq!(request_session_id, session_id.to_string());
|
||||
assert_eq!(request_thread_id, session_id.to_string());
|
||||
assert_eq!(request_originator, originator().value);
|
||||
assert_eq!(request_authorization, "Bearer Test API Key");
|
||||
assert_eq!(
|
||||
@@ -884,7 +886,8 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth
|
||||
Some(AuthManager::from_auth_for_testing(CodexAuth::from_api_key(
|
||||
"unused-api-key",
|
||||
))),
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
provider,
|
||||
SessionSource::Exec,
|
||||
@@ -1001,7 +1004,6 @@ async fn chatgpt_auth_sends_correct_request() {
|
||||
.await
|
||||
.expect("create new conversation");
|
||||
let codex = test.codex.clone();
|
||||
let thread_id = test.session_configured.session_id;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
@@ -1030,10 +1032,13 @@ async fn chatgpt_auth_sends_correct_request() {
|
||||
let request_body = request.body_json();
|
||||
|
||||
let session_id = request.header("session_id").expect("session_id header");
|
||||
let thread_id = request.header("thread_id").expect("thread_id header");
|
||||
let installation_id =
|
||||
std::fs::read_to_string(test.codex_home_path().join(INSTALLATION_ID_FILENAME))
|
||||
.expect("read installation id");
|
||||
assert_eq!(session_id, thread_id.to_string());
|
||||
let expected_id = test.session_configured.session_id.to_string();
|
||||
assert_eq!(session_id, expected_id);
|
||||
assert_eq!(thread_id, expected_id);
|
||||
|
||||
assert_eq!(request_originator, originator().value);
|
||||
assert_eq!(request_authorization, "Bearer Access Token");
|
||||
@@ -2199,7 +2204,8 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
provider.clone(),
|
||||
SessionSource::Exec,
|
||||
|
||||
@@ -126,6 +126,14 @@ async fn responses_websocket_streams_request() {
|
||||
handshake.header(X_CLIENT_REQUEST_ID_HEADER),
|
||||
Some(harness.conversation_id.to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
handshake.header("session_id"),
|
||||
Some(harness.conversation_id.to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
handshake.header("thread_id"),
|
||||
Some(harness.conversation_id.to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
body["client_metadata"]["x-codex-installation-id"].as_str(),
|
||||
Some(TEST_INSTALLATION_ID)
|
||||
@@ -1825,7 +1833,8 @@ async fn websocket_harness_with_provider_options(
|
||||
let summary = ReasoningSummary::Auto;
|
||||
let client = ModelClient::new(
|
||||
/*auth_manager*/ None,
|
||||
conversation_id,
|
||||
/*root_thread_id*/ conversation_id,
|
||||
/*thread_id*/ conversation_id,
|
||||
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
|
||||
provider.clone(),
|
||||
SessionSource::Exec,
|
||||
|
||||
Reference in New Issue
Block a user