Files
codex/codex-rs/exec-server/src/environment_toml.rs
starr-openai faa5d4a5e2 Increase exec-server environment transport timeouts (#21825)
## Why

The environment-backed exec-server transport currently hardcodes 5
second connect and initialize timeouts in `client_transport.rs`. That is
short for SSH-backed stdio environments and remote websocket
environments, and there is currently no way to raise those values from
`CODEX_HOME/environments.toml`.

This stacked follow-up raises the default environment transport timeouts
and lets each configured environment override them in
`environments.toml`.

## What Changed

- raise the default environment transport connect and initialize
timeouts from 5s to 10s
- store concrete timeout values on `ExecServerTransportParams` instead
of hardcoding them in `connect_for_transport(...)`
- add `connect_timeout_sec` and `initialize_timeout_sec` to
`[[environments]]` entries in `environments.toml`
- apply parse-time defaults so runtime transport code receives fully
resolved timeout values
- reject `connect_timeout_sec` on stdio environments because it only
applies to websocket transports
- extend parser tests to cover the new fields and defaults

## Stack

- base: https://github.com/openai/codex/pull/21794
- this PR: configurable environment transport timeouts

## Validation

- `cd
/Users/starr/code/codex-worktrees/exec-env-timeouts-config-20260508/codex-rs
&& just fmt`
- not run: tests

---------

Co-authored-by: Codex <noreply@openai.com>
2026-05-08 16:33:29 -07:00

802 lines
26 KiB
Rust

use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use async_trait::async_trait;
use serde::Deserialize;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use crate::DefaultEnvironmentProvider;
use crate::Environment;
use crate::EnvironmentProvider;
use crate::ExecServerError;
use crate::client_api::DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT;
use crate::client_api::DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT;
use crate::client_api::ExecServerTransportParams;
use crate::client_api::StdioExecServerCommand;
use crate::environment::LOCAL_ENVIRONMENT_ID;
use crate::environment_provider::EnvironmentDefault;
use crate::environment_provider::EnvironmentProviderSnapshot;
const ENVIRONMENTS_TOML_FILE: &str = "environments.toml";
const MAX_ENVIRONMENT_ID_LEN: usize = 64;
#[derive(Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
struct EnvironmentsToml {
default: Option<String>,
#[serde(default)]
environments: Vec<EnvironmentToml>,
}
#[derive(Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct EnvironmentToml {
id: String,
url: Option<String>,
program: Option<String>,
args: Option<Vec<String>>,
env: Option<HashMap<String, String>>,
cwd: Option<PathBuf>,
#[serde(default, with = "option_duration_secs")]
connect_timeout_sec: Option<Duration>,
#[serde(default, with = "option_duration_secs")]
initialize_timeout_sec: Option<Duration>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct TomlEnvironmentProvider {
default: EnvironmentDefault,
environments: Vec<(String, ExecServerTransportParams)>,
}
impl TomlEnvironmentProvider {
#[cfg(test)]
fn new(config: EnvironmentsToml) -> Result<Self, ExecServerError> {
Self::new_with_config_dir(config, /*config_dir*/ None)
}
fn new_with_config_dir(
config: EnvironmentsToml,
config_dir: Option<&Path>,
) -> Result<Self, ExecServerError> {
let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]);
let mut environments = Vec::with_capacity(config.environments.len());
for item in config.environments {
let (id, transport) = parse_environment_toml(item, config_dir)?;
if !ids.insert(id.clone()) {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` is duplicated"
)));
}
environments.push((id, transport));
}
let default = normalize_default_environment_id(config.default.as_deref(), &ids)?;
Ok(Self {
default,
environments,
})
}
}
#[async_trait]
impl EnvironmentProvider for TomlEnvironmentProvider {
async fn snapshot(&self) -> Result<EnvironmentProviderSnapshot, ExecServerError> {
let mut environments = Vec::with_capacity(self.environments.len());
for (id, transport_params) in &self.environments {
environments.push((
id.clone(),
Environment::remote_with_transport(
transport_params.clone(),
/*local_runtime_paths*/ None,
),
));
}
Ok(EnvironmentProviderSnapshot {
environments,
default: self.default.clone(),
include_local: true,
})
}
}
fn parse_environment_toml(
item: EnvironmentToml,
config_dir: Option<&Path>,
) -> Result<(String, ExecServerTransportParams), ExecServerError> {
let EnvironmentToml {
id,
url,
program,
args,
env,
cwd,
connect_timeout_sec,
initialize_timeout_sec,
} = item;
validate_environment_id(&id)?;
if program.is_none() && (args.is_some() || env.is_some() || cwd.is_some()) {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` args, env, and cwd require program"
)));
}
if url.is_none() && connect_timeout_sec.is_some() {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` connect_timeout_sec requires url"
)));
}
let connect_timeout = connect_timeout_sec.unwrap_or(DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT);
let initialize_timeout =
initialize_timeout_sec.unwrap_or(DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT);
let transport_params = match (url, program) {
(Some(url), None) => {
let url = validate_websocket_url(url)?;
ExecServerTransportParams::WebSocketUrl {
websocket_url: url,
connect_timeout,
initialize_timeout,
}
}
(None, Some(program)) => {
let program = program.trim().to_string();
if program.is_empty() {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` program cannot be empty"
)));
}
let cwd = normalize_stdio_cwd(&id, cwd, config_dir)?;
ExecServerTransportParams::StdioCommand {
command: StdioExecServerCommand {
program,
args: args.unwrap_or_default(),
env: env.unwrap_or_default(),
cwd,
},
initialize_timeout,
}
}
(None, None) | (Some(_), Some(_)) => {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` must set exactly one of url or program"
)));
}
};
Ok((id, transport_params))
}
fn normalize_stdio_cwd(
id: &str,
cwd: Option<PathBuf>,
config_dir: Option<&Path>,
) -> Result<Option<PathBuf>, ExecServerError> {
let Some(cwd) = cwd else {
return Ok(None);
};
if cwd.is_absolute() {
return Ok(Some(cwd));
}
let Some(config_dir) = config_dir else {
return Err(ExecServerError::Protocol(format!(
"environment `{id}` cwd must be absolute"
)));
};
Ok(Some(config_dir.join(cwd)))
}
pub(crate) fn environment_provider_from_codex_home(
codex_home: &Path,
) -> Result<Box<dyn EnvironmentProvider>, ExecServerError> {
let path = codex_home.join(ENVIRONMENTS_TOML_FILE);
if !path.try_exists().map_err(|err| {
ExecServerError::Protocol(format!(
"failed to inspect environment config `{}`: {err}",
path.display()
))
})? {
return Ok(Box::new(DefaultEnvironmentProvider::from_env()));
}
let environments = load_environments_toml(&path)?;
Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir(
environments,
Some(codex_home),
)?))
}
fn normalize_default_environment_id(
default: Option<&str>,
ids: &HashSet<String>,
) -> Result<EnvironmentDefault, ExecServerError> {
let Some(default) = default.map(str::trim) else {
return Ok(EnvironmentDefault::EnvironmentId(
LOCAL_ENVIRONMENT_ID.to_string(),
));
};
if default.is_empty() {
return Err(ExecServerError::Protocol(
"default environment id cannot be empty".to_string(),
));
}
if !default.eq_ignore_ascii_case("none") && !ids.contains(default) {
return Err(ExecServerError::Protocol(format!(
"default environment `{default}` is not configured"
)));
}
if default.eq_ignore_ascii_case("none") {
Ok(EnvironmentDefault::Disabled)
} else {
Ok(EnvironmentDefault::EnvironmentId(default.to_string()))
}
}
fn validate_environment_id(id: &str) -> Result<(), ExecServerError> {
let trimmed_id = id.trim();
if trimmed_id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
if trimmed_id != id {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` must not contain surrounding whitespace"
)));
}
if id == LOCAL_ENVIRONMENT_ID || id.eq_ignore_ascii_case("none") {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` is reserved"
)));
}
if id.len() > MAX_ENVIRONMENT_ID_LEN {
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
)));
}
if !id
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
{
return Err(ExecServerError::Protocol(format!(
"environment id `{id}` must contain only ASCII letters, numbers, '-' or '_'"
)));
}
Ok(())
}
fn validate_websocket_url(url: String) -> Result<String, ExecServerError> {
let url = url.trim();
if url.is_empty() {
return Err(ExecServerError::Protocol(
"environment url cannot be empty".to_string(),
));
}
if !url.starts_with("ws://") && !url.starts_with("wss://") {
return Err(ExecServerError::Protocol(format!(
"environment url `{url}` must use ws:// or wss://"
)));
}
url.into_client_request().map_err(|err| {
ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}"))
})?;
Ok(url.to_string())
}
fn load_environments_toml(path: &Path) -> Result<EnvironmentsToml, ExecServerError> {
let contents = std::fs::read_to_string(path).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to read environment config `{}`: {err}",
path.display()
))
})?;
toml::from_str(&contents).map_err(|err| {
ExecServerError::Protocol(format!(
"failed to parse environment config `{}`: {err}",
path.display()
))
})
}
mod option_duration_secs {
use std::time::Duration;
use serde::Deserialize;
use serde::Deserializer;
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let secs = Option::<f64>::deserialize(deserializer)?;
secs.map(|secs| Duration::try_from_secs_f64(secs).map_err(serde::de::Error::custom))
.transpose()
}
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use super::*;
#[tokio::test]
async fn toml_provider_includes_local_and_adds_configured_environments() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("ssh-dev".to_string()),
environments: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some(" ws://127.0.0.1:8765 ".to_string()),
..Default::default()
},
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some(" ssh ".to_string()),
args: Some(vec![
"dev".to_string(),
"codex exec-server --listen stdio".to_string(),
]),
env: Some(HashMap::from([(
"CODEX_LOG".to_string(),
"debug".to_string(),
)])),
..Default::default()
},
],
})
.expect("provider");
let snapshot = provider.snapshot().await.expect("environments");
let EnvironmentProviderSnapshot {
environments,
default,
include_local,
} = snapshot;
let environment_ids: Vec<_> = environments
.iter()
.map(|(id, _environment)| id.as_str())
.collect();
assert_eq!(environment_ids, vec!["devbox", "ssh-dev"]);
let environments: HashMap<_, _> = environments.into_iter().collect();
assert!(include_local);
assert!(!environments.contains_key(LOCAL_ENVIRONMENT_ID));
assert_eq!(
environments["devbox"].exec_server_url(),
Some("ws://127.0.0.1:8765")
);
assert!(environments["ssh-dev"].is_remote());
assert_eq!(environments["ssh-dev"].exec_server_url(), None);
assert_eq!(
default,
EnvironmentDefault::EnvironmentId("ssh-dev".to_string())
);
}
#[tokio::test]
async fn toml_provider_default_omitted_selects_local() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider");
let snapshot = provider.snapshot().await.expect("environments");
assert!(snapshot.include_local);
assert_eq!(
snapshot.default,
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
);
}
#[tokio::test]
async fn toml_provider_default_none_disables_default() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("none".to_string()),
environments: Vec::new(),
})
.expect("provider");
let snapshot = provider.snapshot().await.expect("environments");
assert!(snapshot.include_local);
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
}
#[test]
fn toml_provider_rejects_invalid_environments() {
let cases = [
(
EnvironmentToml {
id: "local".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id `local` is reserved",
),
(
EnvironmentToml {
id: " devbox ".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id ` devbox ` must not contain surrounding whitespace",
),
(
EnvironmentToml {
id: "dev box".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment id `dev box` must contain only ASCII letters, numbers, '-' or '_'",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("http://127.0.0.1:8765".to_string()),
..Default::default()
},
"environment url `http://127.0.0.1:8765` must use ws:// or wss://",
),
(
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
program: Some("codex".to_string()),
..Default::default()
},
"environment `devbox` must set exactly one of url or program",
),
(
EnvironmentToml {
id: "devbox".to_string(),
program: Some(" ".to_string()),
..Default::default()
},
"environment `devbox` program cannot be empty",
),
(
EnvironmentToml {
id: "devbox".to_string(),
args: Some(Vec::new()),
..Default::default()
},
"environment `devbox` args, env, and cwd require program",
),
(
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
connect_timeout_sec: Some(Duration::from_secs(1)),
..Default::default()
},
"environment `ssh-dev` connect_timeout_sec requires url",
),
];
for (item, expected) in cases {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![item],
})
.expect_err("invalid item should fail");
assert_eq!(
err.to_string(),
format!("exec-server protocol error: {expected}")
);
}
}
#[test]
fn toml_provider_resolves_relative_stdio_cwd_from_config_dir() {
let config_dir = tempdir().expect("tempdir");
let provider = TomlEnvironmentProvider::new_with_config_dir(
EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
cwd: Some(PathBuf::from("workspace")),
..Default::default()
}],
},
Some(config_dir.path()),
)
.expect("provider");
assert_eq!(
provider.environments[0].1,
ExecServerTransportParams::StdioCommand {
command: StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: Some(config_dir.path().join("workspace")),
},
initialize_timeout: DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT,
}
);
}
#[test]
fn toml_provider_parses_configured_transport_timeouts() {
let provider = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
connect_timeout_sec: Some(Duration::from_secs(12)),
initialize_timeout_sec: Some(Duration::from_secs(34)),
..Default::default()
},
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
initialize_timeout_sec: Some(Duration::from_secs(56)),
..Default::default()
},
],
})
.expect("provider");
assert_eq!(
provider.environments[0].1,
ExecServerTransportParams::WebSocketUrl {
websocket_url: "ws://127.0.0.1:8765".to_string(),
connect_timeout: Duration::from_secs(12),
initialize_timeout: Duration::from_secs(34),
}
);
assert_eq!(
provider.environments[1].1,
ExecServerTransportParams::StdioCommand {
command: StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: None,
},
initialize_timeout: Duration::from_secs(56),
}
);
}
#[test]
fn toml_provider_rejects_relative_stdio_cwd_without_config_dir() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
cwd: Some(PathBuf::from("workspace")),
..Default::default()
}],
})
.expect_err("relative cwd without config dir should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: environment `ssh-dev` cwd must be absolute"
);
}
#[test]
fn toml_provider_rejects_duplicate_ids() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
},
EnvironmentToml {
id: "devbox".to_string(),
program: Some("codex".to_string()),
..Default::default()
},
],
})
.expect_err("duplicate id should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: environment id `devbox` is duplicated"
);
}
#[test]
fn toml_provider_rejects_overlong_id() {
let id = "a".repeat(MAX_ENVIRONMENT_ID_LEN + 1);
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: id.clone(),
url: Some("ws://127.0.0.1:8765".to_string()),
..Default::default()
}],
})
.expect_err("overlong id should fail");
assert_eq!(
err.to_string(),
format!(
"exec-server protocol error: environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters"
)
);
}
#[test]
fn toml_provider_rejects_unknown_default() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: Some("missing".to_string()),
environments: Vec::new(),
})
.expect_err("unknown default should fail");
assert_eq!(
err.to_string(),
"exec-server protocol error: default environment `missing` is not configured"
);
}
#[test]
fn load_environments_toml_reads_root_environment_list() {
let codex_home = tempdir().expect("tempdir");
let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE);
std::fs::write(
&path,
r#"
default = "ssh-dev"
[[environments]]
id = "devbox"
url = "ws://127.0.0.1:4512"
connect_timeout_sec = 12.0
initialize_timeout_sec = 34.0
[[environments]]
id = "ssh-dev"
program = "ssh"
args = ["dev", "codex exec-server --listen stdio"]
cwd = "/tmp"
[environments.env]
CODEX_LOG = "debug"
"#,
)
.expect("write environments.toml");
let environments = load_environments_toml(&path).expect("environments.toml");
assert_eq!(environments.default.as_deref(), Some("ssh-dev"));
assert_eq!(environments.environments.len(), 2);
assert_eq!(
environments.environments[0],
EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://127.0.0.1:4512".to_string()),
connect_timeout_sec: Some(Duration::from_secs(12)),
initialize_timeout_sec: Some(Duration::from_secs(34)),
..Default::default()
}
);
assert_eq!(
environments.environments[1],
EnvironmentToml {
id: "ssh-dev".to_string(),
program: Some("ssh".to_string()),
args: Some(vec![
"dev".to_string(),
"codex exec-server --listen stdio".to_string(),
]),
env: Some(HashMap::from([(
"CODEX_LOG".to_string(),
"debug".to_string(),
)])),
cwd: Some(PathBuf::from("/tmp")),
..Default::default()
}
);
}
#[test]
fn load_environments_toml_rejects_unknown_fields() {
let codex_home = tempdir().expect("tempdir");
let cases = [
("unknown = true\n", "unknown field `unknown`"),
(
r#"
[[environments]]
id = "devbox"
url = "ws://127.0.0.1:4512"
unknown = true
"#,
"unknown field `unknown`",
),
];
for (index, (contents, expected)) in cases.into_iter().enumerate() {
let path = codex_home.path().join(format!("environments-{index}.toml"));
std::fs::write(&path, contents).expect("write environments.toml");
let err = load_environments_toml(&path).expect_err("unknown field should fail");
assert!(
err.to_string().contains(expected),
"expected `{err}` to contain `{expected}`"
);
}
}
#[test]
fn toml_provider_rejects_malformed_websocket_url() {
let err = TomlEnvironmentProvider::new(EnvironmentsToml {
default: None,
environments: vec![EnvironmentToml {
id: "devbox".to_string(),
url: Some("ws://".to_string()),
..Default::default()
}],
})
.expect_err("malformed websocket url should fail");
assert!(
err.to_string()
.contains("environment url `ws://` is invalid"),
"expected malformed URL error, got `{err}`"
);
}
#[tokio::test]
async fn environment_provider_from_codex_home_uses_present_environments_file() {
let codex_home = tempdir().expect("tempdir");
std::fs::write(
codex_home.path().join(ENVIRONMENTS_TOML_FILE),
r#"
default = "none"
"#,
)
.expect("write environments.toml");
let provider =
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
let snapshot = provider.snapshot().await.expect("environments");
let environment_ids: Vec<_> = snapshot
.environments
.into_iter()
.map(|(id, _environment)| id)
.collect();
assert!(snapshot.include_local);
assert!(!environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
assert_eq!(snapshot.default, EnvironmentDefault::Disabled);
}
#[tokio::test]
async fn environment_provider_from_codex_home_falls_back_when_file_is_missing() {
let codex_home = tempdir().expect("tempdir");
let provider =
environment_provider_from_codex_home(codex_home.path()).expect("environment provider");
let snapshot = provider.snapshot().await.expect("environments");
let environment_ids: Vec<_> = snapshot
.environments
.into_iter()
.map(|(id, _environment)| id)
.collect();
assert!(snapshot.include_local);
assert!(!environment_ids.contains(&LOCAL_ENVIRONMENT_ID.to_string()));
assert_eq!(
snapshot.default,
EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string())
);
}
}