Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
e7fc6966bd Fix remote CI environment routing
Co-authored-by: Codex <noreply@openai.com>
2026-05-11 12:40:01 -07:00
13 changed files with 359 additions and 45 deletions

View File

@@ -937,6 +937,9 @@ async fn find_project_root(
.await
.is_ok()
{
if marker == ".git" && is_ambient_git_marker_dir(ancestor.as_path()) {
continue;
}
return Ok(ancestor);
}
}
@@ -944,6 +947,22 @@ async fn find_project_root(
Ok(cwd.clone())
}
/// Reports whether a `.git` entry found directly inside `dir` should be
/// ignored as a project-root marker.
///
/// Returns `true` when `dir` has no parent, or when its mode has both the
/// world-writable bit (`0o002`) and the sticky bit (`0o1000`) set. A directory
/// whose metadata cannot be read is not considered ambient unless it has no
/// parent.
#[cfg(unix)]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    use std::os::unix::fs::MetadataExt;

    // Both bits must be present; presumably this targets shared scratch
    // directories such as `/tmp` — confirm with the author.
    const WORLD_WRITABLE_AND_STICKY: u32 = 0o002 | 0o1000;

    if dir.parent().is_none() {
        return true;
    }
    match dir.metadata() {
        Ok(metadata) => {
            (metadata.mode() & WORLD_WRITABLE_AND_STICKY) == WORLD_WRITABLE_AND_STICKY
        }
        Err(_) => false,
    }
}

/// Reports whether a `.git` entry found directly inside `dir` should be
/// ignored as a project-root marker.
///
/// Without Unix mode bits, only a path with no parent is treated as ambient.
#[cfg(not(unix))]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    dir.parent().is_none()
}
struct LoadedProjectLayers {
layers: Vec<ConfigLayerEntry>,
startup_warnings: Vec<String>,

View File

@@ -30,6 +30,7 @@ use std::error::Error;
use std::fmt;
use std::io;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use toml::Value as TomlValue;
@@ -408,6 +409,9 @@ async fn find_project_root(
for marker in project_root_markers {
let marker_path = ancestor.join(marker);
match fs.get_metadata(&marker_path, /*sandbox*/ None).await {
Ok(_) if marker == ".git" && is_ambient_git_marker_dir(ancestor.as_path()) => {
continue;
}
Ok(_) => return ancestor,
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => {
@@ -423,6 +427,22 @@ async fn find_project_root(
cwd.clone()
}
/// Decides whether `dir` is a place where a `.git` entry must not be taken as
/// evidence of a project root.
///
/// A path with no parent always qualifies. Otherwise the directory qualifies
/// only if its metadata is readable and its mode carries both the
/// world-writable bit (`0o002`) and the sticky bit (`0o1000`).
#[cfg(unix)]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    use std::os::unix::fs::MetadataExt;

    const WORLD_WRITABLE: u32 = 0o002;
    const STICKY: u32 = 0o1000;

    let Some(_parent) = dir.parent() else {
        return true;
    };
    let Ok(metadata) = dir.metadata() else {
        // Unreadable metadata: treat the directory as an ordinary one.
        return false;
    };
    let mode = metadata.mode();
    (mode & WORLD_WRITABLE) != 0 && (mode & STICKY) != 0
}

/// Decides whether `dir` is a place where a `.git` entry must not be taken as
/// evidence of a project root.
///
/// On non-Unix targets only a path with no parent qualifies.
#[cfg(not(unix))]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    dir.parent().is_none()
}
fn dirs_between_project_root_and_cwd(
cwd: &AbsolutePathBuf,
project_root: &AbsolutePathBuf,

View File

@@ -44,7 +44,7 @@ fn project_layers_for_cwd(cwd: &Path) -> Vec<ConfigLayerEntry> {
};
let project_root = cwd_dir
.ancestors()
.find(|ancestor| ancestor.join(".git").exists())
.find(|ancestor| ancestor.join(".git").exists() && !is_ambient_git_marker_dir(ancestor))
.unwrap_or(cwd_dir.as_path())
.to_path_buf();

View File

@@ -70,6 +70,7 @@ use codex_analytics::build_track_events_context;
use codex_async_utils::OrCancelExt;
use codex_features::Feature;
use codex_git_utils::get_git_repo_root;
use codex_git_utils::get_git_repo_root_with_fs;
use codex_hooks::HookEvent;
use codex_hooks::HookEventAfterAgent;
use codex_hooks::HookPayload;
@@ -101,6 +102,7 @@ use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_tools::ToolName;
use codex_tools::filter_request_plugin_install_discoverable_tools_for_client;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_stream_parser::AssistantTextChunk;
use codex_utils_stream_parser::AssistantTextStreamParser;
use codex_utils_stream_parser::ProposedPlanSegment;
@@ -367,8 +369,13 @@ pub(crate) async fn run_turn(
let mut stop_hook_active = false;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
let display_root = get_git_repo_root(turn_context.cwd.as_path())
.unwrap_or_else(|| turn_context.cwd.clone().into_path_buf());
let display_root = match turn_context.environments.primary_filesystem() {
Some(fs) => get_git_repo_root_with_fs(fs.as_ref(), &turn_context.cwd)
.await
.map(AbsolutePathBuf::into_path_buf),
None => get_git_repo_root(turn_context.cwd.as_path()),
}
.unwrap_or_else(|| turn_context.cwd.clone().into_path_buf());
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::with_display_root(
display_root,
)));

View File

@@ -382,6 +382,20 @@ impl UnifiedExecProcessManager {
(Arc::new(process), deferred_network_approval)
}
Err(err) => {
let message = err.to_string();
emit_failed_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd,
Some(request.process_id.to_string()),
Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default())),
String::new(),
message,
Duration::ZERO,
)
.await;
self.release_process_id(request.process_id).await;
return Err(err);
}

