Address thread environment review feedback

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-04-16 13:46:45 -07:00
parent 64883048f2
commit 8818b567df
9 changed files with 247 additions and 179 deletions

View File

@@ -2401,7 +2401,7 @@ impl CodexMessageProcessor {
request_trace: Option<W3cTraceContext>,
environment_id: Option<String>,
) {
let requested_cwd = typesafe_overrides.cwd.clone();
let explicit_cwd = typesafe_overrides.cwd.clone();
let mut config = match derive_config_from_params(
&cli_overrides,
config_overrides.clone(),
@@ -2436,7 +2436,7 @@ impl CodexMessageProcessor {
)
);
if requested_cwd.is_some()
if explicit_cwd.is_some()
&& !config.active_project.is_trusted()
&& (requested_sandbox_trusts_project
|| matches!(
@@ -2505,6 +2505,47 @@ impl CodexMessageProcessor {
};
}
if explicit_cwd.is_none() {
match listener_task_context
.thread_manager
.environment_manager()
.default_cwd(environment_id.as_deref())
{
Ok(Some(default_cwd)) => {
config.cwd =
match codex_utils_absolute_path::AbsolutePathBuf::from_absolute_path(
default_cwd,
) {
Ok(default_cwd) => default_cwd,
Err(err) => {
listener_task_context
.outgoing
.send_error(
request_id,
invalid_request(format!(
"failed to resolve environment default cwd {}: {err}",
default_cwd.display()
)),
)
.await;
return;
}
};
}
Ok(None) => {}
Err(err) => {
listener_task_context
.outgoing
.send_error(
request_id,
invalid_request(format!("failed to resolve environment: {err}")),
)
.await;
return;
}
}
}
let instruction_sources = Self::instruction_sources_from_config(&config).await;
let dynamic_tools = dynamic_tools.unwrap_or_default();
let core_dynamic_tools = if dynamic_tools.is_empty() {
@@ -2549,7 +2590,6 @@ impl CodexMessageProcessor {
service_name,
request_trace,
environment_id,
requested_cwd,
)
.instrument(tracing::info_span!(
"app_server.thread_start.create_thread",

View File

@@ -20,7 +20,6 @@ use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_config::types::AuthCredentialsStoreMode;
use codex_core::config::set_project_trust_level;
use codex_exec_server::ExecServerRuntimePaths;
use codex_git_utils::resolve_root_git_project_for_trust;
use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
use codex_protocol::config_types::ServiceTier;
@@ -29,16 +28,10 @@ use codex_protocol::openai_models::ReasoningEffort;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::net::TcpListener;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tempfile::TempDir;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -52,8 +45,6 @@ use super::analytics::wait_for_analytics_payload;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
const CODEX_EXEC_SERVER_CWD_ENV_VAR: &str = "CODEX_EXEC_SERVER_CWD";
const EXEC_SERVER_CONNECT_RETRY_INTERVAL: Duration = Duration::from_millis(25);
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
@@ -205,74 +196,14 @@ async fn thread_start_accepts_explicit_local_environment_when_default_is_remote(
}
#[tokio::test]
async fn thread_start_uses_remote_default_cwd_when_request_omits_cwd() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?;
let exec_server = TestExecServer::spawn().await?;
let remote_default_cwd = TempDir::new()?;
let remote_default_cwd = remote_default_cwd.path().to_string_lossy().into_owned();
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
(
CODEX_EXEC_SERVER_URL_ENV_VAR,
Some(exec_server.websocket_url.as_str()),
),
(
CODEX_EXEC_SERVER_CWD_ENV_VAR,
Some(remote_default_cwd.as_str()),
),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
assert_eq!(thread.cwd.as_path(), Path::new(&remote_default_cwd));
Ok(())
}
#[tokio::test]
async fn thread_start_explicit_cwd_overrides_remote_default_cwd() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?;
let exec_server = TestExecServer::spawn().await?;
let remote_default_cwd = TempDir::new()?;
let remote_default_cwd = remote_default_cwd.path().to_string_lossy().into_owned();
async fn thread_start_uses_explicit_cwd() -> Result<()> {
let requested_cwd = TempDir::new()?;
let requested_cwd = requested_cwd.path().to_string_lossy().into_owned();
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
(
CODEX_EXEC_SERVER_URL_ENV_VAR,
Some(exec_server.websocket_url.as_str()),
),
(
CODEX_EXEC_SERVER_CWD_ENV_VAR,
Some(remote_default_cwd.as_str()),
),
],
)
.await?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
@@ -422,64 +353,6 @@ fn normalize_path_for_comparison(path: impl AsRef<Path>) -> PathBuf {
path.as_ref().to_path_buf()
}
struct TestExecServer {
websocket_url: String,
task: JoinHandle<()>,
}
impl TestExecServer {
async fn spawn() -> Result<Self> {
let bind_addr = TcpListener::bind("127.0.0.1:0")?.local_addr()?;
let websocket_url = format!("ws://{bind_addr}");
let runtime_paths = ExecServerRuntimePaths::new(
std::env::current_exe()?,
/*codex_linux_sandbox_exe*/ None,
)?;
let listen_url = websocket_url.clone();
let task = tokio::spawn(async move {
codex_exec_server::run_main(&listen_url, runtime_paths)
.await
.expect("test exec-server should run");
});
wait_for_exec_server(websocket_url.as_str()).await?;
Ok(Self {
websocket_url,
task,
})
}
}
impl Drop for TestExecServer {
fn drop(&mut self) {
self.task.abort();
}
}
async fn wait_for_exec_server(websocket_url: &str) -> Result<()> {
let deadline = Instant::now() + DEFAULT_READ_TIMEOUT;
loop {
match connect_async(websocket_url).await {
Ok((mut websocket, _)) => {
websocket.close(None).await?;
return Ok(());
}
Err(err) if Instant::now() < deadline => {
let is_connection_refused = matches!(
err,
tokio_tungstenite::tungstenite::Error::Io(ref io_err)
if io_err.kind() == std::io::ErrorKind::ConnectionRefused
);
if is_connection_refused {
sleep(EXEC_SERVER_CONNECT_RETRY_INTERVAL).await;
continue;
}
return Err(err.into());
}
Err(err) => return Err(err.into()),
}
}
}
#[tokio::test]
async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -430,8 +430,6 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) models_manager: Arc<ModelsManager>,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) environment_id: Option<String>,
pub(crate) requested_cwd: Option<PathBuf>,
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) plugins_manager: Arc<PluginsManager>,
pub(crate) mcp_manager: Arc<McpManager>,
@@ -485,8 +483,6 @@ impl Codex {
auth_manager,
models_manager,
environment_manager,
environment_id,
requested_cwd,
skills_manager,
plugins_manager,
mcp_manager,
@@ -507,23 +503,9 @@ impl Codex {
let (tx_event, rx_event) = async_channel::unbounded();
let environment = environment_manager
.environment(environment_id.as_deref())
.current()
.await
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
if requested_cwd.is_none()
&& let Some(default_cwd) = environment
.as_ref()
.and_then(|environment| environment.default_cwd())
{
config.cwd =
codex_utils_absolute_path::AbsolutePathBuf::from_absolute_path(default_cwd)
.map_err(|err| {
CodexErr::Fatal(format!(
"failed to resolve environment default cwd {}: {err}",
default_cwd.display()
))
})?;
}
let fs = environment
.as_ref()
.map(|environment| environment.get_filesystem());

View File

@@ -82,8 +82,6 @@ pub(crate) async fn run_codex_thread_interactive(
environment_manager: Arc::new(EnvironmentManager::from_environment(
parent_ctx.environment.as_deref(),
)),
environment_id: None,
requested_cwd: None,
skills_manager: Arc::clone(&parent_session.services.skills_manager),
plugins_manager: Arc::clone(&parent_session.services.plugins_manager),
mcp_manager: Arc::clone(&parent_session.services.mcp_manager),

View File

@@ -435,8 +435,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
auth_manager,
models_manager,
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
environment_id: None,
requested_cwd: None,
skills_manager,
plugins_manager,
mcp_manager,

View File

@@ -3,6 +3,7 @@ use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::config::ConstraintResult;
use crate::file_watcher::WatchRegistration;
use codex_exec_server::Environment;
use codex_features::Feature;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::Personality;
@@ -144,6 +145,10 @@ impl CodexThread {
self.codex.session.total_token_usage().await
}
pub(crate) fn selected_environment(&self) -> Option<std::sync::Arc<Environment>> {
self.codex.session.services.environment.clone()
}
/// Records a user-role session-prefix message without creating a new user turn boundary.
pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
let message = ResponseItem::Message {

View File

@@ -41,6 +41,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::W3cTraceContext;
@@ -489,7 +490,6 @@ impl ThreadManager {
/*metrics_service_name*/ None,
/*parent_trace*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -503,7 +503,6 @@ impl ThreadManager {
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
environment_id: Option<String>,
requested_cwd: Option<PathBuf>,
) -> CodexResult<NewThread> {
Box::pin(self.state.spawn_thread(
config,
@@ -516,7 +515,6 @@ impl ThreadManager {
parent_trace,
/*user_shell_override*/ None,
environment_id,
requested_cwd,
))
.await
}
@@ -558,7 +556,6 @@ impl ThreadManager {
parent_trace,
/*user_shell_override*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -579,7 +576,6 @@ impl ThreadManager {
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -603,7 +599,6 @@ impl ThreadManager {
/*parent_trace*/ None,
/*user_shell_override*/ Some(user_shell_override),
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -713,7 +708,6 @@ impl ThreadManager {
parent_trace,
/*user_shell_override*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -816,7 +810,6 @@ impl ThreadManagerState {
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -845,7 +838,6 @@ impl ThreadManagerState {
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -875,7 +867,6 @@ impl ThreadManagerState {
/*parent_trace*/ None,
/*user_shell_override*/ None,
/*environment_id*/ None,
/*requested_cwd*/ None,
))
.await
}
@@ -894,7 +885,6 @@ impl ThreadManagerState {
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
environment_id: Option<String>,
requested_cwd: Option<PathBuf>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -910,11 +900,28 @@ impl ThreadManagerState {
parent_trace,
user_shell_override,
environment_id,
requested_cwd,
))
.await
}
async fn inherited_environment_manager_for_source(
&self,
session_source: &SessionSource,
) -> Arc<EnvironmentManager> {
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
}) = session_source
else {
return Arc::clone(&self.environment_manager);
};
let Ok(parent_thread) = self.get_thread(*parent_thread_id).await else {
return Arc::clone(&self.environment_manager);
};
Arc::new(EnvironmentManager::from_environment(
parent_thread.selected_environment().as_deref(),
))
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn spawn_thread_with_source(
&self,
@@ -931,13 +938,19 @@ impl ThreadManagerState {
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
environment_id: Option<String>,
requested_cwd: Option<PathBuf>,
) -> CodexResult<NewThread> {
let environment = self
.environment_manager
let environment_manager = if environment_id.is_some() {
Arc::clone(&self.environment_manager)
} else {
self.inherited_environment_manager_for_source(&session_source)
.await
};
let environment = environment_manager
.environment(environment_id.as_deref())
.await
.map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?;
let selected_environment_manager =
Arc::new(EnvironmentManager::from_environment(environment.as_deref()));
let watch_registration = match environment.as_ref() {
Some(environment) if !environment.is_remote() => {
self.skills_watcher
@@ -957,9 +970,7 @@ impl ThreadManagerState {
config,
auth_manager,
models_manager: Arc::clone(&self.models_manager),
environment_manager: Arc::clone(&self.environment_manager),
environment_id,
requested_cwd,
environment_manager: selected_environment_manager,
skills_manager: Arc::clone(&self.skills_manager),
plugins_manager: Arc::clone(&self.plugins_manager),
mcp_manager: Arc::clone(&self.mcp_manager),
@@ -978,7 +989,6 @@ impl ThreadManagerState {
})
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)
.await
}
async fn finalize_thread_spawn(

View File

@@ -10,6 +10,8 @@ use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use core_test_support::PathBufExt;
@@ -273,6 +275,68 @@ async fn shutdown_all_threads_bounded_submits_shutdown_to_every_thread() {
assert!(manager.list_thread_ids().await.is_empty());
}
#[tokio::test]
async fn spawned_subagent_thread_inherits_parent_environment_selection() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.cwd = config.codex_home.abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::new(Some(
"ws://127.0.0.1:8765".to_string(),
))),
);
let parent_thread = manager
.start_thread_with_tools_and_service_name(
config.clone(),
InitialHistory::New,
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
Some("local".to_string()),
)
.await
.expect("start parent thread");
let child_thread = manager
.state
.spawn_new_thread_with_source(
config,
manager.agent_control(),
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: parent_thread.thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
)
.await
.expect("start child thread");
let parent_environment = parent_thread
.thread
.selected_environment()
.expect("parent environment");
let child_environment = child_thread
.thread
.selected_environment()
.expect("child environment");
assert!(!parent_environment.is_remote());
assert!(!child_environment.is_remote());
}
#[tokio::test]
async fn new_uses_configured_openai_provider_for_model_refresh() {
let server = MockServer::start().await;

View File

@@ -33,6 +33,19 @@ struct EnvironmentConfig {
default_cwd: Option<PathBuf>,
}
/// Resolves the current or explicitly selected execution environment for a
/// session.
pub trait EnvironmentResolver: Send + Sync + std::fmt::Debug {
async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError>;
async fn environment(
&self,
environment_id: Option<&str>,
) -> Result<Option<Arc<Environment>>, ExecServerError>;
fn default_cwd(&self, environment_id: Option<&str>) -> Result<Option<&Path>, ExecServerError>;
}
/// Lazily creates and caches the active environment for a session.
///
/// The manager keeps the session's environment selection stable so subagents
@@ -172,8 +185,47 @@ impl EnvironmentManager {
self.exec_server_url().is_some()
}
fn environment_config(
&self,
environment_id: Option<&str>,
) -> Result<Option<&EnvironmentConfig>, ExecServerError> {
match parse_environment_id(environment_id)? {
None => Ok(self.current_environment_config.as_ref()),
Some(ExplicitEnvironmentId::Local) => {
if self.disabled {
return Err(ExecServerError::Protocol(
"environments are disabled for this session".to_string(),
));
}
self.local_environment_config
.as_ref()
.ok_or_else(|| {
ExecServerError::Protocol("local environment is not configured".to_string())
})
.map(Some)
}
Some(ExplicitEnvironmentId::Remote) => {
if self.disabled {
return Err(ExecServerError::Protocol(
"environments are disabled for this session".to_string(),
));
}
self.remote_environment_config
.as_ref()
.ok_or_else(|| {
ExecServerError::Protocol(
"remote environment is not configured".to_string(),
)
})
.map(Some)
}
}
}
}
impl EnvironmentResolver for EnvironmentManager {
/// Returns the cached environment, creating it on first access.
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
self.current_environment
.get_or_try_init(|| async {
if self.disabled {
@@ -196,7 +248,7 @@ impl EnvironmentManager {
.map(std::option::Option::<&Arc<Environment>>::cloned)
}
pub async fn environment(
async fn environment(
&self,
environment_id: Option<&str>,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
@@ -207,6 +259,32 @@ impl EnvironmentManager {
}
}
fn default_cwd(&self, environment_id: Option<&str>) -> Result<Option<&Path>, ExecServerError> {
Ok(self
.environment_config(environment_id)?
.and_then(|environment_config| environment_config.default_cwd.as_deref()))
}
}
impl EnvironmentManager {
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
<Self as EnvironmentResolver>::current(self).await
}
pub async fn environment(
&self,
environment_id: Option<&str>,
) -> Result<Option<Arc<Environment>>, ExecServerError> {
<Self as EnvironmentResolver>::environment(self, environment_id).await
}
pub fn default_cwd(
&self,
environment_id: Option<&str>,
) -> Result<Option<&Path>, ExecServerError> {
<Self as EnvironmentResolver>::default_cwd(self, environment_id)
}
async fn local_environment(&self) -> Result<Arc<Environment>, ExecServerError> {
if self.disabled {
return Err(ExecServerError::Protocol(
@@ -497,6 +575,7 @@ mod tests {
.expect("runtime paths");
let manager = EnvironmentManager::new_with_runtime_paths(
/*exec_server_url*/ None,
/*exec_server_cwd*/ None,
Some(runtime_paths.clone()),
);
@@ -538,6 +617,25 @@ mod tests {
);
}
#[test]
fn environment_manager_reports_default_cwd_for_selected_environment() {
let manager = EnvironmentManager::new_with_runtime_paths(
Some("ws://127.0.0.1:8765".to_string()),
Some(PathBuf::from("/tmp/devbox")),
/*local_runtime_paths*/ None,
);
assert_eq!(
manager.default_cwd(/*environment_id*/ None),
Ok(Some(Path::new("/tmp/devbox")))
);
assert_eq!(
manager.default_cwd(Some("remote")),
Ok(Some(Path::new("/tmp/devbox")))
);
assert_eq!(manager.default_cwd(Some("local")), Ok(None));
}
#[tokio::test]
async fn disabled_environment_manager_has_no_current_environment() {
let manager = EnvironmentManager::new(Some("none".to_string()));