Files
codex/codex-rs/codex-api/src/endpoint/responses.rs
jif-oai a98623511b feat: add session_id (#20437)
## Summary

Related to
https://openai.slack.com/archives/C095U48JNL9/p1777537279707449
TLDR:
We update the meaning of session ids and thread ids:
* thread_id stays as now
* session_id become a shared id between every thread under a /root
thread (i.e. every sub-agent share the same session id)

This PR introduces an explicit `SessionId` and threads it through the
protocol/client boundary so `session_id` and `thread_id` can diverge
when they need to, while preserving compatibility for older serialized
`session_configured` events.

---------

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 10:48:37 +02:00

154 lines
4.5 KiB
Rust

use crate::auth::SharedAuthProvider;
use crate::common::ResponseStream;
use crate::common::ResponsesApiRequest;
use crate::endpoint::session::EndpointSession;
use crate::error::ApiError;
use crate::provider::Provider;
use crate::requests::Compression;
use crate::requests::attach_item_ids;
use crate::requests::headers::build_session_headers;
use crate::requests::headers::insert_header;
use crate::requests::headers::subagent_header;
use crate::sse::spawn_response_stream;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
use codex_client::RequestTelemetry;
use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use serde_json::Value;
use std::sync::Arc;
use std::sync::OnceLock;
use tracing::instrument;
pub struct ResponsesClient<T: HttpTransport> {
session: EndpointSession<T>,
sse_telemetry: Option<Arc<dyn SseTelemetry>>,
}
#[derive(Default)]
pub struct ResponsesOptions {
pub session_id: Option<String>,
pub thread_id: Option<String>,
pub session_source: Option<SessionSource>,
pub extra_headers: HeaderMap,
pub compression: Compression,
pub turn_state: Option<Arc<OnceLock<String>>>,
}
impl<T: HttpTransport> ResponsesClient<T> {
pub fn new(transport: T, provider: Provider, auth: SharedAuthProvider) -> Self {
Self {
session: EndpointSession::new(transport, provider, auth),
sse_telemetry: None,
}
}
pub fn with_telemetry(
self,
request: Option<Arc<dyn RequestTelemetry>>,
sse: Option<Arc<dyn SseTelemetry>>,
) -> Self {
Self {
session: self.session.with_request_telemetry(request),
sse_telemetry: sse,
}
}
#[instrument(
name = "responses.stream_request",
level = "info",
skip_all,
fields(
transport = "responses_http",
http.method = "POST",
api.path = "responses"
)
)]
pub async fn stream_request(
&self,
request: ResponsesApiRequest,
options: ResponsesOptions,
) -> Result<ResponseStream, ApiError> {
let ResponsesOptions {
session_id,
thread_id,
session_source,
extra_headers,
compression,
turn_state,
} = options;
let mut body = serde_json::to_value(&request)
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
if request.store && self.session.provider().is_azure_responses_endpoint() {
attach_item_ids(&mut body, &request.input);
}
let mut headers = extra_headers;
if let Some(ref thread_id) = thread_id {
insert_header(&mut headers, "x-client-request-id", thread_id);
}
headers.extend(build_session_headers(session_id, thread_id));
if let Some(subagent) = subagent_header(&session_source) {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}
self.stream(body, headers, compression, turn_state).await
}
fn path() -> &'static str {
"responses"
}
#[instrument(
name = "responses.stream",
level = "info",
skip_all,
fields(
transport = "responses_http",
http.method = "POST",
api.path = "responses",
turn.has_state = turn_state.is_some()
)
)]
pub async fn stream(
&self,
body: Value,
extra_headers: HeaderMap,
compression: Compression,
turn_state: Option<Arc<OnceLock<String>>>,
) -> Result<ResponseStream, ApiError> {
let request_compression = match compression {
Compression::None => RequestCompression::None,
Compression::Zstd => RequestCompression::Zstd,
};
let stream_response = self
.session
.stream_with(
Method::POST,
Self::path(),
extra_headers,
Some(body),
|req| {
req.headers.insert(
http::header::ACCEPT,
HeaderValue::from_static("text/event-stream"),
);
req.compression = request_compression;
},
)
.await?;
Ok(spawn_response_stream(
stream_response,
self.session.provider().stream_idle_timeout,
self.sse_telemetry.clone(),
turn_state,
))
}
}