Add sticky environment API and thread state (#18897)

## Summary
- add sticky environment selections to app-server v2 thread/start and
turn/start request flow
- carry thread-level selections through core session/thread state
- add app-server coverage for sticky selections and turn overrides

## Stack
1. This PR: API and thread persistence
2. #18898: config.toml named environment loading
3. #18899: downstream tool/runtime consumers

## Validation
- Not run locally; split only.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-04-23 18:57:13 -07:00
committed by GitHub
parent e3c8720a99
commit 49fb25997f
26 changed files with 988 additions and 165 deletions

View File

@@ -324,6 +324,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
ephemeral: None,
session_start_source: None,
dynamic_tools: None,
environments: None,
mock_experimental_field: None,
experimental_raw_events: false,
persist_extended_history: false,

View File

@@ -19,6 +19,7 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::TurnEnvironmentParams;
use codex_config::types::AuthCredentialsStoreMode;
use codex_core::config::set_project_trust_level;
use codex_core::config_loader::project_trust_key;
@@ -48,6 +49,7 @@ use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
@@ -166,6 +168,39 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_start_rejects_unknown_environment_as_invalid_request() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_without_approval_policy(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_thread_start_request(ThreadStartParams {
environments: Some(vec![TurnEnvironmentParams {
environment_id: "missing".to_string(),
cwd: codex_home.path().to_path_buf().try_into()?,
}]),
..Default::default()
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.id, RequestId::Integer(request_id));
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert_eq!(error.error.message, "unknown turn environment id `missing`");
Ok(())
}
#[tokio::test]
async fn thread_start_response_includes_loaded_instruction_sources() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -5,6 +5,7 @@ use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_exec_command_sse_response;
use app_test_support::create_fake_rollout;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
@@ -47,6 +48,7 @@ use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnEnvironmentParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
@@ -820,6 +822,69 @@ async fn turn_start_rejects_invalid_permission_profile_before_starting_turn() ->
Ok(())
}
#[tokio::test]
async fn turn_start_rejects_unknown_environment_before_starting_turn() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::default(),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
environments: Some(vec![TurnEnvironmentParams {
environment_id: "missing".to_string(),
cwd: codex_home.path().to_path_buf().try_into()?,
}]),
..Default::default()
})
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(turn_req)),
)
.await??;
assert_eq!(err.id, RequestId::Integer(turn_req));
assert_eq!(err.error.code, INVALID_REQUEST_ERROR_CODE);
assert_eq!(err.error.message, "unknown turn environment id `missing`");
let turn_started = tokio::time::timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("turn/started"),
)
.await;
assert!(
turn_started.is_err(),
"did not expect a turn/started notification after rejected environments"
);
Ok(())
}
#[tokio::test]
async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> {
// Provide a mock server and config so model wiring is valid.
@@ -1926,6 +1991,179 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_resolves_sticky_thread_environments_and_turn_overrides() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let server = create_mock_responses_server_repeating_assistant("done").await;
create_config_toml(&codex_home, &server.uri(), "never", &BTreeMap::default())?;
let mut mcp = McpProcess::new_with_env(
&codex_home,
&[("CODEX_EXEC_SERVER_URL", Some("http://127.0.0.1:1"))],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
for case in [
EnvironmentSelectionCase {
name: "sticky_unset_turn_unset",
sticky: None,
turn: None,
},
EnvironmentSelectionCase {
name: "sticky_empty_turn_unset",
sticky: Some(&[]),
turn: None,
},
EnvironmentSelectionCase {
name: "sticky_local_turn_unset",
sticky: Some(&["local"]),
turn: None,
},
EnvironmentSelectionCase {
name: "sticky_remote_turn_unset",
sticky: Some(&["remote"]),
turn: None,
},
EnvironmentSelectionCase {
name: "sticky_local_remote_turn_unset",
sticky: Some(&["local", "remote"]),
turn: None,
},
EnvironmentSelectionCase {
name: "sticky_local_turn_empty",
sticky: Some(&["local"]),
turn: Some(&[]),
},
EnvironmentSelectionCase {
name: "sticky_empty_turn_local",
sticky: Some(&[]),
turn: Some(&["local"]),
},
EnvironmentSelectionCase {
name: "sticky_local_turn_remote",
sticky: Some(&["local"]),
turn: Some(&["remote"]),
},
EnvironmentSelectionCase {
name: "sticky_remote_turn_local",
sticky: Some(&["remote"]),
turn: Some(&["local"]),
},
EnvironmentSelectionCase {
name: "sticky_unset_turn_local_remote",
sticky: None,
turn: Some(&["local", "remote"]),
},
] {
run_environment_selection_case(&mut mcp, &workspace, case).await?;
}
Ok(())
}
/// One scenario for the sticky-environment test: which environment ids are sent with
/// `thread/start` and which are sent with the subsequent `turn/start`.
///
/// For both selections `None` means the `environments` field is left unset, while
/// `Some(&[])` sends an explicit empty list (see `environment_params`).
struct EnvironmentSelectionCase {
/// Label embedded in the turn's input text and attached to assertion failures.
name: &'static str,
/// Environment ids passed as the thread-level selection on `thread/start`.
sticky: Option<&'static [&'static str]>,
/// Environment ids passed as the per-turn selection on `turn/start`.
turn: Option<&'static [&'static str]>,
}
async fn run_environment_selection_case(
mcp: &mut McpProcess,
workspace: &Path,
case: EnvironmentSelectionCase,
) -> Result<()> {
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
cwd: Some(workspace.to_string_lossy().into_owned()),
environments: environment_params(case.sticky, workspace)?,
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: format!("run {}", case.name),
text_elements: Vec::new(),
}],
environments: environment_params(case.turn, workspace)?,
cwd: Some(workspace.to_path_buf()),
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let started_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification = serde_json::from_value(
started_notification
.params
.ok_or_else(|| anyhow::anyhow!("turn/started notification should include params"))?,
)?;
assert_eq!(started.turn.id, turn.id, "{}", case.name);
let completed_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification =
serde_json::from_value(completed_notification.params.ok_or_else(|| {
anyhow::anyhow!("turn/completed notification should include params")
})?)?;
assert_eq!(completed.turn.id, turn.id, "{}", case.name);
assert_eq!(
completed.turn.status,
TurnStatus::Completed,
"{}",
case.name
);
mcp.clear_message_buffer();
Ok(())
}
/// Builds the optional `environments` payload for a request.
///
/// * `ids` - `None` yields `Ok(None)`; `Some(ids)` yields one `TurnEnvironmentParams`
///   per id, in order (an empty slice yields `Some` of an empty vector).
/// * `cwd` - working directory assigned to every generated entry.
///
/// Returns an error as soon as converting `cwd` into the field's path type fails.
fn environment_params(
    ids: Option<&[&str]>,
    cwd: &Path,
) -> Result<Option<Vec<TurnEnvironmentParams>>> {
    let Some(ids) = ids else {
        return Ok(None);
    };

    let mut selections = Vec::with_capacity(ids.len());
    for &id in ids {
        selections.push(TurnEnvironmentParams {
            environment_id: id.to_string(),
            cwd: cwd.to_path_buf().try_into()?,
        });
    }
    Ok(Some(selections))
}
#[tokio::test]
async fn turn_start_file_change_approval_v2() -> Result<()> {
skip_if_no_network!(Ok(()));