Compare commits

...

1 Commits

Author SHA1 Message Date
Sayan Sisodiya
1bb860c88f make /compact client handle SSE so we can send keepalives 2026-02-09 22:56:01 -08:00
4 changed files with 393 additions and 19 deletions

View File

@@ -5,12 +5,18 @@ use crate::error::ApiError;
use crate::provider::Provider;
use codex_client::HttpTransport;
use codex_client::RequestTelemetry;
use codex_client::TransportError;
use codex_protocol::models::ResponseItem;
use eventsource_stream::Eventsource;
use futures::StreamExt;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use http::StatusCode;
use serde::Deserialize;
use serde_json::to_value;
use std::sync::Arc;
use tokio::time::timeout;
pub struct CompactClient<T: HttpTransport, A: AuthProvider> {
session: EndpointSession<T, A>,
@@ -33,6 +39,10 @@ impl<T: HttpTransport, A: AuthProvider> CompactClient<T, A> {
"responses/compact"
}
fn stream_path() -> &'static str {
"responses/compact/stream"
}
pub async fn compact(
&self,
body: serde_json::Value,
@@ -47,6 +57,93 @@ impl<T: HttpTransport, A: AuthProvider> CompactClient<T, A> {
Ok(parsed.output)
}
pub async fn compact_stream(
&self,
body: serde_json::Value,
extra_headers: HeaderMap,
) -> Result<Vec<ResponseItem>, ApiError> {
let stream_response = self
.session
.stream_with(
Method::POST,
Self::stream_path(),
extra_headers,
Some(body),
|req| {
req.headers.insert(
http::header::ACCEPT,
HeaderValue::from_static("text/event-stream"),
);
},
)
.await?;
let mut stream = stream_response.bytes.eventsource();
loop {
let sse =
match timeout(self.session.provider().stream_idle_timeout, stream.next()).await {
Ok(Some(Ok(sse))) => sse,
Ok(Some(Err(error))) => return Err(ApiError::Stream(error.to_string())),
Ok(None) => {
return Err(ApiError::Stream(
"stream closed before compact.completed".to_string(),
));
}
Err(_) => {
return Err(ApiError::Stream(
"idle timeout waiting for compact stream".to_string(),
));
}
};
let event: CompactStreamEvent = match serde_json::from_str(&sse.data) {
Ok(event) => event,
Err(_) => continue,
};
match event.kind.as_str() {
"compact.keepalive" => continue,
"compact.completed" => {
if let Some(response) = event.response {
return Ok(response.output);
}
return Err(ApiError::Stream(
"compact.completed missing response payload".to_string(),
));
}
"compact.failed" => {
let status = event
.status
.and_then(|status| StatusCode::from_u16(status).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = event
.message
.unwrap_or_else(|| "compact stream failed".to_string());
let mut response_headers = HeaderMap::new();
if let Some(model_cap_model) = event.model_cap_model
&& let Ok(model_cap_header) = HeaderValue::from_str(&model_cap_model)
{
response_headers.insert(MODEL_CAP_MODEL_HEADER, model_cap_header);
}
if let Some(model_cap_reset_after_seconds) = event.model_cap_reset_after_seconds
&& let Ok(reset_after_header) =
HeaderValue::from_str(&model_cap_reset_after_seconds.to_string())
{
response_headers.insert(MODEL_CAP_RESET_AFTER_HEADER, reset_after_header);
}
let headers = (!response_headers.is_empty()).then_some(response_headers);
return Err(ApiError::Transport(TransportError::Http {
status,
url: None,
headers,
body: Some(message),
}));
}
_ => continue,
}
}
}
pub async fn compact_input(
&self,
input: &CompactionInput<'_>,
@@ -56,6 +153,16 @@ impl<T: HttpTransport, A: AuthProvider> CompactClient<T, A> {
.map_err(|e| ApiError::Stream(format!("failed to encode compaction input: {e}")))?;
self.compact(body, extra_headers).await
}
pub async fn compact_stream_input(
&self,
input: &CompactionInput<'_>,
extra_headers: HeaderMap,
) -> Result<Vec<ResponseItem>, ApiError> {
let body = to_value(input)
.map_err(|e| ApiError::Stream(format!("failed to encode compaction input: {e}")))?;
self.compact_stream(body, extra_headers).await
}
}
#[derive(Debug, Deserialize)]
@@ -63,28 +170,35 @@ struct CompactHistoryResponse {
output: Vec<ResponseItem>,
}
#[derive(Debug, Deserialize)]
struct CompactStreamEvent {
#[serde(rename = "type")]
kind: String,
response: Option<CompactHistoryResponse>,
status: Option<u16>,
message: Option<String>,
model_cap_model: Option<String>,
model_cap_reset_after_seconds: Option<u64>,
}
const MODEL_CAP_MODEL_HEADER: &str = "x-codex-model-cap-model";
const MODEL_CAP_RESET_AFTER_HEADER: &str = "x-codex-model-cap-reset-after-seconds";
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::AuthProvider;
use crate::provider::RetryConfig;
use async_trait::async_trait;
use bytes::Bytes;
use codex_client::Request;
use codex_client::RequestCompression;
use codex_client::Response;
use codex_client::StreamResponse;
use codex_client::TransportError;
#[derive(Clone, Default)]
struct DummyTransport;
#[async_trait]
impl HttpTransport for DummyTransport {
async fn execute(&self, _req: Request) -> Result<Response, TransportError> {
Err(TransportError::Build("execute should not run".to_string()))
}
async fn stream(&self, _req: Request) -> Result<StreamResponse, TransportError> {
Err(TransportError::Build("stream should not run".to_string()))
}
}
use futures::stream;
use http::HeaderMap;
use pretty_assertions::assert_eq;
use std::time::Duration;
#[derive(Clone, Default)]
struct DummyAuth;
@@ -95,10 +209,176 @@ mod tests {
}
}
fn provider(base_url: &str) -> Provider {
Provider {
name: "test".to_string(),
base_url: base_url.to_string(),
query_params: None,
headers: HeaderMap::new(),
retry: RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: true,
retry_transport: true,
},
stream_idle_timeout: Duration::from_secs(1),
}
}
#[derive(Clone)]
struct StaticResponseTransport {
response: Response,
}
#[async_trait]
impl HttpTransport for StaticResponseTransport {
async fn execute(&self, _req: Request) -> Result<Response, TransportError> {
Ok(self.response.clone())
}
async fn stream(&self, _req: Request) -> Result<StreamResponse, TransportError> {
Err(TransportError::Build("stream should not run".to_string()))
}
}
#[derive(Clone)]
struct StaticStreamTransport {
stream_body: Bytes,
}
#[async_trait]
impl HttpTransport for StaticStreamTransport {
async fn execute(&self, _req: Request) -> Result<Response, TransportError> {
Err(TransportError::Build("execute should not run".to_string()))
}
async fn stream(&self, req: Request) -> Result<StreamResponse, TransportError> {
assert_eq!(req.compression, RequestCompression::None);
let bytes = self.stream_body.clone();
let stream = stream::iter(vec![Ok(bytes)]);
Ok(StreamResponse {
status: StatusCode::OK,
headers: HeaderMap::new(),
bytes: Box::pin(stream),
})
}
}
#[tokio::test]
async fn compact_stream_returns_output_from_completed_event() {
let stream_body = concat!(
"event: compact.keepalive\n",
"data: {\"type\":\"compact.keepalive\"}\n\n",
"event: compact.completed\n",
"data: {\"type\":\"compact.completed\",\"response\":{\"output\":[{\"type\":\"compaction_summary\",\"encrypted_content\":\"abc\"}]}}\n\n"
)
.to_string();
let client = CompactClient::new(
StaticStreamTransport {
stream_body: Bytes::from(stream_body),
},
provider("https://example.com/api/codex"),
DummyAuth,
);
let output = client
.compact_stream(serde_json::json!({}), HeaderMap::new())
.await
.expect("stream compact should succeed");
assert_eq!(
output,
vec![ResponseItem::Compaction {
encrypted_content: "abc".to_string()
}]
);
}
#[tokio::test]
async fn compact_stream_returns_api_error_from_failed_event() {
let stream_body = concat!(
"event: compact.failed\n",
"data: {\"type\":\"compact.failed\",\"status\":429,\"message\":\"slow down\"}\n\n"
);
let client = CompactClient::new(
StaticStreamTransport {
stream_body: Bytes::from(stream_body),
},
provider("https://example.com/api/codex"),
DummyAuth,
);
let error = client
.compact_stream(serde_json::json!({}), HeaderMap::new())
.await
.expect_err("stream compact should fail");
match error {
ApiError::Transport(TransportError::Http {
status,
headers,
body,
..
}) => {
assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);
assert_eq!(body, Some("slow down".to_string()));
assert_eq!(headers, None);
}
other => panic!("expected ApiError::Transport(Http), got {other:?}"),
}
}
#[tokio::test]
async fn compact_stream_failed_event_carries_model_cap_headers() {
let stream_body = concat!(
"event: compact.failed\n",
"data: {\"type\":\"compact.failed\",\"status\":429,\"message\":\"slow down\",\"model_cap_model\":\"gpt-5-codex\",\"model_cap_reset_after_seconds\":30}\n\n"
);
let client = CompactClient::new(
StaticStreamTransport {
stream_body: Bytes::from(stream_body),
},
provider("https://example.com/api/codex"),
DummyAuth,
);
let error = client
.compact_stream(serde_json::json!({}), HeaderMap::new())
.await
.expect_err("stream compact should fail");
match error {
ApiError::Transport(TransportError::Http {
status,
headers,
body,
..
}) => {
assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);
assert_eq!(body, Some("slow down".to_string()));
let headers = headers.expect("headers should be present");
assert_eq!(
headers
.get(MODEL_CAP_MODEL_HEADER)
.expect("model cap header should be present"),
"gpt-5-codex"
);
assert_eq!(
headers
.get(MODEL_CAP_RESET_AFTER_HEADER)
.expect("reset-after header should be present"),
"30"
);
}
other => panic!("expected ApiError::Transport(Http), got {other:?}"),
}
}
#[test]
fn path_is_responses_compact() {
assert_eq!(
CompactClient::<DummyTransport, DummyAuth>::path(),
CompactClient::<StaticResponseTransport, DummyAuth>::path(),
"responses/compact"
);
}

