Compare commits

...

18 Commits

Author SHA1 Message Date
Ahmed Ibrahim
e74732869f codex: validate flaky stabilization (3/5) 2026-03-09 02:18:04 -07:00
Ahmed Ibrahim
9193cdd2dd codex: validate flaky stabilization (2/5) 2026-03-09 01:52:26 -07:00
Ahmed Ibrahim
7ca4c040cb utils/pty: stabilize pipe stdin round trip on windows 2026-03-09 01:29:12 -07:00
Ahmed Ibrahim
53fd49964b codex: validate flaky stabilization (3/5) 2026-03-09 00:59:58 -07:00
Ahmed Ibrahim
69cb585006 codex: validate flaky stabilization (2/5) 2026-03-09 00:38:40 -07:00
Ahmed Ibrahim
fcab4c169f codex: stabilize plan item app-server test 2026-03-09 00:15:12 -07:00
Ahmed Ibrahim
2ccca2c154 codex: fix realtime startup context bazel build 2026-03-08 23:42:09 -07:00
Ahmed Ibrahim
771243d4cc codex: stabilize realtime startup context websockets 2026-03-08 23:34:37 -07:00
Ahmed Ibrahim
779095addd codex: stabilize app list update ordering test 2026-03-08 23:16:23 -07:00
Ahmed Ibrahim
17171119b2 codex: restart PR #13593 validation after CI infra failure 2026-03-08 22:59:51 -07:00
Ahmed Ibrahim
16b668df9e codex: validate PR #13593 (5/5) 2026-03-08 22:55:46 -07:00
Ahmed Ibrahim
754a75775e codex: validate PR #13593 (4/5) 2026-03-08 22:32:29 -07:00
Ahmed Ibrahim
1cfab80eb0 codex: validate PR #13593 (3/5) 2026-03-08 22:08:36 -07:00
Ahmed Ibrahim
73db9c2d6c codex: validate PR #13593 (2/5) 2026-03-08 21:46:26 -07:00
Ahmed Ibrahim
a45c778583 codex: fix CI failure on PR #13593 2026-03-08 21:23:24 -07:00
Ahmed Ibrahim
3ef8b576ad codex: fix PR #13593 control resume regression 2026-03-08 21:09:46 -07:00
Ahmed Ibrahim
e795b69905 codex: fix resume-agent nickname flake on PR #13593 2026-03-08 21:09:16 -07:00
Ahmed Ibrahim
af618c8dd2 codex: stabilize flaky tests on PR #13593 2026-03-08 20:52:27 -07:00
43 changed files with 1562 additions and 820 deletions

View File

@@ -2,6 +2,12 @@
# Do not increase, fix your test instead
slow-timeout = { period = "15s", terminate-after = 2 }
[test-groups.app_server_protocol_codegen]
max-threads = 1
[test-groups.app_server_integration]
max-threads = 1
[[profile.default.overrides]]
# Do not add new tests here
@@ -11,3 +17,13 @@ slow-timeout = { period = "1m", terminate-after = 4 }
[[profile.default.overrides]]
filter = 'test(approval_matrix_covers_all_modes)'
slow-timeout = { period = "30s", terminate-after = 2 }
[[profile.default.overrides]]
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
test-group = 'app_server_protocol_codegen'
[[profile.default.overrides]]
# These tests spawn a fresh app-server subprocess per case. Serializing them keeps
# Windows startup deterministic without widening the read timeouts.
filter = 'package(codex-app-server)'
test-group = 'app_server_integration'

3
codex-rs/Cargo.lock generated
View File

