Compare commits

...

19 Commits

Author         SHA1        Message                                                          Date
Ahmed Ibrahim  12af7f4452  tests                                                            2026-01-13 15:20:10 -08:00
Ahmed Ibrahim  f982939887  tests                                                            2026-01-13 15:20:07 -08:00
Ahmed Ibrahim  ad51f47550  Merge branch 'models_manager_config' into renew/cache           2026-01-13 15:15:30 -08:00
Ahmed Ibrahim  c80170754f  fix                                                              2026-01-13 15:05:57 -08:00
Ahmed Ibrahim  ff99224b55  Merge branch 'models_manager_config' into renew/cache           2026-01-13 15:01:24 -08:00
Ahmed Ibrahim  4d3785ed34  Merge branch 'clean/models/manager' into models_manager_config  2026-01-13 14:59:22 -08:00
Ahmed Ibrahim  be7284e098  refresh                                                          2026-01-13 14:59:06 -08:00
Ahmed Ibrahim  4f80c005a3  refresh                                                          2026-01-13 14:58:09 -08:00
Ahmed Ibrahim  941cae2db1  Merge branch 'clean/models/manager' into models_manager_config  2026-01-13 14:54:15 -08:00
Ahmed Ibrahim  da886ec2a6  fix                                                              2026-01-13 14:53:27 -08:00
Ahmed Ibrahim  a6e26bfaf1  refresh models on thread manager start                          2026-01-13 14:51:50 -08:00
Ahmed Ibrahim  fe85da4ba4  clean                                                            2026-01-13 14:47:05 -08:00
Ahmed Ibrahim  c0a5e8c2cf  clean                                                            2026-01-13 14:46:01 -08:00
Ahmed Ibrahim  d49678a2bf  clean                                                            2026-01-13 14:37:03 -08:00
Ahmed Ibrahim  8d7362df44  clean                                                            2026-01-13 14:26:34 -08:00
Ahmed Ibrahim  5bd85b38b8  clean                                                            2026-01-13 14:25:44 -08:00
Ahmed Ibrahim  dd9b578e7c  clean                                                            2026-01-13 14:20:56 -08:00
Ahmed Ibrahim  5bad59116b  clean up models manager                                          2026-01-13 14:19:01 -08:00
Ahmed Ibrahim  470a5b2158  clean up models manager                                          2026-01-13 14:16:53 -08:00
21 changed files with 691 additions and 370 deletions

View File

@@ -129,7 +129,7 @@ pub async fn run_main(
.try_init();
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let processor_handle = tokio::spawn(async move {
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
let loader_overrides = loader_overrides_for_config_api;
@@ -140,19 +140,18 @@ pub async fn run_main(
cli_overrides,
loader_overrides,
feedback.clone(),
);
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}
info!("processor task exited (channel closed)");
}
)
.await;
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}
info!("processor task exited (channel closed)");
});
// Task: write outgoing messages to stdout.
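This restructuring exists because `MessageProcessor::new` becomes async (see the next file): setup that must be awaited has to live inside the future handed to `tokio::spawn`, rather than in a synchronous block that returns one. A runnable sketch of the pattern with a stand-in type:

use tokio::sync::mpsc;

// Stand-in for MessageProcessor; only the async constructor matters here.
struct Processor;

impl Processor {
    async fn new() -> Self {
        Processor
    }

    async fn process(&self, msg: String) {
        println!("{msg}");
    }
}

async fn run(mut rx: mpsc::Receiver<String>) {
    // tokio::spawn takes a future directly, so the async constructor can be
    // awaited inside the task instead of being built in a sync setup block.
    let handle = tokio::spawn(async move {
        let processor = Processor::new().await;
        while let Some(msg) = rx.recv().await {
            processor.process(msg).await;
        }
    });
    let _ = handle.await;
}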

View File

@@ -39,7 +39,7 @@ pub(crate) struct MessageProcessor {
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(
pub(crate) async fn new(
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
@@ -53,11 +53,14 @@ impl MessageProcessor {
false,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::VSCode,
));
let thread_manager = Arc::new(
ThreadManager::new(
(*config).clone(),
auth_manager.clone(),
SessionSource::VSCode,
)
.await,
);
let codex_message_processor = CodexMessageProcessor::new(
auth_manager,
thread_manager,

View File

@@ -4,12 +4,13 @@ use codex_app_server_protocol::Model;
use codex_app_server_protocol::ReasoningEffortOption;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ReasoningEffortPreset;
pub async fn supported_models(thread_manager: Arc<ThreadManager>, config: &Config) -> Vec<Model> {
thread_manager
.list_models(config)
.list_models(config, RefreshStrategy::default())
.await
.into_iter()
.filter(|preset| preset.show_in_picker)

View File

@@ -254,14 +254,20 @@ impl Codex {
.map_err(|err| CodexErr::Fatal(format!("failed to load execpolicy: {err}")))?;
let config = Arc::new(config);
if config.features.enabled(Feature::RemoteModels)
&& let Err(err) = models_manager
.refresh_available_models_with_cache(&config)
.await
{
error!("failed to refresh available models: {err:?}");
}
let model = models_manager.get_model(&config.model, &config).await;
// todo(aibrahim): this should be done on models manager startup instead of here
let _ = models_manager
.list_models(
&config,
crate::models_manager::manager::RefreshStrategy::default(),
)
.await;
let model = models_manager
.get_default_model(
&config.model,
&config,
crate::models_manager::manager::RefreshStrategy::default(),
)
.await;
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
model: model.clone(),
@@ -965,7 +971,7 @@ impl Session {
let model_info = self
.services
.models_manager
.construct_model_info(session_configuration.model.as_str(), &per_turn_config)
.get_model_info(session_configuration.model.as_str(), &per_turn_config)
.await;
let mut turn_context: TurnContext = Self::make_turn_context(
Some(Arc::clone(&self.services.auth_manager)),
@@ -988,6 +994,14 @@ impl Session {
.await
}
async fn get_config(&self) -> std::sync::Arc<Config> {
let state = self.state.lock().await;
state
.session_configuration
.original_config_do_not_use
.clone()
}
pub(crate) async fn new_default_turn_with_sub_id(&self, sub_id: String) -> Arc<TurnContext> {
let session_configuration = {
let state = self.state.lock().await;
@@ -2374,7 +2388,7 @@ async fn spawn_review_thread(
let review_model_info = sess
.services
.models_manager
.construct_model_info(&model, &config)
.get_model_info(&model, &config)
.await;
// For reviews, disable web_search and view_image regardless of global settings.
let mut review_features = sess.features.clone();
@@ -2904,9 +2918,10 @@ async fn try_run_turn(
}
ResponseEvent::ModelsEtag(etag) => {
// Update internal state with latest models etag
let config = sess.get_config().await;
sess.services
.models_manager
.refresh_if_new_etag(etag, sess.features.enabled(Feature::RemoteModels))
.refresh_if_new_etag(etag, &config)
.await;
}
ResponseEvent::Completed {
@@ -3641,10 +3656,8 @@ mod tests {
let conversation_id = ThreadId::default();
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let models_manager = Arc::new(ModelsManager::new(
config.codex_home.clone(),
auth_manager.clone(),
));
let models_manager =
Arc::new(ModelsManager::new((*config).clone(), auth_manager.clone()).await);
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
@@ -3736,10 +3749,8 @@ mod tests {
let conversation_id = ThreadId::default();
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let models_manager = Arc::new(ModelsManager::new(
config.codex_home.clone(),
auth_manager.clone(),
));
let models_manager =
Arc::new(ModelsManager::new((*config).clone(), auth_manager.clone()).await);
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);

View File

@@ -5,22 +5,128 @@ use serde::Deserialize;
use serde::Serialize;
use std::io;
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tokio::fs;
use tracing::error;
/// Manages loading and saving of models cache to disk.
#[derive(Debug)]
pub(crate) struct ModelsCacheManager {
cache_path: PathBuf,
cache_ttl: Duration,
}
impl ModelsCacheManager {
/// Create a new cache manager with the given path and TTL.
pub(crate) fn new(cache_path: PathBuf, cache_ttl: Duration) -> Self {
Self {
cache_path,
cache_ttl,
}
}
/// Attempt to load a fresh cache entry. Returns `None` if the cache doesn't exist or is stale.
pub(crate) async fn load_fresh(&self) -> Option<CacheData> {
let cache = match self.load().await {
Ok(cache) => cache?,
Err(err) => {
error!("failed to load models cache: {err}");
return None;
}
};
if !cache.is_fresh(self.cache_ttl) {
return None;
}
Some(CacheData {
models: cache.models,
etag: cache.etag,
})
}
/// Persist the cache to disk, creating parent directories as needed.
pub(crate) async fn persist_cache(&self, models: &[ModelInfo], etag: Option<String>) {
let cache = ModelsCache {
fetched_at: Utc::now(),
etag,
models: models.to_vec(),
};
if let Err(err) = self.save_internal(&cache).await {
error!("failed to write models cache: {err}");
}
}
/// Renew the cache TTL by rewriting the cache entry with a current `fetched_at`
/// timestamp, extending its validity without refetching models.
pub(crate) async fn renew_cache_ttl(&self) -> io::Result<()> {
let mut cache = match self.load().await? {
Some(cache) => cache,
None => return Err(io::Error::new(ErrorKind::NotFound, "cache not found")),
};
cache.fetched_at = Utc::now();
self.save_internal(&cache).await
}
async fn load(&self) -> io::Result<Option<ModelsCache>> {
match fs::read(&self.cache_path).await {
Ok(contents) => {
let cache = serde_json::from_slice(&contents)
.map_err(|err| io::Error::new(ErrorKind::InvalidData, err.to_string()))?;
Ok(Some(cache))
}
Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
Err(err) => Err(err),
}
}
async fn save_internal(&self, cache: &ModelsCache) -> io::Result<()> {
if let Some(parent) = self.cache_path.parent() {
fs::create_dir_all(parent).await?;
}
let json = serde_json::to_vec_pretty(cache)
.map_err(|err| io::Error::new(ErrorKind::InvalidData, err.to_string()))?;
fs::write(&self.cache_path, json).await
}
#[cfg(test)]
/// Set the cache TTL.
pub(crate) fn set_ttl(&mut self, ttl: Duration) {
self.cache_ttl = ttl;
}
#[cfg(test)]
/// Manipulate cache file for testing. Allows setting a custom fetched_at timestamp.
pub(crate) async fn manipulate_cache_for_test<F>(&self, f: F) -> io::Result<()>
where
F: FnOnce(&mut DateTime<Utc>),
{
let mut cache = match self.load().await? {
Some(cache) => cache,
None => return Err(io::Error::new(ErrorKind::NotFound, "cache not found")),
};
f(&mut cache.fetched_at);
self.save_internal(&cache).await
}
}
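A minimal usage sketch of the cache manager above. The path and TTL are illustrative, and since the type is `pub(crate)` this would live inside the crate (imports as in this module):

use std::path::PathBuf;
use std::time::Duration;

// Illustrative path and TTL; `ModelInfo` is the same type this module uses.
async fn demo_cache(models: &[ModelInfo]) {
    let manager = ModelsCacheManager::new(
        PathBuf::from("/tmp/codex/models_cache.json"),
        Duration::from_secs(60 * 60), // 1 hour TTL
    );
    manager
        .persist_cache(models, Some("\"etag-123\"".to_string()))
        .await;
    // load_fresh returns None once the entry is older than the TTL.
    if let Some(cache) = manager.load_fresh().await {
        assert_eq!(cache.etag.as_deref(), Some("\"etag-123\""));
        assert_eq!(cache.models.len(), models.len());
    }
}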
/// Data returned from loading a fresh cache entry.
pub(crate) struct CacheData {
pub(crate) models: Vec<ModelInfo>,
pub(crate) etag: Option<String>,
}
/// Serialized snapshot of models and metadata cached on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ModelsCache {
pub(crate) fetched_at: DateTime<Utc>,
struct ModelsCache {
fetched_at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) etag: Option<String>,
pub(crate) models: Vec<ModelInfo>,
etag: Option<String>,
models: Vec<ModelInfo>,
}
impl ModelsCache {
/// Returns `true` when the cache entry has not exceeded the configured TTL.
pub(crate) fn is_fresh(&self, ttl: Duration) -> bool {
fn is_fresh(&self, ttl: Duration) -> bool {
if ttl.is_zero() {
return false;
}
@@ -31,26 +137,3 @@ impl ModelsCache {
age <= ttl_duration
}
}
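The freshness test reduces to comparing the entry's age against the TTL. A self-contained restatement using chrono (values are illustrative):

use chrono::{Duration as ChronoDuration, Utc};

// Illustrative: an entry fetched 30 minutes ago is fresh under a 1-hour TTL.
fn freshness_example() -> bool {
    let fetched_at = Utc::now() - ChronoDuration::minutes(30);
    let ttl = ChronoDuration::hours(1);
    // age (~30 min) <= ttl (60 min) => fresh
    Utc::now() - fetched_at <= ttl
}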
/// Read and deserialize the cache file if it exists.
pub(crate) async fn load_cache(path: &Path) -> io::Result<Option<ModelsCache>> {
match fs::read(path).await {
Ok(contents) => {
let cache = serde_json::from_slice(&contents)
.map_err(|err| io::Error::new(ErrorKind::InvalidData, err.to_string()))?;
Ok(Some(cache))
}
Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
Err(err) => Err(err),
}
}
/// Persist the cache contents to disk, creating parent directories as needed.
pub(crate) async fn save_cache(path: &Path, cache: &ModelsCache) -> io::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let json = serde_json::to_vec_pretty(cache)
.map_err(|err| io::Error::new(ErrorKind::InvalidData, err.to_string()))?;
fs::write(path, json).await
}

View File

@@ -1,4 +1,3 @@
use chrono::Utc;
use codex_api::ModelsClient;
use codex_api::ReqwestTransport;
use codex_app_server_protocol::AuthMode;
@@ -6,7 +5,6 @@ use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ModelsResponse;
use http::HeaderMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -15,8 +13,7 @@ use tokio::sync::TryLockError;
use tokio::time::timeout;
use tracing::error;
use super::cache;
use super::cache::ModelsCache;
use super::cache::ModelsCacheManager;
use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::AuthManager;
@@ -36,6 +33,18 @@ const OPENAI_DEFAULT_API_MODEL: &str = "gpt-5.1-codex-max";
const OPENAI_DEFAULT_CHATGPT_MODEL: &str = "gpt-5.2-codex";
const CODEX_AUTO_BALANCED_MODEL: &str = "codex-auto-balanced";
/// Strategy for refreshing available models.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RefreshStrategy {
/// Always fetch from the network, ignoring cache.
Online,
/// Only use cached data, never fetch from the network.
Offline,
/// Use cache if available and fresh, otherwise fetch from the network.
#[default]
OnlineIfUncached,
}
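To make the variants concrete, here is a small illustrative helper (not part of the crate) showing how a caller might map context onto a strategy:

// Illustrative only: mapping caller context onto a RefreshStrategy.
fn pick_strategy(offline_mode: bool, etag_changed: bool) -> RefreshStrategy {
    if offline_mode {
        // Serve whatever the cache holds; never touch the network.
        RefreshStrategy::Offline
    } else if etag_changed {
        // The server signaled newer data; bypass the cache.
        RefreshStrategy::Online
    } else {
        // Default: use a fresh cache entry, otherwise fetch.
        RefreshStrategy::OnlineIfUncached
    }
}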
/// Coordinates remote model discovery plus cached metadata on disk.
#[derive(Debug)]
pub struct ModelsManager {
@@ -43,64 +52,166 @@ pub struct ModelsManager {
remote_models: RwLock<Vec<ModelInfo>>,
auth_manager: Arc<AuthManager>,
etag: RwLock<Option<String>>,
codex_home: PathBuf,
cache_ttl: Duration,
cache_manager: ModelsCacheManager,
provider: ModelProviderInfo,
}
impl ModelsManager {
/// Construct a manager scoped to the provided `AuthManager`.
pub fn new(codex_home: PathBuf, auth_manager: Arc<AuthManager>) -> Self {
Self {
///
/// Uses `config.codex_home` to store cached model metadata and initializes with built-in presets.
/// Refreshes available models using the default strategy.
pub async fn new(config: Config, auth_manager: Arc<AuthManager>) -> Self {
let cache_path = config.codex_home.join(MODEL_CACHE_FILE);
let cache_manager = ModelsCacheManager::new(cache_path, DEFAULT_MODEL_CACHE_TTL);
let manager = Self {
local_models: builtin_model_presets(auth_manager.get_auth_mode()),
remote_models: RwLock::new(Self::load_remote_models_from_file().unwrap_or_default()),
auth_manager,
etag: RwLock::new(None),
codex_home,
cache_ttl: DEFAULT_MODEL_CACHE_TTL,
cache_manager,
provider: ModelProviderInfo::create_openai_provider(),
};
if let Err(err) = manager
.refresh_available_models(&config, RefreshStrategy::default())
.await
{
error!("failed to refresh available models during initialization: {err}");
}
manager
}
/// List all available models, refreshing according to the specified strategy.
///
/// Returns model presets sorted by priority and filtered by auth mode and visibility.
pub async fn list_models(
&self,
config: &Config,
refresh_strategy: RefreshStrategy,
) -> Vec<ModelPreset> {
if let Err(err) = self
.refresh_available_models(config, refresh_strategy)
.await
{
error!("failed to refresh available models: {err}");
}
let remote_models = self.get_remote_models(config).await;
self.build_available_models(remote_models)
}
/// Attempt to list models without blocking, using the current cached state.
///
/// Returns an error if the internal lock cannot be acquired.
pub fn try_list_models(&self, config: &Config) -> Result<Vec<ModelPreset>, TryLockError> {
let remote_models = self.try_get_remote_models(config)?;
Ok(self.build_available_models(remote_models))
}
// todo(aibrahim): should be visible to core only and sent on session_configured event
/// Get the model identifier to use, refreshing according to the specified strategy.
///
/// If `model` is provided, returns it directly. Otherwise selects the default based on
/// auth mode and available models (prefers `codex-auto-balanced` for ChatGPT auth).
pub async fn get_default_model(
&self,
model: &Option<String>,
config: &Config,
refresh_strategy: RefreshStrategy,
) -> String {
if let Some(model) = model.as_ref() {
return model.to_string();
}
if let Err(err) = self
.refresh_available_models(config, refresh_strategy)
.await
{
error!("failed to refresh available models: {err}");
}
// If codex-auto-balanced is available and the user is signed in with ChatGPT auth, prefer it; otherwise fall back to the per-auth default model.
let auth_mode = self.auth_manager.get_auth_mode();
let remote_models = self.get_remote_models(config).await;
if auth_mode == Some(AuthMode::ChatGPT) {
let has_auto_balanced = self
.build_available_models(remote_models)
.iter()
.any(|model| model.model == CODEX_AUTO_BALANCED_MODEL && model.show_in_picker);
if has_auto_balanced {
return CODEX_AUTO_BALANCED_MODEL.to_string();
}
return OPENAI_DEFAULT_CHATGPT_MODEL.to_string();
}
OPENAI_DEFAULT_API_MODEL.to_string()
}
// todo(aibrahim): see if we can tighten this to pub(crate)
/// Look up model metadata, applying remote overrides and config adjustments.
pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
let remote = self
.get_remote_models(config)
.await
.into_iter()
.find(|m| m.slug == model);
let model = if let Some(remote) = remote {
remote
} else {
model_info::find_model_info_for_slug(model)
};
model_info::with_config_overrides(model, config)
}
/// Refresh models if the provided ETag differs from the cached ETag.
///
/// Uses `Online` strategy to fetch latest models when ETags differ.
/// If ETags match, renews the cache TTL to extend its validity.
pub(crate) async fn refresh_if_new_etag(&self, etag: String, config: &Config) {
let current_etag = self.get_etag().await;
if current_etag.as_deref() == Some(etag.as_str()) {
if let Err(err) = self.cache_manager.renew_cache_ttl().await {
error!("failed to renew cache TTL: {err}");
}
return;
}
if let Err(err) = self
.refresh_available_models(config, RefreshStrategy::Online)
.await
{
error!("failed to refresh available models: {err}");
}
}
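In other words, an incoming ETag resolves to one of two actions; a condensed illustrative sketch of the decision:

// Illustrative: the two outcomes of an ETag comparison.
enum EtagAction {
    RenewTtl,     // same payload upstream: just extend the cache TTL
    ForceRefresh, // new or unknown ETag: fetch with RefreshStrategy::Online
}

fn etag_action(current: Option<&str>, incoming: &str) -> EtagAction {
    if current == Some(incoming) {
        EtagAction::RenewTtl
    } else {
        EtagAction::ForceRefresh
    }
}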
#[cfg(any(test, feature = "test-support"))]
/// Construct a manager scoped to the provided `AuthManager` with a specific provider. Used for integration tests.
pub fn with_provider(
codex_home: PathBuf,
auth_manager: Arc<AuthManager>,
provider: ModelProviderInfo,
) -> Self {
Self {
local_models: builtin_model_presets(auth_manager.get_auth_mode()),
remote_models: RwLock::new(Self::load_remote_models_from_file().unwrap_or_default()),
auth_manager,
etag: RwLock::new(None),
codex_home,
cache_ttl: DEFAULT_MODEL_CACHE_TTL,
provider,
}
}
/// Fetch the latest remote models, using the on-disk cache when still fresh.
pub async fn refresh_available_models_with_cache(&self, config: &Config) -> CoreResult<()> {
/// Refresh available models according to the specified strategy.
async fn refresh_available_models(
&self,
config: &Config,
refresh_strategy: RefreshStrategy,
) -> CoreResult<()> {
if !config.features.enabled(Feature::RemoteModels)
|| self.auth_manager.get_auth_mode() == Some(AuthMode::ApiKey)
{
return Ok(());
}
if self.try_load_cache().await {
return Ok(());
}
match refresh_strategy {
RefreshStrategy::Offline => {
// Only try to load from cache, never fetch
self.try_load_cache().await;
Ok(())
}
RefreshStrategy::OnlineIfUncached => {
// Try cache first, fall back to online if unavailable
if self.try_load_cache().await {
return Ok(());
}
self.fetch_and_update_models().await
}
RefreshStrategy::Online => {
// Always fetch from network
self.fetch_and_update_models().await
}
}
self.refresh_available_models_no_cache(config.features.enabled(Feature::RemoteModels))
.await
}
pub(crate) async fn refresh_available_models_no_cache(
&self,
remote_models_feature: bool,
) -> CoreResult<()> {
if !remote_models_feature || self.auth_manager.get_auth_mode() == Some(AuthMode::ApiKey) {
return Ok(());
}
async fn fetch_and_update_models(&self) -> CoreResult<()> {
let auth = self.auth_manager.auth().await;
let api_provider = self.provider.to_api_provider(Some(AuthMode::ChatGPT))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
@@ -118,84 +229,10 @@ impl ModelsManager {
self.apply_remote_models(models.clone()).await;
*self.etag.write().await = etag.clone();
self.persist_cache(&models, etag).await;
self.cache_manager.persist_cache(&models, etag).await;
Ok(())
}
pub async fn list_models(&self, config: &Config) -> Vec<ModelPreset> {
if let Err(err) = self.refresh_available_models_with_cache(config).await {
error!("failed to refresh available models: {err}");
}
let remote_models = self.remote_models(config).await;
self.build_available_models(remote_models)
}
pub fn try_list_models(&self, config: &Config) -> Result<Vec<ModelPreset>, TryLockError> {
let remote_models = self.try_get_remote_models(config)?;
Ok(self.build_available_models(remote_models))
}
/// Look up the requested model metadata while applying remote metadata overrides.
pub async fn construct_model_info(&self, model: &str, config: &Config) -> ModelInfo {
let remote = self
.remote_models(config)
.await
.into_iter()
.find(|m| m.slug == model);
let model = if let Some(remote) = remote {
remote
} else {
model_info::find_model_info_for_slug(model)
};
model_info::with_config_overrides(model, config)
}
pub async fn get_model(&self, model: &Option<String>, config: &Config) -> String {
if let Some(model) = model.as_ref() {
return model.to_string();
}
if let Err(err) = self.refresh_available_models_with_cache(config).await {
error!("failed to refresh available models: {err}");
}
// if codex-auto-balanced exists & signed in with chatgpt mode, return it, otherwise return the default model
let auth_mode = self.auth_manager.get_auth_mode();
let remote_models = self.remote_models(config).await;
if auth_mode == Some(AuthMode::ChatGPT) {
let has_auto_balanced = self
.build_available_models(remote_models)
.iter()
.any(|model| model.model == CODEX_AUTO_BALANCED_MODEL && model.show_in_picker);
if has_auto_balanced {
return CODEX_AUTO_BALANCED_MODEL.to_string();
}
return OPENAI_DEFAULT_CHATGPT_MODEL.to_string();
}
OPENAI_DEFAULT_API_MODEL.to_string()
}
pub async fn refresh_if_new_etag(&self, etag: String, remote_models_feature: bool) {
let current_etag = self.get_etag().await;
if current_etag.clone().is_some() && current_etag.as_deref() == Some(etag.as_str()) {
return;
}
if let Err(err) = self
.refresh_available_models_no_cache(remote_models_feature)
.await
{
error!("failed to refresh available models: {err}");
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn get_model_offline(model: Option<&str>) -> String {
model.unwrap_or(OPENAI_DEFAULT_CHATGPT_MODEL).to_string()
}
#[cfg(any(test, feature = "test-support"))]
/// Offline helper that builds a `ModelInfo` without consulting remote state.
pub fn construct_model_info_offline(model: &str, config: &Config) -> ModelInfo {
model_info::with_config_overrides(model_info::find_model_info_for_slug(model), config)
}
async fn get_etag(&self) -> Option<String> {
self.etag.read().await.clone()
}
@@ -213,49 +250,25 @@ impl ModelsManager {
/// Attempt to satisfy the refresh from the cache when it matches the provider and TTL.
async fn try_load_cache(&self) -> bool {
// todo(aibrahim): consider storing fetched_at in ModelsManager so we don't always need to read from disk
let cache_path = self.cache_path();
let cache = match cache::load_cache(&cache_path).await {
Ok(cache) => cache,
Err(err) => {
error!("failed to load models cache: {err}");
return false;
}
};
let cache = match cache {
let cache = match self.cache_manager.load_fresh().await {
Some(cache) => cache,
None => return false,
};
if !cache.is_fresh(self.cache_ttl) {
return false;
}
let models = cache.models.clone();
*self.etag.write().await = cache.etag.clone();
self.apply_remote_models(models.clone()).await;
true
}
/// Serialize the latest fetch to disk for reuse across future processes.
async fn persist_cache(&self, models: &[ModelInfo], etag: Option<String>) {
let cache = ModelsCache {
fetched_at: Utc::now(),
etag,
models: models.to_vec(),
};
let cache_path = self.cache_path();
if let Err(err) = cache::save_cache(&cache_path, &cache).await {
error!("failed to write models cache: {err}");
}
}
/// Merge remote model metadata into picker-ready presets, preserving existing entries.
fn build_available_models(&self, mut remote_models: Vec<ModelInfo>) -> Vec<ModelPreset> {
remote_models.sort_by(|a, b| a.priority.cmp(&b.priority));
let remote_presets: Vec<ModelPreset> = remote_models.into_iter().map(Into::into).collect();
let existing_presets = self.local_models.clone();
let mut merged_presets = Self::merge_presets(remote_presets, existing_presets);
merged_presets = self.filter_visible_models(merged_presets);
let mut merged_presets = ModelPreset::merge(remote_presets, existing_presets);
let chatgpt_mode = self.auth_manager.get_auth_mode() == Some(AuthMode::ChatGPT);
merged_presets = ModelPreset::with_auth(merged_presets, chatgpt_mode);
let has_default = merged_presets.iter().any(|preset| preset.is_default);
if !has_default {
@@ -272,40 +285,7 @@ impl ModelsManager {
merged_presets
}
fn filter_visible_models(&self, models: Vec<ModelPreset>) -> Vec<ModelPreset> {
let chatgpt_mode = self.auth_manager.get_auth_mode() == Some(AuthMode::ChatGPT);
models
.into_iter()
.filter(|model| chatgpt_mode || model.supported_in_api)
.collect()
}
fn merge_presets(
remote_presets: Vec<ModelPreset>,
existing_presets: Vec<ModelPreset>,
) -> Vec<ModelPreset> {
if remote_presets.is_empty() {
return existing_presets;
}
let remote_slugs: HashSet<&str> = remote_presets
.iter()
.map(|preset| preset.model.as_str())
.collect();
let mut merged_presets = remote_presets.clone();
for mut preset in existing_presets {
if remote_slugs.contains(preset.model.as_str()) {
continue;
}
preset.is_default = false;
merged_presets.push(preset);
}
merged_presets
}
async fn remote_models(&self, config: &Config) -> Vec<ModelInfo> {
async fn get_remote_models(&self, config: &Config) -> Vec<ModelInfo> {
if config.features.enabled(Feature::RemoteModels) {
self.remote_models.read().await.clone()
} else {
@@ -321,8 +301,35 @@ impl ModelsManager {
}
}
fn cache_path(&self) -> PathBuf {
self.codex_home.join(MODEL_CACHE_FILE)
#[cfg(any(test, feature = "test-support"))]
/// Construct a manager with a specific provider for testing.
pub fn with_provider(
codex_home: PathBuf,
auth_manager: Arc<AuthManager>,
provider: ModelProviderInfo,
) -> Self {
let cache_path = codex_home.join(MODEL_CACHE_FILE);
let cache_manager = ModelsCacheManager::new(cache_path, DEFAULT_MODEL_CACHE_TTL);
Self {
local_models: builtin_model_presets(auth_manager.get_auth_mode()),
remote_models: RwLock::new(Self::load_remote_models_from_file().unwrap_or_default()),
auth_manager,
etag: RwLock::new(None),
cache_manager,
provider,
}
}
#[cfg(any(test, feature = "test-support"))]
/// Get model identifier without consulting remote state or cache.
pub fn get_model_offline(model: Option<&str>) -> String {
model.unwrap_or(OPENAI_DEFAULT_CHATGPT_MODEL).to_string()
}
#[cfg(any(test, feature = "test-support"))]
/// Build `ModelInfo` without consulting remote state or cache.
pub fn construct_model_info_offline(model: &str, config: &Config) -> ModelInfo {
model_info::with_config_overrides(model_info::find_model_info_for_slug(model), config)
}
}
@@ -338,13 +345,13 @@ fn format_client_version_to_whole() -> String {
#[cfg(test)]
mod tests {
use super::cache::ModelsCache;
use super::*;
use crate::CodexAuth;
use crate::auth::AuthCredentialsStoreMode;
use crate::config::ConfigBuilder;
use crate::features::Feature;
use crate::model_provider_info::WireApi;
use chrono::Utc;
use codex_protocol::openai_models::ModelsResponse;
use core_test_support::responses::mount_models_once;
use pretty_assertions::assert_eq;
@@ -434,13 +441,15 @@ mod tests {
ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("refresh succeeds");
let cached_remote = manager.remote_models(&config).await;
let cached_remote = manager.get_remote_models(&config).await;
assert_eq!(cached_remote, remote_models);
let available = manager.list_models(&config).await;
let available = manager
.list_models(&config, RefreshStrategy::OnlineIfUncached)
.await;
let high_idx = available
.iter()
.position(|model| model.model == "priority-high")
@@ -494,22 +503,22 @@ mod tests {
ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("first refresh succeeds");
assert_eq!(
manager.remote_models(&config).await,
manager.get_remote_models(&config).await,
remote_models,
"remote cache should store fetched models"
);
// Second call should read from cache and avoid the network.
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("cached refresh succeeds");
assert_eq!(
manager.remote_models(&config).await,
manager.get_remote_models(&config).await,
remote_models,
"cache path should not mutate stored models"
);
@@ -549,19 +558,18 @@ mod tests {
ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("initial refresh succeeds");
// Rewrite cache with an old timestamp so it is treated as stale.
let cache_path = codex_home.path().join(MODEL_CACHE_FILE);
let contents =
std::fs::read_to_string(&cache_path).expect("cache file should exist after refresh");
let mut cache: ModelsCache =
serde_json::from_str(&contents).expect("cache should deserialize");
cache.fetched_at = Utc::now() - chrono::Duration::hours(1);
std::fs::write(&cache_path, serde_json::to_string_pretty(&cache).unwrap())
.expect("cache rewrite succeeds");
manager
.cache_manager
.manipulate_cache_for_test(|fetched_at| {
*fetched_at = Utc::now() - chrono::Duration::hours(1);
})
.await
.expect("cache manipulation succeeds");
let updated_models = vec![remote_model("fresh", "Fresh", 9)];
server.reset().await;
@@ -574,11 +582,11 @@ mod tests {
.await;
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("second refresh succeeds");
assert_eq!(
manager.remote_models(&config).await,
manager.get_remote_models(&config).await,
updated_models,
"stale cache should trigger refetch"
);
@@ -618,10 +626,10 @@ mod tests {
let provider = provider_for(server.uri());
let mut manager =
ModelsManager::with_provider(codex_home.path().to_path_buf(), auth_manager, provider);
manager.cache_ttl = Duration::ZERO;
manager.cache_manager.set_ttl(Duration::ZERO);
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("initial refresh succeeds");
@@ -636,7 +644,7 @@ mod tests {
.await;
manager
.refresh_available_models_with_cache(&config)
.refresh_available_models(&config, RefreshStrategy::OnlineIfUncached)
.await
.expect("second refresh succeeds");

View File

@@ -65,19 +65,18 @@ pub(crate) struct ThreadManagerState {
}
impl ThreadManager {
pub fn new(
codex_home: PathBuf,
pub async fn new(
config: Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
) -> Self {
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
models_manager: Arc::new(ModelsManager::new(
codex_home.clone(),
auth_manager.clone(),
)),
skills_manager: Arc::new(SkillsManager::new(codex_home)),
models_manager: Arc::new(
ModelsManager::new(config.clone(), auth_manager.clone()).await,
),
skills_manager: Arc::new(SkillsManager::new(config.codex_home.clone())),
auth_manager,
session_source,
#[cfg(any(test, feature = "test-support"))]
@@ -138,8 +137,15 @@ impl ThreadManager {
self.state.models_manager.clone()
}
pub async fn list_models(&self, config: &Config) -> Vec<ModelPreset> {
self.state.models_manager.list_models(config).await
pub async fn list_models(
&self,
config: &Config,
refresh_strategy: crate::models_manager::manager::RefreshStrategy,
) -> Vec<ModelPreset> {
self.state
.models_manager
.list_models(config, refresh_strategy)
.await
}
pub async fn list_thread_ids(&self) -> Vec<ThreadId> {

View File

@@ -574,11 +574,8 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Ok(None) => panic!("No CodexAuth found in codex_home"),
Err(e) => panic!("Failed to load CodexAuth: {e}"),
};
let thread_manager = ThreadManager::new(
codex_home.path().to_path_buf(),
auth_manager,
SessionSource::Exec,
);
let thread_manager =
ThreadManager::new(config.clone(), auth_manager, SessionSource::Exec).await;
let NewThread { thread: codex, .. } = thread_manager
.start_thread(config)
.await

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::ThreadManager;
use codex_core::built_in_model_providers;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::openai_models::ReasoningEffortPreset;
@@ -18,7 +19,9 @@ async fn list_models_returns_api_key_models() -> Result<()> {
CodexAuth::from_api_key("sk-test"),
built_in_model_providers()["openai"].clone(),
);
let models = manager.list_models(&config).await;
let models = manager
.list_models(&config, RefreshStrategy::default())
.await;
let expected_models = expected_models_for_api_key();
assert_eq!(expected_models, models);
@@ -34,7 +37,9 @@ async fn list_models_returns_chatgpt_models() -> Result<()> {
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
built_in_model_providers()["openai"].clone(),
);
let models = manager.list_models(&config).await;
let models = manager
.list_models(&config, RefreshStrategy::default())
.await;
let expected_models = expected_models_for_chatgpt();
assert_eq!(expected_models, models);

View File

@@ -42,6 +42,7 @@ mod live_cli;
mod model_info_overrides;
mod model_overrides;
mod model_tools;
mod models_cache_ttl;
mod models_etag_responses;
mod otel;
mod pending_input;

View File

@@ -0,0 +1,176 @@
#![cfg(not(target_os = "windows"))]
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use chrono::DateTime;
use chrono::TimeZone;
use chrono::Utc;
use codex_core::CodexAuth;
use codex_core::features::Feature;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::openai_models::ConfigShellToolType;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelVisibility;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::openai_models::ReasoningEffortPreset;
use codex_protocol::openai_models::TruncationPolicyConfig;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use serde::Serialize;
use wiremock::MockServer;
const ETAG: &str = "\"models-etag-ttl\"";
const CACHE_FILE: &str = "models_cache.json";
const REMOTE_MODEL: &str = "codex-test-ttl";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn renews_cache_ttl_on_matching_models_etag() -> Result<()> {
let server = MockServer::start().await;
let remote_model = test_remote_model(REMOTE_MODEL, 1);
let models_mock = responses::mount_models_once_with_etag(
&server,
ModelsResponse {
models: vec![remote_model.clone()],
},
ETAG,
)
.await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
builder = builder.with_config(|config| {
config.features.enable(Feature::RemoteModels);
config.model = Some("gpt-5".to_string());
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(1);
});
let test = builder.build(&server).await?;
let codex = Arc::clone(&test.codex);
let config = test.config.clone();
// Populate cache via initial refresh.
let models_manager = test.thread_manager.get_models_manager();
let _ = models_manager.list_models(&config, RefreshStrategy::OnlineIfUncached).await;
let cache_path = config.codex_home.join(CACHE_FILE);
let stale_time = Utc.timestamp_opt(0, 0).single().expect("valid epoch");
rewrite_cache_timestamp(&cache_path, stale_time).await?;
// Trigger responses with a matching ETag, which should renew the cache TTL without another /models fetch.
let response_body = sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-1"),
]);
let _responses_mock = responses::mount_response_once(
&server,
sse_response(response_body).insert_header("X-Models-Etag", ETAG),
)
.await;
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text { text: "hi".into() }],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: codex_core::protocol::AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: test.session_configured.model.clone(),
effort: None,
summary: ReasoningSummary::Auto,
})
.await?;
let _ = wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let refreshed_cache = read_cache(&cache_path).await?;
assert!(
refreshed_cache.fetched_at > stale_time,
"cache TTL should be renewed"
);
assert_eq!(
models_mock.requests().len(),
1,
"/models should not refetch on matching etag"
);
// Cached models remain usable offline.
let offline_models = test
.thread_manager
.list_models(&config, RefreshStrategy::Offline)
.await;
assert!(
offline_models
.iter()
.any(|preset| preset.model == REMOTE_MODEL),
"offline listing should use renewed cache"
);
Ok(())
}
async fn rewrite_cache_timestamp(path: &Path, fetched_at: DateTime<Utc>) -> Result<()> {
let mut cache = read_cache(path).await?;
cache.fetched_at = fetched_at;
let contents = serde_json::to_vec_pretty(&cache)?;
tokio::fs::write(path, contents).await?;
Ok(())
}
async fn read_cache(path: &Path) -> Result<ModelsCache> {
let contents = tokio::fs::read(path).await?;
let cache = serde_json::from_slice(&contents)?;
Ok(cache)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ModelsCache {
fetched_at: DateTime<Utc>,
#[serde(default)]
etag: Option<String>,
models: Vec<ModelInfo>,
}
fn test_remote_model(slug: &str, priority: i32) -> ModelInfo {
serde_json::from_value(serde_json::json!({
"slug": slug,
"display_name": "Remote Test",
"description": "remote model",
"default_reasoning_level": "medium",
"supported_reasoning_levels": [{"effort": "low", "description": "low"}, {"effort": "medium", "description": "medium"}],
"shell_type": ConfigShellToolType::ShellCommand,
"visibility": ModelVisibility::List,
"minimal_client_version": [0, 1, 0],
"supported_in_api": true,
"priority": priority,
"upgrade": null,
"base_instructions": "base instructions",
"supports_reasoning_summaries": false,
"support_verbosity": false,
"default_verbosity": null,
"apply_patch_tool_type": null,
"truncation_policy": {"mode": "bytes", "limit": 10_000},
"supports_parallel_tool_calls": false,
"context_window": 272_000,
"effective_context_window_percent": 95,
"experimental_supported_tools": [],
}))
.expect("valid model")
}

View File

@@ -85,7 +85,7 @@ async fn prompt_tools_are_consistent_across_requests() -> anyhow::Result<()> {
.await?;
let base_instructions = thread_manager
.get_models_manager()
.construct_model_info(
.get_model_info(
config
.model
.as_deref()

View File

@@ -7,9 +7,9 @@ use codex_core::CodexAuth;
use codex_core::ModelProviderInfo;
use codex_core::built_in_model_providers;
use codex_core::config::Config;
use codex_core::error::CodexErr;
use codex_core::features::Feature;
use codex_core::models_manager::manager::ModelsManager;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandSource;
@@ -127,7 +127,7 @@ async fn remote_models_remote_model_uses_unified_exec() -> Result<()> {
assert_eq!(requests[0].url.path(), "/v1/models");
let model_info = models_manager
.construct_model_info(REMOTE_MODEL_SLUG, &config)
.get_model_info(REMOTE_MODEL_SLUG, &config)
.await;
assert_eq!(model_info.shell_type, ConfigShellToolType::UnifiedExec);
@@ -225,9 +225,7 @@ async fn remote_models_truncation_policy_without_override_preserves_remote() ->
let models_manager = test.thread_manager.get_models_manager();
wait_for_model_available(&models_manager, slug, &test.config).await;
let model_info = models_manager
.construct_model_info(slug, &test.config)
.await;
let model_info = models_manager.get_model_info(slug, &test.config).await;
assert_eq!(
model_info.truncation_policy,
TruncationPolicyConfig::bytes(12_000)
@@ -273,9 +271,7 @@ async fn remote_models_truncation_policy_with_tool_output_override() -> Result<(
let models_manager = test.thread_manager.get_models_manager();
wait_for_model_available(&models_manager, slug, &test.config).await;
let model_info = models_manager
.construct_model_info(slug, &test.config)
.await;
let model_info = models_manager.get_model_info(slug, &test.config).await;
assert_eq!(
model_info.truncation_policy,
TruncationPolicyConfig::bytes(200)
@@ -423,12 +419,9 @@ async fn remote_models_preserve_builtin_presets() -> Result<()> {
provider,
);
manager
.refresh_available_models_with_cache(&config)
.await
.expect("refresh succeeds");
let available = manager.list_models(&config).await;
let available = manager
.list_models(&config, RefreshStrategy::OnlineIfUncached)
.await;
let remote = available
.iter()
.find(|model| model.model == "remote-alpha")
@@ -483,16 +476,19 @@ async fn remote_models_request_times_out_after_5s() -> Result<()> {
);
let start = Instant::now();
let refresh = timeout(
let model = timeout(
Duration::from_secs(7),
manager.refresh_available_models_with_cache(&config),
manager.get_default_model(&None, &config, RefreshStrategy::OnlineIfUncached),
)
.await;
let elapsed = start.elapsed();
let err = refresh
.expect("refresh should finish")
.expect_err("refresh should time out");
let request_summaries: Vec<String> = server
// get_default_model should return a default model even when the refresh times out
let default_model = model.expect("get_default_model should finish and return a default model");
assert!(
default_model == "gpt-5.2-codex" || default_model == "gpt-5.1-codex-max",
"get_default_model should return a default model when the refresh times out, got: {default_model}"
);
let _request_summaries: Vec<String> = server
.received_requests()
.await
.expect("mock server should capture requests")
@@ -507,10 +503,6 @@ async fn remote_models_request_times_out_after_5s() -> Result<()> {
elapsed < Duration::from_millis(5_800),
"expected models call to time out before the delayed response; took {elapsed:?}"
);
match err {
CodexErr::Timeout => {}
other => panic!("expected timeout error, got {other:?}; requests: {request_summaries:?}"),
}
assert_eq!(
models_mock.requests().len(),
1,
@@ -550,10 +542,14 @@ async fn remote_models_hide_picker_only_models() -> Result<()> {
provider,
);
let selected = manager.get_model(&None, &config).await;
let selected = manager
.get_default_model(&None, &config, RefreshStrategy::OnlineIfUncached)
.await;
assert_eq!(selected, "gpt-5.2-codex");
let available = manager.list_models(&config).await;
let available = manager
.list_models(&config, RefreshStrategy::OnlineIfUncached)
.await;
let hidden = available
.iter()
.find(|model| model.model == "codex-auto-balanced")
@@ -571,7 +567,9 @@ async fn wait_for_model_available(
let deadline = Instant::now() + Duration::from_secs(2);
loop {
if let Some(model) = {
let guard = manager.list_models(config).await;
let guard = manager
.list_models(config, RefreshStrategy::OnlineIfUncached)
.await;
guard.iter().find(|model| model.model == slug).cloned()
} {
return model;

View File

@@ -29,6 +29,7 @@ use codex_core::config::find_codex_home;
use codex_core::config::load_config_as_toml_with_cli_overrides;
use codex_core::config::resolve_oss_provider;
use codex_core::git_info::get_git_repo_root;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
@@ -303,14 +304,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let thread_manager = ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Exec,
);
let thread_manager =
ThreadManager::new(config.clone(), auth_manager.clone(), SessionSource::Exec).await;
let default_model = thread_manager
.get_models_manager()
.get_model(&config.model, &config)
.get_default_model(&config.model, &config, RefreshStrategy::default())
.await;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.

View File

@@ -96,25 +96,24 @@ pub async fn run_main(
})?;
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let processor_handle = tokio::spawn(async move {
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),
);
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}
info!("processor task exited (channel closed)");
}
)
.await;
while let Some(msg) = incoming_rx.recv().await {
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}
info!("processor task exited (channel closed)");
});
// Task: write outgoing messages to stdout.

View File

@@ -47,7 +47,7 @@ pub(crate) struct MessageProcessor {
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(
pub(crate) async fn new(
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
@@ -58,11 +58,8 @@ impl MessageProcessor {
false,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager,
SessionSource::Mcp,
));
let thread_manager =
Arc::new(ThreadManager::new((*config).clone(), auth_manager, SessionSource::Mcp).await);
Self {
outgoing,
initialized: false,

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::collections::HashSet;
use schemars::JsonSchema;
use serde::Deserialize;
@@ -243,6 +244,46 @@ impl From<ModelInfo> for ModelPreset {
}
}
impl ModelPreset {
/// Filter models based on authentication mode.
///
/// In ChatGPT mode, all models are visible. Otherwise, only API-supported models are shown.
pub fn with_auth(models: Vec<ModelPreset>, chatgpt_mode: bool) -> Vec<ModelPreset> {
models
.into_iter()
.filter(|model| chatgpt_mode || model.supported_in_api)
.collect()
}
/// Merge remote presets with existing presets, preferring remote when slugs match.
///
/// Remote presets take precedence. Existing presets not in remote are appended with `is_default` set to false.
pub fn merge(
remote_presets: Vec<ModelPreset>,
existing_presets: Vec<ModelPreset>,
) -> Vec<ModelPreset> {
if remote_presets.is_empty() {
return existing_presets;
}
let remote_slugs: HashSet<&str> = remote_presets
.iter()
.map(|preset| preset.model.as_str())
.collect();
let mut merged_presets = remote_presets.clone();
for mut preset in existing_presets {
if remote_slugs.contains(preset.model.as_str()) {
continue;
}
preset.is_default = false;
merged_presets.push(preset);
}
merged_presets
}
}
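Together, `with_auth` filters by API support while `merge` gives remote presets precedence. The merge rule is small enough to restate standalone; a sketch over a minimal stand-in struct (the real `ModelPreset` carries many more fields):

use std::collections::HashSet;

// Minimal stand-in with only the fields the merge rule touches.
#[derive(Clone, Debug, PartialEq)]
struct P {
    model: String,
    is_default: bool,
}

fn merge(remote: Vec<P>, existing: Vec<P>) -> Vec<P> {
    if remote.is_empty() {
        return existing; // nothing fetched: keep built-in presets untouched
    }
    let remote_slugs: HashSet<String> = remote.iter().map(|p| p.model.clone()).collect();
    let mut merged = remote;
    for mut p in existing {
        if !remote_slugs.contains(&p.model) {
            p.is_default = false; // the remote list owns the default flag
            merged.push(p);
        }
    }
    merged
}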
fn reasoning_effort_mapping_from_presets(
presets: &[ReasoningEffortPreset],
) -> Option<HashMap<ReasoningEffort, ReasoningEffort>> {

View File

@@ -33,6 +33,7 @@ use codex_core::config::edit::ConfigEditsBuilder;
#[cfg(target_os = "windows")]
use codex_core::features::Feature;
use codex_core::models_manager::manager::ModelsManager;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
use codex_core::protocol::DeprecationNoticeEvent;
@@ -199,7 +200,9 @@ async fn handle_model_migration_prompt_if_needed(
app_event_tx: &AppEventSender,
models_manager: Arc<ModelsManager>,
) -> Option<AppExitInfo> {
let available_models = models_manager.list_models(config).await;
let available_models = models_manager
.list_models(config, RefreshStrategy::default())
.await;
let upgrade = available_models
.iter()
.find(|preset| preset.model == model)
@@ -362,14 +365,12 @@ impl App {
let app_event_tx = AppEventSender::new(app_event_tx);
emit_deprecation_notice(&app_event_tx, ollama_chat_support_notice);
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Cli,
));
let thread_manager = Arc::new(
ThreadManager::new(config.clone(), auth_manager.clone(), SessionSource::Cli).await,
);
let mut model = thread_manager
.get_models_manager()
.get_model(&config.model, &config)
.get_default_model(&config.model, &config, RefreshStrategy::default())
.await;
let exit_info = handle_model_migration_prompt_if_needed(
tui,
@@ -589,7 +590,7 @@ impl App {
let model_info = self
.server
.get_models_manager()
.construct_model_info(self.current_model.as_str(), &self.config)
.get_model_info(self.current_model.as_str(), &self.config)
.await;
match event {
AppEvent::NewSession => {

View File

@@ -394,7 +394,7 @@ async fn make_chatwidget_manual(
config: cfg,
model: resolved_model.clone(),
auth_manager: auth_manager.clone(),
models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager)),
models_manager: Arc::new(ModelsManager::new(cfg.clone(), auth_manager).await),
session_header: SessionHeader::new(resolved_model),
initial_user_message: None,
token_info: None,
@@ -432,13 +432,11 @@ async fn make_chatwidget_manual(
(widget, rx, op_rx)
}
fn set_chatgpt_auth(chat: &mut ChatWidget) {
async fn set_chatgpt_auth(chat: &mut ChatWidget) {
chat.auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
chat.models_manager = Arc::new(ModelsManager::new(
chat.config.codex_home.clone(),
chat.auth_manager.clone(),
));
chat.models_manager =
Arc::new(ModelsManager::new(chat.config.clone(), chat.auth_manager.clone()).await);
}
pub(crate) async fn make_chatwidget_manual_with_sender() -> (
@@ -2220,7 +2218,7 @@ async fn startup_prompts_for_windows_sandbox_when_agent_requested() {
async fn model_reasoning_selection_popup_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
chat.config.model_reasoning_effort = Some(ReasoningEffortConfig::High);
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
@@ -2234,7 +2232,7 @@ async fn model_reasoning_selection_popup_snapshot() {
async fn model_reasoning_selection_popup_extra_high_warning_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
chat.config.model_reasoning_effort = Some(ReasoningEffortConfig::XHigh);
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
@@ -2248,7 +2246,7 @@ async fn model_reasoning_selection_popup_extra_high_warning_snapshot() {
async fn reasoning_popup_shows_extra_high_with_space() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
chat.open_reasoning_popup(preset);

View File

@@ -50,6 +50,7 @@ use codex_core::config::edit::ConfigEditsBuilder;
#[cfg(target_os = "windows")]
use codex_core::features::Feature;
use codex_core::models_manager::manager::ModelsManager;
use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
use codex_core::protocol::DeprecationNoticeEvent;
@@ -231,7 +232,9 @@ async fn handle_model_migration_prompt_if_needed(
app_event_tx: &AppEventSender,
models_manager: Arc<ModelsManager>,
) -> Option<AppExitInfo> {
let available_models = models_manager.list_models(config).await;
let available_models = models_manager
.list_models(config, RefreshStrategy::default())
.await;
let upgrade = available_models
.iter()
.find(|preset| preset.model == model)
@@ -425,14 +428,12 @@ impl App {
let app_event_tx = AppEventSender::new(app_event_tx);
emit_deprecation_notice(&app_event_tx, ollama_chat_support_notice);
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Cli,
));
let thread_manager = Arc::new(
ThreadManager::new(config.clone(), auth_manager.clone(), SessionSource::Cli).await,
);
let mut model = thread_manager
.get_models_manager()
.get_model(&config.model, &config)
.get_default_model(&config.model, &config, RefreshStrategy::default())
.await;
let exit_info = handle_model_migration_prompt_if_needed(
tui,

View File

@@ -383,7 +383,7 @@ async fn make_chatwidget_manual(
config: cfg,
model: resolved_model.clone(),
auth_manager: auth_manager.clone(),
models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager)),
models_manager: Arc::new(ModelsManager::new(cfg.clone(), auth_manager).await),
session_header: SessionHeader::new(resolved_model),
initial_user_message: None,
token_info: None,
@@ -419,13 +419,11 @@ async fn make_chatwidget_manual(
(widget, rx, op_rx)
}
fn set_chatgpt_auth(chat: &mut ChatWidget) {
async fn set_chatgpt_auth(chat: &mut ChatWidget) {
chat.auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
chat.models_manager = Arc::new(ModelsManager::new(
chat.config.codex_home.clone(),
chat.auth_manager.clone(),
));
chat.models_manager =
Arc::new(ModelsManager::new(chat.config.clone(), chat.auth_manager.clone()).await);
}
pub(crate) async fn make_chatwidget_manual_with_sender() -> (
@@ -1921,7 +1919,7 @@ async fn startup_prompts_for_windows_sandbox_when_agent_requested() {
async fn model_reasoning_selection_popup_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
chat.config.model_reasoning_effort = Some(ReasoningEffortConfig::High);
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
@@ -1935,7 +1933,7 @@ async fn model_reasoning_selection_popup_snapshot() {
async fn model_reasoning_selection_popup_extra_high_warning_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
chat.config.model_reasoning_effort = Some(ReasoningEffortConfig::XHigh);
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
@@ -1949,7 +1947,7 @@ async fn model_reasoning_selection_popup_extra_high_warning_snapshot() {
async fn reasoning_popup_shows_extra_high_with_space() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5.1-codex-max")).await;
set_chatgpt_auth(&mut chat);
set_chatgpt_auth(&mut chat).await;
let preset = get_available_model(&chat, "gpt-5.1-codex-max");
chat.open_reasoning_popup(preset);