Files
codex/codex-rs/network-proxy/src/http_proxy.rs

945 lines
33 KiB
Rust

use crate::config::NetworkMode;
use crate::network_policy::NetworkDecision;
use crate::network_policy::NetworkDecisionSource;
use crate::network_policy::NetworkPolicyDecider;
use crate::network_policy::NetworkPolicyDecision;
use crate::network_policy::NetworkPolicyRequest;
use crate::network_policy::NetworkPolicyRequestArgs;
use crate::network_policy::NetworkProtocol;
use crate::network_policy::NonDomainDenyAuditEventArgs;
use crate::network_policy::emit_non_domain_deny_audit_event;
use crate::network_policy::evaluate_host_policy;
use crate::policy::normalize_host;
use crate::reasons::REASON_METHOD_NOT_ALLOWED;
use crate::reasons::REASON_NOT_ALLOWED;
use crate::reasons::REASON_PROXY_DISABLED;
use crate::reasons::REASON_UNIX_SOCKET_UNSUPPORTED;
use crate::responses::PolicyDecisionDetails;
use crate::responses::blocked_header_value;
use crate::responses::blocked_message_with_policy;
use crate::responses::blocked_text_response_with_policy;
use crate::responses::json_response;
use crate::responses::policy_decision_prefix;
use crate::runtime::unix_socket_permissions_supported;
use crate::state::BlockedRequest;
use crate::state::BlockedRequestArgs;
use crate::state::NetworkProxyState;
use crate::upstream::UpstreamClient;
use crate::upstream::proxy_for_connect;
use anyhow::Context as _;
use anyhow::Result;
use rama_core::Layer;
use rama_core::Service;
use rama_core::error::BoxError;
use rama_core::error::ErrorExt as _;
use rama_core::error::OpaqueError;
use rama_core::extensions::ExtensionsMut;
use rama_core::extensions::ExtensionsRef;
use rama_core::layer::AddInputExtensionLayer;
use rama_core::rt::Executor;
use rama_core::service::service_fn;
use rama_http::Body;
use rama_http::HeaderValue;
use rama_http::Request;
use rama_http::Response;
use rama_http::StatusCode;
use rama_http::layer::remove_header::RemoveRequestHeaderLayer;
use rama_http::layer::remove_header::RemoveResponseHeaderLayer;
use rama_http::matcher::MethodMatcher;
use rama_http_backend::client::proxy::layer::HttpProxyConnector;
use rama_http_backend::server::HttpServer;
use rama_http_backend::server::layer::upgrade::UpgradeLayer;
use rama_http_backend::server::layer::upgrade::Upgraded;
use rama_net::Protocol;
use rama_net::address::ProxyAddress;
use rama_net::client::ConnectorService;
use rama_net::client::EstablishedClientConnection;
use rama_net::http::RequestContext;
use rama_net::proxy::ProxyRequest;
use rama_net::proxy::ProxyTarget;
use rama_net::proxy::StreamForwardService;
use rama_net::stream::SocketInfo;
use rama_tcp::client::Request as TcpRequest;
use rama_tcp::client::service::TcpConnector;
use rama_tcp::server::TcpListener;
use rama_tls_rustls::client::TlsConnectorDataBuilder;
use rama_tls_rustls::client::TlsConnectorLayer;
use serde::Serialize;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::net::TcpListener as StdTcpListener;
use std::sync::Arc;
use tracing::error;
use tracing::info;
use tracing::warn;
pub async fn run_http_proxy(
state: Arc<NetworkProxyState>,
addr: SocketAddr,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
) -> Result<()> {
let listener = TcpListener::build()
.bind(addr)
.await
// Rama's `BoxError` is a `Box<dyn Error + Send + Sync>` without an explicit `'static`
// lifetime bound, which means it doesn't satisfy `anyhow::Context`'s `StdError` constraint.
// Wrap it in Rama's `OpaqueError` so we can preserve the original error as a source and
// still use `anyhow` for chaining.
.map_err(rama_core::error::OpaqueError::from)
.map_err(anyhow::Error::from)
.with_context(|| format!("bind HTTP proxy: {addr}"))?;
run_http_proxy_with_listener(state, listener, policy_decider).await
}
pub async fn run_http_proxy_with_std_listener(
state: Arc<NetworkProxyState>,
listener: StdTcpListener,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
) -> Result<()> {
let listener =
TcpListener::try_from(listener).context("convert std listener to HTTP proxy listener")?;
run_http_proxy_with_listener(state, listener, policy_decider).await
}
async fn run_http_proxy_with_listener(
state: Arc<NetworkProxyState>,
listener: TcpListener,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
) -> Result<()> {
let addr = listener
.local_addr()
.context("read HTTP proxy listener local addr")?;
let http_service = HttpServer::auto(Executor::new()).service(
(
UpgradeLayer::new(
MethodMatcher::CONNECT,
service_fn({
let policy_decider = policy_decider.clone();
move |req| http_connect_accept(policy_decider.clone(), req)
}),
service_fn(http_connect_proxy),
),
RemoveResponseHeaderLayer::hop_by_hop(),
RemoveRequestHeaderLayer::hop_by_hop(),
)
.into_layer(service_fn({
let policy_decider = policy_decider.clone();
move |req| http_plain_proxy(policy_decider.clone(), req)
})),
);
info!("HTTP proxy listening on {addr}");
listener
.serve(AddInputExtensionLayer::new(state).into_layer(http_service))
.await;
Ok(())
}
async fn http_connect_accept(
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
mut req: Request,
) -> Result<(Response, Request), Response> {
let app_state = req
.extensions()
.get::<Arc<NetworkProxyState>>()
.cloned()
.ok_or_else(|| text_response(StatusCode::INTERNAL_SERVER_ERROR, "missing state"))?;
let authority = match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
Ok(authority) => authority,
Err(err) => {
warn!("CONNECT missing authority: {err}");
return Err(text_response(StatusCode::BAD_REQUEST, "missing authority"));
}
};
let host = normalize_host(&authority.host.to_string());
if host.is_empty() {
return Err(text_response(StatusCode::BAD_REQUEST, "invalid host"));
}
let client = client_addr(&req);
let enabled = app_state
.enabled()
.await
.map_err(|err| internal_error("failed to read enabled state", err))?;
if !enabled {
let client = client.as_deref().unwrap_or_default();
warn!("CONNECT blocked; proxy disabled (client={client}, host={host})");
return Err(proxy_disabled_response(
&app_state,
host,
authority.port,
client_addr(&req),
Some("CONNECT".to_string()),
NetworkProtocol::HttpsConnect,
None,
)
.await);
}
let request = NetworkPolicyRequest::new(NetworkPolicyRequestArgs {
protocol: NetworkProtocol::HttpsConnect,
host: host.clone(),
port: authority.port,
client_addr: client.clone(),
method: Some("CONNECT".to_string()),
command: None,
exec_policy_hint: None,
});
match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny {
reason,
source,
decision,
}) => {
let details = PolicyDecisionDetails {
decision,
reason: &reason,
source,
protocol: NetworkProtocol::HttpsConnect,
host: &host,
port: authority.port,
};
let _ = app_state
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
host: host.clone(),
reason: reason.clone(),
client: client.clone(),
method: Some("CONNECT".to_string()),
mode: None,
protocol: "http-connect".to_string(),
}))
.await;
let client = client.as_deref().unwrap_or_default();
warn!("CONNECT blocked (client={client}, host={host}, reason={reason})");
return Err(blocked_text_with_details(&reason, &details));
}
Ok(NetworkDecision::Allow) => {
let client = client.as_deref().unwrap_or_default();
info!("CONNECT allowed (client={client}, host={host})");
}
Err(err) => {
error!("failed to evaluate host for CONNECT {host}: {err}");
return Err(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"));
}
}
let mode = app_state
.network_mode()
.await
.map_err(|err| internal_error("failed to read network mode", err))?;
if mode == NetworkMode::Limited {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ModeGuard,
reason: REASON_METHOD_NOT_ALLOWED,
protocol: NetworkProtocol::HttpsConnect,
host: &host,
port: authority.port,
method: Some("CONNECT"),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
let details = PolicyDecisionDetails {
decision: NetworkPolicyDecision::Deny,
reason: REASON_METHOD_NOT_ALLOWED,
source: NetworkDecisionSource::ModeGuard,
protocol: NetworkProtocol::HttpsConnect,
host: &host,
port: authority.port,
};
let _ = app_state
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
host: host.clone(),
reason: REASON_METHOD_NOT_ALLOWED.to_string(),
client: client.clone(),
method: Some("CONNECT".to_string()),
mode: Some(NetworkMode::Limited),
protocol: "http-connect".to_string(),
}))
.await;
let client = client.as_deref().unwrap_or_default();
warn!("CONNECT blocked by method policy (client={client}, host={host}, mode=limited)");
return Err(blocked_text_with_details(
REASON_METHOD_NOT_ALLOWED,
&details,
));
}
req.extensions_mut().insert(ProxyTarget(authority));
req.extensions_mut().insert(mode);
Ok((
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap_or_else(|_| Response::new(Body::empty())),
req,
))
}
async fn http_connect_proxy(upgraded: Upgraded) -> Result<(), Infallible> {
if upgraded.extensions().get::<ProxyTarget>().is_none() {
warn!("CONNECT missing proxy target");
return Ok(());
}
let allow_upstream_proxy = match upgraded
.extensions()
.get::<Arc<NetworkProxyState>>()
.cloned()
{
Some(state) => match state.allow_upstream_proxy().await {
Ok(allowed) => allowed,
Err(err) => {
error!("failed to read upstream proxy setting: {err}");
false
}
},
None => {
error!("missing app state");
false
}
};
let proxy = if allow_upstream_proxy {
proxy_for_connect()
} else {
None
};
if let Err(err) = forward_connect_tunnel(upgraded, proxy).await {
warn!("tunnel error: {err}");
}
Ok(())
}
async fn forward_connect_tunnel(
upgraded: Upgraded,
proxy: Option<ProxyAddress>,
) -> Result<(), BoxError> {
let authority = upgraded
.extensions()
.get::<ProxyTarget>()
.map(|target| target.0.clone())
.ok_or_else(|| OpaqueError::from_display("missing forward authority").into_boxed())?;
let mut extensions = upgraded.extensions().clone();
if let Some(proxy) = proxy {
extensions.insert(proxy);
}
let req = TcpRequest::new_with_extensions(authority.clone(), extensions)
.with_protocol(Protocol::HTTPS);
let proxy_connector = HttpProxyConnector::optional(TcpConnector::new());
let tls_config = TlsConnectorDataBuilder::new()
.with_alpn_protocols_http_auto()
.build();
let connector = TlsConnectorLayer::tunnel(None)
.with_connector_data(tls_config)
.into_layer(proxy_connector);
let EstablishedClientConnection { conn: target, .. } =
connector.connect(req).await.map_err(|err| {
OpaqueError::from_boxed(err)
.with_context(|| format!("establish CONNECT tunnel to {authority}"))
.into_boxed()
})?;
let proxy_req = ProxyRequest {
source: upgraded,
target,
};
StreamForwardService::default()
.serve(proxy_req)
.await
.map_err(|err| {
OpaqueError::from_boxed(err.into())
.with_context(|| format!("forward CONNECT tunnel to {authority}"))
.into_boxed()
})
}
async fn http_plain_proxy(
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
req: Request,
) -> Result<Response, Infallible> {
let app_state = match req.extensions().get::<Arc<NetworkProxyState>>().cloned() {
Some(state) => state,
None => {
error!("missing app state");
return Ok(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"));
}
};
let client = client_addr(&req);
let method_allowed = match app_state
.method_allowed(req.method().as_str())
.await
.map_err(|err| internal_error("failed to evaluate method policy", err))
{
Ok(allowed) => allowed,
Err(resp) => return Ok(resp),
};
// `x-unix-socket` is an escape hatch for talking to local daemons. We keep it tightly scoped:
// macOS-only + explicit allowlist, to avoid turning the proxy into a general local capability
// escalation mechanism.
if let Some(unix_socket_header) = req.headers().get("x-unix-socket") {
let socket_path = match unix_socket_header.to_str() {
Ok(value) => value.to_string(),
Err(_) => {
warn!("invalid x-unix-socket header value (non-UTF8)");
return Ok(text_response(
StatusCode::BAD_REQUEST,
"invalid x-unix-socket header",
));
}
};
let enabled = match app_state
.enabled()
.await
.map_err(|err| internal_error("failed to read enabled state", err))
{
Ok(enabled) => enabled,
Err(resp) => return Ok(resp),
};
if !enabled {
let client = client.as_deref().unwrap_or_default();
warn!("unix socket blocked; proxy disabled (client={client}, path={socket_path})");
return Ok(proxy_disabled_response(
&app_state,
socket_path,
0,
client_addr(&req),
Some(req.method().as_str().to_string()),
NetworkProtocol::Http,
Some("unix-socket"),
)
.await);
}
if !method_allowed {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ModeGuard,
reason: REASON_METHOD_NOT_ALLOWED,
protocol: NetworkProtocol::Http,
host: "unix-socket",
port: 0,
method: Some(req.method().as_str()),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
let client = client.as_deref().unwrap_or_default();
let method = req.method();
warn!(
"unix socket blocked by method policy (client={client}, method={method}, mode=limited, allowed_methods=GET, HEAD, OPTIONS)"
);
return Ok(json_blocked("unix-socket", REASON_METHOD_NOT_ALLOWED, None));
}
if !unix_socket_permissions_supported() {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ProxyState,
reason: REASON_UNIX_SOCKET_UNSUPPORTED,
protocol: NetworkProtocol::Http,
host: "unix-socket",
port: 0,
method: Some(req.method().as_str()),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
warn!("unix socket proxy unsupported on this platform (path={socket_path})");
return Ok(text_response(
StatusCode::NOT_IMPLEMENTED,
"unix sockets unsupported",
));
}
return match app_state.is_unix_socket_allowed(&socket_path).await {
Ok(true) => {
let client = client.as_deref().unwrap_or_default();
info!("unix socket allowed (client={client}, path={socket_path})");
match proxy_via_unix_socket(req, &socket_path).await {
Ok(resp) => Ok(resp),
Err(err) => {
warn!("unix socket proxy failed: {err}");
Ok(text_response(
StatusCode::BAD_GATEWAY,
"unix socket proxy failed",
))
}
}
}
Ok(false) => {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ProxyState,
reason: REASON_NOT_ALLOWED,
protocol: NetworkProtocol::Http,
host: "unix-socket",
port: 0,
method: Some(req.method().as_str()),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
let client = client.as_deref().unwrap_or_default();
warn!("unix socket blocked (client={client}, path={socket_path})");
Ok(json_blocked("unix-socket", REASON_NOT_ALLOWED, None))
}
Err(err) => {
warn!("unix socket check failed: {err}");
Ok(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"))
}
};
}
let authority = match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
Ok(authority) => authority,
Err(err) => {
warn!("missing host: {err}");
return Ok(text_response(StatusCode::BAD_REQUEST, "missing host"));
}
};
let host = normalize_host(&authority.host.to_string());
let port = authority.port;
let enabled = match app_state
.enabled()
.await
.map_err(|err| internal_error("failed to read enabled state", err))
{
Ok(enabled) => enabled,
Err(resp) => return Ok(resp),
};
if !enabled {
let client = client.as_deref().unwrap_or_default();
let method = req.method();
warn!("request blocked; proxy disabled (client={client}, host={host}, method={method})");
return Ok(proxy_disabled_response(
&app_state,
host,
port,
client_addr(&req),
Some(req.method().as_str().to_string()),
NetworkProtocol::Http,
None,
)
.await);
}
let request = NetworkPolicyRequest::new(NetworkPolicyRequestArgs {
protocol: NetworkProtocol::Http,
host: host.clone(),
port,
client_addr: client.clone(),
method: Some(req.method().as_str().to_string()),
command: None,
exec_policy_hint: None,
});
match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny {
reason,
source,
decision,
}) => {
let details = PolicyDecisionDetails {
decision,
reason: &reason,
source,
protocol: NetworkProtocol::Http,
host: &host,
port,
};
let _ = app_state
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
host: host.clone(),
reason: reason.clone(),
client: client.clone(),
method: Some(req.method().as_str().to_string()),
mode: None,
protocol: "http".to_string(),
}))
.await;
let client = client.as_deref().unwrap_or_default();
warn!("request blocked (client={client}, host={host}, reason={reason})");
return Ok(json_blocked(&host, &reason, Some(&details)));
}
Ok(NetworkDecision::Allow) => {}
Err(err) => {
error!("failed to evaluate host for {host}: {err}");
return Ok(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"));
}
}
if !method_allowed {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ModeGuard,
reason: REASON_METHOD_NOT_ALLOWED,
protocol: NetworkProtocol::Http,
host: &host,
port,
method: Some(req.method().as_str()),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
let details = PolicyDecisionDetails {
decision: NetworkPolicyDecision::Deny,
reason: REASON_METHOD_NOT_ALLOWED,
source: NetworkDecisionSource::ModeGuard,
protocol: NetworkProtocol::Http,
host: &host,
port,
};
let _ = app_state
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
host: host.clone(),
reason: REASON_METHOD_NOT_ALLOWED.to_string(),
client: client.clone(),
method: Some(req.method().as_str().to_string()),
mode: Some(NetworkMode::Limited),
protocol: "http".to_string(),
}))
.await;
let client = client.as_deref().unwrap_or_default();
let method = req.method();
warn!(
"request blocked by method policy (client={client}, host={host}, method={method}, mode=limited, allowed_methods=GET, HEAD, OPTIONS)"
);
return Ok(json_blocked(
&host,
REASON_METHOD_NOT_ALLOWED,
Some(&details),
));
}
let client = client.as_deref().unwrap_or_default();
let method = req.method();
info!("request allowed (client={client}, host={host}, method={method})");
let allow_upstream_proxy = match app_state
.allow_upstream_proxy()
.await
.map_err(|err| internal_error("failed to read upstream proxy config", err))
{
Ok(allow) => allow,
Err(resp) => return Ok(resp),
};
let client = if allow_upstream_proxy {
UpstreamClient::from_env_proxy()
} else {
UpstreamClient::direct()
};
match client.serve(req).await {
Ok(resp) => Ok(resp),
Err(err) => {
warn!("upstream request failed: {err}");
Ok(text_response(StatusCode::BAD_GATEWAY, "upstream failure"))
}
}
}
async fn proxy_via_unix_socket(req: Request, socket_path: &str) -> Result<Response> {
#[cfg(target_os = "macos")]
{
let client = UpstreamClient::unix_socket(socket_path);
let (mut parts, body) = req.into_parts();
let path = parts
.uri
.path_and_query()
.map(rama_http::uri::PathAndQuery::as_str)
.unwrap_or("/");
parts.uri = path
.parse()
.with_context(|| format!("invalid unix socket request path: {path}"))?;
parts.headers.remove("x-unix-socket");
let req = Request::from_parts(parts, body);
client.serve(req).await.map_err(anyhow::Error::from)
}
#[cfg(not(target_os = "macos"))]
{
let _ = req;
let _ = socket_path;
Err(anyhow::anyhow!("unix sockets not supported"))
}
}
fn client_addr<T: ExtensionsRef>(input: &T) -> Option<String> {
input
.extensions()
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string())
}
fn json_blocked(host: &str, reason: &str, details: Option<&PolicyDecisionDetails<'_>>) -> Response {
let (policy_decision_prefix, message) = details
.map(|details| {
(
Some(policy_decision_prefix(details)),
Some(blocked_message_with_policy(reason, details)),
)
})
.unwrap_or((None, None));
let response = BlockedResponse {
status: "blocked",
host,
reason,
policy_decision_prefix,
message,
};
let mut resp = json_response(&response);
*resp.status_mut() = StatusCode::FORBIDDEN;
resp.headers_mut().insert(
"x-proxy-error",
HeaderValue::from_static(blocked_header_value(reason)),
);
resp
}
fn blocked_text_with_details(reason: &str, details: &PolicyDecisionDetails<'_>) -> Response {
blocked_text_response_with_policy(reason, details)
}
async fn proxy_disabled_response(
app_state: &NetworkProxyState,
host: String,
port: u16,
client: Option<String>,
method: Option<String>,
protocol: NetworkProtocol,
audit_host: Option<&str>,
) -> Response {
emit_non_domain_deny_audit_event(NonDomainDenyAuditEventArgs {
source: NetworkDecisionSource::ProxyState,
reason: REASON_PROXY_DISABLED,
protocol,
host: audit_host.unwrap_or(host.as_str()),
port,
method: method.as_deref(),
client_addr: client.as_deref(),
metadata: app_state.audit_metadata(),
});
let blocked_host = host.clone();
let _ = app_state
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
host: blocked_host,
reason: REASON_PROXY_DISABLED.to_string(),
client,
method,
mode: None,
protocol: protocol.as_policy_protocol().to_string(),
}))
.await;
let details = PolicyDecisionDetails {
decision: NetworkPolicyDecision::Deny,
reason: REASON_PROXY_DISABLED,
source: NetworkDecisionSource::ProxyState,
protocol,
host: &host,
port,
};
text_response(
StatusCode::SERVICE_UNAVAILABLE,
&blocked_message_with_policy(REASON_PROXY_DISABLED, &details),
)
}
fn internal_error(context: &str, err: impl std::fmt::Display) -> Response {
error!("{context}: {err}");
text_response(StatusCode::INTERNAL_SERVER_ERROR, "error")
}
fn text_response(status: StatusCode, body: &str) -> Response {
Response::builder()
.status(status)
.header("content-type", "text/plain")
.body(Body::from(body.to_string()))
.unwrap_or_else(|_| Response::new(Body::from(body.to_string())))
}
#[derive(Serialize)]
struct BlockedResponse<'a> {
status: &'static str,
host: &'a str,
reason: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
policy_decision_prefix: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::NetworkMode;
use crate::config::NetworkProxySettings;
use crate::runtime::network_proxy_state_for_policy;
use pretty_assertions::assert_eq;
use rama_http::Method;
use rama_http::Request;
use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use tracing_subscriber::fmt::MakeWriter;
#[derive(Clone, Default)]
struct CapturingMakeWriter {
buffer: Arc<Mutex<Vec<u8>>>,
}
impl CapturingMakeWriter {
fn from_buffer(buffer: Arc<Mutex<Vec<u8>>>) -> Self {
Self { buffer }
}
}
impl<'a> MakeWriter<'a> for CapturingMakeWriter {
type Writer = CapturingWriter;
fn make_writer(&'a self) -> Self::Writer {
CapturingWriter {
buffer: self.buffer.clone(),
}
}
}
struct CapturingWriter {
buffer: Arc<Mutex<Vec<u8>>>,
}
impl io::Write for CapturingWriter {
fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
self.buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.extend_from_slice(bytes);
Ok(bytes.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[tokio::test]
async fn http_connect_accept_blocks_in_limited_mode() {
let policy = NetworkProxySettings {
allowed_domains: vec!["example.com".to_string()],
..Default::default()
};
let state = Arc::new(network_proxy_state_for_policy(policy));
state.set_network_mode(NetworkMode::Limited).await.unwrap();
let mut req = Request::builder()
.method(Method::CONNECT)
.uri("https://example.com:443")
.header("host", "example.com:443")
.body(Body::empty())
.unwrap();
req.extensions_mut().insert(state);
let response = http_connect_accept(None, req).await.unwrap_err();
assert_eq!(response.status(), StatusCode::FORBIDDEN);
assert_eq!(
response.headers().get("x-proxy-error").unwrap(),
"blocked-by-method-policy"
);
}
#[cfg(not(target_os = "macos"))]
#[tokio::test(flavor = "current_thread")]
async fn http_plain_proxy_emits_unix_socket_unsupported_audit_event() {
let policy = NetworkProxySettings::default();
let state = Arc::new(network_proxy_state_for_policy(policy));
let mut req = Request::builder()
.method(Method::GET)
.uri("http://example.com/")
.header("x-unix-socket", "/tmp/example.sock")
.body(Body::empty())
.unwrap();
req.extensions_mut().insert(state);
let buffer = Arc::new(Mutex::new(Vec::new()));
let subscriber = tracing_subscriber::fmt()
.with_ansi(false)
.with_level(false)
.with_target(true)
.without_time()
.with_writer(CapturingMakeWriter::from_buffer(buffer.clone()))
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let response = http_plain_proxy(None, req).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_IMPLEMENTED);
let logs = String::from_utf8(
buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone(),
)
.unwrap();
assert!(logs.contains("event.name=\"codex.network_proxy.block_decision\""));
assert!(logs.contains("network.policy.source=\"proxy_state\""));
assert!(logs.contains("network.policy.reason=\"unix_socket_unsupported\""));
assert!(logs.contains("server.address=\"unix-socket\""));
assert!(logs.contains("server.port=0"));
}
#[cfg(target_os = "macos")]
#[tokio::test(flavor = "current_thread")]
async fn http_plain_proxy_emits_unix_socket_not_allowed_audit_event() {
let policy = NetworkProxySettings {
allow_unix_sockets: vec!["/tmp/allowed.sock".to_string()],
..Default::default()
};
let state = Arc::new(network_proxy_state_for_policy(policy));
let mut req = Request::builder()
.method(Method::GET)
.uri("http://example.com/")
.header("x-unix-socket", "/tmp/not-allowed.sock")
.body(Body::empty())
.unwrap();
req.extensions_mut().insert(state);
let buffer = Arc::new(Mutex::new(Vec::new()));
let subscriber = tracing_subscriber::fmt()
.with_ansi(false)
.with_level(false)
.with_target(true)
.without_time()
.with_writer(CapturingMakeWriter::from_buffer(buffer.clone()))
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let response = http_plain_proxy(None, req).await.unwrap();
assert_eq!(response.status(), StatusCode::FORBIDDEN);
assert_eq!(
response.headers().get("x-proxy-error").unwrap(),
"blocked-by-allowlist"
);
let logs = String::from_utf8(
buffer
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone(),
)
.unwrap();
assert!(logs.contains("event.name=\"codex.network_proxy.block_decision\""));
assert!(logs.contains("network.policy.source=\"proxy_state\""));
assert!(logs.contains("network.policy.reason=\"not_allowed\""));
assert!(logs.contains("server.address=\"unix-socket\""));
assert!(logs.contains("server.port=0"));
}
}