Add feature for optional request compression (#8767)

Adds a new feature
`enable_request_compression` that will compress using zstd requests to
the codex-backend. Currently only enabled for codex-backend so only enabled for openai providers when using chatgpt::auth even when the feature is enabled

Added a new info log line too for evaluating the compression ratio and
overhead off compressing before requesting. You can enable with
`RUST_LOG=$RUST_LOG,codex_client::transport=info`

```
2026-01-06T00:09:48.272113Z  INFO codex_client::transport: Compressed request body with zstd pre_compression_bytes=28914 post_compression_bytes=11485 compression_duration_ms=0
```
This commit is contained in:
Channing Conger
2026-01-07 13:21:40 -08:00
committed by GitHub
parent a9b5e8a136
commit 21c6d40a44
19 changed files with 345 additions and 19 deletions

42
codex-rs/Cargo.lock generated
View File

@@ -819,6 +819,8 @@ version = "1.2.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7"
dependencies = [
"jobserver",
"libc",
"shlex",
]
@@ -1197,6 +1199,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"zstd",
]
[[package]]
@@ -1348,6 +1351,7 @@ dependencies = [
"which",
"wildmatch",
"wiremock",
"zstd",
]
[[package]]
@@ -3924,6 +3928,16 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "jobserver"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33"
dependencies = [
"getrandom 0.3.3",
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
@@ -8809,6 +8823,34 @@ dependencies = [
"syn 2.0.104",
]
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.16+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748"
dependencies = [
"cc",
"pkg-config",
]
[[package]]
name = "zune-core"
version = "0.4.12"

View File

@@ -218,6 +218,7 @@ tracing-subscriber = "0.3.22"
tracing-test = "0.2.5"
tree-sitter = "0.25.10"
tree-sitter-bash = "0.25"
zstd = "0.13"
tree-sitter-highlight = "0.25.10"
ts-rs = "11"
tui-scrollbar = "0.2.1"

View File

@@ -10,6 +10,7 @@ use crate::provider::WireApi;
use crate::sse::chat::spawn_chat_stream;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
use codex_client::RequestTelemetry;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
@@ -80,7 +81,13 @@ impl<T: HttpTransport, A: AuthProvider> ChatClient<T, A> {
extra_headers: HeaderMap,
) -> Result<ResponseStream, ApiError> {
self.streaming
.stream(self.path(), body, extra_headers, spawn_chat_stream)
.stream(
self.path(),
body,
extra_headers,
RequestCompression::None,
spawn_chat_stream,
)
.await
}
}

View File

@@ -9,9 +9,11 @@ use crate::provider::Provider;
use crate::provider::WireApi;
use crate::requests::ResponsesRequest;
use crate::requests::ResponsesRequestBuilder;
use crate::requests::responses::Compression;
use crate::sse::spawn_response_stream;
use crate::telemetry::SseTelemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
use codex_client::RequestTelemetry;
use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
@@ -33,6 +35,7 @@ pub struct ResponsesOptions {
pub conversation_id: Option<String>,
pub session_source: Option<SessionSource>,
pub extra_headers: HeaderMap,
pub compression: Compression,
}
impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
@@ -56,7 +59,8 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
&self,
request: ResponsesRequest,
) -> Result<ResponseStream, ApiError> {
self.stream(request.body, request.headers).await
self.stream(request.body, request.headers, request.compression)
.await
}
#[instrument(level = "trace", skip_all, err)]
@@ -75,6 +79,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
conversation_id,
session_source,
extra_headers,
compression,
} = options;
let request = ResponsesRequestBuilder::new(model, &prompt.instructions, &prompt.input)
@@ -88,6 +93,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
.session_source(session_source)
.store_override(store_override)
.extra_headers(extra_headers)
.compression(compression)
.build(self.streaming.provider())?;
self.stream_request(request).await
@@ -104,9 +110,21 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
&self,
body: Value,
extra_headers: HeaderMap,
compression: Compression,
) -> Result<ResponseStream, ApiError> {
let compression = match compression {
Compression::None => RequestCompression::None,
Compression::Zstd => RequestCompression::Zstd,
};
self.streaming
.stream(self.path(), body, extra_headers, spawn_response_stream)
.stream(
self.path(),
body,
extra_headers,
compression,
spawn_response_stream,
)
.await
}
}

View File

