Compare commits

...

1 Commit

Author SHA1 Message Date
kevin zhao
31478880b0 initial commit 2025-10-22 14:00:04 -07:00
17 changed files with 259 additions and 121 deletions

1
.gitignore vendored
View File

@@ -40,6 +40,7 @@ AGENTS.override.md
.nyc_output/
.jest/
*.tsbuildinfo
codex-backend/
# logs
*.log

1
codex-rs/Cargo.lock generated
View File

@@ -1060,6 +1060,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-apply-patch",
"codex-async-utils",
"codex-backend-openapi-models",
"codex-file-search",
"codex-mcp-client",
"codex-otel",

View File

@@ -59,6 +59,7 @@ codex-apply-patch = { path = "apply-patch" }
codex-arg0 = { path = "arg0" }
codex-async-utils = { path = "async-utils" }
codex-backend-client = { path = "backend-client" }
codex-backend-openapi-models = { path = "codex-backend-openapi-models" }
codex-chatgpt = { path = "chatgpt" }
codex-common = { path = "common" }
codex-core = { path = "core" }

View File

@@ -1,13 +1,12 @@
use crate::types::CodeTaskDetailsResponse;
use crate::types::PaginatedListTaskListItem;
use crate::types::RateLimitStatusPayload;
use crate::types::RateLimitWindowSnapshot;
use crate::types::TurnAttemptsSiblingTurnsResponse;
use anyhow::Result;
use codex_core::auth::CodexAuth;
use codex_core::default_client::get_codex_user_agent;
use codex_core::rate_limits::rate_limit_snapshot_from_usage_payload;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use reqwest::header::AUTHORIZATION;
use reqwest::header::CONTENT_TYPE;
use reqwest::header::HeaderMap;
@@ -163,7 +162,7 @@ impl Client {
let req = self.http.get(&url).headers(self.headers());
let (body, ct) = self.exec_request(req, "GET", &url).await?;
let payload: RateLimitStatusPayload = self.decode_json(&url, &ct, &body)?;
Ok(Self::rate_limit_snapshot_from_payload(payload))
Ok(rate_limit_snapshot_from_usage_payload(payload))
}
pub async fn list_tasks(
@@ -269,49 +268,4 @@ impl Client {
Err(e) => anyhow::bail!("Decode error for {url}: {e}; content-type={ct}; body={body}"),
}
}
// rate limit helpers
/// Convert the backend's `RateLimitStatusPayload` into a protocol-level
/// `RateLimitSnapshot`, yielding an empty snapshot (both windows `None`)
/// when the payload carries no rate-limit details.
fn rate_limit_snapshot_from_payload(payload: RateLimitStatusPayload) -> RateLimitSnapshot {
// `rate_limit` is a doubly-wrapped optional (`Option<Option<Box<_>>>`);
// collapse both layers and unbox, or fall through to the empty snapshot.
let Some(details) = payload
.rate_limit
.and_then(|inner| inner.map(|boxed| *boxed))
else {
// Absent details are not an error: report "no limits known".
return RateLimitSnapshot {
primary: None,
secondary: None,
};
};
RateLimitSnapshot {
primary: Self::map_rate_limit_window(details.primary_window),
secondary: Self::map_rate_limit_window(details.secondary_window),
}
}
/// Map one optional backend window snapshot onto the protocol
/// `RateLimitWindow`. Both optional layers must be populated; otherwise
/// returns `None`.
fn map_rate_limit_window(
window: Option<Option<Box<RateLimitWindowSnapshot>>>,
) -> Option<RateLimitWindow> {
let snapshot = match window {
Some(Some(snapshot)) => *snapshot,
_ => return None,
};
let used_percent = f64::from(snapshot.used_percent);
// Backend reports window length in seconds; protocol wants whole minutes.
let window_minutes = Self::window_minutes_from_seconds(snapshot.limit_window_seconds);
let resets_at = Some(i64::from(snapshot.reset_at));
Some(RateLimitWindow {
used_percent,
window_minutes,
resets_at,
})
}
/// Convert a window length in seconds to whole minutes, rounding any
/// partial minute up. Non-positive inputs mean "no window" and yield `None`.
fn window_minutes_from_seconds(seconds: i32) -> Option<i64> {
if seconds <= 0 {
return None;
}
let seconds_i64 = i64::from(seconds);
// Ceiling division: adding 59 before dividing rounds partial minutes up.
Some((seconds_i64 + 59) / 60)
}
}

