Remove metadata extraction from proxy

This commit is contained in:
viyatb-oai
2026-01-16 23:17:41 -08:00
parent 6c1df8b73e
commit cbb5f48ba3
10 changed files with 866 additions and 337 deletions

View File

@@ -17,6 +17,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-app-server-protocol = { workspace = true }
codex-core = { workspace = true }
@@ -28,7 +29,7 @@ time = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt"] }
rama = { version = "=0.3.0-alpha.3", default-features = false, features = ["http-full", "proxy-full", "socks5", "rustls"] }
rama = { version = "=0.3.0-alpha.4", default-features = false, features = ["http-full", "proxy-full", "socks5", "rustls"] }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -67,6 +67,13 @@ cargo run -p codex-network-proxy -- init
cargo run -p codex-network-proxy --
```
Optional flags:
```bash
# Enable SOCKS5 UDP associate support (off by default).
cargo run -p codex-network-proxy -- --enable-socks5-udp
```
### 4) Point a client at it
For HTTP(S) traffic:
@@ -97,6 +104,45 @@ In "limited" mode, only `GET`, `HEAD`, and `OPTIONS` are allowed. In addition, H
requires MITM to be enabled to allow read-only HTTPS; otherwise the proxy blocks CONNECT with
reason `mitm_required`.
## Library API
`codex-network-proxy` can be embedded as a library with a thin API:
```rust
use codex_network_proxy::{NetworkProxy, NetworkDecision, NetworkPolicyRequest};
let proxy = NetworkProxy::builder()
.http_addr("127.0.0.1:8080".parse()?)
.socks_addr("127.0.0.1:1080".parse()?)
.admin_addr("127.0.0.1:9000".parse()?)
.policy_decider(|request: NetworkPolicyRequest| async move {
// Example: auto-allow when exec policy already approved a command prefix.
if let Some(command) = request.command.as_deref() {
if command.starts_with("curl ") {
return NetworkDecision::Allow;
}
}
NetworkDecision::Deny {
reason: "policy_denied".to_string(),
}
})
.build()
.await?;
let handle = proxy.run().await?;
handle.shutdown().await?;
```
### Policy hook (exec-policy mapping)
The proxy exposes a policy hook (`NetworkPolicyDecider`) that can override allowlist-only blocks.
It receives `command` and `exec_policy_hint` fields when supplied by the embedding app. This lets
core map exec approvals to network access, e.g. if a user already approved `curl *` for a session,
the decider can auto-allow network requests originating from that command.
**Important:** Explicit deny rules still win. The decider only gets a chance to override
`not_allowed` (allowlist misses), not `denied` or `not_allowed_local`.
## Admin API
The admin API is a small HTTP server intended for debugging and runtime adjustments.

View File

@@ -1,14 +1,20 @@
use crate::config::NetworkMode;
use crate::mitm;
use crate::network_policy::NetworkDecision;
use crate::network_policy::NetworkPolicyDecider;
use crate::network_policy::NetworkPolicyRequest;
use crate::network_policy::NetworkProtocol;
use crate::network_policy::evaluate_host_policy;
use crate::policy::normalize_host;
use crate::responses::blocked_header_value;
use crate::state::AppState;
use crate::state::BlockedRequest;
use anyhow::Context as _;
use anyhow::Result;
use rama::Context;
use rama::Layer;
use rama::Service;
use rama::extensions::ExtensionsMut;
use rama::extensions::ExtensionsRef;
use rama::http::Body;
use rama::http::Request;
use rama::http::Response;
@@ -20,7 +26,7 @@ use rama::http::layer::upgrade::UpgradeLayer;
use rama::http::layer::upgrade::Upgraded;
use rama::http::matcher::MethodMatcher;
use rama::http::server::HttpServer;
use rama::layer::AddExtensionLayer;
use rama::layer::AddInputExtensionLayer;
use rama::net::http::RequestContext;
use rama::net::proxy::ProxyTarget;
use rama::net::stream::SocketInfo;
@@ -35,7 +41,11 @@ use tracing::error;
use tracing::info;
use tracing::warn;
pub async fn run_http_proxy(state: Arc<AppState>, addr: SocketAddr) -> Result<()> {
pub async fn run_http_proxy(
state: Arc<AppState>,
addr: SocketAddr,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
) -> Result<()> {
let listener = TcpListener::build()
.bind(addr)
.await
@@ -51,39 +61,40 @@ pub async fn run_http_proxy(state: Arc<AppState>, addr: SocketAddr) -> Result<()
(
UpgradeLayer::new(
MethodMatcher::CONNECT,
service_fn(http_connect_accept),
service_fn({
let policy_decider = policy_decider.clone();
move |req| http_connect_accept(policy_decider.clone(), req)
}),
service_fn(http_connect_proxy),
),
RemoveResponseHeaderLayer::hop_by_hop(),
RemoveRequestHeaderLayer::hop_by_hop(),
)
.into_layer(service_fn(http_plain_proxy)),
.into_layer(service_fn({
let policy_decider = policy_decider.clone();
move |req| http_plain_proxy(policy_decider.clone(), req)
})),
);
info!("HTTP proxy listening on {addr}");
listener
.serve(AddExtensionLayer::new(state).into_layer(http_service))
.serve(AddInputExtensionLayer::new(state).into_layer(http_service))
.await;
Ok(())
}
async fn http_connect_accept<S>(
mut ctx: Context<S>,
req: Request,
) -> Result<(Response, Context<S>, Request), Response>
where
S: Clone + Send + Sync + 'static,
{
let app_state = ctx
async fn http_connect_accept(
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
mut req: Request,
) -> Result<(Response, Request), Response> {
let app_state = req
.extensions()
.get::<Arc<AppState>>()
.cloned()
.ok_or_else(|| text_response(StatusCode::INTERNAL_SERVER_ERROR, "missing state"))?;
let authority = match ctx
.get_or_try_insert_with_ctx::<RequestContext, _>(|ctx| (ctx, &req).try_into())
.map(|ctx| ctx.authority.clone())
{
let authority = match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
Ok(authority) => authority,
Err(err) => {
warn!("CONNECT missing authority: {err}");
@@ -91,15 +102,25 @@ where
}
};
let host = normalize_host(&authority.host().to_string());
let host = normalize_host(&authority.host.to_string());
if host.is_empty() {
return Err(text_response(StatusCode::BAD_REQUEST, "invalid host"));
}
let client = client_addr(&ctx);
let client = client_addr(&req);
match app_state.host_blocked(&host, authority.port()).await {
Ok((true, reason)) => {
let request = NetworkPolicyRequest::new(
NetworkProtocol::HttpsConnect,
host.clone(),
authority.port,
client.clone(),
Some("CONNECT".to_string()),
None,
None,
);
match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny { reason }) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
@@ -114,7 +135,7 @@ where
warn!("CONNECT blocked (client={client}, host={host}, reason={reason})");
return Err(blocked_text(&reason));
}
Ok((false, _)) => {
Ok(NetworkDecision::Allow) => {
let client = client.as_deref().unwrap_or_default();
info!("CONNECT allowed (client={client}, host={host})");
}
@@ -160,10 +181,10 @@ where
return Err(blocked_text("mitm_required"));
}
ctx.insert(ProxyTarget(authority));
ctx.insert(mode);
req.extensions_mut().insert(ProxyTarget(authority));
req.extensions_mut().insert(mode);
if let Some(mitm_state) = mitm_state {
ctx.insert(mitm_state);
req.extensions_mut().insert(mitm_state);
}
Ok((
@@ -171,54 +192,59 @@ where
.status(StatusCode::OK)
.body(Body::empty())
.unwrap_or_else(|_| Response::new(Body::empty())),
ctx,
req,
))
}
async fn http_connect_proxy<S>(ctx: Context<S>, upgraded: Upgraded) -> Result<(), Infallible>
where
S: Clone + Send + Sync + 'static,
{
let mode = ctx
async fn http_connect_proxy(upgraded: Upgraded) -> Result<(), Infallible> {
let mode = upgraded
.extensions()
.get::<NetworkMode>()
.copied()
.unwrap_or(NetworkMode::Full);
let Some(target) = ctx.get::<ProxyTarget>().map(|t| t.0.clone()) else {
let Some(target) = upgraded
.extensions()
.get::<ProxyTarget>()
.map(|t| t.0.clone())
else {
warn!("CONNECT missing proxy target");
return Ok(());
};
let host = normalize_host(&target.host().to_string());
let host = normalize_host(&target.host.to_string());
if ctx.get::<Arc<mitm::MitmState>>().is_some() {
let port = target.port();
if upgraded
.extensions()
.get::<Arc<mitm::MitmState>>()
.is_some()
{
let port = target.port;
info!("CONNECT MITM enabled (host={host}, port={port}, mode={mode:?})");
if let Err(err) = mitm::mitm_tunnel(ctx, upgraded).await {
if let Err(err) = mitm::mitm_tunnel(upgraded).await {
warn!("MITM tunnel error: {err}");
}
return Ok(());
}
let forwarder = Forwarder::ctx();
if let Err(err) = forwarder.serve(ctx, upgraded).await {
if let Err(err) = forwarder.serve(upgraded).await {
warn!("tunnel error: {err}");
}
Ok(())
}
async fn http_plain_proxy<S>(mut ctx: Context<S>, req: Request) -> Result<Response, Infallible>
where
S: Clone + Send + Sync + 'static,
{
let app_state = match ctx.get::<Arc<AppState>>().cloned() {
async fn http_plain_proxy(
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
req: Request,
) -> Result<Response, Infallible> {
let app_state = match req.extensions().get::<Arc<AppState>>().cloned() {
Some(state) => state,
None => {
error!("missing app state");
return Ok(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"));
}
};
let client = client_addr(&ctx);
let client = client_addr(&req);
let method_allowed = match app_state.method_allowed(req.method().as_str()).await {
Ok(allowed) => allowed,
@@ -263,7 +289,7 @@ where
Ok(true) => {
let client = client.as_deref().unwrap_or_default();
info!("unix socket allowed (client={client}, path={socket_path})");
match proxy_via_unix_socket(ctx, req, &socket_path).await {
match proxy_via_unix_socket(req, &socket_path).await {
Ok(resp) => return Ok(resp),
Err(err) => {
warn!("unix socket proxy failed: {err}");
@@ -286,21 +312,28 @@ where
}
}
let authority = match ctx
.get_or_try_insert_with_ctx::<RequestContext, _>(|ctx| (ctx, &req).try_into())
.map(|ctx| ctx.authority.clone())
{
let authority = match RequestContext::try_from(&req).map(|ctx| ctx.host_with_port()) {
Ok(authority) => authority,
Err(err) => {
warn!("missing host: {err}");
return Ok(text_response(StatusCode::BAD_REQUEST, "missing host"));
}
};
let host = normalize_host(&authority.host().to_string());
let port = authority.port();
let host = normalize_host(&authority.host.to_string());
let port = authority.port;
match app_state.host_blocked(&host, port).await {
Ok((true, reason)) => {
let request = NetworkPolicyRequest::new(
NetworkProtocol::Http,
host.clone(),
port,
client.clone(),
Some(req.method().as_str().to_string()),
None,
None,
);
match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny { reason }) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
@@ -315,7 +348,7 @@ where
warn!("request blocked (client={client}, host={host}, reason={reason})");
return Ok(json_blocked(&host, &reason));
}
Ok((false, _)) => {}
Ok(NetworkDecision::Allow) => {}
Err(err) => {
error!("failed to evaluate host for {host}: {err}");
return Ok(text_response(StatusCode::INTERNAL_SERVER_ERROR, "error"));
@@ -346,7 +379,7 @@ where
info!("request allowed (client={client}, host={host}, method={method})");
let client = EasyHttpWebClient::default();
match client.serve(ctx, req).await {
match client.serve(req).await {
Ok(resp) => Ok(resp),
Err(err) => {
warn!("upstream request failed: {err}");
@@ -355,30 +388,24 @@ where
}
}
async fn proxy_via_unix_socket<S>(
ctx: Context<S>,
req: Request,
socket_path: &str,
) -> Result<Response>
where
S: Clone + Send + Sync + 'static,
{
async fn proxy_via_unix_socket(req: Request, socket_path: &str) -> Result<Response> {
#[cfg(target_os = "macos")]
{
use rama::unix::client::UnixConnector;
let client = EasyHttpWebClient::builder()
let client = EasyHttpWebClient::connector_builder()
.with_custom_transport_connector(UnixConnector::fixed(socket_path))
.without_tls_proxy_support()
.without_proxy_support()
.without_tls_support()
.build();
.with_default_http_connector()
.build_client();
let (mut parts, body) = req.into_parts();
let path = parts
.uri
.path_and_query()
.map(rama::http::dep::http::uri::PathAndQuery::as_str)
.map(rama::http::uri::PathAndQuery::as_str)
.unwrap_or("/");
parts.uri = path
.parse()
@@ -386,19 +413,20 @@ where
parts.headers.remove("x-unix-socket");
let req = Request::from_parts(parts, body);
Ok(client.serve(ctx, req).await?)
Ok(client.serve(req).await?)
}
#[cfg(not(target_os = "macos"))]
{
let _ = req;
let _ = ctx;
let _ = socket_path;
Err(anyhow::anyhow!("unix sockets not supported"))
}
}
fn client_addr<S>(ctx: &Context<S>) -> Option<String> {
ctx.get::<SocketInfo>()
fn client_addr<T: ExtensionsRef>(input: &T) -> Option<String> {
input
.extensions()
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string())
}

View File

@@ -3,55 +3,31 @@ mod config;
mod http_proxy;
mod init;
mod mitm;
mod network_policy;
mod policy;
mod proxy;
mod responses;
mod socks5;
mod state;
use crate::state::AppState;
use anyhow::Result;
use clap::Parser;
use clap::Subcommand;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::warn;
#[derive(Debug, Clone, Parser)]
#[command(name = "codex-network-proxy", about = "Codex network sandbox proxy")]
pub struct Args {
#[command(subcommand)]
pub command: Option<Command>,
}
#[derive(Debug, Clone, Subcommand)]
pub enum Command {
/// Initialize the Codex network proxy directories (e.g. MITM cert paths).
Init,
}
pub use network_policy::NetworkDecision;
pub use network_policy::NetworkPolicyDecider;
pub use network_policy::NetworkPolicyRequest;
pub use network_policy::NetworkProtocol;
pub use proxy::Args;
pub use proxy::Command;
pub use proxy::NetworkProxy;
pub use proxy::NetworkProxyBuilder;
pub use proxy::NetworkProxyHandle;
pub use proxy::run_init;
pub async fn run_main(args: Args) -> Result<()> {
tracing_subscriber::fmt::init();
if let Some(Command::Init) = args.command {
init::run_init()?;
run_init()?;
return Ok(());
}
if cfg!(not(target_os = "macos")) {
warn!("allowUnixSockets is macOS-only; requests will be rejected on this platform");
}
let state = Arc::new(AppState::new().await?);
let runtime = config::resolve_runtime(&state.current_cfg().await?);
let http_addr: SocketAddr = runtime.http_addr;
let socks_addr: SocketAddr = runtime.socks_addr;
let admin_addr: SocketAddr = runtime.admin_addr;
let http_task = http_proxy::run_http_proxy(state.clone(), http_addr);
let socks_task = socks5::run_socks5(state.clone(), socks_addr);
let admin_task = admin::run_admin_api(state.clone(), admin_addr);
tokio::try_join!(http_task, socks_task, admin_task)?;
Ok(())
let proxy = NetworkProxy::from_cli_args(args).await?;
proxy.run().await?.wait().await
}

View File

@@ -1,8 +1,20 @@
use anyhow::Result;
use clap::Parser;
use codex_network_proxy::Args;
use codex_network_proxy::Command;
use codex_network_proxy::NetworkProxy;
use codex_network_proxy::run_init;
#[tokio::main]
async fn main() -> Result<()> {
codex_network_proxy::run_main(Args::parse()).await
tracing_subscriber::fmt::init();
let args = Args::parse();
if let Some(Command::Init) = args.command {
run_init()?;
return Ok(());
}
let proxy = NetworkProxy::from_cli_args(args).await?;
proxy.run().await?.wait().await
}

View File

@@ -8,12 +8,12 @@ use crate::state::BlockedRequest;
use anyhow::Context as _;
use anyhow::Result;
use anyhow::anyhow;
use rama::Context;
use rama::Layer;
use rama::Service;
use rama::bytes::Bytes;
use rama::error::BoxError;
use rama::error::OpaqueError;
use rama::extensions::ExtensionsRef;
use rama::futures::stream::Stream;
use rama::http::Body;
use rama::http::HeaderValue;
@@ -29,6 +29,7 @@ use rama::http::layer::upgrade::Upgraded;
use rama::http::server::HttpServer;
use rama::net::proxy::ProxyTarget;
use rama::net::stream::SocketInfo;
use rama::rt::Executor;
use rama::service::service_fn;
use rama::tls::rustls::dep::pki_types::CertificateDer;
use rama::tls::rustls::dep::pki_types::PrivateKeyDer;
@@ -63,7 +64,7 @@ use rcgen_rama::SanType;
pub struct MitmState {
issuer: Issuer<'static, KeyPair>,
upstream: rama::service::BoxService<(), Request, Response, OpaqueError>,
upstream: rama::service::BoxService<Request, Response, OpaqueError>,
inspect: bool,
max_body_bytes: usize,
}
@@ -88,16 +89,17 @@ impl MitmState {
let issuer: Issuer<'static, KeyPair> =
Issuer::from_ca_cert_pem(&ca_cert_pem, ca_key).context("failed to parse CA cert")?;
let tls_config = rama::tls::rustls::client::TlsConnectorData::new_http_auto()
let tls_config = rama::tls::rustls::client::TlsConnectorData::try_new_http_auto()
.context("create upstream TLS config")?;
let upstream: rama::service::BoxService<(), Request, Response, OpaqueError> =
EasyHttpWebClient::builder()
let upstream: rama::service::BoxService<Request, Response, OpaqueError> =
EasyHttpWebClient::connector_builder()
// Use a direct transport connector (no upstream proxy) to avoid proxy loops.
.with_default_transport_connector()
.without_tls_proxy_support()
.without_proxy_support()
.with_tls_support_using_rustls(Some(tls_config))
.build()
.with_default_http_connector()
.build_client()
.boxed();
Ok(Self {
@@ -135,23 +137,26 @@ impl MitmState {
}
}
pub async fn mitm_tunnel<S>(ctx: Context<S>, upgraded: Upgraded) -> Result<()>
where
S: Clone + Send + Sync + 'static,
{
let state = ctx
pub async fn mitm_tunnel(upgraded: Upgraded) -> Result<()> {
let state = upgraded
.extensions()
.get::<Arc<MitmState>>()
.cloned()
.context("missing MITM state")?;
let target = ctx
let target = upgraded
.extensions()
.get::<ProxyTarget>()
.context("missing proxy target")?
.0
.clone();
let host = normalize_host(&target.host().to_string());
let host = normalize_host(&target.host.to_string());
let acceptor_data = state.tls_acceptor_data_for_host(&host)?;
let executor = ctx.executor().clone();
let executor = upgraded
.extensions()
.get::<Executor>()
.cloned()
.unwrap_or_default();
let http_service = HttpServer::auto(executor).service(
(
@@ -166,20 +171,14 @@ where
.into_layer(http_service);
https_service
.serve(ctx, upgraded)
.serve(upgraded)
.await
.map_err(|err| anyhow!("MITM serve error: {err}"))?;
Ok(())
}
async fn handle_mitm_request<S>(
ctx: Context<S>,
req: Request,
) -> Result<Response, std::convert::Infallible>
where
S: Clone + Send + Sync + 'static,
{
let response = match forward_request(ctx, req).await {
async fn handle_mitm_request(req: Request) -> Result<Response, std::convert::Infallible> {
let response = match forward_request(req).await {
Ok(resp) => resp,
Err(err) => {
warn!("MITM upstream request failed: {err}");
@@ -189,27 +188,28 @@ where
Ok(response)
}
async fn forward_request<S>(ctx: Context<S>, req: Request) -> Result<Response>
where
S: Clone + Send + Sync + 'static,
{
let target = ctx
async fn forward_request(req: Request) -> Result<Response> {
let target = req
.extensions()
.get::<ProxyTarget>()
.context("missing proxy target")?
.0
.clone();
let target_host = normalize_host(&target.host().to_string());
let target_port = target.port();
let mode = ctx
let target_host = normalize_host(&target.host.to_string());
let target_port = target.port;
let mode = req
.extensions()
.get::<NetworkMode>()
.copied()
.unwrap_or(NetworkMode::Full);
let mitm = ctx
let mitm = req
.extensions()
.get::<Arc<MitmState>>()
.cloned()
.context("missing MITM state")?;
let app_state = ctx
let app_state = req
.extensions()
.get::<Arc<AppState>>()
.cloned()
.context("missing app state")?;
@@ -223,7 +223,8 @@ where
let method = req.method().as_str().to_string();
let path = path_and_query(req.uri());
let client = ctx
let client = req
.extensions()
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string());
@@ -276,10 +277,7 @@ where
};
let upstream_req = Request::from_parts(parts, body);
let upstream_resp = mitm
.upstream
.serve(ctx.map_state(|_| ()), upstream_req)
.await?;
let upstream_resp = mitm.upstream.serve(upstream_req).await?;
respond_with_inspection(
upstream_resp,
inspect,
@@ -428,7 +426,7 @@ fn build_https_uri(authority: &str, path: &str) -> Result<Uri> {
fn path_and_query(uri: &Uri) -> String {
uri.path_and_query()
.map(rama::http::dep::http::uri::PathAndQuery::as_str)
.map(rama::http::uri::PathAndQuery::as_str)
.unwrap_or("/")
.to_string()
}

View File

@@ -0,0 +1,113 @@
use crate::state::AppState;
use anyhow::Result;
use async_trait::async_trait;
use std::future::Future;
use std::sync::Arc;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NetworkProtocol {
Http,
HttpsConnect,
Socks5Tcp,
Socks5Udp,
}
#[derive(Clone, Debug)]
pub struct NetworkPolicyRequest {
pub protocol: NetworkProtocol,
pub host: String,
pub port: u16,
pub client_addr: Option<String>,
pub method: Option<String>,
pub command: Option<String>,
pub exec_policy_hint: Option<String>,
}
impl NetworkPolicyRequest {
#[must_use]
pub fn new(
protocol: NetworkProtocol,
host: String,
port: u16,
client_addr: Option<String>,
method: Option<String>,
command: Option<String>,
exec_policy_hint: Option<String>,
) -> Self {
Self {
protocol,
host,
port,
client_addr,
method,
command,
exec_policy_hint,
}
}
}
#[derive(Clone, Debug)]
pub enum NetworkDecision {
Allow,
Deny { reason: String },
}
impl NetworkDecision {
#[must_use]
pub fn deny(reason: impl Into<String>) -> Self {
let reason = reason.into();
let reason = if reason.is_empty() {
"policy_denied".to_string()
} else {
reason
};
Self::Deny { reason }
}
}
/// Decide whether a network request should be allowed.
///
/// If `command` or `exec_policy_hint` is provided, callers can map exec-policy
/// approvals to network access (e.g., allow all requests for commands matching
/// approved prefixes like `curl *`).
#[async_trait]
pub trait NetworkPolicyDecider: Send + Sync + 'static {
async fn decide(&self, req: NetworkPolicyRequest) -> NetworkDecision;
}
#[async_trait]
impl<D: NetworkPolicyDecider + ?Sized> NetworkPolicyDecider for Arc<D> {
async fn decide(&self, req: NetworkPolicyRequest) -> NetworkDecision {
(**self).decide(req).await
}
}
#[async_trait]
impl<F, Fut> NetworkPolicyDecider for F
where
F: Fn(NetworkPolicyRequest) -> Fut + Send + Sync + 'static,
Fut: Future<Output = NetworkDecision> + Send,
{
async fn decide(&self, req: NetworkPolicyRequest) -> NetworkDecision {
(self)(req).await
}
}
pub(crate) async fn evaluate_host_policy(
state: &AppState,
decider: Option<&Arc<dyn NetworkPolicyDecider>>,
request: &NetworkPolicyRequest,
) -> Result<NetworkDecision> {
let (blocked, reason) = state.host_blocked(&request.host, request.port).await?;
if !blocked {
return Ok(NetworkDecision::Allow);
}
if reason == "not_allowed"
&& let Some(decider) = decider
{
return Ok(decider.decide(request.clone()).await);
}
Ok(NetworkDecision::deny(reason))
}

View File

@@ -0,0 +1,181 @@
use crate::admin;
use crate::config;
use crate::http_proxy;
use crate::init;
use crate::network_policy::NetworkPolicyDecider;
use crate::socks5;
use crate::state::AppState;
use anyhow::Result;
use clap::Parser;
use clap::Subcommand;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::warn;
#[derive(Debug, Clone, Parser)]
#[command(name = "codex-network-proxy", about = "Codex network sandbox proxy")]
pub struct Args {
#[command(subcommand)]
pub command: Option<Command>,
/// Enable SOCKS5 UDP associate support (default: disabled).
#[arg(long, default_value_t = false)]
pub enable_socks5_udp: bool,
}
#[derive(Debug, Clone, Subcommand)]
pub enum Command {
/// Initialize the Codex network proxy directories (e.g. MITM cert paths).
Init,
}
#[derive(Clone, Default)]
pub struct NetworkProxyBuilder {
state: Option<Arc<AppState>>,
http_addr: Option<SocketAddr>,
socks_addr: Option<SocketAddr>,
admin_addr: Option<SocketAddr>,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
enable_socks5_udp: bool,
}
impl NetworkProxyBuilder {
#[must_use]
pub fn state(mut self, state: Arc<AppState>) -> Self {
self.state = Some(state);
self
}
#[must_use]
pub fn http_addr(mut self, addr: SocketAddr) -> Self {
self.http_addr = Some(addr);
self
}
#[must_use]
pub fn socks_addr(mut self, addr: SocketAddr) -> Self {
self.socks_addr = Some(addr);
self
}
#[must_use]
pub fn admin_addr(mut self, addr: SocketAddr) -> Self {
self.admin_addr = Some(addr);
self
}
#[must_use]
pub fn policy_decider<D>(mut self, decider: D) -> Self
where
D: NetworkPolicyDecider,
{
self.policy_decider = Some(Arc::new(decider));
self
}
#[must_use]
pub fn policy_decider_arc(mut self, decider: Arc<dyn NetworkPolicyDecider>) -> Self {
self.policy_decider = Some(decider);
self
}
#[must_use]
pub fn enable_socks5_udp(mut self, enabled: bool) -> Self {
self.enable_socks5_udp = enabled;
self
}
pub async fn build(self) -> Result<NetworkProxy> {
let state = match self.state {
Some(state) => state,
None => Arc::new(AppState::new().await?),
};
let runtime = config::resolve_runtime(&state.current_cfg().await?);
Ok(NetworkProxy {
state,
http_addr: self.http_addr.unwrap_or(runtime.http_addr),
socks_addr: self.socks_addr.unwrap_or(runtime.socks_addr),
admin_addr: self.admin_addr.unwrap_or(runtime.admin_addr),
policy_decider: self.policy_decider,
enable_socks5_udp: self.enable_socks5_udp,
})
}
}
#[derive(Clone)]
pub struct NetworkProxy {
state: Arc<AppState>,
http_addr: SocketAddr,
socks_addr: SocketAddr,
admin_addr: SocketAddr,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
enable_socks5_udp: bool,
}
impl NetworkProxy {
#[must_use]
pub fn builder() -> NetworkProxyBuilder {
NetworkProxyBuilder::default()
}
pub async fn from_cli_args(args: Args) -> Result<Self> {
let mut builder = Self::builder();
builder = builder.enable_socks5_udp(args.enable_socks5_udp);
builder.build().await
}
pub async fn run(&self) -> Result<NetworkProxyHandle> {
if cfg!(not(target_os = "macos")) {
warn!("allowUnixSockets is macOS-only; requests will be rejected on this platform");
}
let http_task = tokio::spawn(http_proxy::run_http_proxy(
self.state.clone(),
self.http_addr,
self.policy_decider.clone(),
));
let socks_task = tokio::spawn(socks5::run_socks5(
self.state.clone(),
self.socks_addr,
self.policy_decider.clone(),
self.enable_socks5_udp,
));
let admin_task = tokio::spawn(admin::run_admin_api(self.state.clone(), self.admin_addr));
Ok(NetworkProxyHandle {
http_task,
socks_task,
admin_task,
})
}
}
pub struct NetworkProxyHandle {
http_task: JoinHandle<Result<()>>,
socks_task: JoinHandle<Result<()>>,
admin_task: JoinHandle<Result<()>>,
}
impl NetworkProxyHandle {
pub async fn wait(self) -> Result<()> {
self.http_task.await??;
self.socks_task.await??;
self.admin_task.await??;
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
self.http_task.abort();
self.socks_task.abort();
self.admin_task.abort();
let _ = self.http_task.await;
let _ = self.socks_task.await;
let _ = self.admin_task.await;
Ok(())
}
}
pub fn run_init() -> Result<()> {
init::run_init()
}

View File

@@ -1,16 +1,24 @@
use crate::config::NetworkMode;
use crate::network_policy::NetworkDecision;
use crate::network_policy::NetworkPolicyDecider;
use crate::network_policy::NetworkPolicyRequest;
use crate::network_policy::NetworkProtocol;
use crate::network_policy::evaluate_host_policy;
use crate::policy::normalize_host;
use crate::state::AppState;
use crate::state::BlockedRequest;
use anyhow::Context as _;
use anyhow::Result;
use rama::Context;
use rama::Layer;
use rama::Service;
use rama::layer::AddExtensionLayer;
use rama::extensions::ExtensionsRef;
use rama::layer::AddInputExtensionLayer;
use rama::net::stream::SocketInfo;
use rama::proxy::socks5::Socks5Acceptor;
use rama::proxy::socks5::server::DefaultConnector;
use rama::proxy::socks5::server::DefaultUdpRelay;
use rama::proxy::socks5::server::udp::RelayRequest;
use rama::proxy::socks5::server::udp::RelayResponse;
use rama::service::service_fn;
use rama::tcp::client::Request as TcpRequest;
use rama::tcp::client::service::TcpConnector;
@@ -22,7 +30,12 @@ use tracing::error;
use tracing::info;
use tracing::warn;
pub async fn run_socks5(state: Arc<AppState>, addr: SocketAddr) -> Result<()> {
pub async fn run_socks5(
state: Arc<AppState>,
addr: SocketAddr,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
enable_socks5_udp: bool,
) -> Result<()> {
let listener = TcpListener::build()
.bind(addr)
.await
@@ -44,80 +57,193 @@ pub async fn run_socks5(state: Arc<AppState>, addr: SocketAddr) -> Result<()> {
}
let tcp_connector = TcpConnector::default();
let policy_tcp_connector = service_fn(move |ctx: Context<()>, req: TcpRequest| {
let tcp_connector = tcp_connector.clone();
async move {
let app_state = ctx
.get::<Arc<AppState>>()
.cloned()
.ok_or_else(|| io::Error::other("missing state"))?;
let policy_tcp_connector = service_fn({
let policy_decider = policy_decider.clone();
move |req: TcpRequest| {
let tcp_connector = tcp_connector.clone();
let policy_decider = policy_decider.clone();
async move {
let app_state = req
.extensions()
.get::<Arc<AppState>>()
.cloned()
.ok_or_else(|| io::Error::other("missing state"))?;
let host = normalize_host(&req.authority().host().to_string());
let port = req.authority().port();
let client = ctx
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string());
let host = normalize_host(&req.authority.host.to_string());
let port = req.authority.port;
let client = req
.extensions()
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string());
match app_state.network_mode().await {
Ok(NetworkMode::Limited) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
"method_not_allowed".to_string(),
client.clone(),
None,
Some(NetworkMode::Limited),
"socks5".to_string(),
))
.await;
let client = client.as_deref().unwrap_or_default();
warn!(
"SOCKS blocked by method policy (client={client}, host={host}, mode=limited, allowed_methods=GET, HEAD, OPTIONS)"
);
return Err(
io::Error::new(io::ErrorKind::PermissionDenied, "blocked").into()
);
}
Ok(NetworkMode::Full) => {}
Err(err) => {
error!("failed to evaluate method policy: {err}");
return Err(io::Error::other("proxy error").into());
}
}
match app_state.network_mode().await {
Ok(NetworkMode::Limited) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
"method_not_allowed".to_string(),
client.clone(),
None,
Some(NetworkMode::Limited),
"socks5".to_string(),
))
.await;
let client = client.as_deref().unwrap_or_default();
warn!(
"SOCKS blocked by method policy (client={client}, host={host}, mode=limited, allowed_methods=GET, HEAD, OPTIONS)"
);
return Err(io::Error::new(io::ErrorKind::PermissionDenied, "blocked").into());
}
Ok(NetworkMode::Full) => {}
Err(err) => {
error!("failed to evaluate method policy: {err}");
return Err(io::Error::other("proxy error").into());
let request = NetworkPolicyRequest::new(
NetworkProtocol::Socks5Tcp,
host.clone(),
port,
client.clone(),
None,
None,
None,
);
match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny { reason }) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
reason.clone(),
client.clone(),
None,
None,
"socks5".to_string(),
))
.await;
let client = client.as_deref().unwrap_or_default();
warn!("SOCKS blocked (client={client}, host={host}, reason={reason})");
return Err(
io::Error::new(io::ErrorKind::PermissionDenied, "blocked").into()
);
}
Ok(NetworkDecision::Allow) => {
let client = client.as_deref().unwrap_or_default();
info!("SOCKS allowed (client={client}, host={host}, port={port})");
}
Err(err) => {
error!("failed to evaluate host: {err}");
return Err(io::Error::other("proxy error").into());
}
}
tcp_connector.serve(req).await
}
match app_state.host_blocked(&host, port).await {
Ok((true, reason)) => {
let _ = app_state
.record_blocked(BlockedRequest::new(
host.clone(),
reason.clone(),
client.clone(),
None,
None,
"socks5".to_string(),
))
.await;
let client = client.as_deref().unwrap_or_default();
warn!("SOCKS blocked (client={client}, host={host}, reason={reason})");
return Err(io::Error::new(io::ErrorKind::PermissionDenied, "blocked").into());
}
Ok((false, _)) => {
let client = client.as_deref().unwrap_or_default();
info!("SOCKS allowed (client={client}, host={host}, port={port})");
}
Err(err) => {
error!("failed to evaluate host: {err}");
return Err(io::Error::other("proxy error").into());
}
}
tcp_connector.serve(ctx, req).await
}
});
let socks_connector = DefaultConnector::default().with_connector(policy_tcp_connector);
let socks_acceptor = Socks5Acceptor::new().with_connector(socks_connector);
let base = Socks5Acceptor::new().with_connector(socks_connector);
listener
.serve(AddExtensionLayer::new(state).into_layer(socks_acceptor))
.await;
if enable_socks5_udp {
let udp_state = state.clone();
let udp_decider = policy_decider.clone();
let udp_relay = DefaultUdpRelay::default().with_async_inspector(service_fn(
move |request: RelayRequest| {
let udp_state = udp_state.clone();
let udp_decider = udp_decider.clone();
async move {
let RelayRequest {
server_address,
payload,
extensions,
..
} = request;
let host = normalize_host(&server_address.ip_addr.to_string());
let port = server_address.port;
let client = extensions
.get::<SocketInfo>()
.map(|info| info.peer_addr().to_string());
match udp_state.network_mode().await {
Ok(NetworkMode::Limited) => {
let _ = udp_state
.record_blocked(BlockedRequest::new(
host.clone(),
"method_not_allowed".to_string(),
client.clone(),
None,
Some(NetworkMode::Limited),
"socks5-udp".to_string(),
))
.await;
return Ok(RelayResponse {
maybe_payload: None,
extensions,
});
}
Ok(NetworkMode::Full) => {}
Err(err) => {
error!("failed to evaluate method policy: {err}");
return Err(io::Error::other("proxy error"));
}
}
let request = NetworkPolicyRequest::new(
NetworkProtocol::Socks5Udp,
host.clone(),
port,
client.clone(),
None,
None,
None,
);
match evaluate_host_policy(&udp_state, udp_decider.as_ref(), &request).await {
Ok(NetworkDecision::Deny { reason }) => {
let _ = udp_state
.record_blocked(BlockedRequest::new(
host.clone(),
reason.clone(),
client.clone(),
None,
None,
"socks5-udp".to_string(),
))
.await;
let client = client.as_deref().unwrap_or_default();
warn!(
"SOCKS UDP blocked (client={client}, host={host}, reason={reason})"
);
Ok(RelayResponse {
maybe_payload: None,
extensions,
})
}
Ok(NetworkDecision::Allow) => Ok(RelayResponse {
maybe_payload: Some(payload),
extensions,
}),
Err(err) => {
error!("failed to evaluate UDP host: {err}");
Err(io::Error::other("proxy error"))
}
}
}
},
));
let socks_acceptor = base.with_udp_associator(udp_relay);
listener
.serve(AddInputExtensionLayer::new(state).into_layer(socks_acceptor))
.await;
} else {
listener
.serve(AddInputExtensionLayer::new(state).into_layer(base))
.await;
}
Ok(())
}