Compare commits

...

1 Commits

Author SHA1 Message Date
gt-oai
20a669c82e add cloud requirements error msg 2026-02-03 21:55:26 +00:00
6 changed files with 247 additions and 45 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1413,6 +1413,7 @@ dependencies = [
"codex-core",
"codex-otel",
"codex-protocol",
"futures",
"pretty_assertions",
"serde_json",
"tempfile",

View File

@@ -13,6 +13,7 @@ codex-backend-client = { workspace = true }
codex-core = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true, features = ["sync", "time"] }
toml = { workspace = true }
tracing = { workspace = true }

View File

@@ -15,6 +15,7 @@ use codex_core::auth::CodexAuth;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigRequirementsToml;
use codex_protocol::account::PlanType;
use futures::FutureExt;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
@@ -27,9 +28,21 @@ const CLOUD_REQUIREMENTS_TIMEOUT: Duration = Duration::from_secs(5);
trait RequirementsFetcher: Send + Sync {
/// Returns requirements as a TOML string.
///
/// TODO(gt): For now, returns an Option. But when we want to make this fail-closed, return a
/// Result.
async fn fetch_requirements(&self, auth: &CodexAuth) -> Option<String>;
async fn fetch_requirements(
&self,
auth: &CodexAuth,
) -> Result<Option<String>, CloudRequirementsLoadFailure>;
}
#[derive(Debug, Clone, Default, PartialEq)]
struct CloudRequirementsLoadOutcome {
requirements: Option<ConfigRequirementsToml>,
warning: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CloudRequirementsLoadFailure {
status_code: Option<u16>,
}
struct BackendRequirementsFetcher {
@@ -44,7 +57,10 @@ impl BackendRequirementsFetcher {
#[async_trait]
impl RequirementsFetcher for BackendRequirementsFetcher {
async fn fetch_requirements(&self, auth: &CodexAuth) -> Option<String> {
async fn fetch_requirements(
&self,
auth: &CodexAuth,
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
let client = BackendClient::from_auth(self.base_url.clone(), auth)
.inspect_err(|err| {
tracing::warn!(
@@ -52,20 +68,24 @@ impl RequirementsFetcher for BackendRequirementsFetcher {
"Failed to construct backend client for cloud requirements"
);
})
.ok()?;
.map_err(|_| CloudRequirementsLoadFailure { status_code: None })?;
let response = client
.get_config_requirements_file()
.await
.inspect_err(|err| tracing::warn!(error = %err, "Failed to fetch cloud requirements"))
.ok()?;
let response = client.get_config_requirements_file().await.map_err(|err| {
let status_code = extract_http_status_code(&err.to_string());
tracing::warn!(
error = %err,
status_code,
"Failed to fetch cloud requirements"
);
CloudRequirementsLoadFailure { status_code }
})?;
let Some(contents) = response.contents else {
tracing::warn!("Cloud requirements response missing contents");
return None;
return Ok(None);
};
Some(contents)
Ok(Some(contents))
}
}
@@ -88,18 +108,23 @@ impl CloudRequirementsService {
}
}
async fn fetch_with_timeout(&self) -> Option<ConfigRequirementsToml> {
async fn fetch_with_timeout(&self) -> CloudRequirementsLoadOutcome {
let _timer =
codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]);
let started_at = Instant::now();
let result = timeout(self.timeout, self.fetch())
.await
.inspect_err(|_| {
tracing::warn!("Timed out waiting for cloud requirements; continuing without them");
})
.ok()?;
let result = match timeout(self.timeout, self.fetch()).await {
Ok(result) => result,
Err(_) => {
let warning = "Failed to load Cloud Requirements: request timed out. Continuing without cloud requirements.".to_string();
tracing::warn!("{warning}");
return CloudRequirementsLoadOutcome {
requirements: None,
warning: Some(warning),
};
}
};
match result.as_ref() {
match result.requirements.as_ref() {
Some(requirements) => {
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
@@ -115,25 +140,49 @@ impl CloudRequirementsService {
}
}
if let Some(warning) = result.warning.as_deref() {
tracing::warn!("{warning}");
}
result
}
async fn fetch(&self) -> Option<ConfigRequirementsToml> {
let auth = self.auth_manager.auth().await?;
async fn fetch(&self) -> CloudRequirementsLoadOutcome {
let Some(auth) = self.auth_manager.auth().await else {
return CloudRequirementsLoadOutcome::default();
};
if !auth.is_chatgpt_auth()
|| !matches!(
auth.account_plan_type(),
Some(PlanType::Business | PlanType::Enterprise)
)
{
return None;
return CloudRequirementsLoadOutcome::default();
}
let contents = self.fetcher.fetch_requirements(&auth).await?;
parse_cloud_requirements(&contents)
.inspect_err(|err| tracing::warn!(error = %err, "Failed to parse cloud requirements"))
.ok()
.flatten()
let contents = match self.fetcher.fetch_requirements(&auth).await {
Ok(Some(contents)) => contents,
Ok(None) => return CloudRequirementsLoadOutcome::default(),
Err(err) => {
return CloudRequirementsLoadOutcome {
requirements: None,
warning: Some(fetch_warning_message(err.status_code)),
};
}
};
match parse_cloud_requirements(&contents) {
Ok(requirements) => CloudRequirementsLoadOutcome {
requirements,
warning: None,
},
Err(err) => {
tracing::warn!(error = %err, "Failed to parse cloud requirements");
CloudRequirementsLoadOutcome {
requirements: None,
warning: Some("Failed to load Cloud Requirements due to invalid response format. Continuing without cloud requirements.".to_string()),
}
}
}
}
}
@@ -147,12 +196,48 @@ pub fn cloud_requirements_loader(
CLOUD_REQUIREMENTS_TIMEOUT,
);
let task = tokio::spawn(async move { service.fetch_with_timeout().await });
CloudRequirementsLoader::new(async move {
let load_outcome = async move {
task.await
.inspect_err(|err| tracing::warn!(error = %err, "Cloud requirements task failed"))
.ok()
.flatten()
})
.unwrap_or_else(|| CloudRequirementsLoadOutcome {
requirements: None,
warning: Some(
"Failed to load Cloud Requirements due to an internal task failure. Continuing without cloud requirements.".to_string(),
),
})
}
.shared();
CloudRequirementsLoader::new_with_warning(
{
let load_outcome = load_outcome.clone();
async move { load_outcome.await.requirements }
},
async move { load_outcome.await.warning },
)
}
fn fetch_warning_message(status_code: Option<u16>) -> String {
match status_code {
Some(status_code) => format!(
"Failed to load Cloud Requirements (HTTP {status_code}). Continuing without cloud requirements."
),
None => {
"Failed to load Cloud Requirements. Continuing without cloud requirements.".to_string()
}
}
}
fn extract_http_status_code(error_message: &str) -> Option<u16> {
let status_text = error_message.split_once(" failed: ")?.1;
let status_digits: String = status_text
.chars()
.take_while(|ch| ch.is_ascii_digit())
.collect();
if status_digits.len() != 3 {
return None;
}
status_digits.parse::<u16>().ok()
}
fn parse_cloud_requirements(
@@ -247,8 +332,11 @@ mod tests {
#[async_trait::async_trait]
impl RequirementsFetcher for StaticFetcher {
async fn fetch_requirements(&self, _auth: &CodexAuth) -> Option<String> {
self.contents.clone()
async fn fetch_requirements(
&self,
_auth: &CodexAuth,
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
Ok(self.contents.clone())
}
}
@@ -256,9 +344,12 @@ mod tests {
#[async_trait::async_trait]
impl RequirementsFetcher for PendingFetcher {
async fn fetch_requirements(&self, _auth: &CodexAuth) -> Option<String> {
async fn fetch_requirements(
&self,
_auth: &CodexAuth,
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
pending::<()>().await;
None
Ok(None)
}
}
@@ -271,7 +362,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
let result = service.fetch().await;
assert!(result.is_none());
assert_eq!(result, CloudRequirementsLoadOutcome::default());
}
#[tokio::test]
@@ -282,7 +373,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
let result = service.fetch().await;
assert!(result.is_none());
assert_eq!(result, CloudRequirementsLoadOutcome::default());
}
#[tokio::test]
@@ -296,13 +387,16 @@ mod tests {
);
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
mcp_servers: None,
rules: None,
enforce_residency: None,
})
CloudRequirementsLoadOutcome {
requirements: Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
mcp_servers: None,
rules: None,
enforce_residency: None,
}),
warning: None,
}
);
}
@@ -358,6 +452,34 @@ mod tests {
tokio::time::advance(CLOUD_REQUIREMENTS_TIMEOUT + Duration::from_millis(1)).await;
let result = handle.await.expect("cloud requirements task");
assert!(result.is_none());
assert_eq!(
result.warning,
Some(
"Failed to load Cloud Requirements: request timed out. Continuing without cloud requirements.".to_string()
)
);
assert_eq!(result.requirements, None);
}
#[test]
fn parse_http_status_code_from_backend_error_message() {
assert_eq!(
extract_http_status_code(
"GET https://chatgpt.com/backend-api/wham/config/requirements failed: 403 Forbidden; content-type=application/json; body={}"
),
Some(403)
);
assert_eq!(
extract_http_status_code("Decode error for https://example.com: unexpected EOF"),
None
);
}
#[test]
fn fetch_warning_message_includes_status_code_when_available() {
assert_eq!(
fetch_warning_message(Some(429)),
"Failed to load Cloud Requirements (HTTP 429). Continuing without cloud requirements."
);
}
}

