mirror of
https://github.com/openai/codex.git
synced 2026-04-26 07:35:29 +00:00
Add remote env CI matrix and integration test (#14869)
`CODEX_TEST_REMOTE_ENV` will make `test_codex` start the executor "remotely" (inside a docker container) turning any integration test into remote test.
This commit is contained in:
@@ -1,10 +1,16 @@
|
||||
use std::mem::swap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ModelProviderInfo;
|
||||
@@ -14,6 +20,8 @@ use codex_core::config::Config;
|
||||
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use codex_core::shell::Shell;
|
||||
use codex_core::shell::get_shell_by_model_provided_path;
|
||||
use codex_exec_server::CreateDirectoryOptions;
|
||||
use codex_exec_server::ExecutorFileSystem;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::config_types::ServiceTier;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
@@ -24,10 +32,13 @@ use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use serde_json::Value;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::MockServer;
|
||||
|
||||
use crate::RemoteEnvConfig;
|
||||
use crate::get_remote_test_env;
|
||||
use crate::load_default_config_for_test;
|
||||
use crate::responses::WebSocketTestServer;
|
||||
use crate::responses::output_value_to_text;
|
||||
@@ -41,6 +52,254 @@ use wiremock::matchers::path_regex;
|
||||
type ConfigMutator = dyn FnOnce(&mut Config) + Send;
|
||||
type PreBuildHook = dyn FnOnce(&Path) + Send + 'static;
|
||||
const TEST_MODEL_WITH_EXPERIMENTAL_TOOLS: &str = "test-gpt-5.1-codex";
|
||||
const REMOTE_EXEC_SERVER_START_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const REMOTE_EXEC_SERVER_POLL_INTERVAL: Duration = Duration::from_millis(25);
|
||||
static REMOTE_EXEC_SERVER_INSTANCE_COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RemoteExecServerProcess {
|
||||
container_name: String,
|
||||
pid: u32,
|
||||
remote_exec_server_path: String,
|
||||
stdout_path: String,
|
||||
cleanup_paths: Vec<String>,
|
||||
}
|
||||
|
||||
impl Drop for RemoteExecServerProcess {
|
||||
fn drop(&mut self) {
|
||||
let cleanup_paths = self.cleanup_paths.join(" ");
|
||||
let cleanup_paths_script = if cleanup_paths.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
format!("rm -rf {cleanup_paths}; ")
|
||||
};
|
||||
let script = format!(
|
||||
"if kill -0 {pid} 2>/dev/null; then kill {pid}; fi; {cleanup_paths_script}rm -f {remote_exec_server_path} {stdout_path}",
|
||||
pid = self.pid,
|
||||
cleanup_paths_script = cleanup_paths_script,
|
||||
remote_exec_server_path = self.remote_exec_server_path,
|
||||
stdout_path = self.stdout_path
|
||||
);
|
||||
let _ = docker_command_capture_stdout(["exec", &self.container_name, "sh", "-lc", &script]);
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteExecServerProcess {
|
||||
fn register_cleanup_path(&mut self, path: &Path) {
|
||||
self.cleanup_paths.push(path.display().to_string());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TestEnv {
|
||||
environment: codex_exec_server::Environment,
|
||||
cwd: PathBuf,
|
||||
_local_cwd_temp_dir: Option<TempDir>,
|
||||
_remote_exec_server_process: Option<RemoteExecServerProcess>,
|
||||
}
|
||||
|
||||
impl TestEnv {
|
||||
pub async fn local() -> Result<Self> {
|
||||
let local_cwd_temp_dir = TempDir::new()?;
|
||||
let cwd = local_cwd_temp_dir.path().to_path_buf();
|
||||
let environment =
|
||||
codex_exec_server::Environment::create(/*experimental_exec_server_url*/ None).await?;
|
||||
Ok(Self {
|
||||
environment,
|
||||
cwd,
|
||||
_local_cwd_temp_dir: Some(local_cwd_temp_dir),
|
||||
_remote_exec_server_process: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn environment(&self) -> &codex_exec_server::Environment {
|
||||
&self.environment
|
||||
}
|
||||
|
||||
pub fn experimental_exec_server_url(&self) -> Option<&str> {
|
||||
self.environment.experimental_exec_server_url()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn test_env() -> Result<TestEnv> {
|
||||
match get_remote_test_env() {
|
||||
Some(remote_env) => {
|
||||
let mut remote_process = start_remote_exec_server(&remote_env)?;
|
||||
let remote_ip = remote_container_ip(&remote_env.container_name)?;
|
||||
let websocket_url = rewrite_websocket_host(&remote_process.listen_url, &remote_ip)?;
|
||||
let environment = codex_exec_server::Environment::create(Some(websocket_url)).await?;
|
||||
let cwd = remote_aware_cwd_path();
|
||||
environment
|
||||
.get_filesystem()
|
||||
.create_directory(
|
||||
&absolute_path(&cwd)?,
|
||||
CreateDirectoryOptions { recursive: true },
|
||||
)
|
||||
.await?;
|
||||
remote_process.process.register_cleanup_path(&cwd);
|
||||
Ok(TestEnv {
|
||||
environment,
|
||||
cwd,
|
||||
_local_cwd_temp_dir: None,
|
||||
_remote_exec_server_process: Some(remote_process.process),
|
||||
})
|
||||
}
|
||||
None => TestEnv::local().await,
|
||||
}
|
||||
}
|
||||
|
||||
struct RemoteExecServerStart {
|
||||
process: RemoteExecServerProcess,
|
||||
listen_url: String,
|
||||
}
|
||||
|
||||
fn start_remote_exec_server(remote_env: &RemoteEnvConfig) -> Result<RemoteExecServerStart> {
|
||||
let container_name = remote_env.container_name.as_str();
|
||||
let instance_id = remote_exec_server_instance_id();
|
||||
let remote_exec_server_path = format!("/tmp/codex-exec-server-{instance_id}");
|
||||
let stdout_path = format!("/tmp/codex-exec-server-{instance_id}.stdout");
|
||||
let local_binary = codex_utils_cargo_bin::cargo_bin("codex-exec-server")
|
||||
.context("resolve codex-exec-server binary")?;
|
||||
let local_binary = local_binary.to_string_lossy().to_string();
|
||||
let remote_binary = format!("{container_name}:{remote_exec_server_path}");
|
||||
|
||||
docker_command_success(["cp", &local_binary, &remote_binary])?;
|
||||
docker_command_success([
|
||||
"exec",
|
||||
container_name,
|
||||
"chmod",
|
||||
"+x",
|
||||
&remote_exec_server_path,
|
||||
])?;
|
||||
|
||||
let start_script = format!(
|
||||
"rm -f {stdout_path}; \
|
||||
nohup {remote_exec_server_path} --listen ws://0.0.0.0:0 > {stdout_path} 2>&1 & \
|
||||
echo $!"
|
||||
);
|
||||
let pid_output =
|
||||
docker_command_capture_stdout(["exec", container_name, "sh", "-lc", &start_script])?;
|
||||
let pid = pid_output
|
||||
.trim()
|
||||
.parse::<u32>()
|
||||
.with_context(|| format!("parse remote exec-server PID from {pid_output:?}"))?;
|
||||
|
||||
let listen_url = wait_for_remote_listen_url(container_name, &stdout_path)?;
|
||||
|
||||
Ok(RemoteExecServerStart {
|
||||
process: RemoteExecServerProcess {
|
||||
container_name: container_name.to_string(),
|
||||
pid,
|
||||
remote_exec_server_path,
|
||||
stdout_path,
|
||||
cleanup_paths: Vec::new(),
|
||||
},
|
||||
listen_url,
|
||||
})
|
||||
}
|
||||
|
||||
fn remote_aware_cwd_path() -> PathBuf {
|
||||
PathBuf::from(format!(
|
||||
"/tmp/codex-core-test-cwd-{}",
|
||||
remote_exec_server_instance_id()
|
||||
))
|
||||
}
|
||||
|
||||
fn wait_for_remote_listen_url(container_name: &str, stdout_path: &str) -> Result<String> {
|
||||
let deadline = Instant::now() + REMOTE_EXEC_SERVER_START_TIMEOUT;
|
||||
loop {
|
||||
let line = docker_command_capture_stdout([
|
||||
"exec",
|
||||
container_name,
|
||||
"sh",
|
||||
"-lc",
|
||||
&format!("head -n 1 {stdout_path} 2>/dev/null || true"),
|
||||
])?;
|
||||
let listen_url = line.trim();
|
||||
if listen_url.starts_with("ws://") {
|
||||
return Ok(listen_url.to_string());
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow!(
|
||||
"timed out waiting for remote exec-server listen URL in container `{container_name}` after {REMOTE_EXEC_SERVER_START_TIMEOUT:?}"
|
||||
));
|
||||
}
|
||||
std::thread::sleep(REMOTE_EXEC_SERVER_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_exec_server_instance_id() -> String {
|
||||
let instance = REMOTE_EXEC_SERVER_INSTANCE_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
format!("{}-{instance}", std::process::id())
|
||||
}
|
||||
|
||||
fn remote_container_ip(container_name: &str) -> Result<String> {
|
||||
let ip = docker_command_capture_stdout([
|
||||
"inspect",
|
||||
"-f",
|
||||
"{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
|
||||
container_name,
|
||||
])?;
|
||||
let ip = ip.trim();
|
||||
if ip.is_empty() {
|
||||
return Err(anyhow!(
|
||||
"container `{container_name}` has no IP address; cannot connect to remote exec-server"
|
||||
));
|
||||
}
|
||||
Ok(ip.to_string())
|
||||
}
|
||||
|
||||
fn rewrite_websocket_host(listen_url: &str, host: &str) -> Result<String> {
|
||||
let Some(address) = listen_url.strip_prefix("ws://") else {
|
||||
return Err(anyhow!(
|
||||
"unexpected websocket listen URL `{listen_url}`; expected ws://IP:PORT"
|
||||
));
|
||||
};
|
||||
let Some((_, port)) = address.rsplit_once(':') else {
|
||||
return Err(anyhow!(
|
||||
"unexpected websocket listen URL `{listen_url}`; expected ws://IP:PORT"
|
||||
));
|
||||
};
|
||||
Ok(format!("ws://{host}:{port}"))
|
||||
}
|
||||
|
||||
fn docker_command_success<const N: usize>(args: [&str; N]) -> Result<()> {
|
||||
let output = Command::new("docker")
|
||||
.args(args)
|
||||
.output()
|
||||
.with_context(|| format!("run docker {:?}", args))?;
|
||||
if !output.status.success() {
|
||||
return Err(anyhow!(
|
||||
"docker {:?} failed: stdout={} stderr={}",
|
||||
args,
|
||||
String::from_utf8_lossy(&output.stdout).trim(),
|
||||
String::from_utf8_lossy(&output.stderr).trim()
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn docker_command_capture_stdout<const N: usize>(args: [&str; N]) -> Result<String> {
|
||||
let output = Command::new("docker")
|
||||
.args(args)
|
||||
.output()
|
||||
.with_context(|| format!("run docker {:?}", args))?;
|
||||
if !output.status.success() {
|
||||
return Err(anyhow!(
|
||||
"docker {:?} failed: stdout={} stderr={}",
|
||||
args,
|
||||
String::from_utf8_lossy(&output.stdout).trim(),
|
||||
String::from_utf8_lossy(&output.stderr).trim()
|
||||
));
|
||||
}
|
||||
String::from_utf8(output.stdout).context("docker stdout must be utf-8")
|
||||
}
|
||||
|
||||
fn absolute_path(path: &Path) -> Result<AbsolutePathBuf> {
|
||||
AbsolutePathBuf::try_from(path.to_path_buf())
|
||||
.map_err(|err| anyhow!("invalid absolute path {}: {err}", path.display()))
|
||||
}
|
||||
|
||||
/// A collection of different ways the model can output an apply_patch call
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
@@ -124,6 +383,24 @@ impl TestCodexBuilder {
|
||||
Box::pin(self.build_with_home(server, home, /*resume_from*/ None)).await
|
||||
}
|
||||
|
||||
pub async fn build_remote_aware(
|
||||
&mut self,
|
||||
server: &wiremock::MockServer,
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let test_env = test_env().await?;
|
||||
let experimental_exec_server_url =
|
||||
test_env.experimental_exec_server_url().map(str::to_owned);
|
||||
let cwd = test_env.cwd.to_path_buf();
|
||||
self.config_mutators.push(Box::new(move |config| {
|
||||
config.experimental_exec_server_url = experimental_exec_server_url;
|
||||
config.cwd = cwd;
|
||||
}));
|
||||
|
||||
let mut test = self.build(server).await?;
|
||||
test._test_env = test_env;
|
||||
Ok(test)
|
||||
}
|
||||
|
||||
pub async fn build_with_streaming_server(
|
||||
&mut self,
|
||||
server: &StreamingSseServer,
|
||||
@@ -176,7 +453,8 @@ impl TestCodexBuilder {
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let base_url = format!("{}/v1", server.uri());
|
||||
let (config, cwd) = self.prepare_config(base_url, &home).await?;
|
||||
Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
|
||||
Box::pin(self.build_from_config(config, cwd, home, resume_from, TestEnv::local().await?))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn build_with_home_and_base_url(
|
||||
@@ -186,7 +464,8 @@ impl TestCodexBuilder {
|
||||
resume_from: Option<PathBuf>,
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let (config, cwd) = self.prepare_config(base_url, &home).await?;
|
||||
Box::pin(self.build_from_config(config, cwd, home, resume_from)).await
|
||||
Box::pin(self.build_from_config(config, cwd, home, resume_from, TestEnv::local().await?))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn build_from_config(
|
||||
@@ -195,6 +474,7 @@ impl TestCodexBuilder {
|
||||
cwd: Arc<TempDir>,
|
||||
home: Arc<TempDir>,
|
||||
resume_from: Option<PathBuf>,
|
||||
test_env: TestEnv,
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let auth = self.auth.clone();
|
||||
let thread_manager = if config.model_catalog.is_some() {
|
||||
@@ -258,6 +538,7 @@ impl TestCodexBuilder {
|
||||
codex: new_conversation.thread,
|
||||
session_configured: new_conversation.session_configured,
|
||||
thread_manager,
|
||||
_test_env: test_env,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -354,6 +635,7 @@ pub struct TestCodex {
|
||||
pub session_configured: SessionConfiguredEvent,
|
||||
pub config: Config,
|
||||
pub thread_manager: Arc<ThreadManager>,
|
||||
_test_env: TestEnv,
|
||||
}
|
||||
|
||||
impl TestCodex {
|
||||
@@ -369,6 +651,14 @@ impl TestCodex {
|
||||
self.cwd_path().join(rel)
|
||||
}
|
||||
|
||||
pub fn executor_environment(&self) -> &TestEnv {
|
||||
&self._test_env
|
||||
}
|
||||
|
||||
pub fn fs(&self) -> Arc<dyn ExecutorFileSystem> {
|
||||
self._test_env.environment().get_filesystem()
|
||||
}
|
||||
|
||||
pub async fn submit_turn(&self, prompt: &str) -> Result<()> {
|
||||
self.submit_turn_with_policies(
|
||||
prompt,
|
||||
@@ -431,7 +721,7 @@ impl TestCodex {
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: self.cwd.path().to_path_buf(),
|
||||
cwd: self.config.cwd.clone(),
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
model: session_model,
|
||||
|
||||
Reference in New Issue
Block a user