@@ -6,6 +6,7 @@ use crate::provider::Provider;
use crate::telemetry::SseTelemetry;
use crate::telemetry::run_with_request_telemetry;
use codex_client::HttpTransport;
use codex_client::RequestCompression;
use codex_client::RequestTelemetry;
use codex_client::StreamResponse;
use http::HeaderMap;
@@ -52,6 +53,7 @@ impl<T: HttpTransport, A: AuthProvider> StreamingClient<T, A> {
path: &str,
body: Value,
extra_headers: HeaderMap,
compression: RequestCompression,
spawner: fn(StreamResponse, Duration, Option<Arc<dyn SseTelemetry>>) -> ResponseStream,
) -> Result<ResponseStream, ApiError> {
let builder = || {
@@ -62,6 +64,7 @@ impl<T: HttpTransport, A: AuthProvider> StreamingClient<T, A> {
http::HeaderValue::from_static("text/event-stream"),
);
req.body = Some(body.clone());
req.compression = compression;
add_auth_headers(&self.auth, req)
};

View File

@@ -1,4 +1,5 @@
use codex_client::Request;
use codex_client::RequestCompression;
use codex_client::RetryOn;
use codex_client::RetryPolicy;
use http::Method;
@@ -87,6 +88,7 @@ impl Provider {
url: self.url_for_path(path),
headers: self.headers.clone(),
body: None,
compression: RequestCompression::None,
timeout: None,
}
}

View File

@@ -11,10 +11,18 @@ use codex_protocol::protocol::SessionSource;
use http::HeaderMap;
use serde_json::Value;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum Compression {
#[default]
None,
Zstd,
}
/// Assembled request body plus headers for a Responses stream request.
pub struct ResponsesRequest {
pub body: Value,
pub headers: HeaderMap,
pub compression: Compression,
}
#[derive(Default)]
@@ -32,6 +40,7 @@ pub struct ResponsesRequestBuilder<'a> {
session_source: Option<SessionSource>,
store_override: Option<bool>,
headers: HeaderMap,
compression: Compression,
}
impl<'a> ResponsesRequestBuilder<'a> {
@@ -94,6 +103,11 @@ impl<'a> ResponsesRequestBuilder<'a> {
self
}
pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
pub fn build(self, provider: &Provider) -> Result<ResponsesRequest, ApiError> {
let model = self
.model
@@ -138,7 +152,11 @@ impl<'a> ResponsesRequestBuilder<'a> {
insert_header(&mut headers, "x-openai-subagent", &subagent);
}
Ok(ResponsesRequest { body, headers })
Ok(ResponsesRequest {
body,
headers,
compression: self.compression,
})
}
}

View File

