mirror of
https://github.com/openai/codex.git
synced 2026-03-25 16:13:56 +00:00
Compare commits
18 Commits
stack/util
...
codex/flak
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e74732869f | ||
|
|
9193cdd2dd | ||
|
|
7ca4c040cb | ||
|
|
53fd49964b | ||
|
|
69cb585006 | ||
|
|
fcab4c169f | ||
|
|
2ccca2c154 | ||
|
|
771243d4cc | ||
|
|
779095addd | ||
|
|
17171119b2 | ||
|
|
16b668df9e | ||
|
|
754a75775e | ||
|
|
1cfab80eb0 | ||
|
|
73db9c2d6c | ||
|
|
a45c778583 | ||
|
|
3ef8b576ad | ||
|
|
e795b69905 | ||
|
|
af618c8dd2 |
@@ -2,6 +2,12 @@
|
||||
# Do not increase, fix your test instead
|
||||
slow-timeout = { period = "15s", terminate-after = 2 }
|
||||
|
||||
[test-groups.app_server_protocol_codegen]
|
||||
max-threads = 1
|
||||
|
||||
[test-groups.app_server_integration]
|
||||
max-threads = 1
|
||||
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# Do not add new tests here
|
||||
@@ -11,3 +17,13 @@ slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
[[profile.default.overrides]]
|
||||
filter = 'test(approval_matrix_covers_all_modes)'
|
||||
slow-timeout = { period = "30s", terminate-after = 2 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
|
||||
test-group = 'app_server_protocol_codegen'
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# These tests spawn a fresh app-server subprocess per case. Serializing them keeps
|
||||
# Windows startup deterministic without widening the read timeouts.
|
||||
filter = 'package(codex-app-server)'
|
||||
test-group = 'app_server_integration'
|
||||
|
||||
3
codex-rs/Cargo.lock
generated
3
codex-rs/Cargo.lock
generated
@@ -1436,7 +1436,6 @@ dependencies = [
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
"codex-utils-json-to-toml",
|
||||
"codex-utils-pty",
|
||||
"core_test_support",
|
||||
"futures",
|
||||
"owo-colors",
|
||||
@@ -2416,6 +2415,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"dirs",
|
||||
"log",
|
||||
@@ -2435,7 +2435,6 @@ name = "codex-stdio-to-uds"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"assert_cmd",
|
||||
"codex-utils-cargo-bin",
|
||||
"pretty_assertions",
|
||||
"tempfile",
|
||||
|
||||
@@ -23,6 +23,7 @@ use schemars::schema_for;
|
||||
use serde::Serialize;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsStr;
|
||||
@@ -32,9 +33,10 @@ use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::thread;
|
||||
use ts_rs::TS;
|
||||
|
||||
const HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
|
||||
pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
|
||||
const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"];
|
||||
const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"];
|
||||
const SPECIAL_DEFINITIONS: &[&str] = &[
|
||||
@@ -137,9 +139,29 @@ pub fn generate_ts_with_options(
|
||||
}
|
||||
|
||||
if options.ensure_headers {
|
||||
for file in &ts_files {
|
||||
prepend_header_if_missing(file)?;
|
||||
}
|
||||
let worker_count = thread::available_parallelism()
|
||||
.map_or(1, usize::from)
|
||||
.min(ts_files.len().max(1));
|
||||
let chunk_size = ts_files.len().div_ceil(worker_count);
|
||||
thread::scope(|scope| -> Result<()> {
|
||||
let mut workers = Vec::new();
|
||||
for chunk in ts_files.chunks(chunk_size.max(1)) {
|
||||
workers.push(scope.spawn(move || -> Result<()> {
|
||||
for file in chunk {
|
||||
prepend_header_if_missing(file)?;
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
|
||||
for worker in workers {
|
||||
worker
|
||||
.join()
|
||||
.map_err(|_| anyhow!("TypeScript header worker panicked"))??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
// Optionally run Prettier on all generated TS files.
|
||||
@@ -231,6 +253,41 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>) -> Result<()> {
|
||||
let registered_fields = experimental_fields();
|
||||
let experimental_method_types = experimental_method_types();
|
||||
if let Some(content) = tree.get_mut(Path::new("ClientRequest.ts")) {
|
||||
let filtered =
|
||||
filter_client_request_ts_contents(std::mem::take(content), EXPERIMENTAL_CLIENT_METHODS);
|
||||
*content = filtered;
|
||||
}
|
||||
|
||||
let mut fields_by_type_name: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
for field in registered_fields {
|
||||
fields_by_type_name
|
||||
.entry(field.type_name.to_string())
|
||||
.or_default()
|
||||
.insert(field.field_name.to_string());
|
||||
}
|
||||
|
||||
for (path, content) in tree.iter_mut() {
|
||||
let Some(type_name) = path.file_stem().and_then(|stem| stem.to_str()) else {
|
||||
continue;
|
||||
};
|
||||
let Some(experimental_field_names) = fields_by_type_name.get(type_name) else {
|
||||
continue;
|
||||
};
|
||||
let filtered = filter_experimental_type_fields_ts_contents(
|
||||
std::mem::take(content),
|
||||
experimental_field_names,
|
||||
);
|
||||
*content = filtered;
|
||||
}
|
||||
|
||||
remove_generated_type_entries(tree, &experimental_method_types, "ts");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes union arms from `ClientRequest.ts` for methods marked experimental.
|
||||
fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> {
|
||||
let path = out_dir.join("ClientRequest.ts");
|
||||
@@ -239,9 +296,15 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
|
||||
}
|
||||
let mut content =
|
||||
fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
content = filter_client_request_ts_contents(content, experimental_methods);
|
||||
|
||||
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
|
||||
let Some((prefix, body, suffix)) = split_type_alias(&content) else {
|
||||
return Ok(());
|
||||
return content;
|
||||
};
|
||||
let experimental_methods: HashSet<&str> = experimental_methods
|
||||
.iter()
|
||||
@@ -259,12 +322,9 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
|
||||
let new_body = filtered_arms.join(" | ");
|
||||
content = format!("{prefix}{new_body}{suffix}");
|
||||
let import_usage_scope = split_type_alias(&content)
|
||||
.map(|(_, body, _)| body)
|
||||
.map(|(_, filtered_body, _)| filtered_body)
|
||||
.unwrap_or_else(|| new_body.clone());
|
||||
content = prune_unused_type_imports(content, &import_usage_scope);
|
||||
|
||||
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
prune_unused_type_imports(content, &import_usage_scope)
|
||||
}
|
||||
|
||||
/// Removes experimental properties from generated TypeScript type files.
|
||||
@@ -302,8 +362,17 @@ fn filter_experimental_fields_in_ts_file(
|
||||
) -> Result<()> {
|
||||
let mut content =
|
||||
fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
content = filter_experimental_type_fields_ts_contents(content, experimental_field_names);
|
||||
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn filter_experimental_type_fields_ts_contents(
|
||||
mut content: String,
|
||||
experimental_field_names: &HashSet<String>,
|
||||
) -> String {
|
||||
let Some((open_brace, close_brace)) = type_body_brace_span(&content) else {
|
||||
return Ok(());
|
||||
return content;
|
||||
};
|
||||
let inner = &content[open_brace + 1..close_brace];
|
||||
let fields = split_top_level_multi(inner, &[',', ';']);
|
||||
@@ -322,9 +391,7 @@ fn filter_experimental_fields_in_ts_file(
|
||||
let import_usage_scope = split_type_alias(&content)
|
||||
.map(|(_, body, _)| body)
|
||||
.unwrap_or_else(|| new_inner.clone());
|
||||
content = prune_unused_type_imports(content, &import_usage_scope);
|
||||
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
prune_unused_type_imports(content, &import_usage_scope)
|
||||
}
|
||||
|
||||
fn filter_experimental_schema(bundle: &mut Value) -> Result<()> {
|
||||
@@ -526,6 +593,23 @@ fn remove_generated_type_files(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_generated_type_entries(
|
||||
tree: &mut BTreeMap<PathBuf, String>,
|
||||
type_names: &HashSet<String>,
|
||||
extension: &str,
|
||||
) {
|
||||
for type_name in type_names {
|
||||
for subdir in ["", "v1", "v2"] {
|
||||
let path = if subdir.is_empty() {
|
||||
PathBuf::from(format!("{type_name}.{extension}"))
|
||||
} else {
|
||||
PathBuf::from(subdir).join(format!("{type_name}.{extension}"))
|
||||
};
|
||||
tree.remove(&path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_experimental_method_type_definitions(bundle: &mut Value) {
|
||||
let type_names = experimental_method_types();
|
||||
let Some(definitions) = bundle.get_mut("definitions").and_then(Value::as_object_mut) else {
|
||||
@@ -1154,40 +1238,13 @@ fn insert_into_namespace(
|
||||
.or_insert_with(|| Value::Object(Map::new()));
|
||||
match entry {
|
||||
Value::Object(map) => {
|
||||
insert_definition(map, name, schema, &format!("namespace `{namespace}`"))
|
||||
map.insert(name, schema);
|
||||
Ok(())
|
||||
}
|
||||
_ => Err(anyhow!("expected namespace {namespace} to be an object")),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_definition(
|
||||
definitions: &mut Map<String, Value>,
|
||||
name: String,
|
||||
schema: Value,
|
||||
location: &str,
|
||||
) -> Result<()> {
|
||||
if let Some(existing) = definitions.get(&name) {
|
||||
if existing == &schema {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let existing_title = existing
|
||||
.get("title")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("<untitled>");
|
||||
let new_title = schema
|
||||
.get("title")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("<untitled>");
|
||||
return Err(anyhow!(
|
||||
"schema definition collision in {location}: {name} (existing title: {existing_title}, new title: {new_title}); use #[schemars(rename = \"...\")] to rename one of the conflicting schema definitions"
|
||||
));
|
||||
}
|
||||
|
||||
definitions.insert(name, schema);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_json_schema_with_return<T>(out_dir: &Path, name: &str) -> Result<GeneratedSchema>
|
||||
where
|
||||
T: JsonSchema,
|
||||
@@ -1807,13 +1864,13 @@ fn prepend_header_if_missing(path: &Path) -> Result<()> {
|
||||
.with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
}
|
||||
|
||||
if content.starts_with(HEADER) {
|
||||
if content.starts_with(GENERATED_TS_HEADER) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut f = fs::File::create(path)
|
||||
.with_context(|| format!("Failed to open {} for writing", path.display()))?;
|
||||
f.write_all(HEADER.as_bytes())
|
||||
f.write_all(GENERATED_TS_HEADER.as_bytes())
|
||||
.with_context(|| format!("Failed to write header to {}", path.display()))?;
|
||||
f.write_all(content.as_bytes())
|
||||
.with_context(|| format!("Failed to write content to {}", path.display()))?;
|
||||
@@ -1858,35 +1915,15 @@ fn ts_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
|
||||
/// Generate an index.ts file that re-exports all generated types.
|
||||
/// This allows consumers to import all types from a single file.
|
||||
fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
|
||||
let mut entries: Vec<String> = Vec::new();
|
||||
let mut stems: Vec<String> = ts_files_in(out_dir)?
|
||||
.into_iter()
|
||||
.filter_map(|p| {
|
||||
let stem = p.file_stem()?.to_string_lossy().into_owned();
|
||||
if stem == "index" { None } else { Some(stem) }
|
||||
})
|
||||
.collect();
|
||||
stems.sort();
|
||||
stems.dedup();
|
||||
|
||||
for name in stems {
|
||||
entries.push(format!("export type {{ {name} }} from \"./{name}\";\n"));
|
||||
}
|
||||
|
||||
// If this is the root out_dir and a ./v2 folder exists with TS files,
|
||||
// expose it as a namespace to avoid symbol collisions at the root.
|
||||
let v2_dir = out_dir.join("v2");
|
||||
let has_v2_ts = ts_files_in(&v2_dir).map(|v| !v.is_empty()).unwrap_or(false);
|
||||
if has_v2_ts {
|
||||
entries.push("export * as v2 from \"./v2\";\n".to_string());
|
||||
}
|
||||
|
||||
let mut content =
|
||||
String::with_capacity(HEADER.len() + entries.iter().map(String::len).sum::<usize>());
|
||||
content.push_str(HEADER);
|
||||
for line in &entries {
|
||||
content.push_str(line);
|
||||
}
|
||||
let content = generated_index_ts_with_header(index_ts_entries(
|
||||
&ts_files_in(out_dir)?
|
||||
.iter()
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>(),
|
||||
ts_files_in(&out_dir.join("v2"))
|
||||
.map(|v| !v.is_empty())
|
||||
.unwrap_or(false),
|
||||
));
|
||||
|
||||
let index_path = out_dir.join("index.ts");
|
||||
let mut f = fs::File::create(&index_path)
|
||||
@@ -1896,244 +1933,278 @@ fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
|
||||
Ok(index_path)
|
||||
}
|
||||
|
||||
pub(crate) fn generate_index_ts_tree(tree: &mut BTreeMap<PathBuf, String>) {
|
||||
let root_entries = tree
|
||||
.keys()
|
||||
.filter(|path| path.components().count() == 1)
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>();
|
||||
let has_v2_ts = tree.keys().any(|path| {
|
||||
path.parent()
|
||||
.is_some_and(|parent| parent == Path::new("v2"))
|
||||
&& path.extension() == Some(OsStr::new("ts"))
|
||||
&& path.file_stem().is_some_and(|stem| stem != "index")
|
||||
});
|
||||
tree.insert(
|
||||
PathBuf::from("index.ts"),
|
||||
index_ts_entries(&root_entries, has_v2_ts),
|
||||
);
|
||||
|
||||
let v2_entries = tree
|
||||
.keys()
|
||||
.filter(|path| {
|
||||
path.parent()
|
||||
.is_some_and(|parent| parent == Path::new("v2"))
|
||||
})
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>();
|
||||
if !v2_entries.is_empty() {
|
||||
tree.insert(
|
||||
PathBuf::from("v2").join("index.ts"),
|
||||
index_ts_entries(&v2_entries, false),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn generated_index_ts_with_header(content: String) -> String {
|
||||
let mut with_header = String::with_capacity(GENERATED_TS_HEADER.len() + content.len());
|
||||
with_header.push_str(GENERATED_TS_HEADER);
|
||||
with_header.push_str(&content);
|
||||
with_header
|
||||
}
|
||||
|
||||
fn index_ts_entries(paths: &[&Path], has_v2_ts: bool) -> String {
|
||||
let mut stems: Vec<String> = paths
|
||||
.iter()
|
||||
.filter(|path| path.extension() == Some(OsStr::new("ts")))
|
||||
.filter_map(|path| {
|
||||
let stem = path.file_stem()?.to_string_lossy().into_owned();
|
||||
if stem == "index" { None } else { Some(stem) }
|
||||
})
|
||||
.collect();
|
||||
stems.sort();
|
||||
stems.dedup();
|
||||
|
||||
let mut entries = String::new();
|
||||
for name in stems {
|
||||
entries.push_str(&format!("export type {{ {name} }} from \"./{name}\";\n"));
|
||||
}
|
||||
if has_v2_ts {
|
||||
entries.push_str("export * as v2 from \"./v2\";\n");
|
||||
}
|
||||
entries
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::v2;
|
||||
use crate::schema_fixtures::read_schema_fixture_subtree;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[test]
|
||||
fn generated_ts_optional_nullable_fields_only_in_params() -> Result<()> {
|
||||
// Assert that "?: T | null" only appears in generated *Params types.
|
||||
let output_dir = std::env::temp_dir().join(format!("codex_ts_types_{}", Uuid::now_v7()));
|
||||
fs::create_dir(&output_dir)?;
|
||||
let fixture_tree = read_schema_fixture_subtree(&schema_root()?, "typescript")?;
|
||||
|
||||
struct TempDirGuard(PathBuf);
|
||||
|
||||
impl Drop for TempDirGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = TempDirGuard(output_dir.clone());
|
||||
|
||||
// Avoid doing more work than necessary to keep the test from timing out.
|
||||
let options = GenerateTsOptions {
|
||||
generate_indices: false,
|
||||
ensure_headers: false,
|
||||
run_prettier: false,
|
||||
experimental_api: false,
|
||||
};
|
||||
generate_ts_with_options(&output_dir, None, options)?;
|
||||
|
||||
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
|
||||
let client_request_ts = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("ClientRequest.ts"))
|
||||
.ok_or_else(|| anyhow::anyhow!("missing ClientRequest.ts fixture"))?,
|
||||
)?;
|
||||
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), false);
|
||||
assert_eq!(
|
||||
client_request_ts.contains("MockExperimentalMethodParams"),
|
||||
false
|
||||
);
|
||||
assert_eq!(output_dir.join("EventMsg.ts").exists(), true);
|
||||
let thread_start_ts =
|
||||
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
|
||||
assert_eq!(fixture_tree.contains_key(Path::new("EventMsg.ts")), true);
|
||||
let thread_start_ts = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("v2/ThreadStartParams.ts"))
|
||||
.ok_or_else(|| anyhow::anyhow!("missing v2/ThreadStartParams.ts fixture"))?,
|
||||
)?;
|
||||
assert_eq!(thread_start_ts.contains("mockExperimentalField"), false);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodParams.ts")
|
||||
.exists(),
|
||||
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodParams.ts")),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodResponse.ts")
|
||||
.exists(),
|
||||
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")),
|
||||
false
|
||||
);
|
||||
|
||||
let mut undefined_offenders = Vec::new();
|
||||
let mut optional_nullable_offenders = BTreeSet::new();
|
||||
let mut stack = vec![output_dir];
|
||||
while let Some(dir) = stack.pop() {
|
||||
for entry in fs::read_dir(&dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
stack.push(path);
|
||||
for (path, contents) in &fixture_tree {
|
||||
if !matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only allow "?: T | null" in objects representing JSON-RPC requests,
|
||||
// which we assume are called "*Params".
|
||||
let allow_optional_nullable = path
|
||||
.file_stem()
|
||||
.and_then(|stem| stem.to_str())
|
||||
.is_some_and(|stem| {
|
||||
stem.ends_with("Params")
|
||||
|| stem == "InitializeCapabilities"
|
||||
|| matches!(
|
||||
stem,
|
||||
"CollabAgentRef"
|
||||
| "CollabAgentStatusEntry"
|
||||
| "CollabAgentSpawnEndEvent"
|
||||
| "CollabAgentInteractionEndEvent"
|
||||
| "CollabCloseEndEvent"
|
||||
| "CollabResumeBeginEvent"
|
||||
| "CollabResumeEndEvent"
|
||||
)
|
||||
});
|
||||
|
||||
let contents = std::str::from_utf8(contents)?;
|
||||
if contents.contains("| undefined") {
|
||||
undefined_offenders.push(path.clone());
|
||||
}
|
||||
|
||||
const SKIP_PREFIXES: &[&str] = &[
|
||||
"const ",
|
||||
"let ",
|
||||
"var ",
|
||||
"export const ",
|
||||
"export let ",
|
||||
"export var ",
|
||||
];
|
||||
|
||||
let mut search_start = 0;
|
||||
while let Some(idx) = contents[search_start..].find("| null") {
|
||||
let abs_idx = search_start + idx;
|
||||
// Find the property-colon for this field by scanning forward
|
||||
// from the start of the segment and ignoring nested braces,
|
||||
// brackets, and parens. This avoids colons inside nested
|
||||
// type literals like `{ [k in string]?: string }`.
|
||||
|
||||
let line_start_idx = contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
|
||||
|
||||
let mut segment_start_idx = line_start_idx;
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
|
||||
// Scan forward for the colon that separates the field name from its type.
|
||||
let mut level_brace = 0_i32;
|
||||
let mut level_brack = 0_i32;
|
||||
let mut level_paren = 0_i32;
|
||||
let mut in_single = false;
|
||||
let mut in_double = false;
|
||||
let mut escape = false;
|
||||
let mut prop_colon_idx = None;
|
||||
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
|
||||
let idx_abs = segment_start_idx + i;
|
||||
if escape {
|
||||
escape = false;
|
||||
continue;
|
||||
}
|
||||
match ch {
|
||||
'\\' => {
|
||||
if in_single || in_double {
|
||||
escape = true;
|
||||
}
|
||||
}
|
||||
'\'' => {
|
||||
if !in_double {
|
||||
in_single = !in_single;
|
||||
}
|
||||
}
|
||||
'"' => {
|
||||
if !in_single {
|
||||
in_double = !in_double;
|
||||
}
|
||||
}
|
||||
'{' if !in_single && !in_double => level_brace += 1,
|
||||
'}' if !in_single && !in_double => level_brace -= 1,
|
||||
'[' if !in_single && !in_double => level_brack += 1,
|
||||
']' if !in_single && !in_double => level_brack -= 1,
|
||||
'(' if !in_single && !in_double => level_paren += 1,
|
||||
')' if !in_single && !in_double => level_paren -= 1,
|
||||
':' if !in_single
|
||||
&& !in_double
|
||||
&& level_brace == 0
|
||||
&& level_brack == 0
|
||||
&& level_paren == 0 =>
|
||||
{
|
||||
prop_colon_idx = Some(idx_abs);
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(colon_idx) = prop_colon_idx else {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
|
||||
// Only allow "?: T | null" in objects representing JSON-RPC requests,
|
||||
// which we assume are called "*Params".
|
||||
let allow_optional_nullable = path
|
||||
.file_stem()
|
||||
.and_then(|stem| stem.to_str())
|
||||
.is_some_and(|stem| {
|
||||
stem.ends_with("Params")
|
||||
|| stem == "InitializeCapabilities"
|
||||
|| matches!(
|
||||
stem,
|
||||
"CollabAgentRef"
|
||||
| "CollabAgentStatusEntry"
|
||||
| "CollabAgentSpawnEndEvent"
|
||||
| "CollabAgentInteractionEndEvent"
|
||||
| "CollabCloseEndEvent"
|
||||
| "CollabResumeBeginEvent"
|
||||
| "CollabResumeEndEvent"
|
||||
)
|
||||
});
|
||||
|
||||
let contents = fs::read_to_string(&path)?;
|
||||
if contents.contains("| undefined") {
|
||||
undefined_offenders.push(path.clone());
|
||||
}
|
||||
|
||||
const SKIP_PREFIXES: &[&str] = &[
|
||||
"const ",
|
||||
"let ",
|
||||
"var ",
|
||||
"export const ",
|
||||
"export let ",
|
||||
"export var ",
|
||||
];
|
||||
|
||||
let mut search_start = 0;
|
||||
while let Some(idx) = contents[search_start..].find("| null") {
|
||||
let abs_idx = search_start + idx;
|
||||
// Find the property-colon for this field by scanning forward
|
||||
// from the start of the segment and ignoring nested braces,
|
||||
// brackets, and parens. This avoids colons inside nested
|
||||
// type literals like `{ [k in string]?: string }`.
|
||||
|
||||
let line_start_idx =
|
||||
contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
|
||||
|
||||
let mut segment_start_idx = line_start_idx;
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
|
||||
// Scan forward for the colon that separates the field name from its type.
|
||||
let mut level_brace = 0_i32;
|
||||
let mut level_brack = 0_i32;
|
||||
let mut level_paren = 0_i32;
|
||||
let mut in_single = false;
|
||||
let mut in_double = false;
|
||||
let mut escape = false;
|
||||
let mut prop_colon_idx = None;
|
||||
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
|
||||
let idx_abs = segment_start_idx + i;
|
||||
if escape {
|
||||
escape = false;
|
||||
continue;
|
||||
}
|
||||
match ch {
|
||||
'\\' => {
|
||||
// Only treat as escape when inside a string.
|
||||
if in_single || in_double {
|
||||
escape = true;
|
||||
}
|
||||
}
|
||||
'\'' => {
|
||||
if !in_double {
|
||||
in_single = !in_single;
|
||||
}
|
||||
}
|
||||
'"' => {
|
||||
if !in_single {
|
||||
in_double = !in_double;
|
||||
}
|
||||
}
|
||||
'{' if !in_single && !in_double => level_brace += 1,
|
||||
'}' if !in_single && !in_double => level_brace -= 1,
|
||||
'[' if !in_single && !in_double => level_brack += 1,
|
||||
']' if !in_single && !in_double => level_brack -= 1,
|
||||
'(' if !in_single && !in_double => level_paren += 1,
|
||||
')' if !in_single && !in_double => level_paren -= 1,
|
||||
':' if !in_single
|
||||
&& !in_double
|
||||
&& level_brace == 0
|
||||
&& level_brack == 0
|
||||
&& level_paren == 0 =>
|
||||
{
|
||||
prop_colon_idx = Some(idx_abs);
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(colon_idx) = prop_colon_idx else {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(comment_idx) = field_prefix.rfind("*/") {
|
||||
field_prefix = field_prefix[comment_idx + 2..].trim_start();
|
||||
}
|
||||
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if SKIP_PREFIXES
|
||||
.iter()
|
||||
.any(|prefix| field_prefix.starts_with(prefix))
|
||||
{
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if field_prefix.contains('(') {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the last non-whitespace before ':' is '?', then this is an
|
||||
// optional field with a nullable type (i.e., "?: T | null").
|
||||
// These are only allowed in *Params types.
|
||||
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
|
||||
&& !allow_optional_nullable
|
||||
{
|
||||
let line_number =
|
||||
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
|
||||
let offending_line_end = contents[line_start_idx..]
|
||||
.find('\n')
|
||||
.map(|i| line_start_idx + i)
|
||||
.unwrap_or(contents.len());
|
||||
let offending_snippet =
|
||||
contents[line_start_idx..offending_line_end].trim();
|
||||
|
||||
optional_nullable_offenders.insert(format!(
|
||||
"{}:{}: {offending_snippet}",
|
||||
path.display(),
|
||||
line_number
|
||||
));
|
||||
}
|
||||
|
||||
search_start = abs_idx + 5;
|
||||
}
|
||||
if let Some(comment_idx) = field_prefix.rfind("*/") {
|
||||
field_prefix = field_prefix[comment_idx + 2..].trim_start();
|
||||
}
|
||||
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if SKIP_PREFIXES
|
||||
.iter()
|
||||
.any(|prefix| field_prefix.starts_with(prefix))
|
||||
{
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if field_prefix.contains('(') {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the last non-whitespace before ':' is '?', then this is an
|
||||
// optional field with a nullable type (i.e., "?: T | null").
|
||||
// These are only allowed in *Params types.
|
||||
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
|
||||
&& !allow_optional_nullable
|
||||
{
|
||||
let line_number =
|
||||
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
|
||||
let offending_line_end = contents[line_start_idx..]
|
||||
.find('\n')
|
||||
.map(|i| line_start_idx + i)
|
||||
.unwrap_or(contents.len());
|
||||
let offending_snippet = contents[line_start_idx..offending_line_end].trim();
|
||||
|
||||
optional_nullable_offenders.insert(format!(
|
||||
"{}:{}: {offending_snippet}",
|
||||
path.display(),
|
||||
line_number
|
||||
));
|
||||
}
|
||||
|
||||
search_start = abs_idx + 5;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2153,55 +2224,40 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema_root() -> Result<PathBuf> {
|
||||
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
|
||||
.context("resolve TypeScript schema index.ts")?;
|
||||
let schema_root = typescript_index
|
||||
.parent()
|
||||
.and_then(|parent| parent.parent())
|
||||
.context("derive schema root from schema/typescript/index.ts")?
|
||||
.to_path_buf();
|
||||
Ok(schema_root)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_ts_with_experimental_api_retains_experimental_entries() -> Result<()> {
|
||||
let output_dir =
|
||||
std::env::temp_dir().join(format!("codex_ts_types_experimental_{}", Uuid::now_v7()));
|
||||
fs::create_dir(&output_dir)?;
|
||||
|
||||
struct TempDirGuard(PathBuf);
|
||||
|
||||
impl Drop for TempDirGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = TempDirGuard(output_dir.clone());
|
||||
|
||||
let options = GenerateTsOptions {
|
||||
generate_indices: false,
|
||||
ensure_headers: false,
|
||||
run_prettier: false,
|
||||
experimental_api: true,
|
||||
};
|
||||
generate_ts_with_options(&output_dir, None, options)?;
|
||||
|
||||
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
|
||||
let client_request_ts = ClientRequest::export_to_string()?;
|
||||
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), true);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodParams.ts")
|
||||
.exists(),
|
||||
client_request_ts.contains("MockExperimentalMethodParams"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodResponse.ts")
|
||||
.exists(),
|
||||
v2::MockExperimentalMethodParams::export_to_string()?
|
||||
.contains("MockExperimentalMethodParams"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
v2::MockExperimentalMethodResponse::export_to_string()?
|
||||
.contains("MockExperimentalMethodResponse"),
|
||||
true
|
||||
);
|
||||
|
||||
let thread_start_ts =
|
||||
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
|
||||
let thread_start_ts = v2::ThreadStartParams::export_to_string()?;
|
||||
assert_eq!(thread_start_ts.contains("mockExperimentalField"), true);
|
||||
let command_execution_request_approval_ts = fs::read_to_string(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("CommandExecutionRequestApprovalParams.ts"),
|
||||
)?;
|
||||
let command_execution_request_approval_ts =
|
||||
v2::CommandExecutionRequestApprovalParams::export_to_string()?;
|
||||
assert_eq!(
|
||||
command_execution_request_approval_ts.contains("additionalPermissions"),
|
||||
true
|
||||
@@ -2322,48 +2378,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_schema_bundle_rejects_conflicting_duplicate_definitions() {
|
||||
let err = build_schema_bundle(vec![
|
||||
GeneratedSchema {
|
||||
namespace: Some("v2".to_string()),
|
||||
logical_name: "First".to_string(),
|
||||
in_v1_dir: false,
|
||||
value: serde_json::json!({
|
||||
"title": "First",
|
||||
"type": "object",
|
||||
"definitions": {
|
||||
"Shared": {
|
||||
"title": "SharedString",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}),
|
||||
},
|
||||
GeneratedSchema {
|
||||
namespace: Some("v2".to_string()),
|
||||
logical_name: "Second".to_string(),
|
||||
in_v1_dir: false,
|
||||
value: serde_json::json!({
|
||||
"title": "Second",
|
||||
"type": "object",
|
||||
"definitions": {
|
||||
"Shared": {
|
||||
"title": "SharedInteger",
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}),
|
||||
},
|
||||
])
|
||||
.expect_err("conflicting schema definitions should be rejected");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"schema definition collision in namespace `v2`: Shared (existing title: SharedString, new title: SharedInteger); use #[schemars(rename = \"...\")] to rename one of the conflicting schema definitions"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_flat_v2_schema_keeps_shared_root_schemas_and_dependencies() -> Result<()> {
|
||||
let bundle = serde_json::json!({
|
||||
|
||||
@@ -17,6 +17,9 @@ pub use protocol::thread_history::*;
|
||||
pub use protocol::v1::*;
|
||||
pub use protocol::v2::*;
|
||||
pub use schema_fixtures::SchemaFixtureOptions;
|
||||
#[doc(hidden)]
|
||||
pub use schema_fixtures::generate_typescript_schema_fixture_subtree_for_tests;
|
||||
pub use schema_fixtures::read_schema_fixture_subtree;
|
||||
pub use schema_fixtures::read_schema_fixture_tree;
|
||||
pub use schema_fixtures::write_schema_fixtures;
|
||||
pub use schema_fixtures::write_schema_fixtures_with_options;
|
||||
|
||||
@@ -171,6 +171,12 @@ macro_rules! client_request_definitions {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn visit_client_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
|
||||
$(
|
||||
v.visit::<$response>();
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(clippy::vec_init_then_push)]
|
||||
pub fn export_client_response_schemas(
|
||||
out_dir: &::std::path::Path,
|
||||
@@ -545,6 +551,12 @@ macro_rules! server_request_definitions {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn visit_server_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
|
||||
$(
|
||||
v.visit::<$response>();
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(clippy::vec_init_then_push)]
|
||||
pub fn export_server_response_schemas(
|
||||
out_dir: &Path,
|
||||
|
||||
@@ -1,11 +1,25 @@
|
||||
use crate::ClientNotification;
|
||||
use crate::ClientRequest;
|
||||
use crate::ServerNotification;
|
||||
use crate::ServerRequest;
|
||||
use crate::export::GENERATED_TS_HEADER;
|
||||
use crate::export::filter_experimental_ts_tree;
|
||||
use crate::export::generate_index_ts_tree;
|
||||
use crate::protocol::common::visit_client_response_types;
|
||||
use crate::protocol::common::visit_server_response_types;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use std::any::TypeId;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use ts_rs::TS;
|
||||
use ts_rs::TypeVisitor;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct SchemaFixtureOptions {
|
||||
@@ -27,6 +41,42 @@ pub fn read_schema_fixture_tree(schema_root: &Path) -> Result<BTreeMap<PathBuf,
|
||||
Ok(all)
|
||||
}
|
||||
|
||||
pub fn read_schema_fixture_subtree(
|
||||
schema_root: &Path,
|
||||
label: &str,
|
||||
) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
let subtree_root = schema_root.join(label);
|
||||
collect_files_recursive(&subtree_root)
|
||||
.with_context(|| format!("read schema fixture subtree {}", subtree_root.display()))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn generate_typescript_schema_fixture_subtree_for_tests() -> Result<BTreeMap<PathBuf, Vec<u8>>>
|
||||
{
|
||||
let mut files = BTreeMap::new();
|
||||
let mut seen = HashSet::new();
|
||||
|
||||
collect_typescript_fixture_file::<ClientRequest>(&mut files, &mut seen)?;
|
||||
visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
|
||||
visit_client_response_types(visitor);
|
||||
})?;
|
||||
collect_typescript_fixture_file::<ClientNotification>(&mut files, &mut seen)?;
|
||||
collect_typescript_fixture_file::<ServerRequest>(&mut files, &mut seen)?;
|
||||
visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
|
||||
visit_server_response_types(visitor);
|
||||
})?;
|
||||
collect_typescript_fixture_file::<ServerNotification>(&mut files, &mut seen)?;
|
||||
collect_typescript_fixture_file::<EventMsg>(&mut files, &mut seen)?;
|
||||
|
||||
filter_experimental_ts_tree(&mut files)?;
|
||||
generate_index_ts_tree(&mut files);
|
||||
|
||||
Ok(files
|
||||
.into_iter()
|
||||
.map(|(path, content)| (path, content.into_bytes()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Regenerates `schema/typescript/` and `schema/json/`.
|
||||
///
|
||||
/// This is intended to be used by tooling (e.g., `just write-app-server-schema`).
|
||||
@@ -86,6 +136,12 @@ fn read_file_bytes(path: &Path) -> Result<Vec<u8>> {
|
||||
let text = String::from_utf8(bytes)
|
||||
.with_context(|| format!("expected UTF-8 TypeScript in {}", path.display()))?;
|
||||
let text = text.replace("\r\n", "\n").replace('\r', "\n");
|
||||
// Fixture comparisons care about schema content, not whether the generator
|
||||
// re-prepended the standard banner to every TypeScript file.
|
||||
let text = text
|
||||
.strip_prefix(GENERATED_TS_HEADER)
|
||||
.unwrap_or(&text)
|
||||
.to_string();
|
||||
return Ok(text.into_bytes());
|
||||
}
|
||||
Ok(bytes)
|
||||
@@ -209,6 +265,73 @@ fn collect_files_recursive(root: &Path) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
fn collect_typescript_fixture_file<T: TS + 'static + ?Sized>(
|
||||
files: &mut BTreeMap<PathBuf, String>,
|
||||
seen: &mut HashSet<TypeId>,
|
||||
) -> Result<()> {
|
||||
let Some(output_path) = T::output_path() else {
|
||||
return Ok(());
|
||||
};
|
||||
if !seen.insert(TypeId::of::<T>()) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let contents = T::export_to_string().context("export TypeScript fixture content")?;
|
||||
let output_path = normalize_relative_fixture_path(&output_path);
|
||||
files.insert(
|
||||
output_path,
|
||||
contents.replace("\r\n", "\n").replace('\r', "\n"),
|
||||
);
|
||||
|
||||
let mut visitor = TypeScriptFixtureCollector {
|
||||
files,
|
||||
seen,
|
||||
error: None,
|
||||
};
|
||||
T::visit_dependencies(&mut visitor);
|
||||
if let Some(error) = visitor.error {
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn normalize_relative_fixture_path(path: &Path) -> PathBuf {
|
||||
path.components().collect()
|
||||
}
|
||||
|
||||
fn visit_typescript_fixture_dependencies(
|
||||
files: &mut BTreeMap<PathBuf, String>,
|
||||
seen: &mut HashSet<TypeId>,
|
||||
visit: impl FnOnce(&mut TypeScriptFixtureCollector<'_>),
|
||||
) -> Result<()> {
|
||||
let mut visitor = TypeScriptFixtureCollector {
|
||||
files,
|
||||
seen,
|
||||
error: None,
|
||||
};
|
||||
visit(&mut visitor);
|
||||
if let Some(error) = visitor.error {
|
||||
return Err(error);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct TypeScriptFixtureCollector<'a> {
|
||||
files: &'a mut BTreeMap<PathBuf, String>,
|
||||
seen: &'a mut HashSet<TypeId>,
|
||||
error: Option<anyhow::Error>,
|
||||
}
|
||||
|
||||
impl TypeVisitor for TypeScriptFixtureCollector<'_> {
|
||||
fn visit<T: TS + 'static + ?Sized>(&mut self) {
|
||||
if self.error.is_some() {
|
||||
return;
|
||||
}
|
||||
self.error = collect_typescript_fixture_file::<T>(self.files, self.seen).err();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,19 +1,60 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_app_server_protocol::read_schema_fixture_tree;
|
||||
use codex_app_server_protocol::write_schema_fixtures;
|
||||
use codex_app_server_protocol::generate_json_with_experimental;
|
||||
use codex_app_server_protocol::generate_typescript_schema_fixture_subtree_for_tests;
|
||||
use codex_app_server_protocol::read_schema_fixture_subtree;
|
||||
use similar::TextDiff;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[test]
|
||||
fn schema_fixtures_match_generated() -> Result<()> {
|
||||
fn typescript_schema_fixtures_match_generated() -> Result<()> {
|
||||
let schema_root = schema_root()?;
|
||||
let fixture_tree = read_tree(&schema_root)?;
|
||||
let fixture_tree = read_tree(&schema_root, "typescript")?;
|
||||
let generated_tree = generate_typescript_schema_fixture_subtree_for_tests()
|
||||
.context("generate in-memory typescript schema fixtures")?;
|
||||
|
||||
assert_schema_trees_match("typescript", &fixture_tree, &generated_tree)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_schema_fixtures_match_generated() -> Result<()> {
|
||||
assert_schema_fixtures_match_generated("json", |output_dir| {
|
||||
generate_json_with_experimental(output_dir, false)
|
||||
})
|
||||
}
|
||||
|
||||
fn assert_schema_fixtures_match_generated(
|
||||
label: &'static str,
|
||||
generate: impl FnOnce(&Path) -> Result<()>,
|
||||
) -> Result<()> {
|
||||
let schema_root = schema_root()?;
|
||||
let fixture_tree = read_tree(&schema_root, label)?;
|
||||
|
||||
let temp_dir = tempfile::tempdir().context("create temp dir")?;
|
||||
write_schema_fixtures(temp_dir.path(), None).context("generate schema fixtures")?;
|
||||
let generated_tree = read_tree(temp_dir.path())?;
|
||||
let generated_root = temp_dir.path().join(label);
|
||||
generate(&generated_root).with_context(|| {
|
||||
format!(
|
||||
"generate {label} schema fixtures into {}",
|
||||
generated_root.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let generated_tree = read_tree(temp_dir.path(), label)?;
|
||||
|
||||
assert_schema_trees_match(label, &fixture_tree, &generated_tree)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn assert_schema_trees_match(
|
||||
label: &str,
|
||||
fixture_tree: &BTreeMap<PathBuf, Vec<u8>>,
|
||||
generated_tree: &BTreeMap<PathBuf, Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
let fixture_paths = fixture_tree
|
||||
.keys()
|
||||
.map(|p| p.display().to_string())
|
||||
@@ -32,13 +73,13 @@ fn schema_fixtures_match_generated() -> Result<()> {
|
||||
.to_string();
|
||||
|
||||
panic!(
|
||||
"Vendored app-server schema fixture file set doesn't match freshly generated output. \
|
||||
"Vendored {label} app-server schema fixture file set doesn't match freshly generated output. \
|
||||
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
|
||||
);
|
||||
}
|
||||
|
||||
// If the file sets match, diff contents for each file for a nicer error.
|
||||
for (path, expected) in &fixture_tree {
|
||||
for (path, expected) in fixture_tree {
|
||||
let actual = generated_tree
|
||||
.get(path)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing generated file: {}", path.display()))?;
|
||||
@@ -54,7 +95,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
|
||||
.header("fixture", "generated")
|
||||
.to_string();
|
||||
panic!(
|
||||
"Vendored app-server schema fixture {} differs from generated output. \
|
||||
"Vendored {label} app-server schema fixture {} differs from generated output. \
|
||||
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
|
||||
path.display()
|
||||
);
|
||||
@@ -63,7 +104,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema_root() -> Result<std::path::PathBuf> {
|
||||
fn schema_root() -> Result<PathBuf> {
|
||||
// In Bazel runfiles (especially manifest-only mode), resolving directories is not
|
||||
// reliable. Resolve a known file, then walk up to the schema root.
|
||||
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
|
||||
@@ -92,6 +133,11 @@ fn schema_root() -> Result<std::path::PathBuf> {
|
||||
Ok(schema_root)
|
||||
}
|
||||
|
||||
fn read_tree(root: &Path) -> Result<std::collections::BTreeMap<std::path::PathBuf, Vec<u8>>> {
|
||||
read_schema_fixture_tree(root).context("read schema fixture tree")
|
||||
fn read_tree(root: &Path, label: &str) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
read_schema_fixture_subtree(root, label).with_context(|| {
|
||||
format!(
|
||||
"read {label} schema fixture subtree from {}",
|
||||
root.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ license.workspace = true
|
||||
name = "codex-app-server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "codex-app-server-test-notify-capture"
|
||||
path = "src/bin/notify_capture.rs"
|
||||
|
||||
[lib]
|
||||
name = "codex_app_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let mut args = env::args_os();
|
||||
let _program = args.next();
|
||||
let output_path = PathBuf::from(
|
||||
args.next()
|
||||
.ok_or_else(|| anyhow!("expected output path as first argument"))?,
|
||||
);
|
||||
let payload = args
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("expected payload as final argument"))?;
|
||||
|
||||
if args.next().is_some() {
|
||||
bail!("expected payload as final argument");
|
||||
}
|
||||
|
||||
let payload = payload.to_string_lossy();
|
||||
let temp_path = PathBuf::from(format!("{}.tmp", output_path.display()));
|
||||
let mut file = File::create(&temp_path)
|
||||
.with_context(|| format!("failed to create {}", temp_path.display()))?;
|
||||
file.write_all(payload.as_bytes())
|
||||
.with_context(|| format!("failed to write {}", temp_path.display()))?;
|
||||
file.sync_all()
|
||||
.with_context(|| format!("failed to sync {}", temp_path.display()))?;
|
||||
fs::rename(&temp_path, &output_path).with_context(|| {
|
||||
format!(
|
||||
"failed to move {} into {}",
|
||||
temp_path.display(),
|
||||
output_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -33,6 +33,9 @@ sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:0/v1"
|
||||
@@ -53,6 +56,9 @@ fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#,
|
||||
)
|
||||
}
|
||||
@@ -65,6 +71,9 @@ model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
forced_login_method = "{forced_method}"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#
|
||||
);
|
||||
std::fs::write(config_toml, contents)
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -23,7 +24,23 @@ enum FileExpectation {
|
||||
NonEmpty,
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#,
|
||||
)
|
||||
}
|
||||
|
||||
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
|
||||
create_config_toml(codex_home.path())?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
Ok(mcp)
|
||||
@@ -164,6 +181,7 @@ async fn assert_no_session_updates_for(
|
||||
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
|
||||
// Prepare a temporary Codex home and a separate root with test files.
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
// Create files designed to have deterministic ordering for query "abe".
|
||||
@@ -235,6 +253,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.txt"), "contents")?;
|
||||
@@ -434,7 +453,7 @@ async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()>
|
||||
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
for i in 0..2_000 {
|
||||
for i in 0..512 {
|
||||
let file_path = root.path().join(format!("file-{i:04}.txt"));
|
||||
std::fs::write(file_path, "contents")?;
|
||||
}
|
||||
|
||||
@@ -83,6 +83,9 @@ sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{base_url}"
|
||||
|
||||
@@ -501,16 +501,6 @@ async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let maybe_update = timeout(
|
||||
Duration::from_millis(150),
|
||||
read_app_list_updated_notification(&mut mcp),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
maybe_update.is_err(),
|
||||
"unexpected directory-only app/list update before accessible apps loaded"
|
||||
);
|
||||
|
||||
let expected = vec![
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
@@ -544,8 +534,17 @@ async fn list_apps_waits_for_accessible_data_before_emitting_directory_updates()
|
||||
},
|
||||
];
|
||||
|
||||
let update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(update.data, expected);
|
||||
loop {
|
||||
let update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
if update.data == expected {
|
||||
break;
|
||||
}
|
||||
|
||||
assert!(
|
||||
!update.data.is_empty() && update.data.iter().all(|connector| connector.is_accessible),
|
||||
"unexpected directory-only app/list update before accessible apps loaded"
|
||||
);
|
||||
}
|
||||
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
|
||||
@@ -29,7 +29,6 @@ use super::connection_handling_websocket::assert_no_message;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_jsonrpc_message;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -712,8 +711,7 @@ async fn command_exec_process_ids_are_connection_scoped_and_disconnect_terminate
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Duration;
|
||||
@@ -39,8 +40,7 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
let mut ws2 = connect_websocket(bind_addr).await?;
|
||||
@@ -79,15 +79,12 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn spawn_websocket_server(
|
||||
codex_home: &Path,
|
||||
bind_addr: SocketAddr,
|
||||
) -> Result<Child> {
|
||||
pub(super) async fn spawn_websocket_server(codex_home: &Path) -> Result<(Child, SocketAddr)> {
|
||||
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
|
||||
.context("should find app-server binary")?;
|
||||
let mut cmd = Command::new(program);
|
||||
cmd.arg("--listen")
|
||||
.arg(format!("ws://{bind_addr}"))
|
||||
.arg("ws://127.0.0.1:0")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
@@ -98,23 +95,57 @@ pub(super) async fn spawn_websocket_server(
|
||||
.spawn()
|
||||
.context("failed to spawn websocket app-server process")?;
|
||||
|
||||
if let Some(stderr) = process.stderr.take() {
|
||||
let mut stderr_reader = tokio::io::BufReader::new(stderr).lines();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
let stderr = process
|
||||
.stderr
|
||||
.take()
|
||||
.context("failed to capture websocket app-server stderr")?;
|
||||
let mut stderr_reader = BufReader::new(stderr).lines();
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
let bind_addr = loop {
|
||||
let line = timeout(
|
||||
deadline.saturating_duration_since(Instant::now()),
|
||||
stderr_reader.next_line(),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for websocket app-server to report bound websocket address")?
|
||||
.context("failed to read websocket app-server stderr")?
|
||||
.context("websocket app-server exited before reporting bound websocket address")?;
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
|
||||
let stripped_line = {
|
||||
let mut stripped = String::with_capacity(line.len());
|
||||
let mut chars = line.chars().peekable();
|
||||
while let Some(ch) = chars.next() {
|
||||
if ch == '\u{1b}' && matches!(chars.peek(), Some(&'[')) {
|
||||
chars.next();
|
||||
for next in chars.by_ref() {
|
||||
if ('@'..='~').contains(&next) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
stripped.push(ch);
|
||||
}
|
||||
});
|
||||
}
|
||||
stripped
|
||||
};
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
if let Some(bind_addr) = stripped_line
|
||||
.split_whitespace()
|
||||
.find_map(|token| token.strip_prefix("ws://"))
|
||||
.and_then(|addr| addr.parse::<SocketAddr>().ok())
|
||||
{
|
||||
break bind_addr;
|
||||
}
|
||||
};
|
||||
|
||||
pub(super) fn reserve_local_addr() -> Result<SocketAddr> {
|
||||
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
drop(listener);
|
||||
Ok(addr)
|
||||
tokio::spawn(async move {
|
||||
while let Ok(Some(line)) = stderr_reader.next_line().await {
|
||||
eprintln!("[websocket app-server stderr] {line}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok((process, bind_addr))
|
||||
}
|
||||
|
||||
pub(super) async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient> {
|
||||
|
||||
@@ -3,7 +3,6 @@ use super::connection_handling_websocket::WsClient;
|
||||
use super::connection_handling_websocket::connect_websocket;
|
||||
use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -154,8 +153,7 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
let mut ws = connect_websocket(bind_addr).await?;
|
||||
|
||||
send_initialize_request(&mut ws, 1, "ws_graceful_shutdown").await?;
|
||||
|
||||
@@ -193,13 +193,13 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
let notify_file = codex_home.path().join("notify.json");
|
||||
let notify_capture = cargo_bin("test_notify_capture")?;
|
||||
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
|
||||
let notify_capture = notify_capture
|
||||
.to_str()
|
||||
.expect("notify capture path should be valid UTF-8");
|
||||
let notify_file = notify_file
|
||||
let notify_file_str = notify_file
|
||||
.to_str()
|
||||
.expect("notify output path should be valid UTF-8");
|
||||
.expect("notify file path should be valid UTF-8");
|
||||
create_config_toml_with_extra(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
@@ -207,7 +207,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
&format!(
|
||||
"notify = [{}, {}]",
|
||||
toml_basic_string(notify_capture),
|
||||
toml_basic_string(notify_file)
|
||||
toml_basic_string(notify_file_str)
|
||||
),
|
||||
)?;
|
||||
|
||||
@@ -255,7 +255,7 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
)
|
||||
.await??;
|
||||
|
||||
let notify_file = Path::new(notify_file);
|
||||
let notify_file = Path::new(¬ify_file);
|
||||
fs_wait::wait_for_path_exists(notify_file, Duration::from_secs(5)).await?;
|
||||
let payload_raw = tokio::fs::read_to_string(notify_file).await?;
|
||||
let payload: Value = serde_json::from_str(&payload_raw)?;
|
||||
@@ -292,6 +292,9 @@ model_provider = "mock_provider"
|
||||
|
||||
{extra}
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
@@ -28,11 +29,13 @@ use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::MockServer;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -45,7 +48,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
|
||||
responses::ev_assistant_message("msg-1", &full_message),
|
||||
responses::ev_completed("resp-1"),
|
||||
])];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -56,6 +59,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
|
||||
let turn = start_plan_mode_turn(&mut mcp).await?;
|
||||
let (_, completed_items, plan_deltas, turn_completed) =
|
||||
collect_turn_notifications(&mut mcp).await?;
|
||||
wait_for_responses_request_count(&server, 1).await?;
|
||||
|
||||
assert_eq!(turn_completed.turn.id, turn.id);
|
||||
assert_eq!(turn_completed.turn.status, TurnStatus::Completed);
|
||||
@@ -93,7 +97,7 @@ async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -102,7 +106,7 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
])];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
@@ -112,6 +116,7 @@ async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()>
|
||||
|
||||
let _turn = start_plan_mode_turn(&mut mcp).await?;
|
||||
let (_, completed_items, plan_deltas, _) = collect_turn_notifications(&mut mcp).await?;
|
||||
wait_for_responses_request_count(&server, 1).await?;
|
||||
|
||||
let has_plan_item = completed_items
|
||||
.iter()
|
||||
@@ -214,6 +219,36 @@ async fn collect_turn_notifications(
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_responses_request_count(
|
||||
server: &MockServer,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
bail!("wiremock did not record requests");
|
||||
};
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
if responses_request_count == expected_count {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
if responses_request_count > expected_count {
|
||||
bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let features = BTreeMap::from([(Feature::CollaborationModes, true)]);
|
||||
let feature_entries = features
|
||||
|
||||
@@ -6,7 +6,6 @@ use super::connection_handling_websocket::create_config_toml;
|
||||
use super::connection_handling_websocket::read_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_and_notification_for_method;
|
||||
use super::connection_handling_websocket::read_response_for_id;
|
||||
use super::connection_handling_websocket::reserve_local_addr;
|
||||
use super::connection_handling_websocket::send_initialize_request;
|
||||
use super::connection_handling_websocket::send_request;
|
||||
use super::connection_handling_websocket::spawn_websocket_server;
|
||||
@@ -34,8 +33,7 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
@@ -96,8 +94,7 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
|
||||
|
||||
let bind_addr = reserve_local_addr()?;
|
||||
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
|
||||
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
|
||||
|
||||
let result = async {
|
||||
let mut ws1 = connect_websocket(bind_addr).await?;
|
||||
|
||||
@@ -4,7 +4,6 @@ use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::rollout_path;
|
||||
@@ -60,6 +59,36 @@ use uuid::Uuid;
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
|
||||
|
||||
async fn wait_for_responses_request_count(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
anyhow::bail!("wiremock did not record requests");
|
||||
};
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
if responses_request_count == expected_count {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -982,6 +1011,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1004,7 +1034,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
create_apply_patch_sse_response(patch, "patch-call")?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
let mut primary = McpProcess::new(&codex_home).await?;
|
||||
@@ -1147,6 +1177,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -35,6 +35,51 @@ use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
async fn wait_for_responses_request_count_to_stabilize(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
settle_duration: std::time::Duration,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
let mut stable_since: Option<tokio::time::Instant> = None;
|
||||
loop {
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.context("failed to fetch received requests")?;
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
|
||||
if responses_request_count == expected_count {
|
||||
match stable_since {
|
||||
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
None => stable_since = Some(tokio::time::Instant::now()),
|
||||
Some(_) => {}
|
||||
}
|
||||
} else {
|
||||
stable_since = None;
|
||||
}
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -172,6 +217,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
|
||||
};
|
||||
assert_eq!(payload.thread_id, thread_id);
|
||||
|
||||
wait_for_responses_request_count_to_stabilize(
|
||||
&server,
|
||||
1,
|
||||
std::time::Duration::from_millis(200),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
std::fs::create_dir(&codex_home)?;
|
||||
let workspace = tmp.path().join("workspace");
|
||||
std::fs::create_dir(&workspace)?;
|
||||
let release_marker = workspace.join("interrupt-release");
|
||||
|
||||
let Some(zsh_path) = find_test_zsh_path()? else {
|
||||
eprintln!("skipping zsh fork test: no zsh executable found");
|
||||
@@ -61,8 +62,15 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
};
|
||||
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
|
||||
|
||||
// Keep the shell command in flight until we interrupt it. A fast command
|
||||
// like `echo hi` can finish before the interrupt arrives on faster runners,
|
||||
// which turns this into a test for post-command follow-up behavior instead
|
||||
// of interrupting an active zsh-fork command.
|
||||
let release_marker_escaped = release_marker.to_string_lossy().replace('\'', r#"'\''"#);
|
||||
let wait_for_interrupt =
|
||||
format!("while [ ! -f '{release_marker_escaped}' ]; do sleep 0.01; done");
|
||||
let response = create_shell_command_sse_response(
|
||||
vec!["echo".to_string(), "hi".to_string()],
|
||||
vec!["/bin/sh".to_string(), "-c".to_string(), wait_for_interrupt],
|
||||
None,
|
||||
Some(5000),
|
||||
"call-zsh-fork",
|
||||
@@ -155,7 +163,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
assert_eq!(id, "call-zsh-fork");
|
||||
assert_eq!(status, CommandExecutionStatus::InProgress);
|
||||
assert!(command.starts_with(&zsh_path.display().to_string()));
|
||||
assert!(command.contains(" -lc 'echo hi'"));
|
||||
assert!(command.contains("/bin/sh -c"));
|
||||
assert!(command.contains("sleep 0.01"));
|
||||
assert!(command.contains(&release_marker.display().to_string()));
|
||||
assert_eq!(cwd, workspace);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
|
||||
@@ -663,22 +673,50 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
};
|
||||
assert_eq!(id, "call-zsh-fork-subcommand-decline");
|
||||
assert_eq!(status, CommandExecutionStatus::Declined);
|
||||
assert!(
|
||||
aggregated_output.is_none()
|
||||
|| aggregated_output == Some("exec command rejected by user".to_string())
|
||||
);
|
||||
if let Some(output) = aggregated_output.as_deref() {
|
||||
assert!(
|
||||
output == "exec command rejected by user"
|
||||
|| output.contains("sandbox denied exec error"),
|
||||
"unexpected aggregated output: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(
|
||||
thread.id.clone(),
|
||||
turn.id.clone(),
|
||||
match timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(Ok(completed_notif)) => {
|
||||
let completed: TurnCompletedNotification = serde_json::from_value(
|
||||
completed_notif
|
||||
.params
|
||||
.expect("turn/completed params must be present"),
|
||||
)?;
|
||||
assert_eq!(completed.thread_id, thread.id);
|
||||
assert_eq!(completed.turn.id, turn.id);
|
||||
assert!(matches!(
|
||||
completed.turn.status,
|
||||
TurnStatus::Interrupted | TurnStatus::Completed
|
||||
));
|
||||
}
|
||||
Ok(Err(error)) => return Err(error),
|
||||
Err(_) => {
|
||||
mcp.interrupt_turn_and_wait_for_aborted(
|
||||
thread.id.clone(),
|
||||
turn.id.clone(),
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Err(error)) => return Err(error),
|
||||
Err(_) => {
|
||||
// Some zsh builds abort the turn immediately after the rejected
|
||||
// subcommand without emitting a parent `item/completed`.
|
||||
// subcommand without emitting a parent `item/completed`, and Linux
|
||||
// sandbox failures can also complete the turn before the parent
|
||||
// completion item is observed.
|
||||
let completed_notif = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
@@ -691,7 +729,10 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
)?;
|
||||
assert_eq!(completed.thread_id, thread.id);
|
||||
assert_eq!(completed.turn.id, turn.id);
|
||||
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
|
||||
assert!(matches!(
|
||||
completed.turn.status,
|
||||
TurnStatus::Interrupted | TurnStatus::Completed
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -53,9 +53,6 @@ enum WsCommand {
|
||||
message: Message,
|
||||
tx_result: oneshot::Sender<Result<(), WsError>>,
|
||||
},
|
||||
Close {
|
||||
tx_result: oneshot::Sender<Result<(), WsError>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl WsStream {
|
||||
@@ -80,11 +77,6 @@ impl WsStream {
|
||||
break;
|
||||
}
|
||||
}
|
||||
WsCommand::Close { tx_result } => {
|
||||
let result = inner.close(None).await;
|
||||
let _ = tx_result.send(result);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
message = inner.next() => {
|
||||
@@ -144,11 +136,6 @@ impl WsStream {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), WsError> {
|
||||
self.request(|tx_result| WsCommand::Close { tx_result })
|
||||
.await
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Option<Result<Message, WsError>> {
|
||||
self.rx_message.recv().await
|
||||
}
|
||||
@@ -242,26 +229,32 @@ impl ResponsesWebsocketConnection {
|
||||
.await;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
if let Err(err) = run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let _ = ws_stream.close().await;
|
||||
*guard = None;
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::find_thread_path_by_id_str;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::list::read_session_meta_line;
|
||||
use crate::session_prefix::format_subagent_context_line;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
@@ -227,6 +228,10 @@ impl AgentControl {
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
|
||||
.await?
|
||||
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?;
|
||||
let session_source = match session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
@@ -235,7 +240,7 @@ impl AgentControl {
|
||||
}) => {
|
||||
// Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the
|
||||
// stored nickname/role from sqlite when available; otherwise leave both unset.
|
||||
let (resumed_agent_nickname, resumed_agent_role) =
|
||||
let (mut resumed_agent_nickname, mut resumed_agent_role) =
|
||||
if let Some(state_db_ctx) = state_db::get_state_db(&config).await {
|
||||
match state_db_ctx.get_thread(thread_id).await {
|
||||
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
|
||||
@@ -244,6 +249,20 @@ impl AgentControl {
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
if resumed_agent_nickname.is_none() || resumed_agent_role.is_none() {
|
||||
// Rollouts are the canonical source for thread-spawn metadata; use them to
|
||||
// heal partial sqlite rows during shutdown/resume races.
|
||||
if let Ok(meta_line) = read_session_meta_line(rollout_path.as_path()).await
|
||||
&& let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
agent_nickname,
|
||||
agent_role,
|
||||
..
|
||||
}) = meta_line.meta.source
|
||||
{
|
||||
resumed_agent_nickname = resumed_agent_nickname.or(agent_nickname);
|
||||
resumed_agent_role = resumed_agent_role.or(agent_role);
|
||||
}
|
||||
}
|
||||
let reserved_agent_nickname = resumed_agent_nickname
|
||||
.as_deref()
|
||||
.map(|agent_nickname| {
|
||||
@@ -270,10 +289,6 @@ impl AgentControl {
|
||||
let inherited_shell_snapshot = self
|
||||
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
|
||||
.await;
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
|
||||
.await?
|
||||
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?;
|
||||
|
||||
let resumed_thread = state
|
||||
.resume_thread_from_rollout_with_source(
|
||||
@@ -1530,12 +1545,38 @@ mod tests {
|
||||
})
|
||||
.await
|
||||
.expect("child thread metadata should be persisted to sqlite before shutdown");
|
||||
// Simulate the partial sqlite row we occasionally observe in CI and assert resume heals
|
||||
// missing sub-agent metadata from the rollout itself.
|
||||
let mut persisted_metadata = state_db
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("sqlite metadata lookup should succeed")
|
||||
.expect("child metadata should exist in sqlite");
|
||||
persisted_metadata.agent_nickname = None;
|
||||
state_db
|
||||
.upsert_thread(&persisted_metadata)
|
||||
.await
|
||||
.expect("test setup should allow partial sqlite metadata");
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let status = status_rx.borrow().clone();
|
||||
if is_final(&status) {
|
||||
break;
|
||||
}
|
||||
status_rx
|
||||
.changed()
|
||||
.await
|
||||
.expect("child status should reach a final state before resume");
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("child should finish shutting down before resume");
|
||||
|
||||
let resumed_thread_id = harness
|
||||
.control
|
||||
|
||||
@@ -16,6 +16,8 @@ use codex_execpolicy::RuleMatch;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::NetworkPermissions;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::permissions::FileSystemSandboxPolicy;
|
||||
use codex_protocol::permissions::NetworkSandboxPolicy;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::codex_linux_sandbox_exe_or_skip;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
@@ -70,15 +72,17 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
|
||||
.features
|
||||
.enable(Feature::RequestPermissions)
|
||||
.expect("test setup should allow enabling request permissions");
|
||||
turn_context_raw
|
||||
.sandbox_policy
|
||||
.set(SandboxPolicy::DangerFullAccess)
|
||||
.expect("test setup should allow updating sandbox policy");
|
||||
// This test is about request-permissions validation, not managed sandbox
|
||||
// policy enforcement. Widen the derived sandbox policies directly so the
|
||||
// command runs without depending on a platform sandbox binary.
|
||||
turn_context_raw.file_system_sandbox_policy =
|
||||
codex_protocol::permissions::FileSystemSandboxPolicy::from(
|
||||
&SandboxPolicy::DangerFullAccess,
|
||||
);
|
||||
FileSystemSandboxPolicy::from(turn_context_raw.sandbox_policy.get());
|
||||
turn_context_raw.network_sandbox_policy =
|
||||
codex_protocol::permissions::NetworkSandboxPolicy::from(&SandboxPolicy::DangerFullAccess);
|
||||
NetworkSandboxPolicy::from(turn_context_raw.sandbox_policy.get());
|
||||
let mut config = (*turn_context_raw.config).clone();
|
||||
config.model_provider.base_url = Some(format!("{}/v1", server.uri()));
|
||||
let config = Arc::new(config);
|
||||
@@ -92,11 +96,14 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
|
||||
turn_context_raw.provider = config.model_provider.clone();
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context_raw);
|
||||
let expiration_ms: u64 = if cfg!(windows) { 2_500 } else { 1_000 };
|
||||
|
||||
let params = ExecParams {
|
||||
command: if cfg!(windows) {
|
||||
vec![
|
||||
"cmd.exe".to_string(),
|
||||
"/Q".to_string(),
|
||||
"/D".to_string(),
|
||||
"/C".to_string(),
|
||||
"echo hi".to_string(),
|
||||
]
|
||||
@@ -108,7 +115,7 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
|
||||
]
|
||||
},
|
||||
cwd: turn_context.cwd.clone(),
|
||||
expiration: 1000.into(),
|
||||
expiration: expiration_ms.into(),
|
||||
env: HashMap::new(),
|
||||
network: None,
|
||||
sandbox_permissions: SandboxPermissions::WithAdditionalPermissions,
|
||||
|
||||
@@ -548,6 +548,8 @@ mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
#[cfg(unix)]
|
||||
use serial_test::serial;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
#[cfg(unix)]
|
||||
use std::process::Command;
|
||||
@@ -735,12 +737,17 @@ mod tests {
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
#[serial(stdin_fd)]
|
||||
async fn snapshot_shell_does_not_inherit_stdin() -> Result<()> {
|
||||
let _stdin_guard = BlockingStdinPipe::install()?;
|
||||
|
||||
let dir = tempdir()?;
|
||||
let home = dir.path();
|
||||
fs::write(home.join(".bashrc"), "read -r ignored\n").await?;
|
||||
let read_status_path = home.join("stdin-read-status");
|
||||
let read_status_display = read_status_path.display();
|
||||
let bashrc =
|
||||
format!("read -t 1 -r ignored\nprintf '%s' \"$?\" > \"{read_status_display}\"\n");
|
||||
fs::write(home.join(".bashrc"), bashrc).await?;
|
||||
|
||||
let shell = Shell {
|
||||
shell_type: ShellType::Bash,
|
||||
@@ -753,10 +760,17 @@ mod tests {
|
||||
"HOME=\"{home_display}\"; export HOME; {}",
|
||||
bash_snapshot_script()
|
||||
);
|
||||
let output =
|
||||
run_script_with_timeout(&shell, &script, Duration::from_millis(500), true, home)
|
||||
.await
|
||||
.context("run snapshot command")?;
|
||||
let output = run_script_with_timeout(&shell, &script, Duration::from_secs(2), true, home)
|
||||
.await
|
||||
.context("run snapshot command")?;
|
||||
let read_status = fs::read_to_string(&read_status_path)
|
||||
.await
|
||||
.context("read stdin probe status")?;
|
||||
|
||||
assert_eq!(
|
||||
read_status, "1",
|
||||
"expected shell startup read to see EOF on stdin; status={read_status:?}"
|
||||
);
|
||||
|
||||
assert!(
|
||||
output.contains("# Snapshot file"),
|
||||
|
||||
@@ -201,8 +201,13 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
for task in self.take_all_running_tasks().await {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
if let Some(mut active_turn) = self.take_active_turn().await {
|
||||
for task in active_turn.drain_tasks() {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
}
|
||||
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
|
||||
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
|
||||
active_turn.clear_pending().await;
|
||||
}
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
self.close_unified_exec_processes().await;
|
||||
@@ -342,16 +347,9 @@ impl Session {
|
||||
*active = Some(turn);
|
||||
}
|
||||
|
||||
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
|
||||
async fn take_active_turn(&self) -> Option<ActiveTurn> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
at.clear_pending().await;
|
||||
|
||||
at.drain_tasks()
|
||||
}
|
||||
None => Vec::new(),
|
||||
}
|
||||
active.take()
|
||||
}
|
||||
|
||||
pub(crate) async fn close_unified_exec_processes(&self) {
|
||||
|
||||
@@ -95,7 +95,7 @@ fn resolve_workdir_base_path(
|
||||
|
||||
/// Validates feature/policy constraints for `with_additional_permissions` and
|
||||
/// normalizes any path-based permissions. Errors if the request is invalid.
|
||||
pub(super) fn normalize_and_validate_additional_permissions(
|
||||
pub(crate) fn normalize_and_validate_additional_permissions(
|
||||
request_permission_enabled: bool,
|
||||
approval_policy: AskForApproval,
|
||||
sandbox_permissions: SandboxPermissions,
|
||||
|
||||
@@ -29,6 +29,7 @@ use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
@@ -37,7 +38,6 @@ use tokio::sync::oneshot;
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
@@ -54,6 +54,34 @@ fn websocket_request_instructions(
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
async fn wait_for_matching_websocket_request<F>(
|
||||
server: &core_test_support::responses::WebSocketTestServer,
|
||||
description: &str,
|
||||
predicate: F,
|
||||
) -> core_test_support::responses::WebSocketRequest
|
||||
where
|
||||
F: Fn(&core_test_support::responses::WebSocketRequest) -> bool,
|
||||
{
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if let Some(request) = server
|
||||
.connections()
|
||||
.iter()
|
||||
.flat_map(|connection| connection.iter())
|
||||
.find(|request| predicate(request))
|
||||
.cloned()
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
assert!(
|
||||
tokio::time::Instant::now() < deadline,
|
||||
"timed out waiting for {description}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -230,6 +258,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial(openai_api_key_env)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -632,21 +661,23 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() ->
|
||||
async fn conversation_uses_experimental_realtime_ws_startup_context_override() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_custom_context", "instructions": "prompt from config" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context =
|
||||
Some("custom startup context".to_string());
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context =
|
||||
Some("custom startup context".to_string());
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -656,7 +687,11 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
.await?;
|
||||
fs::create_dir_all(test.workspace_path("docs"))?;
|
||||
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
assert!(
|
||||
startup_server
|
||||
.wait_for_handshakes(1, Duration::from_secs(2))
|
||||
.await
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
@@ -665,17 +700,12 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_custom_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("custom startup context request should contain instructions");
|
||||
|
||||
@@ -683,7 +713,8 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!instructions.contains("## Machine / Workspace Map"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -691,20 +722,22 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
async fn conversation_disables_realtime_startup_context_with_empty_override() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_no_context", "instructions": "prompt from config" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context = Some(String::new());
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.experimental_realtime_ws_backend_prompt = Some("prompt from config".to_string());
|
||||
config.experimental_realtime_ws_startup_context = Some(String::new());
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -714,7 +747,11 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
.await?;
|
||||
fs::create_dir_all(test.workspace_path("docs"))?;
|
||||
fs::write(test.workspace_path("README.md"), "workspace marker")?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
assert!(
|
||||
startup_server
|
||||
.wait_for_handshakes(1, Duration::from_secs(2))
|
||||
.await
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
@@ -723,17 +760,12 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_no_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context disable request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context disable request should contain instructions");
|
||||
|
||||
@@ -741,7 +773,8 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
assert!(!instructions.contains(STARTUP_CONTEXT_HEADER));
|
||||
assert!(!instructions.contains("## Machine / Workspace Map"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -749,17 +782,20 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
async fn conversation_start_injects_startup_context_from_thread_history() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_context", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(
|
||||
&test,
|
||||
"Recent work: cleaned up startup flows and reviewed websocket routing.",
|
||||
@@ -777,17 +813,12 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -802,7 +833,8 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
assert!(startup_context.contains("README.md"));
|
||||
assert!(!startup_context.contains(MEMORY_PROMPT_PHRASE));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -810,17 +842,20 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
||||
})]],
|
||||
])
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_workspace", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
fs::create_dir_all(test.workspace_path("codex-rs/core"))?;
|
||||
fs::write(test.workspace_path("notes.txt"), "workspace marker")?;
|
||||
|
||||
@@ -831,17 +866,12 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_workspace" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"workspace-map startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
|
||||
@@ -850,7 +880,8 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
assert!(startup_context.contains("notes.txt"));
|
||||
assert!(startup_context.contains("codex-rs/"));
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -858,21 +889,24 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
async fn conversation_startup_context_is_truncated_and_sent_once_per_start() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
let startup_server = start_websocket_server(vec![vec![]]).await;
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
vec![
|
||||
vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_truncated", "instructions": "backend prompt" }
|
||||
})],
|
||||
vec![],
|
||||
],
|
||||
])
|
||||
]])
|
||||
.await;
|
||||
|
||||
let oversized_summary = "recent work ".repeat(3_500);
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&startup_server).await?;
|
||||
seed_recent_thread(&test, &oversized_summary, "summary", "oversized").await?;
|
||||
fs::write(test.workspace_path("marker.txt"), "marker")?;
|
||||
|
||||
@@ -883,17 +917,12 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_truncated" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"truncated startup context request with instructions",
|
||||
|request| websocket_request_instructions(request).is_some(),
|
||||
)
|
||||
.await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
.expect("startup context request should contain instructions");
|
||||
assert!(startup_context.contains(STARTUP_CONTEXT_HEADER));
|
||||
@@ -905,13 +934,19 @@ async fn conversation_startup_context_is_truncated_and_sent_once_per_start() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let explicit_text_request = server.wait_for_request(1, 1).await;
|
||||
let explicit_text_request = wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"explicit realtime text request",
|
||||
|request| websocket_request_text(request).as_deref() == Some("hello"),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
websocket_request_text(&explicit_text_request),
|
||||
Some("hello".to_string())
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
startup_server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -13,10 +13,49 @@ use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::TestCodexBuilder;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::MockServer;
|
||||
|
||||
async fn resume_until_initial_messages(
|
||||
builder: &mut TestCodexBuilder,
|
||||
server: &MockServer,
|
||||
home: Arc<TempDir>,
|
||||
rollout_path: PathBuf,
|
||||
predicate: impl Fn(&[EventMsg]) -> bool,
|
||||
) -> Result<TestCodex> {
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
|
||||
let poll_interval = Duration::from_millis(10);
|
||||
let mut last_initial_messages = "<missing initial messages>".to_string();
|
||||
|
||||
loop {
|
||||
let resumed = builder
|
||||
.resume(server, Arc::clone(&home), rollout_path.clone())
|
||||
.await?;
|
||||
if let Some(initial_messages) = resumed.session_configured.initial_messages.as_ref() {
|
||||
if predicate(initial_messages) {
|
||||
return Ok(resumed);
|
||||
}
|
||||
last_initial_messages = format!("{initial_messages:#?}");
|
||||
}
|
||||
|
||||
if tokio::time::Instant::now() >= deadline {
|
||||
panic!(
|
||||
"timed out waiting for rollout resume messages to stabilize: {last_initial_messages}"
|
||||
);
|
||||
}
|
||||
|
||||
drop(resumed);
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
|
||||
@@ -57,7 +96,26 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> {
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let resumed = builder.resume(&server, home, rollout_path).await?;
|
||||
let resumed = resume_until_initial_messages(
|
||||
&mut builder,
|
||||
&server,
|
||||
home,
|
||||
rollout_path,
|
||||
|initial_messages| {
|
||||
matches!(
|
||||
initial_messages,
|
||||
[
|
||||
EventMsg::TurnStarted(_),
|
||||
EventMsg::UserMessage(_),
|
||||
EventMsg::TokenCount(_),
|
||||
EventMsg::AgentMessage(_),
|
||||
EventMsg::TokenCount(_),
|
||||
EventMsg::TurnComplete(_),
|
||||
]
|
||||
)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let initial_messages = resumed
|
||||
.session_configured
|
||||
.initial_messages
|
||||
@@ -123,7 +181,28 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()>
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let resumed = builder.resume(&server, home, rollout_path).await?;
|
||||
let resumed = resume_until_initial_messages(
|
||||
&mut builder,
|
||||
&server,
|
||||
home,
|
||||
rollout_path,
|
||||
|initial_messages| {
|
||||
matches!(
|
||||
initial_messages,
|
||||
[
|
||||
EventMsg::TurnStarted(_),
|
||||
EventMsg::UserMessage(_),
|
||||
EventMsg::TokenCount(_),
|
||||
EventMsg::AgentReasoning(_),
|
||||
EventMsg::AgentReasoningRawContent(_),
|
||||
EventMsg::AgentMessage(_),
|
||||
EventMsg::TokenCount(_),
|
||||
EventMsg::TurnComplete(_),
|
||||
]
|
||||
)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let initial_messages = resumed
|
||||
.session_configured
|
||||
.initial_messages
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::net::TcpListener;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
@@ -36,11 +37,13 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::stdio_server_bin;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use reqwest::Client;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use tempfile::tempdir;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Instant;
|
||||
@@ -263,7 +266,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
fixture.codex.submit(Op::ListMcpTools).await?;
|
||||
let list_event = core_test_support::wait_for_event_with_timeout(
|
||||
let list_event = wait_for_event_with_timeout(
|
||||
&fixture.codex,
|
||||
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
|
||||
Duration::from_secs(10),
|
||||
@@ -851,10 +854,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This test writes to a fallback credentials file in CODEX_HOME.
|
||||
/// Ideally, we wouldn't need to serialize the test but it's much more cumbersome to wire CODEX_HOME through the code.
|
||||
#[serial(codex_home)]
|
||||
#[test]
|
||||
#[serial(mcp_test_value)]
|
||||
fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
|
||||
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
|
||||
|
||||
@@ -936,8 +937,8 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
wait_for_streamable_http_server(&mut http_server_child, &bind_addr, Duration::from_secs(5))
|
||||
.await?;
|
||||
|
||||
let temp_home = tempdir()?;
|
||||
let _guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
|
||||
let temp_home = Arc::new(tempdir()?);
|
||||
let _codex_home_guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
|
||||
write_fallback_oauth_tokens(
|
||||
temp_home.path(),
|
||||
server_name,
|
||||
@@ -948,10 +949,10 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
)?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_home(temp_home.clone())
|
||||
.with_config(move |config| {
|
||||
// This test seeds OAuth tokens in CODEX_HOME/.credentials.json and
|
||||
// validates file-backed OAuth loading. Force file mode so Linux
|
||||
// keyring backend quirks do not affect this test.
|
||||
// Keep OAuth credentials isolated to this test home because Bazel
|
||||
// runs the full core suite in one process.
|
||||
config.mcp_oauth_credentials_store_mode = serde_json::from_value(json!("file"))
|
||||
.expect("`file` should deserialize as OAuthCredentialsStoreMode");
|
||||
let mut servers = config.mcp_servers.get().clone();
|
||||
@@ -984,6 +985,31 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
fixture.codex.submit(Op::ListMcpTools).await?;
|
||||
let list_event = wait_for_event_with_timeout(
|
||||
&fixture.codex,
|
||||
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
let EventMsg::McpListToolsResponse(tool_list) = list_event else {
|
||||
unreachable!("event guard guarantees McpListToolsResponse");
|
||||
};
|
||||
if tool_list.tools.contains_key(&tool_name) {
|
||||
break;
|
||||
}
|
||||
|
||||
let available_tools: Vec<&str> = tool_list.tools.keys().map(String::as_str).collect();
|
||||
if Instant::now() >= tools_ready_deadline {
|
||||
panic!(
|
||||
"timed out waiting for MCP tool {tool_name} to become available; discovered tools: {available_tools:?}"
|
||||
);
|
||||
}
|
||||
sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
@@ -1078,7 +1104,8 @@ async fn wait_for_streamable_http_server(
|
||||
timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
let metadata_url = format!("http://{address}/.well-known/oauth-authorization-server/mcp");
|
||||
let client = Client::builder().no_proxy().build()?;
|
||||
loop {
|
||||
if let Some(status) = server_child.try_wait()? {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -1090,22 +1117,30 @@ async fn wait_for_streamable_http_server(
|
||||
|
||||
if remaining.is_zero() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: deadline reached"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: deadline reached"
|
||||
));
|
||||
}
|
||||
|
||||
match tokio::time::timeout(remaining, TcpStream::connect(address)).await {
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
match tokio::time::timeout(remaining, client.get(&metadata_url).send()).await {
|
||||
Ok(Ok(response)) if response.status() == StatusCode::OK => return Ok(()),
|
||||
Ok(Ok(response)) => {
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: HTTP {}",
|
||||
response.status()
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: {error}"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: {error}"
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: connect call timed out"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: request timed out"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,7 +266,7 @@ async fn unicode_output(login: bool) -> anyhow::Result<()> {
|
||||
mount_shell_responses_with_timeout(
|
||||
&harness,
|
||||
call_id,
|
||||
"git -c alias.say='!printf \"%s\" \"naïve_café\"' say",
|
||||
"echo 'naïve_café'",
|
||||
Some(login),
|
||||
MEDIUM_TIMEOUT,
|
||||
)
|
||||
|
||||
@@ -6,63 +6,40 @@ use codex_core::WireApi;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::load_sse_fixture;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::streaming_sse::StreamingSseChunk;
|
||||
use core_test_support::streaming_sse::start_streaming_sse_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Request;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn sse_incomplete() -> String {
|
||||
load_sse_fixture("tests/fixtures/incomplete_sse.json")
|
||||
let fixture = find_resource!("tests/fixtures/incomplete_sse.json")
|
||||
.unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}"));
|
||||
load_sse_fixture(fixture)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn retries_on_early_close() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let incomplete_sse = sse_incomplete();
|
||||
let completed_sse = responses::sse_completed("resp_ok");
|
||||
|
||||
struct SeqResponder;
|
||||
impl Respond for SeqResponder {
|
||||
fn respond(&self, _: &Request) -> ResponseTemplate {
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
static CALLS: AtomicUsize = AtomicUsize::new(0);
|
||||
let n = CALLS.fetch_add(1, Ordering::SeqCst);
|
||||
if n == 0 {
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(sse_incomplete(), "text/event-stream")
|
||||
} else {
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(
|
||||
sse(vec![
|
||||
ev_response_created("resp_ok"),
|
||||
ev_completed("resp_ok"),
|
||||
]),
|
||||
"text/event-stream",
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses"))
|
||||
.respond_with(SeqResponder {})
|
||||
.expect(2)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let (server, _) = start_streaming_sse_server(vec![
|
||||
vec![StreamingSseChunk {
|
||||
gate: None,
|
||||
body: incomplete_sse,
|
||||
}],
|
||||
vec![StreamingSseChunk {
|
||||
gate: None,
|
||||
body: completed_sse,
|
||||
}],
|
||||
])
|
||||
.await;
|
||||
|
||||
// Configure retry behavior explicitly to avoid mutating process-wide
|
||||
// environment variables.
|
||||
@@ -92,7 +69,7 @@ async fn retries_on_early_close() {
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
})
|
||||
.build(&server)
|
||||
.build_with_streaming_server(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -109,4 +86,13 @@ async fn retries_on_early_close() {
|
||||
|
||||
// Wait until TurnComplete (should succeed after retry).
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = server.requests().await;
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
2,
|
||||
"expected retry after incomplete SSE stream"
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
@@ -54,23 +54,26 @@ async fn test_shell_command_approval_triggers_elicitation() {
|
||||
async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> {
|
||||
// Use a simple, untrusted command that creates a file so we can
|
||||
// observe a side-effect.
|
||||
//
|
||||
// Cross‑platform approach: run a tiny Python snippet to touch the file
|
||||
// using `python3 -c ...` on all platforms.
|
||||
let workdir_for_shell_function_call = TempDir::new()?;
|
||||
let created_filename = "created_by_shell_tool.txt";
|
||||
let created_file = workdir_for_shell_function_call
|
||||
.path()
|
||||
.join(created_filename);
|
||||
|
||||
let shell_command = vec![
|
||||
"python3".to_string(),
|
||||
"-c".to_string(),
|
||||
format!("import pathlib; pathlib.Path('{created_filename}').touch()"),
|
||||
];
|
||||
let expected_shell_command = format_with_current_shell(&format!(
|
||||
"python3 -c \"import pathlib; pathlib.Path('{created_filename}').touch()\""
|
||||
));
|
||||
let shell_command = if cfg!(windows) {
|
||||
vec![
|
||||
"New-Item".to_string(),
|
||||
"-ItemType".to_string(),
|
||||
"File".to_string(),
|
||||
"-Path".to_string(),
|
||||
created_filename.to_string(),
|
||||
"-Force".to_string(),
|
||||
]
|
||||
} else {
|
||||
vec!["touch".to_string(), created_filename.to_string()]
|
||||
};
|
||||
let expected_shell_command =
|
||||
format_with_current_shell(&shlex::try_join(shell_command.iter().map(String::as_str))?);
|
||||
|
||||
let McpHandle {
|
||||
process: mut mcp_process,
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
@@ -44,6 +45,7 @@ use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestToolServer {
|
||||
@@ -275,15 +277,25 @@ fn parse_bind_addr() -> Result<SocketAddr, Box<dyn std::error::Error>> {
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let bind_addr = parse_bind_addr()?;
|
||||
let session_failure_state = SessionFailureState::default();
|
||||
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
eprintln!(
|
||||
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
|
||||
);
|
||||
return Ok(());
|
||||
const MAX_BIND_RETRIES: u32 = 20;
|
||||
const BIND_RETRY_DELAY: Duration = Duration::from_millis(50);
|
||||
|
||||
let mut bind_retries = 0;
|
||||
let listener = loop {
|
||||
match tokio::net::TcpListener::bind(&bind_addr).await {
|
||||
Ok(listener) => break listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
eprintln!(
|
||||
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::AddrInUse && bind_retries < MAX_BIND_RETRIES => {
|
||||
bind_retries += 1;
|
||||
sleep(BIND_RETRY_DELAY).await;
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");
|
||||
|
||||
|
||||
@@ -24,8 +24,13 @@ async fn wait_for_pid_file(path: &Path) -> Result<u32> {
|
||||
for _ in 0..50 {
|
||||
match fs::read_to_string(path) {
|
||||
Ok(content) => {
|
||||
let pid = content
|
||||
.trim()
|
||||
let trimmed = content.trim();
|
||||
if trimmed.is_empty() {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let pid = trimmed
|
||||
.parse::<u32>()
|
||||
.with_context(|| format!("failed to parse pid from {}", path.display()))?;
|
||||
return Ok(pid);
|
||||
|
||||
@@ -22,7 +22,6 @@ anyhow = { workspace = true }
|
||||
uds_windows = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
assert_cmd = { workspace = true }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
@@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> {
|
||||
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
|
||||
}
|
||||
|
||||
stream
|
||||
.shutdown(Shutdown::Write)
|
||||
.context("failed to shutdown socket writer")?;
|
||||
// The peer can close immediately after sending its response. On macOS that
|
||||
// may surface as NotConnected when we half-close our write side.
|
||||
if let Err(err) = stream.shutdown(Shutdown::Write)
|
||||
&& err.kind() != io::ErrorKind::NotConnected
|
||||
{
|
||||
return Err(err).context("failed to shutdown socket writer");
|
||||
}
|
||||
|
||||
let stdout_result = stdout_thread
|
||||
.join()
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use assert_cmd::Command;
|
||||
use anyhow::anyhow;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[cfg(unix)]
|
||||
@@ -19,6 +22,9 @@ use uds_windows::UnixListener;
|
||||
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
|
||||
let socket_path = dir.path().join("socket");
|
||||
let request = b"request";
|
||||
let request_path = dir.path().join("request.txt");
|
||||
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
|
||||
let listener = match UnixListener::bind(&socket_path) {
|
||||
Ok(listener) => listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
@@ -31,37 +37,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (event_tx, event_rx) = mpsc::channel();
|
||||
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
|
||||
let _ = event_tx.send("waiting for accept".to_string());
|
||||
let (mut connection, _) = listener
|
||||
.accept()
|
||||
.context("failed to accept test connection")?;
|
||||
let mut received = Vec::new();
|
||||
let _ = event_tx.send("accepted connection".to_string());
|
||||
let mut received = vec![0; request.len()];
|
||||
connection
|
||||
.read_to_end(&mut received)
|
||||
.read_exact(&mut received)
|
||||
.context("failed to read data from client")?;
|
||||
let _ = event_tx.send(format!("read {} bytes", received.len()));
|
||||
tx.send(received)
|
||||
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
|
||||
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
|
||||
connection
|
||||
.write_all(b"response")
|
||||
.context("failed to write response to client")?;
|
||||
let _ = event_tx.send("wrote response".to_string());
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
|
||||
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
.arg(&socket_path)
|
||||
.write_stdin("request")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout("response");
|
||||
.stdin(Stdio::from(stdin))
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.context("failed to spawn codex-stdio-to-uds")?;
|
||||
|
||||
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
|
||||
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel();
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut stdout = Vec::new();
|
||||
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
|
||||
let _ = stdout_tx.send(result);
|
||||
});
|
||||
thread::spawn(move || {
|
||||
let mut stderr = Vec::new();
|
||||
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
|
||||
let _ = stderr_tx.send(result);
|
||||
});
|
||||
|
||||
let mut server_events = Vec::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
let status = loop {
|
||||
while let Ok(event) = event_rx.try_recv() {
|
||||
server_events.push(event);
|
||||
}
|
||||
|
||||
if let Some(status) = child.try_wait().context("failed to poll child status")? {
|
||||
break status;
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr after kill")?
|
||||
.context("failed to read child stderr")?;
|
||||
anyhow::bail!(
|
||||
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(25));
|
||||
};
|
||||
|
||||
let stdout = stdout_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stdout")?
|
||||
.context("failed to read child stdout")?;
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr")?
|
||||
.context("failed to read child stderr")?;
|
||||
assert!(
|
||||
status.success(),
|
||||
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
assert_eq!(stdout, b"response");
|
||||
|
||||
let received = rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("server did not receive data in time")?;
|
||||
assert_eq!(received, b"request");
|
||||
assert_eq!(received, request);
|
||||
|
||||
let server_result = server_thread
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
|
||||
.map_err(|_| anyhow!("server thread panicked"))?;
|
||||
server_result.context("server failed")?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests.rs
|
||||
expression: popup
|
||||
---
|
||||
Experimental features
|
||||
Toggle experimental features. Changes are saved to config.toml.
|
||||
|
||||
› [ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
|
||||
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
|
||||
installed.
|
||||
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
|
||||
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
|
||||
Codex after enabling.
|
||||
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
|
||||
them to you, including sandbox escapes and blocked network access.
|
||||
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
|
||||
|
||||
Press space to select or enter to save for next conversation
|
||||
@@ -1,19 +0,0 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests.rs
|
||||
expression: popup
|
||||
---
|
||||
Experimental features
|
||||
Toggle experimental features. Changes are saved to config.toml.
|
||||
|
||||
› [ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
|
||||
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
|
||||
installed.
|
||||
[ ] Bubblewrap sandbox Try the new linux sandbox based on bubblewrap.
|
||||
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
|
||||
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
|
||||
Codex after enabling.
|
||||
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
|
||||
them to you, including sandbox escapes and blocked network access.
|
||||
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
|
||||
|
||||
Press space to select or enter to save for next conversation
|
||||
@@ -7149,10 +7149,14 @@ async fn experimental_popup_includes_guardian_approval() {
|
||||
chat.open_experimental_popup();
|
||||
|
||||
let popup = render_bottom_popup(&chat, 120);
|
||||
#[cfg(target_os = "linux")]
|
||||
assert_snapshot!("experimental_popup_includes_guardian_approval_linux", popup);
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
|
||||
assert!(
|
||||
popup.contains("Guardian approvals"),
|
||||
"expected guardian approvals entry in experimental popup, got:\n{popup}"
|
||||
);
|
||||
assert!(
|
||||
popup.contains("blocked network access"),
|
||||
"expected guardian approvals description in experimental popup, got:\n{popup}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -56,7 +56,13 @@ fn echo_sleep_command(marker: &str) -> String {
|
||||
}
|
||||
|
||||
fn split_stdout_stderr_command() -> String {
|
||||
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
|
||||
if cfg!(windows) {
|
||||
// Keep this in cmd.exe syntax so the test does not depend on a runner-local
|
||||
// PowerShell/Python setup just to produce deterministic split output.
|
||||
"(echo split-out)&(>&2 echo split-err)".to_string()
|
||||
} else {
|
||||
"printf 'split-out\\n'; printf 'split-err\\n' >&2".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
async fn collect_split_output(mut output_rx: tokio::sync::mpsc::Receiver<Vec<u8>>) -> Vec<u8> {
|
||||
@@ -130,52 +136,36 @@ async fn collect_output_until_exit(
|
||||
}
|
||||
|
||||
async fn wait_for_python_repl_ready(
|
||||
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
timeout_ms: u64,
|
||||
newline: &str,
|
||||
ready_marker: &str,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut collected = Vec::new();
|
||||
let marker = "__codex_pty_ready__";
|
||||
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
|
||||
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
writer
|
||||
.send(format!("print('{marker}'){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let probe_deadline = tokio::time::Instant::now() + probe_window;
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
if now >= deadline || now >= probe_deadline {
|
||||
break;
|
||||
}
|
||||
let remaining = std::cmp::min(
|
||||
deadline.saturating_duration_since(now),
|
||||
probe_deadline.saturating_duration_since(now),
|
||||
);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
let now = tokio::time::Instant::now();
|
||||
let remaining = deadline.saturating_duration_since(now);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(ready_marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"timed out waiting for Python REPL readiness in PTY: {:?}",
|
||||
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
@@ -254,10 +244,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let ready_marker = "__codex_pty_ready__";
|
||||
let args = vec![
|
||||
"-i".to_string(),
|
||||
"-q".to_string(),
|
||||
"-c".to_string(),
|
||||
format!("print('{ready_marker}')"),
|
||||
];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pty_process(
|
||||
&python,
|
||||
&[],
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
@@ -269,7 +266,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
|
||||
let mut output =
|
||||
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
|
||||
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
|
||||
writer
|
||||
.send(format!("print('hello from pty'){newline}").into_bytes())
|
||||
.await?;
|
||||
@@ -291,21 +288,42 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_process_round_trips_stdin() -> anyhow::Result<()> {
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!("python not found; skipping pipe_process_round_trips_stdin");
|
||||
return Ok(());
|
||||
let (program, args) = if cfg!(windows) {
|
||||
let cmd = std::env::var("COMSPEC").unwrap_or_else(|_| "cmd.exe".to_string());
|
||||
(
|
||||
cmd,
|
||||
vec![
|
||||
"/Q".to_string(),
|
||||
"/V:ON".to_string(),
|
||||
"/D".to_string(),
|
||||
"/C".to_string(),
|
||||
"set /p line= & echo(!line!".to_string(),
|
||||
],
|
||||
)
|
||||
} else {
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!("python not found; skipping pipe_process_round_trips_stdin");
|
||||
return Ok(());
|
||||
};
|
||||
(
|
||||
python,
|
||||
vec![
|
||||
"-u".to_string(),
|
||||
"-c".to_string(),
|
||||
"import sys; print(sys.stdin.readline().strip());".to_string(),
|
||||
],
|
||||
)
|
||||
};
|
||||
|
||||
let args = vec![
|
||||
"-u".to_string(),
|
||||
"-c".to_string(),
|
||||
"import sys; print(sys.stdin.readline().strip());".to_string(),
|
||||
];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pipe_process(&python, &args, Path::new("."), &env_map, &None).await?;
|
||||
let spawned = spawn_pipe_process(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let (session, output_rx, exit_rx) = combine_spawned_output(spawned);
|
||||
let writer = session.writer_sender();
|
||||
writer.send(b"roundtrip\n".to_vec()).await?;
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
writer
|
||||
.send(format!("roundtrip{newline}").into_bytes())
|
||||
.await?;
|
||||
drop(writer);
|
||||
session.close_stdin();
|
||||
|
||||
let (output, code) = collect_output_until_exit(output_rx, exit_rx, 5_000).await;
|
||||
let text = String::from_utf8_lossy(&output);
|
||||
@@ -427,21 +445,7 @@ async fn pipe_drains_stderr_without_stdout_activity() -> anyhow::Result<()> {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()> {
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let (program, args) = if cfg!(windows) {
|
||||
let Some(python) = find_python() else {
|
||||
eprintln!("python not found; skipping pipe_process_can_expose_split_stdout_and_stderr");
|
||||
return Ok(());
|
||||
};
|
||||
(
|
||||
python,
|
||||
vec![
|
||||
"-c".to_string(),
|
||||
"import sys; sys.stdout.buffer.write(b'split-out\\n'); sys.stdout.buffer.flush(); sys.stderr.buffer.write(b'split-err\\n'); sys.stderr.buffer.flush()".to_string(),
|
||||
],
|
||||
)
|
||||
} else {
|
||||
shell_command(&split_stdout_stderr_command())
|
||||
};
|
||||
let (program, args) = shell_command(&split_stdout_stderr_command());
|
||||
let spawned =
|
||||
spawn_pipe_process_no_stdin(&program, &args, Path::new("."), &env_map, &None).await?;
|
||||
let SpawnedProcess {
|
||||
@@ -464,8 +468,19 @@ async fn pipe_process_can_expose_split_stdout_and_stderr() -> anyhow::Result<()>
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("timed out waiting to drain split stderr"))??;
|
||||
|
||||
assert_eq!(stdout, b"split-out\n".to_vec());
|
||||
assert_eq!(stderr, b"split-err\n".to_vec());
|
||||
let expected_stdout = if cfg!(windows) {
|
||||
b"split-out\r\n".to_vec()
|
||||
} else {
|
||||
b"split-out\n".to_vec()
|
||||
};
|
||||
let expected_stderr = if cfg!(windows) {
|
||||
b"split-err\r\n".to_vec()
|
||||
} else {
|
||||
b"split-err\n".to_vec()
|
||||
};
|
||||
|
||||
assert_eq!(stdout, expected_stdout);
|
||||
assert_eq!(stderr, expected_stderr);
|
||||
assert_eq!(code, 0);
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user