Compare commits

...

35 Commits

Author SHA1 Message Date
Ahmed Ibrahim
fd09be07ab codex: retrigger PR CI after dispatch failure (#13534) 2026-03-05 08:43:45 -08:00
Ahmed Ibrahim
2e92f82309 codex: retrigger PR CI (#13534) 2026-03-05 08:38:19 -08:00
Ahmed Ibrahim
e12ab74549 codex: stabilize app-server-protocol codegen tests (#13534) 2026-03-05 08:30:52 -08:00
Ahmed Ibrahim
d577399e06 codex: validate flaky test stability (#13534) 2026-03-05 04:39:22 -08:00
Ahmed Ibrahim
02f31c7264 codex: validate flaky test stability (#13534) 2026-03-05 04:19:56 -08:00
Ahmed Ibrahim
80a1c72f20 codex: validate flaky test stability (#13534) 2026-03-05 04:00:11 -08:00
Ahmed Ibrahim
1667b289c7 codex: validate flaky test stability (#13534) 2026-03-05 03:40:38 -08:00
Ahmed Ibrahim
b0dda0f9b2 codex: restore stdio test trait import (#13534) 2026-03-05 03:22:41 -08:00
Ahmed Ibrahim
3737fe4517 codex: stabilize windows flaky tests (#13534) 2026-03-05 03:19:04 -08:00
Ahmed Ibrahim
91ede5aebf codex: remove unused stdio-to-uds test dep (#13534) 2026-03-05 02:58:26 -08:00
Ahmed Ibrahim
9315e6e4ca codex: stabilize stdio-to-uds windows test (#13534) 2026-03-05 02:55:57 -08:00
Ahmed Ibrahim
18a943582b codex: fix notify helper resolution in app-server tests (#13534) 2026-03-05 02:38:33 -08:00
Ahmed Ibrahim
380c8f5ff3 codex: expose notify helper to Bazel tests (#13534) 2026-03-05 02:24:21 -08:00
Ahmed Ibrahim
58b7e1400e codex: fix Bazel for notify capture helper (#13534) 2026-03-05 02:20:41 -08:00
Ahmed Ibrahim
406d8b71d8 codex: fix flaky notify capture on windows (#13534) 2026-03-05 02:16:38 -08:00
Ahmed Ibrahim
ab3b133acb codex: validate flaky test stability (#13534) 2026-03-05 01:56:37 -08:00
Ahmed Ibrahim
bf4d47fb92 codex: fix CI failure on PR #13534 2026-03-05 01:37:54 -08:00
Ahmed Ibrahim
6526d00f13 codex: stabilize windows app-server tests (#13534) 2026-03-05 01:24:58 -08:00
Ahmed Ibrahim
ca498a67fa Fix initialize notify config layout 2026-03-05 00:58:10 -08:00
Ahmed Ibrahim
6f3a86a4be Disable shell snapshot in initialize suite 2026-03-05 00:53:00 -08:00
Ahmed Ibrahim
38de4c8174 Disable shell snapshot in account suite 2026-03-05 00:35:36 -08:00
Ahmed Ibrahim
c9df1ec0b9 Make PTY Python readiness deterministic 2026-03-05 00:19:15 -08:00
Ahmed Ibrahim
cf890fd725 Annotate unsubscribe stabilization timer 2026-03-04 23:38:35 -08:00
Ahmed Ibrahim
2ce32a995a Stabilize thread unsubscribe request assertion 2026-03-04 22:11:03 -08:00
Ahmed Ibrahim
df4c918bd2 Restore async legacy notify hook 2026-03-04 21:55:12 -08:00
Ahmed Ibrahim
7d7b67192f Re-export turn state 2026-03-04 21:22:53 -08:00
Ahmed Ibrahim
0d246928c2 Remove unused thread resume import 2026-03-04 21:12:26 -08:00
Ahmed Ibrahim
c6f5627349 Cancel tasks before clearing pending approvals 2026-03-04 21:03:04 -08:00
Ahmed Ibrahim
8385a285a2 Handle missing wiremock request log 2026-03-04 20:53:54 -08:00
Ahmed Ibrahim
206cfb561f Wait for resumed approval follow-up requests 2026-03-04 20:48:32 -08:00
Ahmed Ibrahim
bf3c63d160 Remove invalid abort task assertion 2026-03-04 20:41:05 -08:00
Ahmed Ibrahim
b3f11d668a Suppress cancelled turn follow-up requests 2026-03-04 20:33:30 -08:00
Ahmed Ibrahim
39c4f2b06d Wait for legacy notify hook completion 2026-03-04 20:20:48 -08:00
Ahmed Ibrahim
3a932261cf Attach subagent completion watcher before input 2026-03-04 20:03:01 -08:00
Ahmed Ibrahim
42a6b98598 Start flaky test stabilization 2026-03-04 19:52:17 -08:00
20 changed files with 535 additions and 104 deletions

View File

@@ -2,6 +2,9 @@
# Do not increase, fix your test instead
slow-timeout = { period = "15s", terminate-after = 2 }
[test-groups.app_server_protocol_codegen]
max-threads = 1
[[profile.default.overrides]]
# Do not add new tests here
@@ -11,3 +14,7 @@ slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
filter = 'test(approval_matrix_covers_all_modes)'
slow-timeout = { period = "30s", terminate-after = 2 }
[[profile.default.overrides]]
filter = 'package(codex-app-server-protocol) & (test(schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
test-group = 'app_server_protocol_codegen'

1
codex-rs/Cargo.lock generated
View File

@@ -2406,7 +2406,6 @@ name = "codex-stdio-to-uds"
version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"codex-utils-cargo-bin",
"pretty_assertions",
"tempfile",

View File

@@ -4,15 +4,41 @@ use codex_app_server_protocol::read_schema_fixture_tree;
use codex_app_server_protocol::write_schema_fixtures;
use similar::TextDiff;
use std::path::Path;
use std::time::Instant;
#[test]
fn schema_fixtures_match_generated() -> Result<()> {
let start = Instant::now();
let schema_root = schema_root()?;
eprintln!(
"[schema_fixtures] resolved schema root in {:?}: {}",
start.elapsed(),
schema_root.display()
);
let fixture_read_start = Instant::now();
let fixture_tree = read_tree(&schema_root)?;
eprintln!(
"[schema_fixtures] read {} vendored files in {:?}",
fixture_tree.len(),
fixture_read_start.elapsed()
);
let temp_dir = tempfile::tempdir().context("create temp dir")?;
let generate_start = Instant::now();
write_schema_fixtures(temp_dir.path(), None).context("generate schema fixtures")?;
eprintln!(
"[schema_fixtures] generated schema fixtures in {:?}",
generate_start.elapsed()
);
let generated_read_start = Instant::now();
let generated_tree = read_tree(temp_dir.path())?;
eprintln!(
"[schema_fixtures] read {} generated files in {:?}",
generated_tree.len(),
generated_read_start.elapsed()
);
let fixture_paths = fixture_tree
.keys()
@@ -60,6 +86,12 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
);
}
eprintln!(
"[schema_fixtures] compared {} files in {:?}",
fixture_tree.len(),
start.elapsed()
);
Ok(())
}

View File

@@ -8,6 +8,10 @@ license.workspace = true
name = "codex-app-server"
path = "src/main.rs"
[[bin]]
name = "codex-app-server-test-notify-capture"
path = "src/bin/notify_capture.rs"
[lib]
name = "codex_app_server"
path = "src/lib.rs"

View File

@@ -0,0 +1,61 @@
use std::env;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
fn append_log(log_path: &Path, message: &str) {
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(log_path) {
let _ = writeln!(file, "{message}");
let _ = file.sync_all();
}
}
fn main() -> Result<()> {
let mut args = env::args_os();
let _program = args.next();
let output_path = PathBuf::from(
args.next()
.ok_or_else(|| anyhow!("expected output path as first argument"))?,
);
let log_path = PathBuf::from(
args.next()
.ok_or_else(|| anyhow!("expected log path as second argument"))?,
);
let payload = args
.next()
.ok_or_else(|| anyhow!("expected payload as final argument"))?;
append_log(
&log_path,
&format!(
"start cwd={} output={}",
env::current_dir()?.display(),
output_path.display()
),
);
if args.next().is_some() {
append_log(&log_path, "unexpected extra argument");
bail!("expected payload as final argument");
}
let payload = payload.to_string_lossy();
append_log(&log_path, &format!("payload-bytes={}", payload.len()));
let mut file = File::create(&output_path)
.with_context(|| format!("failed to create {}", output_path.display()))?;
file.write_all(payload.as_bytes())
.with_context(|| format!("failed to write {}", output_path.display()))?;
file.sync_all()
.with_context(|| format!("failed to sync {}", output_path.display()))?;
append_log(&log_path, &format!("wrote {}", output_path.display()));
Ok(())
}

View File

@@ -33,6 +33,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "http://127.0.0.1:0/v1"
@@ -53,6 +56,9 @@ fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
@@ -65,6 +71,9 @@ model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
forced_login_method = "{forced_method}"
[features]
shell_snapshot = false
"#
);
std::fs::write(config_toml, contents)

View File

@@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -23,7 +24,23 @@ enum FileExpectation {
NonEmpty,
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
@@ -164,6 +181,7 @@ async fn assert_no_session_updates_for(
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
// Prepare a temporary Codex home and a separate root with test files.
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
// Create files designed to have deterministic ordering for query "abe".
@@ -235,6 +253,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;

View File

@@ -83,6 +83,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{base_url}"

View File

@@ -14,12 +14,14 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::fs_wait;
use codex_utils_cargo_bin::cargo_bin;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -191,29 +193,27 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let notify_script = codex_home.path().join("notify.py");
std::fs::write(
&notify_script,
r#"from pathlib import Path
import sys
payload_path = Path(__file__).with_name("notify.json")
tmp_path = payload_path.with_suffix(".json.tmp")
tmp_path.write_text(sys.argv[-1], encoding="utf-8")
tmp_path.replace(payload_path)
"#,
)?;
let notify_file = codex_home.path().join("notify.json");
let notify_script = notify_script
let notify_log = codex_home.path().join("notify.log");
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
let notify_capture = notify_capture
.to_str()
.expect("notify script path should be valid UTF-8");
.expect("notify capture path should be valid UTF-8");
let notify_file_str = notify_file
.to_str()
.expect("notify file path should be valid UTF-8");
let notify_log_str = notify_log
.to_str()
.expect("notify log path should be valid UTF-8");
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
"never",
&format!(
"notify = [\"python3\", {}]",
toml_basic_string(notify_script)
"notify = [{}, {}, {}]",
toml_basic_string(notify_capture),
toml_basic_string(notify_file_str),
toml_basic_string(notify_log_str)
),
)?;
@@ -261,14 +261,49 @@ tmp_path.replace(payload_path)
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
let payload = wait_for_json_file(&notify_file, &notify_log).await?;
assert_eq!(payload["client"], "xcode");
Ok(())
}
async fn wait_for_json_file(path: &Path, log_path: &Path) -> Result<Value> {
let deadline = Instant::now() + Duration::from_secs(5);
let mut last_contents = None;
loop {
match tokio::fs::read_to_string(path).await {
Ok(contents) => {
if let Ok(payload) = serde_json::from_str(&contents) {
return Ok(payload);
}
last_contents = Some(contents);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
Err(err) => return Err(err.into()),
}
if Instant::now() >= deadline {
let helper_log = match tokio::fs::read_to_string(log_path).await {
Ok(contents) => contents,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
"<missing helper log>".to_string()
}
Err(err) => format!("<failed to read helper log: {err}>"),
};
let last_contents =
last_contents.unwrap_or_else(|| "<missing notify file>".to_string());
anyhow::bail!(
"timed out waiting for valid JSON in {}. helper log: {} last contents: {}",
path.display(),
helper_log.trim_end(),
last_contents.trim_end()
);
}
sleep(Duration::from_millis(25)).await;
}
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(
codex_home: &Path,
@@ -297,6 +332,9 @@ model_provider = "mock_provider"
{extra}
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"

View File

@@ -4,7 +4,7 @@ use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::rollout_path;
use app_test_support::to_response;
@@ -59,6 +59,36 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -870,7 +900,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
)?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -985,6 +1015,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}
@@ -1007,7 +1038,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut primary = McpProcess::new(&codex_home).await?;
@@ -1150,6 +1181,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}

View File

@@ -34,6 +34,51 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
async fn wait_for_responses_request_count_to_stabilize(
server: &wiremock::MockServer,
expected_count: usize,
settle_duration: std::time::Duration,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
let mut stable_since: Option<tokio::time::Instant> = None;
loop {
let requests = server
.received_requests()
.await
.context("failed to fetch received requests")?;
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
if responses_request_count == expected_count {
match stable_since {
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
return Ok::<(), anyhow::Error>(());
}
None => stable_since = Some(tokio::time::Instant::now()),
Some(_) => {}
}
} else {
stable_since = None;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -168,6 +213,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
};
assert_eq!(payload.thread_id, thread_id);
wait_for_responses_request_count_to_stabilize(
&server,
1,
std::time::Duration::from_millis(200),
)
.await?;
Ok(())
}

View File

@@ -25,6 +25,7 @@ use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
use tracing::debug;
const AGENT_NAMES: &str = include_str!("agent_names.txt");
const FORKED_SPAWN_AGENT_OUTPUT_MESSAGE: &str = "You are the newly spawned agent. The prior conversation history was forked from your parent agent. Treat the next user message as your new task, and use the forked history only as background context.";
@@ -212,8 +213,9 @@ impl AgentControl {
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
.await;
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
@@ -288,7 +290,8 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
.await;
Ok(resumed_thread.thread_id)
}
@@ -418,7 +421,7 @@ impl AgentControl {
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
async fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
@@ -429,13 +432,20 @@ impl AgentControl {
else {
return;
};
let status_rx = self.subscribe_status(child_thread_id).await.ok();
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let status = match status_rx {
Some(mut status_rx) => {
debug!(%child_thread_id, "subagent completion watcher attached");
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
debug!(
%child_thread_id,
"subagent completion watcher lost status stream; reading latest status"
);
status = control.get_status(child_thread_id).await;
break;
}
@@ -443,9 +453,20 @@ impl AgentControl {
}
status
}
Err(_) => control.get_status(child_thread_id).await,
None => {
debug!(
%child_thread_id,
"subagent completion watcher could not subscribe; reading latest status"
);
control.get_status(child_thread_id).await
}
};
if !is_final(&status) {
debug!(
%child_thread_id,
?status,
"subagent completion watcher exiting before final status"
);
return;
}
@@ -1317,10 +1338,34 @@ mod tests {
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let mut status_rx = harness
.control
.subscribe_status(child_thread_id)
.await
.expect("status subscription should succeed");
let _ = child_thread
.submit(Op::Shutdown {})
.await
.expect("child shutdown should submit");
timeout(Duration::from_secs(5), async {
loop {
let status = status_rx.borrow().clone();
if is_final(&status) {
break;
}
if status_rx.changed().await.is_err() {
let latest = harness.control.get_status(child_thread_id).await;
assert_eq!(
is_final(&latest),
true,
"child status stream closed before a final status was observable"
);
break;
}
}
})
.await
.expect("child should reach a final status");
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
@@ -1331,15 +1376,18 @@ mod tests {
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
);
harness
.control
.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);

View File

@@ -6583,6 +6583,16 @@ async fn try_run_sampling_request(
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if cancellation_token.is_cancelled()
&& outcome.as_ref().is_ok_and(|result| result.needs_follow_up)
{
debug!(
turn_id = %turn_context.sub_id,
"turn cancelled after tool drain; suppressing follow-up request"
);
return Err(CodexErr::TurnAborted);
}
if should_emit_turn_diff {
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;

View File

@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn::TurnState;

View File

@@ -149,11 +149,3 @@ impl TurnState {
!self.pending_input.is_empty()
}
}
impl ActiveTurn {
/// Clear any pending approvals and input buffered for the current turn.
pub(crate) async fn clear_pending(&self) {
let mut ts = self.turn_state.lock().await;
ts.clear_pending();
}
}

View File

@@ -31,12 +31,14 @@ use crate::protocol::TurnCompleteEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use crate::state::TurnState;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use tokio::sync::Mutex;
use crate::features::Feature;
pub(crate) use compact::CompactTask;
@@ -192,7 +194,16 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
let (tasks, turn_state) = self.take_all_running_tasks().await;
for task in &tasks {
task.cancellation_token.cancel();
}
if let Some(turn_state) = turn_state {
// Drop pending approvals only after all running tasks observe cancellation, so
// interrupted approval waits resolve as aborts instead of synthetic denials.
turn_state.lock().await.clear_pending();
}
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if reason == TurnAbortReason::Interrupted {
@@ -328,15 +339,14 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
async fn take_all_running_tasks(&self) -> (Vec<RunningTask>, Option<Arc<Mutex<TurnState>>>) {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
at.drain_tasks()
let turn_state = Arc::clone(&at.turn_state);
(at.drain_tasks(), Some(turn_state))
}
None => Vec::new(),
None => (Vec::new(), None),
}
}
@@ -349,10 +359,6 @@ impl Session {
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
}
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
task.turn_context

View File

@@ -239,3 +239,59 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
"expected <turn_aborted> marker in follow-up request"
);
}
/// Interrupting a turn while a tool-produced follow-up is pending must not
/// start another model request before the session reports TurnAborted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_does_not_issue_follow_up_request() {
let command = "sleep 60";
let call_id = "call-no-follow-up";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-no-follow-up"),
ev_function_call(call_id, "shell_command", &args),
ev_completed("resp-no-follow-up"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_once(&server, first_body).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start interrupt follow-up guard".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let requests = response_mock.requests();
assert_eq!(
requests.len(),
1,
"interrupt should not issue a follow-up responses request"
);
}

View File

@@ -22,7 +22,6 @@ anyhow = { workspace = true }
uds_windows = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View File

@@ -1,12 +1,15 @@
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use assert_cmd::Command;
use anyhow::anyhow;
use pretty_assertions::assert_eq;
#[cfg(unix)]
@@ -19,6 +22,9 @@ use uds_windows::UnixListener;
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
let socket_path = dir.path().join("socket");
let request = b"request";
let request_path = dir.path().join("request.txt");
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
let listener = match UnixListener::bind(&socket_path) {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
@@ -31,37 +37,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
};
let (tx, rx) = mpsc::channel();
let (event_tx, event_rx) = mpsc::channel();
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
let _ = event_tx.send("waiting for accept".to_string());
let (mut connection, _) = listener
.accept()
.context("failed to accept test connection")?;
let mut received = Vec::new();
let _ = event_tx.send("accepted connection".to_string());
let mut received = vec![0; request.len()];
connection
.read_to_end(&mut received)
.read_exact(&mut received)
.context("failed to read data from client")?;
let _ = event_tx.send(format!("read {} bytes", received.len()));
tx.send(received)
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
connection
.write_all(b"response")
.context("failed to write response to client")?;
let _ = event_tx.send("wrote response".to_string());
Ok(())
});
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
.arg(&socket_path)
.write_stdin("request")
.assert()
.success()
.stdout("response");
.stdin(Stdio::from(stdin))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("failed to spawn codex-stdio-to-uds")?;
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
let (stdout_tx, stdout_rx) = mpsc::channel();
let (stderr_tx, stderr_rx) = mpsc::channel();
thread::spawn(move || {
let mut stdout = Vec::new();
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
let _ = stdout_tx.send(result);
});
thread::spawn(move || {
let mut stderr = Vec::new();
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
let _ = stderr_tx.send(result);
});
let mut server_events = Vec::new();
let deadline = Instant::now() + Duration::from_secs(5);
let status = loop {
while let Ok(event) = event_rx.try_recv() {
server_events.push(event);
}
if let Some(status) = child.try_wait().context("failed to poll child status")? {
break status;
}
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr after kill")?
.context("failed to read child stderr")?;
anyhow::bail!(
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
}
thread::sleep(Duration::from_millis(25));
};
let stdout = stdout_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stdout")?
.context("failed to read child stdout")?;
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr")?
.context("failed to read child stderr")?;
assert!(
status.success(),
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
assert_eq!(stdout, b"response");
let received = rx
.recv_timeout(Duration::from_secs(1))
.context("server did not receive data in time")?;
assert_eq!(received, b"request");
assert_eq!(received, request);
let server_result = server_thread
.join()
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
.map_err(|_| anyhow!("server thread panicked"))?;
server_result.context("server failed")?;
Ok(())

View File

@@ -94,52 +94,36 @@ async fn collect_output_until_exit(
}
async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
newline: &str,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
let mut collected = Vec::new();
let marker = "__codex_pty_ready__";
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
while tokio::time::Instant::now() < deadline {
writer
.send(format!("print('{marker}'){newline}").into_bytes())
.await?;
let probe_deadline = tokio::time::Instant::now() + probe_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline || now >= probe_deadline {
break;
}
let remaining = std::cmp::min(
deadline.saturating_duration_since(now),
probe_deadline.saturating_duration_since(now),
);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(marker) {
return Ok(collected);
}
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
}
anyhow::bail!(
"timed out waiting for Python REPL readiness in PTY: {:?}",
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
String::from_utf8_lossy(&collected)
);
}
@@ -218,14 +202,21 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
return Ok(());
};
let ready_marker = "__codex_pty_ready__";
let args = vec![
"-i".to_string(),
"-q".to_string(),
"-c".to_string(),
format!("print('{ready_marker}')"),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pty_process(&python, &[], Path::new("."), &env_map, &None).await?;
let spawned = spawn_pty_process(&python, &args, Path::new("."), &env_map, &None).await?;
let writer = spawned.session.writer_sender();
let mut output_rx = spawned.output_rx;
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
writer
.send(format!("print('hello from pty'){newline}").into_bytes())
.await?;