Compare commits

...

1 Commits

Author SHA1 Message Date
mom-oai
12f7e21a9f fix: stabilize app-server windows ci diagnostics 2026-04-10 09:26:50 -07:00
6 changed files with 191 additions and 81 deletions

View File

@@ -74,14 +74,23 @@ print_bazel_test_log_tails() {
bazel_info_cmd+=("${bazel_startup_args[@]}")
fi
testlogs_dir="$(run_bazel "${bazel_info_cmd[@]:1}" info bazel-testlogs 2>/dev/null || echo bazel-testlogs)"
local failed_targets=()
while IFS= read -r target; do
local failed_test_logs=()
while IFS=$'\t' read -r target test_log; do
failed_targets+=("$target")
failed_test_logs+=("$test_log")
done < <(
grep -E '^FAIL: //' "$console_log" \
| sed -E 's#^FAIL: (//[^ ]+).*#\1#' \
| awk '
{
target = $2
test_log = ""
if (match($0, /\(see [^)]*test\.log\)/)) {
test_log = substr($0, RSTART + 5, RLENGTH - 6)
}
print target "\t" test_log
}
' \
| sort -u
)
@@ -90,10 +99,17 @@ print_bazel_test_log_tails() {
return
fi
for target in "${failed_targets[@]}"; do
local rel_path="${target#//}"
rel_path="${rel_path/:/\/}"
local test_log="${testlogs_dir}/${rel_path}/test.log"
for i in "${!failed_targets[@]}"; do
local target="${failed_targets[$i]}"
local test_log="${failed_test_logs[$i]}"
if [[ -z "$test_log" ]]; then
if [[ -z "${testlogs_dir:-}" ]]; then
testlogs_dir="$(run_bazel "${bazel_info_cmd[@]:1}" info bazel-testlogs 2>/dev/null || echo bazel-testlogs)"
fi
local rel_path="${target#//}"
rel_path="${rel_path/:/\/}"
test_log="${testlogs_dir}/${rel_path}/test.log"
fi
echo "::group::Bazel test log tail for ${target}"
if [[ -f "$test_log" ]]; then

View File

@@ -457,7 +457,6 @@ struct ListenerTaskContext {
thread_state_manager: ThreadStateManager,
outgoing: Arc<OutgoingMessageSender>,
analytics_events_client: AnalyticsEventsClient,
general_analytics_enabled: bool,
thread_watch_manager: ThreadWatchManager,
fallback_model_provider: String,
codex_home: PathBuf,
@@ -2254,7 +2253,6 @@ impl CodexMessageProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
@@ -2504,6 +2502,7 @@ impl CodexMessageProcessor {
.await;
return;
}
let general_analytics_enabled = thread.enabled(Feature::GeneralAnalytics);
let config_snapshot = thread
.config_snapshot()
.instrument(tracing::info_span!(
@@ -2569,7 +2568,7 @@ impl CodexMessageProcessor {
sandbox: config_snapshot.sandbox_policy.into(),
reasoning_effort: config_snapshot.reasoning_effort,
};
if listener_task_context.general_analytics_enabled {
if general_analytics_enabled {
listener_task_context
.analytics_events_client
.track_response(
@@ -5601,7 +5600,7 @@ impl CodexMessageProcessor {
}
async fn unload_thread_without_subscribers(
&mut self,
&self,
thread_id: ThreadId,
thread: Arc<CodexThread>,
) {
@@ -7463,7 +7462,6 @@ impl CodexMessageProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
@@ -7552,7 +7550,6 @@ impl CodexMessageProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
analytics_events_client: self.analytics_events_client.clone(),
general_analytics_enabled: self.config.features.enabled(Feature::GeneralAnalytics),
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
@@ -7585,7 +7582,6 @@ impl CodexMessageProcessor {
thread_manager,
thread_state_manager,
analytics_events_client: _,
general_analytics_enabled: _,
thread_watch_manager,
fallback_model_provider,
codex_home,

View File

@@ -40,11 +40,9 @@ struct AppServerArgs {
fn main() -> anyhow::Result<()> {
arg0_dispatch_or_else(|arg0_paths: Arg0DispatchPaths| async move {
let args = AppServerArgs::parse();
let managed_config_path = managed_config_path_from_debug_env();
let loader_overrides = LoaderOverrides {
managed_config_path,
..Default::default()
};
let loader_overrides = managed_config_path_from_debug_env()
.map(LoaderOverrides::with_managed_config_path_for_tests)
.unwrap_or_default();
let transport = args.listen;
let session_source = args.session_source;
let auth = args.auth.try_into_settings()?;

View File

@@ -96,9 +96,11 @@ pub struct McpProcess {
stdin: Option<ChildStdin>,
stdout: BufReader<ChildStdout>,
pending_messages: VecDeque<JSONRPCMessage>,
recent_messages: VecDeque<String>,
}
pub const DEFAULT_CLIENT_NAME: &str = "codex-app-server-tests";
/// Maximum number of message debug strings retained in `recent_messages`; once the buffer holds
/// this many entries, the oldest one is dropped before a new one is appended.
const RECENT_MESSAGE_LIMIT: usize = 20;
impl McpProcess {
pub async fn new(codex_home: &Path) -> anyhow::Result<Self> {
@@ -135,6 +137,10 @@ impl McpProcess {
cmd.stderr(Stdio::piped());
cmd.current_dir(codex_home);
cmd.env("CODEX_HOME", codex_home);
cmd.env(
"CODEX_APP_SERVER_MANAGED_CONFIG_PATH",
codex_home.join("managed_config.toml"),
);
cmd.env("RUST_LOG", "info");
cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR);
cmd.args(args);
@@ -180,6 +186,7 @@ impl McpProcess {
stdin: Some(stdin),
stdout,
pending_messages: VecDeque::new(),
recent_messages: VecDeque::new(),
})
}
@@ -1055,6 +1062,10 @@ impl McpProcess {
self.stdout.read_line(&mut line).await?;
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
eprintln!("read message from stdout: {message:?}");
if self.recent_messages.len() == RECENT_MESSAGE_LIMIT {
self.recent_messages.pop_front();
}
self.recent_messages.push_back(format!("{message:?}"));
Ok(message)
}
@@ -1175,6 +1186,10 @@ impl McpProcess {
.collect()
}
/// Returns an owned snapshot of the buffered debug renderings of recently read messages,
/// in buffer order (front to back).
///
/// The internal buffer is left untouched; every entry is cloned into the returned vector.
pub fn recent_message_debugs(&self) -> Vec<String> {
    let mut snapshot = Vec::with_capacity(self.recent_messages.len());
    snapshot.extend(self.recent_messages.iter().cloned());
    snapshot
}
/// Reads the stream until a message matches `predicate`, buffering any non-matching messages
/// for later reads.
async fn read_stream_until_message<F>(&mut self, predicate: F) -> anyhow::Result<JSONRPCMessage>

View File

@@ -70,6 +70,10 @@ use wiremock::matchers::path;
use wiremock::matchers::path_regex;
/// Default upper bound for a single wait on a notification in these tests.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound for waits on delegated command-execution notifications (Windows: 30 seconds).
// NOTE(review): presumably longer because command execution is slower on Windows CI — confirm.
#[cfg(windows)]
const COMMAND_EXECUTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound for waits on delegated command-execution notifications (non-Windows: same as
/// `DEFAULT_TIMEOUT`).
#[cfg(not(windows))]
const COMMAND_EXECUTION_TIMEOUT: Duration = DEFAULT_TIMEOUT;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
#[derive(Debug, Clone, Copy)]
@@ -385,6 +389,29 @@ impl RealtimeE2eHarness {
async fn shutdown(self) {
self.realtime_server.shutdown().await;
}
/// Builds the diagnostic text attached to an error when waiting for a delegated realtime
/// shell command notification fails.
///
/// `phase` names the notification that was awaited and `turn_id` identifies the delegated turn.
/// The message also includes the harness thread id, the methods of notifications still pending
/// on the MCP process, the recently read JSON-RPC messages, and the JSON body of every request
/// recorded on each realtime server connection.
fn command_execution_failure_context(&self, phase: &str, turn_id: &str) -> String {
    // One inner vector of request bodies per recorded connection.
    let mut sideband_requests = Vec::new();
    for connection in self.realtime_server.connections().iter() {
        let bodies = connection
            .iter()
            .map(WebSocketRequest::body_json)
            .collect::<Vec<_>>();
        sideband_requests.push(bodies);
    }

    let thread_id = &self.thread_id;
    let pending_methods = self.mcp.pending_notification_methods();
    let recent_messages = self.mcp.recent_message_debugs();

    format!(
        "timed out waiting for delegated realtime shell command {phase}; \
         thread_id={}; turn_id={turn_id}; pending_notifications={:?}; \
         recent_jsonrpc_messages={:#?}; sideband_requests={:#?}",
        thread_id, pending_methods, recent_messages, sideband_requests,
    )
}
}
fn main_loop_responses(responses: Vec<String>) -> MainLoopResponsesScript {
@@ -963,35 +990,12 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
"unexpected close reason: {closed_notification:?}"
);
let request = call_capture.single_request();
assert_eq!(request.url.path(), "/v1/realtime/calls");
assert_eq!(request.url.query(), None);
assert_eq!(
request
.headers
.get("content-type")
.and_then(|value| value.to_str().ok()),
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}}]}"#;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
v=offer\r\n\
\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
);
assert_call_create_multipart(
call_capture.single_request(),
"v=offer\r\n",
serde_json::from_str(session)?,
)?;
realtime_server.shutdown().await;
Ok(())
@@ -1338,7 +1342,22 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
let _ = harness.start_webrtc_realtime("v=offer\r\n").await?;
// Phase 2: observe the delegated background agent turn executing the requested shell command.
let started_command = wait_for_started_command_execution(&mut harness.mcp).await?;
let turn_started = harness
.read_notification::<TurnStartedNotification>("turn/started")
.await?;
assert_eq!(turn_started.thread_id, harness.thread_id);
let started_command =
match wait_for_started_command_execution(&mut harness.mcp, COMMAND_EXECUTION_TIMEOUT).await
{
Ok(started_command) => started_command,
Err(err) => {
return Err(err.context(
harness
.command_execution_failure_context("item/started", &turn_started.turn.id),
));
}
};
let ThreadItem::CommandExecution { id, status, .. } = started_command.item else {
unreachable!("helper returns command execution items");
};
@@ -1347,7 +1366,18 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
("shell_call", CommandExecutionStatus::InProgress)
);
let completed_command = wait_for_completed_command_execution(&mut harness.mcp).await?;
let completed_command =
match wait_for_completed_command_execution(&mut harness.mcp, COMMAND_EXECUTION_TIMEOUT)
.await
{
Ok(completed_command) => completed_command,
Err(err) => {
return Err(err.context(
harness
.command_execution_failure_context("item/completed", &turn_started.turn.id),
));
}
};
let ThreadItem::CommandExecution {
id,
status,
@@ -1588,11 +1618,27 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
}
/// Reads the next notification with the given `method`, waiting at most `DEFAULT_TIMEOUT`.
///
/// Delegates to `read_notification_with_timeout`; use that function directly when a different
/// timeout is required.
async fn read_notification<T: DeserializeOwned>(mcp: &mut McpProcess, method: &str) -> Result<T> {
read_notification_with_timeout(mcp, method, DEFAULT_TIMEOUT).await
}
async fn read_notification_with_timeout<T: DeserializeOwned>(
mcp: &mut McpProcess,
method: &str,
read_timeout: Duration,
) -> Result<T> {
let notification = timeout(
DEFAULT_TIMEOUT,
read_timeout,
mcp.read_stream_until_notification_message(method),
)
.await??;
.await
.with_context(|| {
format!(
"timed out waiting for notification {method}; pending_notifications={:?}; \
recent_jsonrpc_messages={:#?}",
mcp.pending_notification_methods(),
mcp.recent_message_debugs(),
)
})??;
let params = notification
.params
.context("expected notification params to be present")?;
@@ -1614,9 +1660,15 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
async fn wait_for_started_command_execution(
mcp: &mut McpProcess,
read_timeout: Duration,
) -> Result<ItemStartedNotification> {
loop {
let started = read_notification::<ItemStartedNotification>(mcp, "item/started").await?;
let started = read_notification_with_timeout::<ItemStartedNotification>(
mcp,
"item/started",
read_timeout,
)
.await?;
if let ThreadItem::CommandExecution { .. } = &started.item {
return Ok(started);
}
@@ -1625,10 +1677,15 @@ async fn wait_for_started_command_execution(
async fn wait_for_completed_command_execution(
mcp: &mut McpProcess,
read_timeout: Duration,
) -> Result<ItemCompletedNotification> {
loop {
let completed =
read_notification::<ItemCompletedNotification>(mcp, "item/completed").await?;
let completed = read_notification_with_timeout::<ItemCompletedNotification>(
mcp,
"item/completed",
read_timeout,
)
.await?;
if let ThreadItem::CommandExecution { .. } = &completed.item {
return Ok(completed);
}
@@ -1750,7 +1807,7 @@ fn assert_v2_session_update(request: &Value) -> Result<()> {
fn assert_call_create_multipart(
request: WiremockRequest,
offer_sdp: &str,
session: &str,
expected_session: Value,
) -> Result<()> {
assert_eq!(request.url.path(), "/v1/realtime/calls");
assert_eq!(request.url.query(), None);
@@ -1762,27 +1819,44 @@ fn assert_call_create_multipart(
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
);
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
assert_eq!(
body,
format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
{offer_sdp}\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n\
{session}\r\n\
--codex-realtime-call-boundary--\r\n"
)
let prefix = format!(
"--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"sdp\"\r\n\
Content-Type: application/sdp\r\n\
\r\n\
{offer_sdp}\r\n\
--codex-realtime-call-boundary\r\n\
Content-Disposition: form-data; name=\"session\"\r\n\
Content-Type: application/json\r\n\
\r\n"
);
let suffix = "\r\n--codex-realtime-call-boundary--\r\n";
let session_json = body
.strip_prefix(&prefix)
.and_then(|body| body.strip_suffix(suffix))
.with_context(|| format!("unexpected multipart body: {body}"))?;
let session = serde_json::from_str::<Value>(session_json)?;
assert_eq!(session, expected_session);
Ok(())
}
fn v1_session_create_json() -> &'static str {
r#"{"audio":{"input":{"format":{"type":"audio/pcm","rate":24000}},"output":{"voice":"cove"}},"type":"quicksilver","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context"}"#
/// Returns the expected v1 session-create payload as a JSON value, so callers can compare it
/// structurally instead of by exact string.
fn v1_session_create_json() -> Value {
    // Assemble the nested audio section first, then the top-level session object.
    let input_format = json!({
        "type": "audio/pcm",
        "rate": 24_000,
    });
    let audio = json!({
        "input": { "format": input_format },
        "output": { "voice": "cove" },
    });
    json!({
        "audio": audio,
        "type": "quicksilver",
        "model": "gpt-realtime-1.5",
        "instructions": "backend prompt\n\nstartup context",
    })
}
fn create_config_toml(

View File

@@ -221,10 +221,25 @@ async fn thread_start_does_not_track_thread_initialized_analytics_without_featur
.await??;
let _ = to_response::<ThreadStartResponse>(resp)?;
let payload = wait_for_analytics_payload(&server, Duration::from_millis(250)).await;
tokio::time::sleep(Duration::from_millis(250)).await;
let analytics_payloads = server
.received_requests()
.await
.unwrap_or_default()
.iter()
.filter(|request| request.url.path() == "/codex/analytics-events/events")
.filter_map(|request| serde_json::from_slice::<Value>(&request.body).ok())
.collect::<Vec<_>>();
let has_thread_initialized_event = analytics_payloads.iter().any(|request| {
request["events"].as_array().is_some_and(|events| {
events
.iter()
.any(|event| event["event_type"] == "codex_thread_initialized")
})
});
assert!(
payload.is_err(),
"thread analytics should be gated off when general_analytics is disabled"
!has_thread_initialized_event,
"thread analytics should be gated off when general_analytics is disabled, got {analytics_payloads:#?}"
);
Ok(())
}
@@ -826,11 +841,7 @@ fn create_config_toml_with_chatgpt_base_url(
chatgpt_base_url: &str,
general_analytics_enabled: bool,
) -> std::io::Result<()> {
let general_analytics_toml = if general_analytics_enabled {
"\ngeneral_analytics = true".to_string()
} else {
String::new()
};
let general_analytics_toml = format!("general_analytics = {general_analytics_enabled}");
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,