@@ -11,6 +11,7 @@ use codex_api::Provider;
use codex_api::ResponsesClient;
use codex_api::ResponsesOptions;
use codex_api::WireApi;
use codex_api::requests::responses::Compression;
use codex_client::HttpTransport;
use codex_client::Request;
use codex_client::Response;
@@ -229,7 +230,9 @@ async fn responses_client_uses_responses_path_for_responses_wire() -> Result<()>
let client = ResponsesClient::new(transport, provider("openai", WireApi::Responses), NoAuth);
let body = serde_json::json!({ "echo": true });
let _stream = client.stream(body, HeaderMap::new()).await?;
let _stream = client
.stream(body, HeaderMap::new(), Compression::None)
.await?;
let requests = state.take_stream_requests();
assert_path_ends_with(&requests, "/responses");
@@ -243,7 +246,9 @@ async fn responses_client_uses_chat_path_for_chat_wire() -> Result<()> {
let client = ResponsesClient::new(transport, provider("openai", WireApi::Chat), NoAuth);
let body = serde_json::json!({ "echo": true });
let _stream = client.stream(body, HeaderMap::new()).await?;
let _stream = client
.stream(body, HeaderMap::new(), Compression::None)
.await?;
let requests = state.take_stream_requests();
assert_path_ends_with(&requests, "/chat/completions");
@@ -258,7 +263,9 @@ async fn streaming_client_adds_auth_headers() -> Result<()> {
let client = ResponsesClient::new(transport, provider("openai", WireApi::Responses), auth);
let body = serde_json::json!({ "model": "gpt-test" });
let _stream = client.stream(body, HeaderMap::new()).await?;
let _stream = client
.stream(body, HeaderMap::new(), Compression::None)
.await?;
let requests = state.take_stream_requests();
assert_eq!(requests.len(), 1);

View File

@@ -9,6 +9,7 @@ use codex_api::Provider;
use codex_api::ResponseEvent;
use codex_api::ResponsesClient;
use codex_api::WireApi;
use codex_api::requests::responses::Compression;
use codex_client::HttpTransport;
use codex_client::Request;
use codex_client::Response;
@@ -124,7 +125,11 @@ async fn responses_stream_parses_items_and_completed_end_to_end() -> Result<()>
let client = ResponsesClient::new(transport, provider("openai", WireApi::Responses), NoAuth);
let mut stream = client
.stream(serde_json::json!({"echo": true}), HeaderMap::new())
.stream(
serde_json::json!({"echo": true}),
HeaderMap::new(),
Compression::None,
)
.await?;
let mut events = Vec::new();
@@ -189,7 +194,11 @@ async fn responses_stream_aggregates_output_text_deltas() -> Result<()> {
let client = ResponsesClient::new(transport, provider("openai", WireApi::Responses), NoAuth);
let stream = client
.stream(serde_json::json!({"echo": true}), HeaderMap::new())
.stream(
serde_json::json!({"echo": true}),
HeaderMap::new(),
Compression::None,
)
.await?;
let mut stream = stream.aggregate();

View File

@@ -19,6 +19,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "time", "sync"] }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
zstd = { workspace = true }
[lints]
workspace = true

View File

@@ -104,6 +104,13 @@ impl CodexRequestBuilder {
self.map(|builder| builder.json(value))
}
pub fn body<B>(self, body: B) -> Self
where
B: Into<reqwest::Body>,
{
self.map(|builder| builder.body(body))
}
pub async fn send(self) -> Result<Response, reqwest::Error> {
let headers = trace_headers();

View File

@@ -11,6 +11,7 @@ pub use crate::default_client::CodexRequestBuilder;
pub use crate::error::StreamError;
pub use crate::error::TransportError;
pub use crate::request::Request;
pub use crate::request::RequestCompression;
pub use crate::request::Response;
pub use crate::retry::RetryOn;
pub use crate::retry::RetryPolicy;

View File

@@ -5,12 +5,20 @@ use serde::Serialize;
use serde_json::Value;
use std::time::Duration;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RequestCompression {
#[default]
None,
Zstd,
}
#[derive(Debug, Clone)]
pub struct Request {
pub method: Method,
pub url: String,
pub headers: HeaderMap,
pub body: Option<Value>,
pub compression: RequestCompression,
pub timeout: Option<Duration>,
}
@@ -21,6 +29,7 @@ impl Request {
url,
headers: HeaderMap::new(),
body: None,
compression: RequestCompression::None,
timeout: None,
}
}
@@ -29,6 +38,11 @@ impl Request {
self.body = serde_json::to_value(body).ok();
self
}
pub fn with_compression(mut self, compression: RequestCompression) -> Self {
self.compression = compression;
self
}
}
#[derive(Debug, Clone)]

View File