@@ -1436,7 +1436,6 @@ dependencies = [
"codex-utils-cargo-bin",
"codex-utils-cli",
"codex-utils-json-to-toml",
"codex-utils-pty",
"core_test_support",
"futures",
"owo-colors",
@@ -2416,6 +2415,7 @@ dependencies = [
"anyhow",
"chrono",
"clap",
"codex-otel",
"codex-protocol",
"dirs",
"log",
@@ -2435,7 +2435,6 @@ name = "codex-stdio-to-uds"
version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"codex-utils-cargo-bin",
"pretty_assertions",
"tempfile",

View File

@@ -23,6 +23,7 @@ use schemars::schema_for;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
@@ -32,9 +33,10 @@ use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::thread;
use ts_rs::TS;
const HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"];
const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"];
const SPECIAL_DEFINITIONS: &[&str] = &[
@@ -137,9 +139,29 @@ pub fn generate_ts_with_options(
}
if options.ensure_headers {
for file in &ts_files {
prepend_header_if_missing(file)?;
}
let worker_count = thread::available_parallelism()
.map_or(1, usize::from)
.min(ts_files.len().max(1));
let chunk_size = ts_files.len().div_ceil(worker_count);
thread::scope(|scope| -> Result<()> {
let mut workers = Vec::new();
for chunk in ts_files.chunks(chunk_size.max(1)) {
workers.push(scope.spawn(move || -> Result<()> {
for file in chunk {
prepend_header_if_missing(file)?;
}
Ok(())
}));
}
for worker in workers {
worker
.join()
.map_err(|_| anyhow!("TypeScript header worker panicked"))??;
}
Ok(())
})?;
}
// Optionally run Prettier on all generated TS files.
@@ -231,6 +253,41 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
Ok(())
}
/// Applies the experimental-API filtering to an in-memory tree of generated
/// TypeScript sources, keyed by path.
///
/// Three passes run in order:
/// 1. `ClientRequest.ts`, when present, is rewritten through
///    `filter_client_request_ts_contents` with `EXPERIMENTAL_CLIENT_METHODS`.
/// 2. Every entry whose file stem matches a type that has registered
///    experimental fields is rewritten through
///    `filter_experimental_type_fields_ts_contents`.
/// 3. The `.ts` entries for the experimental method types are removed.
///
/// The function currently always returns `Ok(())`.
pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>) -> Result<()> {
    let registered_fields = experimental_fields();
    let method_types = experimental_method_types();

    // Rewrite `ClientRequest.ts` in place when the tree contains it.
    if let Some(client_request) = tree.get_mut(Path::new("ClientRequest.ts")) {
        let original = std::mem::take(client_request);
        *client_request =
            filter_client_request_ts_contents(original, EXPERIMENTAL_CLIENT_METHODS);
    }

    // Group the registered field names under the name of their owning type.
    let mut experimental_by_type: HashMap<String, HashSet<String>> = HashMap::new();
    for field in registered_fields {
        let type_key = field.type_name.to_string();
        let field_key = field.field_name.to_string();
        experimental_by_type
            .entry(type_key)
            .or_default()
            .insert(field_key);
    }

    // Entries with a non-UTF-8 stem or without registered fields are left as is.
    for (path, source) in tree.iter_mut() {
        let stem = path.file_stem().and_then(|stem| stem.to_str());
        if let Some(field_names) = stem.and_then(|name| experimental_by_type.get(name)) {
            let original = std::mem::take(source);
            *source = filter_experimental_type_fields_ts_contents(original, field_names);
        }
    }

    remove_generated_type_entries(tree, &method_types, "ts");
    Ok(())
}
/// Removes union arms from `ClientRequest.ts` for methods marked experimental.
fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> {
let path = out_dir.join("ClientRequest.ts");
@@ -239,9 +296,15 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
}
let mut content =
fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?;
content = filter_client_request_ts_contents(content, experimental_methods);
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
}
fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
let Some((prefix, body, suffix)) = split_type_alias(&content) else {
return Ok(());
return content;
};
let experimental_methods: HashSet<&str> = experimental_methods
.iter()
@@ -259,12 +322,9 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
let new_body = filtered_arms.join(" | ");
content = format!("{prefix}{new_body}{suffix}");
let import_usage_scope = split_type_alias(&content)
.map(|(_, body, _)| body)
.map(|(_, filtered_body, _)| filtered_body)
.unwrap_or_else(|| new_body.clone());
content = prune_unused_type_imports(content, &import_usage_scope);
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
prune_unused_type_imports(content, &import_usage_scope)
}
/// Removes experimental properties from generated TypeScript type files.
@@ -302,8 +362,17 @@ fn filter_experimental_fields_in_ts_file(
) -> Result<()> {
let mut content =
fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
content = filter_experimental_type_fields_ts_contents(content, experimental_field_names);
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
}
fn filter_experimental_type_fields_ts_contents(
mut content: String,
experimental_field_names: &HashSet<String>,
) -> String {
let Some((open_brace, close_brace)) = type_body_brace_span(&content) else {
return Ok(());
return content;
};
let inner = &content[open_brace + 1..close_brace];
let fields = split_top_level_multi(inner, &[',', ';']);
@@ -322,9 +391,7 @@ fn filter_experimental_fields_in_ts_file(
let import_usage_scope = split_type_alias(&content)
.map(|(_, body, _)| body)
.unwrap_or_else(|| new_inner.clone());
content = prune_unused_type_imports(content, &import_usage_scope);
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
prune_unused_type_imports(content, &import_usage_scope)
}
fn filter_experimental_schema(bundle: &mut Value) -> Result<()> {
@@ -526,6 +593,23 @@ fn remove_generated_type_files(
Ok(())
}
/// Removes the `<type_name>.<extension>` entry for every listed type from the
/// tree root and from the `v1/` and `v2/` subdirectories.
///
/// Entries that are not present are ignored; nothing else in the tree is
/// touched.
fn remove_generated_type_entries(
    tree: &mut BTreeMap<PathBuf, String>,
    type_names: &HashSet<String>,
    extension: &str,
) {
    for type_name in type_names {
        let file_name = format!("{type_name}.{extension}");
        // Joining onto an empty path yields the bare file name, which is the
        // root-level entry.
        for parent in ["", "v1", "v2"] {
            tree.remove(&Path::new(parent).join(&file_name));
        }
    }
}
fn remove_experimental_method_type_definitions(bundle: &mut Value) {
let type_names = experimental_method_types();
let Some(definitions) = bundle.get_mut("definitions").and_then(Value::as_object_mut) else {
@@ -1154,40 +1238,13 @@ fn insert_into_namespace(
.or_insert_with(|| Value::Object(Map::new()));
match entry {
Value::Object(map) => {
insert_definition(map, name, schema, &format!("namespace `{namespace}`"))
map.insert(name, schema);
Ok(())
}
_ => Err(anyhow!("expected namespace {namespace} to be an object")),
}
}
/// Inserts `schema` under `name` into `definitions`, rejecting conflicting
/// duplicates.
///
/// * No existing entry: the schema is inserted.
/// * An existing entry equal to `schema`: nothing changes and `Ok(())` is
///   returned.
/// * An existing entry that differs: an error is returned that names
///   `location`, the definition name, and the `title` of both schemas
///   (`<untitled>` when a schema has no string `title`).
fn insert_definition(
    definitions: &mut Map<String, Value>,
    name: String,
    schema: Value,
    location: &str,
) -> Result<()> {
    /// Reads the schema's `title` string, falling back to a placeholder.
    fn title_of(value: &Value) -> &str {
        value
            .get("title")
            .and_then(Value::as_str)
            .unwrap_or("<untitled>")
    }

    match definitions.get(&name) {
        None => {
            definitions.insert(name, schema);
            Ok(())
        }
        // Re-registering an identical schema is harmless.
        Some(existing) if *existing == schema => Ok(()),
        Some(existing) => {
            let existing_title = title_of(existing);
            let new_title = title_of(&schema);
            Err(anyhow!(
                "schema definition collision in {location}: {name} (existing title: {existing_title}, new title: {new_title}); use #[schemars(rename = \"...\")] to rename one of the conflicting schema definitions"
            ))
        }
    }
}
fn write_json_schema_with_return<T>(out_dir: &Path, name: &str) -> Result<GeneratedSchema>
where
T: JsonSchema,
@@ -1807,13 +1864,13 @@ fn prepend_header_if_missing(path: &Path) -> Result<()> {
.with_context(|| format!("Failed to read {}", path.display()))?;
}
if content.starts_with(HEADER) {
if content.starts_with(GENERATED_TS_HEADER) {
return Ok(());
}
let mut f = fs::File::create(path)
.with_context(|| format!("Failed to open {} for writing", path.display()))?;
f.write_all(HEADER.as_bytes())
f.write_all(GENERATED_TS_HEADER.as_bytes())
.with_context(|| format!("Failed to write header to {}", path.display()))?;
f.write_all(content.as_bytes())
.with_context(|| format!("Failed to write content to {}", path.display()))?;
@@ -1858,35 +1915,15 @@ fn ts_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
/// Generate an index.ts file that re-exports all generated types.
/// This allows consumers to import all types from a single file.
fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
let mut entries: Vec<String> = Vec::new();
let mut stems: Vec<String> = ts_files_in(out_dir)?
.into_iter()
.filter_map(|p| {
let stem = p.file_stem()?.to_string_lossy().into_owned();
if stem == "index" { None } else { Some(stem) }
})
.collect();
stems.sort();
stems.dedup();
for name in stems {
entries.push(format!("export type {{ {name} }} from \"./{name}\";\n"));
}
// If this is the root out_dir and a ./v2 folder exists with TS files,
// expose it as a namespace to avoid symbol collisions at the root.
let v2_dir = out_dir.join("v2");
let has_v2_ts = ts_files_in(&v2_dir).map(|v| !v.is_empty()).unwrap_or(false);
if has_v2_ts {
entries.push("export * as v2 from \"./v2\";\n".to_string());
}
let mut content =
String::with_capacity(HEADER.len() + entries.iter().map(String::len).sum::<usize>());
content.push_str(HEADER);
for line in &entries {
content.push_str(line);
}
let content = generated_index_ts_with_header(index_ts_entries(
&ts_files_in(out_dir)?
.iter()
.map(PathBuf::as_path)
.collect::<Vec<_>>(),
ts_files_in(&out_dir.join("v2"))
.map(|v| !v.is_empty())
.unwrap_or(false),
));
let index_path = out_dir.join("index.ts");
let mut f = fs::File::create(&index_path)
@@ -1896,244 +1933,278 @@ fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
Ok(index_path)
}
/// Inserts generated `index.ts` entries into an in-memory TypeScript tree.
///
/// The root `index.ts` re-exports every single-component path in the tree and
/// adds the `v2` namespace export when `v2/` holds at least one `.ts` file
/// other than `index.ts`. When any path sits directly under `v2/`, a
/// `v2/index.ts` is inserted as well. Existing index entries are overwritten.
pub(crate) fn generate_index_ts_tree(tree: &mut BTreeMap<PathBuf, String>) {
    /// True when `path` lives directly inside the `v2` directory.
    fn is_direct_v2_child(path: &Path) -> bool {
        path.parent()
            .is_some_and(|parent| parent == Path::new("v2"))
    }

    // Build the root index first; the borrowed key list must be released
    // before the tree can be mutated.
    let root_index = {
        let root_paths: Vec<&Path> = tree
            .keys()
            .map(PathBuf::as_path)
            .filter(|path| path.components().count() == 1)
            .collect();
        let exports_v2 = tree.keys().any(|path| {
            is_direct_v2_child(path)
                && path.extension() == Some(OsStr::new("ts"))
                && path.file_stem().is_some_and(|stem| stem != "index")
        });
        index_ts_entries(&root_paths, exports_v2)
    };
    tree.insert(PathBuf::from("index.ts"), root_index);

    let v2_paths: Vec<&Path> = tree
        .keys()
        .map(PathBuf::as_path)
        .filter(|path| is_direct_v2_child(path))
        .collect();
    if v2_paths.is_empty() {
        return;
    }
    let v2_index = index_ts_entries(&v2_paths, false);
    tree.insert(PathBuf::from("v2").join("index.ts"), v2_index);
}
fn generated_index_ts_with_header(content: String) -> String {
let mut with_header = String::with_capacity(GENERATED_TS_HEADER.len() + content.len());
with_header.push_str(GENERATED_TS_HEADER);
with_header.push_str(&content);
with_header
}
/// Renders the body of an `index.ts` file for the given paths.
///
/// Only paths with a `ts` extension contribute; a file whose stem is `index`
/// is skipped. The remaining stems are sorted, de-duplicated, and emitted as
/// one `export type { Name } from "./Name";` line each. When `has_v2_ts` is
/// set, a trailing `export * as v2 from "./v2";` line is appended.
fn index_ts_entries(paths: &[&Path], has_v2_ts: bool) -> String {
    let ts_extension = OsStr::new("ts");

    let mut names: Vec<String> = Vec::new();
    for path in paths {
        if path.extension() != Some(ts_extension) {
            continue;
        }
        let Some(stem) = path.file_stem() else {
            continue;
        };
        let name = stem.to_string_lossy().into_owned();
        if name != "index" {
            names.push(name);
        }
    }
    names.sort();
    names.dedup();

    let mut rendered: String = names
        .iter()
        .map(|name| format!("export type {{ {name} }} from \"./{name}\";\n"))
        .collect();
    if has_v2_ts {
        rendered.push_str("export * as v2 from \"./v2\";\n");
    }
    rendered
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::v2;
use crate::schema_fixtures::read_schema_fixture_subtree;
use anyhow::Context;
use anyhow::Result;
use pretty_assertions::assert_eq;
use std::collections::BTreeSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use uuid::Uuid;
#[test]
fn generated_ts_optional_nullable_fields_only_in_params() -> Result<()> {
// Assert that "?: T | null" only appears in generated *Params types.
let output_dir = std::env::temp_dir().join(format!("codex_ts_types_{}", Uuid::now_v7()));
fs::create_dir(&output_dir)?;
let fixture_tree = read_schema_fixture_subtree(&schema_root()?, "typescript")?;
struct TempDirGuard(PathBuf);
impl Drop for TempDirGuard {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.0);
}
}
let _guard = TempDirGuard(output_dir.clone());
// Avoid doing more work than necessary to keep the test from timing out.
let options = GenerateTsOptions {
generate_indices: false,
ensure_headers: false,
run_prettier: false,
experimental_api: false,
};
generate_ts_with_options(&output_dir, None, options)?;
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
let client_request_ts = std::str::from_utf8(
fixture_tree
.get(Path::new("ClientRequest.ts"))
.ok_or_else(|| anyhow::anyhow!("missing ClientRequest.ts fixture"))?,
)?;
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), false);
assert_eq!(
client_request_ts.contains("MockExperimentalMethodParams"),
false
);
assert_eq!(output_dir.join("EventMsg.ts").exists(), true);
let thread_start_ts =
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
assert_eq!(fixture_tree.contains_key(Path::new("EventMsg.ts")), true);
let thread_start_ts = std::str::from_utf8(
fixture_tree
.get(Path::new("v2/ThreadStartParams.ts"))
.ok_or_else(|| anyhow::anyhow!("missing v2/ThreadStartParams.ts fixture"))?,
)?;
assert_eq!(thread_start_ts.contains("mockExperimentalField"), false);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodParams.ts")
.exists(),
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodParams.ts")),
false
);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodResponse.ts")
.exists(),
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")),
false
);
let mut undefined_offenders = Vec::new();
let mut optional_nullable_offenders = BTreeSet::new();
let mut stack = vec![output_dir];
while let Some(dir) = stack.pop() {
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
stack.push(path);
for (path, contents) in &fixture_tree {
if !matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
continue;
}
// Only allow "?: T | null" in objects representing JSON-RPC requests,
// which we assume are called "*Params".
let allow_optional_nullable = path
.file_stem()
.and_then(|stem| stem.to_str())
.is_some_and(|stem| {
stem.ends_with("Params")
|| stem == "InitializeCapabilities"
|| matches!(
stem,
"CollabAgentRef"
| "CollabAgentStatusEntry"
| "CollabAgentSpawnEndEvent"
| "CollabAgentInteractionEndEvent"
| "CollabCloseEndEvent"
| "CollabResumeBeginEvent"
| "CollabResumeEndEvent"
)
});
let contents = std::str::from_utf8(contents)?;
if contents.contains("| undefined") {
undefined_offenders.push(path.clone());
}
const SKIP_PREFIXES: &[&str] = &[
"const ",
"let ",
"var ",
"export const ",
"export let ",
"export var ",
];
let mut search_start = 0;
while let Some(idx) = contents[search_start..].find("| null") {
let abs_idx = search_start + idx;
// Find the property-colon for this field by scanning forward
// from the start of the segment and ignoring nested braces,
// brackets, and parens. This avoids colons inside nested
// type literals like `{ [k in string]?: string }`.
let line_start_idx = contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
let mut segment_start_idx = line_start_idx;
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
// Scan forward for the colon that separates the field name from its type.
let mut level_brace = 0_i32;
let mut level_brack = 0_i32;
let mut level_paren = 0_i32;
let mut in_single = false;
let mut in_double = false;
let mut escape = false;
let mut prop_colon_idx = None;
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
let idx_abs = segment_start_idx + i;
if escape {
escape = false;
continue;
}
match ch {
'\\' => {
if in_single || in_double {
escape = true;
}
}
'\'' => {
if !in_double {
in_single = !in_single;
}
}
'"' => {
if !in_single {
in_double = !in_double;
}
}
'{' if !in_single && !in_double => level_brace += 1,
'}' if !in_single && !in_double => level_brace -= 1,
'[' if !in_single && !in_double => level_brack += 1,
']' if !in_single && !in_double => level_brack -= 1,
'(' if !in_single && !in_double => level_paren += 1,
')' if !in_single && !in_double => level_paren -= 1,
':' if !in_single
&& !in_double
&& level_brace == 0
&& level_brack == 0
&& level_paren == 0 =>
{
prop_colon_idx = Some(idx_abs);
break;
}
_ => {}
}
}
let Some(colon_idx) = prop_colon_idx else {
search_start = abs_idx + 5;
continue;
};
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
// Only allow "?: T | null" in objects representing JSON-RPC requests,
// which we assume are called "*Params".
let allow_optional_nullable = path
.file_stem()
.and_then(|stem| stem.to_str())
.is_some_and(|stem| {
stem.ends_with("Params")
|| stem == "InitializeCapabilities"
|| matches!(
stem,
"CollabAgentRef"
| "CollabAgentStatusEntry"
| "CollabAgentSpawnEndEvent"
| "CollabAgentInteractionEndEvent"
| "CollabCloseEndEvent"
| "CollabResumeBeginEvent"
| "CollabResumeEndEvent"
)
});
let contents = fs::read_to_string(&path)?;
if contents.contains("| undefined") {
undefined_offenders.push(path.clone());
}
const SKIP_PREFIXES: &[&str] = &[
"const ",
"let ",
"var ",
"export const ",
"export let ",
"export var ",
];
let mut search_start = 0;
while let Some(idx) = contents[search_start..].find("| null") {
let abs_idx = search_start + idx;
// Find the property-colon for this field by scanning forward
// from the start of the segment and ignoring nested braces,
// brackets, and parens. This avoids colons inside nested
// type literals like `{ [k in string]?: string }`.
let line_start_idx =
contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
let mut segment_start_idx = line_start_idx;
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
}
// Scan forward for the colon that separates the field name from its type.
let mut level_brace = 0_i32;
let mut level_brack = 0_i32;
let mut level_paren = 0_i32;
let mut in_single = false;
let mut in_double = false;
let mut escape = false;
let mut prop_colon_idx = None;
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
let idx_abs = segment_start_idx + i;
if escape {
escape = false;
continue;
}
match ch {
'\\' => {
// Only treat as escape when inside a string.
if in_single || in_double {
escape = true;
}
}
'\'' => {
if !in_double {
in_single = !in_single;
}
}
'"' => {
if !in_single {
in_double = !in_double;
}
}
'{' if !in_single && !in_double => level_brace += 1,
'}' if !in_single && !in_double => level_brace -= 1,
'[' if !in_single && !in_double => level_brack += 1,
']' if !in_single && !in_double => level_brack -= 1,
'(' if !in_single && !in_double => level_paren += 1,
')' if !in_single && !in_double => level_paren -= 1,
':' if !in_single
&& !in_double
&& level_brace == 0
&& level_brack == 0
&& level_paren == 0 =>
{
prop_colon_idx = Some(idx_abs);
break;
}
_ => {}
}
}
let Some(colon_idx) = prop_colon_idx else {
search_start = abs_idx + 5;
continue;
};
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if let Some(comment_idx) = field_prefix.rfind("*/") {
field_prefix = field_prefix[comment_idx + 2..].trim_start();
}
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if SKIP_PREFIXES
.iter()
.any(|prefix| field_prefix.starts_with(prefix))
{
search_start = abs_idx + 5;
continue;
}
if field_prefix.contains('(') {
search_start = abs_idx + 5;
continue;
}
// If the last non-whitespace before ':' is '?', then this is an
// optional field with a nullable type (i.e., "?: T | null").
// These are only allowed in *Params types.
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
&& !allow_optional_nullable
{
let line_number =
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
let offending_line_end = contents[line_start_idx..]
.find('\n')
.map(|i| line_start_idx + i)
.unwrap_or(contents.len());
let offending_snippet =
contents[line_start_idx..offending_line_end].trim();
optional_nullable_offenders.insert(format!(
"{}:{}: {offending_snippet}",
path.display(),
line_number
));
}
search_start = abs_idx + 5;
}
if let Some(comment_idx) = field_prefix.rfind("*/") {
field_prefix = field_prefix[comment_idx + 2..].trim_start();
}
if field_prefix.is_empty() {
search_start = abs_idx + 5;
continue;
}
if SKIP_PREFIXES
.iter()
.any(|prefix| field_prefix.starts_with(prefix))
{
search_start = abs_idx + 5;
continue;
}
if field_prefix.contains('(') {
search_start = abs_idx + 5;
continue;
}
// If the last non-whitespace before ':' is '?', then this is an
// optional field with a nullable type (i.e., "?: T | null").
// These are only allowed in *Params types.
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
&& !allow_optional_nullable
{
let line_number =
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
let offending_line_end = contents[line_start_idx..]
.find('\n')
.map(|i| line_start_idx + i)
.unwrap_or(contents.len());
let offending_snippet = contents[line_start_idx..offending_line_end].trim();
optional_nullable_offenders.insert(format!(
"{}:{}: {offending_snippet}",
path.display(),
line_number
));
}
search_start = abs_idx + 5;
}
}
@@ -2153,55 +2224,40 @@ mod tests {
Ok(())
}
fn schema_root() -> Result<PathBuf> {
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
.context("resolve TypeScript schema index.ts")?;
let schema_root = typescript_index
.parent()
.and_then(|parent| parent.parent())
.context("derive schema root from schema/typescript/index.ts")?
.to_path_buf();
Ok(schema_root)
}
#[test]
fn generate_ts_with_experimental_api_retains_experimental_entries() -> Result<()> {
let output_dir =
std::env::temp_dir().join(format!("codex_ts_types_experimental_{}", Uuid::now_v7()));
fs::create_dir(&output_dir)?;
struct TempDirGuard(PathBuf);
impl Drop for TempDirGuard {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.0);
}
}
let _guard = TempDirGuard(output_dir.clone());
let options = GenerateTsOptions {
generate_indices: false,
ensure_headers: false,
run_prettier: false,
experimental_api: true,
};
generate_ts_with_options(&output_dir, None, options)?;
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
let client_request_ts = ClientRequest::export_to_string()?;
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), true);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodParams.ts")
.exists(),
client_request_ts.contains("MockExperimentalMethodParams"),
true
);
assert_eq!(
output_dir
.join("v2")
.join("MockExperimentalMethodResponse.ts")
.exists(),
v2::MockExperimentalMethodParams::export_to_string()?
.contains("MockExperimentalMethodParams"),
true
);
assert_eq!(
v2::MockExperimentalMethodResponse::export_to_string()?
.contains("MockExperimentalMethodResponse"),
true
);
let thread_start_ts =
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
let thread_start_ts = v2::ThreadStartParams::export_to_string()?;
assert_eq!(thread_start_ts.contains("mockExperimentalField"), true);
let command_execution_request_approval_ts = fs::read_to_string(
output_dir
.join("v2")
.join("CommandExecutionRequestApprovalParams.ts"),
)?;
let command_execution_request_approval_ts =
v2::CommandExecutionRequestApprovalParams::export_to_string()?;
assert_eq!(
command_execution_request_approval_ts.contains("additionalPermissions"),
true
@@ -2322,48 +2378,6 @@ mod tests {
Ok(())
}
#[test]
fn build_schema_bundle_rejects_conflicting_duplicate_definitions() {
    // Two schemas in the `v2` namespace that both define `Shared`, once as a
    // string and once as an integer.
    let first = GeneratedSchema {
        namespace: Some("v2".to_string()),
        logical_name: "First".to_string(),
        in_v1_dir: false,
        value: serde_json::json!({
            "title": "First",
            "type": "object",
            "definitions": {
                "Shared": {
                    "title": "SharedString",
                    "type": "string"
                }
            }
        }),
    };
    let second = GeneratedSchema {
        namespace: Some("v2".to_string()),
        logical_name: "Second".to_string(),
        in_v1_dir: false,
        value: serde_json::json!({
            "title": "Second",
            "type": "object",
            "definitions": {
                "Shared": {
                    "title": "SharedInteger",
                    "type": "integer"
                }
            }
        }),
    };

    let outcome = build_schema_bundle(vec![first, second]);
    let err = outcome.expect_err("conflicting schema definitions should be rejected");

    assert_eq!(
        err.to_string(),
        "schema definition collision in namespace `v2`: Shared (existing title: SharedString, new title: SharedInteger); use #[schemars(rename = \"...\")] to rename one of the conflicting schema definitions"
    );
}
#[test]
fn build_flat_v2_schema_keeps_shared_root_schemas_and_dependencies() -> Result<()> {
let bundle = serde_json::json!({

View File

@@ -17,6 +17,9 @@ pub use protocol::thread_history::*;
pub use protocol::v1::*;
pub use protocol::v2::*;
pub use schema_fixtures::SchemaFixtureOptions;
#[doc(hidden)]
pub use schema_fixtures::generate_typescript_schema_fixture_subtree_for_tests;
pub use schema_fixtures::read_schema_fixture_subtree;
pub use schema_fixtures::read_schema_fixture_tree;
pub use schema_fixtures::write_schema_fixtures;
pub use schema_fixtures::write_schema_fixtures_with_options;

View File

@@ -171,6 +171,12 @@ macro_rules! client_request_definitions {
Ok(())
}
/// Calls `v.visit` once for every `$response` type captured by this macro
/// invocation, in declaration order.
pub(crate) fn visit_client_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
// One `visit` call is emitted per macro repetition.
$(
v.visit::<$response>();
)*
}
#[allow(clippy::vec_init_then_push)]
pub fn export_client_response_schemas(
out_dir: &::std::path::Path,
@@ -545,6 +551,12 @@ macro_rules! server_request_definitions {
Ok(())
}
/// Calls `v.visit` once for every `$response` type captured by this macro
/// invocation, in declaration order.
pub(crate) fn visit_server_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
// One `visit` call is emitted per macro repetition.
$(
v.visit::<$response>();
)*
}
#[allow(clippy::vec_init_then_push)]
pub fn export_server_response_schemas(
out_dir: &Path,

View File

@@ -1,11 +1,25 @@
use crate::ClientNotification;
use crate::ClientRequest;
use crate::ServerNotification;
use crate::ServerRequest;
use crate::export::GENERATED_TS_HEADER;
use crate::export::filter_experimental_ts_tree;
use crate::export::generate_index_ts_tree;
use crate::protocol::common::visit_client_response_types;
use crate::protocol::common::visit_server_response_types;
use anyhow::Context;
use anyhow::Result;
use codex_protocol::protocol::EventMsg;
use serde_json::Map;
use serde_json::Value;
use std::any::TypeId;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use ts_rs::TS;
use ts_rs::TypeVisitor;
#[derive(Clone, Copy, Debug, Default)]
pub struct SchemaFixtureOptions {
@@ -27,6 +41,42 @@ pub fn read_schema_fixture_tree(schema_root: &Path) -> Result<BTreeMap<PathBuf,
Ok(all)
}
/// Reads every file under `<schema_root>/<label>` into a map.
///
/// Keys and file contents are whatever `collect_files_recursive` produces for
/// that directory.
///
/// # Errors
///
/// Propagates any failure from the recursive read, with the subtree path
/// attached as context.
pub fn read_schema_fixture_subtree(
    schema_root: &Path,
    label: &str,
) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
    let subtree_root = schema_root.join(label);
    collect_files_recursive(&subtree_root).map_err(|err| {
        err.context(format!(
            "read schema fixture subtree {}",
            subtree_root.display()
        ))
    })
}
/// Builds the TypeScript fixture subtree in memory, without writing files.
///
/// The protocol root types (`ClientRequest`, client responses,
/// `ClientNotification`, `ServerRequest`, server responses,
/// `ServerNotification`, `EventMsg`) are exported together with the types they
/// depend on. The result is then passed through the experimental filter and
/// given generated `index.ts` entries before being returned as bytes.
#[doc(hidden)]
pub fn generate_typescript_schema_fixture_subtree_for_tests() -> Result<BTreeMap<PathBuf, Vec<u8>>>
{
    let mut sources = BTreeMap::new();
    let mut visited = HashSet::new();

    // Client-side roots.
    collect_typescript_fixture_file::<ClientRequest>(&mut sources, &mut visited)?;
    visit_typescript_fixture_dependencies(&mut sources, &mut visited, |visitor| {
        visit_client_response_types(visitor);
    })?;
    collect_typescript_fixture_file::<ClientNotification>(&mut sources, &mut visited)?;

    // Server-side roots.
    collect_typescript_fixture_file::<ServerRequest>(&mut sources, &mut visited)?;
    visit_typescript_fixture_dependencies(&mut sources, &mut visited, |visitor| {
        visit_server_response_types(visitor);
    })?;
    collect_typescript_fixture_file::<ServerNotification>(&mut sources, &mut visited)?;

    collect_typescript_fixture_file::<EventMsg>(&mut sources, &mut visited)?;

    filter_experimental_ts_tree(&mut sources)?;
    generate_index_ts_tree(&mut sources);

    let mut fixture = BTreeMap::new();
    for (path, text) in sources {
        fixture.insert(path, text.into_bytes());
    }
    Ok(fixture)
}
/// Regenerates `schema/typescript/` and `schema/json/`.
///
/// This is intended to be used by tooling (e.g., `just write-app-server-schema`).
@@ -86,6 +136,12 @@ fn read_file_bytes(path: &Path) -> Result<Vec<u8>> {
let text = String::from_utf8(bytes)
.with_context(|| format!("expected UTF-8 TypeScript in {}", path.display()))?;
let text = text.replace("\r\n", "\n").replace('\r', "\n");
// Fixture comparisons care about schema content, not whether the generator
// re-prepended the standard banner to every TypeScript file.
let text = text
.strip_prefix(GENERATED_TS_HEADER)
.unwrap_or(&text)
.to_string();
return Ok(text.into_bytes());
}
Ok(bytes)
@@ -209,6 +265,73 @@ fn collect_files_recursive(root: &Path) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
Ok(files)
}
/// Exports the TypeScript definition of `T` into `files`, then recurses into
/// the types `T` depends on.
///
/// Types without an output path, and types already recorded in `seen`, are
/// skipped. `T` is marked as seen before its dependencies are visited, so a
/// type is exported at most once. Exported text has `\r\n` and bare `\r`
/// rewritten to `\n`.
///
/// # Errors
///
/// Returns the export error for `T`, or the first error hit while collecting
/// one of its dependencies.
fn collect_typescript_fixture_file<T: TS + 'static + ?Sized>(
    files: &mut BTreeMap<PathBuf, String>,
    seen: &mut HashSet<TypeId>,
) -> Result<()> {
    let output_path = match T::output_path() {
        Some(path) => path,
        None => return Ok(()),
    };

    let first_visit = seen.insert(TypeId::of::<T>());
    if !first_visit {
        return Ok(());
    }

    let exported = T::export_to_string().context("export TypeScript fixture content")?;
    let fixture_path = normalize_relative_fixture_path(&output_path);
    let unix_newlines = exported.replace("\r\n", "\n").replace('\r', "\n");
    files.insert(fixture_path, unix_newlines);

    let mut collector = TypeScriptFixtureCollector {
        files,
        seen,
        error: None,
    };
    T::visit_dependencies(&mut collector);

    match collector.error {
        Some(error) => Err(error),
        None => Ok(()),
    }
}
/// Rebuilds `path` from its parsed components.
///
/// Going through `Path::components` drops repeated separators, interior `.`
/// segments and trailing separators, and joins the result with the platform
/// separator.
fn normalize_relative_fixture_path(path: &Path) -> PathBuf {
    let mut normalized = PathBuf::new();
    for component in path.components() {
        normalized.push(component);
    }
    normalized
}
/// Runs `visit` against a fresh `TypeScriptFixtureCollector` backed by
/// `files` and `seen`.
///
/// # Errors
///
/// Returns the error the collector captured during the visit, if any.
fn visit_typescript_fixture_dependencies(
    files: &mut BTreeMap<PathBuf, String>,
    seen: &mut HashSet<TypeId>,
    visit: impl FnOnce(&mut TypeScriptFixtureCollector<'_>),
) -> Result<()> {
    let mut collector = TypeScriptFixtureCollector {
        files,
        seen,
        error: None,
    };
    visit(&mut collector);

    match collector.error {
        Some(error) => Err(error),
        None => Ok(()),
    }
}
/// [`TypeVisitor`] that exports every visited type into an in-memory fixture map.
struct TypeScriptFixtureCollector<'a> {
/// Exported TypeScript content keyed by output path.
files: &'a mut BTreeMap<PathBuf, String>,
/// Types that have already been exported, so repeats are skipped.
seen: &'a mut HashSet<TypeId>,
/// Error captured while collecting; once set, later visits do nothing.
error: Option<anyhow::Error>,
}
impl TypeVisitor for TypeScriptFixtureCollector<'_> {
    /// Exports `T` and its dependencies unless an earlier visit already failed.
    ///
    /// The trait method cannot return a `Result`, so a failure is stored in
    /// `self.error` for the caller to inspect afterwards.
    fn visit<T: TS + 'static + ?Sized>(&mut self) {
        if self.error.is_none() {
            self.error = collect_typescript_fixture_file::<T>(self.files, self.seen).err();
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,19 +1,60 @@
use anyhow::Context;
use anyhow::Result;
use codex_app_server_protocol::read_schema_fixture_tree;
use codex_app_server_protocol::write_schema_fixtures;
use codex_app_server_protocol::generate_json_with_experimental;
use codex_app_server_protocol::generate_typescript_schema_fixture_subtree_for_tests;
use codex_app_server_protocol::read_schema_fixture_subtree;
use similar::TextDiff;
use std::collections::BTreeMap;
use std::path::Path;
use std::path::PathBuf;
#[test]
fn schema_fixtures_match_generated() -> Result<()> {
fn typescript_schema_fixtures_match_generated() -> Result<()> {
let schema_root = schema_root()?;
let fixture_tree = read_tree(&schema_root)?;
let fixture_tree = read_tree(&schema_root, "typescript")?;
let generated_tree = generate_typescript_schema_fixture_subtree_for_tests()
.context("generate in-memory typescript schema fixtures")?;
assert_schema_trees_match("typescript", &fixture_tree, &generated_tree)?;
Ok(())
}
/// Checks that the vendored `json` schema fixtures match freshly generated
/// output (generated with the experimental flag set to `false`).
#[test]
fn json_schema_fixtures_match_generated() -> Result<()> {
    let generate_json = |output_dir: &Path| generate_json_with_experimental(output_dir, false);
    assert_schema_fixtures_match_generated("json", generate_json)
}
fn assert_schema_fixtures_match_generated(
label: &'static str,
generate: impl FnOnce(&Path) -> Result<()>,
) -> Result<()> {
let schema_root = schema_root()?;
let fixture_tree = read_tree(&schema_root, label)?;
let temp_dir = tempfile::tempdir().context("create temp dir")?;
write_schema_fixtures(temp_dir.path(), None).context("generate schema fixtures")?;
let generated_tree = read_tree(temp_dir.path())?;
let generated_root = temp_dir.path().join(label);
generate(&generated_root).with_context(|| {
format!(
"generate {label} schema fixtures into {}",
generated_root.display()
)
})?;
let generated_tree = read_tree(temp_dir.path(), label)?;
assert_schema_trees_match(label, &fixture_tree, &generated_tree)?;
Ok(())
}
fn assert_schema_trees_match(
label: &str,
fixture_tree: &BTreeMap<PathBuf, Vec<u8>>,
generated_tree: &BTreeMap<PathBuf, Vec<u8>>,
) -> Result<()> {
let fixture_paths = fixture_tree
.keys()
.map(|p| p.display().to_string())
@@ -32,13 +73,13 @@ fn schema_fixtures_match_generated() -> Result<()> {
.to_string();
panic!(
"Vendored app-server schema fixture file set doesn't match freshly generated output. \
"Vendored {label} app-server schema fixture file set doesn't match freshly generated output. \
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
);
}
// If the file sets match, diff contents for each file for a nicer error.
for (path, expected) in &fixture_tree {
for (path, expected) in fixture_tree {
let actual = generated_tree
.get(path)
.ok_or_else(|| anyhow::anyhow!("missing generated file: {}", path.display()))?;
@@ -54,7 +95,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
.header("fixture", "generated")
.to_string();
panic!(
"Vendored app-server schema fixture {} differs from generated output. \
"Vendored {label} app-server schema fixture {} differs from generated output. \
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
path.display()
);
@@ -63,7 +104,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
Ok(())
}
fn schema_root() -> Result<std::path::PathBuf> {
fn schema_root() -> Result<PathBuf> {
// In Bazel runfiles (especially manifest-only mode), resolving directories is not
// reliable. Resolve a known file, then walk up to the schema root.
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
@@ -92,6 +133,11 @@ fn schema_root() -> Result<std::path::PathBuf> {
Ok(schema_root)
}
fn read_tree(root: &Path) -> Result<std::collections::BTreeMap<std::path::PathBuf, Vec<u8>>> {
read_schema_fixture_tree(root).context("read schema fixture tree")
fn read_tree(root: &Path, label: &str) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
read_schema_fixture_subtree(root, label).with_context(|| {
format!(
"read {label} schema fixture subtree from {}",
root.display()
)
})
}

View File

@@ -8,6 +8,10 @@ license.workspace = true
name = "codex-app-server"
path = "src/main.rs"
[[bin]]
name = "codex-app-server-test-notify-capture"
path = "src/bin/notify_capture.rs"
[lib]
name = "codex_app_server"
path = "src/lib.rs"

View File

@@ -0,0 +1,44 @@
use std::env;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
/// Test helper that captures a notification payload into a file.
///
/// Usage: `<program> <output_path> <payload>`.
///
/// The payload is written to `<output_path>.tmp`, synced to disk, and then
/// renamed to `<output_path>`, so the output path only appears once the
/// payload has been fully written.
///
/// # Errors
///
/// Fails when the output path or payload argument is missing, when extra
/// arguments follow the payload, or when creating, writing, syncing, or
/// renaming the temp file fails.
fn main() -> Result<()> {
    let mut args = env::args_os();
    let _program = args.next();
    let output_path = PathBuf::from(
        args.next()
            .ok_or_else(|| anyhow!("expected output path as first argument"))?,
    );
    let payload = args
        .next()
        .ok_or_else(|| anyhow!("expected payload as final argument"))?;
    if args.next().is_some() {
        bail!("expected payload as final argument");
    }
    let payload = payload.to_string_lossy();

    // Append the suffix to the raw OS string. Formatting through `display()`
    // is lossy for non-UTF-8 paths and could point the temp file somewhere
    // other than next to `output_path`.
    let mut temp_name = output_path.clone().into_os_string();
    temp_name.push(".tmp");
    let temp_path = PathBuf::from(temp_name);

    let mut file = File::create(&temp_path)
        .with_context(|| format!("failed to create {}", temp_path.display()))?;
    file.write_all(payload.as_bytes())
        .with_context(|| format!("failed to write {}", temp_path.display()))?;
    file.sync_all()
        .with_context(|| format!("failed to sync {}", temp_path.display()))?;
    // Close the handle before moving the file into place.
    drop(file);

    fs::rename(&temp_path, &output_path).with_context(|| {
        format!(
            "failed to move {} into {}",
            temp_path.display(),
            output_path.display()
        )
    })?;
    Ok(())
}

View File

@@ -33,6 +33,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "http://127.0.0.1:0/v1"
@@ -53,6 +56,9 @@ fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
@@ -65,6 +71,9 @@ model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
forced_login_method = "{forced_method}"
[features]
shell_snapshot = false
"#
);
std::fs::write(config_toml, contents)

View File

@@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -23,7 +24,23 @@ enum FileExpectation {
NonEmpty,
}
/// Writes a minimal `config.toml` into `codex_home` for these tests.
///
/// The config selects `mock-model`, sets the approval policy to `never`, the
/// sandbox mode to `danger-full-access`, and turns the `shell_snapshot`
/// feature off.
///
/// Returns the I/O error from `std::fs::write` if the file cannot be written.
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
[features]
shell_snapshot = false
"#,
)
}
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
@@ -164,6 +181,7 @@ async fn assert_no_session_updates_for(
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
// Prepare a temporary Codex home and a separate root with test files.
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
// Create files designed to have deterministic ordering for query "abe".
@@ -235,6 +253,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
@@ -434,7 +453,7 @@ async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()>
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
for i in 0..2_000 {
for i in 0..512 {
let file_path = root.path().join(format!("file-{i:04}.txt"));
std::fs::write(file_path, "contents")?;
}

View File

@@ -83,6 +83,9 @@ sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{base_url}"

View File

@@ -501,16 +501,6 @@ async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates()
})
.await?;
let maybe_update = timeout(
Duration::from_millis(150),
read_app_list_updated_notification(&mut mcp),
)
.await;
assert!(
maybe_update.is_err(),
"unexpected directory-only app/list update before accessible apps loaded"
);
let expected = vec![
AppInfo {
id: "beta".to_string(),
@@ -544,8 +534,17 @@ async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates()
},
];
let update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(update.data, expected);
loop {
let update = read_app_list_updated_notification(&mut mcp).await?;
if update.data == expected {
break;
}
assert!(
!update.data.is_empty() && update.data.iter().all(|connector| connector.is_accessible),
"unexpected directory-only app/list update before accessible apps loaded"
);
}
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,

View File

@@ -29,7 +29,6 @@ use super::connection_handling_websocket::assert_no_message;
use super::connection_handling_websocket::connect_websocket;
use super::connection_handling_websocket::create_config_toml;
use super::connection_handling_websocket::read_jsonrpc_message;
use super::connection_handling_websocket::reserve_local_addr;
use super::connection_handling_websocket::send_initialize_request;
use super::connection_handling_websocket::send_request;
use super::connection_handling_websocket::spawn_websocket_server;
@@ -712,8 +711,7 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;

View File

@@ -18,6 +18,7 @@ use std::path::Path;
use std::process::Stdio;
use tempfile::TempDir;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Child;
use tokio::process::Command;
use tokio::time::Duration;
@@ -39,8 +40,7 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;
@@ -79,15 +79,12 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
Ok(())
}
pub(super) async fn spawn_websocket_server(
codex_home: &Path,
bind_addr: SocketAddr,
) -> Result<Child> {
pub(super) async fn spawn_websocket_server(codex_home: &Path) -> Result<(Child, SocketAddr)> {
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
.context("should find app-server binary")?;
let mut cmd = Command::new(program);
cmd.arg("--listen")
.arg(format!("ws://{bind_addr}"))
.arg("ws://127.0.0.1:0")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped())
@@ -98,23 +95,57 @@ pub(super) async fn spawn_websocket_server(
.spawn()
.context("failed to spawn websocket app-server process")?;
if let Some(stderr) = process.stderr.take() {
let mut stderr_reader = tokio::io::BufReader::new(stderr).lines();
tokio::spawn(async move {
while let Ok(Some(line)) = stderr_reader.next_line().await {
eprintln!("[websocket app-server stderr] {line}");
let stderr = process
.stderr
.take()
.context("failed to capture websocket app-server stderr")?;
let mut stderr_reader = BufReader::new(stderr).lines();
let deadline = Instant::now() + Duration::from_secs(10);
let bind_addr = loop {
let line = timeout(
deadline.saturating_duration_since(Instant::now()),
stderr_reader.next_line(),
)
.await
.context("timed out waiting for websocket app-server to report bound websocket address")?
.context("failed to read websocket app-server stderr")?
.context("websocket app-server exited before reporting bound websocket address")?;
eprintln!("[websocket app-server stderr] {line}");
let stripped_line = {
let mut stripped = String::with_capacity(line.len());
let mut chars = line.chars().peekable();
while let Some(ch) = chars.next() {
if ch == '\u{1b}' && matches!(chars.peek(), Some(&'[')) {
chars.next();
for next in chars.by_ref() {
if ('@'..='~').contains(&next) {
break;
}
}
continue;
}
stripped.push(ch);
}
});
}
stripped
};
Ok(process)
}
if let Some(bind_addr) = stripped_line
.split_whitespace()
.find_map(|token| token.strip_prefix("ws://"))
.and_then(|addr| addr.parse::<SocketAddr>().ok())
{
break bind_addr;
}
};
pub(super) fn reserve_local_addr() -> Result<SocketAddr> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
let addr = listener.local_addr()?;
drop(listener);
Ok(addr)
tokio::spawn(async move {
while let Ok(Some(line)) = stderr_reader.next_line().await {
eprintln!("[websocket app-server stderr] {line}");
}
});
Ok((process, bind_addr))
}
pub(super) async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient> {

View File

@@ -3,7 +3,6 @@ use super::connection_handling_websocket::WsClient;
use super::connection_handling_websocket::connect_websocket;
use super::connection_handling_websocket::create_config_toml;
use super::connection_handling_websocket::read_response_for_id;
use super::connection_handling_websocket::reserve_local_addr;
use super::connection_handling_websocket::send_initialize_request;
use super::connection_handling_websocket::send_request;
use super::connection_handling_websocket::spawn_websocket_server;
@@ -154,8 +153,7 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let bind_addr = reserve_local_addr()?;
let process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let (process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let mut ws = connect_websocket(bind_addr).await?;
send_initialize_request(&mut ws, 1, "ws_graceful_shutdown").await?;

View File

@@ -193,13 +193,13 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let notify_file = codex_home.path().join("notify.json");
let notify_capture = cargo_bin("test_notify_capture")?;
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
let notify_capture = notify_capture
.to_str()
.expect("notify capture path should be valid UTF-8");
let notify_file = notify_file
let notify_file_str = notify_file
.to_str()
.expect("notify output path should be valid UTF-8");
.expect("notify file path should be valid UTF-8");
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
@@ -207,7 +207,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
&format!(
"notify = [{}, {}]",
toml_basic_string(notify_capture),
toml_basic_string(notify_file)
toml_basic_string(notify_file_str)
),
)?;
@@ -255,7 +255,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
)
.await??;
let notify_file = Path::new(notify_file);
let notify_file = Path::new(&notify_file);
fs_wait::wait_for_path_exists(notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
@@ -292,6 +292,9 @@ model_provider = "mock_provider"
{extra}
[features]
shell_snapshot = false
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"

View File

@@ -1,7 +1,8 @@
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::to_response;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
@@ -28,11 +29,13 @@ use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::sleep;
use tokio::time::timeout;
use wiremock::MockServer;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -45,7 +48,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
responses::ev_assistant_message("msg-1", &full_message),
responses::ev_completed("resp-1"),
])];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -56,6 +59,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
let turn = start_plan_mode_turn(&mut mcp).await?;
let (_, completed_items, plan_deltas, turn_completed) =
collect_turn_notifications(&mut mcp).await?;
wait_for_responses_request_count(&server, 1).await?;
assert_eq!(turn_completed.turn.id, turn.id);
assert_eq!(turn_completed.turn.status, TurnStatus::Completed);
@@ -93,7 +97,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -102,7 +106,7 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
])];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -112,6 +116,7 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
let _turn = start_plan_mode_turn(&mut mcp).await?;
let (_, completed_items, plan_deltas, _) = collect_turn_notifications(&mut mcp).await?;
wait_for_responses_request_count(&server, 1).await?;
let has_plan_item = completed_items
.iter()
@@ -214,6 +219,36 @@ async fn collect_turn_notifications(
}
}
/// Polls the mock server until it has recorded exactly `expected_count`
/// `POST` requests whose URL path ends with `/responses`.
///
/// The server is polled every 10ms, bounded by `DEFAULT_READ_TIMEOUT`.
///
/// # Errors
///
/// Fails if the server reports no recorded requests, if more than
/// `expected_count` matching requests are seen, or if the timeout elapses
/// before the count is reached.
async fn wait_for_responses_request_count(
    server: &MockServer,
    expected_count: usize,
) -> Result<()> {
    let poll_until_expected = async {
        loop {
            let requests = match server.received_requests().await {
                Some(requests) => requests,
                None => bail!("wiremock did not record requests"),
            };

            let mut responses_request_count = 0usize;
            for request in &requests {
                let is_responses_post =
                    request.method == "POST" && request.url.path().ends_with("/responses");
                if is_responses_post {
                    responses_request_count += 1;
                }
            }

            // Overshooting can never recover, so fail right away.
            if responses_request_count > expected_count {
                bail!(
                    "expected exactly {expected_count} /responses requests, got {responses_request_count}"
                );
            }
            if responses_request_count == expected_count {
                return Ok::<(), anyhow::Error>(());
            }
            sleep(std::time::Duration::from_millis(10)).await;
        }
    };

    timeout(DEFAULT_READ_TIMEOUT, poll_until_expected).await??;
    Ok(())
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let features = BTreeMap::from([(Feature::CollaborationModes, true)]);
let feature_entries = features

View File

@@ -6,7 +6,6 @@ use super::connection_handling_websocket::create_config_toml;
use super::connection_handling_websocket::read_notification_for_method;
use super::connection_handling_websocket::read_response_and_notification_for_method;
use super::connection_handling_websocket::read_response_for_id;
use super::connection_handling_websocket::reserve_local_addr;
use super::connection_handling_websocket::send_initialize_request;
use super::connection_handling_websocket::send_request;
use super::connection_handling_websocket::spawn_websocket_server;
@@ -34,8 +33,7 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let result = async {
let mut ws1 = connect_websocket(bind_addr).await?;
@@ -96,8 +94,7 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let result = async {
let mut ws1 = connect_websocket(bind_addr).await?;

View File

@@ -4,7 +4,6 @@ use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::rollout_path;
@@ -60,6 +59,36 @@ use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
async fn wait_for_responses_request_count(
server: &wiremock::MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
anyhow::bail!("wiremock did not record requests");
};
let responses_request_count = requests
.iter()
.filter(|request| {
request.method == "POST" && request.url.path().ends_with("/responses")
})
.count();
if responses_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if responses_request_count > expected_count {
anyhow::bail!(
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
);
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -982,6 +1011,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}
@@ -1004,7 +1034,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
create_apply_patch_sse_response(patch, "patch-call")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let server = create_mock_responses_server_sequence_unchecked(responses).await;
create_config_toml(&codex_home, &server.uri())?;
let mut primary = McpProcess::new(&codex_home).await?;
@@ -1147,6 +1177,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_responses_request_count(&server, 3).await?;
Ok(())
}

View File

@@ -35,6 +35,51 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Waits until the mock server has held exactly `expected_count` `POST`
/// requests to a path ending in `/responses` for at least `settle_duration`.
///
/// The server is polled every 10ms. The settle clock starts on the first poll
/// that observes the expected count and resets whenever the count differs.
/// The whole wait is bounded by `DEFAULT_READ_TIMEOUT`.
///
/// # Errors
///
/// Fails if the recorded requests cannot be fetched, if more than
/// `expected_count` matching requests are seen, or if the timeout elapses
/// before the count has settled.
async fn wait_for_responses_request_count_to_stabilize(
    server: &wiremock::MockServer,
    expected_count: usize,
    settle_duration: std::time::Duration,
) -> Result<()> {
    let wait_until_settled = async {
        let mut stable_since: Option<tokio::time::Instant> = None;
        loop {
            let requests = server
                .received_requests()
                .await
                .context("failed to fetch received requests")?;

            let mut responses_request_count = 0usize;
            for request in &requests {
                if request.method == "POST" && request.url.path().ends_with("/responses") {
                    responses_request_count += 1;
                }
            }

            if responses_request_count > expected_count {
                anyhow::bail!(
                    "expected exactly {expected_count} /responses requests, got {responses_request_count}"
                );
            }

            if responses_request_count != expected_count {
                stable_since = None;
            } else if let Some(since) = stable_since {
                if since.elapsed() >= settle_duration {
                    return Ok::<(), anyhow::Error>(());
                }
            } else {
                // First poll at the expected count: start the settle clock.
                stable_since = Some(tokio::time::Instant::now());
            }

            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    };

    timeout(DEFAULT_READ_TIMEOUT, wait_until_settled).await??;
    Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -172,6 +217,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
};
assert_eq!(payload.thread_id, thread_id);
wait_for_responses_request_count_to_stabilize(
&server,
1,
std::time::Duration::from_millis(200),
)
.await?;
Ok(())
}

View File

@@ -54,6 +54,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
std::fs::create_dir(&codex_home)?;
let workspace = tmp.path().join("workspace");
std::fs::create_dir(&workspace)?;
let release_marker = workspace.join("interrupt-release");
let Some(zsh_path) = find_test_zsh_path()? else {
eprintln!("skipping zsh fork test: no zsh executable found");
@@ -61,8 +62,15 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
};
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
// Keep the shell command in flight until we interrupt it. A fast command
// like `echo hi` can finish before the interrupt arrives on faster runners,
// which turns this into a test for post-command follow-up behavior instead
// of interrupting an active zsh-fork command.
let release_marker_escaped = release_marker.to_string_lossy().replace('\'', r#"'\''"#);
let wait_for_interrupt =
format!("while [ ! -f '{release_marker_escaped}' ]; do sleep 0.01; done");
let response = create_shell_command_sse_response(
vec!["echo".to_string(), "hi".to_string()],
vec!["/bin/sh".to_string(), "-c".to_string(), wait_for_interrupt],
None,
Some(5000),
"call-zsh-fork",
@@ -155,7 +163,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
assert_eq!(id, "call-zsh-fork");
assert_eq!(status, CommandExecutionStatus::InProgress);
assert!(command.starts_with(&zsh_path.display().to_string()));
assert!(command.contains(" -lc 'echo hi'"));
assert!(command.contains("/bin/sh -c"));
assert!(command.contains("sleep 0.01"));
assert!(command.contains(&release_marker.display().to_string()));
assert_eq!(cwd, workspace);
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
@@ -663,22 +673,50 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
};
assert_eq!(id, "call-zsh-fork-subcommand-decline");
assert_eq!(status, CommandExecutionStatus::Declined);
assert!(
aggregated_output.is_none()
|| aggregated_output == Some("exec command rejected by user".to_string())
);
if let Some(output) = aggregated_output.as_deref() {
assert!(
output == "exec command rejected by user"
|| output.contains("sandbox denied exec error"),
"unexpected aggregated output: {output}"
);
}
mcp.interrupt_turn_and_wait_for_aborted(
thread.id.clone(),
turn.id.clone(),
match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await?;
.await
{
Ok(Ok(completed_notif)) => {
let completed: TurnCompletedNotification = serde_json::from_value(
completed_notif
.params
.expect("turn/completed params must be present"),
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.id, turn.id);
assert!(matches!(
completed.turn.status,
TurnStatus::Interrupted | TurnStatus::Completed
));
}
Ok(Err(error)) => return Err(error),
Err(_) => {
mcp.interrupt_turn_and_wait_for_aborted(
thread.id.clone(),
turn.id.clone(),
DEFAULT_READ_TIMEOUT,
)
.await?;
}
}
}
Ok(Err(error)) => return Err(error),
Err(_) => {
// Some zsh builds abort the turn immediately after the rejected
// subcommand without emitting a parent `item/completed`.
// subcommand without emitting a parent `item/completed`, and Linux
// sandbox failures can also complete the turn before the parent
// completion item is observed.
let completed_notif = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
@@ -691,7 +729,10 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
)?;
assert_eq!(completed.thread_id, thread.id);
assert_eq!(completed.turn.id, turn.id);
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
assert!(matches!(
completed.turn.status,
TurnStatus::Interrupted | TurnStatus::Completed
));
}
}

View File

@@ -53,9 +53,6 @@ enum WsCommand {
message: Message,
tx_result: oneshot::Sender<Result<(), WsError>>,
},
Close {
tx_result: oneshot::Sender<Result<(), WsError>>,
},
}
impl WsStream {
@@ -80,11 +77,6 @@ impl WsStream {
break;
}
}
WsCommand::Close { tx_result } => {
let result = inner.close(None).await;
let _ = tx_result.send(result);
break;
}
}
}
message = inner.next() => {
@@ -144,11 +136,6 @@ impl WsStream {
.await
}
async fn close(&self) -> Result<(), WsError> {
self.request(|tx_result| WsCommand::Close { tx_result })
.await
}
/// Awaits the next item from this stream's `rx_message` channel.
///
/// Returns whatever `rx_message.recv()` yields: `Some(..)` carrying either a
/// `Message` or a `WsError`, or `None` once `recv()` itself returns `None`.
async fn next(&mut self) -> Option<Result<Message, WsError>> {
self.rx_message.recv().await
}
@@ -242,26 +229,32 @@ impl ResponsesWebsocketConnection {
.await;
}
let mut guard = stream.lock().await;
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
let result = {
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
};
run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
};
if let Err(err) = run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
{
let _ = ws_stream.close().await;
*guard = None;
if let Err(err) = result {
// A terminal stream error should reach the caller immediately. Waiting for a
// graceful close handshake here can stall indefinitely and mask the error.
let failed_stream = guard.take();
drop(guard);
drop(failed_stream);
let _ = tx_event.send(Err(err)).await;
}
});

