This commit is contained in:
Ahmed Ibrahim
2025-12-02 14:54:33 -08:00
parent 47ef2cd9ca
commit 2721498ec9
3 changed files with 53 additions and 173 deletions

View File

@@ -1,12 +1,10 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::ResponseStream;
use crate::SandboxState;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
@@ -54,7 +52,6 @@ use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio::time::sleep_until;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
@@ -110,7 +107,7 @@ use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::status::ComponentHealth;
use crate::status::IdleWarning;
use crate::status::maybe_codex_status_warning;
use crate::tasks::GhostSnapshotTask;
use crate::tasks::ReviewTask;
use crate::tasks::SessionTask;
@@ -2196,19 +2193,17 @@ async fn try_run_turn(
});
sess.persist_rollout_items(&[rollout_item]).await;
let mut idle_warning = IdleWarning::default();
let client = turn_context.client.clone();
let mut stream_future = Box::pin(client.stream(prompt).or_cancel(&cancellation_token));
if let Some(message) = maybe_codex_status_warning(sess.as_ref()).await {
sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message }))
.await;
}
let mut stream = await_stream_with_idle_warning(
stream_future.as_mut(),
&mut idle_warning,
&sess,
&turn_context,
)
.await?;
idle_warning.mark_event();
let mut stream = turn_context
.client
.clone()
.stream(prompt)
.or_cancel(&cancellation_token)
.await??;
let tool_runtime = ToolCallRuntime::new(
Arc::clone(&router),
@@ -2225,28 +2220,7 @@ async fn try_run_turn(
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = tokio::select! {
biased;
result = stream.next().or_cancel(&cancellation_token) => result,
_ = sleep_until(idle_warning.deadline()) => {
if let Some(message) = idle_warning
.maybe_warning_message(sess.as_ref())
.await
{
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent { message }),
)
.await;
}
idle_warning.mark_event();
continue;
}
};
idle_warning.mark_event();
let event = match event {
let event = match stream.next().or_cancel(&cancellation_token).await {
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => {
let processed_items = output.try_collect().await?;
@@ -2445,38 +2419,6 @@ async fn try_run_turn(
}
}
/// Drives `stream_future` to completion while periodically waking up on the
/// idle deadline to surface a status warning to the client.
///
/// * `stream_future` - pinned future that resolves to the response stream; it
///   is borrowed mutably on every iteration, so the same future keeps being
///   polled across timer wake-ups instead of being restarted.
/// * `idle_warning` - supplies the next wake-up deadline and is reset after
///   each timer wake-up.
/// * `sess` / `turn_context` - used to emit the `EventMsg::Warning` event.
///
/// Returns the inner `CodexResult<ResponseStream>` once the future resolves.
/// A `CancelErr` from the outer `Result` is propagated through `?`.
async fn await_stream_with_idle_warning<F>(
mut stream_future: Pin<&mut F>,
idle_warning: &mut IdleWarning,
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<ResponseStream>
where
F: std::future::Future<
Output = Result<CodexResult<ResponseStream>, codex_async_utils::CancelErr>,
> + Send,
{
loop {
tokio::select! {
// `biased` makes `select!` poll the branches in the order written, so a
// ready stream always wins over an expired idle timer.
biased;
// Outer `Err` (cancellation) is propagated by `?`; the inner result is
// returned to the caller unchanged.
result = &mut stream_future => return result?,
// Idle deadline reached before the stream future resolved.
_ = sleep_until(idle_warning.deadline()) => {
// Only emit a warning event when the status check yields a message.
if let Some(message) = idle_warning
.maybe_warning_message(sess.as_ref())
.await
{
sess.send_event(
turn_context,
EventMsg::Warning(WarningEvent { message }),
)
.await;
}
// Restart the idle window whether or not a warning was sent, then
// loop back to keep waiting on the same future.
idle_warning.mark_event();
}
}
}
}
async fn handle_non_tool_response_item(item: &ResponseItem) -> Option<TurnItem> {
debug!(?item, "Output item");

View File

@@ -17,14 +17,11 @@ use reqwest::Method;
use serde::Deserialize;
use serde::Serialize;
use strum_macros::Display;
use tokio::time::Instant;
const STATUS_WIDGET_URL: &str = "https://status.openai.com/proxy/status.openai.com";
const CODEX_COMPONENT_NAME: &str = "Codex";
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
static TEST_STATUS_WIDGET_URL: OnceLock<String> = OnceLock::new();
static TEST_IDLE_TIMEOUT: OnceLock<Duration> = OnceLock::new();
#[derive(Debug, Clone, Display, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -54,48 +51,19 @@ impl ComponentHealth {
}
}
pub(crate) struct IdleWarning {
last_event: Instant,
idle_timeout: Duration,
}
pub(crate) async fn maybe_codex_status_warning(session: &Session) -> Option<String> {
let Ok(status) = fetch_codex_health().await else {
return None;
};
impl IdleWarning {
pub(crate) fn new(idle_timeout: Duration) -> Self {
Self {
last_event: Instant::now(),
idle_timeout,
}
let previous = session.replace_codex_backend_status(status).await;
if status.is_operational() || previous == Some(status) {
return None;
}
pub(crate) fn deadline(&self) -> Instant {
self.last_event + self.idle_timeout
}
pub(crate) fn mark_event(&mut self) {
self.last_event = Instant::now();
}
pub(crate) async fn maybe_warning_message(&mut self, session: &Session) -> Option<String> {
let Ok(status) = fetch_codex_health().await else {
return None;
};
let previous = session.replace_codex_backend_status(status).await;
if status.is_operational() || previous == Some(status) {
return None;
}
self.mark_event();
Some(format!(
"Codex is experiencing a {status}. If a response stalls, try again later. You can follow incident updates at status.openai.com."
))
}
}
impl Default for IdleWarning {
fn default() -> Self {
Self::new(idle_timeout())
}
Some(format!(
"Codex is experiencing a {status}. If a response stalls, try again later. You can follow incident updates at status.openai.com."
))
}
async fn fetch_codex_health() -> Result<ComponentHealth> {
@@ -202,13 +170,6 @@ fn derive_component_health(
Ok(status)
}
/// Returns the idle timeout currently in effect: the value stored in
/// `TEST_IDLE_TIMEOUT` when one has been set, otherwise `DEFAULT_IDLE_TIMEOUT`.
fn idle_timeout() -> Duration {
    match TEST_IDLE_TIMEOUT.get() {
        Some(&configured) => configured,
        None => DEFAULT_IDLE_TIMEOUT,
    }
}
fn status_widget_url() -> String {
TEST_STATUS_WIDGET_URL
.get()
@@ -222,38 +183,12 @@ pub fn set_test_status_widget_url(url: impl Into<String>) {
let _ = TEST_STATUS_WIDGET_URL.set(url.into());
}
/// Stores `duration` in `TEST_IDLE_TIMEOUT` so that `idle_timeout()` returns it
/// instead of the default.
///
/// `OnceLock::set` only succeeds while the cell is empty, so a call made after
/// a value has already been stored leaves the earlier value in place.
#[doc(hidden)]
#[cfg_attr(not(test), allow(dead_code))]
pub fn set_test_idle_timeout(duration: Duration) {
    // The "already set" error is intentionally discarded.
    let _already_set = TEST_IDLE_TIMEOUT.set(duration);
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
/// Parses a payload whose summary only lists the `Codex` and `Chat` components
/// and checks that `derive_component_health` reports `Codex` as operational.
#[test]
fn defaults_to_operational_when_not_affected() {
    // Fixture: a summary with two components and nothing else.
    let raw = json!({
        "summary": {
            "id": "sum-1",
            "name": "OpenAI",
            "components": [
                {"id": "cmp-1", "name": "Codex", "status_page_id": "page-1"},
                {"id": "cmp-2", "name": "Chat", "status_page_id": "page-1"}
            ]
        }
    });
    let payload: StatusPayload = serde_json::from_value(raw).expect("valid payload");

    let status = derive_component_health(&payload, "Codex").expect("codex component exists");

    assert_eq!(status, ComponentHealth::Operational);
    assert!(status.is_operational());
}
#[test]
fn uses_affected_component_status() {
let payload = serde_json::from_value::<StatusPayload>(json!({

View File

@@ -8,7 +8,6 @@ use std::time::Duration;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::WarningEvent;
use codex_core::status::set_test_idle_timeout;
use codex_core::status::set_test_status_widget_url;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
@@ -28,7 +27,7 @@ use wiremock::matchers::method;
use wiremock::matchers::path;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn emits_warning_when_stream_is_idle_and_status_is_degraded() {
async fn emits_warning_when_status_is_degraded_at_turn_start() {
let status_server = start_mock_server().await;
let status_path = "/proxy/status.openai.com";
@@ -39,8 +38,6 @@ async fn emits_warning_when_stream_is_idle_and_status_is_degraded() {
.await;
set_test_status_widget_url(format!("{}{}", status_server.uri(), status_path));
set_test_idle_timeout(Duration::from_millis(300));
let responses_server = start_mock_server().await;
let stalled_response = sse(vec![
ev_response_created("resp-1"),
@@ -51,7 +48,7 @@ async fn emits_warning_when_stream_is_idle_and_status_is_degraded() {
let _responses_mock = mount_sse_once_with_delay(
&responses_server,
stalled_response,
Duration::from_millis(400),
Duration::from_millis(10),
)
.await;
@@ -75,15 +72,6 @@ async fn emits_warning_when_stream_is_idle_and_status_is_degraded() {
"unexpected warning message"
);
let status_requests = status_server
.received_requests()
.await
.expect("status server running");
assert!(
!status_requests.is_empty(),
"status widget was not queried before idle warning"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
}
@@ -101,8 +89,6 @@ async fn warns_once_per_status_change_only_when_unhealthy() {
.await;
set_test_status_widget_url(format!("{}{}", status_server.uri(), status_path));
set_test_idle_timeout(Duration::from_millis(100));
let responses_server = start_mock_server().await;
let stalled_response = sse(vec![
ev_response_created("resp-1"),
@@ -127,32 +113,49 @@ async fn warns_once_per_status_change_only_when_unhealthy() {
.await
.unwrap();
let first_warning = wait_for_event(&codex, |event| matches!(event, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = first_warning else {
panic!("expected warning event");
};
assert_eq!(
message,
"Codex is experiencing a major outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "second".into(),
}],
})
.await
.unwrap();
let mut task_completes = 0usize;
let mut warnings = Vec::new();
loop {
while task_completes < 2 {
let event = codex.next_event().await.expect("event");
match event.msg {
EventMsg::Warning(WarningEvent { message }) => warnings.push(message),
EventMsg::TaskComplete(_) => break,
EventMsg::TaskComplete(_) => task_completes += 1,
_ => {}
}
}
let expected_messages = vec![
"Codex is experiencing a major outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.".to_string(),
"Codex is experiencing a partial outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.".to_string(),
];
assert!(
!warnings.is_empty(),
"expected at least one warning for non-operational status"
);
assert!(
warnings.len() <= expected_messages.len(),
"unexpected extra warnings: {warnings:?}"
assert_eq!(
warnings[0],
"Codex is experiencing a major outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
);
assert_eq!(warnings[0], expected_messages[0], "first warning mismatch");
if warnings.len() > 1 {
assert_eq!(warnings[1], expected_messages[1], "second warning mismatch");
assert_eq!(
warnings[1],
"Codex is experiencing a partial outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
);
}
}