Compare commits

...

43 Commits

Author SHA1 Message Date
Ahmed Ibrahim
75499a381b codex: fix Bazel insta snapshot paths (#13593) 2026-03-07 10:42:25 -08:00
Ahmed Ibrahim
bddc6cd53e codex: align guardian snapshot metadata 2026-03-07 10:26:19 -08:00
Ahmed Ibrahim
7456d28a7a codex: include core snapshots in bazel tests 2026-03-07 10:25:14 -08:00
Ahmed Ibrahim
406b5b386c codex: fix non-linux guardian popup snapshot 2026-03-07 10:21:29 -08:00
Ahmed Ibrahim
da2ddb4bf0 codex: stabilize guardian snapshots on linux 2026-03-07 10:19:58 -08:00
Ahmed Ibrahim
2260beb57f codex: restore guardian popup snapshot 2026-03-07 10:18:57 -08:00
Ahmed Ibrahim
dba2ee22dc codex: fix guardian popup snapshot 2026-03-07 10:13:34 -08:00
Ahmed Ibrahim
ea0e790813 codex: stabilize guardian permissions validation test 2026-03-07 09:54:59 -08:00
Ahmed Ibrahim
c42121a9f1 codex: fix guardian CI drift on PR #13593 2026-03-07 09:39:19 -08:00
Ahmed Ibrahim
f0cb95be20 Merge remote-tracking branch 'origin/main' into dev/pr13593-lint-fix 2026-03-07 09:10:33 -08:00
Ahmed Ibrahim
e6f39a2d94 codex: fix rmcp lint regression 2026-03-07 08:52:55 -08:00
Ahmed Ibrahim
78fb500503 Merge branch 'main' into codex/flaky-test-stabilization-3 2026-03-07 08:47:37 -08:00
Ahmed Ibrahim
9c5049c21a codex: clean flaky test stabilization PR 2026-03-07 08:36:14 -08:00
Ahmed Ibrahim
faf223c4ae codex: validate CI stability (#13593) 2026-03-07 04:40:19 -08:00
Ahmed Ibrahim
762d419a48 codex: validate CI stability (#13593) 2026-03-07 03:56:55 -08:00
Ahmed Ibrahim
38354d89f0 codex: validate CI stability (#13593) 2026-03-07 03:27:33 -08:00
Ahmed Ibrahim
e064e7035b codex: validate CI stability (#13593) 2026-03-07 03:05:51 -08:00
Ahmed Ibrahim
c0b50b4ddc codex: add missing initialize forwarding hook (#13593) 2026-03-07 02:43:59 -08:00
Ahmed Ibrahim
0de39cd7ce codex: order websocket initialize readiness after handshake (#13593) 2026-03-07 02:39:20 -08:00
Ahmed Ibrahim
93af8e0d57 codex: validate flaky stabilization streak (#13593) 2026-03-07 02:10:32 -08:00
Ahmed Ibrahim
3b79f58849 codex: stabilize shell serialization duration test (#13593) 2026-03-07 01:46:25 -08:00
Ahmed Ibrahim
488a602833 codex: stabilize abort history test (#13593) 2026-03-07 01:33:48 -08:00
Ahmed Ibrahim
72cf2819be codex: normalize schema fixture TS paths (#13593) 2026-03-07 01:18:23 -08:00
Ahmed Ibrahim
e64133e403 codex: fix schema fixture compile regression (#13593) 2026-03-07 00:55:47 -08:00
Ahmed Ibrahim
c00078dd91 codex: reduce flaky test timeout pressure (#13593) 2026-03-07 00:50:42 -08:00
Ahmed Ibrahim
e951d6167f codex: validate flaky CI streak (5/5) (#13593) 2026-03-07 00:08:58 -08:00
Ahmed Ibrahim
2b82a61f7e codex: validate flaky CI streak (4/5) (#13593) 2026-03-06 23:48:01 -08:00
Ahmed Ibrahim
a5e13e321b codex: validate flaky CI streak (3/5) (#13593) 2026-03-06 23:21:57 -08:00
Ahmed Ibrahim
b5208d7979 codex: validate flaky CI streak (2/5) (#13593) 2026-03-06 21:38:43 -08:00
Ahmed Ibrahim
518c9a7ccf codex: fix rmcp pid-file race (#13593) 2026-03-06 21:15:48 -08:00
Ahmed Ibrahim
12c68ddc19 codex: shrink flaky protocol export test (#13593) 2026-03-06 21:03:34 -08:00
Ahmed Ibrahim
a9406ce8e8 codex: satisfy realtime startup context clippy (#13593) 2026-03-06 20:34:20 -08:00
Ahmed Ibrahim
484668e073 codex: fix realtime startup context close race (#13593) 2026-03-06 20:28:25 -08:00
Ahmed Ibrahim
ce981d77d2 codex: fix flaky realtime startup context test (#13593) 2026-03-06 20:14:50 -08:00
Ahmed Ibrahim
427880981c codex: validate flaky test stabilization (#13593) [4/5] 2026-03-06 20:03:17 -08:00
Ahmed Ibrahim
38741c5c78 codex: validate flaky test stabilization (#13593) [3/5] 2026-03-06 19:39:21 -08:00
Ahmed Ibrahim
82cc8397d4 codex: validate flaky test stabilization (#13593) [2/5] 2026-03-06 19:16:29 -08:00
Ahmed Ibrahim
3f393b65fb codex: fix flaky shell serialization timeout (#13593) 2026-03-06 18:45:46 -08:00
Ahmed Ibrahim
56bf69c219 codex: validate flaky test stabilization (#13593) [2/5] 2026-03-06 18:33:10 -08:00
Ahmed Ibrahim
c38f0516c6 codex: fix flaky schema fixture timeout (#13593) 2026-03-06 18:08:01 -08:00
Ahmed Ibrahim
f60933bc8c codex: validate flaky test stabilization (#13593) 2026-03-06 17:37:16 -08:00
Ahmed Ibrahim
38065128ed codex: validate flaky test stabilization (#13593) 2026-03-06 16:59:00 -08:00
Ahmed Ibrahim
91af9d634e codex: stabilize flaky tests 2026-03-06 16:07:07 -08:00
42 changed files with 1384 additions and 650 deletions

View File

@@ -2,6 +2,12 @@
# Do not increase, fix your test instead
slow-timeout = { period = "15s", terminate-after = 2 }
[test-groups.app_server_protocol_codegen]
max-threads = 1
[test-groups.app_server_integration]
max-threads = 1
[[profile.default.overrides]]
# Do not add new tests here
@@ -11,3 +17,13 @@ slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
filter = 'test(approval_matrix_covers_all_modes)'
slow-timeout = { period = "30s", terminate-after = 2 }
[[profile.default.overrides]]
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
test-group = 'app_server_protocol_codegen'
[[profile.default.overrides]]
# These tests spawn a fresh app-server subprocess per case. Serializing them keeps
# Windows startup deterministic without widening the read timeouts.
filter = 'package(codex-app-server)'
test-group = 'app_server_integration'

1
codex-rs/Cargo.lock generated
View File

@@ -2414,7 +2414,6 @@ name = "codex-stdio-to-uds"
version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"codex-utils-cargo-bin",
"pretty_assertions",
"tempfile",

View File

@@ -23,6 +23,7 @@ use schemars::schema_for;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
@@ -32,9 +33,10 @@ use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::thread;
use ts_rs::TS;
const HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"];
const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"];
const SPECIAL_DEFINITIONS: &[&str] = &[
@@ -137,9 +139,29 @@ pub fn generate_ts_with_options(
}
if options.ensure_headers {
for file in &ts_files {
prepend_header_if_missing(file)?;
}
let worker_count = thread::available_parallelism()
.map_or(1, usize::from)
.min(ts_files.len().max(1));
let chunk_size = ts_files.len().div_ceil(worker_count);
thread::scope(|scope| -> Result<()> {
let mut workers = Vec::new();
for chunk in ts_files.chunks(chunk_size.max(1)) {
workers.push(scope.spawn(move || -> Result<()> {
for file in chunk {
prepend_header_if_missing(file)?;
}
Ok(())
}));
}
for worker in workers {
worker
.join()
.map_err(|_| anyhow!("TypeScript header worker panicked"))??;
}
Ok(())
})?;
}
// Optionally run Prettier on all generated TS files.
@@ -231,6 +253,41 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
Ok(())
}
pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>) -> Result<()> {
let registered_fields = experimental_fields();
let experimental_method_types = experimental_method_types();
if let Some(content) = tree.get_mut(Path::new("ClientRequest.ts")) {
let filtered =
filter_client_request_ts_contents(std::mem::take(content), EXPERIMENTAL_CLIENT_METHODS);
*content = filtered;
}
let mut fields_by_type_name: HashMap<String, HashSet<String>> = HashMap::new();
for field in registered_fields {
fields_by_type_name
.entry(field.type_name.to_string())
.or_default()
.insert(field.field_name.to_string());
}
for (path, content) in tree.iter_mut() {
let Some(type_name) = path.file_stem().and_then(|stem| stem.to_str()) else {
continue;
};
let Some(experimental_field_names) = fields_by_type_name.get(type_name) else {
continue;
};
let filtered = filter_experimental_type_fields_ts_contents(
std::mem::take(content),
experimental_field_names,
);
*content = filtered;
}
remove_generated_type_entries(tree, &experimental_method_types, "ts");
Ok(())
}
/// Removes union arms from `ClientRequest.ts` for methods marked experimental.
fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> {
let path = out_dir.join("ClientRequest.ts");
@@ -239,9 +296,15 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
}
let mut content =
fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?;
content = filter_client_request_ts_contents(content, experimental_methods);
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
}
fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
let Some((prefix, body, suffix)) = split_type_alias(&content) else {
return Ok(());
return content;
};
let experimental_methods: HashSet<&str> = experimental_methods
.iter()
@@ -259,12 +322,9 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
let new_body = filtered_arms.join(" | ");
content = format!("{prefix}{new_body}{suffix}");
let import_usage_scope = split_type_alias(&content)
.map(|(_, body, _)| body)
.map(|(_, filtered_body, _)| filtered_body)
.unwrap_or_else(|| new_body.clone());
content = prune_unused_type_imports(content, &import_usage_scope);
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
prune_unused_type_imports(content, &import_usage_scope)
}
/// Removes experimental properties from generated TypeScript type files.
@@ -302,8 +362,17 @@ fn filter_experimental_fields_in_ts_file(
) -> Result<()> {
let mut content =
fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
content = filter_experimental_type_fields_ts_contents(content, experimental_field_names);
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
}
fn filter_experimental_type_fields_ts_contents(
mut content: String,
experimental_field_names: &HashSet<String>,
) -> String {
let Some((open_brace, close_brace)) = type_body_brace_span(&content) else {
return Ok(());
return content;
};
let inner = &content[open_brace + 1..close_brace];
let fields = split_top_level_multi(inner, &[',', ';']);
@@ -322,9 +391,7 @@ fn filter_experimental_fields_in_ts_file(
let import_usage_scope = split_type_alias(&content)
.map(|(_, body, _)| body)
.unwrap_or_else(|| new_inner.clone());
content = prune_unused_type_imports(content, &import_usage_scope);
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
prune_unused_type_imports(content, &import_usage_scope)
}
fn filter_experimental_schema(bundle: &mut Value) -> Result<()> {
@@ -526,6 +593,23 @@ fn remove_generated_type_files(
Ok(())
}
/// Drops generated files for the given type names from the in-memory tree.
///
/// For each type name, removes `<name>.<extension>` at the tree root and
/// under the `v1/` and `v2/` subdirectories. Missing entries are ignored.
fn remove_generated_type_entries(
    tree: &mut BTreeMap<PathBuf, String>,
    type_names: &HashSet<String>,
    extension: &str,
) {
    for type_name in type_names {
        let file_name = format!("{type_name}.{extension}");
        for subdir in ["", "v1", "v2"] {
            let candidate = match subdir {
                "" => PathBuf::from(&file_name),
                dir => Path::new(dir).join(&file_name),
            };
            tree.remove(&candidate);
        }
    }
}
fn remove_experimental_method_type_definitions(bundle: &mut Value) {
let type_names = experimental_method_types();
let Some(definitions) = bundle.get_mut("definitions").and_then(Value::as_object_mut) else {
@@ -1807,13 +1891,13 @@ fn prepend_header_if_missing(path: &Path) -> Result<()> {
.with_context(|| format!("Failed to read {}", path.display()))?;
}
if content.starts_with(HEADER) {
if content.starts_with(GENERATED_TS_HEADER) {
return Ok(());
}
let mut f = fs::File::create(path)
.with_context(|| format!("Failed to open {} for writing", path.display()))?;
f.write_all(HEADER.as_bytes())
f.write_all(GENERATED_TS_HEADER.as_bytes())
.with_context(|| format!("Failed to write header to {}", path.display()))?;
f.write_all(content.as_bytes())
.with_context(|| format!("Failed to write content to {}", path.display()))?;
@@ -1858,35 +1942,15 @@ fn ts_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
/// Generate an index.ts file that re-exports all generated types.
/// This allows consumers to import all types from a single file.
fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
let mut entries: Vec<String> = Vec::new();
let mut stems: Vec<String> = ts_files_in(out_dir)?
.into_iter()
.filter_map(|p| {
let stem = p.file_stem()?.to_string_lossy().into_owned();
if stem == "index" { None } else { Some(stem) }
})
.collect();
stems.sort();
stems.dedup();
for name in stems {
entries.push(format!("export type {{ {name} }} from \"./{name}\";\n"));
}
// If this is the root out_dir and a ./v2 folder exists with TS files,
// expose it as a namespace to avoid symbol collisions at the root.
let v2_dir = out_dir.join("v2");
let has_v2_ts = ts_files_in(&v2_dir).map(|v| !v.is_empty()).unwrap_or(false);
if has_v2_ts {
entries.push("export * as v2 from \"./v2\";\n".to_string());
}
let mut content =
String::with_capacity(HEADER.len() + entries.iter().map(String::len).sum::<usize>());
content.push_str(HEADER);
for line in &entries {
content.push_str(line);
}
let content = generated_index_ts_with_header(index_ts_entries(
&ts_files_in(out_dir)?
.iter()
.map(PathBuf::as_path)
.collect::<Vec<_>>(),
ts_files_in(&out_dir.join("v2"))
.map(|v| !v.is_empty())
.unwrap_or(false),
));
let index_path = out_dir.join("index.ts");
let mut f = fs::File::create(&index_path)
@@ -1896,244 +1960,278 @@ fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
Ok(index_path)
}
/// Inserts `index.ts` re-export files into the in-memory fixture tree: one at
/// the root (optionally namespacing `./v2`) and, when `v2/` contains entries,
/// one at `v2/index.ts`.
pub(crate) fn generate_index_ts_tree(tree: &mut BTreeMap<PathBuf, String>) {
    let v2_dir = Path::new("v2");
    // Does v2/ hold any generated .ts type files (excluding an index.ts)?
    // If so the root index exposes it as a namespace to avoid collisions.
    let has_v2_ts = tree.keys().any(|path| {
        path.parent() == Some(v2_dir)
            && path.extension() == Some(OsStr::new("ts"))
            && path.file_stem().is_some_and(|stem| stem != "index")
    });
    // Build the root index from single-component (top-level) paths. Compute
    // the contents in an inner scope so the key borrow ends before insertion.
    let root_index = {
        let root_entries: Vec<&Path> = tree
            .keys()
            .filter(|path| path.components().count() == 1)
            .map(PathBuf::as_path)
            .collect();
        index_ts_entries(&root_entries, has_v2_ts)
    };
    tree.insert(PathBuf::from("index.ts"), root_index);
    // Build the v2 index only when the subdirectory actually has entries.
    let v2_index = {
        let v2_entries: Vec<&Path> = tree
            .keys()
            .filter(|path| path.parent() == Some(v2_dir))
            .map(PathBuf::as_path)
            .collect();
        if v2_entries.is_empty() {
            None
        } else {
            Some(index_ts_entries(&v2_entries, false))
        }
    };
    if let Some(v2_index) = v2_index {
        tree.insert(v2_dir.join("index.ts"), v2_index);
    }
}
fn generated_index_ts_with_header(content: String) -> String {
let mut with_header = String::with_capacity(GENERATED_TS_HEADER.len() + content.len());
with_header.push_str(GENERATED_TS_HEADER);
with_header.push_str(&content);
with_header
}
/// Renders the body of an `index.ts` that re-exports every generated type.
///
/// Only `.ts` files are considered; any existing `index` stem is skipped;
/// stems are sorted and de-duplicated. When `has_v2_ts` is set, a namespace
/// re-export of `./v2` is appended so its symbols cannot collide at the root.
fn index_ts_entries(paths: &[&Path], has_v2_ts: bool) -> String {
    let mut stems = Vec::new();
    for path in paths {
        if path.extension() != Some(OsStr::new("ts")) {
            continue;
        }
        let Some(stem) = path.file_stem() else {
            continue;
        };
        let stem = stem.to_string_lossy().into_owned();
        if stem != "index" {
            stems.push(stem);
        }
    }
    stems.sort();
    stems.dedup();
    let mut entries = String::new();
    for name in &stems {
        entries.push_str(&format!("export type {{ {name} }} from \"./{name}\";\n"));
    }
    if has_v2_ts {
        entries.push_str("export * as v2 from \"./v2\";\n");
    }
    entries
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::v2;
use crate::schema_fixtures::read_schema_fixture_subtree;
use anyhow::Context;
use anyhow::Result;
use pretty_assertions::assert_eq;
use std::collections::BTreeSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use uuid::Uuid;
#[test]
fn generated_ts_optional_nullable_fields_only_in_params() -> Result<()> {
// Assert that "?: T | null" only appears in generated *Params types.
let output_dir = std::env::temp_dir().join(format!("codex_ts_types_{}", Uuid::now_v7()));
fs::create_dir(&output_dir)?;
let fixture_tree = read_schema_fixture_subtree(&schema_root()?, "typescript")?;
struct TempDirGuard(PathBuf);
impl Drop for TempDirGuard {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.0);
}
}
let _guard = TempDirGuard(output_dir.clone());
// Avoid doing more work than necessary to keep the test from timing out.
let options = GenerateTsOptions {
generate_indices: false,
ensure_headers: false,
run_prettier: false,
experimental_api: false,
};
generate_ts_with_options(&output_dir, None, options)?;
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
let client_request_ts = std::str::from_utf8(
fixture_tree
.get(Path::new("ClientRequest.ts"))
.ok_or_else(|| anyhow::anyhow!("missing ClientRequest.ts fixture"))?,
)?;
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), false);
assert_eq!(
client_request_ts.contains("MockExperimentalMethodParams"),
false
);
assert_eq!(output_dir.join("EventMsg.ts").exists(), true);
let thread_start_ts =
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
assert_eq!(fixture_tree.contains_key(Path::new("EventMsg.ts")), true);
let thread_start_ts = std::str::from_utf8(
fixture_tree
.get(Path::new("v2/ThreadStartParams.ts"))
.ok_or_else(|| anyhow::anyhow!("missing v2/ThreadStartParams.ts fixture"))?,
)?;
assert_eq!(thread_start_ts.contains("mockExperimentalField"), false);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodParams.ts")
.exists(),
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodParams.ts")),
false
);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodResponse.ts")
.exists(),
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")),
false
);
let mut undefined_offenders = Vec::new();
let mut optional_nullable_offenders = BTreeSet::new();
let mut stack = vec![output_dir];
while let Some(dir) = stack.pop() {
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
stack.push(path);
for (path, contents) in &fixture_tree {
if !matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
continue;
}
// Only allow "?: T | null" in objects representing JSON-RPC requests,
// which we assume are called "*Params".
let allow_optional_nullable = path
.file_stem()
.and_then(|stem| stem.to_str())
.is_some_and(|stem| {
stem.ends_with("Params")
|| stem == "InitializeCapabilities"
|| matches!(
stem,
"CollabAgentRef"
| "CollabAgentStatusEntry"
| "CollabAgentSpawnEndEvent"
| "CollabAgentInteractionEndEvent"
| "CollabCloseEndEvent"
| "CollabResumeBeginEvent"
| "CollabResumeEndEvent"
)
});
let contents = std::str::from_utf8(contents)?;
if contents.contains("| undefined") {
undefined_offenders.push(path.clone());
}
const SKIP_PREFIXES: &[&str] = &[
"const ",
"let ",
"var ",
"export const ",
"export let ",
"export var ",
];
let mut search_start = 0;
while let Some(idx) = contents[search_start..].find("| null") {
let abs_idx = search_start + idx;
// Find the property-colon for this field by scanning forward
// from the start of the segment and ignoring nested braces,
// brackets, and parens. This avoids colons inside nested
// type literals like `{ [k in string]?: string }`.
let line_start_idx = contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
let mut segment_start_idx = line_start_idx;
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
// Scan forward for the colon that separates the field name from its type.
let mut level_brace = 0_i32;
let mut level_brack = 0_i32;
let mut level_paren = 0_i32;
let mut in_single = false;
let mut in_double = false;
let mut escape = false;
let mut prop_colon_idx = None;
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
let idx_abs = segment_start_idx + i;
if escape {
escape = false;
continue;
}
match ch {
'\\' => {
if in_single || in_double {
escape = true;
}
}
'\'' => {
if !in_double {
in_single = !in_single;
}
}
'"' => {
if !in_single {
in_double = !in_double;
}
}
'{' if !in_single && !in_double => level_brace += 1,
'}' if !in_single && !in_double => level_brace -= 1,
'[' if !in_single && !in_double => level_brack += 1,
']' if !in_single && !in_double => level_brack -= 1,
'(' if !in_single && !in_double => level_paren += 1,
')' if !in_single && !in_double => level_paren -= 1,
':' if !in_single
&& !in_double
&& level_brace == 0
&& level_brack == 0
&& level_paren == 0 =>
{
prop_colon_idx = Some(idx_abs);
break;
}
_ => {}
}
}
let Some(colon_idx) = prop_colon_idx else {
search_start = abs_idx + 5;
continue;
};
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
// Only allow "?: T | null" in objects representing JSON-RPC requests,
// which we assume are called "*Params".
let allow_optional_nullable = path
.file_stem()
.and_then(|stem| stem.to_str())
.is_some_and(|stem| {
stem.ends_with("Params")
|| stem == "InitializeCapabilities"
|| matches!(
stem,
"CollabAgentRef"
| "CollabAgentStatusEntry"
| "CollabAgentSpawnEndEvent"
| "CollabAgentInteractionEndEvent"
| "CollabCloseEndEvent"
| "CollabResumeBeginEvent"
| "CollabResumeEndEvent"
)
});
let contents = fs::read_to_string(&path)?;
if contents.contains("| undefined") {
undefined_offenders.push(path.clone());
}
const SKIP_PREFIXES: &[&str] = &[
"const ",
"let ",
"var ",
"export const ",
"export let ",
"export var ",
];
let mut search_start = 0;
while let Some(idx) = contents[search_start..].find("| null") {
let abs_idx = search_start + idx;
// Find the property-colon for this field by scanning forward
// from the start of the segment and ignoring nested braces,
// brackets, and parens. This avoids colons inside nested
// type literals like `{ [k in string]?: string }`.
let line_start_idx =
contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
let mut segment_start_idx = line_start_idx;
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
// Scan forward for the colon that separates the field name from its type.
let mut level_brace = 0_i32;
let mut level_brack = 0_i32;
let mut level_paren = 0_i32;
let mut in_single = false;
let mut in_double = false;
let mut escape = false;
let mut prop_colon_idx = None;
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
let idx_abs = segment_start_idx + i;
if escape {
escape = false;
continue;
}
match ch {
'\\' => {
// Only treat as escape when inside a string.
if in_single || in_double {
escape = true;
}
}
'\'' => {
if !in_double {
in_single = !in_single;
}
}
'"' => {
if !in_single {
in_double = !in_double;
}
}
'{' if !in_single && !in_double => level_brace += 1,
'}' if !in_single && !in_double => level_brace -= 1,
'[' if !in_single && !in_double => level_brack += 1,
']' if !in_single && !in_double => level_brack -= 1,
'(' if !in_single && !in_double => level_paren += 1,
')' if !in_single && !in_double => level_paren -= 1,
':' if !in_single
&& !in_double
&& level_brace == 0
&& level_brack == 0
&& level_paren == 0 =>
{
prop_colon_idx = Some(idx_abs);
break;
}
_ => {}
}
}
let Some(colon_idx) = prop_colon_idx else {
search_start = abs_idx + 5;
continue;
};
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if let Some(comment_idx) = field_prefix.rfind("*/") {
field_prefix = field_prefix[comment_idx + 2..].trim_start();
}
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if SKIP_PREFIXES
.iter()
.any(|prefix| field_prefix.starts_with(prefix))
{
search_start = abs_idx + 5;
continue;
}
if field_prefix.contains('(') {
search_start = abs_idx + 5;
continue;
}
// If the last non-whitespace before ':' is '?', then this is an
// optional field with a nullable type (i.e., "?: T | null").
// These are only allowed in *Params types.
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
&& !allow_optional_nullable
{
let line_number =
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
let offending_line_end = contents[line_start_idx..]
.find('\n')
.map(|i| line_start_idx + i)
.unwrap_or(contents.len());
let offending_snippet =
contents[line_start_idx..offending_line_end].trim();
optional_nullable_offenders.insert(format!(
"{}:{}: {offending_snippet}",
path.display(),
line_number
));
}
search_start = abs_idx + 5;
}
if let Some(comment_idx) = field_prefix.rfind("*/") {
field_prefix = field_prefix[comment_idx + 2..].trim_start();
}
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if SKIP_PREFIXES
.iter()
.any(|prefix| field_prefix.starts_with(prefix))
{
search_start = abs_idx + 5;
continue;
}
if field_prefix.contains('(') {
search_start = abs_idx + 5;
continue;
}
// If the last non-whitespace before ':' is '?', then this is an
// optional field with a nullable type (i.e., "?: T | null").
// These are only allowed in *Params types.
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
&& !allow_optional_nullable
{
let line_number =
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
let offending_line_end = contents[line_start_idx..]
.find('\n')
.map(|i| line_start_idx + i)
.unwrap_or(contents.len());
let offending_snippet = contents[line_start_idx..offending_line_end].trim();
optional_nullable_offenders.insert(format!(
"{}:{}: {offending_snippet}",
path.display(),
line_number
));
}
search_start = abs_idx + 5;
}
}
@@ -2153,55 +2251,40 @@ mod tests {
Ok(())
}
fn schema_root() -> Result<PathBuf> {
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
.context("resolve TypeScript schema index.ts")?;
let schema_root = typescript_index
.parent()
.and_then(|parent| parent.parent())
.context("derive schema root from schema/typescript/index.ts")?
.to_path_buf();
Ok(schema_root)
}
#[test]
fn generate_ts_with_experimental_api_retains_experimental_entries() -> Result<()> {
let output_dir =
std::env::temp_dir().join(format!("codex_ts_types_experimental_{}", Uuid::now_v7()));
fs::create_dir(&output_dir)?;
struct TempDirGuard(PathBuf);
impl Drop for TempDirGuard {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.0);
}
}
let _guard = TempDirGuard(output_dir.clone());
let options = GenerateTsOptions {
generate_indices: false,
ensure_headers: false,
run_prettier: false,
experimental_api: true,
};
generate_ts_with_options(&output_dir, None, options)?;
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
let client_request_ts = ClientRequest::export_to_string()?;
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), true);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodParams.ts")
.exists(),
client_request_ts.contains("MockExperimentalMethodParams"),
true
);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodResponse.ts")
.exists(),
v2::MockExperimentalMethodParams::export_to_string()?
.contains("MockExperimentalMethodParams"),
true
);
assert_eq!(
v2::MockExperimentalMethodResponse::export_to_string()?
.contains("MockExperimentalMethodResponse"),
true
);
let thread_start_ts =
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
let thread_start_ts = v2::ThreadStartParams::export_to_string()?;
assert_eq!(thread_start_ts.contains("mockExperimentalField"), true);
let command_execution_request_approval_ts = fs::read_to_string(
output_dir
.join("v2")
.join("CommandExecutionRequestApprovalParams.ts"),
)?;
let command_execution_request_approval_ts =
v2::CommandExecutionRequestApprovalParams::export_to_string()?;
assert_eq!(
command_execution_request_approval_ts.contains("additionalPermissions"),
true

View File

@@ -17,6 +17,9 @@ pub use protocol::thread_history::*;
pub use protocol::v1::*;
pub use protocol::v2::*;
pub use schema_fixtures::SchemaFixtureOptions;
#[doc(hidden)]
pub use schema_fixtures::generate_typescript_schema_fixture_subtree_for_tests;
pub use schema_fixtures::read_schema_fixture_subtree;
pub use schema_fixtures::read_schema_fixture_tree;
pub use schema_fixtures::write_schema_fixtures;
pub use schema_fixtures::write_schema_fixtures_with_options;

View File

@@ -151,6 +151,12 @@ macro_rules! client_request_definitions {
Ok(())
}
pub(crate) fn visit_client_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
$(
v.visit::<$response>();
)*
}
#[allow(clippy::vec_init_then_push)]
pub fn export_client_response_schemas(
out_dir: &::std::path::Path,
@@ -525,6 +531,12 @@ macro_rules! server_request_definitions {
Ok(())
}
pub(crate) fn visit_server_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
$(
v.visit::<$response>();
)*
}
#[allow(clippy::vec_init_then_push)]
pub fn export_server_response_schemas(
out_dir: &Path,

View File

@@ -1,11 +1,25 @@
use crate::ClientNotification;
use crate::ClientRequest;
use crate::ServerNotification;
use crate::ServerRequest;
use crate::export::GENERATED_TS_HEADER;
use crate::export::filter_experimental_ts_tree;
use crate::export::generate_index_ts_tree;
use crate::protocol::common::visit_client_response_types;
use crate::protocol::common::visit_server_response_types;
use anyhow::Context;
use anyhow::Result;
use codex_protocol::protocol::EventMsg;
use serde_json::Map;
use serde_json::Value;
use std::any::TypeId;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use ts_rs::TS;
use ts_rs::TypeVisitor;
#[derive(Clone, Copy, Debug, Default)]
pub struct SchemaFixtureOptions {
@@ -27,6 +41,42 @@ pub fn read_schema_fixture_tree(schema_root: &Path) -> Result<BTreeMap<PathBuf,
Ok(all)
}
/// Reads one labeled fixture subtree (e.g. `"typescript"` or `"json"`) from
/// under `schema_root`, returning relative path -> raw file bytes.
pub fn read_schema_fixture_subtree(
    schema_root: &Path,
    label: &str,
) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
    let subtree_root = schema_root.join(label);
    let files = collect_files_recursive(&subtree_root)
        .with_context(|| format!("read schema fixture subtree {}", subtree_root.display()))?;
    Ok(files)
}
/// Builds the TypeScript schema fixture subtree entirely in memory, without
/// touching the filesystem, so tests can compare it against checked-in
/// fixtures.
///
/// Exports the top-level protocol unions plus their response types and
/// transitive dependencies, then applies the same post-processing as the
/// on-disk generator: experimental-API filtering followed by `index.ts`
/// generation. Returns relative path -> file bytes.
///
/// NOTE(review): the export order below mirrors the on-disk generator —
/// presumably it must, so fixture contents stay byte-identical; confirm
/// before reordering.
#[doc(hidden)]
pub fn generate_typescript_schema_fixture_subtree_for_tests() -> Result<BTreeMap<PathBuf, Vec<u8>>>
{
    let mut files = BTreeMap::new();
    // Shared across all collection calls so each type is exported once.
    let mut seen = HashSet::new();
    collect_typescript_fixture_file::<ClientRequest>(&mut files, &mut seen)?;
    // Response types are not reachable from the request unions, so they are
    // visited explicitly.
    visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
        visit_client_response_types(visitor);
    })?;
    collect_typescript_fixture_file::<ClientNotification>(&mut files, &mut seen)?;
    collect_typescript_fixture_file::<ServerRequest>(&mut files, &mut seen)?;
    visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
        visit_server_response_types(visitor);
    })?;
    collect_typescript_fixture_file::<ServerNotification>(&mut files, &mut seen)?;
    collect_typescript_fixture_file::<EventMsg>(&mut files, &mut seen)?;
    // Apply the same post-processing as the on-disk generator.
    filter_experimental_ts_tree(&mut files)?;
    generate_index_ts_tree(&mut files);
    Ok(files
        .into_iter()
        .map(|(path, content)| (path, content.into_bytes()))
        .collect())
}
/// Regenerates `schema/typescript/` and `schema/json/`.
///
/// This is intended to be used by tooling (e.g., `just write-app-server-schema`).
@@ -86,6 +136,12 @@ fn read_file_bytes(path: &Path) -> Result<Vec<u8>> {
let text = String::from_utf8(bytes)
.with_context(|| format!("expected UTF-8 TypeScript in {}", path.display()))?;
let text = text.replace("\r\n", "\n").replace('\r', "\n");
// Fixture comparisons care about schema content, not whether the generator
// re-prepended the standard banner to every TypeScript file.
let text = text
.strip_prefix(GENERATED_TS_HEADER)
.unwrap_or(&text)
.to_string();
return Ok(text.into_bytes());
}
Ok(bytes)
@@ -209,6 +265,73 @@ fn collect_files_recursive(root: &Path) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
Ok(files)
}
fn collect_typescript_fixture_file<T: TS + 'static + ?Sized>(
files: &mut BTreeMap<PathBuf, String>,
seen: &mut HashSet<TypeId>,
) -> Result<()> {
let Some(output_path) = T::output_path() else {
return Ok(());
};
if !seen.insert(TypeId::of::<T>()) {
return Ok(());
}
let contents = T::export_to_string().context("export TypeScript fixture content")?;
let output_path = normalize_relative_fixture_path(&output_path);
files.insert(
output_path,
contents.replace("\r\n", "\n").replace('\r', "\n"),
);
let mut visitor = TypeScriptFixtureCollector {
files,
seen,
error: None,
};
T::visit_dependencies(&mut visitor);
if let Some(error) = visitor.error {
return Err(error);
}
Ok(())
}
fn normalize_relative_fixture_path(path: &Path) -> PathBuf {
path.components().collect()
}
fn visit_typescript_fixture_dependencies(
files: &mut BTreeMap<PathBuf, String>,
seen: &mut HashSet<TypeId>,
visit: impl FnOnce(&mut TypeScriptFixtureCollector<'_>),
) -> Result<()> {
let mut visitor = TypeScriptFixtureCollector {
files,
seen,
error: None,
};
visit(&mut visitor);
if let Some(error) = visitor.error {
return Err(error);
}
Ok(())
}
struct TypeScriptFixtureCollector<'a> {
files: &'a mut BTreeMap<PathBuf, String>,
seen: &'a mut HashSet<TypeId>,
error: Option<anyhow::Error>,
}
impl TypeVisitor for TypeScriptFixtureCollector<'_> {
fn visit<T: TS + 'static + ?Sized>(&mut self) {
if self.error.is_some() {
return;
}
self.error = collect_typescript_fixture_file::<T>(self.files, self.seen).err();
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,19 +1,60 @@
use anyhow::Context;
use anyhow::Result;
use codex_app_server_protocol::read_schema_fixture_tree;
use codex_app_server_protocol::write_schema_fixtures;
use codex_app_server_protocol::generate_json_with_experimental;
use codex_app_server_protocol::generate_typescript_schema_fixture_subtree_for_tests;
use codex_app_server_protocol::read_schema_fixture_subtree;
use similar::TextDiff;
use std::collections::BTreeMap;
use std::path::Path;
use std::path::PathBuf;
#[test]
fn schema_fixtures_match_generated() -> Result<()> {
fn typescript_schema_fixtures_match_generated() -> Result<()> {
let schema_root = schema_root()?;
let fixture_tree = read_tree(&schema_root)?;
let fixture_tree = read_tree(&schema_root, "typescript")?;
let generated_tree = generate_typescript_schema_fixture_subtree_for_tests()
.context("generate in-memory typescript schema fixtures")?;
assert_schema_trees_match("typescript", &fixture_tree, &generated_tree)?;
Ok(())
}
#[test]
fn json_schema_fixtures_match_generated() -> Result<()> {
assert_schema_fixtures_match_generated("json", |output_dir| {
generate_json_with_experimental(output_dir, false)
})
}
fn assert_schema_fixtures_match_generated(
label: &'static str,
generate: impl FnOnce(&Path) -> Result<()>,
) -> Result<()> {
let schema_root = schema_root()?;
let fixture_tree = read_tree(&schema_root, label)?;
let temp_dir = tempfile::tempdir().context("create temp dir")?;
write_schema_fixtures(temp_dir.path(), None).context("generate schema fixtures")?;
let generated_tree = read_tree(temp_dir.path())?;
let generated_root = temp_dir.path().join(label);
generate(&generated_root).with_context(|| {
format!(
"generate {label} schema fixtures into {}",
generated_root.display()
)
})?;
let generated_tree = read_tree(temp_dir.path(), label)?;
assert_schema_trees_match(label, &fixture_tree, &generated_tree)?;
Ok(())
}
fn assert_schema_trees_match(
label: &str,
fixture_tree: &BTreeMap<PathBuf, Vec<u8>>,
generated_tree: &BTreeMap<PathBuf, Vec<u8>>,
) -> Result<()> {
let fixture_paths = fixture_tree
.keys()
.map(|p| p.display().to_string())
@@ -32,13 +73,13 @@ fn schema_fixtures_match_generated() -> Result<()> {
.to_string();
panic!(
"Vendored app-server schema fixture file set doesn't match freshly generated output. \
"Vendored {label} app-server schema fixture file set doesn't match freshly generated output. \
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
);
}
// If the file sets match, diff contents for each file for a nicer error.
for (path, expected) in &fixture_tree {
for (path, expected) in fixture_tree {
let actual = generated_tree
.get(path)
.ok_or_else(|| anyhow::anyhow!("missing generated file: {}", path.display()))?;
@@ -54,7 +95,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
.header("fixture", "generated")
.to_string();
panic!(
"Vendored app-server schema fixture {} differs from generated output. \
"Vendored {label} app-server schema fixture {} differs from generated output. \
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
path.display()
);
@@ -63,7 +104,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
Ok(())
}
fn schema_root() -> Result<std::path::PathBuf> {
fn schema_root() -> Result<PathBuf> {
// In Bazel runfiles (especially manifest-only mode), resolving directories is not
// reliable. Resolve a known file, then walk up to the schema root.
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
@@ -92,6 +133,11 @@ fn schema_root() -> Result<std::path::PathBuf> {
Ok(schema_root)
}
fn read_tree(root: &Path) -> Result<std::collections::BTreeMap<std::path::PathBuf, Vec<u8>>> {
read_schema_fixture_tree(root).context("read schema fixture tree")
fn read_tree(root: &Path, label: &str) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
read_schema_fixture_subtree(root, label).with_context(|| {
format!(
"read {label} schema fixture subtree from {}",
root.display()
)
})
}

View File

@@ -8,6 +8,10 @@ license.workspace = true
name = "codex-app-server"
path = "src/main.rs"
[[bin]]
name = "codex-app-server-test-notify-capture"
path = "src/bin/notify_capture.rs"
[lib]
name = "codex_app_server"
path = "src/lib.rs"

View File

@@ -0,0 +1,44 @@
use std::env;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
fn main() -> Result<()> {
let mut args = env::args_os();
let _program = args.next();
let output_path = PathBuf::from(
args.next()
.ok_or_else(|| anyhow!("expected output path as first argument"))?,
);
let payload = args
.next()
.ok_or_else(|| anyhow!("expected payload as final argument"))?;
if args.next().is_some() {
bail!("expected payload as final argument");
}
let payload = payload.to_string_lossy();
let temp_path = PathBuf::from(format!("{}.tmp", output_path.display()));
let mut file = File::create(&temp_path)
.with_context(|| format!("failed to create {}", temp_path.display()))?;
file.write_all(payload.as_bytes())
.with_context(|| format!("failed to write {}", temp_path.display()))?;
file.sync_all()
.with_context(|| format!("failed to sync {}", temp_path.display()))?;
fs::rename(&temp_path, &output_path).with_context(|| {
format!(
"failed to move {} into {}",
temp_path.display(),
output_path.display()
)
})?;
Ok(())
}

View File

@@ -711,7 +711,6 @@ pub async fn run_main_with_transport(
request,
transport,
&mut connection_state.session,
&connection_state.outbound_initialized,
)
.await;
if let Ok(mut opted_out_notification_methods) = connection_state
@@ -734,7 +733,15 @@ pub async fn run_main_with_transport(
std::sync::atomic::Ordering::Release,
);
if !was_initialized && connection_state.session.initialized {
processor.send_initialize_notifications().await;
processor
.send_initialize_notifications_to_connection(
connection_id,
)
.await;
processor.connection_initialized(connection_id).await;
connection_state
.outbound_initialized
.store(true, std::sync::atomic::Ordering::Release);
}
}
JSONRPCMessage::Response(response) => {

View File

@@ -1,8 +1,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
@@ -233,7 +231,6 @@ impl MessageProcessor {
request: JSONRPCRequest,
transport: AppServerTransport,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_span =
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
@@ -349,10 +346,6 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;
session.initialized = true;
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
return;
}
}
@@ -470,10 +463,22 @@ impl MessageProcessor {
self.codex_message_processor.thread_created_receiver()
}
pub(crate) async fn send_initialize_notifications(&self) {
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
self.codex_message_processor
.connection_initialized(connection_id)
.await;
}
pub(crate) async fn send_initialize_notifications_to_connection(
&self,
connection_id: ConnectionId,
) {
for notification in self.config_warnings.iter().cloned() {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(notification))
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::ConfigWarning(notification),
)
.await;
}
}

View File

@@ -33,6 +33,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "http://127.0.0.1:0/v1"
@@ -53,6 +56,9 @@ fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
@@ -65,6 +71,9 @@ model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
forced_login_method = "{forced_method}"
[features]
shell_snapshot = false
"#
);
std::fs::write(config_toml, contents)

View File

@@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -23,7 +24,23 @@ enum FileExpectation {
NonEmpty,
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
@@ -164,6 +181,7 @@ async fn assert_no_session_updates_for(
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
// Prepare a temporary Codex home and a separate root with test files.
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
// Create files designed to have deterministic ordering for query "abe".
@@ -235,6 +253,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
@@ -434,7 +453,7 @@ async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()>
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
for i in 0..2_000 {
for i in 0..512 {
let file_path = root.path().join(format!("file-{i:04}.txt"));
std::fs::write(file_path, "contents")?;
}

View File

@@ -83,6 +83,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{base_url}"

View File

@@ -14,6 +14,7 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_utils_cargo_bin::cargo_bin;
use core_test_support::fs_wait;
use pretty_assertions::assert_eq;
use serde_json::Value;
@@ -191,29 +192,22 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let notify_script = codex_home.path().join("notify.py");
std::fs::write(
&notify_script,
r#"from pathlib import Path
import sys
payload_path = Path(__file__).with_name("notify.json")
tmp_path = payload_path.with_suffix(".json.tmp")
tmp_path.write_text(sys.argv[-1], encoding="utf-8")
tmp_path.replace(payload_path)
"#,
)?;
let notify_file = codex_home.path().join("notify.json");
let notify_script = notify_script
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
let notify_capture = notify_capture
.to_str()
.expect("notify script path should be valid UTF-8");
.expect("notify capture path should be valid UTF-8");
let notify_file_str = notify_file
.to_str()
.expect("notify file path should be valid UTF-8");
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
"never",
&format!(
"notify = [\"python3\", {}]",
toml_basic_string(notify_script)
"notify = [{}, {}]",
toml_basic_string(notify_capture),
toml_basic_string(notify_file_str)
),
)?;
@@ -297,6 +291,9 @@ model_provider = "mock_provider"
{extra}
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"

View File

@@ -4,7 +4,7 @@ use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::rollout_path;
use app_test_support::to_response;
@@ -59,6 +59,36 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -866,7 +896,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
)?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -981,6 +1011,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}
@@ -1003,7 +1034,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut primary = McpProcess::new(&codex_home).await?;
@@ -1146,6 +1177,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}

View File

@@ -34,6 +34,51 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
async fn wait_for_responses_request_count_to_stabilize(
server: &wiremock::MockServer,
expected_count: usize,
settle_duration: std::time::Duration,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
let mut stable_since: Option<tokio::time::Instant> = None;
loop {
let requests = server
.received_requests()
.await
.context("failed to fetch received requests")?;
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
if responses_request_count == expected_count {
match stable_since {
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
return Ok::<(), anyhow::Error>(());
}
None => stable_since = Some(tokio::time::Instant::now()),
Some(_) => {}
}
} else {
stable_since = None;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -168,6 +213,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
};
assert_eq!(payload.thread_id, thread_id);
wait_for_responses_request_count_to_stabilize(
&server,
1,
std::time::Duration::from_millis(200),
)
.await?;
Ok(())
}

View File

@@ -54,6 +54,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let release_marker = workspace.join("interrupt-release");
let Some(zsh_path) = find_test_zsh_path()? else {
eprintln!("skipping zsh fork test: no zsh executable found");
@@ -61,8 +62,15 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
};
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
// Keep the shell command in flight until we interrupt it. A fast command
// like `echo hi` can finish before the interrupt arrives on faster runners,
// which turns this into a test for post-command follow-up behavior instead
// of interrupting an active zsh-fork command.
let release_marker_escaped = release_marker.to_string_lossy().replace('\'', r#"'\''"#);
let wait_for_interrupt =
format!("while [ ! -f '{release_marker_escaped}' ]; do sleep 0.01; done");
let response = create_shell_command_sse_response(
vec!["echo".to_string(), "hi".to_string()],
vec!["/bin/sh".to_string(), "-c".to_string(), wait_for_interrupt],
None,
Some(5000),
"call-zsh-fork",
@@ -155,7 +163,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
assert_eq!(id, "call-zsh-fork");
assert_eq!(status, CommandExecutionStatus::InProgress);
assert!(command.starts_with(&zsh_path.display().to_string()));
assert!(command.contains(" -lc 'echo hi'"));
assert!(command.contains("/bin/sh -c"));
assert!(command.contains("sleep 0.01"));
assert!(command.contains(&release_marker.display().to_string()));
assert_eq!(cwd, workspace);
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
@@ -663,22 +673,50 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
};
assert_eq!(id, "call-zsh-fork-subcommand-decline");
assert_eq!(status, CommandExecutionStatus::Declined);
assert!(
aggregated_output.is_none()
|| aggregated_output == Some("exec command rejected by user".to_string())
);
if let Some(output) = aggregated_output.as_deref() {
assert!(
output == "exec command rejected by user"
|| output.contains("sandbox denied exec error"),
"unexpected aggregated output: {output}"
);
}
mcp.interrupt_turn_and_wait_for_aborted(
thread.id.clone(),
turn.id.clone(),
match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await?;
.await
{
Ok(Ok(completed_notif)) => {
let completed: TurnCompletedNotification = serde_json::from_value(
completed_notif
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.id, turn.id);
assert!(matches!(
completed.turn.status,
TurnStatus::Interrupted | TurnStatus::Completed
));
}
Ok(Err(error)) => return Err(error),
Err(_) => {
mcp.interrupt_turn_and_wait_for_aborted(
thread.id.clone(),
turn.id.clone(),
DEFAULT_READ_TIMEOUT,
)
.await?;
}
}
}
Ok(Err(error)) => return Err(error),
Err(_) => {
// Some zsh builds abort the turn immediately after the rejected
// subcommand without emitting a parent `item/completed`.
// subcommand without emitting a parent `item/completed`, and Linux
// sandbox failures can also complete the turn before the parent
// completion item is observed.
let completed_notif = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
@@ -691,7 +729,10 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.id, turn.id);
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
assert!(matches!(
completed.turn.status,
TurnStatus::Interrupted | TurnStatus::Completed
));
}
}

View File

@@ -34,7 +34,7 @@ codex_rust_crate(
"models.json",
"prompt.md",
],
test_data_extra = [
test_data_extra = glob(["src/**/snapshots/**"]) + [
"config.schema.json",
# This is a bit of a hack, but empirically, some of our integration tests
# are relying on the presence of this file as a repo root marker. When

View File

@@ -212,8 +212,9 @@ impl AgentControl {
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
.await;
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
@@ -288,7 +289,8 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
.await;
Ok(resumed_thread.thread_id)
}
@@ -418,7 +420,7 @@ impl AgentControl {
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
async fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
@@ -429,10 +431,12 @@ impl AgentControl {
else {
return;
};
let status_rx = self.subscribe_status(child_thread_id).await.ok();
let control = self.clone();
tokio::spawn(async move {
let status = match control.subscribe_status(child_thread_id).await {
Ok(mut status_rx) => {
let status = match status_rx {
Some(mut status_rx) => {
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
@@ -443,7 +447,7 @@ impl AgentControl {
}
status
}
Err(_) => control.get_status(child_thread_id).await,
None => control.get_status(child_thread_id).await,
};
if !is_final(&status) {
return;
@@ -1317,10 +1321,34 @@ mod tests {
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let mut status_rx = harness
.control
.subscribe_status(child_thread_id)
.await
.expect("status subscription should succeed");
let _ = child_thread
.submit(Op::Shutdown {})
.await
.expect("child shutdown should submit");
timeout(Duration::from_secs(5), async {
loop {
let status = status_rx.borrow().clone();
if is_final(&status) {
break;
}
if status_rx.changed().await.is_err() {
let latest = harness.control.get_status(child_thread_id).await;
assert_eq!(
is_final(&latest),
true,
"child status stream closed before a final status was observable"
);
break;
}
}
})
.await
.expect("child should reach a final status");
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
@@ -1331,15 +1359,18 @@ mod tests {
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
);
harness
.control
.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await;
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);

View File

@@ -6859,6 +6859,12 @@ async fn try_run_sampling_request(
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
if cancellation_token.is_cancelled()
&& outcome.as_ref().is_ok_and(|result| result.needs_follow_up)
{
return Err(CodexErr::TurnAborted);
}
if should_emit_turn_diff {
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;

View File

@@ -2,24 +2,21 @@ use super::*;
use crate::config_loader::ConfigLayerEntry;
use crate::config_loader::ConfigRequirements;
use crate::config_loader::ConfigRequirementsToml;
use crate::exec::ExecParams;
use crate::exec_policy::ExecPolicyManager;
use crate::features::Feature;
use crate::guardian::GUARDIAN_SUBAGENT_NAME;
use crate::protocol::AskForApproval;
use crate::sandboxing::SandboxPermissions;
use crate::tools::handlers::normalize_and_validate_additional_permissions;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_app_server_protocol::ConfigLayerSource;
use codex_execpolicy::Decision;
use codex_execpolicy::Evaluation;
use codex_execpolicy::RuleMatch;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::NetworkPermissions;
use codex_protocol::models::PermissionProfile;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tempfile::tempdir;
@@ -39,89 +36,23 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
.features
.enable(Feature::RequestPermissions)
.expect("test setup should allow enabling request permissions");
turn_context_raw
.sandbox_policy
.set(SandboxPolicy::DangerFullAccess)
.expect("test setup should allow updating sandbox policy");
let session = Arc::new(session);
let turn_context = Arc::new(turn_context_raw);
let params = ExecParams {
command: if cfg!(windows) {
vec![
"cmd.exe".to_string(),
"/C".to_string(),
"echo hi".to_string(),
]
} else {
vec![
"/bin/sh".to_string(),
"-c".to_string(),
"echo hi".to_string(),
]
},
cwd: turn_context.cwd.clone(),
expiration: 1000.into(),
env: HashMap::new(),
network: None,
sandbox_permissions: SandboxPermissions::WithAdditionalPermissions,
windows_sandbox_level: turn_context.windows_sandbox_level,
justification: Some("test".to_string()),
arg0: None,
let additional_permissions = PermissionProfile {
network: Some(NetworkPermissions {
enabled: Some(true),
}),
file_system: None,
macos: None,
};
let normalized = normalize_and_validate_additional_permissions(
session.features().enabled(Feature::RequestPermissions),
turn_context_raw.approval_policy.value(),
SandboxPermissions::WithAdditionalPermissions,
Some(additional_permissions.clone()),
&turn_context_raw.cwd,
)
.expect("shell additional permissions should pass policy validation");
let handler = ShellHandler;
let resp = handler
.handle(ToolInvocation {
session: Arc::clone(&session),
turn: Arc::clone(&turn_context),
tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())),
call_id: "test-call".to_string(),
tool_name: "shell".to_string(),
payload: ToolPayload::Function {
arguments: serde_json::json!({
"command": params.command.clone(),
"workdir": Some(turn_context.cwd.to_string_lossy().to_string()),
"timeout_ms": params.expiration.timeout_ms(),
"sandbox_permissions": params.sandbox_permissions,
"additional_permissions": PermissionProfile {
network: Some(NetworkPermissions {
enabled: Some(true),
}),
file_system: None,
macos: None,
},
"justification": params.justification.clone(),
})
.to_string(),
},
})
.await;
let output = match resp.expect("expected Ok result") {
ToolOutput::Function {
body: FunctionCallOutputBody::Text(content),
..
} => content,
_ => panic!("unexpected tool output"),
};
#[derive(Deserialize, PartialEq, Eq, Debug)]
struct ResponseExecMetadata {
exit_code: i32,
}
#[derive(Deserialize)]
struct ResponseExecOutput {
output: String,
metadata: ResponseExecMetadata,
}
let exec_output: ResponseExecOutput =
serde_json::from_str(&output).expect("valid exec output json");
assert_eq!(exec_output.metadata, ResponseExecMetadata { exit_code: 0 });
assert!(exec_output.output.contains("hi"));
assert_eq!(normalized, Some(additional_permissions));
}
#[tokio::test]

View File

@@ -548,6 +548,8 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[cfg(unix)]
use serial_test::serial;
#[cfg(unix)]
use std::os::unix::ffi::OsStrExt;
#[cfg(unix)]
use std::process::Command;
@@ -735,6 +737,7 @@ mod tests {
#[cfg(unix)]
#[tokio::test]
#[serial(stdin_fd)]
async fn snapshot_shell_does_not_inherit_stdin() -> Result<()> {
let _stdin_guard = BlockingStdinPipe::install()?;

View File

@@ -1,5 +1,5 @@
---
source: core/src/guardian.rs
source: core/src/guardian_tests.rs
expression: "context_snapshot::format_labeled_requests_snapshot(\"Guardian review request layout\",\n&[(\"Guardian Review Request\", &request)], &ContextSnapshotOptions::default(),)"
---
Scenario: Guardian review request layout
@@ -25,6 +25,6 @@ Scenario: Guardian review request layout
[11] Sandbox denied outbound git push to github.com.\n\n
[12] Assess the exact planned action below. Use read-only tool checks when local state matters.\n
[13] Planned action JSON:\n
[14] {\n "command": [\n "git",\n "push",\n "origin",\n "guardian-approval-mvp"\n ],\n "cwd": "/repo/codex-rs/core",\n "justification": "Need to push the reviewed docs fix to the repo remote.",\n "sandbox_permissions": "use_default",\n "tool": "shell"\n}\n
[14] {\n "tool": "shell",\n "command": [\n "git",\n "push",\n "origin",\n "guardian-approval-mvp"\n ],\n "cwd": "/repo/codex-rs/core",\n "sandbox_permissions": "use_default",\n "justification": "Need to push the reviewed docs fix to the repo remote."\n}\n
[15] >>> APPROVAL REQUEST END\n
[16] You may use read-only tool checks to gather any additional context you need to make a high-confidence determination.\n\nYour final message must be strict JSON with this exact schema:\n{\n "risk_level": "low" | "medium" | "high",\n "risk_score": 0-100,\n "rationale": string,\n "evidence": [{"message": string, "why": string}]\n}\n

View File

@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn::TurnState;

View File

@@ -172,11 +172,3 @@ impl TurnState {
!self.pending_input.is_empty()
}
}
impl ActiveTurn {
/// Clear any pending approvals and input buffered for the current turn.
pub(crate) async fn clear_pending(&self) {
let mut ts = self.turn_state.lock().await;
ts.clear_pending();
}
}

View File

@@ -33,12 +33,14 @@ use crate::protocol::TurnCompleteEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use crate::state::TurnState;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use tokio::sync::Mutex;
use crate::features::Feature;
pub(crate) use compact::CompactTask;
@@ -201,7 +203,16 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
let (tasks, turn_state) = self.take_all_running_tasks().await;
for task in &tasks {
task.cancellation_token.cancel();
}
if let Some(turn_state) = turn_state {
// Drop pending approvals only after all running tasks observe cancellation, so
// interrupted approval waits resolve as aborts instead of synthetic denials.
turn_state.lock().await.clear_pending();
}
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if reason == TurnAbortReason::Interrupted {
@@ -342,15 +353,14 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
async fn take_all_running_tasks(&self) -> (Vec<RunningTask>, Option<Arc<Mutex<TurnState>>>) {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
at.drain_tasks()
let turn_state = Arc::clone(&at.turn_state);
(at.drain_tasks(), Some(turn_state))
}
None => Vec::new(),
None => (Vec::new(), None),
}
}
@@ -363,10 +373,6 @@ impl Session {
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
}
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
task.turn_context

View File

@@ -90,7 +90,7 @@ fn resolve_workdir_base_path(
/// Validates feature/policy constraints for `with_additional_permissions` and
/// normalizes any path-based permissions. Errors if the request is invalid.
pub(super) fn normalize_and_validate_additional_permissions(
pub(crate) fn normalize_and_validate_additional_permissions(
request_permission_enabled: bool,
approval_policy: AskForApproval,
sandbox_permissions: SandboxPermissions,

View File

@@ -1,10 +1,11 @@
use assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ResponseMock;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
@@ -16,6 +17,45 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use regex_lite::Regex;
use serde_json::json;
use tokio::time::Instant;
use tokio::time::sleep;
async fn wait_for_responses_request_count_to_stabilize(
response_mock: &ResponseMock,
expected_count: usize,
timeout: Duration,
settle_duration: Duration,
) -> Vec<ResponsesRequest> {
let deadline = Instant::now() + timeout;
let mut stable_since: Option<Instant> = None;
loop {
let requests = response_mock.requests();
let request_count = requests.len();
if request_count > expected_count {
panic!("expected at most {expected_count} responses requests, got {request_count}");
}
if request_count == expected_count {
match stable_since {
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
return requests;
}
Some(_) => {}
None => stable_since = Some(Instant::now()),
}
} else {
stable_since = None;
}
assert!(
Instant::now() < deadline,
"timed out waiting for {expected_count} responses requests; last count was {request_count}"
);
sleep(Duration::from_millis(10)).await;
}
}
/// Integration test: spawn a longrunning shell_command tool via a mocked Responses SSE
/// function call, then interrupt the session and expect TurnAborted.
@@ -62,6 +102,9 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
// Expect TurnAborted soon after.
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex.submit(Op::Shutdown).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
}
/// After an interrupt we expect the next request to the model to include both
@@ -129,7 +172,13 @@ async fn interrupt_tool_records_history_entries() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = response_mock.requests();
let requests = wait_for_responses_request_count_to_stabilize(
&response_mock,
2,
Duration::from_secs(2),
Duration::from_millis(100),
)
.await;
assert!(
requests.len() == 2,
"expected two calls to the responses API, got {}",
@@ -140,28 +189,44 @@ async fn interrupt_tool_records_history_entries() {
response_mock.saw_function_call(call_id),
"function call not recorded in responses payload"
);
let output = response_mock
.function_call_output_text(call_id)
.expect("missing function_call_output text");
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
.expect("compile regex");
let captures = re.captures(&output);
assert_matches!(
captures.as_ref(),
Some(caps) if caps.get(1).is_some(),
"aborted message with elapsed seconds"
);
let secs: f32 = captures
.expect("aborted message with elapsed seconds")
.get(1)
.unwrap()
.as_str()
.parse()
.unwrap();
let follow_up_request = &requests[1];
let (output, success) = follow_up_request
.function_call_output_content_and_success(call_id)
.expect("missing function_call_output entry");
let output = output.expect("missing function_call_output text");
assert!(
secs >= 0.1,
"expected at least one tenth of a second of elapsed time, got {secs}"
output.contains("aborted"),
"expected aborted output in follow-up request, got {output:?}"
);
assert!(
success != Some(true),
"expected aborted output to avoid a success=true marker, got {success:?} with output {output:?}"
);
let wall_time =
Regex::new(r"Wall time: ([0-9]+(?:\.[0-9]+)?) seconds").expect("compile wall time regex");
let aborted_after =
Regex::new(r"aborted by user after ([0-9]+(?:\.[0-9]+)?)s").expect("compile abort regex");
let secs = wall_time
.captures(&output)
.and_then(|captures| captures.get(1))
.or_else(|| {
aborted_after
.captures(&output)
.and_then(|captures| captures.get(1))
})
.map(|capture| capture.as_str().parse::<f32>().unwrap());
if let Some(secs) = secs {
assert!(
secs >= 0.1,
"expected at least one tenth of a second of elapsed time, got {secs} in {output:?}"
);
}
codex.submit(Op::Shutdown).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
}
/// After an interrupt we persist a model-visible `<turn_aborted>` marker in the conversation
@@ -227,7 +292,13 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = response_mock.requests();
let requests = wait_for_responses_request_count_to_stabilize(
&response_mock,
2,
Duration::from_secs(2),
Duration::from_millis(100),
)
.await;
assert_eq!(requests.len(), 2, "expected two calls to the responses API");
let follow_up_request = &requests[1];
@@ -238,4 +309,70 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
.any(|text| text.contains("<turn_aborted>")),
"expected <turn_aborted> marker in follow-up request"
);
codex.submit(Op::Shutdown).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
}
/// Interrupting a turn while a tool-produced follow-up is pending must not
/// start another model request before the session reports TurnAborted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_does_not_issue_follow_up_request() {
let command = "sleep 60";
let call_id = "call-no-follow-up";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-no-follow-up"),
ev_function_call(call_id, "shell_command", &args),
ev_completed("resp-no-follow-up"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_once(&server, first_body).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start interrupt follow-up guard".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
let requests = wait_for_responses_request_count_to_stabilize(
&response_mock,
1,
Duration::from_secs(2),
Duration::from_millis(200),
)
.await;
assert_eq!(
requests.len(),
1,
"interrupt should not issue a follow-up responses request"
);
codex.submit(Op::Shutdown).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
}

View File

@@ -2,6 +2,7 @@ use anyhow::Context;
use anyhow::Result;
use chrono::Utc;
use codex_core::CodexAuth;
use codex_core::CodexThread;
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CodexErrorInfo;
@@ -29,6 +30,7 @@ use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use std::ffi::OsString;
use std::fs;
use std::time::Duration;
@@ -37,6 +39,8 @@ use tokio::sync::oneshot;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
const REALTIME_NORMAL_CLOSE_MESSAGE: &str =
"failed to send realtime request: Connection closed normally";
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
@@ -54,6 +58,32 @@ fn websocket_request_instructions(
.map(str::to_owned)
}
async fn wait_for_session_updated_allowing_clean_close(
codex: &CodexThread,
expected_session_id: &str,
) {
tokio::time::timeout(Duration::from_secs(3), async {
loop {
let event = match codex.next_event().await {
Ok(event) => event,
Err(err) => panic!("realtime conversation event should arrive: {err:?}"),
};
match event.msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == expected_session_id => return,
EventMsg::Error(err) if err.message.contains(REALTIME_NORMAL_CLOSE_MESSAGE) => {}
EventMsg::Error(err) => panic!("conversation start failed: {err:?}"),
_ => {}
}
}
})
.await
.unwrap_or_else(|_| {
panic!("timeout waiting for session.updated event for {expected_session_id}")
});
}
async fn seed_recent_thread(
test: &TestCodex,
title: &str,
@@ -230,6 +260,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -665,15 +696,7 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_custom_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_custom_context").await;
let startup_context_request = server.wait_for_request(1, 0).await;
let instructions = websocket_request_instructions(&startup_context_request)
@@ -723,15 +746,7 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_no_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_no_context").await;
let startup_context_request = server.wait_for_request(1, 0).await;
let instructions = websocket_request_instructions(&startup_context_request)
@@ -777,15 +792,7 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_context").await;
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context = websocket_request_instructions(&startup_context_request)
@@ -831,15 +838,7 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_workspace" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_workspace").await;
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context = websocket_request_instructions(&startup_context_request)

View File

@@ -4,6 +4,7 @@ use std::ffi::OsString;
use std::fs;
use std::net::TcpListener;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -36,11 +37,13 @@ use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use reqwest::Client;
use reqwest::StatusCode;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use tempfile::tempdir;
use tokio::net::TcpStream;
use tokio::process::Child;
use tokio::process::Command;
use tokio::time::Instant;
@@ -263,7 +266,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
loop {
fixture.codex.submit(Op::ListMcpTools).await?;
let list_event = core_test_support::wait_for_event_with_timeout(
let list_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
Duration::from_secs(10),
@@ -851,10 +854,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
Ok(())
}
/// This test writes to a fallback credentials file in CODEX_HOME.
/// Ideally, we wouldn't need to serialize the test but it's much more cumbersome to wire CODEX_HOME through the code.
#[serial(codex_home)]
#[test]
#[serial(mcp_test_value)]
fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
@@ -936,8 +937,8 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
wait_for_streamable_http_server(&mut http_server_child, &bind_addr, Duration::from_secs(5))
.await?;
let temp_home = tempdir()?;
let _guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
let temp_home = Arc::new(tempdir()?);
let _codex_home_guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
write_fallback_oauth_tokens(
temp_home.path(),
server_name,
@@ -948,10 +949,10 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
)?;
let fixture = test_codex()
.with_home(temp_home.clone())
.with_config(move |config| {
// This test seeds OAuth tokens in CODEX_HOME/.credentials.json and
// validates file-backed OAuth loading. Force file mode so Linux
// keyring backend quirks do not affect this test.
// Keep OAuth credentials isolated to this test home because Bazel
// runs the full core suite in one process.
config.mcp_oauth_credentials_store_mode = serde_json::from_value(json!("file"))
.expect("`file` should deserialize as OAuthCredentialsStoreMode");
let mut servers = config.mcp_servers.get().clone();
@@ -984,6 +985,31 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
.await?;
let session_model = fixture.session_configured.model.clone();
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
loop {
fixture.codex.submit(Op::ListMcpTools).await?;
let list_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
Duration::from_secs(10),
)
.await;
let EventMsg::McpListToolsResponse(tool_list) = list_event else {
unreachable!("event guard guarantees McpListToolsResponse");
};
if tool_list.tools.contains_key(&tool_name) {
break;
}
let available_tools: Vec<&str> = tool_list.tools.keys().map(String::as_str).collect();
if Instant::now() >= tools_ready_deadline {
panic!(
"timed out waiting for MCP tool {tool_name} to become available; discovered tools: {available_tools:?}"
);
}
sleep(Duration::from_millis(200)).await;
}
fixture
.codex
.submit(Op::UserTurn {
@@ -1078,7 +1104,8 @@ async fn wait_for_streamable_http_server(
timeout: Duration,
) -> anyhow::Result<()> {
let deadline = Instant::now() + timeout;
let metadata_url = format!("http://{address}/.well-known/oauth-authorization-server/mcp");
let client = Client::builder().no_proxy().build()?;
loop {
if let Some(status) = server_child.try_wait()? {
return Err(anyhow::anyhow!(
@@ -1090,22 +1117,30 @@ async fn wait_for_streamable_http_server(
if remaining.is_zero() {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: deadline reached"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: deadline reached"
));
}
match tokio::time::timeout(remaining, TcpStream::connect(address)).await {
Ok(Ok(_)) => return Ok(()),
match tokio::time::timeout(remaining, client.get(&metadata_url).send()).await {
Ok(Ok(response)) if response.status() == StatusCode::OK => return Ok(()),
Ok(Ok(response)) => {
if Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server metadata at {metadata_url}: HTTP {}",
response.status()
));
}
}
Ok(Err(error)) => {
if Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: {error}"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: {error}"
));
}
}
Err(_) => {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: connect call timed out"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: request timed out"
));
}
}

View File

@@ -349,7 +349,7 @@ async fn shell_output_for_freeform_tool_records_duration(
let test = builder.build(&server).await?;
let call_id = "shell-structured";
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 1"], output_type)?;
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 0.2"], output_type)?;
let mock = mount_sse_sequence(&server, responses).await;
test.submit_turn_with_policy(
@@ -381,7 +381,7 @@ $"#;
.and_then(|value| value.as_str().parse::<f32>().ok())
.expect("expected structured shell output to contain wall time seconds");
assert!(
wall_time_seconds > 0.5,
wall_time_seconds > 0.1,
"expected wall time to be greater than zero seconds, got {wall_time_seconds}"
);
@@ -740,6 +740,7 @@ async fn shell_command_output_is_freeform() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "echo shell command",
"login": false,
"timeout_ms": 1_000,
});
let responses = vec![
@@ -791,6 +792,7 @@ async fn shell_command_output_is_not_truncated_under_10k_bytes() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "perl -e 'print \"1\" x 10000'",
"login": false,
"timeout_ms": 1000,
});
let responses = vec![
@@ -841,6 +843,7 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "perl -e 'print \"1\" x 10001'",
"login": false,
"timeout_ms": 1000,
});
let responses = vec![

View File

@@ -6,63 +6,40 @@ use codex_core::WireApi;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use codex_utils_cargo_bin::find_resource;
use core_test_support::load_sse_fixture;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::sse;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn sse_incomplete() -> String {
load_sse_fixture("tests/fixtures/incomplete_sse.json")
let fixture = find_resource!("tests/fixtures/incomplete_sse.json")
.unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}"));
load_sse_fixture(fixture)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
skip_if_no_network!();
let server = MockServer::start().await;
let incomplete_sse = sse_incomplete();
let completed_sse = responses::sse_completed("resp_ok");
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_incomplete(), "text/event-stream")
} else {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![
ev_response_created("resp_ok"),
ev_completed("resp_ok"),
]),
"text/event-stream",
)
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {})
.expect(2)
.mount(&server)
.await;
let (server, _) = start_streaming_sse_server(vec![
vec![StreamingSseChunk {
gate: None,
body: incomplete_sse,
}],
vec![StreamingSseChunk {
gate: None,
body: completed_sse,
}],
])
.await;
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
@@ -92,7 +69,7 @@ async fn retries_on_early_close() {
.with_config(move |config| {
config.model_provider = model_provider;
})
.build(&server)
.build_with_streaming_server(&server)
.await
.unwrap();
@@ -109,4 +86,13 @@ async fn retries_on_early_close() {
// Wait until TurnComplete (should succeed after retry).
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = server.requests().await;
assert_eq!(
requests.len(),
2,
"expected retry after incomplete SSE stream"
);
server.shutdown().await;
}

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use axum::body::Body;
@@ -44,6 +45,7 @@ use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::sleep;
#[derive(Clone)]
struct TestToolServer {
@@ -275,15 +277,25 @@ fn parse_bind_addr() -> Result<SocketAddr, Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bind_addr = parse_bind_addr()?;
let session_failure_state = SessionFailureState::default();
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
const MAX_BIND_RETRIES: u32 = 20;
const BIND_RETRY_DELAY: Duration = Duration::from_millis(50);
let mut bind_retries = 0;
let listener = loop {
match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => break listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
}
Err(err) if err.kind() == ErrorKind::AddrInUse && bind_retries < MAX_BIND_RETRIES => {
bind_retries += 1;
sleep(BIND_RETRY_DELAY).await;
}
Err(err) => return Err(err.into()),
}
Err(err) => return Err(err.into()),
};
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");

View File

@@ -24,8 +24,13 @@ async fn wait_for_pid_file(path: &Path) -> Result<u32> {
for _ in 0..50 {
match fs::read_to_string(path) {
Ok(content) => {
let pid = content
.trim()
let trimmed = content.trim();
if trimmed.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let pid = trimmed
.parse::<u32>()
.with_context(|| format!("failed to parse pid from {}", path.display()))?;
return Ok(pid);

View File

@@ -22,7 +22,6 @@ anyhow = { workspace = true }
uds_windows = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View File

@@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> {
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
}
stream
.shutdown(Shutdown::Write)
.context("failed to shutdown socket writer")?;
// The peer can close immediately after sending its response. On macOS that
// may surface as NotConnected when we half-close our write side.
if let Err(err) = stream.shutdown(Shutdown::Write)
&& err.kind() != io::ErrorKind::NotConnected
{
return Err(err).context("failed to shutdown socket writer");
}
let stdout_result = stdout_thread
.join()

View File

@@ -1,12 +1,15 @@
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use assert_cmd::Command;
use anyhow::anyhow;
use pretty_assertions::assert_eq;
#[cfg(unix)]
@@ -19,6 +22,9 @@ use uds_windows::UnixListener;
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
let socket_path = dir.path().join("socket");
let request = b"request";
let request_path = dir.path().join("request.txt");
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
let listener = match UnixListener::bind(&socket_path) {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
@@ -31,37 +37,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
};
let (tx, rx) = mpsc::channel();
let (event_tx, event_rx) = mpsc::channel();
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
let _ = event_tx.send("waiting for accept".to_string());
let (mut connection, _) = listener
.accept()
.context("failed to accept test connection")?;
let mut received = Vec::new();
let _ = event_tx.send("accepted connection".to_string());
let mut received = vec![0; request.len()];
connection
.read_to_end(&mut received)
.read_exact(&mut received)
.context("failed to read data from client")?;
let _ = event_tx.send(format!("read {} bytes", received.len()));
tx.send(received)
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
connection
.write_all(b"response")
.context("failed to write response to client")?;
let _ = event_tx.send("wrote response".to_string());
Ok(())
});
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
.arg(&socket_path)
.write_stdin("request")
.assert()
.success()
.stdout("response");
.stdin(Stdio::from(stdin))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("failed to spawn codex-stdio-to-uds")?;
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
let (stdout_tx, stdout_rx) = mpsc::channel();
let (stderr_tx, stderr_rx) = mpsc::channel();
thread::spawn(move || {
let mut stdout = Vec::new();
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
let _ = stdout_tx.send(result);
});
thread::spawn(move || {
let mut stderr = Vec::new();
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
let _ = stderr_tx.send(result);
});
let mut server_events = Vec::new();
let deadline = Instant::now() + Duration::from_secs(5);
let status = loop {
while let Ok(event) = event_rx.try_recv() {
server_events.push(event);
}
if let Some(status) = child.try_wait().context("failed to poll child status")? {
break status;
}
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr after kill")?
.context("failed to read child stderr")?;
anyhow::bail!(
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
}
thread::sleep(Duration::from_millis(25));
};
let stdout = stdout_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stdout")?
.context("failed to read child stdout")?;
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr")?
.context("failed to read child stderr")?;
assert!(
status.success(),
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
assert_eq!(stdout, b"response");
let received = rx
.recv_timeout(Duration::from_secs(1))
.context("server did not receive data in time")?;
assert_eq!(received, b"request");
assert_eq!(received, request);
let server_result = server_thread
.join()
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
.map_err(|_| anyhow!("server thread panicked"))?;
server_result.context("server failed")?;
Ok(())

View File

@@ -0,0 +1,19 @@
---
source: tui/src/chatwidget/tests.rs
expression: popup
---
Experimental features
Toggle experimental features. Changes are saved to config.toml.
[ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
installed.
[ ] Bubblewrap sandbox Try the new linux sandbox based on bubblewrap.
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
Codex after enabling.
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
them to you, including sandbox escapes and blocked network access.
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
Press space to select or enter to save for next conversation

View File

@@ -6949,6 +6949,11 @@ async fn experimental_popup_includes_guardian_approval() {
chat.open_experimental_popup();
let popup = render_bottom_popup(&chat, 120);
#[cfg(target_os = "linux")]
insta::with_settings!({ snapshot_suffix => "linux" }, {
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
});
#[cfg(not(target_os = "linux"))]
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
}

View File

@@ -130,52 +130,36 @@ async fn collect_output_until_exit(
}
async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
newline: &str,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
let mut collected = Vec::new();
let marker = "__codex_pty_ready__";
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
while tokio::time::Instant::now() < deadline {
writer
.send(format!("print('{marker}'){newline}").into_bytes())
.await?;
let probe_deadline = tokio::time::Instant::now() + probe_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline || now >= probe_deadline {
break;
}
let remaining = std::cmp::min(
deadline.saturating_duration_since(now),
probe_deadline.saturating_duration_since(now),
);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(marker) {
return Ok(collected);
}
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
}
anyhow::bail!(
"timed out waiting for Python REPL readiness in PTY: {:?}",
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
String::from_utf8_lossy(&collected)
);
}
@@ -254,10 +238,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
return Ok(());
};
let ready_marker = "__codex_pty_ready__";
let args = vec![
"-i".to_string(),
"-q".to_string(),
"-c".to_string(),
format!("print('{ready_marker}')"),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pty_process(
&python,
&[],
&args,
Path::new("."),
&env_map,
&None,
@@ -269,7 +260,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
writer
.send(format!("print('hello from pty'){newline}").into_bytes())
.await?;

View File

@@ -80,7 +80,7 @@ def codex_rust_crate(
`CARGO_BIN_EXE_*` environment variables. These are only needed for binaries from a different crate.
"""
test_env = {
"INSTA_WORKSPACE_ROOT": ".",
"INSTA_WORKSPACE_ROOT": "codex-rs",
"INSTA_SNAPSHOT_PATH": "src",
}
@@ -127,7 +127,9 @@ def codex_rust_crate(
crate = name,
env = test_env,
deps = all_crate_deps(normal = True, normal_dev = True) + maybe_deps + deps_extra,
rustc_flags = rustc_flags_extra,
# Keep `file!()` paths Cargo-like (`core/src/...`) instead of
# Bazel workspace-prefixed (`codex-rs/core/src/...`) for snapshot parity.
rustc_flags = rustc_flags_extra + ["--remap-path-prefix=codex-rs="],
rustc_env = rustc_env,
data = test_data_extra,
tags = test_tags,
@@ -178,7 +180,7 @@ def codex_rust_crate(
rustc_flags = rustc_flags_extra + ["--remap-path-prefix=codex-rs="],
rustc_env = rustc_env,
# Important: do not merge `test_env` here. Its unit-test-only
# `INSTA_WORKSPACE_ROOT="."` can point integration tests at the
# `INSTA_WORKSPACE_ROOT="codex-rs"` can point integration tests at the
# runfiles cwd and cause false `.snap.new` churn on Linux.
env = cargo_env,
tags = test_tags,