View File

@@ -7,6 +7,7 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
use crate::rollout::list::read_session_meta_line;
use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
use crate::shell_snapshot::ShellSnapshot;
@@ -227,6 +228,10 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let rollout_path =
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?;
let session_source = match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
@@ -235,7 +240,7 @@ impl AgentControl {
}) => {
// Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the
// stored nickname/role from sqlite when available; otherwise leave both unset.
let (resumed_agent_nickname, resumed_agent_role) =
let (mut resumed_agent_nickname, mut resumed_agent_role) =
if let Some(state_db_ctx) = state_db::get_state_db(&config).await {
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
@@ -244,6 +249,20 @@ impl AgentControl {
} else {
(None, None)
};
if resumed_agent_nickname.is_none() || resumed_agent_role.is_none() {
// Rollouts are the canonical source for thread-spawn metadata; use them to
// heal partial sqlite rows during shutdown/resume races.
if let Ok(meta_line) = read_session_meta_line(rollout_path.as_path()).await
&& let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
agent_nickname,
agent_role,
..
}) = meta_line.meta.source
{
resumed_agent_nickname = resumed_agent_nickname.or(agent_nickname);
resumed_agent_role = resumed_agent_role.or(agent_role);
}
}
let reserved_agent_nickname = resumed_agent_nickname
.as_deref()
.map(|agent_nickname| {
@@ -270,10 +289,6 @@ impl AgentControl {
let inherited_shell_snapshot = self
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
.await;
let rollout_path =
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?;
let resumed_thread = state
.resume_thread_from_rollout_with_source(
@@ -1530,12 +1545,38 @@ mod tests {
})
.await
.expect("child thread metadata should be persisted to sqlite before shutdown");
// Simulate the partial sqlite row we occasionally observe in CI and assert resume heals
// missing sub-agent metadata from the rollout itself.
let mut persisted_metadata = state_db
.get_thread(child_thread_id)
.await
.expect("sqlite metadata lookup should succeed")
.expect("child metadata should exist in sqlite");
persisted_metadata.agent_nickname = None;
state_db
.upsert_thread(&persisted_metadata)
.await
.expect("test setup should allow partial sqlite metadata");
let _ = harness
.control
.shutdown_agent(child_thread_id)
.await
.expect("child shutdown should submit");
timeout(Duration::from_secs(5), async {
loop {
let status = status_rx.borrow().clone();
if is_final(&status) {
break;
}
status_rx
.changed()
.await
.expect("child status should reach a final state before resume");
}
})
.await
.expect("child should finish shutting down before resume");
let resumed_thread_id = harness
.control

View File

@@ -16,6 +16,8 @@ use codex_execpolicy::RuleMatch;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::NetworkPermissions;
use codex_protocol::models::PermissionProfile;
use codex_protocol::permissions::FileSystemSandboxPolicy;
use codex_protocol::permissions::NetworkSandboxPolicy;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::codex_linux_sandbox_exe_or_skip;
use core_test_support::responses::ev_assistant_message;
@@ -70,15 +72,17 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
.features
.enable(Feature::RequestPermissions)
.expect("test setup should allow enabling request permissions");
turn_context_raw
.sandbox_policy
.set(SandboxPolicy::DangerFullAccess)
.expect("test setup should allow updating sandbox policy");
// This test is about request-permissions validation, not managed sandbox
// policy enforcement. Widen the derived sandbox policies directly so the
// command runs without depending on a platform sandbox binary.
turn_context_raw.file_system_sandbox_policy =
codex_protocol::permissions::FileSystemSandboxPolicy::from(
&SandboxPolicy::DangerFullAccess,
);
FileSystemSandboxPolicy::from(turn_context_raw.sandbox_policy.get());
turn_context_raw.network_sandbox_policy =
codex_protocol::permissions::NetworkSandboxPolicy::from(&SandboxPolicy::DangerFullAccess);
NetworkSandboxPolicy::from(turn_context_raw.sandbox_policy.get());
let mut config = (*turn_context_raw.config).clone();
config.model_provider.base_url = Some(format!("{}/v1", server.uri()));
let config = Arc::new(config);
@@ -92,11 +96,14 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
turn_context_raw.provider = config.model_provider.clone();
let session = Arc::new(session);
let turn_context = Arc::new(turn_context_raw);
let expiration_ms: u64 = if cfg!(windows) { 2_500 } else { 1_000 };
let params = ExecParams {
command: if cfg!(windows) {
vec![
"cmd.exe".to_string(),
"/Q".to_string(),
"/D".to_string(),
"/C".to_string(),
"echo hi".to_string(),
]
@@ -108,7 +115,7 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
]
},
cwd: turn_context.cwd.clone(),
expiration: 1000.into(),
expiration: expiration_ms.into(),
env: HashMap::new(),
network: None,
sandbox_permissions: SandboxPermissions::WithAdditionalPermissions,

View File

@@ -548,6 +548,8 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[cfg(unix)]
use serial_test::serial;
#[cfg(unix)]
use std::os::unix::ffi::OsStrExt;
#[cfg(unix)]
use std::process::Command;
@@ -735,12 +737,17 @@ mod tests {
#[cfg(unix)]
#[tokio::test]
#[serial(stdin_fd)]
async fn snapshot_shell_does_not_inherit_stdin() -> Result<()> {
let _stdin_guard = BlockingStdinPipe::install()?;
let dir = tempdir()?;
let home = dir.path();
fs::write(home.join(".bashrc"), "read -r ignored\n").await?;
let read_status_path = home.join("stdin-read-status");
let read_status_display = read_status_path.display();
let bashrc =
format!("read -t 1 -r ignored\nprintf '%s' \"$?\" > \"{read_status_display}\"\n");
fs::write(home.join(".bashrc"), bashrc).await?;
let shell = Shell {
shell_type: ShellType::Bash,
@@ -753,10 +760,17 @@ mod tests {
"HOME=\"{home_display}\"; export HOME; {}",
bash_snapshot_script()
);
let output =
run_script_with_timeout(&shell, &script, Duration::from_millis(500), true, home)
.await
.context("run snapshot command")?;
let output = run_script_with_timeout(&shell, &script, Duration::from_secs(2), true, home)
.await
.context("run snapshot command")?;
let read_status = fs::read_to_string(&read_status_path)
.await
.context("read stdin probe status")?;
assert_eq!(
read_status, "1",
"expected shell startup read to see EOF on stdin; status={read_status:?}"
);
assert!(
output.contains("# Snapshot file"),

View File

@@ -201,8 +201,13 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
self.handle_task_abort(task, reason.clone()).await;
if let Some(mut active_turn) = self.take_active_turn().await {
for task in active_turn.drain_tasks() {
self.handle_task_abort(task, reason.clone()).await;
}
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
active_turn.clear_pending().await;
}
if reason == TurnAbortReason::Interrupted {
self.close_unified_exec_processes().await;
@@ -342,16 +347,9 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
async fn take_active_turn(&self) -> Option<ActiveTurn> {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
at.drain_tasks()
}
None => Vec::new(),
}
active.take()
}
pub(crate) async fn close_unified_exec_processes(&self) {

View File

@@ -95,7 +95,7 @@ fn resolve_workdir_base_path(
/// Validates feature/policy constraints for `with_additional_permissions` and
/// normalizes any path-based permissions. Errors if the request is invalid.
pub(super) fn normalize_and_validate_additional_permissions(
pub(crate) fn normalize_and_validate_additional_permissions(
request_permission_enabled: bool,
approval_policy: AskForApproval,
sandbox_permissions: SandboxPermissions,

View File

@@ -29,6 +29,7 @@ use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use std::ffi::OsString;
use std::fs;
use std::time::Duration;
@@ -37,7 +38,6 @@ use tokio::sync::oneshot;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
@@ -54,6 +54,34 @@ fn websocket_request_instructions(
.map(str::to_owned)
}
/// Polls the mock websocket server until a recorded request satisfies `predicate`.
///
/// On every pass all requests of all recorded connections are scanned in order and a
/// clone of the first match is returned. Between passes the task sleeps for 10ms.
///
/// # Panics
///
/// Panics with `timed out waiting for {description}` when a pass finds no match and
/// the 10 second deadline (measured from the call) has been reached.
async fn wait_for_matching_websocket_request<F>(
    server: &core_test_support::responses::WebSocketTestServer,
    description: &str,
    predicate: F,
) -> core_test_support::responses::WebSocketRequest
where
    F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
{
    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    let poll_interval = Duration::from_millis(10);

    loop {
        // Scan a fresh view of the recorded traffic on every pass; requests may
        // arrive on any connection while we are sleeping.
        for connection in server.connections().iter() {
            for request in connection.iter() {
                if predicate(request) {
                    return request.clone();
                }
            }
        }

        // The deadline is only enforced after an unsuccessful scan, so a request
        // that shows up right at the limit is still accepted.
        assert!(
            tokio::time::Instant::now() < deadline,
            "timed out waiting for {description}"
        );
        tokio::time::sleep(poll_interval).await;
    }
}
async fn seed_recent_thread(
test: &TestCodex,
title: &str,
@@ -230,6 +258,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial(openai_api_key_env)]
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -632,21 +661,23 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
async fn conversation_uses_experimental_realtime_ws_startup_context_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
})]],
])
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
})]]])
.await;
let mut builder = test_codex().with_config(|config| {
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context =
Some("custom startup context".to_string());
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context =
Some("custom startup context".to_string());
}
});
let test = builder.build_with_websocket_server(&server).await?;
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
@@ -656,7 +687,11 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
.await?;
fs::create_dir_all(test.workspace_path("docs"))?;
fs::write(test.workspace_path("README.md"), "workspace marker")?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
assert!(
startup_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
@@ -665,17 +700,12 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_custom_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let instructions = websocket_request_instructions(&startup_context_request)
.expect("custom startup context request should contain instructions");
@@ -683,7 +713,8 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
assert!(!instructions.contains("## Machine / Workspace Map"));
server.shutdown().await;
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
@@ -691,20 +722,22 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
async fn conversation_disables_realtime_startup_context_with_empty_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
})]],
])
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
})]]])
.await;
let mut builder = test_codex().with_config(|config| {
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context = Some(String::new());
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
config.experimental_realtime_ws_startup_context = Some(String::new());
}
});
let test = builder.build_with_websocket_server(&server).await?;
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
@@ -714,7 +747,11 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
.await?;
fs::create_dir_all(test.workspace_path("docs"))?;
fs::write(test.workspace_path("README.md"), "workspace marker")?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
assert!(
startup_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
@@ -723,17 +760,12 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_no_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context disable request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let instructions = websocket_request_instructions(&startup_context_request)
.expect("startup context disable request should contain instructions");
@@ -741,7 +773,8 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
assert!(!instructions.contains("## Machine / Workspace Map"));
server.shutdown().await;
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
@@ -749,17 +782,20 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
async fn conversation_start_injects_startup_context_from_thread_history() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_context", "instructions": "backend prompt" }
})]],
])
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_context", "instructions": "backend prompt" }
})]]])
.await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(
&test,
"Recent work: cleaned up startup flows and reviewed websocket routing.",
@@ -777,17 +813,12 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_context" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
@@ -802,7 +833,8 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
assert!(startup_context.contains("README.md"));
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
server.shutdown().await;
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
@@ -810,17 +842,20 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
})]],
])
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
})]]])
.await;
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
@@ -831,17 +866,12 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_workspace" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"workspace-map startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
@@ -850,7 +880,8 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
assert!(startup_context.contains("notes.txt"));
assert!(startup_context.contains("codex-rs/"));
server.shutdown().await;
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}
@@ -858,21 +889,24 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
let startup_server = start_websocket_server(vec![vec![]]).await;
let realtime_server = start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
})],
vec![],
vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
})],
vec![],
],
])
]])
.await;
let oversized_summary = "recent work ".repeat(3_500);
let mut builder = test_codex();
let test = builder.build_with_websocket_server(&server).await?;
let mut builder = test_codex().with_config({
let realtime_base_url = realtime_server.uri().to_string();
move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
}
});
let test = builder.build_with_websocket_server(&startup_server).await?;
seed_recent_thread(&test, &oversized_summary, "summary", "oversized").await?;
fs::write(test.workspace_path("marker.txt"), "marker")?;
@@ -883,17 +917,12 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
}))
.await?;
wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::SessionUpdated { session_id, .. },
}) if session_id == "sess_truncated" => Some(Ok(())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
let startup_context_request = server.wait_for_request(1, 0).await;
let startup_context_request = wait_for_matching_websocket_request(
&realtime_server,
"truncated startup context request with instructions",
|request| websocket_request_instructions(request).is_some(),
)
.await;
let startup_context = websocket_request_instructions(&startup_context_request)
.expect("startup context request should contain instructions");
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
@@ -905,13 +934,19 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
}))
.await?;
let explicit_text_request = server.wait_for_request(1, 1).await;
let explicit_text_request = wait_for_matching_websocket_request(
&realtime_server,
"explicit realtime text request",
|request| websocket_request_text(request).as_deref() == Some("hello"),
)
.await;
assert_eq!(
websocket_request_text(&explicit_text_request),
Some("hello".to_string())
);
server.shutdown().await;
startup_server.shutdown().await;
realtime_server.shutdown().await;
Ok(())
}