View File

@@ -29,6 +29,7 @@ codex-rmcp-client = { workspace = true }
codex-async-utils = { workspace = true }
codex-utils-string = { workspace = true }
codex-utils-pty = { workspace = true }
codex-backend-openapi-models = { workspace = true }
dirs = { workspace = true }
dunce = { workspace = true }
env-flags = { workspace = true }

View File

@@ -751,9 +751,6 @@ where
// Not an assistant message; forward immediately.
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
}
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
}
Poll::Ready(Some(Ok(ResponseEvent::Completed {
response_id,
token_usage,

View File

@@ -120,6 +120,10 @@ impl ModelClient {
.map(|w| w.saturating_mul(pct) / 100)
}
pub(crate) fn backend_base_url(&self) -> &str {
self.config.chatgpt_base_url.as_str()
}
pub fn get_auto_compact_token_limit(&self) -> Option<i64> {
self.config.model_auto_compact_token_limit.or_else(|| {
get_model_info(&self.config.model_family).and_then(|info| info.auto_compact_token_limit)
@@ -348,15 +352,6 @@ impl ModelClient {
Ok(resp) if resp.status().is_success() => {
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
if let Some(snapshot) = parse_rate_limit_snapshot(resp.headers())
&& tx_event
.send(Ok(ResponseEvent::RateLimits(snapshot)))
.await
.is_err()
{
debug!("receiver dropped rate limit snapshot event");
}
// spawn task to process SSE
let stream = resp.bytes_stream().map_err(move |e| {
CodexErr::ResponseStreamFailed(ResponseStreamFailed {

View File

@@ -1,7 +1,6 @@
use crate::client_common::tools::ToolSpec;
use crate::error::Result;
use crate::model_family::ModelFamily;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
@@ -203,7 +202,6 @@ pub enum ResponseEvent {
WebSearchCallBegin {
call_id: String,
},
RateLimits(RateLimitSnapshot),
}
#[derive(Debug, Serialize)]

View File

@@ -17,7 +17,9 @@ use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_app_server_protocol::AuthMode;
use codex_apply_patch::ApplyPatchAction;
use codex_backend_openapi_models::models::RateLimitStatusPayload as BackendRateLimitStatusPayload;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
@@ -39,6 +41,7 @@ use mcp_types::ListResourcesRequestParams;
use mcp_types::ListResourcesResult;
use mcp_types::ReadResourceRequestParams;
use mcp_types::ReadResourceResult;
use reqwest::StatusCode;
use serde_json;
use serde_json::Value;
use tokio::sync::Mutex;
@@ -59,11 +62,14 @@ use crate::config::Config;
use crate::config_types::McpServerTransportConfig;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::default_client::create_client;
use crate::default_client::get_codex_user_agent;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
#[cfg(test)]
use crate::exec::StreamOutput;
use crate::rate_limits::rate_limit_snapshot_from_usage_payload;
// Removed: legacy executor wiring replaced by ToolOrchestrator flows.
// legacy normalize_exec_result no longer used after orchestrator migration
use crate::mcp::auth::compute_auth_statuses;
@@ -352,6 +358,27 @@ pub(crate) struct SessionSettingsUpdate {
pub(crate) final_output_json_schema: Option<Option<Value>>,
}
#[derive(Debug)]
enum RateLimitFetchError {
    MissingAuth,
    WrongAuthMode,
    Request(String),
}

impl RateLimitFetchError {
    /// User-facing description of why fetching rate limits failed.
    fn message(&self) -> String {
        match self {
            Self::Request(message) => message.clone(),
            Self::MissingAuth => {
                "codex account authentication required to read rate limits".to_string()
            }
            Self::WrongAuthMode => {
                "chatgpt authentication required to read rate limits".to_string()
            }
        }
    }
}
impl Session {
fn make_turn_context(
auth_manager: Option<Arc<AuthManager>>,
@@ -895,10 +922,11 @@ impl Session {
state.history_snapshot()
}
async fn update_token_usage_info(
async fn update_usage_snapshot(
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
rate_limits: Option<RateLimitSnapshot>,
) {
{
let mut state = self.state.lock().await;
@@ -908,20 +936,93 @@ impl Session {
turn_context.client.get_model_context_window(),
);
}
if let Some(snapshot) = rate_limits {
state.set_rate_limits(snapshot);
}
}
self.send_token_count_event(turn_context).await;
}
async fn update_rate_limits(
async fn refresh_usage_after_turn(
&self,
turn_context: &TurnContext,
new_rate_limits: RateLimitSnapshot,
token_usage: Option<&TokenUsage>,
) {
{
let mut state = self.state.lock().await;
state.set_rate_limits(new_rate_limits);
let rate_limits = match self.fetch_rate_limits(turn_context).await {
Ok(snapshot) => Some(snapshot),
Err(RateLimitFetchError::MissingAuth) | Err(RateLimitFetchError::WrongAuthMode) => None,
Err(RateLimitFetchError::Request(message)) => {
warn!("{message}");
None
}
};
self.update_usage_snapshot(turn_context, token_usage, rate_limits)
.await;
}
/// Fetch fresh rate limits and fold them into the session's usage snapshot,
/// without recording any token usage. Propagates fetch failures to the
/// caller (used by the explicit status-refresh path).
async fn refresh_usage_for_status(
&self,
turn_context: &TurnContext,
) -> Result<(), RateLimitFetchError> {
let snapshot = self.fetch_rate_limits(turn_context).await?;
// No token usage here — only the rate-limit snapshot is updated.
self.update_usage_snapshot(turn_context, None, Some(snapshot))
.await;
Ok(())
}
/// Fetch the current rate-limit snapshot for the authenticated account from
/// the backend usage endpoint.
///
/// Errors:
/// - `MissingAuth` when no credentials are available.
/// - `WrongAuthMode` when auth is not ChatGPT-mode (or the server returns 401).
/// - `Request` for token, transport, HTTP-status, or JSON-decode failures.
async fn fetch_rate_limits(
&self,
turn_context: &TurnContext,
) -> Result<RateLimitSnapshot, RateLimitFetchError> {
let Some(auth) = self.services.auth_manager.auth() else {
return Err(RateLimitFetchError::MissingAuth);
};
// The usage endpoint is only meaningful for ChatGPT-mode credentials.
if auth.mode != AuthMode::ChatGPT {
return Err(RateLimitFetchError::WrongAuthMode);
}
// NOTE(review): emitting a token-count event in the middle of a fetch looks
// out of place here; this may be an artifact of the diff rendering (the line
// resembles the tail of the removed `update_rate_limits`) — confirm against
// the actual source.
self.send_token_count_event(turn_context).await;
let token = auth.get_token().await.map_err(|err| {
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
})?;
// Route based on whether the configured base URL already targets
// `/backend-api`; otherwise use the public `/api/codex` prefix.
let base = turn_context.client.backend_base_url().trim_end_matches('/');
let url = if base.contains("/backend-api") {
format!("{base}/wham/usage")
} else {
format!("{base}/api/codex/usage")
};
let client = create_client();
let mut request = client
.get(&url)
.header(reqwest::header::USER_AGENT, get_codex_user_agent())
.bearer_auth(token);
// Scope the request to the active ChatGPT account when one is known.
if let Some(account_id) = auth.get_account_id() {
request = request.header("ChatGPT-Account-Id", account_id);
}
let response = request.send().await.map_err(|err| {
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
})?;
let status = response.status();
if !status.is_success() {
// 401 is surfaced as an auth-mode problem rather than a generic failure.
if status == StatusCode::UNAUTHORIZED {
return Err(RateLimitFetchError::WrongAuthMode);
}
let body = response.text().await.unwrap_or_default();
return Err(RateLimitFetchError::Request(format!(
"failed to fetch codex rate limits: status {status}; body={body}"
)));
}
let payload: BackendRateLimitStatusPayload = response.json().await.map_err(|err| {
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
})?;
// Convert the backend payload into the protocol-level snapshot.
Ok(rate_limit_snapshot_from_usage_payload(payload))
}
async fn send_token_count_event(&self, turn_context: &TurnContext) {
@@ -1282,6 +1383,23 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
};
sess.send_event_raw(event).await;
}
Op::RefreshUsage => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
.await;
match sess.refresh_usage_for_status(turn_context.as_ref()).await {
Ok(()) => {}
Err(err) => {
let event = Event {
id: turn_context.sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: err.message(),
}),
};
sess.send_event_raw(event).await;
}
}
}
Op::Compact => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
@@ -1838,7 +1956,7 @@ async fn run_turn(
Err(CodexErr::UsageLimitReached(e)) => {
let rate_limits = e.rate_limits.clone();
if let Some(rate_limits) = rate_limits {
sess.update_rate_limits(turn_context.as_ref(), rate_limits)
sess.update_usage_snapshot(turn_context.as_ref(), None, Some(rate_limits))
.await;
}
return Err(CodexErr::UsageLimitReached(e));
@@ -2078,17 +2196,11 @@ async fn try_run_turn(
})
.await;
}
ResponseEvent::RateLimits(snapshot) => {
// Update internal state with latest rate limits, but defer sending until
// token usage is available to avoid duplicate TokenCount events.
sess.update_rate_limits(turn_context.as_ref(), snapshot)
.await;
}
ResponseEvent::Completed {
response_id: _,
token_usage,
} => {
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
sess.refresh_usage_after_turn(turn_context.as_ref(), token_usage.as_ref())
.await;
let processed_items = output
@@ -2328,6 +2440,7 @@ mod tests {
use crate::exec::ExecToolCallOutput;
use crate::mcp::auth::McpAuthStatusEntry;
use crate::tools::format_exec_output_str;
use serde::Deserialize;
use crate::protocol::CompactedItem;
use crate::protocol::InitialHistory;
@@ -2352,7 +2465,6 @@ mod tests {
use mcp_types::ContentBlock;
use mcp_types::TextContent;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;

View File

@@ -257,11 +257,8 @@ async fn drain_to_completed(
Ok(ResponseEvent::OutputItemDone(item)) => {
sess.record_into_history(std::slice::from_ref(&item)).await;
}
Ok(ResponseEvent::RateLimits(snapshot)) => {
sess.update_rate_limits(turn_context, snapshot).await;
}
Ok(ResponseEvent::Completed { token_usage, .. }) => {
sess.update_token_usage_info(turn_context, token_usage.as_ref())
sess.refresh_usage_after_turn(turn_context, token_usage.as_ref())
.await;
return Ok(());
}

View File

@@ -36,6 +36,7 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
pub mod rate_limits;
pub mod sandboxing;
pub mod token_data;
mod truncate;

View File

@@ -0,0 +1,51 @@
use codex_backend_openapi_models::models::RateLimitStatusPayload as BackendRateLimitStatusPayload;
use codex_backend_openapi_models::models::RateLimitWindowSnapshot as BackendRateLimitWindowSnapshot;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
/// Translate the backend usage payload into the protocol-level
/// `RateLimitSnapshot`. A payload without rate-limit details maps to a
/// snapshot with both windows empty.
pub fn rate_limit_snapshot_from_usage_payload(
    payload: BackendRateLimitStatusPayload,
) -> RateLimitSnapshot {
    // `rate_limit` is `Option<Option<Box<_>>>`; collapse both layers first.
    match payload.rate_limit.flatten() {
        Some(details) => RateLimitSnapshot {
            primary: map_rate_limit_window(details.primary_window),
            secondary: map_rate_limit_window(details.secondary_window),
        },
        None => RateLimitSnapshot {
            primary: None,
            secondary: None,
        },
    }
}
/// Map one optional backend window snapshot onto the protocol
/// `RateLimitWindow`; both optional layers must be present, otherwise `None`.
fn map_rate_limit_window(
    window: Option<Option<Box<BackendRateLimitWindowSnapshot>>>,
) -> Option<RateLimitWindow> {
    let Some(Some(snapshot)) = window else {
        return None;
    };
    Some(RateLimitWindow {
        used_percent: f64::from(snapshot.used_percent),
        // Backend reports the window length in seconds; round up to minutes.
        window_minutes: window_minutes_from_seconds(snapshot.limit_window_seconds),
        resets_at: Some(i64::from(snapshot.reset_at)),
    })
}
fn window_minutes_from_seconds(seconds: i32) -> Option<i64> {
if seconds <= 0 {
return None;
}
let seconds_i64 = i64::from(seconds);
Some((seconds_i64 + 59) / 60)
}

View File

@@ -765,12 +765,6 @@ async fn token_count_includes_rate_limits_snapshot() {
let response = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.insert_header("x-codex-primary-used-percent", "12.5")
.insert_header("x-codex-secondary-used-percent", "40.0")
.insert_header("x-codex-primary-window-minutes", "10")
.insert_header("x-codex-secondary-window-minutes", "60")
.insert_header("x-codex-primary-reset-at", "1704069000")
.insert_header("x-codex-secondary-reset-at", "1704074400")
.set_body_raw(sse_body, "text/event-stream");
Mock::given(method("POST"))
@@ -780,14 +774,43 @@ async fn token_count_includes_rate_limits_snapshot() {
.mount(&server)
.await;
let usage_payload = serde_json::json!({
"plan_type": "plus",
"rate_limit": {
"allowed": true,
"limit_reached": false,
"primary_window": {
"used_percent": 12,
"limit_window_seconds": 600,
"reset_after_seconds": 30,
"reset_at": 1704069000
},
"secondary_window": {
"used_percent": 40,
"limit_window_seconds": 3600,
"reset_after_seconds": 120,
"reset_at": 1704074400
}
}
});
Mock::given(method("GET"))
.and(path("/backend-api/wham/usage"))
.respond_with(ResponseTemplate::new(200).set_body_json(usage_payload))
.expect(1)
.mount(&server)
.await;
let mut provider = built_in_model_providers()["openai"].clone();
provider.base_url = Some(format!("{}/v1", server.uri()));
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home);
config.model_provider = provider;
config.chatgpt_base_url = format!("{}/backend-api", server.uri());
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let codex = conversation_manager
.new_conversation(config)
.await
@@ -803,33 +826,6 @@ async fn token_count_includes_rate_limits_snapshot() {
.await
.unwrap();
let first_token_event =
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await;
let rate_limit_only = match first_token_event {
EventMsg::TokenCount(ev) => ev,
_ => unreachable!(),
};
let rate_limit_json = serde_json::to_value(&rate_limit_only).unwrap();
pretty_assertions::assert_eq!(
rate_limit_json,
json!({
"info": null,
"rate_limits": {
"primary": {
"used_percent": 12.5,
"window_minutes": 10,
"resets_at": 1704069000
},
"secondary": {
"used_percent": 40.0,
"window_minutes": 60,
"resets_at": 1704074400
}
}
})
);
let token_event = wait_for_event(
&codex,
|msg| matches!(msg, EventMsg::TokenCount(ev) if ev.info.is_some()),
@@ -839,7 +835,7 @@ async fn token_count_includes_rate_limits_snapshot() {
EventMsg::TokenCount(ev) => ev,
_ => unreachable!(),
};
// Assert full JSON for the final token count event (usage + rate limits)
// Assert full JSON for the token count event (usage + rate limits)
let final_json = serde_json::to_value(&final_payload).unwrap();
pretty_assertions::assert_eq!(
final_json,
@@ -864,7 +860,7 @@ async fn token_count_includes_rate_limits_snapshot() {
},
"rate_limits": {
"primary": {
"used_percent": 12.5,
"used_percent": 12.0,
"window_minutes": 10,
"resets_at": 1704069000
},
@@ -888,7 +884,7 @@ async fn token_count_includes_rate_limits_snapshot() {
.primary
.as_ref()
.map(|window| window.used_percent),
Some(12.5)
Some(12.0)
);
assert_eq!(
final_snapshot

View File

@@ -53,6 +53,11 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
EventMsg::TokenCount(_),
EventMsg::AgentMessage(assistant_message),
EventMsg::TokenCount(_),
]
| [
EventMsg::UserMessage(first_user),
EventMsg::AgentMessage(assistant_message),
EventMsg::TokenCount(_),
] => {
assert_eq!(first_user.message, "Record some messages");
assert_eq!(assistant_message.message, "Completed first turn");

View File

@@ -132,6 +132,9 @@ pub enum Op {
summary: Option<ReasoningSummaryConfig>,
},
/// Request the latest usage and rate limits without sending a turn to the model.
RefreshUsage,
/// Approve a command execution
ExecApproval {
/// The id of the submission we are approving

View File

@@ -241,6 +241,7 @@ pub(crate) struct ChatWidget {
token_info: Option<TokenUsageInfo>,
rate_limit_snapshot: Option<RateLimitSnapshotDisplay>,
rate_limit_warnings: RateLimitWarningState,
status_refresh_pending: bool,
// Stream lifecycle controller
stream_controller: Option<StreamController>,
running_commands: HashMap<String, RunningCommand>,
@@ -940,6 +941,7 @@ impl ChatWidget {
token_info: None,
rate_limit_snapshot: None,
rate_limit_warnings: RateLimitWarningState::default(),
status_refresh_pending: false,
stream_controller: None,
running_commands: HashMap::new(),
task_complete_pending: false,
@@ -1007,6 +1009,7 @@ impl ChatWidget {
token_info: None,
rate_limit_snapshot: None,
rate_limit_warnings: RateLimitWarningState::default(),
status_refresh_pending: false,
stream_controller: None,
running_commands: HashMap::new(),
task_complete_pending: false,
@@ -1209,7 +1212,7 @@ impl ChatWidget {
self.insert_str("@");
}
SlashCommand::Status => {
self.add_status_output();
self.request_status_refresh();
}
SlashCommand::Mcp => {
self.add_mcp_output();
@@ -1446,8 +1449,20 @@ impl ChatWidget {
EventMsg::TokenCount(ev) => {
self.set_token_info(ev.info);
self.on_rate_limit_snapshot(ev.rate_limits);
if self.status_refresh_pending {
self.status_refresh_pending = false;
self.add_status_output();
}
}
EventMsg::Error(ErrorEvent { message }) => {
if self.status_refresh_pending {
self.status_refresh_pending = false;
self.add_to_history(history_cell::new_error_event(message));
self.request_redraw();
} else {
self.on_error(message);
}
}
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
EventMsg::TurnAborted(ev) => match ev.reason {
TurnAbortReason::Interrupted => {
self.on_interrupted_turn(ev.reason);
@@ -1628,6 +1643,15 @@ impl ChatWidget {
self.request_redraw();
}
fn request_status_refresh(&mut self) {
if self.status_refresh_pending {
return;
}
self.status_refresh_pending = true;
self.submit_op(Op::RefreshUsage);
}
pub(crate) fn add_status_output(&mut self) {
let default_usage = TokenUsage::default();
let (total_usage, context_usage) = if let Some(ti) = &self.token_info {

View File

@@ -273,6 +273,7 @@ fn make_chatwidget_manual() -> (
token_info: None,
rate_limit_snapshot: None,
rate_limit_warnings: RateLimitWarningState::default(),
status_refresh_pending: false,
stream_controller: None,
running_commands: HashMap::new(),
task_complete_pending: false,