mirror of
https://github.com/openai/codex.git
synced 2026-03-05 22:23:22 +00:00
Compare commits
1 Commits
codex/load
...
gt/noisy-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20a669c82e |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1413,6 +1413,7 @@ dependencies = [
|
||||
"codex-core",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"futures",
|
||||
"pretty_assertions",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
|
||||
@@ -13,6 +13,7 @@ codex-backend-client = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "time"] }
|
||||
toml = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -15,6 +15,7 @@ use codex_core::auth::CodexAuth;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::ConfigRequirementsToml;
|
||||
use codex_protocol::account::PlanType;
|
||||
use futures::FutureExt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -27,9 +28,21 @@ const CLOUD_REQUIREMENTS_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
trait RequirementsFetcher: Send + Sync {
|
||||
/// Returns requirements as a TOML string.
|
||||
///
|
||||
/// TODO(gt): For now, returns an Option. But when we want to make this fail-closed, return a
|
||||
/// Result.
|
||||
async fn fetch_requirements(&self, auth: &CodexAuth) -> Option<String>;
|
||||
async fn fetch_requirements(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<Option<String>, CloudRequirementsLoadFailure>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq)]
|
||||
struct CloudRequirementsLoadOutcome {
|
||||
requirements: Option<ConfigRequirementsToml>,
|
||||
warning: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
struct CloudRequirementsLoadFailure {
|
||||
status_code: Option<u16>,
|
||||
}
|
||||
|
||||
struct BackendRequirementsFetcher {
|
||||
@@ -44,7 +57,10 @@ impl BackendRequirementsFetcher {
|
||||
|
||||
#[async_trait]
|
||||
impl RequirementsFetcher for BackendRequirementsFetcher {
|
||||
async fn fetch_requirements(&self, auth: &CodexAuth) -> Option<String> {
|
||||
async fn fetch_requirements(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
|
||||
let client = BackendClient::from_auth(self.base_url.clone(), auth)
|
||||
.inspect_err(|err| {
|
||||
tracing::warn!(
|
||||
@@ -52,20 +68,24 @@ impl RequirementsFetcher for BackendRequirementsFetcher {
|
||||
"Failed to construct backend client for cloud requirements"
|
||||
);
|
||||
})
|
||||
.ok()?;
|
||||
.map_err(|_| CloudRequirementsLoadFailure { status_code: None })?;
|
||||
|
||||
let response = client
|
||||
.get_config_requirements_file()
|
||||
.await
|
||||
.inspect_err(|err| tracing::warn!(error = %err, "Failed to fetch cloud requirements"))
|
||||
.ok()?;
|
||||
let response = client.get_config_requirements_file().await.map_err(|err| {
|
||||
let status_code = extract_http_status_code(&err.to_string());
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
status_code,
|
||||
"Failed to fetch cloud requirements"
|
||||
);
|
||||
CloudRequirementsLoadFailure { status_code }
|
||||
})?;
|
||||
|
||||
let Some(contents) = response.contents else {
|
||||
tracing::warn!("Cloud requirements response missing contents");
|
||||
return None;
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
Some(contents)
|
||||
Ok(Some(contents))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,18 +108,23 @@ impl CloudRequirementsService {
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_with_timeout(&self) -> Option<ConfigRequirementsToml> {
|
||||
async fn fetch_with_timeout(&self) -> CloudRequirementsLoadOutcome {
|
||||
let _timer =
|
||||
codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]);
|
||||
let started_at = Instant::now();
|
||||
let result = timeout(self.timeout, self.fetch())
|
||||
.await
|
||||
.inspect_err(|_| {
|
||||
tracing::warn!("Timed out waiting for cloud requirements; continuing without them");
|
||||
})
|
||||
.ok()?;
|
||||
let result = match timeout(self.timeout, self.fetch()).await {
|
||||
Ok(result) => result,
|
||||
Err(_) => {
|
||||
let warning = "Failed to load Cloud Requirements: request timed out. Continuing without cloud requirements.".to_string();
|
||||
tracing::warn!("{warning}");
|
||||
return CloudRequirementsLoadOutcome {
|
||||
requirements: None,
|
||||
warning: Some(warning),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
match result.as_ref() {
|
||||
match result.requirements.as_ref() {
|
||||
Some(requirements) => {
|
||||
tracing::info!(
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
@@ -115,25 +140,49 @@ impl CloudRequirementsService {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(warning) = result.warning.as_deref() {
|
||||
tracing::warn!("{warning}");
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
async fn fetch(&self) -> Option<ConfigRequirementsToml> {
|
||||
let auth = self.auth_manager.auth().await?;
|
||||
async fn fetch(&self) -> CloudRequirementsLoadOutcome {
|
||||
let Some(auth) = self.auth_manager.auth().await else {
|
||||
return CloudRequirementsLoadOutcome::default();
|
||||
};
|
||||
if !auth.is_chatgpt_auth()
|
||||
|| !matches!(
|
||||
auth.account_plan_type(),
|
||||
Some(PlanType::Business | PlanType::Enterprise)
|
||||
)
|
||||
{
|
||||
return None;
|
||||
return CloudRequirementsLoadOutcome::default();
|
||||
}
|
||||
|
||||
let contents = self.fetcher.fetch_requirements(&auth).await?;
|
||||
parse_cloud_requirements(&contents)
|
||||
.inspect_err(|err| tracing::warn!(error = %err, "Failed to parse cloud requirements"))
|
||||
.ok()
|
||||
.flatten()
|
||||
let contents = match self.fetcher.fetch_requirements(&auth).await {
|
||||
Ok(Some(contents)) => contents,
|
||||
Ok(None) => return CloudRequirementsLoadOutcome::default(),
|
||||
Err(err) => {
|
||||
return CloudRequirementsLoadOutcome {
|
||||
requirements: None,
|
||||
warning: Some(fetch_warning_message(err.status_code)),
|
||||
};
|
||||
}
|
||||
};
|
||||
match parse_cloud_requirements(&contents) {
|
||||
Ok(requirements) => CloudRequirementsLoadOutcome {
|
||||
requirements,
|
||||
warning: None,
|
||||
},
|
||||
Err(err) => {
|
||||
tracing::warn!(error = %err, "Failed to parse cloud requirements");
|
||||
CloudRequirementsLoadOutcome {
|
||||
requirements: None,
|
||||
warning: Some("Failed to load Cloud Requirements due to invalid response format. Continuing without cloud requirements.".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,12 +196,48 @@ pub fn cloud_requirements_loader(
|
||||
CLOUD_REQUIREMENTS_TIMEOUT,
|
||||
);
|
||||
let task = tokio::spawn(async move { service.fetch_with_timeout().await });
|
||||
CloudRequirementsLoader::new(async move {
|
||||
let load_outcome = async move {
|
||||
task.await
|
||||
.inspect_err(|err| tracing::warn!(error = %err, "Cloud requirements task failed"))
|
||||
.ok()
|
||||
.flatten()
|
||||
})
|
||||
.unwrap_or_else(|| CloudRequirementsLoadOutcome {
|
||||
requirements: None,
|
||||
warning: Some(
|
||||
"Failed to load Cloud Requirements due to an internal task failure. Continuing without cloud requirements.".to_string(),
|
||||
),
|
||||
})
|
||||
}
|
||||
.shared();
|
||||
CloudRequirementsLoader::new_with_warning(
|
||||
{
|
||||
let load_outcome = load_outcome.clone();
|
||||
async move { load_outcome.await.requirements }
|
||||
},
|
||||
async move { load_outcome.await.warning },
|
||||
)
|
||||
}
|
||||
|
||||
fn fetch_warning_message(status_code: Option<u16>) -> String {
|
||||
match status_code {
|
||||
Some(status_code) => format!(
|
||||
"Failed to load Cloud Requirements (HTTP {status_code}). Continuing without cloud requirements."
|
||||
),
|
||||
None => {
|
||||
"Failed to load Cloud Requirements. Continuing without cloud requirements.".to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_http_status_code(error_message: &str) -> Option<u16> {
|
||||
let status_text = error_message.split_once(" failed: ")?.1;
|
||||
let status_digits: String = status_text
|
||||
.chars()
|
||||
.take_while(|ch| ch.is_ascii_digit())
|
||||
.collect();
|
||||
if status_digits.len() != 3 {
|
||||
return None;
|
||||
}
|
||||
status_digits.parse::<u16>().ok()
|
||||
}
|
||||
|
||||
fn parse_cloud_requirements(
|
||||
@@ -247,8 +332,11 @@ mod tests {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RequirementsFetcher for StaticFetcher {
|
||||
async fn fetch_requirements(&self, _auth: &CodexAuth) -> Option<String> {
|
||||
self.contents.clone()
|
||||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
|
||||
Ok(self.contents.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,9 +344,12 @@ mod tests {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RequirementsFetcher for PendingFetcher {
|
||||
async fn fetch_requirements(&self, _auth: &CodexAuth) -> Option<String> {
|
||||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, CloudRequirementsLoadFailure> {
|
||||
pending::<()>().await;
|
||||
None
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,7 +362,7 @@ mod tests {
|
||||
CLOUD_REQUIREMENTS_TIMEOUT,
|
||||
);
|
||||
let result = service.fetch().await;
|
||||
assert!(result.is_none());
|
||||
assert_eq!(result, CloudRequirementsLoadOutcome::default());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -282,7 +373,7 @@ mod tests {
|
||||
CLOUD_REQUIREMENTS_TIMEOUT,
|
||||
);
|
||||
let result = service.fetch().await;
|
||||
assert!(result.is_none());
|
||||
assert_eq!(result, CloudRequirementsLoadOutcome::default());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -296,13 +387,16 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
service.fetch().await,
|
||||
Some(ConfigRequirementsToml {
|
||||
allowed_approval_policies: Some(vec![AskForApproval::Never]),
|
||||
allowed_sandbox_modes: None,
|
||||
mcp_servers: None,
|
||||
rules: None,
|
||||
enforce_residency: None,
|
||||
})
|
||||
CloudRequirementsLoadOutcome {
|
||||
requirements: Some(ConfigRequirementsToml {
|
||||
allowed_approval_policies: Some(vec![AskForApproval::Never]),
|
||||
allowed_sandbox_modes: None,
|
||||
mcp_servers: None,
|
||||
rules: None,
|
||||
enforce_residency: None,
|
||||
}),
|
||||
warning: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -358,6 +452,34 @@ mod tests {
|
||||
tokio::time::advance(CLOUD_REQUIREMENTS_TIMEOUT + Duration::from_millis(1)).await;
|
||||
|
||||
let result = handle.await.expect("cloud requirements task");
|
||||
assert!(result.is_none());
|
||||
assert_eq!(
|
||||
result.warning,
|
||||
Some(
|
||||
"Failed to load Cloud Requirements: request timed out. Continuing without cloud requirements.".to_string()
|
||||
)
|
||||
);
|
||||
assert_eq!(result.requirements, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_http_status_code_from_backend_error_message() {
|
||||
assert_eq!(
|
||||
extract_http_status_code(
|
||||
"GET https://chatgpt.com/backend-api/wham/config/requirements failed: 403 Forbidden; content-type=application/json; body={}"
|
||||
),
|
||||
Some(403)
|
||||
);
|
||||
assert_eq!(
|
||||
extract_http_status_code("Decode error for https://example.com: unexpected EOF"),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fetch_warning_message_includes_status_code_when_available() {
|
||||
assert_eq!(
|
||||
fetch_warning_message(Some(429)),
|
||||
"Failed to load Cloud Requirements (HTTP 429). Continuing without cloud requirements."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::future::Future;
|
||||
pub struct CloudRequirementsLoader {
|
||||
// TODO(gt): This should return a Result once we can fail-closed.
|
||||
fut: Shared<BoxFuture<'static, Option<ConfigRequirementsToml>>>,
|
||||
warning_fut: Option<Shared<BoxFuture<'static, Option<String>>>>,
|
||||
}
|
||||
|
||||
impl CloudRequirementsLoader {
|
||||
@@ -18,12 +19,31 @@ impl CloudRequirementsLoader {
|
||||
{
|
||||
Self {
|
||||
fut: fut.boxed().shared(),
|
||||
warning_fut: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_warning<F, W>(fut: F, warning_fut: W) -> Self
|
||||
where
|
||||
F: Future<Output = Option<ConfigRequirementsToml>> + Send + 'static,
|
||||
W: Future<Output = Option<String>> + Send + 'static,
|
||||
{
|
||||
Self {
|
||||
fut: fut.boxed().shared(),
|
||||
warning_fut: Some(warning_fut.boxed().shared()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get(&self) -> Option<ConfigRequirementsToml> {
|
||||
self.fut.clone().await
|
||||
}
|
||||
|
||||
pub async fn warning(&self) -> Option<String> {
|
||||
match &self.warning_fut {
|
||||
Some(warning_fut) => warning_fut.clone().await,
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for CloudRequirementsLoader {
|
||||
@@ -59,4 +79,18 @@ mod tests {
|
||||
assert_eq!(first, second);
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shared_warning_future_runs_once() {
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let counter_clone = Arc::clone(&counter);
|
||||
let loader = CloudRequirementsLoader::new_with_warning(async { None }, async move {
|
||||
counter_clone.fetch_add(1, Ordering::SeqCst);
|
||||
Some("warning".to_string())
|
||||
});
|
||||
|
||||
let (first, second) = tokio::join!(loader.warning(), loader.warning());
|
||||
assert_eq!(first, second);
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,6 +226,14 @@ fn emit_project_config_warnings(app_event_tx: &AppEventSender, config: &Config)
|
||||
)));
|
||||
}
|
||||
|
||||
fn emit_cloud_requirements_warning(app_event_tx: &AppEventSender, warning: Option<String>) {
|
||||
if let Some(warning) = warning {
|
||||
app_event_tx.send(AppEvent::InsertHistoryCell(Box::new(
|
||||
history_cell::new_warning_event(warning),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct SessionSummary {
|
||||
usage_line: String,
|
||||
@@ -908,6 +916,7 @@ impl App {
|
||||
tui: &mut tui::Tui,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
mut config: Config,
|
||||
cloud_requirements_warning: Option<String>,
|
||||
cli_kv_overrides: Vec<(String, TomlValue)>,
|
||||
harness_overrides: ConfigOverrides,
|
||||
active_profile: Option<String>,
|
||||
@@ -921,6 +930,7 @@ impl App {
|
||||
let (app_event_tx, mut app_event_rx) = unbounded_channel();
|
||||
let app_event_tx = AppEventSender::new(app_event_tx);
|
||||
emit_project_config_warnings(&app_event_tx, &config);
|
||||
emit_cloud_requirements_warning(&app_event_tx, cloud_requirements_warning);
|
||||
tui.set_notification_method(config.tui_notification_method);
|
||||
|
||||
let harness_overrides =
|
||||
@@ -2548,6 +2558,36 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_cloud_requirements_warning_enqueues_warning_cell() {
|
||||
let (tx, mut rx) = unbounded_channel();
|
||||
let app_event_tx = AppEventSender::new(tx);
|
||||
emit_cloud_requirements_warning(
|
||||
&app_event_tx,
|
||||
Some("Failed to load Cloud Requirements (HTTP 403).".to_string()),
|
||||
);
|
||||
|
||||
let event = rx.try_recv().expect("expected warning event");
|
||||
let AppEvent::InsertHistoryCell(cell) = event else {
|
||||
panic!("expected InsertHistoryCell event");
|
||||
};
|
||||
let rendered = cell
|
||||
.display_lines(200)
|
||||
.into_iter()
|
||||
.flat_map(|line| line.spans.into_iter().map(|span| span.content.to_string()))
|
||||
.collect::<String>();
|
||||
assert!(rendered.contains("HTTP 403"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_cloud_requirements_warning_skips_none() {
|
||||
let (tx, mut rx) = unbounded_channel();
|
||||
let app_event_tx = AppEventSender::new(tx);
|
||||
emit_cloud_requirements_warning(&app_event_tx, None);
|
||||
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
|
||||
let mut app = make_test_app().await;
|
||||
|
||||
@@ -282,6 +282,7 @@ pub async fn run_main(
|
||||
cloud_requirements.clone(),
|
||||
)
|
||||
.await;
|
||||
let cloud_requirements_warning = cloud_requirements.warning().await;
|
||||
set_default_client_residency_requirement(config.enforce_residency.value());
|
||||
|
||||
if let Some(warning) = add_dir_warning_message(&cli.add_dir, config.sandbox_policy.get()) {
|
||||
@@ -398,6 +399,7 @@ pub async fn run_main(
|
||||
run_ratatui_app(
|
||||
cli,
|
||||
config,
|
||||
cloud_requirements_warning,
|
||||
overrides,
|
||||
cli_kv_overrides,
|
||||
cloud_requirements,
|
||||
@@ -410,6 +412,7 @@ pub async fn run_main(
|
||||
async fn run_ratatui_app(
|
||||
cli: Cli,
|
||||
initial_config: Config,
|
||||
cloud_requirements_warning: Option<String>,
|
||||
overrides: ConfigOverrides,
|
||||
cli_kv_overrides: Vec<(String, toml::Value)>,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
@@ -684,6 +687,7 @@ async fn run_ratatui_app(
|
||||
&mut tui,
|
||||
auth_manager,
|
||||
config,
|
||||
cloud_requirements_warning,
|
||||
cli_kv_overrides.clone(),
|
||||
overrides.clone(),
|
||||
active_profile,
|
||||
|
||||
Reference in New Issue
Block a user