View File

@@ -80,6 +80,14 @@ impl ResponseMock {
.iter()
.find_map(|req| req.function_call_output_text(call_id))
}
/// Looks through every captured request, in order, and returns the first
/// complete `function_call_output` item recorded for `call_id`, or `None`
/// when no captured request contains one.
pub fn function_call_output(&self, call_id: &str) -> Option<Value> {
    for request in self.requests().iter() {
        if let Some(item) = request.maybe_function_call_output(call_id) {
            return Some(item);
        }
    }
    None
}
}
#[derive(Debug, Clone)]
@@ -209,6 +217,10 @@ impl ResponsesRequest {
self.call_output(call_id, "function_call_output")
}
/// Returns the `function_call_output` item recorded for `call_id` in this
/// request, or `None` when the request has no such item.
pub fn maybe_function_call_output(&self, call_id: &str) -> Option<Value> {
self.maybe_call_output(call_id, "function_call_output")
}
/// Returns the `custom_tool_call_output` item recorded for `call_id` in this
/// request.
///
/// # Panics
///
/// Panics (via `call_output`) when the request has no matching item.
pub fn custom_tool_call_output(&self, call_id: &str) -> Value {
self.call_output(call_id, "custom_tool_call_output")
}
@@ -218,13 +230,18 @@ impl ResponsesRequest {
}
/// Returns the input item whose `type` is `call_type` and whose `call_id`
/// matches `call_id`.
///
/// # Panics
///
/// Panics when the request contains no matching item. The message names the
/// requested `call_type` so that a missing `custom_tool_call_output` is not
/// reported as a missing function call output; the original wording is kept
/// as a prefix for anything matching on it.
pub fn call_output(&self, call_id: &str, call_type: &str) -> Value {
    self.maybe_call_output(call_id, call_type).unwrap_or_else(|| {
        panic!(
            "function call output {call_id} item not found in request (expected type `{call_type}`)"
        )
    })
}
pub fn maybe_call_output(&self, call_id: &str, call_type: &str) -> Option<Value> {
self.input()
.iter()
.find(|item| {
item.get("type").unwrap() == call_type && item.get("call_id").unwrap() == call_id
item.get("type").and_then(Value::as_str) == Some(call_type)
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})
.cloned()
.unwrap_or_else(|| panic!("function call output {call_id} item not found in request"))
}
/// Returns true if this request's `input` contains a `function_call` with
@@ -1469,6 +1486,24 @@ pub async fn mount_function_call_agent_response(
/// POST to `/v1/responses`. Panics if more requests are received than bodies
/// provided. Also asserts the exact number of expected calls.
pub async fn mount_sse_sequence(server: &MockServer, bodies: Vec<String>) -> ResponseMock {
// Strict variant: asks the shared helper to also register a call-count
// expectation on the mounted mock.
mount_sse_sequence_with_expectation(server, bodies, /*expect_all_calls*/ true).await
}
/// Mounts a sequence of SSE response bodies without verifying that every body
/// was consumed. Tests that need to inspect early Codex errors can use this to
/// avoid masking the real failure with WireMock's drop-time verifier.
///
/// Delegates to `mount_sse_sequence_with_expectation` with `expect_all_calls`
/// set to `false`, so no call-count expectation is registered on the mock.
pub async fn mount_sse_sequence_no_verify(
server: &MockServer,
bodies: Vec<String>,
) -> ResponseMock {
mount_sse_sequence_with_expectation(server, bodies, /*expect_all_calls*/ false).await
}
async fn mount_sse_sequence_with_expectation(
server: &MockServer,
bodies: Vec<String>,
expect_all_calls: bool,
) -> ResponseMock {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -1496,11 +1531,12 @@ pub async fn mount_sse_sequence(server: &MockServer, bodies: Vec<String>) -> Res
};
let (mock, response_mock) = base_mock();
mock.respond_with(responder)
.up_to_n_times(num_calls as u64)
.expect(num_calls as u64)
.mount(server)
.await;
let mock = mock.respond_with(responder).up_to_n_times(num_calls as u64);
if expect_all_calls {
mock.expect(num_calls as u64).mount(server).await;
} else {
mock.mount(server).await;
}
response_mock
}