@@ -2,6 +2,7 @@ use crate::default_client::CodexHttpClient;
use crate::default_client::CodexRequestBuilder;
use crate::error::TransportError;
use crate::request::Request;
use crate::request::RequestCompression;
use crate::request::Response;
use async_trait::async_trait;
use bytes::Bytes;
@@ -41,18 +42,70 @@ impl ReqwestTransport {
}
fn build(&self, req: Request) -> Result<CodexRequestBuilder, TransportError> {
let mut builder = self
.client
.request(
Method::from_bytes(req.method.as_str().as_bytes()).unwrap_or(Method::GET),
&req.url,
)
.headers(req.headers);
if let Some(timeout) = req.timeout {
let Request {
method,
url,
mut headers,
body,
compression,
timeout,
} = req;
let mut builder = self.client.request(
Method::from_bytes(method.as_str().as_bytes()).unwrap_or(Method::GET),
&url,
);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
if let Some(body) = req.body {
builder = builder.json(&body);
if let Some(body) = body {
if compression != RequestCompression::None {
if headers.contains_key(http::header::CONTENT_ENCODING) {
return Err(TransportError::Build(
"request compression was requested but content-encoding is already set"
.to_string(),
));
}
let json = serde_json::to_vec(&body)
.map_err(|err| TransportError::Build(err.to_string()))?;
let pre_compression_bytes = json.len();
let compression_start = std::time::Instant::now();
let (compressed, content_encoding) = match compression {
RequestCompression::None => unreachable!("guarded by compression != None"),
RequestCompression::Zstd => (
zstd::stream::encode_all(std::io::Cursor::new(json), 3)
.map_err(|err| TransportError::Build(err.to_string()))?,
http::HeaderValue::from_static("zstd"),
),
};
let post_compression_bytes = compressed.len();
let compression_duration = compression_start.elapsed();
// Ensure the server knows to unpack the request body.
headers.insert(http::header::CONTENT_ENCODING, content_encoding);
if !headers.contains_key(http::header::CONTENT_TYPE) {
headers.insert(
http::header::CONTENT_TYPE,
http::HeaderValue::from_static("application/json"),
);
}
tracing::info!(
pre_compression_bytes,
post_compression_bytes,
compression_duration_ms = compression_duration.as_millis(),
"Compressed request body with zstd"
);
builder = builder.headers(headers).body(compressed);
} else {
builder = builder.headers(headers).json(&body);
}
} else {
builder = builder.headers(headers);
}
Ok(builder)
}

View File

@@ -137,6 +137,7 @@ tracing-subscriber = { workspace = true }
tracing-test = { workspace = true, features = ["no-env-filter"] }
walkdir = { workspace = true }
wiremock = { workspace = true }
zstd = { workspace = true }
[package.metadata.cargo-shear]
ignored = ["openssl-sys"]

View File

@@ -17,6 +17,7 @@ use codex_api::TransportError;
use codex_api::common::Reasoning;
use codex_api::create_text_param_for_request;
use codex_api::error::ApiError;
use codex_api::requests::responses::Compression;
use codex_app_server_protocol::AuthMode;
use codex_otel::otel_manager::OtelManager;
use codex_protocol::ThreadId;
@@ -47,6 +48,7 @@ use crate::default_client::build_reqwest_client;
use crate::error::CodexErr;
use crate::error::Result;
use crate::features::FEATURES;
use crate::features::Feature;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
@@ -250,6 +252,20 @@ impl ModelClient {
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider).await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
let compression = if self
.config
.features
.enabled(Feature::EnableRequestCompression)
&& auth
.as_ref()
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
&& self.provider.is_openai()
{
Compression::Zstd
} else {
Compression::None
};
let client = ApiResponsesClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
@@ -262,6 +278,7 @@ impl ModelClient {
conversation_id: Some(conversation_id.clone()),
session_source: Some(session_source.clone()),
extra_headers: beta_feature_headers(&self.config),
compression,
};
let stream_result = client

View File

@@ -89,6 +89,8 @@ pub enum Feature {
Tui2,
/// Enforce UTF8 output in Powershell.
PowershellUtf8,
/// Compress request bodies (zstd) when sending streaming requests to codex-backend.
EnableRequestCompression,
}
impl Feature {
@@ -374,6 +376,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::EnableRequestCompression,
key: "enable_request_compression",
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::Tui2,
key: "tui2",

View File

@@ -44,6 +44,7 @@ mod prompt_caching;
mod quota_exceeded;
mod read_file;
mod remote_models;
mod request_compression;
mod resume;
mod resume_warning;
mod review;

View File

@@ -0,0 +1,116 @@
#![cfg(not(target_os = "windows"))]
use codex_core::CodexAuth;
use codex_core::features::Feature;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::get_responses_requests;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_body_is_zstd_compressed_for_codex_backend_when_enabled() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let base_url = format!("{}/backend-api/codex/v1", server.uri());
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.features.enable(Feature::EnableRequestCompression);
config.model_provider.base_url = Some(base_url);
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "compress me".into(),
}],
final_output_json_schema: None,
})
.await?;
// Wait until the task completes so the request definitely hit the server.
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
assert_eq!(requests.len(), 1);
let request = &requests[0];
let content_encoding = request
.headers
.get("content-encoding")
.and_then(|v| v.to_str().ok());
assert_eq!(content_encoding, Some("zstd"));
let decompressed = zstd::stream::decode_all(std::io::Cursor::new(request.body.clone()))?;
let json: serde_json::Value = serde_json::from_slice(&decompressed)?;
assert!(
json.get("input").is_some(),
"expected request body to decode as Responses API JSON"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_body_is_not_compressed_for_api_key_auth_even_when_enabled() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
let base_url = format!("{}/backend-api/codex/v1", server.uri());
let mut builder = test_codex().with_config(move |config| {
config.features.enable(Feature::EnableRequestCompression);
config.model_provider.base_url = Some(base_url);
});
let codex = builder.build(&server).await?.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "do not compress".into(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = get_responses_requests(&server).await;
assert_eq!(requests.len(), 1);
let request = &requests[0];
assert!(
request.headers.get("content-encoding").is_none(),
"did not expect request compression for API-key auth"
);
let json: serde_json::Value = serde_json::from_slice(&request.body)?;
assert!(
json.get("input").is_some(),
"expected request body to be plain Responses API JSON"
);
Ok(())
}