mirror of
https://github.com/openai/codex.git
synced 2026-05-07 21:06:39 +00:00
Compare commits
5 Commits
pr20430
...
starr/env-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4299e19dc8 | ||
|
|
279091af9c | ||
|
|
fc072478b8 | ||
|
|
331d54d10b | ||
|
|
20a58943c7 |
@@ -41,6 +41,7 @@ use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::trace::SpanData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
@@ -503,154 +504,187 @@ where
|
||||
spans.into_iter().skip(baseline_len).collect()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_002, /*trace*/ None)
|
||||
.await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
fn run_current_thread_test_with_large_stack<F, Fut>(test: F) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + 'static,
|
||||
Fut: Future<Output = Result<()>> + 'static,
|
||||
{
|
||||
let join_handle = std::thread::Builder::new()
|
||||
.stack_size(8 * 1024 * 1024)
|
||||
.spawn(move || {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("current-thread runtime should build")
|
||||
.block_on(test())
|
||||
})
|
||||
})
|
||||
.await;
|
||||
let untraced_server_span = find_rpc_span_with_trace(
|
||||
&untraced_spans,
|
||||
SpanKind::Server,
|
||||
"thread/start",
|
||||
untraced_spans
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|span| {
|
||||
.expect("test thread should spawn");
|
||||
|
||||
join_handle
|
||||
.join()
|
||||
.unwrap_or_else(|panic_payload| std::panic::resume_unwind(panic_payload))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
run_current_thread_test_with_large_stack(|| async {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_002, /*trace*/ None)
|
||||
.await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.system") == Some("jsonrpc")
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing latest thread/start server span; exported spans:\n{}",
|
||||
format_spans(&untraced_spans)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
let untraced_server_span = find_rpc_span_with_trace(
|
||||
&untraced_spans,
|
||||
SpanKind::Server,
|
||||
"thread/start",
|
||||
untraced_spans
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.system") == Some("jsonrpc")
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing latest thread/start server span; exported spans:\n{}",
|
||||
format_spans(&untraced_spans)
|
||||
)
|
||||
})
|
||||
.span_context
|
||||
.trace_id(),
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&untraced_spans,
|
||||
untraced_server_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
|
||||
let baseline_len = untraced_spans.len();
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_003, Some(remote_trace))
|
||||
.await;
|
||||
let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "app_server.thread_start.notify_started"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
.span_context
|
||||
.trace_id(),
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&untraced_spans,
|
||||
untraced_server_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
|
||||
let baseline_len = untraced_spans.len();
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_003, Some(remote_trace))
|
||||
.await;
|
||||
let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "app_server.thread_start.notify_started"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&spans,
|
||||
server_request_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&spans,
|
||||
server_request_span,
|
||||
/*min_depth*/ 2,
|
||||
);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, /*min_depth*/ 1);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, /*min_depth*/ 2);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
|
||||
let thread_id = thread_start_response.thread.id.clone();
|
||||
#[test]
|
||||
fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
run_current_thread_test_with_large_stack(|| async {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
|
||||
let thread_id = thread_start_response.thread.id.clone();
|
||||
|
||||
harness.reset_tracing();
|
||||
harness.reset_tracing();
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
} = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088");
|
||||
let turn_start_response: TurnStartResponse = harness
|
||||
.request(
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: TurnStartParams {
|
||||
thread_id,
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
approvals_reviewer: None,
|
||||
model: None,
|
||||
service_tier: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
} = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088");
|
||||
let turn_start_response: TurnStartResponse = harness
|
||||
.request(
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: TurnStartParams {
|
||||
thread_id,
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
approvals_reviewer: None,
|
||||
model: None,
|
||||
service_tier: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
},
|
||||
},
|
||||
},
|
||||
Some(remote_trace),
|
||||
)
|
||||
.await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
Some(remote_trace),
|
||||
)
|
||||
.await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_eq!(
|
||||
span_attr(server_request_span, "turn.id"),
|
||||
Some(turn_start_response.turn.id.as_str())
|
||||
);
|
||||
assert_span_descends_from(&spans, core_turn_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_eq!(
|
||||
span_attr(server_request_span, "turn.id"),
|
||||
Some(turn_start_response.turn.id.as_str())
|
||||
);
|
||||
assert_span_descends_from(&spans, core_turn_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_rollout::read_session_meta_line;
|
||||
use codex_rollout::state_db;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_stream_parser::strip_proposed_plan_blocks;
|
||||
@@ -153,8 +154,34 @@ async fn maybe_mark_thread_memory_mode_polluted_from_web_search(
|
||||
{
|
||||
return;
|
||||
}
|
||||
let Some(state_db_ctx) = sess.services.state_db.as_deref() else {
|
||||
return;
|
||||
};
|
||||
if state_db_ctx
|
||||
.get_thread_memory_mode(sess.conversation_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.is_none()
|
||||
&& let Some(rollout_path) = sess.current_rollout_path().await
|
||||
&& let Ok(session_meta_line) = read_session_meta_line(rollout_path.as_path()).await
|
||||
{
|
||||
state_db::apply_rollout_items(
|
||||
Some(state_db_ctx),
|
||||
rollout_path.as_path(),
|
||||
turn_context.config.model_provider_id.as_str(),
|
||||
/*builder*/ None,
|
||||
&[codex_protocol::protocol::RolloutItem::SessionMeta(
|
||||
session_meta_line,
|
||||
)],
|
||||
"record_completed_response_item.ensure_web_search_thread",
|
||||
/*new_thread_memory_mode*/ None,
|
||||
/*updated_at_override*/ None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
state_db::mark_thread_memory_mode_polluted(
|
||||
sess.services.state_db.as_deref(),
|
||||
Some(state_db_ctx),
|
||||
sess.conversation_id,
|
||||
"record_completed_response_item",
|
||||
)
|
||||
|
||||
@@ -10,9 +10,11 @@ use std::time::Duration;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FileSystemPermissions;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ImageDetail;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -48,6 +50,7 @@ use codex_sandboxing::SandboxablePreference;
|
||||
use codex_tools::ResponsesApiNamespaceTool;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::ToolSpec;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
|
||||
@@ -1013,11 +1016,17 @@ impl JsReplManager {
|
||||
.write_kernel_script()
|
||||
.await
|
||||
.map_err(|err| err.to_string())?;
|
||||
let js_tmp_dir = AbsolutePathBuf::from_absolute_path(self.tmp_dir.path().to_path_buf())
|
||||
.map_err(|err| format!("failed to resolve js_repl temp dir: {err}"))?;
|
||||
|
||||
let mut env = create_env(&turn.shell_environment_policy, thread_id);
|
||||
if !dependency_env.is_empty() {
|
||||
env.extend(dependency_env.clone());
|
||||
}
|
||||
env.insert(
|
||||
"TMPDIR".to_string(),
|
||||
self.tmp_dir.path().to_string_lossy().to_string(),
|
||||
);
|
||||
env.insert(
|
||||
"CODEX_JS_TMP_DIR".to_string(),
|
||||
self.tmp_dir.path().to_string_lossy().to_string(),
|
||||
@@ -1054,7 +1063,13 @@ impl JsReplManager {
|
||||
],
|
||||
cwd: turn.cwd.clone(),
|
||||
env,
|
||||
additional_permissions: None,
|
||||
additional_permissions: Some(PermissionProfile {
|
||||
network: None,
|
||||
file_system: Some(FileSystemPermissions {
|
||||
read: None,
|
||||
write: Some(vec![js_tmp_dir]),
|
||||
}),
|
||||
}),
|
||||
};
|
||||
let options = ExecOptions {
|
||||
expiration: ExecExpiration::DefaultTimeout,
|
||||
@@ -1970,6 +1985,7 @@ pub(crate) async fn resolve_compatible_node(config_path: Option<&Path>) -> Resul
|
||||
let node_path = resolve_node(config_path).ok_or_else(|| {
|
||||
"Node runtime not found; install Node or set CODEX_JS_REPL_NODE_PATH".to_string()
|
||||
})?;
|
||||
let node_path = materialize_dotslash_path(node_path).await?;
|
||||
ensure_node_version(&node_path).await?;
|
||||
Ok(node_path)
|
||||
}
|
||||
@@ -1995,6 +2011,62 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn materialize_dotslash_path(path: PathBuf) -> Result<PathBuf, String> {
|
||||
if !is_dotslash_manifest(&path).await? {
|
||||
return Ok(path);
|
||||
}
|
||||
|
||||
let output = tokio::process::Command::new("dotslash")
|
||||
.arg("--")
|
||||
.arg("fetch")
|
||||
.arg(&path)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|err| format!("failed to run dotslash for {}: {err}", path.display()))?;
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
|
||||
return Err(format!(
|
||||
"dotslash fetch failed for {}: {stderr}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
let fetched_path = String::from_utf8(output.stdout)
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"dotslash fetch output was not utf8 for {}: {err}",
|
||||
path.display()
|
||||
)
|
||||
})?
|
||||
.trim()
|
||||
.to_string();
|
||||
if fetched_path.is_empty() {
|
||||
return Err(format!(
|
||||
"dotslash fetch output was empty for {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
let fetched_path = PathBuf::from(fetched_path);
|
||||
if !fetched_path.is_file() {
|
||||
return Err(format!(
|
||||
"dotslash returned non-file path for {}: {}",
|
||||
path.display(),
|
||||
fetched_path.display()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(fetched_path)
|
||||
}
|
||||
|
||||
async fn is_dotslash_manifest(path: &Path) -> Result<bool, String> {
|
||||
let bytes = tokio::fs::read(path)
|
||||
.await
|
||||
.map_err(|err| format!("failed to read Node runtime path {}: {err}", path.display()))?;
|
||||
let prefix = String::from_utf8_lossy(&bytes[..bytes.len().min(128)]);
|
||||
Ok(prefix.starts_with("#!/usr/bin/env dotslash"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "mod_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -57,8 +57,6 @@ use wiremock::matchers::method;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const STARTUP_CONTEXT_OPEN_TAG: &str = "<startup_context>";
|
||||
const STARTUP_CONTEXT_CLOSE_TAG: &str = "</startup_context>";
|
||||
const REALTIME_BACKEND_PROMPT: &str = include_str!("../../templates/realtime/backend_prompt.md");
|
||||
const USER_FIRST_NAME_PLACEHOLDER: &str = "{{ user_first_name }}";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
@@ -1019,6 +1017,16 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
assert_eq!(started.version, RealtimeConversationVersion::V1);
|
||||
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
@@ -1532,8 +1540,6 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!startup_context.contains("## User"));
|
||||
assert!(startup_context.contains("### "));
|
||||
@@ -1751,8 +1757,6 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.contains("## Machine / Workspace Map"));
|
||||
assert!(startup_context.contains("notes.txt"));
|
||||
@@ -1807,8 +1811,6 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.len() <= 20_500);
|
||||
|
||||
@@ -1887,7 +1889,6 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
assert_eq!(session_updated, "sess_user_text");
|
||||
|
||||
let user_text = "typed follow-up for realtime";
|
||||
let prefixed_user_text = format!("[USER] {user_text}");
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
@@ -1907,7 +1908,7 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
let realtime_text_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"normal user turn text mirrored to realtime",
|
||||
|request| websocket_request_text(request).as_deref() == Some(prefixed_user_text.as_str()),
|
||||
|request| websocket_request_text(request).as_deref() == Some(user_text),
|
||||
)
|
||||
.await;
|
||||
let model_user_texts = response_mock.single_request().message_input_texts("user");
|
||||
@@ -1916,7 +1917,7 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
model_user_texts.iter().any(|text| text == user_text),
|
||||
websocket_request_text(&realtime_text_request),
|
||||
),
|
||||
(true, Some(prefixed_user_text)),
|
||||
(true, Some(user_text.to_string())),
|
||||
);
|
||||
let realtime_response_create = timeout(Duration::from_millis(200), async {
|
||||
wait_for_matching_websocket_request(
|
||||
@@ -2698,7 +2699,7 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio(
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
Duration::from_secs(2),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
|
||||
@@ -70,15 +70,28 @@ async fn build_codex_with_test_tool(server: &wiremock::MockServer) -> anyhow::Re
|
||||
builder.build(server).await
|
||||
}
|
||||
|
||||
fn assert_parallel_duration(actual: Duration) {
|
||||
// Allow headroom for slow CI scheduling; barrier synchronization already enforces overlap.
|
||||
fn assert_parallel_duration_under(actual: Duration, limit: Duration) {
|
||||
assert!(
|
||||
actual < Duration::from_millis(1_600),
|
||||
"expected parallel execution to finish quickly, got {actual:?}"
|
||||
actual < limit,
|
||||
"expected parallel execution to finish under {limit:?}, got {actual:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
fn assert_shell_function_call_succeeded(
|
||||
req: &core_test_support::responses::ResponsesRequest,
|
||||
call_id: &str,
|
||||
) {
|
||||
let (content, success) = req
|
||||
.function_call_output_content_and_success(call_id)
|
||||
.unwrap_or_else(|| panic!("shell output missing for {call_id}"));
|
||||
assert_eq!(
|
||||
success,
|
||||
Some(true),
|
||||
"expected successful shell output for {call_id}, got content={content:?} success={success:?}",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -126,7 +139,7 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![warmup_first, warmup_second, first_response, second_response],
|
||||
)
|
||||
@@ -135,12 +148,23 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
run_turn(&test, "warm up parallel tool").await?;
|
||||
|
||||
let duration = run_turn_and_measure(&test, "exercise sync tool").await?;
|
||||
assert_parallel_duration(duration);
|
||||
assert_parallel_duration_under(duration, Duration::from_secs(6));
|
||||
let req = response_mock
|
||||
.last_request()
|
||||
.expect("parallel sync tool run should send a completion request");
|
||||
assert_eq!(
|
||||
req.function_call_output_text("call-1").as_deref(),
|
||||
Some("ok")
|
||||
);
|
||||
assert_eq!(
|
||||
req.function_call_output_text("call-2").as_deref(),
|
||||
Some("ok")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -148,15 +172,39 @@ async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
let mut builder = test_codex().with_model("gpt-5.1");
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let shell_args = json!({
|
||||
"command": "sleep 0.25",
|
||||
let warmup_shell_args = json!({
|
||||
"command": "sleep 0.01",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
});
|
||||
let shell_args = json!({
|
||||
"command": "sleep 5",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 15_000,
|
||||
});
|
||||
let shell_args_two = json!({
|
||||
"command": "sleep 5",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 15_000,
|
||||
});
|
||||
let warmup_args_one = serde_json::to_string(&warmup_shell_args)?;
|
||||
let warmup_args_two = serde_json::to_string(&warmup_shell_args)?;
|
||||
let args_one = serde_json::to_string(&shell_args)?;
|
||||
let args_two = serde_json::to_string(&shell_args)?;
|
||||
let args_two = serde_json::to_string(&shell_args_two)?;
|
||||
|
||||
let warmup_first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-warm-1"}}),
|
||||
ev_function_call("warm-call-1", "shell_command", &warmup_args_one),
|
||||
ev_function_call("warm-call-2", "shell_command", &warmup_args_two),
|
||||
ev_completed("resp-warm-1"),
|
||||
]);
|
||||
let warmup_second_response = sse(vec![
|
||||
ev_assistant_message("warm-msg-1", "warmup done"),
|
||||
ev_completed("resp-warm-2"),
|
||||
]);
|
||||
let first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-1"}}),
|
||||
ev_function_call("call-1", "shell_command", &args_one),
|
||||
@@ -167,25 +215,50 @@ async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(&server, vec![first_response, second_response]).await;
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
warmup_first_response,
|
||||
warmup_second_response,
|
||||
first_response,
|
||||
second_response,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
run_turn(&test, "warm up shell_command in parallel").await?;
|
||||
let duration = run_turn_and_measure(&test, "run shell_command twice").await?;
|
||||
assert_parallel_duration(duration);
|
||||
assert_parallel_duration_under(duration, Duration::from_millis(8_500));
|
||||
let req = response_mock
|
||||
.last_request()
|
||||
.expect("parallel shell run should send a completion request");
|
||||
assert_shell_function_call_succeeded(&req, "call-1");
|
||||
assert_shell_function_call_succeeded(&req, "call-2");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let test = build_codex_with_test_tool(&server).await?;
|
||||
|
||||
let warmup_sync_args = json!({
|
||||
"sleep_after_ms": 10
|
||||
})
|
||||
.to_string();
|
||||
let sync_args = json!({
|
||||
"sleep_after_ms": 300
|
||||
})
|
||||
.to_string();
|
||||
let warmup_shell_args = serde_json::to_string(&json!({
|
||||
"command": "sleep 0.01",
|
||||
// Avoid user-specific shell startup cost in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
}))?;
|
||||
let shell_args = serde_json::to_string(&json!({
|
||||
"command": "sleep 0.25",
|
||||
// Avoid user-specific shell startup cost in timing assertions.
|
||||
@@ -193,6 +266,16 @@ async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
"timeout_ms": 1_000,
|
||||
}))?;
|
||||
|
||||
let warmup_first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-warm-1"}}),
|
||||
ev_function_call("warm-call-1", "test_sync_tool", &warmup_sync_args),
|
||||
ev_function_call("warm-call-2", "shell_command", &warmup_shell_args),
|
||||
ev_completed("resp-warm-1"),
|
||||
]);
|
||||
let warmup_second_response = sse(vec![
|
||||
ev_assistant_message("warm-msg-1", "warmup done"),
|
||||
ev_completed("resp-warm-2"),
|
||||
]);
|
||||
let first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-1"}}),
|
||||
ev_function_call("call-1", "test_sync_tool", &sync_args),
|
||||
@@ -203,8 +286,18 @@ async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(&server, vec![first_response, second_response]).await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
warmup_first_response,
|
||||
warmup_second_response,
|
||||
first_response,
|
||||
second_response,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
run_turn(&test, "warm up mixed tools").await?;
|
||||
let duration = run_turn_and_measure(&test, "mix tools").await?;
|
||||
assert_parallel_duration(duration);
|
||||
|
||||
|
||||
@@ -974,11 +974,10 @@ async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<(
|
||||
"begin event should include process_id for a live session"
|
||||
);
|
||||
|
||||
// We expect three terminal interactions matching the three write_stdin calls.
|
||||
assert_eq!(
|
||||
terminal_events.len(),
|
||||
3,
|
||||
"expected three terminal interactions; got {terminal_events:?}"
|
||||
// Delayed polls may coalesce, but we should still observe repeated terminal interaction events.
|
||||
assert!(
|
||||
terminal_events.len() >= 2,
|
||||
"expected at least two terminal interactions; got {terminal_events:?}"
|
||||
);
|
||||
|
||||
for event in &terminal_events {
|
||||
@@ -990,8 +989,8 @@ async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<(
|
||||
.iter()
|
||||
.map(|ev| ev.stdin.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["x", "x", "x"],
|
||||
"terminal interactions should reflect the three stdin polls"
|
||||
vec!["x"; terminal_events.len()],
|
||||
"terminal interactions should reflect repeated stdin polls"
|
||||
);
|
||||
|
||||
assert!(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::OnceCell;
|
||||
@@ -15,16 +16,97 @@ use crate::remote_process::RemoteProcess;
|
||||
|
||||
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
|
||||
|
||||
pub type EnvironmentId = String;
|
||||
|
||||
const LOCAL_ENVIRONMENT_ID: &str = "local";
|
||||
const REMOTE_ENVIRONMENT_ID: &str = "remote";
|
||||
|
||||
/// Configuration for a named environment registration.
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct EnvironmentConfig {
|
||||
exec_server_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Produces the named environment registrations available to an
|
||||
/// `EnvironmentManager`.
|
||||
///
|
||||
/// Implementations own the policy for which environment IDs exist and which
|
||||
/// registered environment ID, if any, is used when callers request the
|
||||
/// compatibility default via `environment(None)`.
|
||||
pub trait EnvironmentProvider: Send + Sync + std::fmt::Debug {
|
||||
fn default_environment_id(&self) -> Option<EnvironmentId>;
|
||||
|
||||
fn environment_configs(&self) -> HashMap<EnvironmentId, EnvironmentConfig>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
struct ExecServerUrlEnvironmentProvider {
|
||||
exec_server_url: Option<String>,
|
||||
}
|
||||
|
||||
impl ExecServerUrlEnvironmentProvider {
|
||||
fn new(exec_server_url: Option<String>) -> Self {
|
||||
Self { exec_server_url }
|
||||
}
|
||||
|
||||
fn from_env() -> Self {
|
||||
Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok())
|
||||
}
|
||||
}
|
||||
|
||||
impl EnvironmentProvider for ExecServerUrlEnvironmentProvider {
|
||||
fn default_environment_id(&self) -> Option<EnvironmentId> {
|
||||
match normalize_exec_server_url(self.exec_server_url.clone()) {
|
||||
NormalizedExecServerUrl::Disabled => None,
|
||||
NormalizedExecServerUrl::LocalOnly => Some(LOCAL_ENVIRONMENT_ID.to_string()),
|
||||
NormalizedExecServerUrl::LocalAndRemote(_) => Some(REMOTE_ENVIRONMENT_ID.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn environment_configs(&self) -> HashMap<EnvironmentId, EnvironmentConfig> {
|
||||
match normalize_exec_server_url(self.exec_server_url.clone()) {
|
||||
NormalizedExecServerUrl::Disabled => HashMap::new(),
|
||||
NormalizedExecServerUrl::LocalOnly => HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)]),
|
||||
NormalizedExecServerUrl::LocalAndRemote(exec_server_url) => HashMap::from([
|
||||
(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: Some(exec_server_url),
|
||||
},
|
||||
),
|
||||
]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
enum NormalizedExecServerUrl {
|
||||
LocalOnly,
|
||||
Disabled,
|
||||
LocalAndRemote(String),
|
||||
}
|
||||
|
||||
/// Lazily creates and caches the active environment for a session.
|
||||
///
|
||||
/// The manager keeps the session's environment selection stable so subagents
|
||||
/// and follow-up turns preserve an explicit disabled state.
|
||||
#[derive(Debug)]
|
||||
pub struct EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
default_environment_id: Option<EnvironmentId>,
|
||||
environment_configs: HashMap<EnvironmentId, EnvironmentConfig>,
|
||||
environment_cache: HashMap<EnvironmentId, Arc<OnceCell<Option<Arc<Environment>>>>>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
disabled: bool,
|
||||
current_environment: OnceCell<Option<Arc<Environment>>>,
|
||||
}
|
||||
|
||||
impl Default for EnvironmentManager {
|
||||
@@ -45,12 +127,30 @@ impl EnvironmentManager {
|
||||
exec_server_url: Option<String>,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url);
|
||||
Self {
|
||||
exec_server_url,
|
||||
Self::new_with_provider(
|
||||
ExecServerUrlEnvironmentProvider::new(exec_server_url),
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds a manager from a provider that supplies named environment
|
||||
/// registrations plus the compatibility default selection.
|
||||
pub fn new_with_provider(
|
||||
provider: impl EnvironmentProvider,
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
let default_environment_id = provider.default_environment_id();
|
||||
let environment_configs = provider.environment_configs();
|
||||
let environment_cache = environment_configs
|
||||
.keys()
|
||||
.cloned()
|
||||
.map(|environment_id| (environment_id, Arc::new(OnceCell::new())))
|
||||
.collect();
|
||||
Self {
|
||||
default_environment_id,
|
||||
environment_configs,
|
||||
environment_cache,
|
||||
local_runtime_paths,
|
||||
disabled,
|
||||
current_environment: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,8 +164,8 @@ impl EnvironmentManager {
|
||||
pub fn from_env_with_runtime_paths(
|
||||
local_runtime_paths: Option<ExecServerRuntimePaths>,
|
||||
) -> Self {
|
||||
Self::new_with_runtime_paths(
|
||||
std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(),
|
||||
Self::new_with_provider(
|
||||
ExecServerUrlEnvironmentProvider::from_env(),
|
||||
local_runtime_paths,
|
||||
)
|
||||
}
|
||||
@@ -74,51 +174,110 @@ impl EnvironmentManager {
|
||||
/// disabled mode when no environment is available.
|
||||
pub fn from_environment(environment: Option<&Environment>) -> Self {
|
||||
match environment {
|
||||
Some(environment) => Self {
|
||||
exec_server_url: environment.exec_server_url().map(str::to_owned),
|
||||
local_runtime_paths: environment.local_runtime_paths().cloned(),
|
||||
disabled: false,
|
||||
current_environment: OnceCell::new(),
|
||||
},
|
||||
Some(environment) => {
|
||||
let mut environment_configs = HashMap::from([(
|
||||
LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: None,
|
||||
},
|
||||
)]);
|
||||
if let Some(exec_server_url) = environment.exec_server_url().map(str::to_owned) {
|
||||
environment_configs.insert(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
EnvironmentConfig {
|
||||
exec_server_url: Some(exec_server_url),
|
||||
},
|
||||
);
|
||||
}
|
||||
let environment_cache = environment_configs
|
||||
.keys()
|
||||
.cloned()
|
||||
.map(|environment_id| (environment_id, Arc::new(OnceCell::new())))
|
||||
.collect();
|
||||
Self {
|
||||
default_environment_id: Some(
|
||||
if environment.is_remote() {
|
||||
REMOTE_ENVIRONMENT_ID
|
||||
} else {
|
||||
LOCAL_ENVIRONMENT_ID
|
||||
}
|
||||
.to_string(),
|
||||
),
|
||||
environment_configs,
|
||||
environment_cache,
|
||||
local_runtime_paths: environment.local_runtime_paths().cloned(),
|
||||
}
|
||||
}
|
||||
None => Self {
|
||||
exec_server_url: None,
|
||||
default_environment_id: None,
|
||||
environment_configs: HashMap::new(),
|
||||
environment_cache: HashMap::new(),
|
||||
local_runtime_paths: None,
|
||||
disabled: true,
|
||||
current_environment: OnceCell::new(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remote exec-server URL when one is configured.
|
||||
/// Returns the default remote exec-server URL when one is configured.
|
||||
pub fn exec_server_url(&self) -> Option<&str> {
|
||||
self.exec_server_url.as_deref()
|
||||
self.environment_configs
|
||||
.get(REMOTE_ENVIRONMENT_ID)
|
||||
.and_then(|config| config.exec_server_url.as_deref())
|
||||
}
|
||||
|
||||
/// Returns true when this manager is configured to use a remote exec server.
|
||||
/// Returns true when the default environment is configured to use a remote exec server.
|
||||
pub fn is_remote(&self) -> bool {
|
||||
self.exec_server_url.is_some()
|
||||
self.exec_server_url().is_some()
|
||||
}
|
||||
|
||||
/// Returns the cached environment, creating it on first access.
|
||||
pub async fn current(&self) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
self.current_environment
|
||||
pub async fn environment(
|
||||
&self,
|
||||
environment_id: Option<&str>,
|
||||
) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
let Some(environment_id) = normalized_environment_id(environment_id)
|
||||
.or_else(|| self.default_environment_id.clone())
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.named_environment(&environment_id).await
|
||||
}
|
||||
|
||||
async fn named_environment(
|
||||
&self,
|
||||
environment_id: &str,
|
||||
) -> Result<Option<Arc<Environment>>, ExecServerError> {
|
||||
let Some(environment_config) = self.environment_configs.get(environment_id) else {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"unknown environment id: {environment_id}"
|
||||
)));
|
||||
};
|
||||
let cache = self.environment_cache.get(environment_id).ok_or_else(|| {
|
||||
ExecServerError::Protocol(format!("missing environment cache: {environment_id}"))
|
||||
})?;
|
||||
cache
|
||||
.get_or_try_init(|| async {
|
||||
if self.disabled {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(Arc::new(
|
||||
Environment::create_with_runtime_paths(
|
||||
self.exec_server_url.clone(),
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
)))
|
||||
}
|
||||
self.build_environment(environment_config.exec_server_url.clone())
|
||||
.await
|
||||
.map(Some)
|
||||
})
|
||||
.await
|
||||
.map(Option::as_ref)
|
||||
.map(std::option::Option::<&Arc<Environment>>::cloned)
|
||||
}
|
||||
|
||||
async fn build_environment(
|
||||
&self,
|
||||
exec_server_url: Option<String>,
|
||||
) -> Result<Arc<Environment>, ExecServerError> {
|
||||
Ok(Arc::new(
|
||||
Environment::create_with_runtime_paths(
|
||||
exec_server_url,
|
||||
self.local_runtime_paths.clone(),
|
||||
)
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Concrete execution/filesystem environment selected for a session.
|
||||
@@ -229,11 +388,18 @@ impl Environment {
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_exec_server_url(exec_server_url: Option<String>) -> (Option<String>, bool) {
|
||||
fn normalize_exec_server_url(exec_server_url: Option<String>) -> NormalizedExecServerUrl {
|
||||
match exec_server_url.as_deref().map(str::trim) {
|
||||
None | Some("") => (None, false),
|
||||
Some(url) if url.eq_ignore_ascii_case("none") => (None, true),
|
||||
Some(url) => (Some(url.to_string()), false),
|
||||
None | Some("") => NormalizedExecServerUrl::LocalOnly,
|
||||
Some(url) if url.eq_ignore_ascii_case("none") => NormalizedExecServerUrl::Disabled,
|
||||
Some(url) => NormalizedExecServerUrl::LocalAndRemote(url.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn normalized_environment_id(environment_id: Option<&str>) -> Option<EnvironmentId> {
|
||||
match environment_id.map(str::trim) {
|
||||
None | Some("") => None,
|
||||
Some(environment_id) => Some(environment_id.to_ascii_lowercase()),
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
@@ -260,7 +426,6 @@ mod tests {
|
||||
fn environment_manager_normalizes_empty_url() {
|
||||
let manager = EnvironmentManager::new(Some(String::new()));
|
||||
|
||||
assert!(!manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -269,7 +434,6 @@ mod tests {
|
||||
fn environment_manager_treats_none_value_as_disabled() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
assert!(manager.disabled);
|
||||
assert_eq!(manager.exec_server_url(), None);
|
||||
assert!(!manager.is_remote());
|
||||
}
|
||||
@@ -283,11 +447,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_current_caches_environment() {
|
||||
async fn environment_manager_default_environment_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager.current().await.expect("get current environment");
|
||||
let second = manager.current().await.expect("get current environment");
|
||||
let first = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment");
|
||||
let second = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment");
|
||||
|
||||
let first = first.expect("local environment");
|
||||
let second = second.expect("local environment");
|
||||
@@ -308,9 +478,9 @@ mod tests {
|
||||
);
|
||||
|
||||
let environment = manager
|
||||
.current()
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get current environment")
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert_eq!(environment.local_runtime_paths(), Some(&runtime_paths));
|
||||
@@ -321,18 +491,182 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disabled_environment_manager_has_no_current_environment() {
|
||||
async fn disabled_environment_manager_has_no_default_environment() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
assert!(
|
||||
manager
|
||||
.current()
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get current environment")
|
||||
.expect("get default environment")
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disabled_environment_manager_has_no_local_registration() {
|
||||
let manager = EnvironmentManager::new(Some("none".to_string()));
|
||||
|
||||
let error = manager
|
||||
.environment(Some("local"))
|
||||
.await
|
||||
.expect_err("disabled mode should not register local");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: unknown environment id: local"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_defaults_to_local_when_unset() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let environment = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert!(!environment.is_remote());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_environment_caches_environment() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager
|
||||
.environment(Some("local"))
|
||||
.await
|
||||
.expect("get local environment")
|
||||
.expect("local environment");
|
||||
let second = manager
|
||||
.environment(Some("local"))
|
||||
.await
|
||||
.expect("get local environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_environment_caches_environment() {
|
||||
let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string()));
|
||||
|
||||
let first = manager
|
||||
.environment(Some("remote"))
|
||||
.await
|
||||
.expect("get remote environment")
|
||||
.expect("remote environment");
|
||||
let second = manager
|
||||
.environment(Some("remote"))
|
||||
.await
|
||||
.expect("get remote environment")
|
||||
.expect("remote environment");
|
||||
|
||||
assert!(first.is_remote());
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_environment_requires_registration() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let error = manager
|
||||
.environment(Some("remote"))
|
||||
.await
|
||||
.expect_err("remote environment should require registration");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: unknown environment id: remote"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_rejects_unknown_environment_id() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let error = manager
|
||||
.environment(Some("missing"))
|
||||
.await
|
||||
.expect_err("unknown environment id should error");
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"exec-server protocol error: unknown environment id: missing"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn explicit_local_environment_matches_default_when_unset() {
|
||||
let manager = EnvironmentManager::new(/*exec_server_url*/ None);
|
||||
|
||||
let first = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("local environment");
|
||||
let second = manager
|
||||
.environment(Some("local"))
|
||||
.await
|
||||
.expect("get local environment")
|
||||
.expect("local environment");
|
||||
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn configured_remote_environment_matches_default() {
|
||||
let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string()));
|
||||
|
||||
let first = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("remote environment");
|
||||
let second = manager
|
||||
.environment(Some("remote"))
|
||||
.await
|
||||
.expect("get remote environment")
|
||||
.expect("remote environment");
|
||||
|
||||
assert!(first.is_remote());
|
||||
assert!(Arc::ptr_eq(&first, &second));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn environment_manager_from_none_environment_preserves_disabled_default() {
|
||||
let manager = EnvironmentManager::from_environment(None);
|
||||
|
||||
assert!(
|
||||
manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn from_remote_environment_preserves_remote_default() {
|
||||
let source_manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string()));
|
||||
let environment = source_manager
|
||||
.environment(Some("remote"))
|
||||
.await
|
||||
.expect("get remote environment")
|
||||
.expect("remote environment");
|
||||
let manager = EnvironmentManager::from_environment(Some(&environment));
|
||||
|
||||
let default_environment = manager
|
||||
.environment(None)
|
||||
.await
|
||||
.expect("get default environment")
|
||||
.expect("default environment");
|
||||
|
||||
assert!(default_environment.is_remote());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_environment_has_ready_local_executor() {
|
||||
let environment = Environment::default();
|
||||
|
||||
Reference in New Issue
Block a user