mirror of https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00

Compare commits: main...jif/client (14 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | a03b55af5c |  |
|  | c26ffe82f9 |  |
|  | cf15b86f5c |  |
|  | c8a6e4ddd7 |  |
|  | c77254d070 |  |
|  | d2e876403a |  |
|  | e4e627d1a3 |  |
|  | 29d93176b6 |  |
|  | 1cd9e24232 |  |
|  | 2790ddff0a |  |
|  | 38b568b504 |  |
|  | def4a996c4 |  |
|  | c2347f75b3 |  |
|  | 1d8f839b7d |  |
@@ -101,12 +101,12 @@ use codex_core::RolloutRecorder;
 use codex_core::SessionMeta;
 use codex_core::auth::CLIENT_ID;
 use codex_core::auth::login_with_api_key;
+use codex_core::client::http::get_codex_user_agent;
 use codex_core::config::Config;
 use codex_core::config::ConfigOverrides;
 use codex_core::config::ConfigToml;
 use codex_core::config::edit::ConfigEditsBuilder;
 use codex_core::config_loader::load_config_as_toml;
-use codex_core::default_client::get_codex_user_agent;
 use codex_core::exec::ExecParams;
 use codex_core::exec_env::create_env;
 use codex_core::find_conversation_path_by_id_str;
@@ -14,9 +14,9 @@ use codex_app_server_protocol::JSONRPCRequest;
 use codex_app_server_protocol::JSONRPCResponse;
 use codex_core::AuthManager;
 use codex_core::ConversationManager;
+use codex_core::client::http::USER_AGENT_SUFFIX;
+use codex_core::client::http::get_codex_user_agent;
 use codex_core::config::Config;
-use codex_core::default_client::USER_AGENT_SUFFIX;
-use codex_core::default_client::get_codex_user_agent;
 use codex_feedback::CodexFeedback;
 use codex_protocol::protocol::SessionSource;
 use std::sync::Arc;
@@ -5,7 +5,7 @@ use crate::types::RateLimitWindowSnapshot;
 use crate::types::TurnAttemptsSiblingTurnsResponse;
 use anyhow::Result;
 use codex_core::auth::CodexAuth;
-use codex_core::default_client::get_codex_user_agent;
+use codex_core::client::http::get_codex_user_agent;
 use codex_protocol::protocol::RateLimitSnapshot;
 use codex_protocol::protocol::RateLimitWindow;
 use reqwest::header::AUTHORIZATION;
@@ -1,5 +1,5 @@
+use codex_core::client::http::create_client;
 use codex_core::config::Config;
-use codex_core::default_client::create_client;
 
 use crate::chatgpt_token::get_chatgpt_token_data;
 use crate::chatgpt_token::init_chatgpt_token_from_auth;
@@ -48,7 +48,7 @@ async fn init_backend(user_agent_suffix: &str) -> anyhow::Result<BackendContext>
         });
     }
 
-    let ua = codex_core::default_client::get_codex_user_agent();
+    let ua = codex_core::client::http::get_codex_user_agent();
     let mut http = codex_cloud_tasks_client::HttpClient::new(base_url.clone())?.with_user_agent(ua);
     let style = if base_url.contains("/backend-api") {
         "wham"
@@ -384,7 +384,7 @@ pub async fn run_main(cli: Cli, _codex_linux_sandbox_exe: Option<PathBuf>) -> an
     append_error_log(format!(
         "startup: wham_force_internal={} ua={}",
         force_internal,
-        codex_core::default_client::get_codex_user_agent()
+        codex_core::client::http::get_codex_user_agent()
     ));
     // Non-blocking initial load so the in-box spinner can animate
     app.status = "Loading tasks…".to_string();
@@ -7,7 +7,7 @@ use codex_core::config::ConfigOverrides;
 use codex_login::AuthManager;
 
 pub fn set_user_agent_suffix(suffix: &str) {
-    if let Ok(mut guard) = codex_core::default_client::USER_AGENT_SUFFIX.lock() {
+    if let Ok(mut guard) = codex_core::client::http::USER_AGENT_SUFFIX.lock() {
         guard.replace(suffix.to_string());
     }
 }
@@ -79,7 +79,7 @@ pub async fn build_chatgpt_headers() -> HeaderMap {
     use reqwest::header::USER_AGENT;
 
     set_user_agent_suffix("codex_cloud_tasks_tui");
-    let ua = codex_core::default_client::get_codex_user_agent();
+    let ua = codex_core::client::http::get_codex_user_agent();
     let mut headers = HeaderMap::new();
     headers.insert(
         USER_AGENT,
@@ -23,7 +23,6 @@ pub use crate::auth::storage::AuthDotJson;
 use crate::auth::storage::AuthStorageBackend;
 use crate::auth::storage::create_auth_storage;
 use crate::config::Config;
-use crate::default_client::CodexHttpClient;
 use crate::error::RefreshTokenFailedError;
 use crate::error::RefreshTokenFailedReason;
 use crate::token_data::KnownPlan as InternalKnownPlan;
@@ -272,7 +271,7 @@ impl CodexAuth {
             mode: AuthMode::ChatGPT,
             storage: create_auth_storage(PathBuf::new(), AuthCredentialsStoreMode::File),
             auth_dot_json,
-            client: crate::default_client::create_client(),
+            client: crate::client::http::create_client(),
         }
     }
 
@@ -287,7 +286,7 @@ impl CodexAuth {
     }
 
     pub fn from_api_key(api_key: &str) -> Self {
-        Self::from_api_key_with_client(api_key, crate::default_client::create_client())
+        Self::from_api_key_with_client(api_key, crate::client::http::create_client())
     }
 }
 
@@ -447,7 +446,7 @@ fn load_auth(
     auth_credentials_store_mode: AuthCredentialsStoreMode,
 ) -> std::io::Result<Option<CodexAuth>> {
     if enable_codex_api_key_env && let Some(api_key) = read_codex_api_key_from_env() {
-        let client = crate::default_client::create_client();
+        let client = crate::client::http::create_client();
         return Ok(Some(CodexAuth::from_api_key_with_client(
             api_key.as_str(),
             client,
@@ -456,7 +455,7 @@ fn load_auth(
 
     let storage = create_auth_storage(codex_home.to_path_buf(), auth_credentials_store_mode);
 
-    let client = crate::default_client::create_client();
+    let client = crate::client::http::create_client();
     let auth_dot_json = match storage.load()? {
         Some(auth) => auth,
         None => return Ok(None),
@@ -632,6 +631,7 @@ fn refresh_token_endpoint() -> String {
         .unwrap_or_else(|_| REFRESH_TOKEN_URL.to_string())
 }
 
+use crate::client::http::CodexHttpClient;
 use std::sync::RwLock;
 
 /// Internal cached auth state.
File diff suppressed because it is too large

codex-rs/core/src/client/aggregation.rs (new file, 235 lines)
@@ -0,0 +1,235 @@
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use crate::client::ResponseEvent;
use crate::error::Result;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
use futures::Stream;

/// Optional client-side aggregation helper
///
/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
/// the chat SSE decoder into a *running* assistant message, **suppressing the
/// per-token deltas**. The stream stays silent while the model is thinking and
/// only emits two events per turn:
///
/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
///    (fully concatenated).
/// 2. The original `ResponseEvent::Completed` right after it.
///
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
enum AggregateMode {
    AggregatedOnly,
    Streaming,
}

pub(crate) struct AggregatedChatStream<S> {
    inner: S,
    cumulative: String,
    cumulative_reasoning: String,
    pending: std::collections::VecDeque<ResponseEvent>,
    mode: AggregateMode,
}

impl<S> Stream for AggregatedChatStream<S>
where
    S: Stream<Item = Result<ResponseEvent>> + Unpin,
{
    type Item = Result<ResponseEvent>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // First, flush any buffered events from the previous call.
        if let Some(ev) = this.pending.pop_front() {
            return Poll::Ready(Some(Ok(ev)));
        }

        loop {
            match Pin::new(&mut this.inner).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
                    // If this is an incremental assistant message chunk, accumulate but
                    // do NOT emit yet. Forward any other item (e.g. FunctionCall) right
                    // away so downstream consumers see it.

                    let is_assistant_message = matches!(
                        &item,
                        ResponseItem::Message { role, .. } if role == "assistant"
                    );

                    if is_assistant_message {
                        match this.mode {
                            AggregateMode::AggregatedOnly => {
                                // Only use the final assistant message if we have not
                                // seen any deltas; otherwise, deltas already built the
                                // cumulative text and this would duplicate it.
                                if this.cumulative.is_empty()
                                    && let ResponseItem::Message { content, .. } = &item
                                    && let Some(text) = content.iter().find_map(|c| match c {
                                        ContentItem::OutputText { text } => Some(text),
                                        _ => None,
                                    })
                                {
                                    this.cumulative.push_str(text);
                                }
                                // Swallow assistant message here; emit on Completed.
                                continue;
                            }
                            AggregateMode::Streaming => {
                                // In streaming mode, if we have not seen any deltas, forward
                                // the final assistant message directly. If deltas were seen,
                                // suppress the final message to avoid duplication.
                                if this.cumulative.is_empty() {
                                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
                                        item,
                                    ))));
                                }
                                continue;
                            }
                        }
                    }

                    // Not an assistant message – forward immediately.
                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
                }
                Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
                    return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
                }
                Poll::Ready(Some(Ok(ResponseEvent::Completed {
                    response_id,
                    token_usage,
                }))) => {
                    // Build any aggregated items in the correct order: Reasoning first, then Message.
                    let mut emitted_any = false;

                    if !this.cumulative_reasoning.is_empty()
                        && matches!(this.mode, AggregateMode::AggregatedOnly)
                    {
                        let aggregated_reasoning = ResponseItem::Reasoning {
                            id: String::new(),
                            summary: Vec::new(),
                            content: Some(vec![ReasoningItemContent::ReasoningText {
                                text: std::mem::take(&mut this.cumulative_reasoning),
                            }]),
                            encrypted_content: None,
                        };
                        this.pending
                            .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
                        emitted_any = true;
                    }

                    // Always emit the final aggregated assistant message when any
                    // content deltas have been observed. In AggregatedOnly mode this
                    // is the sole assistant output; in Streaming mode this finalizes
                    // the streamed deltas into a terminal OutputItemDone so callers
                    // can persist/render the message once per turn.
                    if !this.cumulative.is_empty() {
                        let aggregated_message = ResponseItem::Message {
                            id: None,
                            role: "assistant".to_string(),
                            content: vec![ContentItem::OutputText {
                                text: std::mem::take(&mut this.cumulative),
                            }],
                        };
                        this.pending
                            .push_back(ResponseEvent::OutputItemDone(aggregated_message));
                        emitted_any = true;
                    }

                    // Always emit Completed last when anything was aggregated.
                    if emitted_any {
                        this.pending.push_back(ResponseEvent::Completed {
                            response_id: response_id.clone(),
                            token_usage: token_usage.clone(),
                        });
                        if let Some(ev) = this.pending.pop_front() {
                            return Poll::Ready(Some(Ok(ev)));
                        }
                    }

                    // Nothing aggregated – forward Completed directly.
                    return Poll::Ready(Some(Ok(ResponseEvent::Completed {
                        response_id,
                        token_usage,
                    })));
                }
                Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
                    // These events are exclusive to the Responses API and
                    // will never appear in a Chat Completions stream.
                    continue;
                }
                Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
                    // Always accumulate deltas so we can emit a final OutputItemDone at Completed.
                    this.cumulative.push_str(&delta);
                    if matches!(this.mode, AggregateMode::Streaming) {
                        // In streaming mode, also forward the delta immediately.
                        return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
                    }
                }
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
                    delta,
                    content_index,
                }))) => {
                    // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
                    this.cumulative_reasoning.push_str(&delta);
                    if matches!(this.mode, AggregateMode::Streaming) {
                        // In streaming mode, also forward the delta immediately.
                        return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
                            delta,
                            content_index,
                        })));
                    }
                }
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {}
                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {}
                Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
                }
            }
        }
    }
}

/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
    /// Returns a new stream that emits **only** the final assistant message
    /// per turn instead of every incremental delta. The produced
    /// `ResponseEvent` sequence for a typical text turn looks like:
    ///
    /// ```ignore
    /// OutputItemDone(<full message>)
    /// Completed
    /// ```
    ///
    /// No other `OutputItemDone` events will be seen by the caller.
    fn aggregate(self) -> AggregatedChatStream<Self> {
        AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
    }
}

impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}

impl<S> AggregatedChatStream<S> {
    fn new(inner: S, mode: AggregateMode) -> Self {
        AggregatedChatStream {
            inner,
            cumulative: String::new(),
            cumulative_reasoning: String::new(),
            pending: std::collections::VecDeque::new(),
            mode,
        }
    }

    pub(crate) fn streaming_mode(inner: S) -> Self {
        Self::new(inner, AggregateMode::Streaming)
    }
}
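A minimal crate-internal usage sketch for the adapter above (the `print_final_turn` helper is hypothetical; any `Stream<Item = Result<ResponseEvent>>`, such as the one returned by `stream_chat_completions`, works as input):

    use futures::StreamExt;

    use crate::client::AggregateStreamExt;
    use crate::client::ResponseEvent;
    use crate::error::Result;

    // Hypothetical consumer: per-token deltas are swallowed and exactly one
    // aggregated assistant message is observed per turn.
    async fn print_final_turn<S>(events: S) -> Result<()>
    where
        S: futures::Stream<Item = Result<ResponseEvent>> + Unpin,
    {
        let mut agg = events.aggregate(); // AggregatedOnly mode
        while let Some(event) = agg.next().await {
            match event? {
                ResponseEvent::OutputItemDone(item) => println!("{item:?}"),
                ResponseEvent::Completed { .. } => break,
                _ => {}
            }
        }
        Ok(())
    }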
@@ -1,15 +1,16 @@
 use std::time::Duration;
 
 use crate::ModelProviderInfo;
+use crate::client::ResponseEvent;
+use crate::client::ResponseStream;
+use crate::client::http::CodexHttpClient;
+use crate::client::retry::RetryableStreamError;
+use crate::client::retry::retry_stream;
 use crate::client_common::Prompt;
-use crate::client_common::ResponseEvent;
-use crate::client_common::ResponseStream;
-use crate::default_client::CodexHttpClient;
 use crate::error::CodexErr;
 use crate::error::ConnectionFailedError;
 use crate::error::ResponseStreamFailed;
 use crate::error::Result;
-use crate::error::RetryLimitReachedError;
 use crate::error::UnexpectedResponseError;
 use crate::model_family::ModelFamily;
 use crate::tools::spec::create_tools_json_for_chat_completions_api;
@@ -21,19 +22,12 @@ use codex_protocol::models::FunctionCallOutputContentItem;
 use codex_protocol::models::ReasoningItemContent;
 use codex_protocol::models::ResponseItem;
 use codex_protocol::protocol::SessionSource;
-use codex_protocol::protocol::SubAgentSource;
 use eventsource_stream::Eventsource;
-use futures::Stream;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use reqwest::StatusCode;
 use serde_json::json;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
 use tokio::sync::mpsc;
-use tokio::time::timeout;
-use tracing::debug;
 use tracing::trace;
 
 /// Implementation for the classic Chat Completions API.
@@ -55,7 +49,7 @@ pub(crate) async fn stream_chat_completions(
     let mut messages = Vec::<serde_json::Value>::new();
 
     let full_instructions = prompt.get_full_instructions(model_family);
-    messages.push(json!({"role": "system", "content": full_instructions}));
+    messages.push(json!({ "role": "system", "content": full_instructions }));
 
     let input = prompt.get_formatted_input();
 
@@ -128,7 +122,7 @@ pub(crate) async fn stream_chat_completions(
             {
                 reasoning_by_anchor_index
                     .entry(idx - 1)
-                    .and_modify(|v| v.push_str(&text))
+                    .and_modify(|v| v.push_str(text.as_str()))
                     .or_insert(text.clone());
                 attached = true;
             }
@@ -139,13 +133,13 @@ pub(crate) async fn stream_chat_completions(
             ResponseItem::FunctionCall { .. } | ResponseItem::LocalShellCall { .. } => {
                 reasoning_by_anchor_index
                     .entry(idx + 1)
-                    .and_modify(|v| v.push_str(&text))
+                    .and_modify(|v| v.push_str(text.as_str()))
                     .or_insert(text.clone());
             }
             ResponseItem::Message { role, .. } if role == "assistant" => {
                 reasoning_by_anchor_index
                     .entry(idx + 1)
-                    .and_modify(|v| v.push_str(&text))
+                    .and_modify(|v| v.push_str(text.as_str()))
                     .or_insert(text.clone());
             }
             _ => {}
@@ -203,13 +197,18 @@ pub(crate) async fn stream_chat_completions(
                     json!(text)
                 };
 
-                let mut msg = json!({"role": role, "content": content_value});
+                let mut msg = json!({
+                    "role": role,
+                    "content": content_value
+                });
+
                 if role == "assistant"
                     && let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                     && let Some(obj) = msg.as_object_mut()
                 {
                     obj.insert("reasoning".to_string(), json!(reasoning));
                 }
+
                 messages.push(msg);
             }
             ResponseItem::FunctionCall {
@@ -228,22 +227,24 @@ pub(crate) async fn stream_chat_completions(
                             "name": name,
                             "arguments": arguments,
                         }
-                    }]
+                    }],
                 });
+
                 if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                     && let Some(obj) = msg.as_object_mut()
                 {
                     obj.insert("reasoning".to_string(), json!(reasoning));
                 }
+
                 messages.push(msg);
             }
 
             ResponseItem::LocalShellCall {
                 id,
                 call_id: _,
                 status,
                 action,
             } => {
                 // Confirm with API team.
                 let mut msg = json!({
                     "role": "assistant",
                     "content": null,
@@ -254,13 +255,16 @@ pub(crate) async fn stream_chat_completions(
                         "action": action,
                     }]
                 });
+
                 if let Some(reasoning) = reasoning_by_anchor_index.get(&idx)
                     && let Some(obj) = msg.as_object_mut()
                 {
                     obj.insert("reasoning".to_string(), json!(reasoning));
                 }
+
                 messages.push(msg);
             }
+
             ResponseItem::FunctionCallOutput { call_id, output } => {
                 // Prefer structured content items when available (e.g., images)
                 // otherwise fall back to the legacy plain-string content.
@@ -287,6 +291,7 @@ pub(crate) async fn stream_chat_completions(
                     "content": content_value,
                 }));
             }
+
             ResponseItem::CustomToolCall {
                 id,
                 call_id: _,
@@ -304,7 +309,7 @@ pub(crate) async fn stream_chat_completions(
                             "name": name,
                             "input": input,
                         }
-                    }]
+                    }],
                 }));
             }
             ResponseItem::CustomToolCallOutput { call_id, output } => {
@@ -314,117 +319,148 @@ pub(crate) async fn stream_chat_completions(
                     "content": output,
                 }));
             }
-            ResponseItem::GhostSnapshot { .. } => {
-                // Ghost snapshots annotate history but are not sent to the model.
+
+            ResponseItem::Reasoning { .. } => {
+                // Omit from conversation history; reasoning is attached to anchors above.
                 continue;
             }
-            ResponseItem::Reasoning { .. }
-            | ResponseItem::WebSearchCall { .. }
-            | ResponseItem::Other => {
-                // Omit these items from the conversation history.
+
+            ResponseItem::WebSearchCall { .. } | ResponseItem::Other => {
                 continue;
             }
+
+            ResponseItem::GhostSnapshot { .. } => {
+                // Ghost snapshots annotate history but are not sent to the model.
+                continue;
+            }
         }
     }
 
     let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?;
-    let payload = json!({
+
+    let mut body = json!({
         "model": model_family.slug,
         "messages": messages,
         "stream": true,
-        "tools": tools_json,
        "stream_options": {
            "include_usage": true,
        },
    });
 
-    debug!(
+    if !tools_json.is_empty() {
+        body["tools"] = json!(tools_json);
+        body["tool_choice"] = json!("auto");
+    }
+
+    if let SessionSource::SubAgent(sub) = session_source {
+        let subagent = crate::client::types::subagent_label(sub);
+        body["metadata"] = json!({
+            "x-openai-subagent": subagent,
+        });
+    }
+
+    let max_attempts = provider.request_max_retries();
+    retry_stream(max_attempts, |attempt| {
+        let body = body.clone();
+        async move {
+            stream_single_chat_completion(attempt, client, provider, otel_event_manager, body)
+                .await
+                .map_err(ChatStreamError::Retryable)
+        }
+    })
+    .await
+}
+
+async fn stream_single_chat_completion(
+    attempt: u64,
+    client: &CodexHttpClient,
+    provider: &ModelProviderInfo,
+    otel_event_manager: &OtelEventManager,
+    body: serde_json::Value,
+) -> Result<ResponseStream> {
+    trace!(
         "POST to {}: {}",
         provider.get_full_url(&None),
-        payload.to_string()
+        body.to_string()
     );
 
-    let mut attempt = 0;
-    let max_retries = provider.request_max_retries();
-    loop {
-        attempt += 1;
+    let mut req_builder = provider.create_request_builder(client, &None).await?;
+    req_builder = req_builder
+        .header(reqwest::header::ACCEPT, "text/event-stream")
+        .json(&body);
 
-        let mut req_builder = provider.create_request_builder(client, &None).await?;
+    let res = otel_event_manager
+        .log_request(attempt, || req_builder.send())
+        .await;
 
-        // Include subagent header only for subagent sessions.
-        if let SessionSource::SubAgent(sub) = session_source.clone() {
-            let subagent = if let SubAgentSource::Other(label) = sub {
-                label
-            } else {
-                serde_json::to_value(&sub)
-                    .ok()
-                    .and_then(|v| v.as_str().map(std::string::ToString::to_string))
-                    .unwrap_or_else(|| "other".to_string())
-            };
-            req_builder = req_builder.header("x-openai-subagent", subagent);
+    let mut request_id = None;
+    if let Ok(resp) = &res {
+        request_id = resp
+            .headers()
+            .get("cf-ray")
+            .map(|v| v.to_str().unwrap_or_default().to_string());
+    }
+
+    match res {
+        Ok(resp) if resp.status().is_success() => {
+            let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
+
+            // spawn task to process SSE
+            let stream = resp.bytes_stream().map_err(move |e| {
+                CodexErr::ResponseStreamFailed(ResponseStreamFailed {
+                    source: e,
+                    request_id: request_id.clone(),
+                })
+            });
+            tokio::spawn(process_chat_sse(
+                stream,
+                tx_event,
+                provider.stream_idle_timeout(),
+                otel_event_manager.clone(),
+            ));
+
+            Ok(ResponseStream { rx_event })
         }
+        Ok(res) => {
+            let status = res.status();
 
-        let res = otel_event_manager
-            .log_request(attempt, || {
-                req_builder
-                    .header(reqwest::header::ACCEPT, "text/event-stream")
-                    .json(&payload)
-                    .send()
-            })
-            .await;
-
-        match res {
-            Ok(resp) if resp.status().is_success() => {
-                let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
-                let stream = resp.bytes_stream().map_err(|e| {
-                    CodexErr::ResponseStreamFailed(ResponseStreamFailed {
-                        source: e,
-                        request_id: None,
-                    })
-                });
-                tokio::spawn(process_chat_sse(
-                    stream,
-                    tx_event,
-                    provider.stream_idle_timeout(),
-                    otel_event_manager.clone(),
-                ));
-                return Ok(ResponseStream { rx_event });
+            if !(status == StatusCode::TOO_MANY_REQUESTS
+                || status == StatusCode::UNAUTHORIZED
+                || status.is_server_error())
+            {
+                // Surface the error body to callers. Use `unwrap_or_default` per Clippy.
+                let body = res.text().await.unwrap_or_default();
+                return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
+                    status,
+                    body,
+                    request_id: None,
+                }));
            }
-            Ok(res) => {
-                let status = res.status();
-                if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
-                    let body = (res.text().await).unwrap_or_default();
-                    return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
-                        status,
-                        body,
-                        request_id: None,
-                    }));
-                }
-
-                if attempt > max_retries {
-                    return Err(CodexErr::RetryLimit(RetryLimitReachedError {
-                        status,
-                        request_id: None,
-                    }));
-                }
+
+            Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
+                status,
+                body: String::new(),
+                request_id,
+            }))
+        }
+        Err(e) => Err(CodexErr::ConnectionFailed(ConnectionFailedError {
+            source: e,
+        })),
+    }
+}
 
-                let retry_after_secs = res
-                    .headers()
-                    .get(reqwest::header::RETRY_AFTER)
-                    .and_then(|v| v.to_str().ok())
-                    .and_then(|s| s.parse::<u64>().ok());
+enum ChatStreamError {
+    Retryable(CodexErr),
+}
 
-                let delay = retry_after_secs
-                    .map(|s| Duration::from_millis(s * 1_000))
-                    .unwrap_or_else(|| backoff(attempt));
-                tokio::time::sleep(delay).await;
-            }
-            Err(e) => {
-                if attempt > max_retries {
-                    return Err(CodexErr::ConnectionFailed(ConnectionFailedError {
-                        source: e,
-                    }));
-                }
-                let delay = backoff(attempt);
-                tokio::time::sleep(delay).await;
-            }
-        }
-    }
-}
+impl RetryableStreamError for ChatStreamError {
+    fn delay(&self, attempt: u64) -> Option<Duration> {
+        Some(backoff(attempt))
+    }
+
+    fn into_error(self) -> CodexErr {
+        match self {
+            ChatStreamError::Retryable(e) => e,
+        }
+    }
+}
@@ -488,9 +524,7 @@ async fn append_reasoning_text(
         .await;
     }
 }
-/// Lightweight SSE processor for the Chat Completions streaming format. The
-/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
-/// of the pipeline can stay agnostic of the underlying wire format.
+
 async fn process_chat_sse<S>(
     stream: S,
     tx_event: mpsc::Sender<Result<ResponseEvent>>,
@@ -519,21 +553,15 @@ async fn process_chat_sse<S>(
     let mut reasoning_item: Option<ResponseItem> = None;
 
     loop {
-        let start = std::time::Instant::now();
-        let response = timeout(idle_timeout, stream.next()).await;
-        let duration = start.elapsed();
-        otel_event_manager.log_sse_event(&response, duration);
-
-        let sse = match response {
-            Ok(Some(Ok(ev))) => ev,
-            Ok(Some(Err(e))) => {
-                let _ = tx_event
-                    .send(Err(CodexErr::Stream(e.to_string(), None)))
-                    .await;
-                return;
-            }
-            Ok(None) => {
-                // Stream closed gracefully – emit Completed with dummy id.
+        let sse = match crate::client::sse::next_sse_event(
+            &mut stream,
+            idle_timeout,
+            &otel_event_manager,
+        )
+        .await
+        {
+            crate::client::sse::SseNext::Event(ev) => ev,
+            crate::client::sse::SseNext::Eof => {
                 let _ = tx_event
                     .send(Ok(ResponseEvent::Completed {
                         response_id: String::new(),
@@ -542,7 +570,11 @@ async fn process_chat_sse<S>(
                     .await;
                 return;
             }
-            Err(_) => {
+            crate::client::sse::SseNext::StreamError(message) => {
+                let _ = tx_event.send(Err(CodexErr::Stream(message, None))).await;
+                return;
+            }
+            crate::client::sse::SseNext::Timeout => {
                 let _ = tx_event
                     .send(Err(CodexErr::Stream(
                         "idle timeout waiting for SSE".into(),
@@ -724,256 +756,3 @@ async fn process_chat_sse<S>(
         }
     }
 }
-
-/// Optional client-side aggregation helper
-///
-/// Stream adapter that merges the incremental `OutputItemDone` chunks coming from
-/// [`process_chat_sse`] into a *running* assistant message, **suppressing the
-/// per-token deltas**. The stream stays silent while the model is thinking
-/// and only emits two events per turn:
-///
-/// 1. `ResponseEvent::OutputItemDone` with the *complete* assistant message
-///    (fully concatenated).
-/// 2. The original `ResponseEvent::Completed` right after it.
-///
-/// This mirrors the behaviour the TypeScript CLI exposes to its higher layers.
-///
-/// The adapter is intentionally *lossless*: callers who do **not** opt in via
-/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
-/// events.
-#[derive(Copy, Clone, Eq, PartialEq)]
-enum AggregateMode {
-    AggregatedOnly,
-    Streaming,
-}
-pub(crate) struct AggregatedChatStream<S> {
-    inner: S,
-    cumulative: String,
-    cumulative_reasoning: String,
-    pending: std::collections::VecDeque<ResponseEvent>,
-    mode: AggregateMode,
-}
-
-impl<S> Stream for AggregatedChatStream<S>
-where
-    S: Stream<Item = Result<ResponseEvent>> + Unpin,
-{
-    type Item = Result<ResponseEvent>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-
-        // First, flush any buffered events from the previous call.
-        if let Some(ev) = this.pending.pop_front() {
-            return Poll::Ready(Some(Ok(ev)));
-        }
-
-        loop {
-            match Pin::new(&mut this.inner).poll_next(cx) {
-                Poll::Pending => return Poll::Pending,
-                Poll::Ready(None) => return Poll::Ready(None),
-                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
-                Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item)))) => {
-                    // If this is an incremental assistant message chunk, accumulate but
-                    // do NOT emit yet. Forward any other item (e.g. FunctionCall) right
-                    // away so downstream consumers see it.
-
-                    let is_assistant_message = matches!(
-                        &item,
-                        codex_protocol::models::ResponseItem::Message { role, .. } if role == "assistant"
-                    );
-
-                    if is_assistant_message {
-                        match this.mode {
-                            AggregateMode::AggregatedOnly => {
-                                // Only use the final assistant message if we have not
-                                // seen any deltas; otherwise, deltas already built the
-                                // cumulative text and this would duplicate it.
-                                if this.cumulative.is_empty()
-                                    && let codex_protocol::models::ResponseItem::Message {
-                                        content,
-                                        ..
-                                    } = &item
-                                    && let Some(text) = content.iter().find_map(|c| match c {
-                                        codex_protocol::models::ContentItem::OutputText {
-                                            text,
-                                        } => Some(text),
-                                        _ => None,
-                                    })
-                                {
-                                    this.cumulative.push_str(text);
-                                }
-                                // Swallow assistant message here; emit on Completed.
-                                continue;
-                            }
-                            AggregateMode::Streaming => {
-                                // In streaming mode, if we have not seen any deltas, forward
-                                // the final assistant message directly. If deltas were seen,
-                                // suppress the final message to avoid duplication.
-                                if this.cumulative.is_empty() {
-                                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
-                                        item,
-                                    ))));
-                                } else {
-                                    continue;
-                                }
-                            }
-                        }
-                    }
-
-                    // Not an assistant message – forward immediately.
-                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
-                    return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::Completed {
-                    response_id,
-                    token_usage,
-                }))) => {
-                    // Build any aggregated items in the correct order: Reasoning first, then Message.
-                    let mut emitted_any = false;
-
-                    if !this.cumulative_reasoning.is_empty()
-                        && matches!(this.mode, AggregateMode::AggregatedOnly)
-                    {
-                        let aggregated_reasoning =
-                            codex_protocol::models::ResponseItem::Reasoning {
-                                id: String::new(),
-                                summary: Vec::new(),
-                                content: Some(vec![
-                                    codex_protocol::models::ReasoningItemContent::ReasoningText {
-                                        text: std::mem::take(&mut this.cumulative_reasoning),
-                                    },
-                                ]),
-                                encrypted_content: None,
-                            };
-                        this.pending
-                            .push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
-                        emitted_any = true;
-                    }
-
-                    // Always emit the final aggregated assistant message when any
-                    // content deltas have been observed. In AggregatedOnly mode this
-                    // is the sole assistant output; in Streaming mode this finalizes
-                    // the streamed deltas into a terminal OutputItemDone so callers
-                    // can persist/render the message once per turn.
-                    if !this.cumulative.is_empty() {
-                        let aggregated_message = codex_protocol::models::ResponseItem::Message {
-                            id: None,
-                            role: "assistant".to_string(),
-                            content: vec![codex_protocol::models::ContentItem::OutputText {
-                                text: std::mem::take(&mut this.cumulative),
-                            }],
-                        };
-                        this.pending
-                            .push_back(ResponseEvent::OutputItemDone(aggregated_message));
-                        emitted_any = true;
-                    }
-
-                    // Always emit Completed last when anything was aggregated.
-                    if emitted_any {
-                        this.pending.push_back(ResponseEvent::Completed {
-                            response_id: response_id.clone(),
-                            token_usage: token_usage.clone(),
-                        });
-                        // Return the first pending event now.
-                        if let Some(ev) = this.pending.pop_front() {
-                            return Poll::Ready(Some(Ok(ev)));
-                        }
-                    }
-
-                    // Nothing aggregated – forward Completed directly.
-                    return Poll::Ready(Some(Ok(ResponseEvent::Completed {
-                        response_id,
-                        token_usage,
-                    })));
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::Created))) => {
-                    // These events are exclusive to the Responses API and
-                    // will never appear in a Chat Completions stream.
-                    continue;
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
-                    // Always accumulate deltas so we can emit a final OutputItemDone at Completed.
-                    this.cumulative.push_str(&delta);
-                    if matches!(this.mode, AggregateMode::Streaming) {
-                        // In streaming mode, also forward the delta immediately.
-                        return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
-                    } else {
-                        continue;
-                    }
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
-                    delta,
-                    content_index,
-                }))) => {
-                    // Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
-                    this.cumulative_reasoning.push_str(&delta);
-                    if matches!(this.mode, AggregateMode::Streaming) {
-                        // In streaming mode, also forward the delta immediately.
-                        return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta {
-                            delta,
-                            content_index,
-                        })));
-                    } else {
-                        continue;
-                    }
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
-                    continue;
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryPartAdded { .. }))) => {
-                    continue;
-                }
-                Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item)))) => {
-                    return Poll::Ready(Some(Ok(ResponseEvent::OutputItemAdded(item))));
-                }
-            }
-        }
-    }
-}
-
-/// Extension trait that activates aggregation on any stream of [`ResponseEvent`].
-pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Sized {
-    /// Returns a new stream that emits **only** the final assistant message
-    /// per turn instead of every incremental delta. The produced
-    /// `ResponseEvent` sequence for a typical text turn looks like:
-    ///
-    /// ```ignore
-    /// OutputItemDone(<full message>)
-    /// Completed
-    /// ```
-    ///
-    /// No other `OutputItemDone` events will be seen by the caller.
-    ///
-    /// Usage:
-    ///
-    /// ```ignore
-    /// let agg_stream = client.stream(&prompt).await?.aggregate();
-    /// while let Some(event) = agg_stream.next().await {
-    ///     // event now contains cumulative text
-    /// }
-    /// ```
-    fn aggregate(self) -> AggregatedChatStream<Self> {
-        AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
-    }
-}
-
-impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
-
-impl<S> AggregatedChatStream<S> {
-    fn new(inner: S, mode: AggregateMode) -> Self {
-        AggregatedChatStream {
-            inner,
-            cumulative: String::new(),
-            cumulative_reasoning: String::new(),
-            pending: std::collections::VecDeque::new(),
-            mode,
-        }
-    }
-
-    pub(crate) fn streaming_mode(inner: S) -> Self {
-        Self::new(inner, AggregateMode::Streaming)
-    }
-}
@@ -154,11 +154,13 @@ impl CodexRequestBuilder {
             .collect()
     }
 }
 
+#[derive(Debug, Clone)]
 pub struct Originator {
     pub value: String,
+    pub header_value: HeaderValue,
 }
 
 static ORIGINATOR: OnceLock<Originator> = OnceLock::new();
 
 #[derive(Debug)]
codex-rs/core/src/client/mod.rs (new file, 22 lines)
@@ -0,0 +1,22 @@
mod aggregation;
mod chat_completions;
pub mod http;
mod rate_limits;
mod responses;
mod retry;
mod sse;
pub mod types;

pub(crate) use aggregation::AggregateStreamExt;
pub(crate) use aggregation::AggregatedChatStream;
pub(crate) use chat_completions::stream_chat_completions;
pub use responses::ModelClient;
pub(crate) use types::FreeformTool;
pub(crate) use types::FreeformToolFormat;
pub(crate) use types::Reasoning;
pub use types::ResponseEvent;
pub use types::ResponseStream;
pub(crate) use types::ResponsesApiRequest;
pub(crate) use types::ResponsesApiTool;
pub(crate) use types::ToolSpec;
pub(crate) use types::create_text_param_for_request;
codex-rs/core/src/client/rate_limits.rs (new file, 86 lines)
@@ -0,0 +1,86 @@
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use chrono::Utc;
use reqwest::header::HeaderMap;

/// Prefer Codex-specific aggregate rate limit headers if present; fall back
/// to raw OpenAI-style request headers otherwise.
pub(crate) fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
    parse_codex_rate_limits(headers).or_else(|| parse_openai_rate_limits(headers))
}

fn parse_codex_rate_limits(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
    fn parse_f64(headers: &HeaderMap, name: &str) -> Option<f64> {
        headers
            .get(name)
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<f64>().ok())
    }

    fn parse_i64(headers: &HeaderMap, name: &str) -> Option<i64> {
        headers
            .get(name)
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<i64>().ok())
    }

    let primary_used = parse_f64(headers, "x-codex-primary-used-percent");
    let secondary_used = parse_f64(headers, "x-codex-secondary-used-percent");

    if primary_used.is_none() && secondary_used.is_none() {
        return None;
    }

    let primary = primary_used.map(|used_percent| RateLimitWindow {
        used_percent,
        window_minutes: parse_i64(headers, "x-codex-primary-window-minutes"),
        resets_at: parse_i64(headers, "x-codex-primary-reset-at"),
    });

    let secondary = secondary_used.map(|used_percent| RateLimitWindow {
        used_percent,
        window_minutes: parse_i64(headers, "x-codex-secondary-window-minutes"),
        resets_at: parse_i64(headers, "x-codex-secondary-reset-at"),
    });

    Some(RateLimitSnapshot { primary, secondary })
}

fn parse_openai_rate_limits(headers: &HeaderMap) -> Option<RateLimitSnapshot> {
    let limit = headers.get("x-ratelimit-limit-requests")?;
    let remaining = headers.get("x-ratelimit-remaining-requests")?;
    let reset_ms = headers.get("x-ratelimit-reset-requests")?;

    let limit = limit.to_str().ok()?.parse::<f64>().ok()?;
    let remaining = remaining.to_str().ok()?.parse::<f64>().ok()?;
    let reset_ms = reset_ms.to_str().ok()?.parse::<i64>().ok()?;

    if limit <= 0.0 {
        return None;
    }

    let used = (limit - remaining).max(0.0);
    let used_percent = (used / limit) * 100.0;

    let window_minutes = if reset_ms <= 0 {
        None
    } else {
        let seconds = reset_ms / 1000;
        Some((seconds + 59) / 60)
    };

    let resets_at = if reset_ms > 0 {
        Some(Utc::now().timestamp() + reset_ms / 1000)
    } else {
        None
    };

    Some(RateLimitSnapshot {
        primary: Some(RateLimitWindow {
            used_percent,
            window_minutes,
            resets_at,
        }),
        secondary: None,
    })
}
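A small sketch of the precedence rule above (crate-internal; the `demo` wrapper is hypothetical):

    use reqwest::header::HeaderMap;
    use reqwest::header::HeaderValue;

    fn demo() {
        let mut headers = HeaderMap::new();
        // Codex aggregate headers win over the x-ratelimit-* fallback parser.
        headers.insert("x-codex-primary-used-percent", HeaderValue::from_static("42.5"));
        headers.insert("x-codex-primary-window-minutes", HeaderValue::from_static("60"));

        let snapshot = parse_rate_limit_snapshot(&headers).expect("codex headers present");
        assert_eq!(snapshot.primary.map(|w| w.used_percent), Some(42.5));
        assert!(snapshot.secondary.is_none());
    }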
codex-rs/core/src/client/responses.rs (new file, 1281 lines)
File diff suppressed because it is too large
codex-rs/core/src/client/retry.rs (new file, 140 lines)
@@ -0,0 +1,140 @@
use std::time::Duration;

use crate::error::CodexErr;
use crate::error::Result;

/// Common interface for classifying stream start errors as retryable or fatal.
pub(crate) trait RetryableStreamError {
    /// Returns a delay for the next retry attempt, or `None` if the error
    /// should be treated as fatal and not retried.
    fn delay(&self, attempt: u64) -> Option<Duration>;

    /// Converts this error into the final `CodexErr` that should be surfaced
    /// to callers when retries are exhausted or the error is fatal.
    fn into_error(self) -> CodexErr;
}

/// Helper to retry a streaming operation with provider-configured backoff.
///
/// The caller supplies an `attempt_fn` that is invoked once per attempt with
/// the current attempt index in `[0, max_attempts]`. On success, the value is
/// returned immediately. On error, the error's [`RetryableStreamError`]
/// implementation decides whether to retry (with an optional delay) or to
/// surface a final error.
pub(crate) async fn retry_stream<F, Fut, T, E>(max_attempts: u64, mut attempt_fn: F) -> Result<T>
where
    F: FnMut(u64) -> Fut,
    Fut: std::future::Future<Output = std::result::Result<T, E>>,
    E: RetryableStreamError,
{
    for attempt in 0..=max_attempts {
        match attempt_fn(attempt).await {
            Ok(value) => return Ok(value),
            Err(err) => {
                let delay = err.delay(attempt);

                // Fatal error or final attempt: surface to caller.
                if attempt == max_attempts || delay.is_none() {
                    return Err(err.into_error());
                }

                if let Some(duration) = delay {
                    tokio::time::sleep(duration).await;
                }
            }
        }
    }

    unreachable!("retry_stream should always return");
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    #[derive(Clone)]
    struct TestError {
        fatal: bool,
    }

    impl RetryableStreamError for TestError {
        fn delay(&self, attempt: u64) -> Option<Duration> {
            if self.fatal {
                None
            } else {
                Some(Duration::from_millis(attempt * 10))
            }
        }

        fn into_error(self) -> CodexErr {
            if self.fatal {
                CodexErr::InternalServerError
            } else {
                CodexErr::Io(std::io::Error::other("retryable"))
            }
        }
    }

    #[tokio::test]
    async fn retries_until_success_before_max_attempts() {
        let max_attempts = 3;

        let result: Result<&str> = retry_stream(max_attempts, |attempt| async move {
            if attempt < 2 {
                Err(TestError { fatal: false })
            } else {
                Ok("ok")
            }
        })
        .await;

        assert_eq!(result.unwrap(), "ok");
    }

    #[tokio::test]
    async fn stops_on_fatal_error_without_retrying() {
        use std::sync::Arc;
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering;

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_ref = calls.clone();

        let result: Result<()> = retry_stream(5, move |_attempt| {
            let calls_ref = calls_ref.clone();
            async move {
                calls_ref.fetch_add(1, Ordering::SeqCst);
                Err(TestError { fatal: true })
            }
        })
        .await;

        assert!(result.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn stops_after_max_attempts_for_retryable_errors() {
        use std::sync::Arc;
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering;

        let calls = Arc::new(AtomicUsize::new(0));
        let calls_ref = calls.clone();

        let max_attempts = 2;

        let result: Result<()> = retry_stream(max_attempts, move |_attempt| {
            let calls_ref = calls_ref.clone();
            async move {
                calls_ref.fetch_add(1, Ordering::SeqCst);
                Err(TestError { fatal: false })
            }
        })
        .await;

        assert!(result.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), (max_attempts + 1) as usize);
    }
}
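The tests above pin down the attempt accounting: `retry_stream(n, f)` calls `f` at most `n + 1` times (attempt indices `0..=n`), so `request_max_retries()` counts retries after the first try. A minimal sketch of a custom error classification (the `Transient` type is hypothetical):

    use std::time::Duration;

    // Hypothetical error type: every failure retries after a fixed 100ms
    // delay; a fatal variant would return None from delay() instead.
    struct Transient;

    impl RetryableStreamError for Transient {
        fn delay(&self, _attempt: u64) -> Option<Duration> {
            Some(Duration::from_millis(100))
        }

        fn into_error(self) -> CodexErr {
            CodexErr::InternalServerError
        }
    }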
codex-rs/core/src/client/sse.rs (new file, 46 lines)
@@ -0,0 +1,46 @@
use std::time::Duration;

use codex_otel::otel_event_manager::OtelEventManager;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError as StreamError;
use futures::Stream;
use futures::StreamExt;
use tokio::time::timeout;

/// Result of polling the next SSE event with timeout and logging applied.
pub(crate) enum SseNext {
    Event(Event),
    Eof,
    StreamError(String),
    Timeout,
}

/// Read the next SSE event from `stream`, applying an idle timeout and recording
/// telemetry via `otel_event_manager`.
///
/// This helper centralizes the boilerplate for:
/// - `tokio::time::timeout`
/// - calling `log_sse_event`
/// - mapping the different outcomes into a small enum that callers can
///   interpret according to their own protocol semantics.
pub(crate) async fn next_sse_event<S, E>(
    stream: &mut S,
    idle_timeout: Duration,
    otel_event_manager: &OtelEventManager,
) -> SseNext
where
    S: Stream<Item = Result<Event, StreamError<E>>> + Unpin,
    E: std::fmt::Display,
{
    let start = tokio::time::Instant::now();
    let next_event = timeout(idle_timeout, stream.next()).await;
    let duration = start.elapsed();
    otel_event_manager.log_sse_event(&next_event, duration);

    match next_event {
        Ok(Some(Ok(ev))) => SseNext::Event(ev),
        Ok(Some(Err(e))) => SseNext::StreamError(e.to_string()),
        Ok(None) => SseNext::Eof,
        Err(_) => SseNext::Timeout,
    }
}
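A consumer-side sketch of the helper (mirroring the `process_chat_sse` rewrite above; `handle` and the surrounding error type are hypothetical):

    loop {
        match next_sse_event(&mut stream, idle_timeout, &otel_event_manager).await {
            SseNext::Event(ev) => handle(ev),
            SseNext::Eof => break, // server closed the stream gracefully
            SseNext::StreamError(message) => return Err(message.into()),
            SseNext::Timeout => return Err("idle timeout waiting for SSE".into()),
        }
    }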
codex-rs/core/src/client/types.rs (new file, 327 lines)
@@ -0,0 +1,327 @@
|
||||
use crate::error::Result;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::SubAgentSource;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::tools::spec::JsonSchema;
|
||||
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use codex_protocol::config_types::Verbosity as VerbosityConfig;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use futures::Stream;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ResponseEvent {
|
||||
Created,
|
||||
OutputItemDone(ResponseItem),
|
||||
OutputItemAdded(ResponseItem),
|
||||
Completed {
|
||||
response_id: String,
|
||||
token_usage: Option<TokenUsage>,
|
||||
},
|
||||
OutputTextDelta(String),
|
||||
ReasoningSummaryDelta {
|
||||
delta: String,
|
||||
summary_index: i64,
|
||||
},
|
||||
ReasoningContentDelta {
|
||||
delta: String,
|
||||
content_index: i64,
|
||||
},
|
||||
ReasoningSummaryPartAdded {
|
||||
summary_index: i64,
|
||||
},
|
||||
RateLimits(RateLimitSnapshot),
|
||||
}
|
||||
|
||||
pub(crate) fn subagent_label(sub: &SubAgentSource) -> String {
|
||||
if let SubAgentSource::Other(label) = sub {
|
||||
label.clone()
|
||||
} else {
|
||||
serde_json::to_value(sub)
|
||||
.ok()
|
||||
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
|
||||
.unwrap_or_else(|| "other".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub(crate) struct Reasoning {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) effort: Option<ReasoningEffortConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) summary: Option<ReasoningSummaryConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default, Clone)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum TextFormatType {
|
||||
#[default]
|
||||
JsonSchema,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default, Clone)]
|
||||
pub(crate) struct TextFormat {
|
||||
pub(crate) r#type: TextFormatType,
|
||||
pub(crate) strict: bool,
|
||||
pub(crate) schema: Value,
|
||||
pub(crate) name: String,
|
||||
}
|
||||
|
||||
/// Controls under the `text` field in the Responses API for GPT-5.
|
||||
#[derive(Debug, Serialize, Default, Clone)]
|
||||
pub(crate) struct TextControls {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) verbosity: Option<OpenAiVerbosity>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) format: Option<TextFormat>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default, Clone)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub(crate) enum OpenAiVerbosity {
|
||||
Low,
|
||||
#[default]
|
||||
Medium,
|
||||
High,
|
||||
}
|
||||
|
||||
impl From<VerbosityConfig> for OpenAiVerbosity {
|
||||
fn from(v: VerbosityConfig) -> Self {
|
||||
match v {
|
||||
VerbosityConfig::Low => OpenAiVerbosity::Low,
|
||||
VerbosityConfig::Medium => OpenAiVerbosity::Medium,
|
||||
VerbosityConfig::High => OpenAiVerbosity::High,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Request object that is serialized as JSON and POST'ed when using the
|
||||
/// Responses API.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub(crate) struct ResponsesApiRequest<'a> {
|
||||
pub(crate) model: &'a str,
|
||||
pub(crate) instructions: &'a str,
|
||||
// TODO(mbolin): ResponseItem::Other should not be serialized. Currently,
|
||||
// we code defensively to avoid this case, but perhaps we should use a
|
||||
// separate enum for serialization.
|
||||
pub(crate) input: &'a Vec<ResponseItem>,
|
||||
pub(crate) tools: &'a [serde_json::Value],
|
||||
pub(crate) tool_choice: &'static str,
|
||||
pub(crate) parallel_tool_calls: bool,
|
||||
pub(crate) reasoning: Option<Reasoning>,
|
||||
pub(crate) store: bool,
|
||||
pub(crate) stream: bool,
|
||||
pub(crate) include: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) prompt_cache_key: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) text: Option<TextControls>,
|
||||
}
|
||||
|
||||
pub(crate) mod tools {
|
||||
use super::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
/// When serialized as JSON, this produces a valid "Tool" in the OpenAI
|
||||
/// Responses API.
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
#[serde(tag = "type")]
|
||||
pub(crate) enum ToolSpec {
|
||||
#[serde(rename = "function")]
|
||||
Function(ResponsesApiTool),
|
||||
#[serde(rename = "local_shell")]
|
||||
LocalShell {},
|
||||
// TODO: Understand why we get an error on web_search although the API docs say it's supported.
|
||||
// https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses#:~:text=%7B%20type%3A%20%22web_search%22%20%7D%2C
|
||||
#[serde(rename = "web_search")]
|
||||
WebSearch {},
|
||||
#[serde(rename = "custom")]
|
||||
Freeform(FreeformTool),
|
||||
}
|
||||
|
||||
impl ToolSpec {
|
||||
pub(crate) fn name(&self) -> &str {
|
||||
match self {
|
||||
ToolSpec::Function(tool) => tool.name.as_str(),
|
||||
ToolSpec::LocalShell {} => "local_shell",
|
||||
ToolSpec::WebSearch {} => "web_search",
|
||||
ToolSpec::Freeform(tool) => tool.name.as_str(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct FreeformTool {
|
||||
pub(crate) name: String,
|
||||
pub(crate) description: String,
|
||||
pub(crate) format: FreeformToolFormat,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct FreeformToolFormat {
|
||||
pub(crate) r#type: String,
|
||||
pub(crate) syntax: String,
|
||||
pub(crate) definition: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub struct ResponsesApiTool {
|
||||
pub(crate) name: String,
|
||||
pub(crate) description: String,
|
||||
/// TODO: Validation. When strict is set to true, the JSON schema,
|
||||
/// `required` and `additional_properties` must be present. All fields in
|
||||
/// `properties` must be present in `required`.
|
||||
pub(crate) strict: bool,
|
||||
pub(crate) parameters: JsonSchema,
|
||||
}
|
||||
}

pub(crate) use tools::FreeformTool;
pub(crate) use tools::FreeformToolFormat;
pub(crate) use tools::ResponsesApiTool;
pub(crate) use tools::ToolSpec;

pub(crate) fn create_text_param_for_request(
    verbosity: Option<VerbosityConfig>,
    output_schema: &Option<Value>,
) -> Option<TextControls> {
    if verbosity.is_none() && output_schema.is_none() {
        return None;
    }

    Some(TextControls {
        verbosity: verbosity.map(std::convert::Into::into),
        format: output_schema.as_ref().map(|schema| TextFormat {
            r#type: TextFormatType::JsonSchema,
            strict: true,
            schema: schema.clone(),
            name: "codex_output_schema".to_string(),
        }),
    })
}
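
Aside: together with the TextControls and TextFormat definitions, the helper above yields a `text` request parameter shaped roughly like the value built in this sketch (illustrative only, serde_json assumed as a dependency; the skip_serializing_if attributes drop `verbosity` or `format` entirely when the corresponding input is None):

use serde_json::json;

fn main() {
    // Approximate wire shape when both a verbosity and an output schema are set.
    let text = json!({
        "verbosity": "low",
        "format": {
            "type": "json_schema",
            "strict": true,
            "schema": {
                "type": "object",
                "properties": { "answer": { "type": "string" } },
                "required": ["answer"]
            },
            "name": "codex_output_schema"
        }
    });
    println!("{text}");
}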

pub struct ResponseStream {
    pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
}

impl Stream for ResponseStream {
    type Item = Result<ResponseEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx_event.poll_recv(cx)
    }
}
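
Aside: since poll_next simply forwards to Receiver::poll_recv, a ResponseStream behaves like any mpsc-backed stream and can be drained with the usual combinators. A self-contained sketch of the same pattern (tokio, tokio-stream, and futures assumed as dependencies; Event stands in for ResponseEvent):

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
enum Event {
    Delta(String),
    Completed,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Event>(16);
    tokio::spawn(async move {
        let _ = tx.send(Event::Delta("hello".into())).await;
        let _ = tx.send(Event::Completed).await;
    });
    // ReceiverStream wraps the channel the same way ResponseStream's
    // poll_next forwards to poll_recv.
    let mut stream = ReceiverStream::new(rx);
    while let Some(ev) = stream.next().await {
        println!("{ev:?}");
    }
}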

#[cfg(test)]
mod tests {
    use super::*;
    use codex_protocol::models::ResponseItem;
    use pretty_assertions::assert_eq;

    #[test]
    fn serializes_text_verbosity_when_set() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: Some(TextControls {
                verbosity: Some(OpenAiVerbosity::Low),
                format: None,
            }),
        };

        let v = serde_json::to_value(&req).expect("json");
        assert_eq!(
            v.get("text")
                .and_then(|t| t.get("verbosity"))
                .and_then(|s| s.as_str()),
            Some("low")
        );
    }

    #[test]
    fn serializes_text_schema_with_strict_format() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "answer": {"type": "string"}
            },
            "required": ["answer"],
        });
        let text_controls =
            create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");

        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: Some(text_controls),
        };

        let v = serde_json::to_value(&req).expect("json");
        let text = v.get("text").expect("text field");
        assert!(text.get("verbosity").is_none());
        let format = text.get("format").expect("format field");

        assert_eq!(
            format.get("name"),
            Some(&serde_json::Value::String("codex_output_schema".into()))
        );
        assert_eq!(
            format.get("type"),
            Some(&serde_json::Value::String("json_schema".into()))
        );
        assert_eq!(format.get("strict"), Some(&serde_json::Value::Bool(true)));
        assert_eq!(format.get("schema"), Some(&schema));
    }

    #[test]
    fn omits_text_when_not_set() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: None,
        };

        let v = serde_json::to_value(&req).expect("json");
        assert!(v.get("text").is_none());
    }
}

@@ -1,24 +1,12 @@
use crate::client_common::tools::ToolSpec;
use crate::error::Result;
use crate::client::ToolSpec;
use crate::model_family::ModelFamily;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use futures::Stream;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Deref;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;

/// Review thread system prompt. Edit `core/src/review_prompt.md` to customize.
pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md");

@@ -193,194 +181,6 @@ fn strip_total_output_header(output: &str) -> Option<&str> {
    Some(remainder)
}

#[derive(Debug)]
pub enum ResponseEvent {
    Created,
    OutputItemDone(ResponseItem),
    OutputItemAdded(ResponseItem),
    Completed {
        response_id: String,
        token_usage: Option<TokenUsage>,
    },
    OutputTextDelta(String),
    ReasoningSummaryDelta {
        delta: String,
        summary_index: i64,
    },
    ReasoningContentDelta {
        delta: String,
        content_index: i64,
    },
    ReasoningSummaryPartAdded {
        summary_index: i64,
    },
    RateLimits(RateLimitSnapshot),
}

#[derive(Debug, Serialize)]
pub(crate) struct Reasoning {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) effort: Option<ReasoningEffortConfig>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) summary: Option<ReasoningSummaryConfig>,
}

#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub(crate) enum TextFormatType {
    #[default]
    JsonSchema,
}

#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextFormat {
    pub(crate) r#type: TextFormatType,
    pub(crate) strict: bool,
    pub(crate) schema: Value,
    pub(crate) name: String,
}

/// Controls under the `text` field in the Responses API for GPT-5.
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextControls {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) verbosity: Option<OpenAiVerbosity>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) format: Option<TextFormat>,
}

#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum OpenAiVerbosity {
    Low,
    #[default]
    Medium,
    High,
}

impl From<VerbosityConfig> for OpenAiVerbosity {
    fn from(v: VerbosityConfig) -> Self {
        match v {
            VerbosityConfig::Low => OpenAiVerbosity::Low,
            VerbosityConfig::Medium => OpenAiVerbosity::Medium,
            VerbosityConfig::High => OpenAiVerbosity::High,
        }
    }
}

/// Request object that is serialized as JSON and POST'ed when using the
/// Responses API.
#[derive(Debug, Serialize)]
pub(crate) struct ResponsesApiRequest<'a> {
    pub(crate) model: &'a str,
    pub(crate) instructions: &'a str,
    // TODO(mbolin): ResponseItem::Other should not be serialized. Currently,
    // we code defensively to avoid this case, but perhaps we should use a
    // separate enum for serialization.
    pub(crate) input: &'a Vec<ResponseItem>,
    pub(crate) tools: &'a [serde_json::Value],
    pub(crate) tool_choice: &'static str,
    pub(crate) parallel_tool_calls: bool,
    pub(crate) reasoning: Option<Reasoning>,
    pub(crate) store: bool,
    pub(crate) stream: bool,
    pub(crate) include: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) prompt_cache_key: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) text: Option<TextControls>,
}

pub(crate) mod tools {
    use crate::tools::spec::JsonSchema;
    use serde::Deserialize;
    use serde::Serialize;

    /// When serialized as JSON, this produces a valid "Tool" in the OpenAI
    /// Responses API.
    #[derive(Debug, Clone, Serialize, PartialEq)]
    #[serde(tag = "type")]
    pub(crate) enum ToolSpec {
        #[serde(rename = "function")]
        Function(ResponsesApiTool),
        #[serde(rename = "local_shell")]
        LocalShell {},
        // TODO: Understand why we get an error on web_search although the API docs say it's supported.
        // https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses#:~:text=%7B%20type%3A%20%22web_search%22%20%7D%2C
        #[serde(rename = "web_search")]
        WebSearch {},
        #[serde(rename = "custom")]
        Freeform(FreeformTool),
    }

    impl ToolSpec {
        pub(crate) fn name(&self) -> &str {
            match self {
                ToolSpec::Function(tool) => tool.name.as_str(),
                ToolSpec::LocalShell {} => "local_shell",
                ToolSpec::WebSearch {} => "web_search",
                ToolSpec::Freeform(tool) => tool.name.as_str(),
            }
        }
    }

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct FreeformTool {
        pub(crate) name: String,
        pub(crate) description: String,
        pub(crate) format: FreeformToolFormat,
    }

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    pub struct FreeformToolFormat {
        pub(crate) r#type: String,
        pub(crate) syntax: String,
        pub(crate) definition: String,
    }

    #[derive(Debug, Clone, Serialize, PartialEq)]
    pub struct ResponsesApiTool {
        pub(crate) name: String,
        pub(crate) description: String,
        /// TODO: Validation. When strict is set to true, the JSON schema,
        /// `required` and `additional_properties` must be present. All fields in
        /// `properties` must be present in `required`.
        pub(crate) strict: bool,
        pub(crate) parameters: JsonSchema,
    }
}

pub(crate) fn create_text_param_for_request(
    verbosity: Option<VerbosityConfig>,
    output_schema: &Option<Value>,
) -> Option<TextControls> {
    if verbosity.is_none() && output_schema.is_none() {
        return None;
    }

    Some(TextControls {
        verbosity: verbosity.map(std::convert::Into::into),
        format: output_schema.as_ref().map(|schema| TextFormat {
            r#type: TextFormatType::JsonSchema,
            strict: true,
            schema: schema.clone(),
            name: "codex_output_schema".to_string(),
        }),
    })
}

pub struct ResponseStream {
    pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
}

impl Stream for ResponseStream {
    type Item = Result<ResponseEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx_event.poll_recv(cx)
    }
}

#[cfg(test)]
mod tests {
    use crate::model_family::find_family_for_model;

@@ -451,104 +251,4 @@ mod tests {
        assert_eq!(full, expected);
    }
}

    #[test]
    fn serializes_text_verbosity_when_set() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: Some(TextControls {
                verbosity: Some(OpenAiVerbosity::Low),
                format: None,
            }),
        };

        let v = serde_json::to_value(&req).expect("json");
        assert_eq!(
            v.get("text")
                .and_then(|t| t.get("verbosity"))
                .and_then(|s| s.as_str()),
            Some("low")
        );
    }

    #[test]
    fn serializes_text_schema_with_strict_format() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "answer": {"type": "string"}
            },
            "required": ["answer"],
        });
        let text_controls =
            create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");

        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: Some(text_controls),
        };

        let v = serde_json::to_value(&req).expect("json");
        let text = v.get("text").expect("text field");
        assert!(text.get("verbosity").is_none());
        let format = text.get("format").expect("format field");

        assert_eq!(
            format.get("name"),
            Some(&serde_json::Value::String("codex_output_schema".into()))
        );
        assert_eq!(
            format.get("type"),
            Some(&serde_json::Value::String("json_schema".into()))
        );
        assert_eq!(format.get("strict"), Some(&serde_json::Value::Bool(true)));
        assert_eq!(format.get("schema"), Some(&schema));
    }

    #[test]
    fn omits_text_when_not_set() {
        let input: Vec<ResponseItem> = vec![];
        let tools: Vec<serde_json::Value> = vec![];
        let req = ResponsesApiRequest {
            model: "gpt-5",
            instructions: "i",
            input: &input,
            tools: &tools,
            tool_choice: "auto",
            parallel_tool_calls: true,
            reasoning: None,
            store: false,
            stream: true,
            include: vec![],
            prompt_cache_key: None,
            text: None,
        };

        let v = serde_json::to_value(&req).expect("json");
        assert!(v.get("text").is_none());
    }
}

@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use crate::AuthManager;
use crate::ResponseEvent;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
use crate::features::Feature;

@@ -55,7 +56,6 @@ use tracing::warn;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config::types::McpServerTransportConfig;
use crate::config::types::ShellEnvironmentPolicy;

@@ -1,7 +1,7 @@
use std::sync::Arc;

use crate::Prompt;
use crate::client_common::ResponseEvent;
use crate::ResponseEvent;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;

@@ -8,8 +8,7 @@
mod apply_patch;
pub mod auth;
pub mod bash;
mod chat_completions;
mod client;
pub mod client;
mod client_common;
pub mod codex;
mod codex_conversation;

@@ -54,7 +53,6 @@ pub use conversation_manager::NewConversation;
// Re-export common auth types for workspace consumers
pub use auth::AuthManager;
pub use auth::CodexAuth;
pub mod default_client;
pub mod model_family;
mod openai_model_info;
pub mod project_doc;

@@ -96,10 +94,10 @@ pub use codex_protocol::protocol;
pub use codex_protocol::config_types as protocol_config_types;

pub use client::ModelClient;
pub use client::ResponseEvent;
pub use client::ResponseStream;
pub use client_common::Prompt;
pub use client_common::REVIEW_PROMPT;
pub use client_common::ResponseEvent;
pub use client_common::ResponseStream;
pub use codex_protocol::models::ContentItem;
pub use codex_protocol::models::LocalShellAction;
pub use codex_protocol::models::LocalShellExecAction;

@@ -6,16 +6,15 @@
//! key. These override or extend the defaults at runtime.

use crate::CodexAuth;
use crate::default_client::CodexHttpClient;
use crate::default_client::CodexRequestBuilder;
use crate::client::http::CodexHttpClient;
use crate::client::http::CodexRequestBuilder;
use crate::error::EnvVarError;
use codex_app_server_protocol::AuthMode;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::env::VarError;
use std::time::Duration;

use crate::error::EnvVarError;
const DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 300_000;
const DEFAULT_STREAM_MAX_RETRIES: u64 = 5;
const DEFAULT_REQUEST_MAX_RETRIES: u64 = 4;

@@ -1,7 +1,7 @@
use crate::client::http::originator;
use crate::config::Config;
use crate::config::types::OtelExporterKind as Kind;
use crate::config::types::OtelHttpProtocol as Protocol;
use crate::default_client::originator;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSettings;

@@ -23,8 +23,8 @@ use super::list::ConversationsPage;
use super::list::Cursor;
use super::list::get_conversations;
use super::policy::is_persisted_response_item;
use crate::client::http::originator;
use crate::config::Config;
use crate::default_client::originator;
use crate::git_info::collect_git_info;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;

@@ -6,9 +6,9 @@ use std::time::Instant;

use crate::AuthManager;
use crate::ModelProviderInfo;
use crate::ResponseEvent;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::protocol::SandboxPolicy;
use askama::Template;

@@ -3,10 +3,10 @@ use std::collections::BTreeMap;
use crate::apply_patch;
use crate::apply_patch::InternalApplyPatchInvocation;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::client_common::tools::FreeformTool;
use crate::client_common::tools::FreeformToolFormat;
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::FreeformTool;
use crate::client::FreeformToolFormat;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;

@@ -1,5 +1,5 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use crate::client_common::tools::ToolSpec;
use crate::client::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::client_common::tools::ToolSpec;
use crate::client::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;

@@ -1,5 +1,5 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::client::ResponsesApiTool;
use crate::client::ToolSpec;
use crate::features::Feature;
use crate::features::Features;
use crate::model_family::ModelFamily;

@@ -1074,7 +1074,7 @@ pub(crate) fn build_specs(

#[cfg(test)]
mod tests {
    use crate::client_common::tools::FreeformTool;
    use crate::client::FreeformTool;
    use crate::model_family::find_family_for_model;
    use crate::tools::registry::ConfiguredToolSpec;
    use mcp_types::ToolInputSchema;

@@ -10,14 +10,19 @@ mod event_processor_with_human_output;
pub mod event_processor_with_jsonl_output;
pub mod exec_events;

use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
pub use cli::Cli;
use codex_core::AuthManager;
use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID;
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::auth::enforce_login_restrictions;
use codex_core::client::http::set_default_originator;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::find_conversation_path_by_id_str;
use codex_core::git_info::get_git_repo_root;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;

@@ -41,12 +46,6 @@ use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;

use crate::cli::Command as ExecCommand;
use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_originator;
use codex_core::find_conversation_path_by_id_str;

pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
    if let Err(err) = set_default_originator("codex_exec".to_string()) {
        tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");

@@ -17,7 +17,7 @@ use chrono::Utc;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::AuthDotJson;
use codex_core::auth::save_auth;
use codex_core::default_client::originator;
use codex_core::client::http::originator;
use codex_core::token_data::TokenData;
use codex_core::token_data::parse_id_token;
use rand::RngCore;

@@ -12,9 +12,9 @@ use codex_protocol::protocol::SessionSource;

use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::client::http::USER_AGENT_SUFFIX;
use codex_core::client::http::get_codex_user_agent;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_core::protocol::Submission;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;