View File

@@ -269,10 +269,23 @@ impl ModelClient {
};
let extra_headers = self.build_subagent_headers();
client
.compact_input(&payload, extra_headers)
// Use the stream compact endpoint if available, otherwise fallback to the unary endpoint.
match client
.compact_stream_input(&payload, extra_headers.clone())
.await
.map_err(map_api_error)
{
Ok(output) => Ok(output),
Err(ApiError::Transport(TransportError::Http { status, .. }))
if status == HttpStatusCode::NOT_FOUND
|| status == HttpStatusCode::METHOD_NOT_ALLOWED =>
{
client
.compact_input(&payload, extra_headers)
.await
.map_err(map_api_error)
}
Err(err) => Err(map_api_error(err)),
}
}
/// Builds memory summaries for each provided normalized trace.

View File

@@ -756,6 +756,14 @@ fn compact_mock() -> (MockBuilder, ResponseMock) {
(mock, response_mock)
}
fn compact_stream_mock() -> (MockBuilder, ResponseMock) {
let response_mock = ResponseMock::new();
let mock = Mock::given(method("POST"))
.and(path_regex(".*/responses/compact/stream$"))
.and(response_mock.clone());
(mock, response_mock)
}
fn models_mock() -> (MockBuilder, ModelsMock) {
let models_mock = ModelsMock::new();
let mock = Mock::given(method("GET"))
@@ -820,6 +828,27 @@ pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Valu
response_mock
}
pub async fn mount_compact_stream_json_once(
server: &MockServer,
body: serde_json::Value,
) -> ResponseMock {
let (mock, response_mock) = compact_stream_mock();
let event = serde_json::json!({
"type": "compact.completed",
"response": body,
});
let response_body = format!("event: compact.completed\ndata: {event}\n\n");
mock.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_string(response_body),
)
.up_to_n_times(1)
.mount(server)
.await;
response_mock
}
pub async fn mount_models_once(server: &MockServer, body: ModelsResponse) -> ModelsMock {
let (mock, models_mock) = models_mock();
mock.respond_with(

View File

@@ -156,6 +156,58 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_uses_stream_endpoint_when_available() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_stream_mock = responses::mount_compact_stream_json_once(
harness.server(),
serde_json::json!({
"output": [
{
"type": "compaction_summary",
"encrypted_content": "ENCRYPTED_STREAM_COMPACTION_SUMMARY",
}
],
}),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let compact_request = compact_stream_mock.single_request();
assert_eq!(compact_request.path(), "/v1/responses/compact/stream");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_runs_automatically() -> Result<()> {
skip_if_no_network!(Ok(()));