mirror of
https://github.com/openai/codex.git
synced 2026-05-17 17:53:06 +00:00
Compare commits
15 Commits
xli-codex/
...
starr/wind
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5815dd6a4b | ||
|
|
296fa6df0c | ||
|
|
64c684bd57 | ||
|
|
ce5d84e43a | ||
|
|
926b8d77cd | ||
|
|
7cd5127421 | ||
|
|
6a2ce743f1 | ||
|
|
32deb67fc6 | ||
|
|
59d9e96d66 | ||
|
|
097e3ef949 | ||
|
|
f3afa1132d | ||
|
|
a666109389 | ||
|
|
16648c8d1c | ||
|
|
7d2c8dbec4 | ||
|
|
bfe33e5a7a |
38
.github/workflows/rust-ci-full.yml
vendored
38
.github/workflows/rust-ci-full.yml
vendored
@@ -1,10 +1,21 @@
|
||||
name: rust-ci-full
|
||||
run-name: >-
|
||||
rust-ci-full${{
|
||||
github.event_name == 'workflow_dispatch' &&
|
||||
format(' windows-nextest-{0}', inputs.windows_nextest_threads) ||
|
||||
''
|
||||
}}
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- "**full-ci**"
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
windows_nextest_threads:
|
||||
description: "Optional nextest --test-threads override for Windows test jobs"
|
||||
required: false
|
||||
type: string
|
||||
|
||||
# CI builds in debug (dev) for faster signal.
|
||||
|
||||
@@ -510,10 +521,10 @@ jobs:
|
||||
tests:
|
||||
name: Tests — ${{ matrix.runner }} - ${{ matrix.target }}${{ matrix.remote_env == 'true' && ' (remote)' || '' }}
|
||||
runs-on: ${{ matrix.runs_on || matrix.runner }}
|
||||
# Perhaps we can bring this back down to 30m once we finish the cutover
|
||||
# from tui_app_server/ to tui/. Incidentally, windows-arm64 was the main
|
||||
# offender for exceeding the timeout.
|
||||
timeout-minutes: 45
|
||||
# Perhaps we can bring this back down once we finish the cutover from
|
||||
# tui_app_server/ to tui/. Incidentally, windows-arm64 was the main offender
|
||||
# for exceeding the timeout.
|
||||
timeout-minutes: ${{ matrix.timeout_minutes || 45 }}
|
||||
defaults:
|
||||
run:
|
||||
working-directory: codex-rs
|
||||
@@ -524,6 +535,7 @@ jobs:
|
||||
USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }}
|
||||
CARGO_INCREMENTAL: "0"
|
||||
SCCACHE_CACHE_SIZE: 10G
|
||||
WINDOWS_NEXTEST_THREADS: ${{ github.event_name == 'workflow_dispatch' && inputs.windows_nextest_threads || '' }}
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
@@ -554,6 +566,7 @@ jobs:
|
||||
- runner: windows-arm64
|
||||
target: aarch64-pc-windows-msvc
|
||||
profile: dev
|
||||
timeout_minutes: 75
|
||||
runs_on:
|
||||
group: codex-runners
|
||||
labels: codex-windows-arm64
|
||||
@@ -640,8 +653,7 @@ jobs:
|
||||
|
||||
- uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2.62.49
|
||||
with:
|
||||
tool: nextest
|
||||
version: 0.9.103
|
||||
tool: nextest@0.9.103
|
||||
|
||||
- name: Enable unprivileged user namespaces (Linux)
|
||||
if: runner.os == 'Linux'
|
||||
@@ -666,7 +678,19 @@ jobs:
|
||||
|
||||
- name: tests
|
||||
id: test
|
||||
run: cargo nextest run --no-fail-fast --target ${{ matrix.target }} --cargo-profile ci-test --timings
|
||||
shell: bash
|
||||
run: |
|
||||
set -euo pipefail
|
||||
nextest_args=(
|
||||
--no-fail-fast
|
||||
--target "${{ matrix.target }}"
|
||||
--cargo-profile ci-test
|
||||
--timings
|
||||
)
|
||||
if [[ "${{ runner.os }}" == "Windows" && -n "${WINDOWS_NEXTEST_THREADS}" ]]; then
|
||||
nextest_args+=(--test-threads "${WINDOWS_NEXTEST_THREADS}")
|
||||
fi
|
||||
cargo nextest run "${nextest_args[@]}"
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
RUST_MIN_STACK: "8388608" # 8 MiB
|
||||
|
||||
@@ -14,6 +14,9 @@ max-threads = 1
|
||||
[test-groups.windows_sandbox_legacy_sessions]
|
||||
max-threads = 1
|
||||
|
||||
[test-groups.windows_process_heavy]
|
||||
max-threads = 1
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# Do not add new tests here
|
||||
filter = 'test(rmcp_client) | test(humanlike_typing_1000_chars_appears_live_no_placeholder)'
|
||||
@@ -27,6 +30,41 @@ slow-timeout = { period = "30s", terminate-after = 2 }
|
||||
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
|
||||
test-group = 'app_server_protocol_codegen'
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# These Windows CI tests launch full Codex/app-server process trees. They have
|
||||
# repeatedly timed out when nextest schedules them alongside similar tests.
|
||||
platform = 'cfg(windows)'
|
||||
filter = 'package(codex-core) & kind(test) & (test(cli_stream) | test(realtime_conversation))'
|
||||
test-group = 'windows_process_heavy'
|
||||
threads-required = "num-test-threads"
|
||||
slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# The exec resume tests spawn the CLI and touch shared session state on disk.
|
||||
platform = 'cfg(windows)'
|
||||
filter = 'package(codex-exec) & kind(test) & test(exec_resume)'
|
||||
test-group = 'windows_process_heavy'
|
||||
threads-required = "num-test-threads"
|
||||
slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# Keep the specific app-server subprocess-heavy cases isolated on Windows. This
|
||||
# must stay before the broader codex-app-server override below.
|
||||
platform = 'cfg(windows)'
|
||||
filter = 'package(codex-app-server) & kind(test) & (test(thread_fork_can_exclude_turns_and_skip_restored_token_usage) | test(turn_start_resolves_sticky_thread_environments_and_turn_overrides) | test(message_processor_tracing_tests))'
|
||||
test-group = 'windows_process_heavy'
|
||||
threads-required = "num-test-threads"
|
||||
slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# These tests create restricted-token Windows child processes and private
|
||||
# desktops. Running them alone avoids contention with other subprocess tests.
|
||||
platform = 'cfg(windows)'
|
||||
filter = 'package(codex-windows-sandbox) & kind(test) & test(legacy_)'
|
||||
test-group = 'windows_process_heavy'
|
||||
threads-required = "num-test-threads"
|
||||
slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# These integration tests spawn a fresh app-server subprocess per case.
|
||||
# Keep the library unit tests parallel.
|
||||
|
||||
@@ -8,6 +8,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TokenCountEvent;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use core_test_support::test_path_buf;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
@@ -134,7 +135,7 @@ pub fn create_fake_rollout_with_source(
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
cwd: test_path_buf("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source,
|
||||
@@ -218,7 +219,7 @@ pub fn create_fake_rollout_with_text_elements(
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
cwd: test_path_buf("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
|
||||
@@ -31,6 +31,7 @@ use codex_thread_store::ThreadEventPersistenceMode;
|
||||
use codex_thread_store::ThreadPersistenceMetadata;
|
||||
use codex_thread_store::ThreadStore;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::test_path_buf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -56,7 +57,7 @@ fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSum
|
||||
timestamp: Some(CREATED_AT_RFC3339.to_string()),
|
||||
updated_at: Some(UPDATED_AT_RFC3339.to_string()),
|
||||
model_provider: MODEL_PROVIDER.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
cwd: test_path_buf("/"),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
git_info: None,
|
||||
|
||||
@@ -426,6 +426,8 @@ fn realtime_sideband_connection(
|
||||
WebSocketConnectionConfig {
|
||||
requests: realtime_server_events,
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
}
|
||||
@@ -1044,6 +1046,8 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
|
||||
"session": { "id": "sess_webrtc", "instructions": "backend prompt" }
|
||||
})]],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: false,
|
||||
}])
|
||||
@@ -1836,7 +1840,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<(
|
||||
};
|
||||
assert_eq!(id.as_str(), "shell_call");
|
||||
assert_eq!(status, CommandExecutionStatus::Completed);
|
||||
assert_eq!(aggregated_output.as_deref(), Some("realtime-tool-ok"));
|
||||
assert_eq!(
|
||||
aggregated_output.as_deref().map(str::trim),
|
||||
Some("realtime-tool-ok")
|
||||
);
|
||||
|
||||
// Phase 3: verify the shell output reached Responses and the final delegated answer returned
|
||||
// to realtime as a single function-call-output item.
|
||||
@@ -2154,10 +2161,10 @@ fn realtime_tool_ok_command() -> Vec<String> {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
vec![
|
||||
"powershell.exe".to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
"[Console]::Write('realtime-tool-ok')".to_string(),
|
||||
"cmd.exe".to_string(),
|
||||
"/D".to_string(),
|
||||
"/C".to_string(),
|
||||
"echo realtime-tool-ok".to_string(),
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -239,6 +239,65 @@ async fn wait_for_live_thread_spawn_children(
|
||||
.expect("expected persisted child tree");
|
||||
}
|
||||
|
||||
async fn wait_for_agent_shutdown(
|
||||
thread_id: ThreadId,
|
||||
mut status_rx: tokio::sync::watch::Receiver<AgentStatus>,
|
||||
) {
|
||||
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
status_rx
|
||||
.changed()
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("thread {thread_id} status should reach shutdown"));
|
||||
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("thread {thread_id} should shut down before resume"));
|
||||
}
|
||||
|
||||
async fn shutdown_live_agent_and_wait(control: &AgentControl, thread_id: ThreadId) {
|
||||
let status_rx = control
|
||||
.subscribe_status(thread_id)
|
||||
.await
|
||||
.expect("status subscription should succeed before shutdown");
|
||||
let _ = control
|
||||
.shutdown_live_agent(thread_id)
|
||||
.await
|
||||
.expect("thread shutdown should submit");
|
||||
wait_for_agent_shutdown(thread_id, status_rx).await;
|
||||
}
|
||||
|
||||
async fn close_agent_and_wait(
|
||||
control: &AgentControl,
|
||||
agent_id: ThreadId,
|
||||
shutdown_ids: &[ThreadId],
|
||||
) {
|
||||
let mut status_rxs = Vec::with_capacity(shutdown_ids.len());
|
||||
for thread_id in shutdown_ids {
|
||||
status_rxs.push((
|
||||
*thread_id,
|
||||
control
|
||||
.subscribe_status(*thread_id)
|
||||
.await
|
||||
.expect("status subscription should succeed before close"),
|
||||
));
|
||||
}
|
||||
let _ = control
|
||||
.close_agent(agent_id)
|
||||
.await
|
||||
.expect("agent close should succeed");
|
||||
for (thread_id, status_rx) in status_rxs {
|
||||
wait_for_agent_shutdown(thread_id, status_rx).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_errors_when_manager_dropped() {
|
||||
let control = AgentControl::default();
|
||||
@@ -1626,11 +1685,9 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
|
||||
.await
|
||||
.expect("child thread metadata should be persisted to sqlite before shutdown");
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_live_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
drop(status_rx);
|
||||
shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
|
||||
drop(child_thread);
|
||||
|
||||
let resumed_thread_id = harness
|
||||
.control
|
||||
@@ -1699,11 +1756,8 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
|
||||
.await
|
||||
.expect("child thread should exist");
|
||||
persist_thread_for_tree_resume(&child_thread, "persist before archiving").await;
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_live_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should succeed");
|
||||
shutdown_live_agent_and_wait(&harness.control, child_thread_id).await;
|
||||
drop(child_thread);
|
||||
let store = LocalThreadStore::new(
|
||||
LocalThreadStoreConfig::from_config(&harness.config),
|
||||
harness.state_db.clone(),
|
||||
@@ -1993,11 +2047,12 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
|
||||
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
|
||||
.await;
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.close_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child close should succeed");
|
||||
close_agent_and_wait(
|
||||
&harness.control,
|
||||
child_thread_id,
|
||||
&[child_thread_id, grandchild_thread_id],
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
@@ -2085,16 +2140,14 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
|
||||
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
|
||||
.await;
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.close_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child close should succeed");
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_live_agent(parent_thread_id)
|
||||
.await
|
||||
.expect("parent shutdown should succeed");
|
||||
close_agent_and_wait(
|
||||
&harness.control,
|
||||
child_thread_id,
|
||||
&[child_thread_id, grandchild_thread_id],
|
||||
)
|
||||
.await;
|
||||
shutdown_live_agent_and_wait(&harness.control, parent_thread_id).await;
|
||||
drop(parent_thread);
|
||||
|
||||
let resumed_parent_thread_id = harness
|
||||
.control
|
||||
@@ -2180,11 +2233,12 @@ async fn resume_closed_child_reopens_open_descendants() {
|
||||
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
|
||||
.await;
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.close_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child close should succeed");
|
||||
close_agent_and_wait(
|
||||
&harness.control,
|
||||
child_thread_id,
|
||||
&[child_thread_id, grandchild_thread_id],
|
||||
)
|
||||
.await;
|
||||
|
||||
let resumed_child_thread_id = harness
|
||||
.control
|
||||
|
||||
@@ -196,6 +196,12 @@ async fn run_agent_job_loop(
|
||||
)
|
||||
.await?;
|
||||
for item in pending_items {
|
||||
let claimed = db
|
||||
.mark_agent_job_item_running(job_id.as_str(), item.item_id.as_str())
|
||||
.await?;
|
||||
if !claimed {
|
||||
continue;
|
||||
}
|
||||
let prompt = build_worker_prompt(&job, &item)?;
|
||||
let items = vec![UserInput::Text {
|
||||
text: prompt,
|
||||
@@ -240,7 +246,7 @@ async fn run_agent_job_loop(
|
||||
}
|
||||
};
|
||||
let assigned = db
|
||||
.mark_agent_job_item_running_with_thread(
|
||||
.set_agent_job_item_thread(
|
||||
job_id.as_str(),
|
||||
item.item_id.as_str(),
|
||||
thread_id.to_string().as_str(),
|
||||
|
||||
@@ -61,27 +61,31 @@ pub async fn handle(
|
||||
}
|
||||
let db = required_state_db(&session)?;
|
||||
let reporting_thread_id = session.conversation_id.to_string();
|
||||
let accepted = db
|
||||
.report_agent_job_item_result(
|
||||
let accepted = if args.stop.unwrap_or(false) {
|
||||
db.report_agent_job_item_result_and_cancel_job(
|
||||
args.job_id.as_str(),
|
||||
args.item_id.as_str(),
|
||||
reporting_thread_id.as_str(),
|
||||
&args.result,
|
||||
"cancelled by worker request",
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
db.report_agent_job_item_result(
|
||||
args.job_id.as_str(),
|
||||
args.item_id.as_str(),
|
||||
reporting_thread_id.as_str(),
|
||||
&args.result,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
let job_id = args.job_id.as_str();
|
||||
let item_id = args.item_id.as_str();
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to record agent job result for {job_id} / {item_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
if accepted && args.stop.unwrap_or(false) {
|
||||
let message = "cancelled by worker request";
|
||||
let _ = db
|
||||
.mark_agent_job_cancelled(args.job_id.as_str(), message)
|
||||
.await;
|
||||
}
|
||||
.map_err(|err| {
|
||||
let job_id = args.job_id.as_str();
|
||||
let item_id = args.item_id.as_str();
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to record agent job result for {job_id} / {item_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
let content =
|
||||
serde_json::to_string(&ReportAgentJobResultToolResult { accepted }).map_err(|err| {
|
||||
FunctionCallError::Fatal(format!(
|
||||
|
||||
@@ -446,6 +446,11 @@ impl WebSocketHandshake {
|
||||
pub struct WebSocketConnectionConfig {
|
||||
pub requests: Vec<Vec<Value>>,
|
||||
pub response_headers: Vec<(String, String)>,
|
||||
/// Optional notification fired after the TCP connection is accepted and before the websocket
|
||||
/// handshake is accepted.
|
||||
pub accept_started: Option<Arc<Notify>>,
|
||||
/// Optional gate that blocks websocket handshake acceptance until the notifier is signalled.
|
||||
pub accept_release: Option<Arc<Notify>>,
|
||||
/// Optional delay inserted before accepting the websocket handshake.
|
||||
///
|
||||
/// Tests use this to force websocket setup into an in-flight state so first-turn warmup paths
|
||||
@@ -1254,6 +1259,8 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
|
||||
.map(|requests| WebSocketConnectionConfig {
|
||||
requests,
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
})
|
||||
@@ -1298,12 +1305,27 @@ pub async fn start_websocket_server_with_headers(
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Some(accept_started) = &connection.accept_started {
|
||||
accept_started.notify_one();
|
||||
}
|
||||
|
||||
if let Some(accept_release) = &connection.accept_release {
|
||||
tokio::select! {
|
||||
_ = accept_release.notified() => {}
|
||||
_ = &mut shutdown_rx => return,
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(delay) = connection.accept_delay {
|
||||
tokio::time::sleep(delay).await;
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(delay) => {}
|
||||
_ = &mut shutdown_rx => return,
|
||||
}
|
||||
}
|
||||
|
||||
let response_headers = connection.response_headers.clone();
|
||||
let handshake_log = Arc::clone(&handshakes);
|
||||
let pending_handshake = Arc::new(Mutex::new(None));
|
||||
let callback_handshake = Arc::clone(&pending_handshake);
|
||||
let callback = move |req: &Request, mut response: Response| {
|
||||
let headers = req
|
||||
.headers()
|
||||
@@ -1315,7 +1337,7 @@ pub async fn start_websocket_server_with_headers(
|
||||
.map(|value| (name.as_str().to_string(), value.to_string()))
|
||||
})
|
||||
.collect();
|
||||
handshake_log.lock().unwrap().push(WebSocketHandshake {
|
||||
*callback_handshake.lock().unwrap() = Some(WebSocketHandshake {
|
||||
uri: req.uri().to_string(),
|
||||
headers,
|
||||
});
|
||||
@@ -1344,6 +1366,10 @@ pub async fn start_websocket_server_with_headers(
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if let Some(handshake) = pending_handshake.lock().unwrap().take() {
|
||||
handshakes.lock().unwrap().push(handshake);
|
||||
}
|
||||
|
||||
let connection_index = {
|
||||
let mut log = requests.lock().unwrap();
|
||||
log.push(Vec::new());
|
||||
|
||||
@@ -121,6 +121,8 @@ async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() ->
|
||||
],
|
||||
],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
// Delay handshake so turn processing must tolerate websocket startup latency.
|
||||
accept_delay: Some(Duration::from_millis(150)),
|
||||
close_after_requests: true,
|
||||
|
||||
@@ -941,6 +941,8 @@ async fn responses_websocket_emits_reasoning_included_event() {
|
||||
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
|
||||
requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
|
||||
response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())],
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
}])
|
||||
@@ -1015,6 +1017,8 @@ async fn responses_websocket_emits_rate_limit_events() {
|
||||
("X-Models-Etag".to_string(), "etag-123".to_string()),
|
||||
("X-Reasoning-Included".to_string(), "true".to_string()),
|
||||
],
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
}])
|
||||
@@ -1751,6 +1755,8 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake(
|
||||
})],
|
||||
],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: false,
|
||||
}])
|
||||
|
||||
@@ -48,6 +48,7 @@ use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::Match;
|
||||
@@ -492,6 +493,8 @@ async fn conversation_webrtc_start_posts_generated_session() -> Result<()> {
|
||||
vec![],
|
||||
],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: Some(sideband_accept_delay),
|
||||
close_after_requests: false,
|
||||
}])
|
||||
@@ -659,10 +662,14 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
|
||||
)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let accept_started = Arc::new(Notify::new());
|
||||
let accept_release = Arc::new(Notify::new());
|
||||
let realtime_server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
|
||||
requests: vec![vec![]],
|
||||
response_headers: Vec::new(),
|
||||
accept_delay: Some(Duration::from_millis(500)),
|
||||
accept_started: Some(Arc::clone(&accept_started)),
|
||||
accept_release: Some(Arc::clone(&accept_release)),
|
||||
accept_delay: None,
|
||||
close_after_requests: false,
|
||||
}])
|
||||
.await;
|
||||
@@ -699,6 +706,9 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
|
||||
realtime_server.handshakes().is_empty(),
|
||||
"sideband websocket should still be pending when SDP is emitted"
|
||||
);
|
||||
timeout(Duration::from_secs(5), accept_started.notified())
|
||||
.await
|
||||
.context("sideband websocket should connect before close")?;
|
||||
|
||||
test.codex.submit(Op::RealtimeConversationClose).await?;
|
||||
let closed = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -726,9 +736,17 @@ async fn conversation_webrtc_close_while_sideband_connecting_drops_pending_join(
|
||||
"pending sideband task leaked after close: {:?}",
|
||||
stale_event.ok()
|
||||
);
|
||||
accept_release.notify_one();
|
||||
let stale_request = timeout(Duration::from_millis(250), async {
|
||||
realtime_server
|
||||
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
assert!(
|
||||
realtime_server.handshakes().is_empty(),
|
||||
"pending sideband task should abort before websocket handshake completes"
|
||||
stale_request.is_err(),
|
||||
"pending sideband task sent websocket request after close: {:?}",
|
||||
stale_request.ok().map(|request| request.body_json())
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
@@ -749,11 +767,13 @@ async fn conversation_webrtc_sideband_connect_failure_closes_with_error() -> Res
|
||||
)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
let realtime_base_url = server.uri();
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
config.experimental_realtime_ws_backend_prompt = Some("backend prompt".to_string());
|
||||
config.experimental_realtime_ws_model = Some("realtime-test-model".to_string());
|
||||
config.experimental_realtime_ws_startup_context = Some(String::new());
|
||||
config.experimental_realtime_ws_base_url = Some("http://127.0.0.1:1".to_string());
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.realtime.version = RealtimeWsVersion::V1;
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
@@ -102,6 +102,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
|
||||
ev_completed("resp-1"),
|
||||
]],
|
||||
response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
},
|
||||
@@ -112,6 +114,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
|
||||
ev_completed("resp-2"),
|
||||
]],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
},
|
||||
@@ -122,6 +126,8 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
|
||||
ev_completed("resp-3"),
|
||||
]],
|
||||
response_headers: Vec::new(),
|
||||
accept_started: None,
|
||||
accept_release: None,
|
||||
accept_delay: None,
|
||||
close_after_requests: true,
|
||||
},
|
||||
|
||||
@@ -372,7 +372,16 @@ async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
|
||||
async fn wait_for_file_removed(path: &Path) -> anyhow::Result<()> {
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if !tokio::fs::try_exists(path).await? {
|
||||
let exists = match tokio::fs::try_exists(path).await {
|
||||
Ok(exists) => exists,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
|
||||
// Windows can transiently deny metadata reads while another task
|
||||
// is removing or resetting files in this workspace.
|
||||
true
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
if !exists {
|
||||
return Ok(());
|
||||
}
|
||||
assert!(
|
||||
|
||||
@@ -227,22 +227,23 @@ WHERE id = ?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<()> {
|
||||
pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<bool> {
|
||||
let now = Utc::now().timestamp();
|
||||
sqlx::query(
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_jobs
|
||||
SET status = ?, updated_at = ?, completed_at = ?, last_error = NULL
|
||||
WHERE id = ?
|
||||
WHERE id = ? AND status = ?
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobStatus::Completed.as_str())
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(job_id)
|
||||
.bind(AgentJobStatus::Running.as_str())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(())
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn mark_agent_job_failed(
|
||||
@@ -428,9 +429,46 @@ WHERE job_id = ? AND item_id = ? AND status = ?
|
||||
item_id: &str,
|
||||
reporting_thread_id: &str,
|
||||
result_json: &Value,
|
||||
) -> anyhow::Result<bool> {
|
||||
self.report_agent_job_item_result_inner(
|
||||
job_id,
|
||||
item_id,
|
||||
reporting_thread_id,
|
||||
result_json,
|
||||
/*cancel_job_reason*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn report_agent_job_item_result_and_cancel_job(
|
||||
&self,
|
||||
job_id: &str,
|
||||
item_id: &str,
|
||||
reporting_thread_id: &str,
|
||||
result_json: &Value,
|
||||
cancel_job_reason: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
self.report_agent_job_item_result_inner(
|
||||
job_id,
|
||||
item_id,
|
||||
reporting_thread_id,
|
||||
result_json,
|
||||
Some(cancel_job_reason),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn report_agent_job_item_result_inner(
|
||||
&self,
|
||||
job_id: &str,
|
||||
item_id: &str,
|
||||
reporting_thread_id: &str,
|
||||
result_json: &Value,
|
||||
cancel_job_reason: Option<&str>,
|
||||
) -> anyhow::Result<bool> {
|
||||
let now = Utc::now().timestamp();
|
||||
let serialized = serde_json::to_string(result_json)?;
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_job_items
|
||||
@@ -446,7 +484,7 @@ WHERE
|
||||
job_id = ?
|
||||
AND item_id = ?
|
||||
AND status = ?
|
||||
AND assigned_thread_id = ?
|
||||
AND (assigned_thread_id = ? OR assigned_thread_id IS NULL)
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobItemStatus::Completed.as_str())
|
||||
@@ -458,9 +496,29 @@ WHERE
|
||||
.bind(item_id)
|
||||
.bind(AgentJobItemStatus::Running.as_str())
|
||||
.bind(reporting_thread_id)
|
||||
.execute(self.pool.as_ref())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
Ok(result.rows_affected() > 0)
|
||||
let accepted = result.rows_affected() > 0;
|
||||
if accepted && let Some(reason) = cancel_job_reason {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_jobs
|
||||
SET status = ?, updated_at = ?, completed_at = ?, last_error = ?
|
||||
WHERE id = ? AND status IN (?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobStatus::Cancelled.as_str())
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(reason)
|
||||
.bind(job_id)
|
||||
.bind(AgentJobStatus::Pending.as_str())
|
||||
.bind(AgentJobStatus::Running.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(accepted)
|
||||
}
|
||||
|
||||
pub async fn mark_agent_job_item_completed(
|
||||
@@ -652,6 +710,113 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn report_agent_job_item_result_can_cancel_job_atomically() -> anyhow::Result<()> {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
|
||||
let (job_id, item_id, thread_id) = create_running_single_item_job(runtime.as_ref()).await?;
|
||||
|
||||
let accepted = runtime
|
||||
.report_agent_job_item_result_and_cancel_job(
|
||||
job_id.as_str(),
|
||||
item_id.as_str(),
|
||||
thread_id.as_str(),
|
||||
&json!({"ok": true}),
|
||||
"cancelled by worker request",
|
||||
)
|
||||
.await?;
|
||||
assert!(accepted);
|
||||
|
||||
let job = runtime
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await?
|
||||
.expect("job should exist");
|
||||
assert_eq!(job.status, AgentJobStatus::Cancelled);
|
||||
assert_eq!(
|
||||
job.last_error,
|
||||
Some("cancelled by worker request".to_string())
|
||||
);
|
||||
|
||||
let item = runtime
|
||||
.get_agent_job_item(job_id.as_str(), item_id.as_str())
|
||||
.await?
|
||||
.expect("job item should exist");
|
||||
assert_eq!(item.status, AgentJobItemStatus::Completed);
|
||||
assert_eq!(item.result_json, Some(json!({"ok": true})));
|
||||
assert_eq!(item.assigned_thread_id, None);
|
||||
|
||||
let completed = runtime.mark_agent_job_completed(job_id.as_str()).await?;
|
||||
assert!(!completed);
|
||||
let job = runtime
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await?
|
||||
.expect("job should exist");
|
||||
assert_eq!(job.status, AgentJobStatus::Cancelled);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn report_agent_job_item_result_accepts_unassigned_running_item() -> anyhow::Result<()> {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?;
|
||||
let job_id = "job-1".to_string();
|
||||
let item_id = "item-1".to_string();
|
||||
let thread_id = "thread-1".to_string();
|
||||
runtime
|
||||
.create_agent_job(
|
||||
&AgentJobCreateParams {
|
||||
id: job_id.clone(),
|
||||
name: "test-job".to_string(),
|
||||
instruction: "Return a result".to_string(),
|
||||
auto_export: true,
|
||||
max_runtime_seconds: None,
|
||||
output_schema_json: None,
|
||||
input_headers: vec!["path".to_string()],
|
||||
input_csv_path: "/tmp/in.csv".to_string(),
|
||||
output_csv_path: "/tmp/out.csv".to_string(),
|
||||
},
|
||||
&[AgentJobItemCreateParams {
|
||||
item_id: item_id.clone(),
|
||||
row_index: 0,
|
||||
source_id: None,
|
||||
row_json: json!({"path":"file-1"}),
|
||||
}],
|
||||
)
|
||||
.await?;
|
||||
runtime.mark_agent_job_running(job_id.as_str()).await?;
|
||||
let marked_running = runtime
|
||||
.mark_agent_job_item_running(job_id.as_str(), item_id.as_str())
|
||||
.await?;
|
||||
assert!(marked_running);
|
||||
|
||||
let accepted = runtime
|
||||
.report_agent_job_item_result_and_cancel_job(
|
||||
job_id.as_str(),
|
||||
item_id.as_str(),
|
||||
thread_id.as_str(),
|
||||
&json!({"ok": true}),
|
||||
"cancelled by worker request",
|
||||
)
|
||||
.await?;
|
||||
assert!(accepted);
|
||||
|
||||
let job = runtime
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await?
|
||||
.expect("job should exist");
|
||||
assert_eq!(job.status, AgentJobStatus::Cancelled);
|
||||
let item = runtime
|
||||
.get_agent_job_item(job_id.as_str(), item_id.as_str())
|
||||
.await?
|
||||
.expect("job item should exist");
|
||||
assert_eq!(item.status, AgentJobItemStatus::Completed);
|
||||
assert_eq!(item.result_json, Some(json!({"ok": true})));
|
||||
assert_eq!(item.assigned_thread_id, None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
@@ -195,7 +195,7 @@ fn legacy_non_tty_powershell_emits_output() {
|
||||
pwsh.display().to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
"Write-Output LEGACY-NONTTY-DIRECT".to_string(),
|
||||
"'LEGACY-NONTTY-DIRECT'".to_string(),
|
||||
],
|
||||
cwd.as_path(),
|
||||
HashMap::new(),
|
||||
@@ -378,7 +378,7 @@ fn legacy_capture_powershell_emits_output() {
|
||||
pwsh.display().to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
"Write-Output LEGACY-CAPTURE-DIRECT".to_string(),
|
||||
"'LEGACY-CAPTURE-DIRECT'".to_string(),
|
||||
],
|
||||
cwd.as_path(),
|
||||
HashMap::new(),
|
||||
@@ -419,7 +419,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
|
||||
"-NoProfile".to_string(),
|
||||
"-NoExit".to_string(),
|
||||
"-Command".to_string(),
|
||||
"$PID; Write-Output ready".to_string(),
|
||||
"$PID; 'ready'".to_string(),
|
||||
],
|
||||
cwd.as_path(),
|
||||
HashMap::new(),
|
||||
@@ -434,7 +434,7 @@ fn legacy_tty_powershell_emits_output_and_accepts_input() {
|
||||
|
||||
let writer = spawned.session.writer_sender();
|
||||
writer
|
||||
.send(b"Write-Output second\n".to_vec())
|
||||
.send(b"'second'\n".to_vec())
|
||||
.await
|
||||
.expect("send second command");
|
||||
writer
|
||||
|
||||
Reference in New Issue
Block a user