mirror of
https://github.com/openai/codex.git
synced 2026-05-16 01:02:48 +00:00
Compare commits
3 Commits
canvnro/fi
...
starr/env-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc072478b8 | ||
|
|
331d54d10b | ||
|
|
20a58943c7 |
@@ -41,6 +41,7 @@ use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::trace::SpanData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
@@ -503,154 +504,187 @@ where
|
||||
spans.into_iter().skip(baseline_len).collect()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_002, /*trace*/ None)
|
||||
.await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
fn run_current_thread_test_with_large_stack<F, Fut>(test: F) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + 'static,
|
||||
Fut: Future<Output = Result<()>> + 'static,
|
||||
{
|
||||
let join_handle = std::thread::Builder::new()
|
||||
.stack_size(8 * 1024 * 1024)
|
||||
.spawn(move || {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("current-thread runtime should build")
|
||||
.block_on(test())
|
||||
})
|
||||
})
|
||||
.await;
|
||||
let untraced_server_span = find_rpc_span_with_trace(
|
||||
&untraced_spans,
|
||||
SpanKind::Server,
|
||||
"thread/start",
|
||||
untraced_spans
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|span| {
|
||||
.expect("test thread should spawn");
|
||||
|
||||
join_handle
|
||||
.join()
|
||||
.unwrap_or_else(|panic_payload| std::panic::resume_unwind(panic_payload))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
run_current_thread_test_with_large_stack(|| async {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
..
|
||||
} = RemoteTrace::new("00000000000000000000000000000011", "0000000000000022");
|
||||
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_002, /*trace*/ None)
|
||||
.await;
|
||||
let untraced_spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.system") == Some("jsonrpc")
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing latest thread/start server span; exported spans:\n{}",
|
||||
format_spans(&untraced_spans)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
let untraced_server_span = find_rpc_span_with_trace(
|
||||
&untraced_spans,
|
||||
SpanKind::Server,
|
||||
"thread/start",
|
||||
untraced_spans
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.system") == Some("jsonrpc")
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"missing latest thread/start server span; exported spans:\n{}",
|
||||
format_spans(&untraced_spans)
|
||||
)
|
||||
})
|
||||
.span_context
|
||||
.trace_id(),
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&untraced_spans,
|
||||
untraced_server_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
|
||||
let baseline_len = untraced_spans.len();
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_003, Some(remote_trace))
|
||||
.await;
|
||||
let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "app_server.thread_start.notify_started"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
.span_context
|
||||
.trace_id(),
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&untraced_spans,
|
||||
untraced_server_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
|
||||
let baseline_len = untraced_spans.len();
|
||||
let _: ThreadStartResponse = harness
|
||||
.start_thread(/*request_id*/ 20_003, Some(remote_trace))
|
||||
.await;
|
||||
let spans = wait_for_new_exported_spans(harness.tracing, baseline_len, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("thread/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span.name.as_ref() == "app_server.thread_start.notify_started"
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&spans,
|
||||
server_request_span,
|
||||
/*min_depth*/ 1,
|
||||
);
|
||||
assert_has_internal_descendant_at_min_depth(
|
||||
&spans,
|
||||
server_request_span,
|
||||
/*min_depth*/ 2,
|
||||
);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "thread/start", remote_trace_id);
|
||||
assert_eq!(server_request_span.name.as_ref(), "thread/start");
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_ne!(server_request_span.span_context.span_id(), SpanId::INVALID);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, /*min_depth*/ 1);
|
||||
assert_has_internal_descendant_at_min_depth(&spans, server_request_span, /*min_depth*/ 2);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
|
||||
let thread_id = thread_start_response.thread.id.clone();
|
||||
#[test]
|
||||
fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
run_current_thread_test_with_large_stack(|| async {
|
||||
let _guard = tracing_test_guard().lock().await;
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
|
||||
let thread_id = thread_start_response.thread.id.clone();
|
||||
|
||||
harness.reset_tracing();
|
||||
harness.reset_tracing();
|
||||
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
} = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088");
|
||||
let turn_start_response: TurnStartResponse = harness
|
||||
.request(
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: TurnStartParams {
|
||||
thread_id,
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
approvals_reviewer: None,
|
||||
model: None,
|
||||
service_tier: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
let RemoteTrace {
|
||||
trace_id: remote_trace_id,
|
||||
parent_span_id: remote_parent_span_id,
|
||||
context: remote_trace,
|
||||
} = RemoteTrace::new("00000000000000000000000000000077", "0000000000000088");
|
||||
let turn_start_response: TurnStartResponse = harness
|
||||
.request(
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: TurnStartParams {
|
||||
thread_id,
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
approvals_reviewer: None,
|
||||
model: None,
|
||||
service_tier: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
},
|
||||
},
|
||||
},
|
||||
Some(remote_trace),
|
||||
)
|
||||
.await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
Some(remote_trace),
|
||||
)
|
||||
.await;
|
||||
let spans = wait_for_exported_spans(harness.tracing, |spans| {
|
||||
spans.iter().any(|span| {
|
||||
span.span_kind == SpanKind::Server
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_eq!(
|
||||
span_attr(server_request_span, "turn.id"),
|
||||
Some(turn_start_response.turn.id.as_str())
|
||||
);
|
||||
assert_span_descends_from(&spans, core_turn_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
assert_eq!(server_request_span.span_context.trace_id(), remote_trace_id);
|
||||
assert_eq!(
|
||||
span_attr(server_request_span, "turn.id"),
|
||||
Some(turn_start_response.turn.id.as_str())
|
||||
);
|
||||
assert_span_descends_from(&spans, core_turn_span, server_request_span);
|
||||
harness.shutdown().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_rollout::read_session_meta_line;
|
||||
use codex_rollout::state_db;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_stream_parser::strip_proposed_plan_blocks;
|
||||
@@ -153,8 +154,34 @@ async fn maybe_mark_thread_memory_mode_polluted_from_web_search(
|
||||
{
|
||||
return;
|
||||
}
|
||||
let Some(state_db_ctx) = sess.services.state_db.as_deref() else {
|
||||
return;
|
||||
};
|
||||
if state_db_ctx
|
||||
.get_thread_memory_mode(sess.conversation_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.is_none()
|
||||
&& let Some(rollout_path) = sess.current_rollout_path().await
|
||||
&& let Ok(session_meta_line) = read_session_meta_line(rollout_path.as_path()).await
|
||||
{
|
||||
state_db::apply_rollout_items(
|
||||
Some(state_db_ctx),
|
||||
rollout_path.as_path(),
|
||||
turn_context.config.model_provider_id.as_str(),
|
||||
/*builder*/ None,
|
||||
&[codex_protocol::protocol::RolloutItem::SessionMeta(
|
||||
session_meta_line,
|
||||
)],
|
||||
"record_completed_response_item.ensure_web_search_thread",
|
||||
/*new_thread_memory_mode*/ None,
|
||||
/*updated_at_override*/ None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
state_db::mark_thread_memory_mode_polluted(
|
||||
sess.services.state_db.as_deref(),
|
||||
Some(state_db_ctx),
|
||||
sess.conversation_id,
|
||||
"record_completed_response_item",
|
||||
)
|
||||
|
||||
@@ -10,9 +10,11 @@ use std::time::Duration;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FileSystemPermissions;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ImageDetail;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -48,6 +50,7 @@ use codex_sandboxing::SandboxablePreference;
|
||||
use codex_tools::ResponsesApiNamespaceTool;
|
||||
use codex_tools::ToolName;
|
||||
use codex_tools::ToolSpec;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
|
||||
@@ -1013,11 +1016,17 @@ impl JsReplManager {
|
||||
.write_kernel_script()
|
||||
.await
|
||||
.map_err(|err| err.to_string())?;
|
||||
let js_tmp_dir = AbsolutePathBuf::from_absolute_path(self.tmp_dir.path().to_path_buf())
|
||||
.map_err(|err| format!("failed to resolve js_repl temp dir: {err}"))?;
|
||||
|
||||
let mut env = create_env(&turn.shell_environment_policy, thread_id);
|
||||
if !dependency_env.is_empty() {
|
||||
env.extend(dependency_env.clone());
|
||||
}
|
||||
env.insert(
|
||||
"TMPDIR".to_string(),
|
||||
self.tmp_dir.path().to_string_lossy().to_string(),
|
||||
);
|
||||
env.insert(
|
||||
"CODEX_JS_TMP_DIR".to_string(),
|
||||
self.tmp_dir.path().to_string_lossy().to_string(),
|
||||
@@ -1054,7 +1063,13 @@ impl JsReplManager {
|
||||
],
|
||||
cwd: turn.cwd.clone(),
|
||||
env,
|
||||
additional_permissions: None,
|
||||
additional_permissions: Some(PermissionProfile {
|
||||
network: None,
|
||||
file_system: Some(FileSystemPermissions {
|
||||
read: None,
|
||||
write: Some(vec![js_tmp_dir]),
|
||||
}),
|
||||
}),
|
||||
};
|
||||
let options = ExecOptions {
|
||||
expiration: ExecExpiration::DefaultTimeout,
|
||||
@@ -1970,6 +1985,7 @@ pub(crate) async fn resolve_compatible_node(config_path: Option<&Path>) -> Resul
|
||||
let node_path = resolve_node(config_path).ok_or_else(|| {
|
||||
"Node runtime not found; install Node or set CODEX_JS_REPL_NODE_PATH".to_string()
|
||||
})?;
|
||||
let node_path = materialize_dotslash_path(node_path).await?;
|
||||
ensure_node_version(&node_path).await?;
|
||||
Ok(node_path)
|
||||
}
|
||||
@@ -1995,6 +2011,62 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn materialize_dotslash_path(path: PathBuf) -> Result<PathBuf, String> {
|
||||
if !is_dotslash_manifest(&path).await? {
|
||||
return Ok(path);
|
||||
}
|
||||
|
||||
let output = tokio::process::Command::new("dotslash")
|
||||
.arg("--")
|
||||
.arg("fetch")
|
||||
.arg(&path)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|err| format!("failed to run dotslash for {}: {err}", path.display()))?;
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
|
||||
return Err(format!(
|
||||
"dotslash fetch failed for {}: {stderr}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
let fetched_path = String::from_utf8(output.stdout)
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"dotslash fetch output was not utf8 for {}: {err}",
|
||||
path.display()
|
||||
)
|
||||
})?
|
||||
.trim()
|
||||
.to_string();
|
||||
if fetched_path.is_empty() {
|
||||
return Err(format!(
|
||||
"dotslash fetch output was empty for {}",
|
||||
path.display()
|
||||
));
|
||||
}
|
||||
|
||||
let fetched_path = PathBuf::from(fetched_path);
|
||||
if !fetched_path.is_file() {
|
||||
return Err(format!(
|
||||
"dotslash returned non-file path for {}: {}",
|
||||
path.display(),
|
||||
fetched_path.display()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(fetched_path)
|
||||
}
|
||||
|
||||
async fn is_dotslash_manifest(path: &Path) -> Result<bool, String> {
|
||||
let bytes = tokio::fs::read(path)
|
||||
.await
|
||||
.map_err(|err| format!("failed to read Node runtime path {}: {err}", path.display()))?;
|
||||
let prefix = String::from_utf8_lossy(&bytes[..bytes.len().min(128)]);
|
||||
Ok(prefix.starts_with("#!/usr/bin/env dotslash"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "mod_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -57,8 +57,6 @@ use wiremock::matchers::method;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const STARTUP_CONTEXT_OPEN_TAG: &str = "<startup_context>";
|
||||
const STARTUP_CONTEXT_CLOSE_TAG: &str = "</startup_context>";
|
||||
const REALTIME_BACKEND_PROMPT: &str = include_str!("../../templates/realtime/backend_prompt.md");
|
||||
const USER_FIRST_NAME_PLACEHOLDER: &str = "{{ user_first_name }}";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
@@ -1019,6 +1017,16 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let started = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
assert!(started.session_id.is_some());
|
||||
assert_eq!(started.version, RealtimeConversationVersion::V1);
|
||||
|
||||
let session_updated = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
@@ -1532,8 +1540,6 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!startup_context.contains("## User"));
|
||||
assert!(startup_context.contains("### "));
|
||||
@@ -1751,8 +1757,6 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.contains("## Machine / Workspace Map"));
|
||||
assert!(startup_context.contains("notes.txt"));
|
||||
@@ -1807,8 +1811,6 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_OPEN_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_CLOSE_TAG));
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(startup_context.len() <= 20_500);
|
||||
|
||||
@@ -1887,7 +1889,6 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
assert_eq!(session_updated, "sess_user_text");
|
||||
|
||||
let user_text = "typed follow-up for realtime";
|
||||
let prefixed_user_text = format!("[USER] {user_text}");
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
@@ -1907,7 +1908,7 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
let realtime_text_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"normal user turn text mirrored to realtime",
|
||||
|request| websocket_request_text(request).as_deref() == Some(prefixed_user_text.as_str()),
|
||||
|request| websocket_request_text(request).as_deref() == Some(user_text),
|
||||
)
|
||||
.await;
|
||||
let model_user_texts = response_mock.single_request().message_input_texts("user");
|
||||
@@ -1916,7 +1917,7 @@ async fn conversation_user_text_turn_is_sent_to_realtime_when_active() -> Result
|
||||
model_user_texts.iter().any(|text| text == user_text),
|
||||
websocket_request_text(&realtime_text_request),
|
||||
),
|
||||
(true, Some(prefixed_user_text)),
|
||||
(true, Some(user_text.to_string())),
|
||||
);
|
||||
let realtime_response_create = timeout(Duration::from_millis(200), async {
|
||||
wait_for_matching_websocket_request(
|
||||
@@ -2698,7 +2699,7 @@ async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio(
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
Duration::from_secs(2),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
|
||||
@@ -70,15 +70,28 @@ async fn build_codex_with_test_tool(server: &wiremock::MockServer) -> anyhow::Re
|
||||
builder.build(server).await
|
||||
}
|
||||
|
||||
fn assert_parallel_duration(actual: Duration) {
|
||||
// Allow headroom for slow CI scheduling; barrier synchronization already enforces overlap.
|
||||
fn assert_parallel_duration_under(actual: Duration, limit: Duration) {
|
||||
assert!(
|
||||
actual < Duration::from_millis(1_600),
|
||||
"expected parallel execution to finish quickly, got {actual:?}"
|
||||
actual < limit,
|
||||
"expected parallel execution to finish under {limit:?}, got {actual:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
fn assert_shell_function_call_succeeded(
|
||||
req: &core_test_support::responses::ResponsesRequest,
|
||||
call_id: &str,
|
||||
) {
|
||||
let (content, success) = req
|
||||
.function_call_output_content_and_success(call_id)
|
||||
.unwrap_or_else(|| panic!("shell output missing for {call_id}"));
|
||||
assert_eq!(
|
||||
success,
|
||||
Some(true),
|
||||
"expected successful shell output for {call_id}, got content={content:?} success={success:?}",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -126,7 +139,7 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![warmup_first, warmup_second, first_response, second_response],
|
||||
)
|
||||
@@ -135,12 +148,23 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
run_turn(&test, "warm up parallel tool").await?;
|
||||
|
||||
let duration = run_turn_and_measure(&test, "exercise sync tool").await?;
|
||||
assert_parallel_duration(duration);
|
||||
assert_parallel_duration_under(duration, Duration::from_secs(6));
|
||||
let req = response_mock
|
||||
.last_request()
|
||||
.expect("parallel sync tool run should send a completion request");
|
||||
assert_eq!(
|
||||
req.function_call_output_text("call-1").as_deref(),
|
||||
Some("ok")
|
||||
);
|
||||
assert_eq!(
|
||||
req.function_call_output_text("call-2").as_deref(),
|
||||
Some("ok")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -148,15 +172,39 @@ async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
let mut builder = test_codex().with_model("gpt-5.1");
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let shell_args = json!({
|
||||
"command": "sleep 0.25",
|
||||
let warmup_shell_args = json!({
|
||||
"command": "sleep 0.01",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
});
|
||||
let shell_args = json!({
|
||||
"command": "sleep 5",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 15_000,
|
||||
});
|
||||
let shell_args_two = json!({
|
||||
"command": "sleep 5",
|
||||
// Avoid user-specific shell startup cost (e.g. zsh profile scripts) in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 15_000,
|
||||
});
|
||||
let warmup_args_one = serde_json::to_string(&warmup_shell_args)?;
|
||||
let warmup_args_two = serde_json::to_string(&warmup_shell_args)?;
|
||||
let args_one = serde_json::to_string(&shell_args)?;
|
||||
let args_two = serde_json::to_string(&shell_args)?;
|
||||
let args_two = serde_json::to_string(&shell_args_two)?;
|
||||
|
||||
let warmup_first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-warm-1"}}),
|
||||
ev_function_call("warm-call-1", "shell_command", &warmup_args_one),
|
||||
ev_function_call("warm-call-2", "shell_command", &warmup_args_two),
|
||||
ev_completed("resp-warm-1"),
|
||||
]);
|
||||
let warmup_second_response = sse(vec![
|
||||
ev_assistant_message("warm-msg-1", "warmup done"),
|
||||
ev_completed("resp-warm-2"),
|
||||
]);
|
||||
let first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-1"}}),
|
||||
ev_function_call("call-1", "shell_command", &args_one),
|
||||
@@ -167,25 +215,50 @@ async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(&server, vec![first_response, second_response]).await;
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
warmup_first_response,
|
||||
warmup_second_response,
|
||||
first_response,
|
||||
second_response,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
run_turn(&test, "warm up shell_command in parallel").await?;
|
||||
let duration = run_turn_and_measure(&test, "run shell_command twice").await?;
|
||||
assert_parallel_duration(duration);
|
||||
assert_parallel_duration_under(duration, Duration::from_millis(8_500));
|
||||
let req = response_mock
|
||||
.last_request()
|
||||
.expect("parallel shell run should send a completion request");
|
||||
assert_shell_function_call_succeeded(&req, "call-1");
|
||||
assert_shell_function_call_succeeded(&req, "call-2");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let test = build_codex_with_test_tool(&server).await?;
|
||||
|
||||
let warmup_sync_args = json!({
|
||||
"sleep_after_ms": 10
|
||||
})
|
||||
.to_string();
|
||||
let sync_args = json!({
|
||||
"sleep_after_ms": 300
|
||||
})
|
||||
.to_string();
|
||||
let warmup_shell_args = serde_json::to_string(&json!({
|
||||
"command": "sleep 0.01",
|
||||
// Avoid user-specific shell startup cost in timing assertions.
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
}))?;
|
||||
let shell_args = serde_json::to_string(&json!({
|
||||
"command": "sleep 0.25",
|
||||
// Avoid user-specific shell startup cost in timing assertions.
|
||||
@@ -193,6 +266,16 @@ async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
"timeout_ms": 1_000,
|
||||
}))?;
|
||||
|
||||
let warmup_first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-warm-1"}}),
|
||||
ev_function_call("warm-call-1", "test_sync_tool", &warmup_sync_args),
|
||||
ev_function_call("warm-call-2", "shell_command", &warmup_shell_args),
|
||||
ev_completed("resp-warm-1"),
|
||||
]);
|
||||
let warmup_second_response = sse(vec![
|
||||
ev_assistant_message("warm-msg-1", "warmup done"),
|
||||
ev_completed("resp-warm-2"),
|
||||
]);
|
||||
let first_response = sse(vec![
|
||||
json!({"type": "response.created", "response": {"id": "resp-1"}}),
|
||||
ev_function_call("call-1", "test_sync_tool", &sync_args),
|
||||
@@ -203,8 +286,18 @@ async fn mixed_parallel_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
mount_sse_sequence(&server, vec![first_response, second_response]).await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
warmup_first_response,
|
||||
warmup_second_response,
|
||||
first_response,
|
||||
second_response,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
run_turn(&test, "warm up mixed tools").await?;
|
||||
let duration = run_turn_and_measure(&test, "mix tools").await?;
|
||||
assert_parallel_duration(duration);
|
||||
|
||||
|
||||
@@ -974,11 +974,10 @@ async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<(
|
||||
"begin event should include process_id for a live session"
|
||||
);
|
||||
|
||||
// We expect three terminal interactions matching the three write_stdin calls.
|
||||
assert_eq!(
|
||||
terminal_events.len(),
|
||||
3,
|
||||
"expected three terminal interactions; got {terminal_events:?}"
|
||||
// Delayed polls may coalesce, but we should still observe repeated terminal interaction events.
|
||||
assert!(
|
||||
terminal_events.len() >= 2,
|
||||
"expected at least two terminal interactions; got {terminal_events:?}"
|
||||
);
|
||||
|
||||
for event in &terminal_events {
|
||||
@@ -990,8 +989,8 @@ async fn unified_exec_terminal_interaction_captures_delayed_output() -> Result<(
|
||||
.iter()
|
||||
.map(|ev| ev.stdin.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["x", "x", "x"],
|
||||
"terminal interactions should reflect the three stdin polls"
|
||||
vec!["x"; terminal_events.len()],
|
||||
"terminal interactions should reflect repeated stdin polls"
|
||||
);
|
||||
|
||||
assert!(
|
||||
|
||||
Reference in New Issue
Block a user