diff --git a/codex-rs/core/src/client/responses.rs b/codex-rs/core/src/client/responses.rs index 0f621d83b3..f4fb4e5fd4 100644 --- a/codex-rs/core/src/client/responses.rs +++ b/codex-rs/core/src/client/responses.rs @@ -218,13 +218,13 @@ impl ModelClient { self.config .model_verbosity .or(self.config.model_family.default_verbosity) + } else if self.config.model_verbosity.is_some() { + warn!( + "model_verbosity is set but ignored as the model does not support verbosity: {}", + self.config.model_family.family + ); + None } else { - if self.config.model_verbosity.is_some() { - warn!( - "model_verbosity is set but ignored as the model does not support verbosity: {}", - self.config.model_family.family - ); - } None }; @@ -277,7 +277,9 @@ impl ModelClient { return Err(retryable_attempt_error.into_error()); } - tokio::time::sleep(retryable_attempt_error.delay(attempt)).await; + if let Some(delay) = retryable_attempt_error.delay(attempt) { + tokio::time::sleep(delay).await; + } } } } @@ -515,11 +517,7 @@ impl ModelClient { fn parse_rate_limit_snapshot(headers: &HeaderMap) -> Option { // Prefer codex-specific aggregate rate limit headers if present; fall back // to raw OpenAI-style request headers otherwise. - if let Some(snapshot) = parse_codex_rate_limits(headers) { - return Some(snapshot); - } - - parse_openai_rate_limits(headers) + parse_codex_rate_limits(headers).or_else(|| parse_openai_rate_limits(headers)) } fn parse_codex_rate_limits(headers: &HeaderMap) -> Option { @@ -637,21 +635,8 @@ fn try_parse_retry_after(error: &Error) -> Option { } fn is_context_window_error(error: &Error) -> bool { - let is_type = error - .r#type - .as_deref() - .map(|t| t == "context_length_exceeded") - .unwrap_or(false); - - if is_type { - return true; - } - - error - .code - .as_deref() - .map(|c| c == "context_length_exceeded") - .unwrap_or(false) + error.r#type.as_deref() == Some("context_length_exceeded") + || error.code.as_deref() == Some("context_length_exceeded") } fn is_quota_exceeded_error(error: &Error) -> bool { @@ -679,13 +664,13 @@ enum StreamAttemptError { } impl StreamAttemptError { - fn delay(&self, attempt: u64) -> Duration { + fn delay(&self, attempt: u64) -> Option { match self { StreamAttemptError::RetryableHttpError { retry_after, .. } => { - retry_after.unwrap_or_else(|| backoff(attempt)) + Some(retry_after.unwrap_or_else(|| backoff(attempt))) } - StreamAttemptError::RetryableTransportError(_) => backoff(attempt), - StreamAttemptError::Fatal(_) => Duration::from_secs(0), + StreamAttemptError::RetryableTransportError(_) => Some(backoff(attempt)), + StreamAttemptError::Fatal(_) => None, } } @@ -1031,7 +1016,7 @@ mod tests { use codex_app_server_protocol::AuthMode; use codex_protocol::ConversationId; use codex_protocol::models::ResponseItem; - + use futures::StreamExt; use pretty_assertions::assert_eq; use serde_json::json; @@ -1069,21 +1054,7 @@ mod tests { let sse2 = format!("event: response.output_item.done\ndata: {item2}\n\n"); let sse3 = format!("event: response.completed\ndata: {completed}\n\n"); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); @@ -1133,21 +1104,7 @@ mod tests { .to_string(); let sse1 = format!("event: response.output_item.done\ndata: {item1}\n\n"); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); @@ -1167,21 +1124,7 @@ mod tests { let fixture_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/cli_responses_fixture.sse"); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); let result = @@ -1217,21 +1160,7 @@ mod tests { let fixture_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/cli_responses_fixture.sse"); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); let result = @@ -1266,13 +1195,21 @@ mod tests { ) } - trait IdleTimeoutExt { - fn last_idle_timeout(&self) -> Option; - } - - impl IdleTimeoutExt for OtelEventManager { - fn last_idle_timeout(&self) -> Option { - None + fn test_provider() -> ModelProviderInfo { + ModelProviderInfo { + name: "test".to_string(), + base_url: Some("https://test.com".to_string()), + env_key: Some("TEST_API_KEY".to_string()), + env_key_instructions: None, + experimental_bearer_token: None, + wire_api: WireApi::Responses, + query_params: None, + http_headers: None, + env_http_headers: None, + request_max_retries: Some(0), + stream_max_retries: Some(0), + stream_idle_timeout_ms: Some(1000), + requires_openai_auth: false, } } @@ -1356,21 +1293,7 @@ mod tests { std::fs::write(&fixture_path, sse).unwrap(); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); let result = @@ -1384,11 +1307,6 @@ mod tests { assert_eq!(events.len(), 1); matches!(events[0], Err(CodexErr::ContextWindowExceeded)); - - let delay = otel_event_manager - .last_idle_timeout() - .unwrap_or(Duration::ZERO); - assert_eq!(delay, Duration::ZERO); } #[tokio::test] @@ -1405,21 +1323,7 @@ mod tests { std::fs::write(&fixture_path, sse).unwrap(); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); let result = @@ -1433,11 +1337,6 @@ mod tests { assert_eq!(events.len(), 1); matches!(events[0], Err(CodexErr::QuotaExceeded)); - - let delay = otel_event_manager - .last_idle_timeout() - .unwrap_or(Duration::ZERO); - assert_eq!(delay, Duration::ZERO); } #[tokio::test] @@ -1454,21 +1353,7 @@ mod tests { std::fs::write(&fixture_path, sse).unwrap(); - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; + let provider = test_provider(); let otel_event_manager = otel_event_manager(); let result = @@ -1482,59 +1367,5 @@ mod tests { assert_eq!(events.len(), 1); matches!(events[0], Err(CodexErr::ContextWindowExceeded)); - - let delay = otel_event_manager - .last_idle_timeout() - .unwrap_or(Duration::ZERO); - assert_eq!(delay, Duration::ZERO); - } - - #[tokio::test] - async fn quota_exceeded_error_is_fatal_for_quota_exceeded_type() { - let file = tempfile::NamedTempFile::new().unwrap(); - let fixture_path = file.path().to_path_buf(); - let sse = concat!( - "data: {\"type\":\"response.failed\",\"response\":{\"error\":", - "{\"type\":\"usage_limit_reached\",\"code\":\"insufficient_quota\",", - "\"message\":\"quota exceeded\"}}}\n\n", - "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp1\"}}\n\n", - "data: [DONE]\n\n", - ); - - std::fs::write(&fixture_path, sse).unwrap(); - - let provider = ModelProviderInfo { - name: "test".to_string(), - base_url: Some("https://test.com".to_string()), - env_key: Some("TEST_API_KEY".to_string()), - env_key_instructions: None, - experimental_bearer_token: None, - wire_api: WireApi::Responses, - query_params: None, - http_headers: None, - env_http_headers: None, - request_max_retries: Some(0), - stream_max_retries: Some(0), - stream_idle_timeout_ms: Some(1000), - requires_openai_auth: false, - }; - - let otel_event_manager = otel_event_manager(); - let result = - stream_from_fixture(fixture_path.as_path(), provider, otel_event_manager.clone()).await; - - let mut stream = result.expect("stream should be created"); - let mut events = Vec::new(); - while let Some(event) = stream.next().await { - events.push(event); - } - - assert_eq!(events.len(), 1); - matches!(events[0], Err(CodexErr::QuotaExceeded)); - - let delay = otel_event_manager - .last_idle_timeout() - .unwrap_or(Duration::ZERO); - assert_eq!(delay, Duration::ZERO); } }