View File

@@ -395,13 +395,23 @@ impl TestCodexBuilder {
std::env::current_exe()?,
codex_linux_sandbox_exe,
)?;
let environment_manager = Arc::new(
codex_exec_server::EnvironmentManager::create_for_tests(
exec_server_url,
local_runtime_paths,
)
.await,
);
let environment_manager = match exec_server_url {
Some(exec_server_url) => {
codex_exec_server::EnvironmentManager::create_remote_aware_for_tests(
exec_server_url,
local_runtime_paths,
)
.await?
}
None => {
codex_exec_server::EnvironmentManager::create_for_tests(
/*exec_server_url*/ None,
local_runtime_paths,
)
.await
}
};
let environment_manager = Arc::new(environment_manager);
let file_system = test_env.environment().get_filesystem();
let mut workspace_setups = vec![];
swap(&mut self.workspace_setups, &mut workspace_setups);
@@ -703,6 +713,38 @@ impl TestCodex {
.await
}
/// Submits a single-text user turn with the given `environments` selection
/// and returns once the submission call itself has finished.
///
/// The turn uses the configured cwd, `PermissionProfile::Disabled`,
/// `AskForApproval::Never`, and the model from the session configuration.
/// This method does not read any events from the session afterwards.
///
/// # Errors
///
/// Propagates the error returned by `submit`.
pub async fn submit_turn_with_environments_no_wait(
    &self,
    prompt: &str,
    environments: Option<Vec<TurnEnvironmentSelection>>,
) -> Result<()> {
    let cwd = &self.config.cwd;
    let (sandbox_policy, permission_profile) =
        turn_permission_fields(PermissionProfile::Disabled, cwd.as_path());
    let model = self.session_configured.model.clone();

    let items = vec![UserInput::Text {
        text: prompt.into(),
        text_elements: Vec::new(),
    }];
    let op = Op::UserTurn {
        environments,
        items,
        final_output_json_schema: None,
        cwd: cwd.to_path_buf(),
        approval_policy: AskForApproval::Never,
        approvals_reviewer: None,
        sandbox_policy,
        permission_profile,
        model,
        effort: None,
        summary: None,
        service_tier: None,
        collaboration_mode: None,
        personality: None,
    };

    self.codex.submit(op).await?;
    Ok(())
}
async fn submit_turn_with_permission_profile_context(
&self,
prompt: &str,

View File

@@ -13,6 +13,7 @@ use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathBufExt;
@@ -22,7 +23,7 @@ use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::mount_sse_sequence_no_verify;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
@@ -38,6 +39,42 @@ use std::process::Command;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tempfile::TempDir;
use tokio::time::Duration;
/// Waits until a captured request contains the `function_call_output` text
/// for `call_id` and returns that text.
///
/// `response_mock` is polled at the top of every loop iteration. Between
/// polls the loop either consumes the next Codex event or sleeps for 50 ms,
/// whichever completes first.
///
/// # Errors
///
/// Fails when the event stream yields an error, when an `EventMsg::Error`
/// arrives, when the turn completes and the mock still holds no output for
/// `call_id`, or when 120 seconds pass without the output appearing. The
/// turn-complete and timeout messages list the debug form of the other events
/// seen so far.
async fn wait_for_function_call_output(
    test: &TestCodex,
    response_mock: &core_test_support::responses::ResponseMock,
    call_id: &str,
) -> Result<String> {
    // Debug renderings of non-terminal events, kept only for failure messages.
    let mut events = Vec::new();
    let output = tokio::time::timeout(Duration::from_secs(120), async {
        loop {
            if let Some(output) = response_mock.function_call_output_text(call_id) {
                return Ok(output);
            }
            // NOTE(review): assumes `next_event` is cancel-safe; otherwise an
            // event could be lost whenever the sleep arm wins — confirm.
            tokio::select! {
                event = test.codex.next_event() => {
                    let event = event.context("codex event stream ended while waiting for function_call_output")?;
                    match &event.msg {
                        EventMsg::Error(error) => {
                            anyhow::bail!("turn errored before function_call_output for {call_id}: {}", error.message);
                        }
                        EventMsg::TurnComplete(_) => {
                            // The request carrying the output may have been
                            // captured after the poll at the top of this
                            // iteration; look once more before failing.
                            if let Some(output) = response_mock.function_call_output_text(call_id) {
                                return Ok(output);
                            }
                            anyhow::bail!("turn completed before function_call_output for {call_id}; events: {events:?}");
                        }
                        _ => events.push(format!("{:?}", event.msg)),
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
            }
        }
    })
    .await
    .with_context(|| {
        format!("timed out waiting for function_call_output for {call_id}; events: {events:?}")
    })??;
    Ok(output)
}
async fn unified_exec_test(server: &wiremock::MockServer) -> Result<TestCodex> {
let mut builder = test_codex().with_config(|config| {
config.use_experimental_unified_exec_tool = true;
@@ -157,7 +194,7 @@ async fn exec_command_routing_output(
arguments: Value,
environments: Option<Vec<TurnEnvironmentSelection>>,
) -> Result<String> {
let response_mock = mount_sse_sequence(
let response_mock = mount_sse_sequence_no_verify(
server,
vec![
sse(vec![
@@ -174,12 +211,12 @@ async fn exec_command_routing_output(
)
.await;
test.submit_turn_with_environments("route exec command", environments)
test.submit_turn_with_environments_no_wait("route exec command", environments)
.await?;
response_mock
.function_call_output_text(call_id)
.with_context(|| format!("missing function_call_output for {call_id}"))
let output = wait_for_function_call_output(test, &response_mock, call_id).await?;
assert_eq!(response_mock.requests().len(), 2);
Ok(output)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -232,7 +269,7 @@ async fn exec_command_routes_to_selected_remote_environment() -> Result<()> {
"yield_time_ms": 1_000,
"environment_id": REMOTE_ENVIRONMENT_ID,
}),
Some(vec![local_selection, remote_selection]),
Some(vec![remote_selection, local_selection]),
)
.await?;
assert!(

View File

@@ -941,7 +941,7 @@ allow_local_binding = true
Err(err) => panic!("rebuild config layer stack with network requirements: {err}"),
};
});
let test = builder.build_remote_aware(server).await?;
let test = builder.build(server).await?;
assert!(
test.config.permissions.network.is_some(),
"expected managed network proxy config to be present"

View File

@@ -32,7 +32,6 @@ use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_models_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
@@ -64,6 +63,41 @@ use wiremock::matchers::body_string_contains;
const VIEW_IMAGE_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
/// Waits until a captured request contains the full `function_call_output`
/// item for `call_id` and returns that item.
///
/// `response_mock` is polled at the top of every loop iteration. Between
/// polls the loop either consumes the next Codex event or sleeps for 50 ms,
/// whichever completes first.
///
/// # Errors
///
/// Fails when the event stream yields an error, when an `EventMsg::Error`
/// arrives, when the turn completes and the mock still holds no output for
/// `call_id`, or when 120 seconds pass without the output appearing. The
/// turn-complete and timeout messages list the debug form of the other events
/// seen so far.
async fn wait_for_function_call_output(
    test: &TestCodex,
    response_mock: &responses::ResponseMock,
    call_id: &str,
) -> anyhow::Result<Value> {
    // Debug renderings of non-terminal events, kept only for failure messages.
    let mut events = Vec::new();
    let output = tokio::time::timeout(Duration::from_secs(120), async {
        loop {
            if let Some(output) = response_mock.function_call_output(call_id) {
                return Ok(output);
            }
            // NOTE(review): assumes `next_event` is cancel-safe; otherwise an
            // event could be lost whenever the sleep arm wins — confirm.
            tokio::select! {
                event = test.codex.next_event() => {
                    let event = event.context("codex event stream ended while waiting for function_call_output")?;
                    match &event.msg {
                        EventMsg::Error(error) => {
                            anyhow::bail!("turn errored before function_call_output for {call_id}: {}", error.message);
                        }
                        EventMsg::TurnComplete(_) => {
                            // The request carrying the output may have been
                            // captured after the poll at the top of this
                            // iteration; look once more before failing.
                            if let Some(output) = response_mock.function_call_output(call_id) {
                                return Ok(output);
                            }
                            anyhow::bail!("turn completed before function_call_output for {call_id}; events: {events:?}");
                        }
                        _ => events.push(format!("{:?}", event.msg)),
                    }
                }
                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
            }
        }
    })
    .await
    .with_context(|| {
        format!("timed out waiting for function_call_output for {call_id}; events: {events:?}")
    })??;
    Ok(output)
}
fn disabled_user_turn(test: &TestCodex, items: Vec<UserInput>, model: String) -> Op {
let (sandbox_policy, permission_profile) =
turn_permission_fields(PermissionProfile::Disabled, test.config.cwd.as_path());
@@ -417,7 +451,7 @@ async fn view_image_routes_to_selected_local_environment() -> anyhow::Result<()>
)
.await?;
let call_id = "call-view-image-local-env";
let response_mock = mount_sse_sequence(
let response_mock = responses::mount_sse_sequence(
&server,
vec![
sse(vec![
@@ -501,18 +535,19 @@ async fn view_image_routes_to_selected_remote_environment() -> anyhow::Result<()
/*sandbox*/ None,
)
.await?;
let png = BASE64_STANDARD.decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII=",
)?;
test.fs()
.write_file(&image_path, png, /*sandbox*/ None)
.write_file(
&image_path,
png_bytes(/*width*/ 1, /*height*/ 1, [255, 0, 0, 255])?,
/*sandbox*/ None,
)
.await?;
let remote_selection = TurnEnvironmentSelection {
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
cwd: remote_cwd.clone(),
};
let call_id = "call-view-image-multi-env";
let response_mock = mount_sse_sequence(
let response_mock = responses::mount_sse_sequence_no_verify(
&server,
vec![
sse(vec![
@@ -537,21 +572,18 @@ async fn view_image_routes_to_selected_remote_environment() -> anyhow::Result<()
)
.await;
test.submit_turn_with_environments(
test.submit_turn_with_environments_no_wait(
"route view image",
Some(vec![local_selection, remote_selection]),
Some(vec![remote_selection, local_selection]),
)
.await?;
let output = response_mock
.last_request()
.context("missing request containing view_image output")?
.function_call_output(call_id)
.clone();
let output_items = output
.get("output")
.and_then(Value::as_array)
.context("view_image output should be content items")?;
let output = wait_for_function_call_output(&test, &response_mock, call_id).await?;
assert_eq!(response_mock.requests().len(), 2);
let output_value = output.get("output").unwrap_or(&output);
let output_items = output_value.as_array().with_context(|| {
format!("view_image output should be content items, got {output_value}")
})?;
assert_eq!(output_items.len(), 1);
let image_url = output_items[0]
.get("image_url")

View File

@@ -91,6 +91,23 @@ impl EnvironmentManager {
Self::from_default_provider_url(exec_server_url, local_runtime_paths).await
}
/// Test-only constructor that registers one remote environment under
/// `REMOTE_ENVIRONMENT_ID`, pointing at `exec_server_url`, and makes it the
/// default, while setting `include_local` so the local environment is part of
/// the snapshot as well.
///
/// # Errors
///
/// Returns whatever error `from_provider_snapshot` reports.
pub async fn create_remote_aware_for_tests(
    exec_server_url: String,
    local_runtime_paths: ExecServerRuntimePaths,
) -> Result<Self, ExecServerError> {
    let remote_id = REMOTE_ENVIRONMENT_ID.to_string();
    let remote_environment =
        Environment::remote_inner(exec_server_url, /*local_runtime_paths*/ None);

    let snapshot = EnvironmentProviderSnapshot {
        // Listed first so the id can be cloned before it moves into the pair below.
        default: EnvironmentDefault::EnvironmentId(remote_id.clone()),
        environments: vec![(remote_id, remote_environment)],
        include_local: true,
    };
    Self::from_provider_snapshot(snapshot, local_runtime_paths)
}
/// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths
/// used when creating local filesystem helpers.
pub async fn new(args: EnvironmentManagerArgs) -> Self {
@@ -528,6 +545,34 @@ mod tests {
assert!(!manager.local_environment().is_remote());
}
/// The remote-aware test manager defaults to the remote environment while the
/// local environment can still be looked up by id.
#[tokio::test]
async fn remote_aware_test_manager_keeps_local_environment_addressable()
-> Result<(), ExecServerError> {
    let exec_server_url = "ws://127.0.0.1:8765".to_string();
    let manager =
        EnvironmentManager::create_remote_aware_for_tests(exec_server_url, test_runtime_paths())
            .await?;

    // The remote environment is the default.
    assert_eq!(
        manager.default_environment_id(),
        Some(REMOTE_ENVIRONMENT_ID)
    );

    // Both ids resolve, and each reports the expected kind.
    let remote = manager
        .get_environment(REMOTE_ENVIRONMENT_ID)
        .expect("remote environment");
    assert!(remote.is_remote());

    let local = manager
        .get_environment(LOCAL_ENVIRONMENT_ID)
        .expect("local environment");
    assert!(!local.is_remote());

    Ok(())
}
#[tokio::test]
async fn environment_manager_default_environment_caches_environment() {
let manager = EnvironmentManager::default_for_tests();

View File

@@ -38,6 +38,21 @@ pub fn get_git_repo_root(base_dir: &Path) -> Option<PathBuf> {
find_ancestor_git_entry(base).map(|(repo_root, _)| repo_root)
}
/// Resolves the git repository root for `base_dir` through the supplied
/// executor filesystem; the counterpart of [`get_git_repo_root`] for
/// environments reached via an `ExecutorFileSystem`.
///
/// The search starts at `base_dir` when its metadata is readable and marks it
/// as a directory, and at its parent otherwise. Returns `None` when that
/// parent does not exist or when `find_ancestor_git_entry_with_fs` finds no
/// git entry.
pub async fn get_git_repo_root_with_fs(
    fs: &dyn ExecutorFileSystem,
    base_dir: &AbsolutePathBuf,
) -> Option<AbsolutePathBuf> {
    let is_directory = matches!(
        fs.get_metadata(base_dir, /*sandbox*/ None).await,
        Ok(metadata) if metadata.is_directory
    );
    let start = if is_directory {
        base_dir.clone()
    } else {
        base_dir.parent()?
    };

    let (repo_root, _dot_git) = find_ancestor_git_entry_with_fs(fs, &start).await?;
    Some(repo_root)
}
/// Timeout for git commands to prevent freezing on large repositories
const GIT_COMMAND_TIMEOUT: TokioDuration = TokioDuration::from_secs(5);
@@ -760,7 +775,7 @@ fn find_ancestor_git_entry(base_dir: &Path) -> Option<(PathBuf, PathBuf)> {
loop {
let dot_git = dir.join(".git");
if dot_git.exists() {
if dot_git.exists() && !is_ambient_git_marker_dir(&dir) {
return Some((dir, dot_git));
}
@@ -774,13 +789,31 @@ fn find_ancestor_git_entry(base_dir: &Path) -> Option<(PathBuf, PathBuf)> {
None
}
/// Tells whether a `.git` entry located directly in `dir` should be skipped
/// when looking for a repository root.
///
/// True for a path with no parent, or for a directory whose mode has both the
/// world-writable bit (`0o002`) and the sticky bit (`0o1000`). If the metadata
/// cannot be read, only the no-parent rule applies.
///
/// NOTE(review): the mode is read with `Path::metadata`, i.e. from the local
/// filesystem, yet `find_ancestor_git_entry_with_fs` also calls this while
/// otherwise going through an `ExecutorFileSystem` — confirm that is intended.
#[cfg(unix)]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    use std::os::unix::fs::MetadataExt;

    if dir.parent().is_none() {
        return true;
    }
    let world_writable_and_sticky = |mode: u32| (mode & 0o002) != 0 && (mode & 0o1000) != 0;
    match dir.metadata() {
        Ok(metadata) => world_writable_and_sticky(metadata.mode()),
        Err(_) => false,
    }
}

/// Tells whether a `.git` entry located directly in `dir` should be skipped
/// when looking for a repository root.
///
/// On non-Unix targets this is true only for a path with no parent.
#[cfg(not(unix))]
fn is_ambient_git_marker_dir(dir: &Path) -> bool {
    dir.parent().is_none()
}
async fn find_ancestor_git_entry_with_fs(
fs: &dyn ExecutorFileSystem,
base_dir: &AbsolutePathBuf,
) -> Option<(AbsolutePathBuf, AbsolutePathBuf)> {
for dir in base_dir.ancestors() {
let dot_git = dir.join(".git");
if fs.get_metadata(&dot_git, /*sandbox*/ None).await.is_ok() {
if fs.get_metadata(&dot_git, /*sandbox*/ None).await.is_ok()
&& !is_ambient_git_marker_dir(dir.as_path())
{
return Some((dir, dot_git));
}
}
@@ -832,6 +865,34 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
/// A `.git` entry in a world-writable, sticky directory is ignored, while a
/// `.git` entry in an ordinary directory beneath it is still honoured.
#[cfg(unix)]
#[test]
fn get_git_repo_root_ignores_ambient_git_markers() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    // Give the temp root mode 0o1777 and drop a `.git` marker into it.
    let tmp_root = tempfile::tempdir().expect("tempdir");
    let root = tmp_root.path();
    let mut permissions = fs::metadata(root).expect("metadata").permissions();
    permissions.set_mode(0o1777);
    fs::set_permissions(root, permissions).expect("set sticky permissions");
    fs::write(root.join(".git"), "gitdir: fake\n").expect("write .git marker");

    // A plain child of that root must not resolve to any repository.
    let non_repo_cwd = root.join("child");
    fs::create_dir_all(&non_repo_cwd).expect("create non-repo cwd");
    assert_eq!(get_git_repo_root(&non_repo_cwd), None);

    // A nested directory with its own marker resolves to that directory.
    let repo_root = root.join("repo");
    let repo_cwd = repo_root.join("nested");
    fs::create_dir_all(&repo_cwd).expect("create repo cwd");
    fs::write(repo_root.join(".git"), "gitdir: fake\n").expect("write nested .git marker");
    assert_eq!(get_git_repo_root(&repo_cwd), Some(repo_root));
}
#[test]
fn canonicalize_git_remote_url_normalizes_github_variants() {
for remote in [

View File

@@ -31,6 +31,7 @@ pub use info::default_branch_name;
pub use info::get_git_remote_urls;
pub use info::get_git_remote_urls_assume_git_repo;
pub use info::get_git_repo_root;
pub use info::get_git_repo_root_with_fs;
pub use info::get_has_changes;
pub use info::get_head_commit_hash;
pub use info::git_diff_to_remote;