mirror of
https://github.com/openai/codex.git
synced 2026-03-03 05:03:20 +00:00
Compare commits
1 Commits
fix/notify
...
dev/zhao/u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
31478880b0 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -40,6 +40,7 @@ AGENTS.override.md
|
||||
.nyc_output/
|
||||
.jest/
|
||||
*.tsbuildinfo
|
||||
codex-backend/
|
||||
|
||||
# logs
|
||||
*.log
|
||||
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1060,6 +1060,7 @@ dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"codex-apply-patch",
|
||||
"codex-async-utils",
|
||||
"codex-backend-openapi-models",
|
||||
"codex-file-search",
|
||||
"codex-mcp-client",
|
||||
"codex-otel",
|
||||
|
||||
@@ -59,6 +59,7 @@ codex-apply-patch = { path = "apply-patch" }
|
||||
codex-arg0 = { path = "arg0" }
|
||||
codex-async-utils = { path = "async-utils" }
|
||||
codex-backend-client = { path = "backend-client" }
|
||||
codex-backend-openapi-models = { path = "codex-backend-openapi-models" }
|
||||
codex-chatgpt = { path = "chatgpt" }
|
||||
codex-common = { path = "common" }
|
||||
codex-core = { path = "core" }
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
use crate::types::CodeTaskDetailsResponse;
|
||||
use crate::types::PaginatedListTaskListItem;
|
||||
use crate::types::RateLimitStatusPayload;
|
||||
use crate::types::RateLimitWindowSnapshot;
|
||||
use crate::types::TurnAttemptsSiblingTurnsResponse;
|
||||
use anyhow::Result;
|
||||
use codex_core::auth::CodexAuth;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::rate_limits::rate_limit_snapshot_from_usage_payload;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
use reqwest::header::AUTHORIZATION;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use reqwest::header::HeaderMap;
|
||||
@@ -163,7 +162,7 @@ impl Client {
|
||||
let req = self.http.get(&url).headers(self.headers());
|
||||
let (body, ct) = self.exec_request(req, "GET", &url).await?;
|
||||
let payload: RateLimitStatusPayload = self.decode_json(&url, &ct, &body)?;
|
||||
Ok(Self::rate_limit_snapshot_from_payload(payload))
|
||||
Ok(rate_limit_snapshot_from_usage_payload(payload))
|
||||
}
|
||||
|
||||
pub async fn list_tasks(
|
||||
@@ -269,49 +268,4 @@ impl Client {
|
||||
Err(e) => anyhow::bail!("Decode error for {url}: {e}; content-type={ct}; body={body}"),
|
||||
}
|
||||
}
|
||||
|
||||
// rate limit helpers
|
||||
fn rate_limit_snapshot_from_payload(payload: RateLimitStatusPayload) -> RateLimitSnapshot {
|
||||
let Some(details) = payload
|
||||
.rate_limit
|
||||
.and_then(|inner| inner.map(|boxed| *boxed))
|
||||
else {
|
||||
return RateLimitSnapshot {
|
||||
primary: None,
|
||||
secondary: None,
|
||||
};
|
||||
};
|
||||
|
||||
RateLimitSnapshot {
|
||||
primary: Self::map_rate_limit_window(details.primary_window),
|
||||
secondary: Self::map_rate_limit_window(details.secondary_window),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_rate_limit_window(
|
||||
window: Option<Option<Box<RateLimitWindowSnapshot>>>,
|
||||
) -> Option<RateLimitWindow> {
|
||||
let snapshot = match window {
|
||||
Some(Some(snapshot)) => *snapshot,
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
let used_percent = f64::from(snapshot.used_percent);
|
||||
let window_minutes = Self::window_minutes_from_seconds(snapshot.limit_window_seconds);
|
||||
let resets_at = Some(i64::from(snapshot.reset_at));
|
||||
Some(RateLimitWindow {
|
||||
used_percent,
|
||||
window_minutes,
|
||||
resets_at,
|
||||
})
|
||||
}
|
||||
|
||||
fn window_minutes_from_seconds(seconds: i32) -> Option<i64> {
|
||||
if seconds <= 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let seconds_i64 = i64::from(seconds);
|
||||
Some((seconds_i64 + 59) / 60)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ codex-rmcp-client = { workspace = true }
|
||||
codex-async-utils = { workspace = true }
|
||||
codex-utils-string = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
codex-backend-openapi-models = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
dunce = { workspace = true }
|
||||
env-flags = { workspace = true }
|
||||
|
||||
@@ -751,9 +751,6 @@ where
|
||||
// Not an assistant message – forward immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(item))));
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot)))) => {
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::RateLimits(snapshot))));
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
|
||||
@@ -120,6 +120,10 @@ impl ModelClient {
|
||||
.map(|w| w.saturating_mul(pct) / 100)
|
||||
}
|
||||
|
||||
pub(crate) fn backend_base_url(&self) -> &str {
|
||||
self.config.chatgpt_base_url.as_str()
|
||||
}
|
||||
|
||||
pub fn get_auto_compact_token_limit(&self) -> Option<i64> {
|
||||
self.config.model_auto_compact_token_limit.or_else(|| {
|
||||
get_model_info(&self.config.model_family).and_then(|info| info.auto_compact_token_limit)
|
||||
@@ -348,15 +352,6 @@ impl ModelClient {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
let (tx_event, rx_event) = mpsc::channel::<Result<ResponseEvent>>(1600);
|
||||
|
||||
if let Some(snapshot) = parse_rate_limit_snapshot(resp.headers())
|
||||
&& tx_event
|
||||
.send(Ok(ResponseEvent::RateLimits(snapshot)))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
debug!("receiver dropped rate limit snapshot event");
|
||||
}
|
||||
|
||||
// spawn task to process SSE
|
||||
let stream = resp.bytes_stream().map_err(move |e| {
|
||||
CodexErr::ResponseStreamFailed(ResponseStreamFailed {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::client_common::tools::ToolSpec;
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::TokenUsage;
|
||||
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
|
||||
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
@@ -203,7 +202,6 @@ pub enum ResponseEvent {
|
||||
WebSearchCallBegin {
|
||||
call_id: String,
|
||||
},
|
||||
RateLimits(RateLimitSnapshot),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
|
||||
@@ -17,7 +17,9 @@ use crate::terminal;
|
||||
use crate::user_notification::UserNotifier;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_backend_openapi_models::models::RateLimitStatusPayload as BackendRateLimitStatusPayload;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::items::UserMessageItem;
|
||||
@@ -39,6 +41,7 @@ use mcp_types::ListResourcesRequestParams;
|
||||
use mcp_types::ListResourcesResult;
|
||||
use mcp_types::ReadResourceRequestParams;
|
||||
use mcp_types::ReadResourceResult;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -59,11 +62,14 @@ use crate::config::Config;
|
||||
use crate::config_types::McpServerTransportConfig;
|
||||
use crate::config_types::ShellEnvironmentPolicy;
|
||||
use crate::conversation_history::ConversationHistory;
|
||||
use crate::default_client::create_client;
|
||||
use crate::default_client::get_codex_user_agent;
|
||||
use crate::environment_context::EnvironmentContext;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
#[cfg(test)]
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::rate_limits::rate_limit_snapshot_from_usage_payload;
|
||||
// Removed: legacy executor wiring replaced by ToolOrchestrator flows.
|
||||
// legacy normalize_exec_result no longer used after orchestrator migration
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
@@ -352,6 +358,27 @@ pub(crate) struct SessionSettingsUpdate {
|
||||
pub(crate) final_output_json_schema: Option<Option<Value>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum RateLimitFetchError {
|
||||
MissingAuth,
|
||||
WrongAuthMode,
|
||||
Request(String),
|
||||
}
|
||||
|
||||
impl RateLimitFetchError {
|
||||
fn message(&self) -> String {
|
||||
match self {
|
||||
RateLimitFetchError::MissingAuth => {
|
||||
"codex account authentication required to read rate limits".to_string()
|
||||
}
|
||||
RateLimitFetchError::WrongAuthMode => {
|
||||
"chatgpt authentication required to read rate limits".to_string()
|
||||
}
|
||||
RateLimitFetchError::Request(message) => message.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
fn make_turn_context(
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
@@ -895,10 +922,11 @@ impl Session {
|
||||
state.history_snapshot()
|
||||
}
|
||||
|
||||
async fn update_token_usage_info(
|
||||
async fn update_usage_snapshot(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
token_usage: Option<&TokenUsage>,
|
||||
rate_limits: Option<RateLimitSnapshot>,
|
||||
) {
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
@@ -908,20 +936,93 @@ impl Session {
|
||||
turn_context.client.get_model_context_window(),
|
||||
);
|
||||
}
|
||||
if let Some(snapshot) = rate_limits {
|
||||
state.set_rate_limits(snapshot);
|
||||
}
|
||||
}
|
||||
self.send_token_count_event(turn_context).await;
|
||||
}
|
||||
|
||||
async fn update_rate_limits(
|
||||
async fn refresh_usage_after_turn(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
new_rate_limits: RateLimitSnapshot,
|
||||
token_usage: Option<&TokenUsage>,
|
||||
) {
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_rate_limits(new_rate_limits);
|
||||
let rate_limits = match self.fetch_rate_limits(turn_context).await {
|
||||
Ok(snapshot) => Some(snapshot),
|
||||
Err(RateLimitFetchError::MissingAuth) | Err(RateLimitFetchError::WrongAuthMode) => None,
|
||||
Err(RateLimitFetchError::Request(message)) => {
|
||||
warn!("{message}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
self.update_usage_snapshot(turn_context, token_usage, rate_limits)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn refresh_usage_for_status(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> Result<(), RateLimitFetchError> {
|
||||
let snapshot = self.fetch_rate_limits(turn_context).await?;
|
||||
self.update_usage_snapshot(turn_context, None, Some(snapshot))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_rate_limits(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> Result<RateLimitSnapshot, RateLimitFetchError> {
|
||||
let Some(auth) = self.services.auth_manager.auth() else {
|
||||
return Err(RateLimitFetchError::MissingAuth);
|
||||
};
|
||||
if auth.mode != AuthMode::ChatGPT {
|
||||
return Err(RateLimitFetchError::WrongAuthMode);
|
||||
}
|
||||
self.send_token_count_event(turn_context).await;
|
||||
|
||||
let token = auth.get_token().await.map_err(|err| {
|
||||
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
|
||||
})?;
|
||||
|
||||
let base = turn_context.client.backend_base_url().trim_end_matches('/');
|
||||
let url = if base.contains("/backend-api") {
|
||||
format!("{base}/wham/usage")
|
||||
} else {
|
||||
format!("{base}/api/codex/usage")
|
||||
};
|
||||
|
||||
let client = create_client();
|
||||
let mut request = client
|
||||
.get(&url)
|
||||
.header(reqwest::header::USER_AGENT, get_codex_user_agent())
|
||||
.bearer_auth(token);
|
||||
|
||||
if let Some(account_id) = auth.get_account_id() {
|
||||
request = request.header("ChatGPT-Account-Id", account_id);
|
||||
}
|
||||
|
||||
let response = request.send().await.map_err(|err| {
|
||||
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
|
||||
})?;
|
||||
|
||||
let status = response.status();
|
||||
if !status.is_success() {
|
||||
if status == StatusCode::UNAUTHORIZED {
|
||||
return Err(RateLimitFetchError::WrongAuthMode);
|
||||
}
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(RateLimitFetchError::Request(format!(
|
||||
"failed to fetch codex rate limits: status {status}; body={body}"
|
||||
)));
|
||||
}
|
||||
|
||||
let payload: BackendRateLimitStatusPayload = response.json().await.map_err(|err| {
|
||||
RateLimitFetchError::Request(format!("failed to fetch codex rate limits: {err}"))
|
||||
})?;
|
||||
|
||||
Ok(rate_limit_snapshot_from_usage_payload(payload))
|
||||
}
|
||||
|
||||
async fn send_token_count_event(&self, turn_context: &TurnContext) {
|
||||
@@ -1282,6 +1383,23 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Op::RefreshUsage => {
|
||||
let turn_context = sess
|
||||
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
|
||||
.await;
|
||||
match sess.refresh_usage_for_status(turn_context.as_ref()).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
let event = Event {
|
||||
id: turn_context.sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: err.message(),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Op::Compact => {
|
||||
let turn_context = sess
|
||||
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
|
||||
@@ -1838,7 +1956,7 @@ async fn run_turn(
|
||||
Err(CodexErr::UsageLimitReached(e)) => {
|
||||
let rate_limits = e.rate_limits.clone();
|
||||
if let Some(rate_limits) = rate_limits {
|
||||
sess.update_rate_limits(turn_context.as_ref(), rate_limits)
|
||||
sess.update_usage_snapshot(turn_context.as_ref(), None, Some(rate_limits))
|
||||
.await;
|
||||
}
|
||||
return Err(CodexErr::UsageLimitReached(e));
|
||||
@@ -2078,17 +2196,11 @@ async fn try_run_turn(
|
||||
})
|
||||
.await;
|
||||
}
|
||||
ResponseEvent::RateLimits(snapshot) => {
|
||||
// Update internal state with latest rate limits, but defer sending until
|
||||
// token usage is available to avoid duplicate TokenCount events.
|
||||
sess.update_rate_limits(turn_context.as_ref(), snapshot)
|
||||
.await;
|
||||
}
|
||||
ResponseEvent::Completed {
|
||||
response_id: _,
|
||||
token_usage,
|
||||
} => {
|
||||
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
|
||||
sess.refresh_usage_after_turn(turn_context.as_ref(), token_usage.as_ref())
|
||||
.await;
|
||||
|
||||
let processed_items = output
|
||||
@@ -2328,6 +2440,7 @@ mod tests {
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::mcp::auth::McpAuthStatusEntry;
|
||||
use crate::tools::format_exec_output_str;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::protocol::CompactedItem;
|
||||
use crate::protocol::InitialHistory;
|
||||
@@ -2352,7 +2465,6 @@ mod tests {
|
||||
use mcp_types::ContentBlock;
|
||||
use mcp_types::TextContent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -257,11 +257,8 @@ async fn drain_to_completed(
|
||||
Ok(ResponseEvent::OutputItemDone(item)) => {
|
||||
sess.record_into_history(std::slice::from_ref(&item)).await;
|
||||
}
|
||||
Ok(ResponseEvent::RateLimits(snapshot)) => {
|
||||
sess.update_rate_limits(turn_context, snapshot).await;
|
||||
}
|
||||
Ok(ResponseEvent::Completed { token_usage, .. }) => {
|
||||
sess.update_token_usage_info(turn_context, token_usage.as_ref())
|
||||
sess.refresh_usage_after_turn(turn_context, token_usage.as_ref())
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ mod mcp_tool_call;
|
||||
mod message_history;
|
||||
mod model_provider_info;
|
||||
pub mod parse_command;
|
||||
pub mod rate_limits;
|
||||
pub mod sandboxing;
|
||||
pub mod token_data;
|
||||
mod truncate;
|
||||
|
||||
51
codex-rs/core/src/rate_limits.rs
Normal file
51
codex-rs/core/src/rate_limits.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use codex_backend_openapi_models::models::RateLimitStatusPayload as BackendRateLimitStatusPayload;
|
||||
use codex_backend_openapi_models::models::RateLimitWindowSnapshot as BackendRateLimitWindowSnapshot;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::RateLimitWindow;
|
||||
|
||||
pub fn rate_limit_snapshot_from_usage_payload(
|
||||
payload: BackendRateLimitStatusPayload,
|
||||
) -> RateLimitSnapshot {
|
||||
let Some(details) = payload
|
||||
.rate_limit
|
||||
.and_then(|inner| inner.map(|boxed| *boxed))
|
||||
else {
|
||||
return RateLimitSnapshot {
|
||||
primary: None,
|
||||
secondary: None,
|
||||
};
|
||||
};
|
||||
|
||||
RateLimitSnapshot {
|
||||
primary: map_rate_limit_window(details.primary_window),
|
||||
secondary: map_rate_limit_window(details.secondary_window),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_rate_limit_window(
|
||||
window: Option<Option<Box<BackendRateLimitWindowSnapshot>>>,
|
||||
) -> Option<RateLimitWindow> {
|
||||
let snapshot = match window {
|
||||
Some(Some(snapshot)) => *snapshot,
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
let used_percent = f64::from(snapshot.used_percent);
|
||||
let window_minutes = window_minutes_from_seconds(snapshot.limit_window_seconds);
|
||||
let resets_at = Some(i64::from(snapshot.reset_at));
|
||||
|
||||
Some(RateLimitWindow {
|
||||
used_percent,
|
||||
window_minutes,
|
||||
resets_at,
|
||||
})
|
||||
}
|
||||
|
||||
fn window_minutes_from_seconds(seconds: i32) -> Option<i64> {
|
||||
if seconds <= 0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
let seconds_i64 = i64::from(seconds);
|
||||
Some((seconds_i64 + 59) / 60)
|
||||
}
|
||||
@@ -765,12 +765,6 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
|
||||
let response = ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.insert_header("x-codex-primary-used-percent", "12.5")
|
||||
.insert_header("x-codex-secondary-used-percent", "40.0")
|
||||
.insert_header("x-codex-primary-window-minutes", "10")
|
||||
.insert_header("x-codex-secondary-window-minutes", "60")
|
||||
.insert_header("x-codex-primary-reset-at", "1704069000")
|
||||
.insert_header("x-codex-secondary-reset-at", "1704074400")
|
||||
.set_body_raw(sse_body, "text/event-stream");
|
||||
|
||||
Mock::given(method("POST"))
|
||||
@@ -780,14 +774,43 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let usage_payload = serde_json::json!({
|
||||
"plan_type": "plus",
|
||||
"rate_limit": {
|
||||
"allowed": true,
|
||||
"limit_reached": false,
|
||||
"primary_window": {
|
||||
"used_percent": 12,
|
||||
"limit_window_seconds": 600,
|
||||
"reset_after_seconds": 30,
|
||||
"reset_at": 1704069000
|
||||
},
|
||||
"secondary_window": {
|
||||
"used_percent": 40,
|
||||
"limit_window_seconds": 3600,
|
||||
"reset_after_seconds": 120,
|
||||
"reset_at": 1704074400
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/wham/usage"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(usage_payload))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let mut provider = built_in_model_providers()["openai"].clone();
|
||||
provider.base_url = Some(format!("{}/v1", server.uri()));
|
||||
|
||||
let home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&home);
|
||||
config.model_provider = provider;
|
||||
config.chatgpt_base_url = format!("{}/backend-api", server.uri());
|
||||
|
||||
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
|
||||
let conversation_manager =
|
||||
ConversationManager::with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let codex = conversation_manager
|
||||
.new_conversation(config)
|
||||
.await
|
||||
@@ -803,33 +826,6 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first_token_event =
|
||||
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await;
|
||||
let rate_limit_only = match first_token_event {
|
||||
EventMsg::TokenCount(ev) => ev,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let rate_limit_json = serde_json::to_value(&rate_limit_only).unwrap();
|
||||
pretty_assertions::assert_eq!(
|
||||
rate_limit_json,
|
||||
json!({
|
||||
"info": null,
|
||||
"rate_limits": {
|
||||
"primary": {
|
||||
"used_percent": 12.5,
|
||||
"window_minutes": 10,
|
||||
"resets_at": 1704069000
|
||||
},
|
||||
"secondary": {
|
||||
"used_percent": 40.0,
|
||||
"window_minutes": 60,
|
||||
"resets_at": 1704074400
|
||||
}
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
let token_event = wait_for_event(
|
||||
&codex,
|
||||
|msg| matches!(msg, EventMsg::TokenCount(ev) if ev.info.is_some()),
|
||||
@@ -839,7 +835,7 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
EventMsg::TokenCount(ev) => ev,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
// Assert full JSON for the final token count event (usage + rate limits)
|
||||
// Assert full JSON for the token count event (usage + rate limits)
|
||||
let final_json = serde_json::to_value(&final_payload).unwrap();
|
||||
pretty_assertions::assert_eq!(
|
||||
final_json,
|
||||
@@ -864,7 +860,7 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
},
|
||||
"rate_limits": {
|
||||
"primary": {
|
||||
"used_percent": 12.5,
|
||||
"used_percent": 12.0,
|
||||
"window_minutes": 10,
|
||||
"resets_at": 1704069000
|
||||
},
|
||||
@@ -888,7 +884,7 @@ async fn token_count_includes_rate_limits_snapshot() {
|
||||
.primary
|
||||
.as_ref()
|
||||
.map(|window| window.used_percent),
|
||||
Some(12.5)
|
||||
Some(12.0)
|
||||
);
|
||||
assert_eq!(
|
||||
final_snapshot
|
||||
|
||||
@@ -53,6 +53,11 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
|
||||
EventMsg::TokenCount(_),
|
||||
EventMsg::AgentMessage(assistant_message),
|
||||
EventMsg::TokenCount(_),
|
||||
]
|
||||
| [
|
||||
EventMsg::UserMessage(first_user),
|
||||
EventMsg::AgentMessage(assistant_message),
|
||||
EventMsg::TokenCount(_),
|
||||
] => {
|
||||
assert_eq!(first_user.message, "Record some messages");
|
||||
assert_eq!(assistant_message.message, "Completed first turn");
|
||||
|
||||
@@ -132,6 +132,9 @@ pub enum Op {
|
||||
summary: Option<ReasoningSummaryConfig>,
|
||||
},
|
||||
|
||||
/// Request the latest usage and rate limits without sending a turn to the model.
|
||||
RefreshUsage,
|
||||
|
||||
/// Approve a command execution
|
||||
ExecApproval {
|
||||
/// The id of the submission we are approving
|
||||
|
||||
@@ -241,6 +241,7 @@ pub(crate) struct ChatWidget {
|
||||
token_info: Option<TokenUsageInfo>,
|
||||
rate_limit_snapshot: Option<RateLimitSnapshotDisplay>,
|
||||
rate_limit_warnings: RateLimitWarningState,
|
||||
status_refresh_pending: bool,
|
||||
// Stream lifecycle controller
|
||||
stream_controller: Option<StreamController>,
|
||||
running_commands: HashMap<String, RunningCommand>,
|
||||
@@ -940,6 +941,7 @@ impl ChatWidget {
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
status_refresh_pending: false,
|
||||
stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
task_complete_pending: false,
|
||||
@@ -1007,6 +1009,7 @@ impl ChatWidget {
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
status_refresh_pending: false,
|
||||
stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
task_complete_pending: false,
|
||||
@@ -1209,7 +1212,7 @@ impl ChatWidget {
|
||||
self.insert_str("@");
|
||||
}
|
||||
SlashCommand::Status => {
|
||||
self.add_status_output();
|
||||
self.request_status_refresh();
|
||||
}
|
||||
SlashCommand::Mcp => {
|
||||
self.add_mcp_output();
|
||||
@@ -1446,8 +1449,20 @@ impl ChatWidget {
|
||||
EventMsg::TokenCount(ev) => {
|
||||
self.set_token_info(ev.info);
|
||||
self.on_rate_limit_snapshot(ev.rate_limits);
|
||||
if self.status_refresh_pending {
|
||||
self.status_refresh_pending = false;
|
||||
self.add_status_output();
|
||||
}
|
||||
}
|
||||
EventMsg::Error(ErrorEvent { message }) => {
|
||||
if self.status_refresh_pending {
|
||||
self.status_refresh_pending = false;
|
||||
self.add_to_history(history_cell::new_error_event(message));
|
||||
self.request_redraw();
|
||||
} else {
|
||||
self.on_error(message);
|
||||
}
|
||||
}
|
||||
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
|
||||
EventMsg::TurnAborted(ev) => match ev.reason {
|
||||
TurnAbortReason::Interrupted => {
|
||||
self.on_interrupted_turn(ev.reason);
|
||||
@@ -1628,6 +1643,15 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn request_status_refresh(&mut self) {
|
||||
if self.status_refresh_pending {
|
||||
return;
|
||||
}
|
||||
|
||||
self.status_refresh_pending = true;
|
||||
self.submit_op(Op::RefreshUsage);
|
||||
}
|
||||
|
||||
pub(crate) fn add_status_output(&mut self) {
|
||||
let default_usage = TokenUsage::default();
|
||||
let (total_usage, context_usage) = if let Some(ti) = &self.token_info {
|
||||
|
||||
@@ -273,6 +273,7 @@ fn make_chatwidget_manual() -> (
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
status_refresh_pending: false,
|
||||
stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
task_complete_pending: false,
|
||||
|
||||
Reference in New Issue
Block a user