mirror of
https://github.com/openai/codex.git
synced 2026-04-25 23:24:55 +00:00
feat: reserve loopback ephemeral listeners for managed proxy (#11269)
Codex may run many per-thread proxy instances, so hardcoded proxy ports are brittle and conflict-prone. The previous "ephemeral" approach still had a race: `build()` read `local_addr()` from temporary listeners and dropped them before `run()` rebound the ports. That left a [TOCTOU](https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use) window where the OS (or another process) could reuse the same port, causing intermittent `EADDRINUSE` and partial proxy startup. Change the managed proxy path to reserve real listener sockets up front and keep them alive until startup: - add `ReservedListeners` on `NetworkProxy` to hold HTTP/SOCKS/admin std listeners allocated during `build()` - in managed mode, bind `127.0.0.1:0` for each listener and carry those bound sockets into `run()` instead of rebinding by address later - add `run_*_with_std_listener` entry points for HTTP, SOCKS5, and admin servers so `run()` can start services from already-reserved sockets - keep static/configured ports only when `managed_by_codex(false)`, including explicit `socks_addr` override support - remove fallback synthetic port allocation and add tests for managed ephemeral loopback binding and unmanaged configured-port behavior This makes managed startup deterministic, avoids port collisions, and preserves the intended distinction between Codex-managed ephemeral ports and externally managed fixed ports.
This commit is contained in:
@@ -10,7 +10,9 @@ use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpListener as StdTcpListener;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -18,14 +20,70 @@ use tracing::warn;
|
||||
#[command(name = "codex-network-proxy", about = "Codex network sandbox proxy")]
|
||||
pub struct Args {}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Debug)]
|
||||
struct ReservedListeners {
|
||||
http: Mutex<Option<StdTcpListener>>,
|
||||
socks: Mutex<Option<StdTcpListener>>,
|
||||
admin: Mutex<Option<StdTcpListener>>,
|
||||
}
|
||||
|
||||
impl ReservedListeners {
|
||||
fn new(http: StdTcpListener, socks: StdTcpListener, admin: StdTcpListener) -> Self {
|
||||
Self {
|
||||
http: Mutex::new(Some(http)),
|
||||
socks: Mutex::new(Some(socks)),
|
||||
admin: Mutex::new(Some(admin)),
|
||||
}
|
||||
}
|
||||
|
||||
fn take_http(&self) -> Option<StdTcpListener> {
|
||||
let mut guard = self
|
||||
.http
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
guard.take()
|
||||
}
|
||||
|
||||
fn take_socks(&self) -> Option<StdTcpListener> {
|
||||
let mut guard = self
|
||||
.socks
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
guard.take()
|
||||
}
|
||||
|
||||
fn take_admin(&self) -> Option<StdTcpListener> {
|
||||
let mut guard = self
|
||||
.admin
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
guard.take()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NetworkProxyBuilder {
|
||||
state: Option<Arc<NetworkProxyState>>,
|
||||
http_addr: Option<SocketAddr>,
|
||||
socks_addr: Option<SocketAddr>,
|
||||
admin_addr: Option<SocketAddr>,
|
||||
managed_by_codex: bool,
|
||||
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
|
||||
}
|
||||
|
||||
impl Default for NetworkProxyBuilder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
state: None,
|
||||
http_addr: None,
|
||||
socks_addr: None,
|
||||
admin_addr: None,
|
||||
managed_by_codex: true,
|
||||
policy_decider: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkProxyBuilder {
|
||||
pub fn state(mut self, state: Arc<NetworkProxyState>) -> Self {
|
||||
self.state = Some(state);
|
||||
@@ -37,11 +95,21 @@ impl NetworkProxyBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn socks_addr(mut self, addr: SocketAddr) -> Self {
|
||||
self.socks_addr = Some(addr);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn admin_addr(mut self, addr: SocketAddr) -> Self {
|
||||
self.admin_addr = Some(addr);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn managed_by_codex(mut self, managed_by_codex: bool) -> Self {
|
||||
self.managed_by_codex = managed_by_codex;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn policy_decider<D>(mut self, decider: D) -> Self
|
||||
where
|
||||
D: NetworkPolicyDecider,
|
||||
@@ -62,12 +130,45 @@ impl NetworkProxyBuilder {
|
||||
)
|
||||
})?;
|
||||
let current_cfg = state.current_cfg().await?;
|
||||
let runtime = config::resolve_runtime(¤t_cfg)?;
|
||||
let (requested_http_addr, requested_socks_addr, requested_admin_addr, reserved_listeners) =
|
||||
if self.managed_by_codex {
|
||||
let (http_listener, socks_listener, admin_listener) =
|
||||
reserve_loopback_ephemeral_listeners()
|
||||
.context("reserve managed loopback proxy listeners")?;
|
||||
let http_addr = http_listener
|
||||
.local_addr()
|
||||
.context("failed to read reserved HTTP proxy address")?;
|
||||
let socks_addr = socks_listener
|
||||
.local_addr()
|
||||
.context("failed to read reserved SOCKS5 proxy address")?;
|
||||
let admin_addr = admin_listener
|
||||
.local_addr()
|
||||
.context("failed to read reserved admin API address")?;
|
||||
(
|
||||
http_addr,
|
||||
socks_addr,
|
||||
admin_addr,
|
||||
Some(Arc::new(ReservedListeners::new(
|
||||
http_listener,
|
||||
socks_listener,
|
||||
admin_listener,
|
||||
))),
|
||||
)
|
||||
} else {
|
||||
let runtime = config::resolve_runtime(¤t_cfg)?;
|
||||
(
|
||||
self.http_addr.unwrap_or(runtime.http_addr),
|
||||
self.socks_addr.unwrap_or(runtime.socks_addr),
|
||||
self.admin_addr.unwrap_or(runtime.admin_addr),
|
||||
None,
|
||||
)
|
||||
};
|
||||
|
||||
// Reapply bind clamping for caller overrides so unix-socket proxying stays loopback-only.
|
||||
let (http_addr, socks_addr, admin_addr) = config::clamp_bind_addrs(
|
||||
self.http_addr.unwrap_or(runtime.http_addr),
|
||||
runtime.socks_addr,
|
||||
self.admin_addr.unwrap_or(runtime.admin_addr),
|
||||
requested_http_addr,
|
||||
requested_socks_addr,
|
||||
requested_admin_addr,
|
||||
¤t_cfg.network,
|
||||
);
|
||||
|
||||
@@ -76,17 +177,33 @@ impl NetworkProxyBuilder {
|
||||
http_addr,
|
||||
socks_addr,
|
||||
admin_addr,
|
||||
reserved_listeners,
|
||||
policy_decider: self.policy_decider,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn reserve_loopback_ephemeral_listeners() -> Result<(StdTcpListener, StdTcpListener, StdTcpListener)>
|
||||
{
|
||||
Ok((
|
||||
reserve_loopback_ephemeral_listener().context("reserve HTTP proxy listener")?,
|
||||
reserve_loopback_ephemeral_listener().context("reserve SOCKS5 proxy listener")?,
|
||||
reserve_loopback_ephemeral_listener().context("reserve admin API listener")?,
|
||||
))
|
||||
}
|
||||
|
||||
fn reserve_loopback_ephemeral_listener() -> Result<StdTcpListener> {
|
||||
StdTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
|
||||
.context("bind loopback ephemeral port")
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NetworkProxy {
|
||||
state: Arc<NetworkProxyState>,
|
||||
http_addr: SocketAddr,
|
||||
socks_addr: SocketAddr,
|
||||
admin_addr: SocketAddr,
|
||||
reserved_listeners: Option<Arc<ReservedListeners>>,
|
||||
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
|
||||
}
|
||||
|
||||
@@ -137,22 +254,64 @@ impl NetworkProxy {
|
||||
warn!("allowUnixSockets is macOS-only; requests will be rejected on this platform");
|
||||
}
|
||||
|
||||
let http_task = tokio::spawn(http_proxy::run_http_proxy(
|
||||
self.state.clone(),
|
||||
self.http_addr,
|
||||
self.policy_decider.clone(),
|
||||
));
|
||||
let reserved_listeners = self.reserved_listeners.as_ref();
|
||||
let http_listener = reserved_listeners.and_then(|listeners| listeners.take_http());
|
||||
let socks_listener = reserved_listeners.and_then(|listeners| listeners.take_socks());
|
||||
let admin_listener = reserved_listeners.and_then(|listeners| listeners.take_admin());
|
||||
|
||||
let http_state = self.state.clone();
|
||||
let http_decider = self.policy_decider.clone();
|
||||
let http_addr = self.http_addr;
|
||||
let http_task = tokio::spawn(async move {
|
||||
match http_listener {
|
||||
Some(listener) => {
|
||||
http_proxy::run_http_proxy_with_std_listener(http_state, listener, http_decider)
|
||||
.await
|
||||
}
|
||||
None => http_proxy::run_http_proxy(http_state, http_addr, http_decider).await,
|
||||
}
|
||||
});
|
||||
|
||||
let socks_task = if current_cfg.network.enable_socks5 {
|
||||
Some(tokio::spawn(socks5::run_socks5(
|
||||
self.state.clone(),
|
||||
self.socks_addr,
|
||||
self.policy_decider.clone(),
|
||||
current_cfg.network.enable_socks5_udp,
|
||||
)))
|
||||
let socks_state = self.state.clone();
|
||||
let socks_decider = self.policy_decider.clone();
|
||||
let socks_addr = self.socks_addr;
|
||||
let enable_socks5_udp = current_cfg.network.enable_socks5_udp;
|
||||
Some(tokio::spawn(async move {
|
||||
match socks_listener {
|
||||
Some(listener) => {
|
||||
socks5::run_socks5_with_std_listener(
|
||||
socks_state,
|
||||
listener,
|
||||
socks_decider,
|
||||
enable_socks5_udp,
|
||||
)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
socks5::run_socks5(
|
||||
socks_state,
|
||||
socks_addr,
|
||||
socks_decider,
|
||||
enable_socks5_udp,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let admin_task = tokio::spawn(admin::run_admin_api(self.state.clone(), self.admin_addr));
|
||||
let admin_state = self.state.clone();
|
||||
let admin_addr = self.admin_addr;
|
||||
let admin_task = tokio::spawn(async move {
|
||||
match admin_listener {
|
||||
Some(listener) => {
|
||||
admin::run_admin_api_with_std_listener(admin_state, listener).await
|
||||
}
|
||||
None => admin::run_admin_api(admin_state, admin_addr).await,
|
||||
}
|
||||
});
|
||||
|
||||
Ok(NetworkProxyHandle {
|
||||
http_task: Some(http_task),
|
||||
@@ -241,3 +400,66 @@ impl Drop for NetworkProxyHandle {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::NetworkProxySettings;
|
||||
use crate::state::network_proxy_state_for_policy;
|
||||
|
||||
#[tokio::test]
|
||||
async fn managed_proxy_builder_uses_loopback_ephemeral_ports() {
|
||||
let state = Arc::new(network_proxy_state_for_policy(
|
||||
NetworkProxySettings::default(),
|
||||
));
|
||||
let proxy = match NetworkProxy::builder().state(state).build().await {
|
||||
Ok(proxy) => proxy,
|
||||
Err(err) => {
|
||||
if err
|
||||
.chain()
|
||||
.any(|cause| cause.to_string().contains("Operation not permitted"))
|
||||
{
|
||||
return;
|
||||
}
|
||||
panic!("failed to build managed proxy: {err:#}");
|
||||
}
|
||||
};
|
||||
|
||||
assert!(proxy.http_addr.ip().is_loopback());
|
||||
assert!(proxy.socks_addr.ip().is_loopback());
|
||||
assert!(proxy.admin_addr.ip().is_loopback());
|
||||
assert_ne!(proxy.http_addr.port(), 0);
|
||||
assert_ne!(proxy.socks_addr.port(), 0);
|
||||
assert_ne!(proxy.admin_addr.port(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn non_codex_managed_proxy_builder_uses_configured_ports() {
|
||||
let settings = NetworkProxySettings {
|
||||
proxy_url: "http://127.0.0.1:43128".to_string(),
|
||||
socks_url: "http://127.0.0.1:48081".to_string(),
|
||||
admin_url: "http://127.0.0.1:48080".to_string(),
|
||||
..NetworkProxySettings::default()
|
||||
};
|
||||
let state = Arc::new(network_proxy_state_for_policy(settings));
|
||||
let proxy = NetworkProxy::builder()
|
||||
.state(state)
|
||||
.managed_by_codex(false)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
proxy.http_addr,
|
||||
"127.0.0.1:43128".parse::<SocketAddr>().unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
proxy.socks_addr,
|
||||
"127.0.0.1:48081".parse::<SocketAddr>().unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
proxy.admin_addr,
|
||||
"127.0.0.1:48080".parse::<SocketAddr>().unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user