mirror of
https://github.com/openai/codex.git
synced 2026-04-14 17:54:57 +00:00
Compare commits
43 Commits
dev/shaqay
...
dev/pr1359
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75499a381b | ||
|
|
bddc6cd53e | ||
|
|
7456d28a7a | ||
|
|
406b5b386c | ||
|
|
da2ddb4bf0 | ||
|
|
2260beb57f | ||
|
|
dba2ee22dc | ||
|
|
ea0e790813 | ||
|
|
c42121a9f1 | ||
|
|
f0cb95be20 | ||
|
|
e6f39a2d94 | ||
|
|
78fb500503 | ||
|
|
9c5049c21a | ||
|
|
faf223c4ae | ||
|
|
762d419a48 | ||
|
|
38354d89f0 | ||
|
|
e064e7035b | ||
|
|
c0b50b4ddc | ||
|
|
0de39cd7ce | ||
|
|
93af8e0d57 | ||
|
|
3b79f58849 | ||
|
|
488a602833 | ||
|
|
72cf2819be | ||
|
|
e64133e403 | ||
|
|
c00078dd91 | ||
|
|
e951d6167f | ||
|
|
2b82a61f7e | ||
|
|
a5e13e321b | ||
|
|
b5208d7979 | ||
|
|
518c9a7ccf | ||
|
|
12c68ddc19 | ||
|
|
a9406ce8e8 | ||
|
|
484668e073 | ||
|
|
ce981d77d2 | ||
|
|
427880981c | ||
|
|
38741c5c78 | ||
|
|
82cc8397d4 | ||
|
|
3f393b65fb | ||
|
|
56bf69c219 | ||
|
|
c38f0516c6 | ||
|
|
f60933bc8c | ||
|
|
38065128ed | ||
|
|
91af9d634e |
@@ -2,6 +2,12 @@
|
||||
# Do not increase, fix your test instead
|
||||
slow-timeout = { period = "15s", terminate-after = 2 }
|
||||
|
||||
[test-groups.app_server_protocol_codegen]
|
||||
max-threads = 1
|
||||
|
||||
[test-groups.app_server_integration]
|
||||
max-threads = 1
|
||||
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# Do not add new tests here
|
||||
@@ -11,3 +17,13 @@ slow-timeout = { period = "1m", terminate-after = 4 }
|
||||
[[profile.default.overrides]]
|
||||
filter = 'test(approval_matrix_covers_all_modes)'
|
||||
slow-timeout = { period = "30s", terminate-after = 2 }
|
||||
|
||||
[[profile.default.overrides]]
|
||||
filter = 'package(codex-app-server-protocol) & (test(typescript_schema_fixtures_match_generated) | test(json_schema_fixtures_match_generated) | test(generate_ts_with_experimental_api_retains_experimental_entries) | test(generated_ts_optional_nullable_fields_only_in_params) | test(generate_json_filters_experimental_fields_and_methods))'
|
||||
test-group = 'app_server_protocol_codegen'
|
||||
|
||||
[[profile.default.overrides]]
|
||||
# These tests spawn a fresh app-server subprocess per case. Serializing them keeps
|
||||
# Windows startup deterministic without widening the read timeouts.
|
||||
filter = 'package(codex-app-server)'
|
||||
test-group = 'app_server_integration'
|
||||
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2414,7 +2414,6 @@ name = "codex-stdio-to-uds"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"assert_cmd",
|
||||
"codex-utils-cargo-bin",
|
||||
"pretty_assertions",
|
||||
"tempfile",
|
||||
|
||||
@@ -23,6 +23,7 @@ use schemars::schema_for;
|
||||
use serde::Serialize;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsStr;
|
||||
@@ -32,9 +33,10 @@ use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::thread;
|
||||
use ts_rs::TS;
|
||||
|
||||
const HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
|
||||
pub(crate) const GENERATED_TS_HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
|
||||
const IGNORED_DEFINITIONS: &[&str] = &["Option<()>"];
|
||||
const JSON_V1_ALLOWLIST: &[&str] = &["InitializeParams", "InitializeResponse"];
|
||||
const SPECIAL_DEFINITIONS: &[&str] = &[
|
||||
@@ -137,9 +139,29 @@ pub fn generate_ts_with_options(
|
||||
}
|
||||
|
||||
if options.ensure_headers {
|
||||
for file in &ts_files {
|
||||
prepend_header_if_missing(file)?;
|
||||
}
|
||||
let worker_count = thread::available_parallelism()
|
||||
.map_or(1, usize::from)
|
||||
.min(ts_files.len().max(1));
|
||||
let chunk_size = ts_files.len().div_ceil(worker_count);
|
||||
thread::scope(|scope| -> Result<()> {
|
||||
let mut workers = Vec::new();
|
||||
for chunk in ts_files.chunks(chunk_size.max(1)) {
|
||||
workers.push(scope.spawn(move || -> Result<()> {
|
||||
for file in chunk {
|
||||
prepend_header_if_missing(file)?;
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
|
||||
for worker in workers {
|
||||
worker
|
||||
.join()
|
||||
.map_err(|_| anyhow!("TypeScript header worker panicked"))??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
// Optionally run Prettier on all generated TS files.
|
||||
@@ -231,6 +253,41 @@ fn filter_experimental_ts(out_dir: &Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn filter_experimental_ts_tree(tree: &mut BTreeMap<PathBuf, String>) -> Result<()> {
|
||||
let registered_fields = experimental_fields();
|
||||
let experimental_method_types = experimental_method_types();
|
||||
if let Some(content) = tree.get_mut(Path::new("ClientRequest.ts")) {
|
||||
let filtered =
|
||||
filter_client_request_ts_contents(std::mem::take(content), EXPERIMENTAL_CLIENT_METHODS);
|
||||
*content = filtered;
|
||||
}
|
||||
|
||||
let mut fields_by_type_name: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
for field in registered_fields {
|
||||
fields_by_type_name
|
||||
.entry(field.type_name.to_string())
|
||||
.or_default()
|
||||
.insert(field.field_name.to_string());
|
||||
}
|
||||
|
||||
for (path, content) in tree.iter_mut() {
|
||||
let Some(type_name) = path.file_stem().and_then(|stem| stem.to_str()) else {
|
||||
continue;
|
||||
};
|
||||
let Some(experimental_field_names) = fields_by_type_name.get(type_name) else {
|
||||
continue;
|
||||
};
|
||||
let filtered = filter_experimental_type_fields_ts_contents(
|
||||
std::mem::take(content),
|
||||
experimental_field_names,
|
||||
);
|
||||
*content = filtered;
|
||||
}
|
||||
|
||||
remove_generated_type_entries(tree, &experimental_method_types, "ts");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes union arms from `ClientRequest.ts` for methods marked experimental.
|
||||
fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Result<()> {
|
||||
let path = out_dir.join("ClientRequest.ts");
|
||||
@@ -239,9 +296,15 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
|
||||
}
|
||||
let mut content =
|
||||
fs::read_to_string(&path).with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
content = filter_client_request_ts_contents(content, experimental_methods);
|
||||
|
||||
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn filter_client_request_ts_contents(mut content: String, experimental_methods: &[&str]) -> String {
|
||||
let Some((prefix, body, suffix)) = split_type_alias(&content) else {
|
||||
return Ok(());
|
||||
return content;
|
||||
};
|
||||
let experimental_methods: HashSet<&str> = experimental_methods
|
||||
.iter()
|
||||
@@ -259,12 +322,9 @@ fn filter_client_request_ts(out_dir: &Path, experimental_methods: &[&str]) -> Re
|
||||
let new_body = filtered_arms.join(" | ");
|
||||
content = format!("{prefix}{new_body}{suffix}");
|
||||
let import_usage_scope = split_type_alias(&content)
|
||||
.map(|(_, body, _)| body)
|
||||
.map(|(_, filtered_body, _)| filtered_body)
|
||||
.unwrap_or_else(|| new_body.clone());
|
||||
content = prune_unused_type_imports(content, &import_usage_scope);
|
||||
|
||||
fs::write(&path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
prune_unused_type_imports(content, &import_usage_scope)
|
||||
}
|
||||
|
||||
/// Removes experimental properties from generated TypeScript type files.
|
||||
@@ -302,8 +362,17 @@ fn filter_experimental_fields_in_ts_file(
|
||||
) -> Result<()> {
|
||||
let mut content =
|
||||
fs::read_to_string(path).with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
content = filter_experimental_type_fields_ts_contents(content, experimental_field_names);
|
||||
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn filter_experimental_type_fields_ts_contents(
|
||||
mut content: String,
|
||||
experimental_field_names: &HashSet<String>,
|
||||
) -> String {
|
||||
let Some((open_brace, close_brace)) = type_body_brace_span(&content) else {
|
||||
return Ok(());
|
||||
return content;
|
||||
};
|
||||
let inner = &content[open_brace + 1..close_brace];
|
||||
let fields = split_top_level_multi(inner, &[',', ';']);
|
||||
@@ -322,9 +391,7 @@ fn filter_experimental_fields_in_ts_file(
|
||||
let import_usage_scope = split_type_alias(&content)
|
||||
.map(|(_, body, _)| body)
|
||||
.unwrap_or_else(|| new_inner.clone());
|
||||
content = prune_unused_type_imports(content, &import_usage_scope);
|
||||
fs::write(path, content).with_context(|| format!("Failed to write {}", path.display()))?;
|
||||
Ok(())
|
||||
prune_unused_type_imports(content, &import_usage_scope)
|
||||
}
|
||||
|
||||
fn filter_experimental_schema(bundle: &mut Value) -> Result<()> {
|
||||
@@ -526,6 +593,23 @@ fn remove_generated_type_files(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_generated_type_entries(
|
||||
tree: &mut BTreeMap<PathBuf, String>,
|
||||
type_names: &HashSet<String>,
|
||||
extension: &str,
|
||||
) {
|
||||
for type_name in type_names {
|
||||
for subdir in ["", "v1", "v2"] {
|
||||
let path = if subdir.is_empty() {
|
||||
PathBuf::from(format!("{type_name}.{extension}"))
|
||||
} else {
|
||||
PathBuf::from(subdir).join(format!("{type_name}.{extension}"))
|
||||
};
|
||||
tree.remove(&path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_experimental_method_type_definitions(bundle: &mut Value) {
|
||||
let type_names = experimental_method_types();
|
||||
let Some(definitions) = bundle.get_mut("definitions").and_then(Value::as_object_mut) else {
|
||||
@@ -1807,13 +1891,13 @@ fn prepend_header_if_missing(path: &Path) -> Result<()> {
|
||||
.with_context(|| format!("Failed to read {}", path.display()))?;
|
||||
}
|
||||
|
||||
if content.starts_with(HEADER) {
|
||||
if content.starts_with(GENERATED_TS_HEADER) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut f = fs::File::create(path)
|
||||
.with_context(|| format!("Failed to open {} for writing", path.display()))?;
|
||||
f.write_all(HEADER.as_bytes())
|
||||
f.write_all(GENERATED_TS_HEADER.as_bytes())
|
||||
.with_context(|| format!("Failed to write header to {}", path.display()))?;
|
||||
f.write_all(content.as_bytes())
|
||||
.with_context(|| format!("Failed to write content to {}", path.display()))?;
|
||||
@@ -1858,35 +1942,15 @@ fn ts_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
|
||||
/// Generate an index.ts file that re-exports all generated types.
|
||||
/// This allows consumers to import all types from a single file.
|
||||
fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
|
||||
let mut entries: Vec<String> = Vec::new();
|
||||
let mut stems: Vec<String> = ts_files_in(out_dir)?
|
||||
.into_iter()
|
||||
.filter_map(|p| {
|
||||
let stem = p.file_stem()?.to_string_lossy().into_owned();
|
||||
if stem == "index" { None } else { Some(stem) }
|
||||
})
|
||||
.collect();
|
||||
stems.sort();
|
||||
stems.dedup();
|
||||
|
||||
for name in stems {
|
||||
entries.push(format!("export type {{ {name} }} from \"./{name}\";\n"));
|
||||
}
|
||||
|
||||
// If this is the root out_dir and a ./v2 folder exists with TS files,
|
||||
// expose it as a namespace to avoid symbol collisions at the root.
|
||||
let v2_dir = out_dir.join("v2");
|
||||
let has_v2_ts = ts_files_in(&v2_dir).map(|v| !v.is_empty()).unwrap_or(false);
|
||||
if has_v2_ts {
|
||||
entries.push("export * as v2 from \"./v2\";\n".to_string());
|
||||
}
|
||||
|
||||
let mut content =
|
||||
String::with_capacity(HEADER.len() + entries.iter().map(String::len).sum::<usize>());
|
||||
content.push_str(HEADER);
|
||||
for line in &entries {
|
||||
content.push_str(line);
|
||||
}
|
||||
let content = generated_index_ts_with_header(index_ts_entries(
|
||||
&ts_files_in(out_dir)?
|
||||
.iter()
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>(),
|
||||
ts_files_in(&out_dir.join("v2"))
|
||||
.map(|v| !v.is_empty())
|
||||
.unwrap_or(false),
|
||||
));
|
||||
|
||||
let index_path = out_dir.join("index.ts");
|
||||
let mut f = fs::File::create(&index_path)
|
||||
@@ -1896,244 +1960,278 @@ fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
|
||||
Ok(index_path)
|
||||
}
|
||||
|
||||
pub(crate) fn generate_index_ts_tree(tree: &mut BTreeMap<PathBuf, String>) {
|
||||
let root_entries = tree
|
||||
.keys()
|
||||
.filter(|path| path.components().count() == 1)
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>();
|
||||
let has_v2_ts = tree.keys().any(|path| {
|
||||
path.parent()
|
||||
.is_some_and(|parent| parent == Path::new("v2"))
|
||||
&& path.extension() == Some(OsStr::new("ts"))
|
||||
&& path.file_stem().is_some_and(|stem| stem != "index")
|
||||
});
|
||||
tree.insert(
|
||||
PathBuf::from("index.ts"),
|
||||
index_ts_entries(&root_entries, has_v2_ts),
|
||||
);
|
||||
|
||||
let v2_entries = tree
|
||||
.keys()
|
||||
.filter(|path| {
|
||||
path.parent()
|
||||
.is_some_and(|parent| parent == Path::new("v2"))
|
||||
})
|
||||
.map(PathBuf::as_path)
|
||||
.collect::<Vec<_>>();
|
||||
if !v2_entries.is_empty() {
|
||||
tree.insert(
|
||||
PathBuf::from("v2").join("index.ts"),
|
||||
index_ts_entries(&v2_entries, false),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn generated_index_ts_with_header(content: String) -> String {
|
||||
let mut with_header = String::with_capacity(GENERATED_TS_HEADER.len() + content.len());
|
||||
with_header.push_str(GENERATED_TS_HEADER);
|
||||
with_header.push_str(&content);
|
||||
with_header
|
||||
}
|
||||
|
||||
fn index_ts_entries(paths: &[&Path], has_v2_ts: bool) -> String {
|
||||
let mut stems: Vec<String> = paths
|
||||
.iter()
|
||||
.filter(|path| path.extension() == Some(OsStr::new("ts")))
|
||||
.filter_map(|path| {
|
||||
let stem = path.file_stem()?.to_string_lossy().into_owned();
|
||||
if stem == "index" { None } else { Some(stem) }
|
||||
})
|
||||
.collect();
|
||||
stems.sort();
|
||||
stems.dedup();
|
||||
|
||||
let mut entries = String::new();
|
||||
for name in stems {
|
||||
entries.push_str(&format!("export type {{ {name} }} from \"./{name}\";\n"));
|
||||
}
|
||||
if has_v2_ts {
|
||||
entries.push_str("export * as v2 from \"./v2\";\n");
|
||||
}
|
||||
entries
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::v2;
|
||||
use crate::schema_fixtures::read_schema_fixture_subtree;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[test]
|
||||
fn generated_ts_optional_nullable_fields_only_in_params() -> Result<()> {
|
||||
// Assert that "?: T | null" only appears in generated *Params types.
|
||||
let output_dir = std::env::temp_dir().join(format!("codex_ts_types_{}", Uuid::now_v7()));
|
||||
fs::create_dir(&output_dir)?;
|
||||
let fixture_tree = read_schema_fixture_subtree(&schema_root()?, "typescript")?;
|
||||
|
||||
struct TempDirGuard(PathBuf);
|
||||
|
||||
impl Drop for TempDirGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = TempDirGuard(output_dir.clone());
|
||||
|
||||
// Avoid doing more work than necessary to keep the test from timing out.
|
||||
let options = GenerateTsOptions {
|
||||
generate_indices: false,
|
||||
ensure_headers: false,
|
||||
run_prettier: false,
|
||||
experimental_api: false,
|
||||
};
|
||||
generate_ts_with_options(&output_dir, None, options)?;
|
||||
|
||||
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
|
||||
let client_request_ts = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("ClientRequest.ts"))
|
||||
.ok_or_else(|| anyhow::anyhow!("missing ClientRequest.ts fixture"))?,
|
||||
)?;
|
||||
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), false);
|
||||
assert_eq!(
|
||||
client_request_ts.contains("MockExperimentalMethodParams"),
|
||||
false
|
||||
);
|
||||
assert_eq!(output_dir.join("EventMsg.ts").exists(), true);
|
||||
let thread_start_ts =
|
||||
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
|
||||
assert_eq!(fixture_tree.contains_key(Path::new("EventMsg.ts")), true);
|
||||
let thread_start_ts = std::str::from_utf8(
|
||||
fixture_tree
|
||||
.get(Path::new("v2/ThreadStartParams.ts"))
|
||||
.ok_or_else(|| anyhow::anyhow!("missing v2/ThreadStartParams.ts fixture"))?,
|
||||
)?;
|
||||
assert_eq!(thread_start_ts.contains("mockExperimentalField"), false);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodParams.ts")
|
||||
.exists(),
|
||||
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodParams.ts")),
|
||||
false
|
||||
);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodResponse.ts")
|
||||
.exists(),
|
||||
fixture_tree.contains_key(Path::new("v2/MockExperimentalMethodResponse.ts")),
|
||||
false
|
||||
);
|
||||
|
||||
let mut undefined_offenders = Vec::new();
|
||||
let mut optional_nullable_offenders = BTreeSet::new();
|
||||
let mut stack = vec![output_dir];
|
||||
while let Some(dir) = stack.pop() {
|
||||
for entry in fs::read_dir(&dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
stack.push(path);
|
||||
for (path, contents) in &fixture_tree {
|
||||
if !matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Only allow "?: T | null" in objects representing JSON-RPC requests,
|
||||
// which we assume are called "*Params".
|
||||
let allow_optional_nullable = path
|
||||
.file_stem()
|
||||
.and_then(|stem| stem.to_str())
|
||||
.is_some_and(|stem| {
|
||||
stem.ends_with("Params")
|
||||
|| stem == "InitializeCapabilities"
|
||||
|| matches!(
|
||||
stem,
|
||||
"CollabAgentRef"
|
||||
| "CollabAgentStatusEntry"
|
||||
| "CollabAgentSpawnEndEvent"
|
||||
| "CollabAgentInteractionEndEvent"
|
||||
| "CollabCloseEndEvent"
|
||||
| "CollabResumeBeginEvent"
|
||||
| "CollabResumeEndEvent"
|
||||
)
|
||||
});
|
||||
|
||||
let contents = std::str::from_utf8(contents)?;
|
||||
if contents.contains("| undefined") {
|
||||
undefined_offenders.push(path.clone());
|
||||
}
|
||||
|
||||
const SKIP_PREFIXES: &[&str] = &[
|
||||
"const ",
|
||||
"let ",
|
||||
"var ",
|
||||
"export const ",
|
||||
"export let ",
|
||||
"export var ",
|
||||
];
|
||||
|
||||
let mut search_start = 0;
|
||||
while let Some(idx) = contents[search_start..].find("| null") {
|
||||
let abs_idx = search_start + idx;
|
||||
// Find the property-colon for this field by scanning forward
|
||||
// from the start of the segment and ignoring nested braces,
|
||||
// brackets, and parens. This avoids colons inside nested
|
||||
// type literals like `{ [k in string]?: string }`.
|
||||
|
||||
let line_start_idx = contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
|
||||
|
||||
let mut segment_start_idx = line_start_idx;
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
|
||||
// Scan forward for the colon that separates the field name from its type.
|
||||
let mut level_brace = 0_i32;
|
||||
let mut level_brack = 0_i32;
|
||||
let mut level_paren = 0_i32;
|
||||
let mut in_single = false;
|
||||
let mut in_double = false;
|
||||
let mut escape = false;
|
||||
let mut prop_colon_idx = None;
|
||||
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
|
||||
let idx_abs = segment_start_idx + i;
|
||||
if escape {
|
||||
escape = false;
|
||||
continue;
|
||||
}
|
||||
match ch {
|
||||
'\\' => {
|
||||
if in_single || in_double {
|
||||
escape = true;
|
||||
}
|
||||
}
|
||||
'\'' => {
|
||||
if !in_double {
|
||||
in_single = !in_single;
|
||||
}
|
||||
}
|
||||
'"' => {
|
||||
if !in_single {
|
||||
in_double = !in_double;
|
||||
}
|
||||
}
|
||||
'{' if !in_single && !in_double => level_brace += 1,
|
||||
'}' if !in_single && !in_double => level_brace -= 1,
|
||||
'[' if !in_single && !in_double => level_brack += 1,
|
||||
']' if !in_single && !in_double => level_brack -= 1,
|
||||
'(' if !in_single && !in_double => level_paren += 1,
|
||||
')' if !in_single && !in_double => level_paren -= 1,
|
||||
':' if !in_single
|
||||
&& !in_double
|
||||
&& level_brace == 0
|
||||
&& level_brack == 0
|
||||
&& level_paren == 0 =>
|
||||
{
|
||||
prop_colon_idx = Some(idx_abs);
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(colon_idx) = prop_colon_idx else {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if matches!(path.extension().and_then(|ext| ext.to_str()), Some("ts")) {
|
||||
// Only allow "?: T | null" in objects representing JSON-RPC requests,
|
||||
// which we assume are called "*Params".
|
||||
let allow_optional_nullable = path
|
||||
.file_stem()
|
||||
.and_then(|stem| stem.to_str())
|
||||
.is_some_and(|stem| {
|
||||
stem.ends_with("Params")
|
||||
|| stem == "InitializeCapabilities"
|
||||
|| matches!(
|
||||
stem,
|
||||
"CollabAgentRef"
|
||||
| "CollabAgentStatusEntry"
|
||||
| "CollabAgentSpawnEndEvent"
|
||||
| "CollabAgentInteractionEndEvent"
|
||||
| "CollabCloseEndEvent"
|
||||
| "CollabResumeBeginEvent"
|
||||
| "CollabResumeEndEvent"
|
||||
)
|
||||
});
|
||||
|
||||
let contents = fs::read_to_string(&path)?;
|
||||
if contents.contains("| undefined") {
|
||||
undefined_offenders.push(path.clone());
|
||||
}
|
||||
|
||||
const SKIP_PREFIXES: &[&str] = &[
|
||||
"const ",
|
||||
"let ",
|
||||
"var ",
|
||||
"export const ",
|
||||
"export let ",
|
||||
"export var ",
|
||||
];
|
||||
|
||||
let mut search_start = 0;
|
||||
while let Some(idx) = contents[search_start..].find("| null") {
|
||||
let abs_idx = search_start + idx;
|
||||
// Find the property-colon for this field by scanning forward
|
||||
// from the start of the segment and ignoring nested braces,
|
||||
// brackets, and parens. This avoids colons inside nested
|
||||
// type literals like `{ [k in string]?: string }`.
|
||||
|
||||
let line_start_idx =
|
||||
contents[..abs_idx].rfind('\n').map(|i| i + 1).unwrap_or(0);
|
||||
|
||||
let mut segment_start_idx = line_start_idx;
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind(',') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('{') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
if let Some(rel_idx) = contents[line_start_idx..abs_idx].rfind('}') {
|
||||
segment_start_idx = segment_start_idx.max(line_start_idx + rel_idx + 1);
|
||||
}
|
||||
|
||||
// Scan forward for the colon that separates the field name from its type.
|
||||
let mut level_brace = 0_i32;
|
||||
let mut level_brack = 0_i32;
|
||||
let mut level_paren = 0_i32;
|
||||
let mut in_single = false;
|
||||
let mut in_double = false;
|
||||
let mut escape = false;
|
||||
let mut prop_colon_idx = None;
|
||||
for (i, ch) in contents[segment_start_idx..abs_idx].char_indices() {
|
||||
let idx_abs = segment_start_idx + i;
|
||||
if escape {
|
||||
escape = false;
|
||||
continue;
|
||||
}
|
||||
match ch {
|
||||
'\\' => {
|
||||
// Only treat as escape when inside a string.
|
||||
if in_single || in_double {
|
||||
escape = true;
|
||||
}
|
||||
}
|
||||
'\'' => {
|
||||
if !in_double {
|
||||
in_single = !in_single;
|
||||
}
|
||||
}
|
||||
'"' => {
|
||||
if !in_single {
|
||||
in_double = !in_double;
|
||||
}
|
||||
}
|
||||
'{' if !in_single && !in_double => level_brace += 1,
|
||||
'}' if !in_single && !in_double => level_brace -= 1,
|
||||
'[' if !in_single && !in_double => level_brack += 1,
|
||||
']' if !in_single && !in_double => level_brack -= 1,
|
||||
'(' if !in_single && !in_double => level_paren += 1,
|
||||
')' if !in_single && !in_double => level_paren -= 1,
|
||||
':' if !in_single
|
||||
&& !in_double
|
||||
&& level_brace == 0
|
||||
&& level_brack == 0
|
||||
&& level_paren == 0 =>
|
||||
{
|
||||
prop_colon_idx = Some(idx_abs);
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(colon_idx) = prop_colon_idx else {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut field_prefix = contents[segment_start_idx..colon_idx].trim();
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(comment_idx) = field_prefix.rfind("*/") {
|
||||
field_prefix = field_prefix[comment_idx + 2..].trim_start();
|
||||
}
|
||||
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if SKIP_PREFIXES
|
||||
.iter()
|
||||
.any(|prefix| field_prefix.starts_with(prefix))
|
||||
{
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if field_prefix.contains('(') {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the last non-whitespace before ':' is '?', then this is an
|
||||
// optional field with a nullable type (i.e., "?: T | null").
|
||||
// These are only allowed in *Params types.
|
||||
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
|
||||
&& !allow_optional_nullable
|
||||
{
|
||||
let line_number =
|
||||
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
|
||||
let offending_line_end = contents[line_start_idx..]
|
||||
.find('\n')
|
||||
.map(|i| line_start_idx + i)
|
||||
.unwrap_or(contents.len());
|
||||
let offending_snippet =
|
||||
contents[line_start_idx..offending_line_end].trim();
|
||||
|
||||
optional_nullable_offenders.insert(format!(
|
||||
"{}:{}: {offending_snippet}",
|
||||
path.display(),
|
||||
line_number
|
||||
));
|
||||
}
|
||||
|
||||
search_start = abs_idx + 5;
|
||||
}
|
||||
if let Some(comment_idx) = field_prefix.rfind("*/") {
|
||||
field_prefix = field_prefix[comment_idx + 2..].trim_start();
|
||||
}
|
||||
|
||||
if field_prefix.is_empty() {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if SKIP_PREFIXES
|
||||
.iter()
|
||||
.any(|prefix| field_prefix.starts_with(prefix))
|
||||
{
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
if field_prefix.contains('(') {
|
||||
search_start = abs_idx + 5;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the last non-whitespace before ':' is '?', then this is an
|
||||
// optional field with a nullable type (i.e., "?: T | null").
|
||||
// These are only allowed in *Params types.
|
||||
if field_prefix.chars().rev().find(|c| !c.is_whitespace()) == Some('?')
|
||||
&& !allow_optional_nullable
|
||||
{
|
||||
let line_number =
|
||||
contents[..abs_idx].chars().filter(|c| *c == '\n').count() + 1;
|
||||
let offending_line_end = contents[line_start_idx..]
|
||||
.find('\n')
|
||||
.map(|i| line_start_idx + i)
|
||||
.unwrap_or(contents.len());
|
||||
let offending_snippet = contents[line_start_idx..offending_line_end].trim();
|
||||
|
||||
optional_nullable_offenders.insert(format!(
|
||||
"{}:{}: {offending_snippet}",
|
||||
path.display(),
|
||||
line_number
|
||||
));
|
||||
}
|
||||
|
||||
search_start = abs_idx + 5;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2153,55 +2251,40 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema_root() -> Result<PathBuf> {
|
||||
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
|
||||
.context("resolve TypeScript schema index.ts")?;
|
||||
let schema_root = typescript_index
|
||||
.parent()
|
||||
.and_then(|parent| parent.parent())
|
||||
.context("derive schema root from schema/typescript/index.ts")?
|
||||
.to_path_buf();
|
||||
Ok(schema_root)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generate_ts_with_experimental_api_retains_experimental_entries() -> Result<()> {
|
||||
let output_dir =
|
||||
std::env::temp_dir().join(format!("codex_ts_types_experimental_{}", Uuid::now_v7()));
|
||||
fs::create_dir(&output_dir)?;
|
||||
|
||||
struct TempDirGuard(PathBuf);
|
||||
|
||||
impl Drop for TempDirGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = TempDirGuard(output_dir.clone());
|
||||
|
||||
let options = GenerateTsOptions {
|
||||
generate_indices: false,
|
||||
ensure_headers: false,
|
||||
run_prettier: false,
|
||||
experimental_api: true,
|
||||
};
|
||||
generate_ts_with_options(&output_dir, None, options)?;
|
||||
|
||||
let client_request_ts = fs::read_to_string(output_dir.join("ClientRequest.ts"))?;
|
||||
let client_request_ts = ClientRequest::export_to_string()?;
|
||||
assert_eq!(client_request_ts.contains("mock/experimentalMethod"), true);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodParams.ts")
|
||||
.exists(),
|
||||
client_request_ts.contains("MockExperimentalMethodParams"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("MockExperimentalMethodResponse.ts")
|
||||
.exists(),
|
||||
v2::MockExperimentalMethodParams::export_to_string()?
|
||||
.contains("MockExperimentalMethodParams"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
v2::MockExperimentalMethodResponse::export_to_string()?
|
||||
.contains("MockExperimentalMethodResponse"),
|
||||
true
|
||||
);
|
||||
|
||||
let thread_start_ts =
|
||||
fs::read_to_string(output_dir.join("v2").join("ThreadStartParams.ts"))?;
|
||||
let thread_start_ts = v2::ThreadStartParams::export_to_string()?;
|
||||
assert_eq!(thread_start_ts.contains("mockExperimentalField"), true);
|
||||
let command_execution_request_approval_ts = fs::read_to_string(
|
||||
output_dir
|
||||
.join("v2")
|
||||
.join("CommandExecutionRequestApprovalParams.ts"),
|
||||
)?;
|
||||
let command_execution_request_approval_ts =
|
||||
v2::CommandExecutionRequestApprovalParams::export_to_string()?;
|
||||
assert_eq!(
|
||||
command_execution_request_approval_ts.contains("additionalPermissions"),
|
||||
true
|
||||
|
||||
@@ -17,6 +17,9 @@ pub use protocol::thread_history::*;
|
||||
pub use protocol::v1::*;
|
||||
pub use protocol::v2::*;
|
||||
pub use schema_fixtures::SchemaFixtureOptions;
|
||||
#[doc(hidden)]
|
||||
pub use schema_fixtures::generate_typescript_schema_fixture_subtree_for_tests;
|
||||
pub use schema_fixtures::read_schema_fixture_subtree;
|
||||
pub use schema_fixtures::read_schema_fixture_tree;
|
||||
pub use schema_fixtures::write_schema_fixtures;
|
||||
pub use schema_fixtures::write_schema_fixtures_with_options;
|
||||
|
||||
@@ -151,6 +151,12 @@ macro_rules! client_request_definitions {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn visit_client_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
|
||||
$(
|
||||
v.visit::<$response>();
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(clippy::vec_init_then_push)]
|
||||
pub fn export_client_response_schemas(
|
||||
out_dir: &::std::path::Path,
|
||||
@@ -525,6 +531,12 @@ macro_rules! server_request_definitions {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn visit_server_response_types(v: &mut impl ::ts_rs::TypeVisitor) {
|
||||
$(
|
||||
v.visit::<$response>();
|
||||
)*
|
||||
}
|
||||
|
||||
#[allow(clippy::vec_init_then_push)]
|
||||
pub fn export_server_response_schemas(
|
||||
out_dir: &Path,
|
||||
|
||||
@@ -1,11 +1,25 @@
|
||||
use crate::ClientNotification;
|
||||
use crate::ClientRequest;
|
||||
use crate::ServerNotification;
|
||||
use crate::ServerRequest;
|
||||
use crate::export::GENERATED_TS_HEADER;
|
||||
use crate::export::filter_experimental_ts_tree;
|
||||
use crate::export::generate_index_ts_tree;
|
||||
use crate::protocol::common::visit_client_response_types;
|
||||
use crate::protocol::common::visit_server_response_types;
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use std::any::TypeId;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use ts_rs::TS;
|
||||
use ts_rs::TypeVisitor;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct SchemaFixtureOptions {
|
||||
@@ -27,6 +41,42 @@ pub fn read_schema_fixture_tree(schema_root: &Path) -> Result<BTreeMap<PathBuf,
|
||||
Ok(all)
|
||||
}
|
||||
|
||||
pub fn read_schema_fixture_subtree(
|
||||
schema_root: &Path,
|
||||
label: &str,
|
||||
) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
let subtree_root = schema_root.join(label);
|
||||
collect_files_recursive(&subtree_root)
|
||||
.with_context(|| format!("read schema fixture subtree {}", subtree_root.display()))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub fn generate_typescript_schema_fixture_subtree_for_tests() -> Result<BTreeMap<PathBuf, Vec<u8>>>
|
||||
{
|
||||
let mut files = BTreeMap::new();
|
||||
let mut seen = HashSet::new();
|
||||
|
||||
collect_typescript_fixture_file::<ClientRequest>(&mut files, &mut seen)?;
|
||||
visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
|
||||
visit_client_response_types(visitor);
|
||||
})?;
|
||||
collect_typescript_fixture_file::<ClientNotification>(&mut files, &mut seen)?;
|
||||
collect_typescript_fixture_file::<ServerRequest>(&mut files, &mut seen)?;
|
||||
visit_typescript_fixture_dependencies(&mut files, &mut seen, |visitor| {
|
||||
visit_server_response_types(visitor);
|
||||
})?;
|
||||
collect_typescript_fixture_file::<ServerNotification>(&mut files, &mut seen)?;
|
||||
collect_typescript_fixture_file::<EventMsg>(&mut files, &mut seen)?;
|
||||
|
||||
filter_experimental_ts_tree(&mut files)?;
|
||||
generate_index_ts_tree(&mut files);
|
||||
|
||||
Ok(files
|
||||
.into_iter()
|
||||
.map(|(path, content)| (path, content.into_bytes()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Regenerates `schema/typescript/` and `schema/json/`.
|
||||
///
|
||||
/// This is intended to be used by tooling (e.g., `just write-app-server-schema`).
|
||||
@@ -86,6 +136,12 @@ fn read_file_bytes(path: &Path) -> Result<Vec<u8>> {
|
||||
let text = String::from_utf8(bytes)
|
||||
.with_context(|| format!("expected UTF-8 TypeScript in {}", path.display()))?;
|
||||
let text = text.replace("\r\n", "\n").replace('\r', "\n");
|
||||
// Fixture comparisons care about schema content, not whether the generator
|
||||
// re-prepended the standard banner to every TypeScript file.
|
||||
let text = text
|
||||
.strip_prefix(GENERATED_TS_HEADER)
|
||||
.unwrap_or(&text)
|
||||
.to_string();
|
||||
return Ok(text.into_bytes());
|
||||
}
|
||||
Ok(bytes)
|
||||
@@ -209,6 +265,73 @@ fn collect_files_recursive(root: &Path) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
fn collect_typescript_fixture_file<T: TS + 'static + ?Sized>(
|
||||
files: &mut BTreeMap<PathBuf, String>,
|
||||
seen: &mut HashSet<TypeId>,
|
||||
) -> Result<()> {
|
||||
let Some(output_path) = T::output_path() else {
|
||||
return Ok(());
|
||||
};
|
||||
if !seen.insert(TypeId::of::<T>()) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let contents = T::export_to_string().context("export TypeScript fixture content")?;
|
||||
let output_path = normalize_relative_fixture_path(&output_path);
|
||||
files.insert(
|
||||
output_path,
|
||||
contents.replace("\r\n", "\n").replace('\r', "\n"),
|
||||
);
|
||||
|
||||
let mut visitor = TypeScriptFixtureCollector {
|
||||
files,
|
||||
seen,
|
||||
error: None,
|
||||
};
|
||||
T::visit_dependencies(&mut visitor);
|
||||
if let Some(error) = visitor.error {
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn normalize_relative_fixture_path(path: &Path) -> PathBuf {
|
||||
path.components().collect()
|
||||
}
|
||||
|
||||
fn visit_typescript_fixture_dependencies(
|
||||
files: &mut BTreeMap<PathBuf, String>,
|
||||
seen: &mut HashSet<TypeId>,
|
||||
visit: impl FnOnce(&mut TypeScriptFixtureCollector<'_>),
|
||||
) -> Result<()> {
|
||||
let mut visitor = TypeScriptFixtureCollector {
|
||||
files,
|
||||
seen,
|
||||
error: None,
|
||||
};
|
||||
visit(&mut visitor);
|
||||
if let Some(error) = visitor.error {
|
||||
return Err(error);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct TypeScriptFixtureCollector<'a> {
|
||||
files: &'a mut BTreeMap<PathBuf, String>,
|
||||
seen: &'a mut HashSet<TypeId>,
|
||||
error: Option<anyhow::Error>,
|
||||
}
|
||||
|
||||
impl TypeVisitor for TypeScriptFixtureCollector<'_> {
|
||||
fn visit<T: TS + 'static + ?Sized>(&mut self) {
|
||||
if self.error.is_some() {
|
||||
return;
|
||||
}
|
||||
self.error = collect_typescript_fixture_file::<T>(self.files, self.seen).err();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,19 +1,60 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_app_server_protocol::read_schema_fixture_tree;
|
||||
use codex_app_server_protocol::write_schema_fixtures;
|
||||
use codex_app_server_protocol::generate_json_with_experimental;
|
||||
use codex_app_server_protocol::generate_typescript_schema_fixture_subtree_for_tests;
|
||||
use codex_app_server_protocol::read_schema_fixture_subtree;
|
||||
use similar::TextDiff;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[test]
|
||||
fn schema_fixtures_match_generated() -> Result<()> {
|
||||
fn typescript_schema_fixtures_match_generated() -> Result<()> {
|
||||
let schema_root = schema_root()?;
|
||||
let fixture_tree = read_tree(&schema_root)?;
|
||||
let fixture_tree = read_tree(&schema_root, "typescript")?;
|
||||
let generated_tree = generate_typescript_schema_fixture_subtree_for_tests()
|
||||
.context("generate in-memory typescript schema fixtures")?;
|
||||
|
||||
assert_schema_trees_match("typescript", &fixture_tree, &generated_tree)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_schema_fixtures_match_generated() -> Result<()> {
|
||||
assert_schema_fixtures_match_generated("json", |output_dir| {
|
||||
generate_json_with_experimental(output_dir, false)
|
||||
})
|
||||
}
|
||||
|
||||
fn assert_schema_fixtures_match_generated(
|
||||
label: &'static str,
|
||||
generate: impl FnOnce(&Path) -> Result<()>,
|
||||
) -> Result<()> {
|
||||
let schema_root = schema_root()?;
|
||||
let fixture_tree = read_tree(&schema_root, label)?;
|
||||
|
||||
let temp_dir = tempfile::tempdir().context("create temp dir")?;
|
||||
write_schema_fixtures(temp_dir.path(), None).context("generate schema fixtures")?;
|
||||
let generated_tree = read_tree(temp_dir.path())?;
|
||||
let generated_root = temp_dir.path().join(label);
|
||||
generate(&generated_root).with_context(|| {
|
||||
format!(
|
||||
"generate {label} schema fixtures into {}",
|
||||
generated_root.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let generated_tree = read_tree(temp_dir.path(), label)?;
|
||||
|
||||
assert_schema_trees_match(label, &fixture_tree, &generated_tree)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn assert_schema_trees_match(
|
||||
label: &str,
|
||||
fixture_tree: &BTreeMap<PathBuf, Vec<u8>>,
|
||||
generated_tree: &BTreeMap<PathBuf, Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
let fixture_paths = fixture_tree
|
||||
.keys()
|
||||
.map(|p| p.display().to_string())
|
||||
@@ -32,13 +73,13 @@ fn schema_fixtures_match_generated() -> Result<()> {
|
||||
.to_string();
|
||||
|
||||
panic!(
|
||||
"Vendored app-server schema fixture file set doesn't match freshly generated output. \
|
||||
"Vendored {label} app-server schema fixture file set doesn't match freshly generated output. \
|
||||
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
|
||||
);
|
||||
}
|
||||
|
||||
// If the file sets match, diff contents for each file for a nicer error.
|
||||
for (path, expected) in &fixture_tree {
|
||||
for (path, expected) in fixture_tree {
|
||||
let actual = generated_tree
|
||||
.get(path)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing generated file: {}", path.display()))?;
|
||||
@@ -54,7 +95,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}"
|
||||
.header("fixture", "generated")
|
||||
.to_string();
|
||||
panic!(
|
||||
"Vendored app-server schema fixture {} differs from generated output. \
|
||||
"Vendored {label} app-server schema fixture {} differs from generated output. \
|
||||
Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
|
||||
path.display()
|
||||
);
|
||||
@@ -63,7 +104,7 @@ Run `just write-app-server-schema` to overwrite with your changes.\n\n{diff}",
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schema_root() -> Result<std::path::PathBuf> {
|
||||
fn schema_root() -> Result<PathBuf> {
|
||||
// In Bazel runfiles (especially manifest-only mode), resolving directories is not
|
||||
// reliable. Resolve a known file, then walk up to the schema root.
|
||||
let typescript_index = codex_utils_cargo_bin::find_resource!("schema/typescript/index.ts")
|
||||
@@ -92,6 +133,11 @@ fn schema_root() -> Result<std::path::PathBuf> {
|
||||
Ok(schema_root)
|
||||
}
|
||||
|
||||
fn read_tree(root: &Path) -> Result<std::collections::BTreeMap<std::path::PathBuf, Vec<u8>>> {
|
||||
read_schema_fixture_tree(root).context("read schema fixture tree")
|
||||
fn read_tree(root: &Path, label: &str) -> Result<BTreeMap<PathBuf, Vec<u8>>> {
|
||||
read_schema_fixture_subtree(root, label).with_context(|| {
|
||||
format!(
|
||||
"read {label} schema fixture subtree from {}",
|
||||
root.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ license.workspace = true
|
||||
name = "codex-app-server"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "codex-app-server-test-notify-capture"
|
||||
path = "src/bin/notify_capture.rs"
|
||||
|
||||
[lib]
|
||||
name = "codex_app_server"
|
||||
path = "src/lib.rs"
|
||||
|
||||
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
44
codex-rs/app-server/src/bin/notify_capture.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let mut args = env::args_os();
|
||||
let _program = args.next();
|
||||
let output_path = PathBuf::from(
|
||||
args.next()
|
||||
.ok_or_else(|| anyhow!("expected output path as first argument"))?,
|
||||
);
|
||||
let payload = args
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("expected payload as final argument"))?;
|
||||
|
||||
if args.next().is_some() {
|
||||
bail!("expected payload as final argument");
|
||||
}
|
||||
|
||||
let payload = payload.to_string_lossy();
|
||||
let temp_path = PathBuf::from(format!("{}.tmp", output_path.display()));
|
||||
let mut file = File::create(&temp_path)
|
||||
.with_context(|| format!("failed to create {}", temp_path.display()))?;
|
||||
file.write_all(payload.as_bytes())
|
||||
.with_context(|| format!("failed to write {}", temp_path.display()))?;
|
||||
file.sync_all()
|
||||
.with_context(|| format!("failed to sync {}", temp_path.display()))?;
|
||||
fs::rename(&temp_path, &output_path).with_context(|| {
|
||||
format!(
|
||||
"failed to move {} into {}",
|
||||
temp_path.display(),
|
||||
output_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -711,7 +711,6 @@ pub async fn run_main_with_transport(
|
||||
request,
|
||||
transport,
|
||||
&mut connection_state.session,
|
||||
&connection_state.outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
if let Ok(mut opted_out_notification_methods) = connection_state
|
||||
@@ -734,7 +733,15 @@ pub async fn run_main_with_transport(
|
||||
std::sync::atomic::Ordering::Release,
|
||||
);
|
||||
if !was_initialized && connection_state.session.initialized {
|
||||
processor.send_initialize_notifications().await;
|
||||
processor
|
||||
.send_initialize_notifications_to_connection(
|
||||
connection_id,
|
||||
)
|
||||
.await;
|
||||
processor.connection_initialized(connection_id).await;
|
||||
connection_state
|
||||
.outbound_initialized
|
||||
.store(true, std::sync::atomic::Ordering::Release);
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Response(response) => {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_message_processor::CodexMessageProcessorArgs;
|
||||
@@ -233,7 +231,6 @@ impl MessageProcessor {
|
||||
request: JSONRPCRequest,
|
||||
transport: AppServerTransport,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
) {
|
||||
let request_span =
|
||||
crate::app_server_tracing::request_span(&request, transport, connection_id, session);
|
||||
@@ -349,10 +346,6 @@ impl MessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
session.initialized = true;
|
||||
outbound_initialized.store(true, Ordering::Release);
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -470,10 +463,22 @@ impl MessageProcessor {
|
||||
self.codex_message_processor.thread_created_receiver()
|
||||
}
|
||||
|
||||
pub(crate) async fn send_initialize_notifications(&self) {
|
||||
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_initialize_notifications_to_connection(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
) {
|
||||
for notification in self.config_warnings.iter().cloned() {
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ConfigWarning(notification))
|
||||
.send_server_notification_to_connections(
|
||||
&[connection_id],
|
||||
ServerNotification::ConfigWarning(notification),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,9 @@ sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:0/v1"
|
||||
@@ -53,6 +56,9 @@ fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#,
|
||||
)
|
||||
}
|
||||
@@ -65,6 +71,9 @@ model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
forced_login_method = "{forced_method}"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#
|
||||
);
|
||||
std::fs::write(config_toml, contents)
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@@ -23,7 +24,23 @@ enum FileExpectation {
|
||||
NonEmpty,
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
"#,
|
||||
)
|
||||
}
|
||||
|
||||
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
|
||||
create_config_toml(codex_home.path())?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
Ok(mcp)
|
||||
@@ -164,6 +181,7 @@ async fn assert_no_session_updates_for(
|
||||
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
|
||||
// Prepare a temporary Codex home and a separate root with test files.
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
// Create files designed to have deterministic ordering for query "abe".
|
||||
@@ -235,6 +253,7 @@ async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let root = TempDir::new()?;
|
||||
|
||||
std::fs::write(root.path().join("alpha.txt"), "contents")?;
|
||||
@@ -434,7 +453,7 @@ async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()>
|
||||
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let root = TempDir::new()?;
|
||||
for i in 0..2_000 {
|
||||
for i in 0..512 {
|
||||
let file_path = root.path().join(format!("file-{i:04}.txt"));
|
||||
std::fs::write(file_path, "contents")?;
|
||||
}
|
||||
|
||||
@@ -83,6 +83,9 @@ sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{base_url}"
|
||||
|
||||
@@ -14,6 +14,7 @@ use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use codex_utils_cargo_bin::cargo_bin;
|
||||
use core_test_support::fs_wait;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
@@ -191,29 +192,22 @@ async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<(
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
let notify_script = codex_home.path().join("notify.py");
|
||||
std::fs::write(
|
||||
¬ify_script,
|
||||
r#"from pathlib import Path
|
||||
import sys
|
||||
|
||||
payload_path = Path(__file__).with_name("notify.json")
|
||||
tmp_path = payload_path.with_suffix(".json.tmp")
|
||||
tmp_path.write_text(sys.argv[-1], encoding="utf-8")
|
||||
tmp_path.replace(payload_path)
|
||||
"#,
|
||||
)?;
|
||||
let notify_file = codex_home.path().join("notify.json");
|
||||
let notify_script = notify_script
|
||||
let notify_capture = cargo_bin("codex-app-server-test-notify-capture")?;
|
||||
let notify_capture = notify_capture
|
||||
.to_str()
|
||||
.expect("notify script path should be valid UTF-8");
|
||||
.expect("notify capture path should be valid UTF-8");
|
||||
let notify_file_str = notify_file
|
||||
.to_str()
|
||||
.expect("notify file path should be valid UTF-8");
|
||||
create_config_toml_with_extra(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&format!(
|
||||
"notify = [\"python3\", {}]",
|
||||
toml_basic_string(notify_script)
|
||||
"notify = [{}, {}]",
|
||||
toml_basic_string(notify_capture),
|
||||
toml_basic_string(notify_file_str)
|
||||
),
|
||||
)?;
|
||||
|
||||
@@ -297,6 +291,9 @@ model_provider = "mock_provider"
|
||||
|
||||
{extra}
|
||||
|
||||
[features]
|
||||
shell_snapshot = false
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
|
||||
@@ -4,7 +4,7 @@ use app_test_support::create_apply_patch_sse_response;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::create_mock_responses_server_sequence;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
@@ -59,6 +59,36 @@ use uuid::Uuid;
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals.";
|
||||
|
||||
async fn wait_for_responses_request_count(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
let Some(requests) = server.received_requests().await else {
|
||||
anyhow::bail!("wiremock did not record requests");
|
||||
};
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
if responses_request_count == expected_count {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -866,7 +896,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
|
||||
)?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
@@ -981,6 +1011,7 @@ async fn thread_resume_replays_pending_command_execution_request_approval() -> R
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1003,7 +1034,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
create_apply_patch_sse_response(patch, "patch-call")?,
|
||||
create_final_assistant_message_sse_response("done")?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence(responses).await;
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
create_config_toml(&codex_home, &server.uri())?;
|
||||
|
||||
let mut primary = McpProcess::new(&codex_home).await?;
|
||||
@@ -1146,6 +1177,7 @@ async fn thread_resume_replays_pending_file_change_request_approval() -> Result<
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_responses_request_count(&server, 3).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -34,6 +34,51 @@ use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
async fn wait_for_responses_request_count_to_stabilize(
|
||||
server: &wiremock::MockServer,
|
||||
expected_count: usize,
|
||||
settle_duration: std::time::Duration,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
let mut stable_since: Option<tokio::time::Instant> = None;
|
||||
loop {
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.context("failed to fetch received requests")?;
|
||||
let responses_request_count = requests
|
||||
.iter()
|
||||
.filter(|request| {
|
||||
request.method == "POST" && request.url.path().ends_with("/responses")
|
||||
})
|
||||
.count();
|
||||
|
||||
if responses_request_count > expected_count {
|
||||
anyhow::bail!(
|
||||
"expected exactly {expected_count} /responses requests, got {responses_request_count}"
|
||||
);
|
||||
}
|
||||
|
||||
if responses_request_count == expected_count {
|
||||
match stable_since {
|
||||
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
|
||||
return Ok::<(), anyhow::Error>(());
|
||||
}
|
||||
None => stable_since = Some(tokio::time::Instant::now()),
|
||||
Some(_) => {}
|
||||
}
|
||||
} else {
|
||||
stable_since = None;
|
||||
}
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -168,6 +213,13 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
|
||||
};
|
||||
assert_eq!(payload.thread_id, thread_id);
|
||||
|
||||
wait_for_responses_request_count_to_stabilize(
|
||||
&server,
|
||||
1,
|
||||
std::time::Duration::from_millis(200),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
std::fs::create_dir(&codex_home)?;
|
||||
let workspace = tmp.path().join("workspace");
|
||||
std::fs::create_dir(&workspace)?;
|
||||
let release_marker = workspace.join("interrupt-release");
|
||||
|
||||
let Some(zsh_path) = find_test_zsh_path()? else {
|
||||
eprintln!("skipping zsh fork test: no zsh executable found");
|
||||
@@ -61,8 +62,15 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
};
|
||||
eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display());
|
||||
|
||||
// Keep the shell command in flight until we interrupt it. A fast command
|
||||
// like `echo hi` can finish before the interrupt arrives on faster runners,
|
||||
// which turns this into a test for post-command follow-up behavior instead
|
||||
// of interrupting an active zsh-fork command.
|
||||
let release_marker_escaped = release_marker.to_string_lossy().replace('\'', r#"'\''"#);
|
||||
let wait_for_interrupt =
|
||||
format!("while [ ! -f '{release_marker_escaped}' ]; do sleep 0.01; done");
|
||||
let response = create_shell_command_sse_response(
|
||||
vec!["echo".to_string(), "hi".to_string()],
|
||||
vec!["/bin/sh".to_string(), "-c".to_string(), wait_for_interrupt],
|
||||
None,
|
||||
Some(5000),
|
||||
"call-zsh-fork",
|
||||
@@ -155,7 +163,9 @@ async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> {
|
||||
assert_eq!(id, "call-zsh-fork");
|
||||
assert_eq!(status, CommandExecutionStatus::InProgress);
|
||||
assert!(command.starts_with(&zsh_path.display().to_string()));
|
||||
assert!(command.contains(" -lc 'echo hi'"));
|
||||
assert!(command.contains("/bin/sh -c"));
|
||||
assert!(command.contains("sleep 0.01"));
|
||||
assert!(command.contains(&release_marker.display().to_string()));
|
||||
assert_eq!(cwd, workspace);
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(thread.id, turn.id, DEFAULT_READ_TIMEOUT)
|
||||
@@ -663,22 +673,50 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
};
|
||||
assert_eq!(id, "call-zsh-fork-subcommand-decline");
|
||||
assert_eq!(status, CommandExecutionStatus::Declined);
|
||||
assert!(
|
||||
aggregated_output.is_none()
|
||||
|| aggregated_output == Some("exec command rejected by user".to_string())
|
||||
);
|
||||
if let Some(output) = aggregated_output.as_deref() {
|
||||
assert!(
|
||||
output == "exec command rejected by user"
|
||||
|| output.contains("sandbox denied exec error"),
|
||||
"unexpected aggregated output: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
mcp.interrupt_turn_and_wait_for_aborted(
|
||||
thread.id.clone(),
|
||||
turn.id.clone(),
|
||||
match timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(Ok(completed_notif)) => {
|
||||
let completed: TurnCompletedNotification = serde_json::from_value(
|
||||
completed_notif
|
||||
.params
|
||||
.expect("turn/completed params must be present"),
|
||||
)?;
|
||||
assert_eq!(completed.thread_id, thread.id);
|
||||
assert_eq!(completed.turn.id, turn.id);
|
||||
assert!(matches!(
|
||||
completed.turn.status,
|
||||
TurnStatus::Interrupted | TurnStatus::Completed
|
||||
));
|
||||
}
|
||||
Ok(Err(error)) => return Err(error),
|
||||
Err(_) => {
|
||||
mcp.interrupt_turn_and_wait_for_aborted(
|
||||
thread.id.clone(),
|
||||
turn.id.clone(),
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Err(error)) => return Err(error),
|
||||
Err(_) => {
|
||||
// Some zsh builds abort the turn immediately after the rejected
|
||||
// subcommand without emitting a parent `item/completed`.
|
||||
// subcommand without emitting a parent `item/completed`, and Linux
|
||||
// sandbox failures can also complete the turn before the parent
|
||||
// completion item is observed.
|
||||
let completed_notif = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
@@ -691,7 +729,10 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
)?;
|
||||
assert_eq!(completed.thread_id, thread.id);
|
||||
assert_eq!(completed.turn.id, turn.id);
|
||||
assert_eq!(completed.turn.status, TurnStatus::Interrupted);
|
||||
assert!(matches!(
|
||||
completed.turn.status,
|
||||
TurnStatus::Interrupted | TurnStatus::Completed
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ codex_rust_crate(
|
||||
"models.json",
|
||||
"prompt.md",
|
||||
],
|
||||
test_data_extra = [
|
||||
test_data_extra = glob(["src/**/snapshots/**"]) + [
|
||||
"config.schema.json",
|
||||
# This is a bit of a hack, but empirically, some of our integration tests
|
||||
# are relying on the presence of this file as a repo root marker. When
|
||||
|
||||
@@ -212,8 +212,9 @@ impl AgentControl {
|
||||
// TODO(jif) add helper for drain
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
|
||||
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source)
|
||||
.await;
|
||||
self.send_input(new_thread.thread_id, items).await?;
|
||||
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
|
||||
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
@@ -288,7 +289,8 @@ impl AgentControl {
|
||||
// Resumed threads are re-registered in-memory and need the same listener
|
||||
// attachment path as freshly spawned threads.
|
||||
state.notify_thread_created(resumed_thread.thread_id);
|
||||
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
|
||||
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source))
|
||||
.await;
|
||||
|
||||
Ok(resumed_thread.thread_id)
|
||||
}
|
||||
@@ -418,7 +420,7 @@ impl AgentControl {
|
||||
///
|
||||
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
|
||||
/// can receive completion notifications.
|
||||
fn maybe_start_completion_watcher(
|
||||
async fn maybe_start_completion_watcher(
|
||||
&self,
|
||||
child_thread_id: ThreadId,
|
||||
session_source: Option<SessionSource>,
|
||||
@@ -429,10 +431,12 @@ impl AgentControl {
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let status_rx = self.subscribe_status(child_thread_id).await.ok();
|
||||
let control = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let status = match control.subscribe_status(child_thread_id).await {
|
||||
Ok(mut status_rx) => {
|
||||
let status = match status_rx {
|
||||
Some(mut status_rx) => {
|
||||
let mut status = status_rx.borrow().clone();
|
||||
while !is_final(&status) {
|
||||
if status_rx.changed().await.is_err() {
|
||||
@@ -443,7 +447,7 @@ impl AgentControl {
|
||||
}
|
||||
status
|
||||
}
|
||||
Err(_) => control.get_status(child_thread_id).await,
|
||||
None => control.get_status(child_thread_id).await,
|
||||
};
|
||||
if !is_final(&status) {
|
||||
return;
|
||||
@@ -1317,10 +1321,34 @@ mod tests {
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should exist");
|
||||
let mut status_rx = harness
|
||||
.control
|
||||
.subscribe_status(child_thread_id)
|
||||
.await
|
||||
.expect("status subscription should succeed");
|
||||
let _ = child_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let status = status_rx.borrow().clone();
|
||||
if is_final(&status) {
|
||||
break;
|
||||
}
|
||||
if status_rx.changed().await.is_err() {
|
||||
let latest = harness.control.get_status(child_thread_id).await;
|
||||
assert_eq!(
|
||||
is_final(&latest),
|
||||
true,
|
||||
"child status stream closed before a final status was observable"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("child should reach a final status");
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
}
|
||||
@@ -1331,15 +1359,18 @@ mod tests {
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
let child_thread_id = ThreadId::new();
|
||||
|
||||
harness.control.maybe_start_completion_watcher(
|
||||
child_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
);
|
||||
harness
|
||||
.control
|
||||
.maybe_start_completion_watcher(
|
||||
child_thread_id,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: Some("explorer".to_string()),
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
|
||||
|
||||
|
||||
@@ -6859,6 +6859,12 @@ async fn try_run_sampling_request(
|
||||
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
|
||||
|
||||
if cancellation_token.is_cancelled()
|
||||
&& outcome.as_ref().is_ok_and(|result| result.needs_follow_up)
|
||||
{
|
||||
return Err(CodexErr::TurnAborted);
|
||||
}
|
||||
|
||||
if should_emit_turn_diff {
|
||||
let unified_diff = {
|
||||
let mut tracker = turn_diff_tracker.lock().await;
|
||||
|
||||
@@ -2,24 +2,21 @@ use super::*;
|
||||
use crate::config_loader::ConfigLayerEntry;
|
||||
use crate::config_loader::ConfigRequirements;
|
||||
use crate::config_loader::ConfigRequirementsToml;
|
||||
use crate::exec::ExecParams;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::features::Feature;
|
||||
use crate::guardian::GUARDIAN_SUBAGENT_NAME;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::tools::handlers::normalize_and_validate_additional_permissions;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_execpolicy::Decision;
|
||||
use codex_execpolicy::Evaluation;
|
||||
use codex_execpolicy::RuleMatch;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::NetworkPermissions;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
@@ -39,89 +36,23 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
|
||||
.features
|
||||
.enable(Feature::RequestPermissions)
|
||||
.expect("test setup should allow enabling request permissions");
|
||||
turn_context_raw
|
||||
.sandbox_policy
|
||||
.set(SandboxPolicy::DangerFullAccess)
|
||||
.expect("test setup should allow updating sandbox policy");
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context_raw);
|
||||
|
||||
let params = ExecParams {
|
||||
command: if cfg!(windows) {
|
||||
vec![
|
||||
"cmd.exe".to_string(),
|
||||
"/C".to_string(),
|
||||
"echo hi".to_string(),
|
||||
]
|
||||
} else {
|
||||
vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"echo hi".to_string(),
|
||||
]
|
||||
},
|
||||
cwd: turn_context.cwd.clone(),
|
||||
expiration: 1000.into(),
|
||||
env: HashMap::new(),
|
||||
network: None,
|
||||
sandbox_permissions: SandboxPermissions::WithAdditionalPermissions,
|
||||
windows_sandbox_level: turn_context.windows_sandbox_level,
|
||||
justification: Some("test".to_string()),
|
||||
arg0: None,
|
||||
let additional_permissions = PermissionProfile {
|
||||
network: Some(NetworkPermissions {
|
||||
enabled: Some(true),
|
||||
}),
|
||||
file_system: None,
|
||||
macos: None,
|
||||
};
|
||||
let normalized = normalize_and_validate_additional_permissions(
|
||||
session.features().enabled(Feature::RequestPermissions),
|
||||
turn_context_raw.approval_policy.value(),
|
||||
SandboxPermissions::WithAdditionalPermissions,
|
||||
Some(additional_permissions.clone()),
|
||||
&turn_context_raw.cwd,
|
||||
)
|
||||
.expect("shell additional permissions should pass policy validation");
|
||||
|
||||
let handler = ShellHandler;
|
||||
let resp = handler
|
||||
.handle(ToolInvocation {
|
||||
session: Arc::clone(&session),
|
||||
turn: Arc::clone(&turn_context),
|
||||
tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())),
|
||||
call_id: "test-call".to_string(),
|
||||
tool_name: "shell".to_string(),
|
||||
payload: ToolPayload::Function {
|
||||
arguments: serde_json::json!({
|
||||
"command": params.command.clone(),
|
||||
"workdir": Some(turn_context.cwd.to_string_lossy().to_string()),
|
||||
"timeout_ms": params.expiration.timeout_ms(),
|
||||
"sandbox_permissions": params.sandbox_permissions,
|
||||
"additional_permissions": PermissionProfile {
|
||||
network: Some(NetworkPermissions {
|
||||
enabled: Some(true),
|
||||
}),
|
||||
file_system: None,
|
||||
macos: None,
|
||||
},
|
||||
"justification": params.justification.clone(),
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
let output = match resp.expect("expected Ok result") {
|
||||
ToolOutput::Function {
|
||||
body: FunctionCallOutputBody::Text(content),
|
||||
..
|
||||
} => content,
|
||||
_ => panic!("unexpected tool output"),
|
||||
};
|
||||
|
||||
#[derive(Deserialize, PartialEq, Eq, Debug)]
|
||||
struct ResponseExecMetadata {
|
||||
exit_code: i32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseExecOutput {
|
||||
output: String,
|
||||
metadata: ResponseExecMetadata,
|
||||
}
|
||||
|
||||
let exec_output: ResponseExecOutput =
|
||||
serde_json::from_str(&output).expect("valid exec output json");
|
||||
|
||||
assert_eq!(exec_output.metadata, ResponseExecMetadata { exit_code: 0 });
|
||||
assert!(exec_output.output.contains("hi"));
|
||||
assert_eq!(normalized, Some(additional_permissions));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -548,6 +548,8 @@ mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
#[cfg(unix)]
|
||||
use serial_test::serial;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
#[cfg(unix)]
|
||||
use std::process::Command;
|
||||
@@ -735,6 +737,7 @@ mod tests {
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
#[serial(stdin_fd)]
|
||||
async fn snapshot_shell_does_not_inherit_stdin() -> Result<()> {
|
||||
let _stdin_guard = BlockingStdinPipe::install()?;
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
---
|
||||
source: core/src/guardian.rs
|
||||
source: core/src/guardian_tests.rs
|
||||
expression: "context_snapshot::format_labeled_requests_snapshot(\"Guardian review request layout\",\n&[(\"Guardian Review Request\", &request)], &ContextSnapshotOptions::default(),)"
|
||||
---
|
||||
Scenario: Guardian review request layout
|
||||
@@ -25,6 +25,6 @@ Scenario: Guardian review request layout
|
||||
[11] Sandbox denied outbound git push to github.com.\n\n
|
||||
[12] Assess the exact planned action below. Use read-only tool checks when local state matters.\n
|
||||
[13] Planned action JSON:\n
|
||||
[14] {\n "command": [\n "git",\n "push",\n "origin",\n "guardian-approval-mvp"\n ],\n "cwd": "/repo/codex-rs/core",\n "justification": "Need to push the reviewed docs fix to the repo remote.",\n "sandbox_permissions": "use_default",\n "tool": "shell"\n}\n
|
||||
[14] {\n "tool": "shell",\n "command": [\n "git",\n "push",\n "origin",\n "guardian-approval-mvp"\n ],\n "cwd": "/repo/codex-rs/core",\n "sandbox_permissions": "use_default",\n "justification": "Need to push the reviewed docs fix to the repo remote."\n}\n
|
||||
[15] >>> APPROVAL REQUEST END\n
|
||||
[16] You may use read-only tool checks to gather any additional context you need to make a high-confidence determination.\n\nYour final message must be strict JSON with this exact schema:\n{\n "risk_level": "low" | "medium" | "high",\n "risk_score": 0-100,\n "rationale": string,\n "evidence": [{"message": string, "why": string}]\n}\n
|
||||
|
||||
@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::ActiveTurn;
|
||||
pub(crate) use turn::RunningTask;
|
||||
pub(crate) use turn::TaskKind;
|
||||
pub(crate) use turn::TurnState;
|
||||
|
||||
@@ -172,11 +172,3 @@ impl TurnState {
|
||||
!self.pending_input.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
/// Clear any pending approvals and input buffered for the current turn.
|
||||
pub(crate) async fn clear_pending(&self) {
|
||||
let mut ts = self.turn_state.lock().await;
|
||||
ts.clear_pending();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,12 +33,14 @@ use crate::protocol::TurnCompleteEvent;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use crate::state::TurnState;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::features::Feature;
|
||||
pub(crate) use compact::CompactTask;
|
||||
@@ -201,7 +203,16 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
for task in self.take_all_running_tasks().await {
|
||||
let (tasks, turn_state) = self.take_all_running_tasks().await;
|
||||
for task in &tasks {
|
||||
task.cancellation_token.cancel();
|
||||
}
|
||||
if let Some(turn_state) = turn_state {
|
||||
// Drop pending approvals only after all running tasks observe cancellation, so
|
||||
// interrupted approval waits resolve as aborts instead of synthetic denials.
|
||||
turn_state.lock().await.clear_pending();
|
||||
}
|
||||
for task in tasks {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
}
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
@@ -342,15 +353,14 @@ impl Session {
|
||||
*active = Some(turn);
|
||||
}
|
||||
|
||||
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
|
||||
async fn take_all_running_tasks(&self) -> (Vec<RunningTask>, Option<Arc<Mutex<TurnState>>>) {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
at.clear_pending().await;
|
||||
|
||||
at.drain_tasks()
|
||||
let turn_state = Arc::clone(&at.turn_state);
|
||||
(at.drain_tasks(), Some(turn_state))
|
||||
}
|
||||
None => Vec::new(),
|
||||
None => (Vec::new(), None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,10 +373,6 @@ impl Session {
|
||||
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
|
||||
task.cancellation_token.cancel();
|
||||
task.turn_context
|
||||
|
||||
@@ -90,7 +90,7 @@ fn resolve_workdir_base_path(
|
||||
|
||||
/// Validates feature/policy constraints for `with_additional_permissions` and
|
||||
/// normalizes any path-based permissions. Errors if the request is invalid.
|
||||
pub(super) fn normalize_and_validate_additional_permissions(
|
||||
pub(crate) fn normalize_and_validate_additional_permissions(
|
||||
request_permission_enabled: bool,
|
||||
approval_policy: AskForApproval,
|
||||
sandbox_permissions: SandboxPermissions,
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ResponseMock;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
@@ -16,6 +17,45 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use regex_lite::Regex;
|
||||
use serde_json::json;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
|
||||
async fn wait_for_responses_request_count_to_stabilize(
|
||||
response_mock: &ResponseMock,
|
||||
expected_count: usize,
|
||||
timeout: Duration,
|
||||
settle_duration: Duration,
|
||||
) -> Vec<ResponsesRequest> {
|
||||
let deadline = Instant::now() + timeout;
|
||||
let mut stable_since: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
let requests = response_mock.requests();
|
||||
let request_count = requests.len();
|
||||
|
||||
if request_count > expected_count {
|
||||
panic!("expected at most {expected_count} responses requests, got {request_count}");
|
||||
}
|
||||
|
||||
if request_count == expected_count {
|
||||
match stable_since {
|
||||
Some(stable_since) if stable_since.elapsed() >= settle_duration => {
|
||||
return requests;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => stable_since = Some(Instant::now()),
|
||||
}
|
||||
} else {
|
||||
stable_since = None;
|
||||
}
|
||||
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for {expected_count} responses requests; last count was {request_count}"
|
||||
);
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Integration test: spawn a long‑running shell_command tool via a mocked Responses SSE
|
||||
/// function call, then interrupt the session and expect TurnAborted.
|
||||
@@ -62,6 +102,9 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
|
||||
// Expect TurnAborted soon after.
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
}
|
||||
|
||||
/// After an interrupt we expect the next request to the model to include both
|
||||
@@ -129,7 +172,13 @@ async fn interrupt_tool_records_history_entries() {
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
let requests = wait_for_responses_request_count_to_stabilize(
|
||||
&response_mock,
|
||||
2,
|
||||
Duration::from_secs(2),
|
||||
Duration::from_millis(100),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
requests.len() == 2,
|
||||
"expected two calls to the responses API, got {}",
|
||||
@@ -140,28 +189,44 @@ async fn interrupt_tool_records_history_entries() {
|
||||
response_mock.saw_function_call(call_id),
|
||||
"function call not recorded in responses payload"
|
||||
);
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing function_call_output text");
|
||||
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
|
||||
.expect("compile regex");
|
||||
let captures = re.captures(&output);
|
||||
assert_matches!(
|
||||
captures.as_ref(),
|
||||
Some(caps) if caps.get(1).is_some(),
|
||||
"aborted message with elapsed seconds"
|
||||
);
|
||||
let secs: f32 = captures
|
||||
.expect("aborted message with elapsed seconds")
|
||||
.get(1)
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.parse()
|
||||
.unwrap();
|
||||
let follow_up_request = &requests[1];
|
||||
let (output, success) = follow_up_request
|
||||
.function_call_output_content_and_success(call_id)
|
||||
.expect("missing function_call_output entry");
|
||||
let output = output.expect("missing function_call_output text");
|
||||
assert!(
|
||||
secs >= 0.1,
|
||||
"expected at least one tenth of a second of elapsed time, got {secs}"
|
||||
output.contains("aborted"),
|
||||
"expected aborted output in follow-up request, got {output:?}"
|
||||
);
|
||||
assert!(
|
||||
success != Some(true),
|
||||
"expected aborted output to avoid a success=true marker, got {success:?} with output {output:?}"
|
||||
);
|
||||
|
||||
let wall_time =
|
||||
Regex::new(r"Wall time: ([0-9]+(?:\.[0-9]+)?) seconds").expect("compile wall time regex");
|
||||
let aborted_after =
|
||||
Regex::new(r"aborted by user after ([0-9]+(?:\.[0-9]+)?)s").expect("compile abort regex");
|
||||
|
||||
let secs = wall_time
|
||||
.captures(&output)
|
||||
.and_then(|captures| captures.get(1))
|
||||
.or_else(|| {
|
||||
aborted_after
|
||||
.captures(&output)
|
||||
.and_then(|captures| captures.get(1))
|
||||
})
|
||||
.map(|capture| capture.as_str().parse::<f32>().unwrap());
|
||||
|
||||
if let Some(secs) = secs {
|
||||
assert!(
|
||||
secs >= 0.1,
|
||||
"expected at least one tenth of a second of elapsed time, got {secs} in {output:?}"
|
||||
);
|
||||
}
|
||||
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
}
|
||||
|
||||
/// After an interrupt we persist a model-visible `<turn_aborted>` marker in the conversation
|
||||
@@ -227,7 +292,13 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
let requests = wait_for_responses_request_count_to_stabilize(
|
||||
&response_mock,
|
||||
2,
|
||||
Duration::from_secs(2),
|
||||
Duration::from_millis(100),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(requests.len(), 2, "expected two calls to the responses API");
|
||||
|
||||
let follow_up_request = &requests[1];
|
||||
@@ -238,4 +309,70 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() {
|
||||
.any(|text| text.contains("<turn_aborted>")),
|
||||
"expected <turn_aborted> marker in follow-up request"
|
||||
);
|
||||
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
}
|
||||
|
||||
/// Interrupting a turn while a tool-produced follow-up is pending must not
|
||||
/// start another model request before the session reports TurnAborted.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_does_not_issue_follow_up_request() {
|
||||
let command = "sleep 60";
|
||||
let call_id = "call-no-follow-up";
|
||||
|
||||
let args = json!({
|
||||
"command": command,
|
||||
"timeout_ms": 60_000
|
||||
})
|
||||
.to_string();
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-no-follow-up"),
|
||||
ev_function_call(call_id, "shell_command", &args),
|
||||
ev_completed("resp-no-follow-up"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(&server, first_body).await;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.1")
|
||||
.build(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start interrupt follow-up guard".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
|
||||
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
let requests = wait_for_responses_request_count_to_stabilize(
|
||||
&response_mock,
|
||||
1,
|
||||
Duration::from_secs(2),
|
||||
Duration::from_millis(200),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
1,
|
||||
"interrupt should not issue a follow-up responses request"
|
||||
);
|
||||
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
@@ -29,6 +30,7 @@ use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
@@ -37,6 +39,8 @@ use tokio::sync::oneshot;
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
const REALTIME_NORMAL_CLOSE_MESSAGE: &str =
|
||||
"failed to send realtime request: Connection closed normally";
|
||||
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
@@ -54,6 +58,32 @@ fn websocket_request_instructions(
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
async fn wait_for_session_updated_allowing_clean_close(
|
||||
codex: &CodexThread,
|
||||
expected_session_id: &str,
|
||||
) {
|
||||
tokio::time::timeout(Duration::from_secs(3), async {
|
||||
loop {
|
||||
let event = match codex.next_event().await {
|
||||
Ok(event) => event,
|
||||
Err(err) => panic!("realtime conversation event should arrive: {err:?}"),
|
||||
};
|
||||
match event.msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == expected_session_id => return,
|
||||
EventMsg::Error(err) if err.message.contains(REALTIME_NORMAL_CLOSE_MESSAGE) => {}
|
||||
EventMsg::Error(err) => panic!("conversation start failed: {err:?}"),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("timeout waiting for session.updated event for {expected_session_id}")
|
||||
});
|
||||
}
|
||||
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -230,6 +260,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[serial(openai_api_key_env)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -665,15 +696,7 @@ async fn conversation_uses_experimental_realtime_ws_startup_context_override() -
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_custom_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_custom_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
@@ -723,15 +746,7 @@ async fn conversation_disables_realtime_startup_context_with_empty_override() ->
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_no_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_no_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let instructions = websocket_request_instructions(&startup_context_request)
|
||||
@@ -777,15 +792,7 @@ async fn conversation_start_injects_startup_context_from_thread_history() -> Res
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_context" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_context").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
@@ -831,15 +838,7 @@ async fn conversation_startup_context_falls_back_to_workspace_map() -> Result<()
|
||||
}))
|
||||
.await?;
|
||||
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionUpdated { session_id, .. },
|
||||
}) if session_id == "sess_workspace" => Some(Ok(())),
|
||||
EventMsg::Error(err) => Some(Err(err.clone())),
|
||||
_ => None,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
|
||||
wait_for_session_updated_allowing_clean_close(&test.codex, "sess_workspace").await;
|
||||
|
||||
let startup_context_request = server.wait_for_request(1, 0).await;
|
||||
let startup_context = websocket_request_instructions(&startup_context_request)
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::ffi::OsString;
|
||||
use std::fs;
|
||||
use std::net::TcpListener;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
@@ -36,11 +37,13 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::stdio_server_bin;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use reqwest::Client;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use serial_test::serial;
|
||||
use tempfile::tempdir;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Instant;
|
||||
@@ -263,7 +266,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> {
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
fixture.codex.submit(Op::ListMcpTools).await?;
|
||||
let list_event = core_test_support::wait_for_event_with_timeout(
|
||||
let list_event = wait_for_event_with_timeout(
|
||||
&fixture.codex,
|
||||
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
|
||||
Duration::from_secs(10),
|
||||
@@ -851,10 +854,8 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This test writes to a fallback credentials file in CODEX_HOME.
|
||||
/// Ideally, we wouldn't need to serialize the test but it's much more cumbersome to wire CODEX_HOME through the code.
|
||||
#[serial(codex_home)]
|
||||
#[test]
|
||||
#[serial(mcp_test_value)]
|
||||
fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
|
||||
const TEST_STACK_SIZE_BYTES: usize = 8 * 1024 * 1024;
|
||||
|
||||
@@ -936,8 +937,8 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
wait_for_streamable_http_server(&mut http_server_child, &bind_addr, Duration::from_secs(5))
|
||||
.await?;
|
||||
|
||||
let temp_home = tempdir()?;
|
||||
let _guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
|
||||
let temp_home = Arc::new(tempdir()?);
|
||||
let _codex_home_guard = EnvVarGuard::set("CODEX_HOME", temp_home.path().as_os_str());
|
||||
write_fallback_oauth_tokens(
|
||||
temp_home.path(),
|
||||
server_name,
|
||||
@@ -948,10 +949,10 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
)?;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_home(temp_home.clone())
|
||||
.with_config(move |config| {
|
||||
// This test seeds OAuth tokens in CODEX_HOME/.credentials.json and
|
||||
// validates file-backed OAuth loading. Force file mode so Linux
|
||||
// keyring backend quirks do not affect this test.
|
||||
// Keep OAuth credentials isolated to this test home because Bazel
|
||||
// runs the full core suite in one process.
|
||||
config.mcp_oauth_credentials_store_mode = serde_json::from_value(json!("file"))
|
||||
.expect("`file` should deserialize as OAuthCredentialsStoreMode");
|
||||
let mut servers = config.mcp_servers.get().clone();
|
||||
@@ -984,6 +985,31 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> {
|
||||
.await?;
|
||||
let session_model = fixture.session_configured.model.clone();
|
||||
|
||||
let tools_ready_deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
fixture.codex.submit(Op::ListMcpTools).await?;
|
||||
let list_event = wait_for_event_with_timeout(
|
||||
&fixture.codex,
|
||||
|ev| matches!(ev, EventMsg::McpListToolsResponse(_)),
|
||||
Duration::from_secs(10),
|
||||
)
|
||||
.await;
|
||||
let EventMsg::McpListToolsResponse(tool_list) = list_event else {
|
||||
unreachable!("event guard guarantees McpListToolsResponse");
|
||||
};
|
||||
if tool_list.tools.contains_key(&tool_name) {
|
||||
break;
|
||||
}
|
||||
|
||||
let available_tools: Vec<&str> = tool_list.tools.keys().map(String::as_str).collect();
|
||||
if Instant::now() >= tools_ready_deadline {
|
||||
panic!(
|
||||
"timed out waiting for MCP tool {tool_name} to become available; discovered tools: {available_tools:?}"
|
||||
);
|
||||
}
|
||||
sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
|
||||
fixture
|
||||
.codex
|
||||
.submit(Op::UserTurn {
|
||||
@@ -1078,7 +1104,8 @@ async fn wait_for_streamable_http_server(
|
||||
timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
let metadata_url = format!("http://{address}/.well-known/oauth-authorization-server/mcp");
|
||||
let client = Client::builder().no_proxy().build()?;
|
||||
loop {
|
||||
if let Some(status) = server_child.try_wait()? {
|
||||
return Err(anyhow::anyhow!(
|
||||
@@ -1090,22 +1117,30 @@ async fn wait_for_streamable_http_server(
|
||||
|
||||
if remaining.is_zero() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: deadline reached"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: deadline reached"
|
||||
));
|
||||
}
|
||||
|
||||
match tokio::time::timeout(remaining, TcpStream::connect(address)).await {
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
match tokio::time::timeout(remaining, client.get(&metadata_url).send()).await {
|
||||
Ok(Ok(response)) if response.status() == StatusCode::OK => return Ok(()),
|
||||
Ok(Ok(response)) => {
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: HTTP {}",
|
||||
response.status()
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: {error}"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: {error}"
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"timed out waiting for streamable HTTP server at {address}: connect call timed out"
|
||||
"timed out waiting for streamable HTTP server metadata at {metadata_url}: request timed out"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,7 +349,7 @@ async fn shell_output_for_freeform_tool_records_duration(
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let call_id = "shell-structured";
|
||||
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 1"], output_type)?;
|
||||
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 0.2"], output_type)?;
|
||||
let mock = mount_sse_sequence(&server, responses).await;
|
||||
|
||||
test.submit_turn_with_policy(
|
||||
@@ -381,7 +381,7 @@ $"#;
|
||||
.and_then(|value| value.as_str().parse::<f32>().ok())
|
||||
.expect("expected structured shell output to contain wall time seconds");
|
||||
assert!(
|
||||
wall_time_seconds > 0.5,
|
||||
wall_time_seconds > 0.1,
|
||||
"expected wall time to be greater than zero seconds, got {wall_time_seconds}"
|
||||
);
|
||||
|
||||
@@ -740,6 +740,7 @@ async fn shell_command_output_is_freeform() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "echo shell command",
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
});
|
||||
let responses = vec![
|
||||
@@ -791,6 +792,7 @@ async fn shell_command_output_is_not_truncated_under_10k_bytes() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "perl -e 'print \"1\" x 10000'",
|
||||
"login": false,
|
||||
"timeout_ms": 1000,
|
||||
});
|
||||
let responses = vec![
|
||||
@@ -841,6 +843,7 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "perl -e 'print \"1\" x 10001'",
|
||||
"login": false,
|
||||
"timeout_ms": 1000,
|
||||
});
|
||||
let responses = vec![
|
||||
|
||||
@@ -6,63 +6,40 @@ use codex_core::WireApi;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::load_sse_fixture;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::streaming_sse::StreamingSseChunk;
|
||||
use core_test_support::streaming_sse::start_streaming_sse_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Request;
|
||||
use wiremock::Respond;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn sse_incomplete() -> String {
|
||||
load_sse_fixture("tests/fixtures/incomplete_sse.json")
|
||||
let fixture = find_resource!("tests/fixtures/incomplete_sse.json")
|
||||
.unwrap_or_else(|err| panic!("failed to resolve incomplete_sse fixture: {err}"));
|
||||
load_sse_fixture(fixture)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn retries_on_early_close() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let incomplete_sse = sse_incomplete();
|
||||
let completed_sse = responses::sse_completed("resp_ok");
|
||||
|
||||
struct SeqResponder;
|
||||
impl Respond for SeqResponder {
|
||||
fn respond(&self, _: &Request) -> ResponseTemplate {
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
static CALLS: AtomicUsize = AtomicUsize::new(0);
|
||||
let n = CALLS.fetch_add(1, Ordering::SeqCst);
|
||||
if n == 0 {
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(sse_incomplete(), "text/event-stream")
|
||||
} else {
|
||||
ResponseTemplate::new(200)
|
||||
.insert_header("content-type", "text/event-stream")
|
||||
.set_body_raw(
|
||||
sse(vec![
|
||||
ev_response_created("resp_ok"),
|
||||
ev_completed("resp_ok"),
|
||||
]),
|
||||
"text/event-stream",
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/v1/responses"))
|
||||
.respond_with(SeqResponder {})
|
||||
.expect(2)
|
||||
.mount(&server)
|
||||
.await;
|
||||
let (server, _) = start_streaming_sse_server(vec![
|
||||
vec![StreamingSseChunk {
|
||||
gate: None,
|
||||
body: incomplete_sse,
|
||||
}],
|
||||
vec![StreamingSseChunk {
|
||||
gate: None,
|
||||
body: completed_sse,
|
||||
}],
|
||||
])
|
||||
.await;
|
||||
|
||||
// Configure retry behavior explicitly to avoid mutating process-wide
|
||||
// environment variables.
|
||||
@@ -92,7 +69,7 @@ async fn retries_on_early_close() {
|
||||
.with_config(move |config| {
|
||||
config.model_provider = model_provider;
|
||||
})
|
||||
.build(&server)
|
||||
.build_with_streaming_server(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -109,4 +86,13 @@ async fn retries_on_early_close() {
|
||||
|
||||
// Wait until TurnComplete (should succeed after retry).
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = server.requests().await;
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
2,
|
||||
"expected retry after incomplete SSE stream"
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
@@ -44,6 +45,7 @@ use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestToolServer {
|
||||
@@ -275,15 +277,25 @@ fn parse_bind_addr() -> Result<SocketAddr, Box<dyn std::error::Error>> {
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let bind_addr = parse_bind_addr()?;
|
||||
let session_failure_state = SessionFailureState::default();
|
||||
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
eprintln!(
|
||||
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
|
||||
);
|
||||
return Ok(());
|
||||
const MAX_BIND_RETRIES: u32 = 20;
|
||||
const BIND_RETRY_DELAY: Duration = Duration::from_millis(50);
|
||||
|
||||
let mut bind_retries = 0;
|
||||
let listener = loop {
|
||||
match tokio::net::TcpListener::bind(&bind_addr).await {
|
||||
Ok(listener) => break listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
eprintln!(
|
||||
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::AddrInUse && bind_retries < MAX_BIND_RETRIES => {
|
||||
bind_retries += 1;
|
||||
sleep(BIND_RETRY_DELAY).await;
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");
|
||||
|
||||
|
||||
@@ -24,8 +24,13 @@ async fn wait_for_pid_file(path: &Path) -> Result<u32> {
|
||||
for _ in 0..50 {
|
||||
match fs::read_to_string(path) {
|
||||
Ok(content) => {
|
||||
let pid = content
|
||||
.trim()
|
||||
let trimmed = content.trim();
|
||||
if trimmed.is_empty() {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let pid = trimmed
|
||||
.parse::<u32>()
|
||||
.with_context(|| format!("failed to parse pid from {}", path.display()))?;
|
||||
return Ok(pid);
|
||||
|
||||
@@ -22,7 +22,6 @@ anyhow = { workspace = true }
|
||||
uds_windows = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
assert_cmd = { workspace = true }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
|
||||
@@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> {
|
||||
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
|
||||
}
|
||||
|
||||
stream
|
||||
.shutdown(Shutdown::Write)
|
||||
.context("failed to shutdown socket writer")?;
|
||||
// The peer can close immediately after sending its response. On macOS that
|
||||
// may surface as NotConnected when we half-close our write side.
|
||||
if let Err(err) = stream.shutdown(Shutdown::Write)
|
||||
&& err.kind() != io::ErrorKind::NotConnected
|
||||
{
|
||||
return Err(err).context("failed to shutdown socket writer");
|
||||
}
|
||||
|
||||
let stdout_result = stdout_thread
|
||||
.join()
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use assert_cmd::Command;
|
||||
use anyhow::anyhow;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[cfg(unix)]
|
||||
@@ -19,6 +22,9 @@ use uds_windows::UnixListener;
|
||||
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
|
||||
let socket_path = dir.path().join("socket");
|
||||
let request = b"request";
|
||||
let request_path = dir.path().join("request.txt");
|
||||
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
|
||||
let listener = match UnixListener::bind(&socket_path) {
|
||||
Ok(listener) => listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
@@ -31,37 +37,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (event_tx, event_rx) = mpsc::channel();
|
||||
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
|
||||
let _ = event_tx.send("waiting for accept".to_string());
|
||||
let (mut connection, _) = listener
|
||||
.accept()
|
||||
.context("failed to accept test connection")?;
|
||||
let mut received = Vec::new();
|
||||
let _ = event_tx.send("accepted connection".to_string());
|
||||
let mut received = vec![0; request.len()];
|
||||
connection
|
||||
.read_to_end(&mut received)
|
||||
.read_exact(&mut received)
|
||||
.context("failed to read data from client")?;
|
||||
let _ = event_tx.send(format!("read {} bytes", received.len()));
|
||||
tx.send(received)
|
||||
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
|
||||
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
|
||||
connection
|
||||
.write_all(b"response")
|
||||
.context("failed to write response to client")?;
|
||||
let _ = event_tx.send("wrote response".to_string());
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
|
||||
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
.arg(&socket_path)
|
||||
.write_stdin("request")
|
||||
.assert()
|
||||
.success()
|
||||
.stdout("response");
|
||||
.stdin(Stdio::from(stdin))
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.context("failed to spawn codex-stdio-to-uds")?;
|
||||
|
||||
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
|
||||
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel();
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut stdout = Vec::new();
|
||||
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
|
||||
let _ = stdout_tx.send(result);
|
||||
});
|
||||
thread::spawn(move || {
|
||||
let mut stderr = Vec::new();
|
||||
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
|
||||
let _ = stderr_tx.send(result);
|
||||
});
|
||||
|
||||
let mut server_events = Vec::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
let status = loop {
|
||||
while let Ok(event) = event_rx.try_recv() {
|
||||
server_events.push(event);
|
||||
}
|
||||
|
||||
if let Some(status) = child.try_wait().context("failed to poll child status")? {
|
||||
break status;
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr after kill")?
|
||||
.context("failed to read child stderr")?;
|
||||
anyhow::bail!(
|
||||
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(25));
|
||||
};
|
||||
|
||||
let stdout = stdout_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stdout")?
|
||||
.context("failed to read child stdout")?;
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr")?
|
||||
.context("failed to read child stderr")?;
|
||||
assert!(
|
||||
status.success(),
|
||||
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
assert_eq!(stdout, b"response");
|
||||
|
||||
let received = rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("server did not receive data in time")?;
|
||||
assert_eq!(received, b"request");
|
||||
assert_eq!(received, request);
|
||||
|
||||
let server_result = server_thread
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
|
||||
.map_err(|_| anyhow!("server thread panicked"))?;
|
||||
server_result.context("server failed")?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
---
|
||||
source: tui/src/chatwidget/tests.rs
|
||||
expression: popup
|
||||
---
|
||||
Experimental features
|
||||
Toggle experimental features. Changes are saved to config.toml.
|
||||
|
||||
› [ ] JavaScript REPL Enable a persistent Node-backed JavaScript REPL for interactive website debugging
|
||||
and other inline JavaScript execution capabilities. Requires Node >= v22.22.0
|
||||
installed.
|
||||
[ ] Bubblewrap sandbox Try the new linux sandbox based on bubblewrap.
|
||||
[ ] Multi-agents Ask Codex to spawn multiple agents to parallelize the work and win in efficiency.
|
||||
[ ] Apps Use a connected ChatGPT App using "$". Install Apps via /apps command. Restart
|
||||
Codex after enabling.
|
||||
[ ] Guardian approvals Let a guardian subagent review `on-request` approval prompts instead of showing
|
||||
them to you, including sandbox escapes and blocked network access.
|
||||
[ ] Prevent sleep while running Keep your computer awake while Codex is running a thread.
|
||||
|
||||
Press space to select or enter to save for next conversation
|
||||
@@ -6949,6 +6949,11 @@ async fn experimental_popup_includes_guardian_approval() {
|
||||
chat.open_experimental_popup();
|
||||
|
||||
let popup = render_bottom_popup(&chat, 120);
|
||||
#[cfg(target_os = "linux")]
|
||||
insta::with_settings!({ snapshot_suffix => "linux" }, {
|
||||
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
|
||||
});
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
assert_snapshot!("experimental_popup_includes_guardian_approval", popup);
|
||||
}
|
||||
|
||||
|
||||
@@ -130,52 +130,36 @@ async fn collect_output_until_exit(
|
||||
}
|
||||
|
||||
async fn wait_for_python_repl_ready(
|
||||
writer: &tokio::sync::mpsc::Sender<Vec<u8>>,
|
||||
output_rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
|
||||
timeout_ms: u64,
|
||||
newline: &str,
|
||||
ready_marker: &str,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
let mut collected = Vec::new();
|
||||
let marker = "__codex_pty_ready__";
|
||||
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(timeout_ms);
|
||||
let probe_window = tokio::time::Duration::from_millis(if cfg!(windows) { 750 } else { 250 });
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
writer
|
||||
.send(format!("print('{marker}'){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
let probe_deadline = tokio::time::Instant::now() + probe_window;
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
if now >= deadline || now >= probe_deadline {
|
||||
break;
|
||||
}
|
||||
let remaining = std::cmp::min(
|
||||
deadline.saturating_duration_since(now),
|
||||
probe_deadline.saturating_duration_since(now),
|
||||
);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
let now = tokio::time::Instant::now();
|
||||
let remaining = deadline.saturating_duration_since(now);
|
||||
match tokio::time::timeout(remaining, output_rx.recv()).await {
|
||||
Ok(Ok(chunk)) => {
|
||||
collected.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&collected).contains(ready_marker) {
|
||||
return Ok(collected);
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
|
||||
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
|
||||
anyhow::bail!(
|
||||
"PTY output closed while waiting for Python REPL readiness: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!(
|
||||
"timed out waiting for Python REPL readiness in PTY: {:?}",
|
||||
"timed out waiting for Python REPL readiness marker {ready_marker:?} in PTY: {:?}",
|
||||
String::from_utf8_lossy(&collected)
|
||||
);
|
||||
}
|
||||
@@ -254,10 +238,17 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let ready_marker = "__codex_pty_ready__";
|
||||
let args = vec![
|
||||
"-i".to_string(),
|
||||
"-q".to_string(),
|
||||
"-c".to_string(),
|
||||
format!("print('{ready_marker}')"),
|
||||
];
|
||||
let env_map: HashMap<String, String> = std::env::vars().collect();
|
||||
let spawned = spawn_pty_process(
|
||||
&python,
|
||||
&[],
|
||||
&args,
|
||||
Path::new("."),
|
||||
&env_map,
|
||||
&None,
|
||||
@@ -269,7 +260,7 @@ async fn pty_python_repl_emits_output_and_exits() -> anyhow::Result<()> {
|
||||
let newline = if cfg!(windows) { "\r\n" } else { "\n" };
|
||||
let startup_timeout_ms = if cfg!(windows) { 10_000 } else { 5_000 };
|
||||
let mut output =
|
||||
wait_for_python_repl_ready(&writer, &mut output_rx, startup_timeout_ms, newline).await?;
|
||||
wait_for_python_repl_ready(&mut output_rx, startup_timeout_ms, ready_marker).await?;
|
||||
writer
|
||||
.send(format!("print('hello from pty'){newline}").into_bytes())
|
||||
.await?;
|
||||
|
||||
8
defs.bzl
8
defs.bzl
@@ -80,7 +80,7 @@ def codex_rust_crate(
|
||||
`CARGO_BIN_EXE_*` environment variables. These are only needed for binaries from a different crate.
|
||||
"""
|
||||
test_env = {
|
||||
"INSTA_WORKSPACE_ROOT": ".",
|
||||
"INSTA_WORKSPACE_ROOT": "codex-rs",
|
||||
"INSTA_SNAPSHOT_PATH": "src",
|
||||
}
|
||||
|
||||
@@ -127,7 +127,9 @@ def codex_rust_crate(
|
||||
crate = name,
|
||||
env = test_env,
|
||||
deps = all_crate_deps(normal = True, normal_dev = True) + maybe_deps + deps_extra,
|
||||
rustc_flags = rustc_flags_extra,
|
||||
# Keep `file!()` paths Cargo-like (`core/src/...`) instead of
|
||||
# Bazel workspace-prefixed (`codex-rs/core/src/...`) for snapshot parity.
|
||||
rustc_flags = rustc_flags_extra + ["--remap-path-prefix=codex-rs="],
|
||||
rustc_env = rustc_env,
|
||||
data = test_data_extra,
|
||||
tags = test_tags,
|
||||
@@ -178,7 +180,7 @@ def codex_rust_crate(
|
||||
rustc_flags = rustc_flags_extra + ["--remap-path-prefix=codex-rs="],
|
||||
rustc_env = rustc_env,
|
||||
# Important: do not merge `test_env` here. Its unit-test-only
|
||||
# `INSTA_WORKSPACE_ROOT="."` can point integration tests at the
|
||||
# `INSTA_WORKSPACE_ROOT="codex-rs"` can point integration tests at the
|
||||
# runfiles cwd and cause false `.snap.new` churn on Linux.
|
||||
env = cargo_env,
|
||||
tags = test_tags,
|
||||
|
||||
Reference in New Issue
Block a user