Compare commits

...

14 Commits

Author   SHA1        Message                                                        Date
jif-oai  a03b55af5c  Merge remote-tracking branch 'origin/main' into jif/client-2   2025-11-17 16:56:37 +00:00
jif-oai  c26ffe82f9  NIT                                                            2025-11-14 14:24:14 +01:00
jif-oai  cf15b86f5c  Merge branch 'main' into jif/client-2                          2025-11-14 14:04:43 +01:00
                     # Conflicts:
                     #   codex-rs/core/src/client.rs
                     #   codex-rs/core/src/client/chat_completions.rs
                     #   codex-rs/core/src/client_common.rs
jif-oai  c8a6e4ddd7  Clippy again                                                   2025-11-13 17:30:32 +01:00
jif-oai  c77254d070  Clippy                                                         2025-11-13 17:18:20 +01:00
jif-oai  d2e876403a  Clippy                                                         2025-11-13 17:14:50 +01:00
jif-oai  e4e627d1a3  Clean-up 3                                                     2025-11-13 17:09:04 +01:00
jif-oai  29d93176b6  Clean-up 2                                                     2025-11-13 16:49:59 +01:00
jif-oai  1cd9e24232  Clean-up                                                       2025-11-13 16:31:39 +01:00
jif-oai  2790ddff0a  Clippy                                                         2025-11-13 16:17:33 +01:00
jif-oai  38b568b504  Tests 3                                                        2025-11-13 16:11:10 +01:00
jif-oai  def4a996c4  Tests 2                                                        2025-11-13 15:56:18 +01:00
jif-oai  c2347f75b3  Tests                                                          2025-11-13 15:32:34 +01:00
jif-oai  1d8f839b7d  V1                                                             2025-11-13 14:17:16 +01:00
33 changed files with 2340 additions and 2266 deletions

View File

@@ -101,12 +101,12 @@ use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::auth::CLIENT_ID;
use codex_core::auth::login_with_api_key;
use codex_core::client::http::get_codex_user_agent;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config::ConfigToml;
use codex_core::config::edit::ConfigEditsBuilder;
use codex_core::config_loader::load_config_as_toml;
use codex_core::default_client::get_codex_user_agent;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::find_conversation_path_by_id_str;

View File

@@ -14,9 +14,9 @@ use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::client::http::USER_AGENT_SUFFIX;
use codex_core::client::http::get_codex_user_agent;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;

View File

@@ -5,7 +5,7 @@ use crate::types::RateLimitWindowSnapshot;
use crate::types::TurnAttemptsSiblingTurnsResponse;
use anyhow::Result;
use codex_core::auth::CodexAuth;
use codex_core::default_client::get_codex_user_agent;
use codex_core::client::http::get_codex_user_agent;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use reqwest::header::AUTHORIZATION;

View File

@@ -1,5 +1,5 @@
use codex_core::client::http::create_client;
use codex_core::config::Config;
use codex_core::default_client::create_client;
use crate::chatgpt_token::get_chatgpt_token_data;
use crate::chatgpt_token::init_chatgpt_token_from_auth;

View File

@@ -48,7 +48,7 @@ async fn init_backend(user_agent_suffix: &str) -> anyhow::Result<BackendContext>
});
}
let ua = codex_core::default_client::get_codex_user_agent();
let ua = codex_core::client::http::get_codex_user_agent();
let mut http = codex_cloud_tasks_client::HttpClient::new(base_url.clone())?.with_user_agent(ua);
let style = if base_url.contains("/backend-api") {
"wham"
@@ -384,7 +384,7 @@ pub async fn run_main(cli: Cli, _codex_linux_sandbox_exe: Option<PathBuf>) -> an
append_error_log(format!(
"startup: wham_force_internal={} ua={}",
force_internal,
codex_core::default_client::get_codex_user_agent()
codex_core::client::http::get_codex_user_agent()
));
// Non-blocking initial load so the in-box spinner can animate
app.status = "Loading tasks…".to_string();

View File

@@ -7,7 +7,7 @@ use codex_core::config::ConfigOverrides;
use codex_login::AuthManager;
pub fn set_user_agent_suffix(suffix: &str) {
if let Ok(mut guard) = codex_core::default_client::USER_AGENT_SUFFIX.lock() {
if let Ok(mut guard) = codex_core::client::http::USER_AGENT_SUFFIX.lock() {
guard.replace(suffix.to_string());
}
}
@@ -79,7 +79,7 @@ pub async fn build_chatgpt_headers() -> HeaderMap {
use reqwest::header::USER_AGENT;
set_user_agent_suffix("codex_cloud_tasks_tui");
let ua = codex_core::default_client::get_codex_user_agent();
let ua = codex_core::client::http::get_codex_user_agent();
let mut headers = HeaderMap::new();
headers.insert(
USER_AGENT,

View File

@@ -23,7 +23,6 @@ pub use crate::auth::storage::AuthDotJson;
use crate::auth::storage::AuthStorageBackend;
use crate::auth::storage::create_auth_storage;
use crate::config::Config;
use crate::default_client::CodexHttpClient;
use crate::error::RefreshTokenFailedError;
use crate::error::RefreshTokenFailedReason;
use crate::token_data::KnownPlan as InternalKnownPlan;
@@ -272,7 +271,7 @@ impl CodexAuth {
mode: AuthMode::ChatGPT,
storage: create_auth_storage(PathBuf::new(), AuthCredentialsStoreMode::File),
auth_dot_json,
client: crate::default_client::create_client(),
client: crate::client::http::create_client(),
}
}
@@ -287,7 +286,7 @@ impl CodexAuth {
}
pub fn from_api_key(api_key: &str) -> Self {
Self::from_api_key_with_client(api_key, crate::default_client::create_client())
Self::from_api_key_with_client(api_key, crate::client::http::create_client())
}
}
@@ -447,7 +446,7 @@ fn load_auth(
auth_credentials_store_mode: AuthCredentialsStoreMode,
) -> std::io::Result<Option<CodexAuth>> {
if enable_codex_api_key_env && let Some(api_key) = read_codex_api_key_from_env() {
let client = crate::default_client::create_client();
let client = crate::client::http::create_client();
return Ok(Some(CodexAuth::from_api_key_with_client(
api_key.as_str(),
client,
@@ -456,7 +455,7 @@ fn load_auth(
let storage = create_auth_storage(codex_home.to_path_buf(), auth_credentials_store_mode);
let client = crate::default_client::create_client();
let client = crate::client::http::create_client();
let auth_dot_json = match storage.load()? {
Some(auth) => auth,
None => return Ok(None),
@@ -632,6 +631,7 @@ fn refresh_token_endpoint() -> String {
.unwrap_or_else(|_| REFRESH_TOKEN_URL.to_string())
}
use crate::client::http::CodexHttpClient;
use std::sync::RwLock;
/// Internal cached auth state.

File diff suppressed because it is too large

View File

@@ -0,0 +1,235 @@
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use crate::client::ResponseEvent;
use crate::error::Result;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use futures::Stream;
/// Optional client-side aggregation helper
///
/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
/// the chat SSE decoder into a *running* assistant message, **suppressing the
/// per-token deltas**. The stream stays silent while the model is thinking and
/// only emits two events per turn:
///
/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
/// (fully concatenated).
/// 2. The original `ResponseEvent::Completed` right after it.
///
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
enum AggregateMode {
AggregatedOnly,
Streaming,
}
pub(crate) struct AggregatedChatStream<S> {
inner: S,
cumulative: String,
cumulative_reasoning: String,
pending: std::collections::VecDeque<ResponseEvent>,
mode: AggregateMode,
}
impl<S> Stream for AggregatedChatStream<S>
where
S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
type Item = Result<ResponseEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// First, flush any buffered events from the previous call.
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
loop {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
// If this is an incremental assistant message chunk, accumulate but
// do NOT emit yet. Forward any other item (e.g. FunctionCall) right
// away so downstream consumers see it.
let is_assistant_message = matches!(
&item,
ResponseItem::Message { role, .. } if role == "assistant"
);
if is_assistant_message {
match this.mode {
AggregateMode::AggregatedOnly => {
// Only use the final assistant message if we have not
// seen any deltas; otherwise, deltas already built the
// cumulative text and this would duplicate it.
if this.cumulative.is_empty()
&& let ResponseItem::Message { content, .. } = &item
&& let Some(text) = content.iter().find_map(|c| match c {
ContentItem::OutputText { text } => Some(text),
_ => None,
})
{
this.cumulative.push_str(text);
}
// Swallow assistant message here; emit on Completed.
continue;
}
AggregateMode::Streaming => {
// In streaming mode, if we have not seen any deltas, forward
// the final assistant message directly. If deltas were seen,
// suppress the final message to avoid duplication.
if this.cumulative.is_empty() {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
item,
))));
}
continue;
}
}
}
// Not an assistant message; forward immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
}
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
}
Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
}))) => {
// Build any aggregated items in the correct order: Reasoning first, then Message.
let mut emitted_any = false;
if !this.cumulative_reasoning.is_empty()
&& matches!(this.mode, AggregateMode::AggregatedOnly)
{
let aggregated_reasoning = ResponseItem::Reasoning {
id: String::new(),
summary: Vec::new(),
content: Some(vec![ReasoningItemContent::ReasoningText {
text: std::mem::take(&mut this.cumulative_reasoning),
}]),
encrypted_content: None,
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
emitted_any = true;
}
// Always emit the final aggregated assistant message when any
// content deltas have been observed. In AggregatedOnly mode this
// is the sole assistant output; in Streaming mode this finalizes
// the streamed deltas into a terminal OutputItemDone so callers
// can persist/render the message once per turn.
if !this.cumulative.is_empty() {
let aggregated_message = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
}],
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
emitted_any = true;
}
// Always emit Completed last when anything was aggregated.
if emitted_any {
this.pending.push_back(ResponseEvent::Completed {
response_id: response_id.clone(),
token_usage: token_usage.clone(),
});
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
}
// Nothing aggregated; forward Completed directly.
return Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
})));
}
Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
// These events are exclusive to the Responses API and
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
this.cumulative.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
}))) => {
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
this.cumulative_reasoning.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
})));
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {}
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
}
}
}
}
}
/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
/// Returns a new stream that emits **only** the final assistant message
/// per turn instead of every incremental delta. The produced
/// `ResponseEvent` sequence for a typical text turn looks like:
///
/// ```ignore
/// OutputItemDone(<full message>)
/// Completed
/// ```
///
/// No other `OutputItemDone` events will be seen by the caller.
fn aggregate(self) -> AggregatedChatStream<Self> {
AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
}
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
impl<S> AggregatedChatStream<S> {
fn new(inner: S, mode: AggregateMode) -> Self {
AggregatedChatStream {
inner,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: std::collections::VecDeque::new(),
mode,
}
}
pub(crate) fn streaming_mode(inner: S) -> Self {
Self::new(inner, AggregateMode::Streaming)
}
}
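
As a usage sketch (condensed from the doc example this file replaces; `render` is a hypothetical handler, and `client.stream(&prompt)` stands in for whatever produces the underlying `Stream<Item = Result<ResponseEvent>>`):

```rust
use futures::StreamExt;

// Sketch only: assumes `ModelClient::stream` yields a ResponseStream as
// elsewhere in this diff; `render` is hypothetical.
async fn run_turn(client: &ModelClient, prompt: &Prompt) -> Result<()> {
    let mut agg = client.stream(prompt).await?.aggregate();
    while let Some(event) = agg.next().await {
        match event? {
            // AggregatedOnly suppresses deltas: at most one final Reasoning
            // item, one complete assistant Message, then Completed.
            ResponseEvent::OutputItemDone(item) => render(item),
            ResponseEvent::Completed { .. } => break,
            _ => {}
        }
    }
    Ok(())
}
```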

View File

@@ -1,15 +1,16 @@
use std::time::Duration;
use crate::ModelProviderInfo;
use crate::client::ResponseEvent;
use crate::client::ResponseStream;
use crate::client::http::CodexHttpClient;
use crate::client::retry::RetryableStreamError;
use crate::client::retry::retry_stream;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::default_client::CodexHttpClient;
use crate::error::CodexErr;
use crate::error::ConnectionFailedError;
use crate::error::ResponseStreamFailed;
use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::model_family::ModelFamily;
use crate::tools::spec::create_tools_json_for_chat_completions_api;
@@ -21,19 +22,12 @@ use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use eventsource_stream::Eventsource;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use reqwest::StatusCode;
use serde_json::json;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::debug;
use tracing::trace;
/// Implementation for the classic Chat Completions API.
@@ -55,7 +49,7 @@ pub(crate) async fn stream_chat_completions(
let mut messages = Vec::<serde_json::Value>::new();
let full_instructions = prompt.get_full_instructions(model_family);
messages.push(json!({"role": "system", "content": full_instructions}));
messages.push(json!({ "role": "system", "content": full_instructions }));
let input = prompt.get_formatted_input();
@@ -128,7 +122,7 @@ pub(crate) async fn stream_chat_completions(
{
reasoning_by_anchor_index
.entry(idx - 1)
.and_modify(|v| v.push_str(&text))
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
attached = true;
}
@@ -139,13 +133,13 @@ pub(crate) async fn stream_chat_completions(
ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => {
reasoning_by_anchor_index
.entry(idx + 1)
.and_modify(|v| v.push_str(&text))
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
}
ResponseItem::Message { role, .. } if role == "assistant" => {
reasoning_by_anchor_index
.entry(idx + 1)
.and_modify(|v| v.push_str(&text))
.and_modify(|v| v.push_str(text.as_str()))
.or_insert(text.clone());
}
_ => {}
@@ -203,13 +197,18 @@ pub(crate) async fn stream_chat_completions(
json!(text)
};
let mut msg = json!({"role": role, "content": content_value});
let mut msg = json!({
"role": role,
"content": content_value
});
if role == "assistant"
&& let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::FunctionCall {
@@ -228,22 +227,24 @@ pub(crate) async fn stream_chat_completions(
"name": name,
"arguments": arguments,
}
}]
}],
});
if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::LocalShellCall {
id,
call_id: _,
status,
action,
} => {
// Confirm with API team.
let mut msg = json!({
"role": "assistant",
"content": null,
@@ -254,13 +255,16 @@ pub(crate) async fn stream_chat_completions(
"action": action,
}]
});
if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
&& let Some(obj) = msg.as_object_mut()
{
obj.insert("reasoning".to_string(), json!(reasoning));
}
messages.push(msg);
}
ResponseItem::FunctionCallOutput { call_id, output } => {
// Prefer structured content items when available (e.g., images);
// otherwise fall back to the legacy plain-string content.
@@ -287,6 +291,7 @@ pub(crate) async fn stream_chat_completions(
"content": content_value,
}));
}
ResponseItem::CustomToolCall {
id,
call_id: _,
@@ -304,7 +309,7 @@ pub(crate) async fn stream_chat_completions(
"name": name,
"input": input,
}
}]
}],
}));
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
@@ -314,117 +319,148 @@ pub(crate) async fn stream_chat_completions(
"content": output,
}));
}
ResponseItem::GhostSnapshot { .. } => {
// Ghost snapshots annotate history but are not sent to the model.
ResponseItem::Reasoning { .. } => {
// Omit from conversation history; reasoning is attached to anchors above.
continue;
}
ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::Other => {
// Omit these items from the conversation history.
ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {
continue;
}
ResponseItem::GhostSnapshot { .. } => {
// Ghost snapshots annotate history but are not sent to the model.
continue;
}
}
}
let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?;
let payload = json!({
let mut body = json!({
"model": model_family.slug,
"messages": messages,
"stream": true,
"tools": tools_json,
"stream_options": {
"include_usage": true,
},
});
debug!(
if !tools_json.is_empty() {
body["tools"] = json!(tools_json);
body["tool_choice"] = json!("auto");
}
if let SessionSource::SubAgent(sub) = session_source {
let subagent = crate::client::types::subagent_label(sub);
body["metadata"] = json!({
"x-openai-subagent": subagent,
});
}
let max_attempts = provider.request_max_retries();
retry_stream(max_attempts, |attempt| {
let body = body.clone();
async move {
stream_single_chat_completion(attempt, client, provider, otel_event_manager, body)
.await
.map_err(ChatStreamError::Retryable)
}
})
.await
}
async fn stream_single_chat_completion(
attempt: u64,
client: &CodexHttpClient,
provider: &ModelProviderInfo,
otel_event_manager: &OtelEventManager,
body: serde_json::Value,
) -> Result<ResponseStream> {
trace!(
"POST to {}: {}",
provider.get_full_url(&None),
payload.to_string()
body.to_string()
);
let mut attempt = 0;
let max_retries = provider.request_max_retries();
loop {
attempt += 1;
let mut req_builder = provider.create_request_builder(client, &None).await?;
req_builder = req_builder
.header(reqwest::header::ACCEPT, "text/event-stream")
.json(&body);
let mut req_builder = provider.create_request_builder(client, &None).await?;
let res = otel_event_manager
.log_request(attempt, || req_builder.send())
.await;
// Include subagent header only for subagent sessions.
if let SessionSource::SubAgent(sub) = session_source.clone() {
let subagent = if let SubAgentSource::Other(label) = sub {
label
} else {
serde_json::to_value(&sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
};
req_builder = req_builder.header("x-openai-subagent", subagent);
let mut request_id = None;
if let Ok(resp) = &res {
request_id = resp
.headers()
.get("cf-ray")
.map(|v| v.to_str().unwrap_or_default().to_string());
}
match res {
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
// Spawn a task to process the SSE stream.
let stream = resp.bytes_stream().map_err(move |e| {
CodexErr::ResponseStreamFailed(ResponseStreamFailed {
source: e,
request_id: request_id.clone(),
})
});
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
otel_event_manager.clone(),
));
Ok(ResponseStream { rx_event })
}
Ok(res) => {
let status = res.status();
let res = otel_event_manager
.log_request(attempt, || {
req_builder
.header(reqwest::header::ACCEPT, "text/event-stream")
.json(&payload)
.send()
})
.await;
match res {
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
let stream = resp.bytes_stream().map_err(|e| {
CodexErr::ResponseStreamFailed(ResponseStreamFailed {
source: e,
request_id: None,
})
});
tokio::spawn(process_chat_sse(
stream,
tx_event,
provider.stream_idle_timeout(),
otel_event_manager.clone(),
));
return Ok(ResponseStream { rx_event });
if !(status == StatusCode::TOO_MANY_REQUESTS
|| status == StatusCode::UNAUTHORIZED
|| status.is_server_error())
{
// Surface the error body to callers. Use `unwrap_or_default` per Clippy.
let body = res.text().await.unwrap_or_default();
return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
status,
body,
request_id: None,
}));
}
Ok(res) => {
let status = res.status();
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
let body = (res.text().await).unwrap_or_default();
return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
status,
body,
request_id: None,
}));
}
if attempt > max_retries {
return Err(CodexErr::RetryLimit(RetryLimitReachedError {
status,
request_id: None,
}));
}
Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
status,
body: String::new(),
request_id,
}))
}
Err(e) => Err(CodexErr::ConnectionFailed(ConnectionFailedError {
source: e,
})),
}
}
let retry_after_secs = res
.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());
enum ChatStreamError {
Retryable(CodexErr),
}
let delay = retry_after_secs
.map(|s| Duration::from_millis(s * 1_000))
.unwrap_or_else(|| backoff(attempt));
tokio::time::sleep(delay).await;
}
Err(e) => {
if attempt > max_retries {
return Err(CodexErr::ConnectionFailed(ConnectionFailedError {
source: e,
}));
}
let delay = backoff(attempt);
tokio::time::sleep(delay).await;
}
impl RetryableStreamError for ChatStreamError {
fn delay(&self, attempt: u64) -> Option<Duration> {
Some(backoff(attempt))
}
fn into_error(self) -> CodexErr {
match self {
ChatStreamError::Retryable(e) => e,
}
}
}
@@ -488,9 +524,7 @@ async fn append_reasoning_text(
.await;
}
}
/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
@@ -519,21 +553,15 @@ async fn process_chat_sse<S>(
let mut reasoning_item: Option<ResponseItem> = None;
loop {
let start = std::time::Instant::now();
let response = timeout(idle_timeout, stream.next()).await;
let duration = start.elapsed();
otel_event_manager.log_sse_event(&response, duration);
let sse = match response {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let _ = tx_event
.send(Err(CodexErr::Stream(e.to_string(), None)))
.await;
return;
}
Ok(None) => {
// Stream closed gracefully; emit Completed with a dummy id.
let sse = match crate::client::sse::next_sse_event(
&mut stream,
idle_timeout,
&otel_event_manager,
)
.await
{
crate::client::sse::SseNext::Event(ev) => ev,
crate::client::sse::SseNext::Eof => {
let _ = tx_event
.send(Ok(ResponseEvent::Completed {
response_id: String::new(),
@@ -542,7 +570,11 @@ async fn process_chat_sse<S>(
.await;
return;
}
Err(_) => {
crate::client::sse::SseNext::StreamError(message) => {
let _ = tx_event.send(Err(CodexErr::Stream(message, None))).await;
return;
}
crate::client::sse::SseNext::Timeout => {
let _ = tx_event
.send(Err(CodexErr::Stream(
"idle timeout waiting for SSE".into(),
@@ -724,256 +756,3 @@ async fn process_chat_sse<S>(
}
}
}
/// Optional client-side aggregation helper
///
/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
/// [`process_chat_sse`] into a *running* assistant message, **suppressing the
/// per-token deltas**. The stream stays silent while the model is thinking
/// and only emits two events per turn:
///
/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
/// (fully concatenated).
/// 2. The original `ResponseEvent::Completed` right after it.
///
/// This mirrors the behaviour the TypeScript CLI exposes to its higher layers.
///
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
enum AggregateMode {
AggregatedOnly,
Streaming,
}
pub(crate) struct AggregatedChatStream<S> {
inner: S,
cumulative: String,
cumulative_reasoning: String,
pending: std::collections::VecDeque<ResponseEvent>,
mode: AggregateMode,
}
impl<S> Stream for AggregatedChatStream<S>
where
S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
type Item = Result<ResponseEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// First, flush any buffered events from the previous call.
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
loop {
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
// If this is an incremental assistant message chunk, accumulate but
// do NOT emit yet. Forward any other item (e.g. FunctionCall) right
// away so downstream consumers see it.
let is_assistant_message = matches!(
&item,
codex_protocol::models::ResponseItem::Message { role, .. } if role == "assistant"
);
if is_assistant_message {
match this.mode {
AggregateMode::AggregatedOnly => {
// Only use the final assistant message if we have not
// seen any deltas; otherwise, deltas already built the
// cumulative text and this would duplicate it.
if this.cumulative.is_empty()
&& let codex_protocol::models::ResponseItem::Message {
content,
..
} = &item
&& let Some(text) = content.iter().find_map(|c| match c {
codex_protocol::models::ContentItem::OutputText {
text,
} => Some(text),
_ => None,
})
{
this.cumulative.push_str(text);
}
// Swallow assistant message here; emit on Completed.
continue;
}
AggregateMode::Streaming => {
// In streaming mode, if we have not seen any deltas, forward
// the final assistant message directly. If deltas were seen,
// suppress the final message to avoid duplication.
if this.cumulative.is_empty() {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
item,
))));
} else {
continue;
}
}
}
}
// Not an assistant message; forward immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
}
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
}
Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
}))) => {
// Build any aggregated items in the correct order: Reasoning first, then Message.
let mut emitted_any = false;
if !this.cumulative_reasoning.is_empty()
&& matches!(this.mode, AggregateMode::AggregatedOnly)
{
let aggregated_reasoning =
codex_protocol::models::ResponseItem::Reasoning {
id: String::new(),
summary: Vec::new(),
content: Some(vec![
codex_protocol::models::ReasoningItemContent::ReasoningText {
text: std::mem::take(&mut this.cumulative_reasoning),
},
]),
encrypted_content: None,
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
emitted_any = true;
}
// Always emit the final aggregated assistant message when any
// content deltas have been observed. In AggregatedOnly mode this
// is the sole assistant output; in Streaming mode this finalizes
// the streamed deltas into a terminal OutputItemDone so callers
// can persist/render the message once per turn.
if !this.cumulative.is_empty() {
let aggregated_message = codex_protocol::models::ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![codex_protocol::models::ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
}],
};
this.pending
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
emitted_any = true;
}
// Always emit Completed last when anything was aggregated.
if emitted_any {
this.pending.push_back(ResponseEvent::Completed {
response_id: response_id.clone(),
token_usage: token_usage.clone(),
});
// Return the first pending event now.
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
}
// Nothing aggregated; forward Completed directly.
return Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,
})));
}
Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
// These events are exclusive to the Responses API and
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
this.cumulative.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
} else {
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
}))) => {
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
this.cumulative_reasoning.push_str(&delta);
if matches!(this.mode, AggregateMode::Streaming) {
// In streaming mode, also forward the delta immediately.
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
delta,
content_index,
})));
} else {
continue;
}
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
}
}
}
}
}
/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
/// Returns a new stream that emits **only** the final assistant message
/// per turn instead of every incremental delta. The produced
/// `ResponseEvent` sequence for a typical text turn looks like:
///
/// ```ignore
/// OutputItemDone(<full message>)
/// Completed
/// ```
///
/// No other `OutputItemDone` events will be seen by the caller.
///
/// Usage:
///
/// ```ignore
/// let agg_stream = client.stream(&prompt).await?.aggregate();
/// while let Some(event) = agg_stream.next().await {
/// // event now contains cumulative text
/// }
/// ```
fn aggregate(self) -> AggregatedChatStream<Self> {
AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
}
}
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
impl<S> AggregatedChatStream<S> {
fn new(inner: S, mode: AggregateMode) -> Self {
AggregatedChatStream {
inner,
cumulative: String::new(),
cumulative_reasoning: String::new(),
pending: std::collections::VecDeque::new(),
mode,
}
}
pub(crate) fn streaming_mode(inner: S) -> Self {
Self::new(inner, AggregateMode::Streaming)
}
}

View File

@@ -154,11 +154,13 @@ impl CodexRequestBuilder {
.collect()
}
}
#[derive(Debug, Clone)]
pub struct Originator {
pub value: String,
pub header_value: HeaderValue,
}
static ORIGINATOR: OnceLock<Originator> = OnceLock::new();
#[derive(Debug)]

View File

@@ -0,0 +1,22 @@
mod aggregation;
mod chat_completions;
pub mod http;
mod rate_limits;
mod responses;
mod retry;
mod sse;
pub mod types;
pub(crate) use aggregation::AggregateStreamExt;
pub(crate) use aggregation::AggregatedChatStream;
pub(crate) use chat_completions::stream_chat_completions;
pub use responses::ModelClient;
pub(crate) use types::FreeformTool;
pub(crate) use types::FreeformToolFormat;
pub(crate) use types::Reasoning;
pub use types::ResponseEvent;
pub use types::ResponseStream;
pub(crate) use types::ResponsesApiRequest;
pub(crate) use types::ResponsesApiTool;
pub(crate) use types::ToolSpec;
pub(crate) use types::create_text_param_for_request;
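
This new `client` module root replaces the old top-level `default_client` and `chat_completions` modules, so downstream crates swap import paths. A representative before/after, condensed from the hunks above:

```rust
// Before this branch:
use codex_core::default_client::get_codex_user_agent;

// After:
use codex_core::client::http::get_codex_user_agent;
```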

View File

@@ -0,0 +1,86 @@
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use chrono::Utc;
use reqwest::header::HeaderMap;
/// Prefer Codex-specific aggregate rate limit headers if present; fall back
/// to raw OpenAI-style request headers otherwise.
pub(crate) fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
parse_codex_rate_limits(headers).or_else(|| parse_openai_rate_limits(headers))
}
fn parse_codex_rate_limits(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
fn parse_f64(headers: &HeaderMap, name: &str) -> Option<f64> {
headers
.get(name)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<f64>().ok())
}
fn parse_i64(headers: &HeaderMap, name: &str) -> Option<i64> {
headers
.get(name)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<i64>().ok())
}
let primary_used = parse_f64(headers, "x-codex-primary-used-percent");
let secondary_used = parse_f64(headers, "x-codex-secondary-used-percent");
if primary_used.is_none() && secondary_used.is_none() {
return None;
}
let primary = primary_used.map(|used_percent| RateLimitWindow {
used_percent,
window_minutes: parse_i64(headers, "x-codex-primary-window-minutes"),
resets_at: parse_i64(headers, "x-codex-primary-reset-at"),
});
let secondary = secondary_used.map(|used_percent| RateLimitWindow {
used_percent,
window_minutes: parse_i64(headers, "x-codex-secondary-window-minutes"),
resets_at: parse_i64(headers, "x-codex-secondary-reset-at"),
});
Some(RateLimitSnapshot { primary, secondary })
}
fn parse_openai_rate_limits(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
let limit = headers.get("x-ratelimit-limit-requests")?;
let remaining = headers.get("x-ratelimit-remaining-requests")?;
let reset_ms = headers.get("x-ratelimit-reset-requests")?;
let limit = limit.to_str().ok()?.parse::<f64>().ok()?;
let remaining = remaining.to_str().ok()?.parse::<f64>().ok()?;
let reset_ms = reset_ms.to_str().ok()?.parse::<i64>().ok()?;
if limit <= 0.0 {
return None;
}
let used = (limit - remaining).max(0.0);
let used_percent = (used / limit) * 100.0;
let window_minutes = if reset_ms <= 0 {
None
} else {
let seconds = reset_ms / 1000;
Some((seconds + 59) / 60)
};
let resets_at = if reset_ms > 0 {
Some(Utc::now().timestamp() + reset_ms / 1000)
} else {
None
};
Some(RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent,
window_minutes,
resets_at,
}),
secondary: None,
})
}
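
A quick numeric check of the OpenAI-style fallback math (values invented for illustration): with a 100-request limit, 25 remaining, and a reset in 90 seconds, the snapshot reports 75% used and a 2-minute window, since `(seconds + 59) / 60` is an integer ceiling over whole minutes.

```rust
#[test]
fn openai_fallback_math_example() {
    // Invented numbers: 100-request limit, 25 remaining, reset in 90s.
    let (limit, remaining, reset_ms) = (100.0_f64, 25.0_f64, 90_000_i64);
    let used_percent = ((limit - remaining).max(0.0) / limit) * 100.0;
    let seconds = reset_ms / 1000;
    let window_minutes = (seconds + 59) / 60; // integer ceil: 90s -> 2 min
    assert_eq!(used_percent, 75.0);
    assert_eq!(window_minutes, 2);
}
```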

File diff suppressed because it is too large

View File

@@ -0,0 +1,140 @@
use std::time::Duration;
use crate::error::CodexErr;
use crate::error::Result;
/// Common interface for classifying stream start errors as retryable or fatal.
pub(crate) trait RetryableStreamError {
/// Returns a delay for the next retry attempt, or `None` if the error
/// should be treated as fatal and not retried.
fn delay(&self, attempt: u64) -> Option<Duration>;
/// Converts this error into the final `CodexErr` that should be surfaced
/// to callers when retries are exhausted or the error is fatal.
fn into_error(self) -> CodexErr;
}
/// Helper to retry a streaming operation with provider-configured backoff.
///
/// The caller supplies an `attempt_fn` that is invoked once per attempt with
/// the current attempt index in `[0, max_attempts]`. On success, the value is
/// returned immediately. On error, the error's [`RetryableStreamError`]
/// implementation decides whether to retry (with an optional delay) or to
/// surface a final error.
pub(crate) async fn retry_stream<F, Fut, T, E>(max_attempts: u64, mut attempt_fn: F) -> Result<T>
where
F: FnMut(u64) -> Fut,
Fut: std::future::Future<Output = std::result::Result<T, E>>,
E: RetryableStreamError,
{
for attempt in 0..=max_attempts {
match attempt_fn(attempt).await {
Ok(value) => return Ok(value),
Err(err) => {
let delay = err.delay(attempt);
// Fatal error or final attempt: surface to caller.
if attempt == max_attempts || delay.is_none() {
return Err(err.into_error());
}
if let Some(duration) = delay {
tokio::time::sleep(duration).await;
}
}
}
}
unreachable!("retry_stream should always return");
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[derive(Clone)]
struct TestError {
fatal: bool,
}
impl RetryableStreamError for TestError {
fn delay(&self, attempt: u64) -> Option<Duration> {
if self.fatal {
None
} else {
Some(Duration::from_millis(attempt * 10))
}
}
fn into_error(self) -> CodexErr {
if self.fatal {
CodexErr::InternalServerError
} else {
CodexErr::Io(std::io::Error::other("retryable"))
}
}
}
#[tokio::test]
async fn retries_until_success_before_max_attempts() {
let max_attempts = 3;
let result: Result<&str> = retry_stream(max_attempts, |attempt| async move {
if attempt < 2 {
Err(TestError { fatal: false })
} else {
Ok("ok")
}
})
.await;
assert_eq!(result.unwrap(), "ok");
}
#[tokio::test]
async fn stops_on_fatal_error_without_retrying() {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
let calls = Arc::new(AtomicUsize::new(0));
let calls_ref = calls.clone();
let result: Result<()> = retry_stream(5, move |_attempt| {
let calls_ref = calls_ref.clone();
async move {
calls_ref.fetch_add(1, Ordering::SeqCst);
Err(TestError { fatal: true })
}
})
.await;
assert!(result.is_err());
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn stops_after_max_attempts_for_retryable_errors() {
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
let calls = Arc::new(AtomicUsize::new(0));
let calls_ref = calls.clone();
let max_attempts = 2;
let result: Result<()> = retry_stream(max_attempts, move |_attempt| {
let calls_ref = calls_ref.clone();
async move {
calls_ref.fetch_add(1, Ordering::SeqCst);
Err(TestError { fatal: false })
}
})
.await;
assert!(result.is_err());
assert_eq!(calls.load(Ordering::SeqCst), (max_attempts + 1) as usize);
}
}
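
Beyond the tests above, a sketch of a richer implementor (assumed, not part of this diff): an HTTP-flavored error that honors a server-supplied `Retry-After` hint, treats only 429s and 5xx as retryable, and falls back to the crate's existing `backoff` schedule otherwise.

```rust
// Sketch under assumptions: `backoff` is the crate's existing backoff helper
// (used by ChatStreamError above) and reqwest is already a dependency.
struct HttpStreamError {
    status: reqwest::StatusCode,
    retry_after: Option<Duration>,
}

impl RetryableStreamError for HttpStreamError {
    fn delay(&self, attempt: u64) -> Option<Duration> {
        if self.status == reqwest::StatusCode::TOO_MANY_REQUESTS || self.status.is_server_error() {
            // Prefer the server's hint; otherwise use exponential backoff.
            Some(self.retry_after.unwrap_or_else(|| backoff(attempt)))
        } else {
            None // fatal: client errors are not retried
        }
    }

    fn into_error(self) -> CodexErr {
        CodexErr::InternalServerError
    }
}
```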

View File

@@ -0,0 +1,46 @@
use std::time::Duration;
use codex_otel::otel_event_manager::OtelEventManager;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError as StreamError;
use futures::Stream;
use futures::StreamExt;
use tokio::time::timeout;
/// Result of polling the next SSE event with timeout and logging applied.
pub(crate) enum SseNext {
Event(Event),
Eof,
StreamError(String),
Timeout,
}
/// Read the next SSE event from `stream`, applying an idle timeout and recording
/// telemetry via `otel_event_manager`.
///
/// This helper centralizes the boilerplate for:
/// - `tokio::time::timeout`
/// - calling `log_sse_event`
/// - mapping the different outcomes into a small enum that callers can
/// interpret according to their own protocol semantics.
pub(crate) async fn next_sse_event<S, E>(
stream: &mut S,
idle_timeout: Duration,
otel_event_manager: &OtelEventManager,
) -> SseNext
where
S: Stream<Item = Result<Event, StreamError<E>>> + Unpin,
E: std::fmt::Display,
{
let start = tokio::time::Instant::now();
let next_event = timeout(idle_timeout, stream.next()).await;
let duration = start.elapsed();
otel_event_manager.log_sse_event(&next_event, duration);
match next_event {
Ok(Some(Ok(ev))) => SseNext::Event(ev),
Ok(Some(Err(e))) => SseNext::StreamError(e.to_string()),
Ok(None) => SseNext::Eof,
Err(_) => SseNext::Timeout,
}
}
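
A condensed consumer loop showing how the four outcomes map onto control flow (sketch only; the `drain` helper is hypothetical and real callers handle events per their protocol, as in the chat-completions hunk above):

```rust
async fn drain<S, E>(
    mut stream: S,
    idle_timeout: Duration,
    otel: &OtelEventManager,
) -> Result<(), String>
where
    S: Stream<Item = Result<Event, StreamError<E>>> + Unpin,
    E: std::fmt::Display,
{
    loop {
        match next_sse_event(&mut stream, idle_timeout, otel).await {
            SseNext::Event(ev) => println!("event: {}", ev.data), // protocol-specific handling
            SseNext::Eof => return Ok(()),                        // stream closed gracefully
            SseNext::StreamError(message) => return Err(message),
            SseNext::Timeout => return Err("idle timeout waiting for SSE".to_string()),
        }
    }
}
```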

View File

@@ -0,0 +1,327 @@
use crate::error::Result;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::SubAgentSource;
use crate::protocol::TokenUsage;
use crate::tools::spec::JsonSchema;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use futures::Stream;
use serde::Serialize;
use serde_json::Value;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
#[derive(Debug)]
pub enum ResponseEvent {
Created,
OutputItemDone(ResponseItem),
OutputItemAdded(ResponseItem),
Completed {
response_id: String,
token_usage: Option<TokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta {
delta: String,
summary_index: i64,
},
ReasoningContentDelta {
delta: String,
content_index: i64,
},
ReasoningSummaryPartAdded {
summary_index: i64,
},
RateLimits(RateLimitSnapshot),
}
pub(crate) fn subagent_label(sub: &SubAgentSource) -> String {
if let SubAgentSource::Other(label) = sub {
label.clone()
} else {
serde_json::to_value(sub)
.ok()
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.unwrap_or_else(|| "other".to_string())
}
}
#[derive(Debug, Serialize)]
pub(crate) struct Reasoning {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) effort: Option<ReasoningEffortConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) summary: Option<ReasoningSummaryConfig>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub(crate) enum TextFormatType {
#[default]
JsonSchema,
}
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextFormat {
pub(crate) r#type: TextFormatType,
pub(crate) strict: bool,
pub(crate) schema: Value,
pub(crate) name: String,
}
/// Controls under the `text` field in the Responses API for GPT-5.
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextControls {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) verbosity: Option<OpenAiVerbosity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) format: Option<TextFormat>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum OpenAiVerbosity {
Low,
#[default]
Medium,
High,
}
impl From<VerbosityConfig> for OpenAiVerbosity {
fn from(v: VerbosityConfig) -> Self {
match v {
VerbosityConfig::Low => OpenAiVerbosity::Low,
VerbosityConfig::Medium => OpenAiVerbosity::Medium,
VerbosityConfig::High => OpenAiVerbosity::High,
}
}
}
/// Request object that is serialized as JSON and POST'ed when using the
/// Responses API.
#[derive(Debug, Serialize)]
pub(crate) struct ResponsesApiRequest<'a> {
pub(crate) model: &'a str,
pub(crate) instructions: &'a str,
// TODO(mbolin): ResponseItem::Other should not be serialized. Currently,
// we code defensively to avoid this case, but perhaps we should use a
// separate enum for serialization.
pub(crate) input: &'a Vec<ResponseItem>,
pub(crate) tools: &'a [serde_json::Value],
pub(crate) tool_choice: &'static str,
pub(crate) parallel_tool_calls: bool,
pub(crate) reasoning: Option<Reasoning>,
pub(crate) store: bool,
pub(crate) stream: bool,
pub(crate) include: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) prompt_cache_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) text: Option<TextControls>,
}
pub(crate) mod tools {
use super::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
/// When serialized as JSON, this produces a valid "Tool" in the OpenAI
/// Responses API.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(tag = "type")]
pub(crate) enum ToolSpec {
#[serde(rename = "function")]
Function(ResponsesApiTool),
#[serde(rename = "local_shell")]
LocalShell {},
// TODO: Understand why we get an error on web_search although the API docs say it's supported.
// https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses#:~:text=%7B%20type%3A%20%22web_search%22%20%7D%2C
#[serde(rename = "web_search")]
WebSearch {},
#[serde(rename = "custom")]
Freeform(FreeformTool),
}
impl ToolSpec {
pub(crate) fn name(&self) -> &str {
match self {
ToolSpec::Function(tool) => tool.name.as_str(),
ToolSpec::LocalShell {} => "local_shell",
ToolSpec::WebSearch {} => "web_search",
ToolSpec::Freeform(tool) => tool.name.as_str(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformTool {
pub(crate) name: String,
pub(crate) description: String,
pub(crate) format: FreeformToolFormat,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformToolFormat {
pub(crate) r#type: String,
pub(crate) syntax: String,
pub(crate) definition: String,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct ResponsesApiTool {
pub(crate) name: String,
pub(crate) description: String,
/// TODO: Validation. When strict is set to true, the JSON schema,
/// `required` and `additional_properties` must be present. All fields in
/// `properties` must be present in `required`.
pub(crate) strict: bool,
pub(crate) parameters: JsonSchema,
}
}
pub(crate) use tools::FreeformTool;
pub(crate) use tools::FreeformToolFormat;
pub(crate) use tools::ResponsesApiTool;
pub(crate) use tools::ToolSpec;
pub(crate) fn create_text_param_for_request(
verbosity: Option<VerbosityConfig>,
output_schema: &Option<Value>,
) -> Option<TextControls> {
if verbosity.is_none() && output_schema.is_none() {
return None;
}
Some(TextControls {
verbosity: verbosity.map(std::convert::Into::into),
format: output_schema.as_ref().map(|schema| TextFormat {
r#type: TextFormatType::JsonSchema,
strict: true,
schema: schema.clone(),
name: "codex_output_schema".to_string(),
}),
})
}
pub struct ResponseStream {
pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
}
impl Stream for ResponseStream {
type Item = Result<ResponseEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx_event.poll_recv(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
#[test]
fn serializes_text_verbosity_when_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(TextControls {
verbosity: Some(OpenAiVerbosity::Low),
format: None,
}),
};
let v = serde_json::to_value(&req).expect("json");
assert_eq!(
v.get("text")
.and_then(|t| t.get("verbosity"))
.and_then(|s| s.as_str()),
Some("low")
);
}
#[test]
fn serializes_text_schema_with_strict_format() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let schema = serde_json::json!({
"type": "object",
"properties": {
"answer": {"type": "string"}
},
"required": ["answer"],
});
let text_controls =
create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(text_controls),
};
let v = serde_json::to_value(&req).expect("json");
let text = v.get("text").expect("text field");
assert!(text.get("verbosity").is_none());
let format = text.get("format").expect("format field");
assert_eq!(
format.get("name"),
Some(&serde_json::Value::String("codex_output_schema".into()))
);
assert_eq!(
format.get("type"),
Some(&serde_json::Value::String("json_schema".into()))
);
assert_eq!(format.get("strict"), Some(&serde_json::Value::Bool(true)));
assert_eq!(format.get("schema"), Some(&schema));
}
#[test]
fn omits_text_when_not_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: None,
};
let v = serde_json::to_value(&req).expect("json");
assert!(v.get("text").is_none());
}
}

View File

@@ -1,24 +1,12 @@
use crate::client_common::tools::ToolSpec;
use crate::error::Result;
use crate::client::ToolSpec;
use crate::model_family::ModelFamily;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use futures::Stream;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Deref;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
/// Review thread system prompt. Edit `core/src/review_prompt.md` to customize.
pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md");
@@ -193,194 +181,6 @@ fn strip_total_output_header(output: &str) -> Option<&str> {
Some(remainder)
}
#[derive(Debug)]
pub enum ResponseEvent {
Created,
OutputItemDone(ResponseItem),
OutputItemAdded(ResponseItem),
Completed {
response_id: String,
token_usage: Option<TokenUsage>,
},
OutputTextDelta(String),
ReasoningSummaryDelta {
delta: String,
summary_index: i64,
},
ReasoningContentDelta {
delta: String,
content_index: i64,
},
ReasoningSummaryPartAdded {
summary_index: i64,
},
RateLimits(RateLimitSnapshot),
}
#[derive(Debug, Serialize)]
pub(crate) struct Reasoning {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) effort: Option<ReasoningEffortConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) summary: Option<ReasoningSummaryConfig>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub(crate) enum TextFormatType {
#[default]
JsonSchema,
}
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextFormat {
pub(crate) r#type: TextFormatType,
pub(crate) strict: bool,
pub(crate) schema: Value,
pub(crate) name: String,
}
/// Controls under the `text` field in the Responses API for GPT-5.
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextControls {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) verbosity: Option<OpenAiVerbosity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) format: Option<TextFormat>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum OpenAiVerbosity {
Low,
#[default]
Medium,
High,
}
impl From<VerbosityConfig> for OpenAiVerbosity {
fn from(v: VerbosityConfig) -> Self {
match v {
VerbosityConfig::Low => OpenAiVerbosity::Low,
VerbosityConfig::Medium => OpenAiVerbosity::Medium,
VerbosityConfig::High => OpenAiVerbosity::High,
}
}
}
/// Request object that is serialized as JSON and POST'ed when using the
/// Responses API.
#[derive(Debug, Serialize)]
pub(crate) struct ResponsesApiRequest<'a> {
pub(crate) model: &'a str,
pub(crate) instructions: &'a str,
// TODO(mbolin): ResponseItem::Other should not be serialized. Currently,
// we code defensively to avoid this case, but perhaps we should use a
// separate enum for serialization.
pub(crate) input: &'a Vec<ResponseItem>,
pub(crate) tools: &'a [serde_json::Value],
pub(crate) tool_choice: &'static str,
pub(crate) parallel_tool_calls: bool,
pub(crate) reasoning: Option<Reasoning>,
pub(crate) store: bool,
pub(crate) stream: bool,
pub(crate) include: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) prompt_cache_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) text: Option<TextControls>,
}
pub(crate) mod tools {
use crate::tools::spec::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
/// When serialized as JSON, this produces a valid "Tool" in the OpenAI
/// Responses API.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(tag = "type")]
pub(crate) enum ToolSpec {
#[serde(rename = "function")]
Function(ResponsesApiTool),
#[serde(rename = "local_shell")]
LocalShell {},
// TODO: Understand why we get an error on web_search although the API docs say it's supported.
// https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses#:~:text=%7B%20type%3A%20%22web_search%22%20%7D%2C
#[serde(rename = "web_search")]
WebSearch {},
#[serde(rename = "custom")]
Freeform(FreeformTool),
}
impl ToolSpec {
pub(crate) fn name(&self) -> &str {
match self {
ToolSpec::Function(tool) => tool.name.as_str(),
ToolSpec::LocalShell {} => "local_shell",
ToolSpec::WebSearch {} => "web_search",
ToolSpec::Freeform(tool) => tool.name.as_str(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformTool {
pub(crate) name: String,
pub(crate) description: String,
pub(crate) format: FreeformToolFormat,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformToolFormat {
pub(crate) r#type: String,
pub(crate) syntax: String,
pub(crate) definition: String,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct ResponsesApiTool {
pub(crate) name: String,
pub(crate) description: String,
/// TODO: Validation. When strict is set to true, the JSON schema,
/// `required` and `additional_properties` must be present. All fields in
/// `properties` must be present in `required`.
pub(crate) strict: bool,
pub(crate) parameters: JsonSchema,
}
}
pub(crate) fn create_text_param_for_request(
verbosity: Option<VerbosityConfig>,
output_schema: &Option<Value>,
) -> Option<TextControls> {
if verbosity.is_none() && output_schema.is_none() {
return None;
}
Some(TextControls {
verbosity: verbosity.map(std::convert::Into::into),
format: output_schema.as_ref().map(|schema| TextFormat {
r#type: TextFormatType::JsonSchema,
strict: true,
schema: schema.clone(),
name: "codex_output_schema".to_string(),
}),
})
}
pub struct ResponseStream {
pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
}
impl Stream for ResponseStream {
type Item = Result<ResponseEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx_event.poll_recv(cx)
}
}
#[cfg(test)]
mod tests {
use crate::model_family::find_family_for_model;
@@ -451,104 +251,4 @@ mod tests {
assert_eq!(full, expected);
}
}
#[test]
fn serializes_text_verbosity_when_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(TextControls {
verbosity: Some(OpenAiVerbosity::Low),
format: None,
}),
};
let v = serde_json::to_value(&req).expect("json");
assert_eq!(
v.get("text")
.and_then(|t| t.get("verbosity"))
.and_then(|s| s.as_str()),
Some("low")
);
}
#[test]
fn serializes_text_schema_with_strict_format() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let schema = serde_json::json!({
"type": "object",
"properties": {
"answer": {"type": "string"}
},
"required": ["answer"],
});
let text_controls =
create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(text_controls),
};
let v = serde_json::to_value(&req).expect("json");
let text = v.get("text").expect("text field");
assert!(text.get("verbosity").is_none());
let format = text.get("format").expect("format field");
assert_eq!(
format.get("name"),
Some(&serde_json::Value::String("codex_output_schema".into()))
);
assert_eq!(
format.get("type"),
Some(&serde_json::Value::String("json_schema".into()))
);
assert_eq!(format.get("strict"), Some(&serde_json::Value::Bool(true)));
assert_eq!(format.get("schema"), Some(&schema));
}
#[test]
fn omits_text_when_not_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: None,
};
let v = serde_json::to_value(&req).expect("json");
assert!(v.get("text").is_none());
}
}

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::ResponseEvent;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
use crate::features::Feature;
@@ -55,7 +56,6 @@ use tracing::warn;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config::types::McpServerTransportConfig;
use crate::config::types::ShellEnvironmentPolicy;

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use crate::Prompt;
use crate::client_common::ResponseEvent;
use crate::ResponseEvent;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;

View File

@@ -8,8 +8,7 @@
mod apply_patch;
pub mod auth;
pub mod bash;
mod chat_completions;
mod client;
pub mod client;
mod client_common;
pub mod codex;
mod codex_conversation;
@@ -54,7 +53,6 @@ pub use conversation_manager::NewConversation;
// Re-export common auth types for workspace consumers
pub use auth::AuthManager;
pub use auth::CodexAuth;
pub mod default_client;
pub mod model_family;
mod openai_model_info;
pub mod project_doc;
@@ -96,10 +94,10 @@ pub use codex_protocol::protocol;
pub use codex_protocol::config_types as protocol_config_types;
pub use client::ModelClient;
pub use client::ResponseEvent;
pub use client::ResponseStream;
pub use client_common::Prompt;
pub use client_common::REVIEW_PROMPT;
pub use client_common::ResponseEvent;
pub use client_common::ResponseStream;
pub use codex_protocol::models::ContentItem;
pub use codex_protocol::models::LocalShellAction;
pub use codex_protocol::models::LocalShellExecAction;

View File

@@ -6,16 +6,15 @@
//! key. These override or extend the defaults at runtime.
use crate::CodexAuth;
use crate::default_client::CodexHttpClient;
use crate::default_client::CodexRequestBuilder;
use crate::client::http::CodexHttpClient;
use crate::client::http::CodexRequestBuilder;
use crate::error::EnvVarError;
use codex_app_server_protocol::AuthMode;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::env::VarError;
use std::time::Duration;
use crate::error::EnvVarError;
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 5;
const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4;

View File

@@ -1,7 +1,7 @@
use crate::client::http::originator;
use crate::config::Config;
use crate::config::types::OtelExporterKind as Kind;
use crate::config::types::OtelHttpProtocol as Protocol;
use crate::default_client::originator;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSettings;

View File

@@ -23,8 +23,8 @@ use super::list::ConversationsPage;
use super::list::Cursor;
use super::list::get_conversations;
use super::policy::is_persisted_response_item;
use crate::client::http::originator;
use crate::config::Config;
use crate::default_client::originator;
use crate::git_info::collect_git_info;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;

View File

@@ -6,9 +6,9 @@ use std::time::Instant;
use crate::AuthManager;
use crate::ModelProviderInfo;
use crate::ResponseEvent;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::protocol::SandboxPolicy;
use askama::Template;

View File

@@ -3,10 +3,10 @@ use std::collections::BTreeMap;
use crate::apply_patch;
use crate::apply_patch::InternalApplyPatchInvocation;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::client_common::tools::FreeformTool;
use crate::client_common::tools::FreeformToolFormat;
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::FreeformTool;
use crate::client::FreeformToolFormat;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;

View File

@@ -1,5 +1,5 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::client_common::tools::ToolSpec;
use crate::client::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::client_common::tools::ToolSpec;
use crate::client::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;

View File

@@ -1,5 +1,5 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::features::Feature;
use crate::features::Features;
use crate::model_family::ModelFamily;
@@ -1074,7 +1074,7 @@ pub(crate) fn build_specs(
#[cfg(test)]
mod tests {
use crate::client_common::tools::FreeformTool;
use crate::client::FreeformTool;
use crate::model_family::find_family_for_model;
use crate::tools::registry::ConfiguredToolSpec;
use mcp_types::ToolInputSchema;

View File

@@ -10,14 +10,19 @@ mod event_processor_with_human_output;
pub mod event_processor_with_jsonl_output;
pub mod exec_events;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
pub use cli::Cli;
use codex_core::AuthManager;
use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID;
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::auth::enforce_login_restrictions;
use codex_core::client::http::set_default_originator;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::find_conversation_path_by_id_str;
use codex_core::git_info::get_git_repo_root;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
@@ -41,12 +46,6 @@ use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_originator;
use codex_core::find_conversation_path_by_id_str;
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");

View File

@@ -17,7 +17,7 @@ use chrono::Utc;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::AuthDotJson;
use codex_core::auth::save_auth;
use codex_core::default_client::originator;
use codex_core::client::http::originator;
use codex_core::token_data::TokenData;
use codex_core::token_data::parse_id_token;
use rand::RngCore;

View File

@@ -12,9 +12,9 @@ use codex_protocol::protocol::SessionSource;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::client::http::USER_AGENT_SUFFIX;
use codex_core::client::http::get_codex_user_agent;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_core::protocol::Submission;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;