View File

@@ -9,6 +9,7 @@ use std::future::Future;
pub struct CloudRequirementsLoader {
// TODO(gt): This should return a Result once we can fail-closed.
fut: Shared<BoxFuture<'static, Option<ConfigRequirementsToml>>>,
warning_fut: Option<Shared<BoxFuture<'static, Option<String>>>>,
}
impl CloudRequirementsLoader {
@@ -18,12 +19,31 @@ impl CloudRequirementsLoader {
{
Self {
fut: fut.boxed().shared(),
warning_fut: None,
}
}
pub fn new_with_warning<F, W>(fut: F, warning_fut: W) -> Self
where
F: Future<Output = Option<ConfigRequirementsToml>> + Send + 'static,
W: Future<Output = Option<String>> + Send + 'static,
{
Self {
fut: fut.boxed().shared(),
warning_fut: Some(warning_fut.boxed().shared()),
}
}
pub async fn get(&self) -> Option<ConfigRequirementsToml> {
self.fut.clone().await
}
pub async fn warning(&self) -> Option<String> {
match &self.warning_fut {
Some(warning_fut) => warning_fut.clone().await,
None => None,
}
}
}
impl fmt::Debug for CloudRequirementsLoader {
@@ -59,4 +79,18 @@ mod tests {
assert_eq!(first, second);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn shared_warning_future_runs_once() {
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let loader = CloudRequirementsLoader::new_with_warning(async { None }, async move {
counter_clone.fetch_add(1, Ordering::SeqCst);
Some("warning".to_string())
});
let (first, second) = tokio::join!(loader.warning(), loader.warning());
assert_eq!(first, second);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
}

View File

@@ -226,6 +226,14 @@ fn emit_project_config_warnings(app_event_tx: &AppEventSender, config: &Config)
)));
}
fn emit_cloud_requirements_warning(app_event_tx: &AppEventSender, warning: Option<String>) {
if let Some(warning) = warning {
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
history_cell::new_warning_event(warning),
)));
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct SessionSummary {
usage_line: String,
@@ -908,6 +916,7 @@ impl App {
tui: &mut tui::Tui,
auth_manager: Arc<AuthManager>,
mut config: Config,
cloud_requirements_warning: Option<String>,
cli_kv_overrides: Vec<(String, TomlValue)>,
harness_overrides: ConfigOverrides,
active_profile: Option<String>,
@@ -921,6 +930,7 @@ impl App {
let (app_event_tx, mut app_event_rx) = unbounded_channel();
let app_event_tx = AppEventSender::new(app_event_tx);
emit_project_config_warnings(&app_event_tx, &config);
emit_cloud_requirements_warning(&app_event_tx, cloud_requirements_warning);
tui.set_notification_method(config.tui_notification_method);
let harness_overrides =
@@ -2548,6 +2558,36 @@ mod tests {
Ok(())
}
#[test]
fn emit_cloud_requirements_warning_enqueues_warning_cell() {
let (tx, mut rx) = unbounded_channel();
let app_event_tx = AppEventSender::new(tx);
emit_cloud_requirements_warning(
&app_event_tx,
Some("Failed to load Cloud Requirements (HTTP 403).".to_string()),
);
let event = rx.try_recv().expect("expected warning event");
let AppEvent::InsertHistoryCell(cell) = event else {
panic!("expected InsertHistoryCell event");
};
let rendered = cell
.display_lines(200)
.into_iter()
.flat_map(|line| line.spans.into_iter().map(|span| span.content.to_string()))
.collect::<String>();
assert!(rendered.contains("HTTP 403"));
}
#[test]
fn emit_cloud_requirements_warning_skips_none() {
let (tx, mut rx) = unbounded_channel();
let app_event_tx = AppEventSender::new(tx);
emit_cloud_requirements_warning(&app_event_tx, None);
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
let mut app = make_test_app().await;

View File

@@ -282,6 +282,7 @@ pub async fn run_main(
cloud_requirements.clone(),
)
.await;
let cloud_requirements_warning = cloud_requirements.warning().await;
set_default_client_residency_requirement(config.enforce_residency.value());
if let Some(warning) = add_dir_warning_message(&cli.add_dir, config.sandbox_policy.get()) {
@@ -398,6 +399,7 @@ pub async fn run_main(
run_ratatui_app(
cli,
config,
cloud_requirements_warning,
overrides,
cli_kv_overrides,
cloud_requirements,
@@ -410,6 +412,7 @@ pub async fn run_main(
async fn run_ratatui_app(
cli: Cli,
initial_config: Config,
cloud_requirements_warning: Option<String>,
overrides: ConfigOverrides,
cli_kv_overrides: Vec<(String, toml::Value)>,
cloud_requirements: CloudRequirementsLoader,
@@ -684,6 +687,7 @@ async fn run_ratatui_app(
&mut tui,
auth_manager,
config,
cloud_requirements_warning,
cli_kv_overrides.clone(),
overrides.clone(),
active_profile,