View File

@@ -13,10 +13,49 @@ use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::TestCodexBuilder;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use wiremock::MockServer;
/// Repeatedly resumes a session from `rollout_path` until the resumed session's
/// `initial_messages` satisfy `predicate`, then returns that resumed session.
///
/// Each attempt calls `builder.resume` with a fresh `Arc` clone of `home` and a clone
/// of `rollout_path`; an error from `resume` is propagated immediately. Attempts whose
/// initial messages are absent or rejected are dropped before sleeping 10ms and
/// retrying.
///
/// # Panics
///
/// Panics once a rejected attempt completes at or after the 2 second deadline. The
/// message includes the pretty-printed initial messages of the most recent attempt
/// that had any, or `<missing initial messages>` if none did.
async fn resume_until_initial_messages(
    builder: &mut TestCodexBuilder,
    server: &MockServer,
    home: Arc<TempDir>,
    rollout_path: PathBuf,
    predicate: impl Fn(&[EventMsg]) -> bool,
) -> Result<TestCodex> {
    let poll_interval = Duration::from_millis(10);
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    let mut last_initial_messages = String::from("<missing initial messages>");

    loop {
        let resumed = builder
            .resume(server, Arc::clone(&home), rollout_path.clone())
            .await?;

        match resumed.session_configured.initial_messages.as_ref() {
            Some(initial_messages) if predicate(initial_messages) => return Ok(resumed),
            // Remember what we saw so a timeout can report the closest miss.
            Some(initial_messages) => {
                last_initial_messages = format!("{initial_messages:#?}");
            }
            None => {}
        }

        if tokio::time::Instant::now() >= deadline {
            panic!(
                "timed out waiting for rollout resume messages to stabilize: {last_initial_messages}"
            );
        }

        // Release the rejected session before waiting so the next attempt starts clean.
        drop(resumed);
        tokio::time::sleep(poll_interval).await;
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
@@ -57,7 +96,26 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let resumed = builder.resume(&server, home, rollout_path).await?;
let resumed = resume_until_initial_messages(
&mut builder,
&server,
home,
rollout_path,
|initial_messages| {
matches!(
initial_messages,
[
EventMsg::TurnStarted(_),
EventMsg::UserMessage(_),
EventMsg::TokenCount(_),
EventMsg::AgentMessage(_),
EventMsg::TokenCount(_),
EventMsg::TurnComplete(_),
]
)
},
)
.await?;
let initial_messages = resumed
.session_configured
.initial_messages
@@ -123,7 +181,28 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()>
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let resumed = builder.resume(&server, home, rollout_path).await?;
let resumed = resume_until_initial_messages(
&mut builder,
&server,
home,
rollout_path,
|initial_messages| {
matches!(
initial_messages,
[
EventMsg::TurnStarted(_),
EventMsg::UserMessage(_),
EventMsg::TokenCount(_),
EventMsg::AgentReasoning(_),
EventMsg::AgentReasoningRawContent(_),
EventMsg::AgentMessage(_),
EventMsg::TokenCount(_),
EventMsg::TurnComplete(_),
]
)
},
)
.await?;
let initial_messages = resumed
.session_configured
.initial_messages

View File

@@ -4,6 +4,7 @@ use std::ffi::OsString;
use std::fs;
use std::net::TcpListener;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -36,11 +37,13 @@ use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_with_timeout;
use reqwest::Client;
use reqwest::StatusCode;
use serde_json::Value;
use serde_json::json;
use serial_test::serial;
use tempfile::tempdir;
use tokio::net::TcpStream;
use tokio::process::Child;
use tokio::process::Command;
use tokio::time::Instant;
@@ -263,7 +266,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
loop {
fixture.codex.submit(Op::ListMcpTools).await?;
let list_event = core_test_support::wait_for_event_with_timeout(
let list_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
Duration::from_secs(10),
@@ -851,10 +854,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
Ok(())
}
/// This test writes to a fallback credentials file in CODEX_HOME.
/// Ideally, we wouldn't need to serialize the test but it's much more cumbersome to wire CODEX_HOME through the code.
#[serial(codex_home)]
#[test]
#[serial(mcp_test_value)]
fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
@@ -936,8 +937,8 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
wait_for_streamable_http_server(&mut http_server_child, &bind_addr, Duration::from_secs(5))
.await?;
let temp_home = tempdir()?;
let _guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
let temp_home = Arc::new(tempdir()?);
let _codex_home_guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
write_fallback_oauth_tokens(
temp_home.path(),
server_name,
@@ -948,10 +949,10 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
)?;
let fixture = test_codex()
.with_home(temp_home.clone())
.with_config(move |config| {
// This test seeds OAuth tokens in CODEX_HOME/.credentials.json and
// validates file-backed OAuth loading. Force file mode so Linux
// keyring backend quirks do not affect this test.
// Keep OAuth credentials isolated to this test home because Bazel
// runs the full core suite in one process.
config.mcp_oauth_credentials_store_mode = serde_json::from_value(json!("file"))
.expect("`file` should deserialize as OAuthCredentialsStoreMode");
let mut servers = config.mcp_servers.get().clone();
@@ -984,6 +985,31 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
.await?;
let session_model = fixture.session_configured.model.clone();
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
loop {
fixture.codex.submit(Op::ListMcpTools).await?;
let list_event = wait_for_event_with_timeout(
&fixture.codex,
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
Duration::from_secs(10),
)
.await;
let EventMsg::McpListToolsResponse(tool_list) = list_event else {
unreachable!("event guard guarantees McpListToolsResponse");
};
if tool_list.tools.contains_key(&tool_name) {
break;
}
let available_tools: Vec<&str> = tool_list.tools.keys().map(String::as_str).collect();
if Instant::now() >= tools_ready_deadline {
panic!(
"timed out waiting for MCP tool {tool_name} to become available; discovered tools: {available_tools:?}"
);
}
sleep(Duration::from_millis(200)).await;
}
fixture
.codex
.submit(Op::UserTurn {
@@ -1078,7 +1104,8 @@ async fn wait_for_streamable_http_server(
timeout: Duration,
) -> anyhow::Result<()> {
let deadline = Instant::now() + timeout;
let metadata_url = format!("http://{address}/.well-known/oauth-authorization-server/mcp");
let client = Client::builder().no_proxy().build()?;
loop {
if let Some(status) = server_child.try_wait()? {
return Err(anyhow::anyhow!(
@@ -1090,22 +1117,30 @@ async fn wait_for_streamable_http_server(
if remaining.is_zero() {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: deadline reached"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: deadline reached"
));
}
match tokio::time::timeout(remaining, TcpStream::connect(address)).await {
Ok(Ok(_)) => return Ok(()),
match tokio::time::timeout(remaining, client.get(&metadata_url).send()).await {
Ok(Ok(response)) if response.status() == StatusCode::OK => return Ok(()),
Ok(Ok(response)) => {
if Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server metadata at {metadata_url}: HTTP {}",
response.status()
));
}
}
Ok(Err(error)) => {
if Instant::now() >= deadline {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: {error}"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: {error}"
));
}
}
Err(_) => {
return Err(anyhow::anyhow!(
"timed out waiting for streamable HTTP server at {address}: connect call timed out"
"timed out waiting for streamable HTTP server metadata at {metadata_url}: request timed out"
));
}
}

View File

@@ -266,7 +266,7 @@ async fn unicode_output(login: bool) -> anyhow::Result<()> {
mount_shell_responses_with_timeout(
&harness,
call_id,
"git -c alias.say='!printf \"%s\" \"naïve_café\"' say",
"echo 'naïve_café'",
Some(login),
MEDIUM_TIMEOUT,
)

View File

@@ -6,63 +6,40 @@ use codex_core::WireApi;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use codex_utils_cargo_bin::find_resource;
use core_test_support::load_sse_fixture;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::sse;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
fn sse_incomplete() -> String {
load_sse_fixture("tests/fixtures/incomplete_sse.json")
let fixture = find_resource!("tests/fixtures/incomplete_sse.json")
.unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}"));
load_sse_fixture(fixture)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retries_on_early_close() {
skip_if_no_network!();
let server = MockServer::start().await;
let incomplete_sse = sse_incomplete();
let completed_sse = responses::sse_completed("resp_ok");
struct SeqResponder;
impl Respond for SeqResponder {
fn respond(&self, _: &Request) -> ResponseTemplate {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
static CALLS: AtomicUsize = AtomicUsize::new(0);
let n = CALLS.fetch_add(1, Ordering::SeqCst);
if n == 0 {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_incomplete(), "text/event-stream")
} else {
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(
sse(vec![
ev_response_created("resp_ok"),
ev_completed("resp_ok"),
]),
"text/event-stream",
)
}
}
}
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(SeqResponder {})
.expect(2)
.mount(&server)
.await;
let (server, _) = start_streaming_sse_server(vec![
vec![StreamingSseChunk {
gate: None,
body: incomplete_sse,
}],
vec![StreamingSseChunk {
gate: None,
body: completed_sse,
}],
])
.await;
// Configure retry behavior explicitly to avoid mutating process-wide
// environment variables.
@@ -92,7 +69,7 @@ async fn retries_on_early_close() {
.with_config(move |config| {
config.model_provider = model_provider;
})
.build(&server)
.build_with_streaming_server(&server)
.await
.unwrap();
@@ -109,4 +86,13 @@ async fn retries_on_early_close() {
// Wait until TurnComplete (should succeed after retry).
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = server.requests().await;
assert_eq!(
requests.len(),
2,
"expected retry after incomplete SSE stream"
);
server.shutdown().await;
}

View File

@@ -54,23 +54,26 @@ async fn test_shell_command_approval_triggers_elicitation() {
async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> {
// Use a simple, untrusted command that creates a file so we can
// observe a side-effect.
//
// Cross-platform approach: run a tiny Python snippet to touch the file
// using `python3 -c ...` on all platforms.
let workdir_for_shell_function_call = TempDir::new()?;
let created_filename = "created_by_shell_tool.txt";
let created_file = workdir_for_shell_function_call
.path()
.join(created_filename);
let shell_command = vec![
"python3".to_string(),
"-c".to_string(),
format!("import pathlib; pathlib.Path('{created_filename}').touch()"),
];
let expected_shell_command = format_with_current_shell(&format!(
"python3 -c \"import pathlib; pathlib.Path('{created_filename}').touch()\""
));
let shell_command = if cfg!(windows) {
vec![
"New-Item".to_string(),
"-ItemType".to_string(),
"File".to_string(),
"-Path".to_string(),
created_filename.to_string(),
"-Force".to_string(),
]
} else {
vec!["touch".to_string(), created_filename.to_string()]
};
let expected_shell_command =
format_with_current_shell(&shlex::try_join(shell_command.iter().map(String::as_str))?);
let McpHandle {
process: mut mcp_process,

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::Router;
use axum::body::Body;
@@ -44,6 +45,7 @@ use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex;
use tokio::task;
use tokio::time::sleep;
#[derive(Clone)]
struct TestToolServer {
@@ -275,15 +277,25 @@ fn parse_bind_addr() -> Result<SocketAddr, Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bind_addr = parse_bind_addr()?;
let session_failure_state = SessionFailureState::default();
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
const MAX_BIND_RETRIES: u32 = 20;
const BIND_RETRY_DELAY: Duration = Duration::from_millis(50);
let mut bind_retries = 0;
let listener = loop {
match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => break listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
}
Err(err) if err.kind() == ErrorKind::AddrInUse && bind_retries < MAX_BIND_RETRIES => {
bind_retries += 1;
sleep(BIND_RETRY_DELAY).await;
}
Err(err) => return Err(err.into()),
}
Err(err) => return Err(err.into()),
};
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");

View File

@@ -24,8 +24,13 @@ async fn wait_for_pid_file(path: &Path) -> Result<u32> {
for _ in 0..50 {
match fs::read_to_string(path) {
Ok(content) => {
let pid = content
.trim()
let trimmed = content.trim();
if trimmed.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let pid = trimmed
.parse::<u32>()
.with_context(|| format!("failed to parse pid from {}", path.display()))?;
return Ok(pid);

View File

@@ -22,7 +22,6 @@ anyhow = { workspace = true }
uds_windows = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View File

@@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> {
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
}
stream
.shutdown(Shutdown::Write)
.context("failed to shutdown socket writer")?;
// The peer can close immediately after sending its response. On macOS that
// may surface as NotConnected when we half-close our write side.
if let Err(err) = stream.shutdown(Shutdown::Write)
&& err.kind() != io::ErrorKind::NotConnected
{
return Err(err).context("failed to shutdown socket writer");
}
let stdout_result = stdout_thread
.join()

View File

@@ -1,12 +1,15 @@
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use assert_cmd::Command;
use anyhow::anyhow;
use pretty_assertions::assert_eq;
#[cfg(unix)]
@@ -19,6 +22,9 @@ use uds_windows::UnixListener;
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
let socket_path = dir.path().join("socket");
let request = b"request";
let request_path = dir.path().join("request.txt");
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
let listener = match UnixListener::bind(&socket_path) {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
@@ -31,37 +37,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
};
let (tx, rx) = mpsc::channel();
let (event_tx, event_rx) = mpsc::channel();
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
let _ = event_tx.send("waiting for accept".to_string());
let (mut connection, _) = listener
.accept()
.context("failed to accept test connection")?;
let mut received = Vec::new();
let _ = event_tx.send("accepted connection".to_string());
let mut received = vec![0; request.len()];
connection
.read_to_end(&mut received)
.read_exact(&mut received)
.context("failed to read data from client")?;
let _ = event_tx.send(format!("read {} bytes", received.len()));
tx.send(received)
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
connection
.write_all(b"response")
.context("failed to write response to client")?;
let _ = event_tx.send("wrote response".to_string());
Ok(())
});
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
.arg(&socket_path)
.write_stdin("request")
.assert()
.success()
.stdout("response");
.stdin(Stdio::from(stdin))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("failed to spawn codex-stdio-to-uds")?;
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
let (stdout_tx, stdout_rx) = mpsc::channel();
let (stderr_tx, stderr_rx) = mpsc::channel();
thread::spawn(move || {
let mut stdout = Vec::new();
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
let _ = stdout_tx.send(result);
});
thread::spawn(move || {
let mut stderr = Vec::new();
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
let _ = stderr_tx.send(result);
});
let mut server_events = Vec::new();
let deadline = Instant::now() + Duration::from_secs(5);
let status = loop {
while let Ok(event) = event_rx.try_recv() {
server_events.push(event);
}
if let Some(status) = child.try_wait().context("failed to poll child status")? {
break status;
}
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr after kill")?
.context("failed to read child stderr")?;
anyhow::bail!(
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
}
thread::sleep(Duration::from_millis(25));
};
let stdout = stdout_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stdout")?
.context("failed to read child stdout")?;
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr")?
.context("failed to read child stderr")?;
assert!(
status.success(),
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
assert_eq!(stdout, b"response");
let received = rx
.recv_timeout(Duration::from_secs(1))
.context("server did not receive data in time")?;
assert_eq!(received, b"request");
assert_eq!(received, request);
let server_result = server_thread
.join()
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
.map_err(|_| anyhow!("server thread panicked"))?;
server_result.context("server failed")?;
Ok(())

View File

@@ -1,18 +0,0 @@
---
source: tui/src/chatwidget/tests.rs
expression: popup
---
Experimental features
Toggle experimental features. Changes are saved to config.toml.
[ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
installed.
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
Codex after enabling.
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
them to you, including sandbox escapes and blocked network access.
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
Press space to select or enter to save for next conversation

View File

@@ -1,19 +0,0 @@
---
source: tui/src/chatwidget/tests.rs
expression: popup
---
Experimental features
Toggle experimental features. Changes are saved to config.toml.
[ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
installed.
[ ] Bubblewrap sandbox Try the new linux sandbox based on bubblewrap.
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
Codex after enabling.
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
them to you, including sandbox escapes and blocked network access.
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
Press space to select or enter to save for next conversation

View File

@@ -7149,10 +7149,14 @@ async fn experimental_popup_includes_guardian_approval() {
chat.open_experimental_popup();
let popup = render_bottom_popup(&chat, 120);
#[cfg(target_os = "linux")]
assert_snapshot!("experimental_popup_includes_guardian_approval_linux", popup);
#[cfg(not(target_os = "linux"))]
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
assert!(
popup.contains("Guardian approvals"),
"expected guardian approvals entry in experimental popup, got:\n{popup}"
);
assert!(
popup.contains("blocked network access"),
"expected guardian approvals description in experimental popup, got:\n{popup}"
);
}
#[tokio::test]

View File

@@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String {
}
fn split_stdout_stderr_command() -> String {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
if cfg!(windows) {
// Keep this in cmd.exe syntax so the test does not depend on a runner-local
// PowerShell/Python setup just to produce deterministic split output.
"(echo split-out)&(>&2 echo split-err)".to_string()
} else {
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
}
}
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
@@ -130,52 +136,36 @@ async fn collect_output_until_exit(
}
async fn wait_for_python_repl_ready(
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
timeout_ms: u64,
newline: &str,
ready_marker: &str,
) -> anyhow::Result<Vec<u8>> {
let mut collected = Vec::new();
let marker = "__codex_pty_ready__";
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
while tokio::time::Instant::now() < deadline {
writer
.send(format!("print('{marker}'){newline}").into_bytes())
.await?;
let probe_deadline = tokio::time::Instant::now() + probe_window;
loop {
let now = tokio::time::Instant::now();
if now >= deadline || now >= probe_deadline {
break;
}
let remaining = std::cmp::min(
deadline.saturating_duration_since(now),
probe_deadline.saturating_duration_since(now),
);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(marker) {
return Ok(collected);
}
let now = tokio::time::Instant::now();
let remaining = deadline.saturating_duration_since(now);
match tokio::time::timeout(remaining, output_rx.recv()).await {
Ok(Ok(chunk)) => {
collected.extend_from_slice(&chunk);
if String::from_utf8_lossy(&collected).contains(ready_marker) {
return Ok(collected);
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
anyhow::bail!(
"PTY output closed while waiting for Python REPL readiness: {:?}",
String::from_utf8_lossy(&collected)
);
}
Err(_) => break,
}
}
anyhow::bail!(
"timed out waiting for Python REPL readiness in PTY: {:?}",
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
String::from_utf8_lossy(&collected)
);
}
@@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
return Ok(());
};
let ready_marker = "__codex_pty_ready__";
let args = vec![
"-i".to_string(),
"-q".to_string(),
"-c".to_string(),
format!("print('{ready_marker}')"),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pty_process(
&python,
&[],
&args,
Path::new("."),
&env_map,
&None,
@@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
let mut output =
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
writer
.send(format!("print('hello from pty'){newline}").into_bytes())
.await?;
@@ -291,21 +288,42 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_round_trips_stdin");
return Ok(());
let (program, args) = if cfg!(windows) {
let cmd = std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string());
(
cmd,
vec![
"/Q".to_string(),
"/V:ON".to_string(),
"/D".to_string(),
"/C".to_string(),
"set /p line= & echo(!line!".to_string(),
],
)
} else {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_round_trips_stdin");
return Ok(());
};
(
python,
vec![
"-u".to_string(),
"-c".to_string(),
"import sys; print(sys.stdin.readline().strip());".to_string(),
],
)
};
let args = vec![
"-u".to_string(),
"-c".to_string(),
"import sys; print(sys.stdin.readline().strip());".to_string(),
];
let env_map: HashMap<String, String> = std::env::vars().collect();
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
let (session, output_rx, exit_rx) = combine_spawned_output(spawned);
let writer = session.writer_sender();
writer.send(b"roundtrip\n".to_vec()).await?;
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
writer
.send(format!("roundtrip{newline}").into_bytes())
.await?;
drop(writer);
session.close_stdin();
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
let text = String::from_utf8_lossy(&output);
@@ -427,21 +445,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
let env_map: HashMap<String, String> = std::env::vars().collect();
let (program, args) = if cfg!(windows) {
let Some(python) = find_python() else {
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
return Ok(());
};
(
python,
vec![
"-c".to_string(),
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
],
)
} else {
shell_command(&split_stdout_stderr_command())
};
let (program, args) = shell_command(&split_stdout_stderr_command());
let spawned =
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
let SpawnedProcess {
@@ -464,8 +468,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
.await
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;
assert_eq!(stdout, b"split-out\n".to_vec());
assert_eq!(stderr, b"split-err\n".to_vec());
let expected_stdout = if cfg!(windows) {
b"split-out\r\n".to_vec()
} else {
b"split-out\n".to_vec()
};
let expected_stderr = if cfg!(windows) {
b"split-err\r\n".to_vec()
} else {
b"split-err\n".to_vec()
};
assert_eq!(stdout, expected_stdout);
assert_eq!(stderr, expected_stderr);
assert_eq!(code, 0);
Ok(())