mirror of
https://github.com/openai/codex.git
synced 2026-04-28 16:45:54 +00:00
Register agent identities behind use_agent_identity (#17386)
## Summary Stack PR 2 of 4 for feature-gated agent identity support. This PR adds agent identity registration behind `features.use_agent_identity`. It keeps the app-server protocol unchanged and starts registration after ChatGPT auth exists rather than requiring a client restart. ## Stack - PR1: https://github.com/openai/codex/pull/17385 - add `features.use_agent_identity` - PR2: https://github.com/openai/codex/pull/17386 - this PR - PR3: https://github.com/openai/codex/pull/17387 - register agent tasks when enabled - PR4: https://github.com/openai/codex/pull/17388 - use `AgentAssertion` downstream when enabled ## Validation Covered as part of the local stack validation pass: - `just fmt` - `cargo test -p codex-core --lib agent_identity` - `cargo test -p codex-core --lib agent_assertion` - `cargo test -p codex-core --lib websocket_agent_task` - `cargo test -p codex-api api_bridge` - `cargo build -p codex-cli --bin codex` ## Notes The full local app-server E2E path is still being debugged after PR creation. The current branch stack is directionally ready for review while that follow-up continues.
This commit is contained in:
@@ -5,6 +5,11 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
@@ -13,10 +18,12 @@ use core_test_support::responses::mount_sse_once_match;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
@@ -29,8 +36,9 @@ use tempfile::TempDir;
|
||||
const PARENT_PROMPT: &str = "spawn a subagent and report when it is started";
|
||||
const CHILD_PROMPT: &str = "child: say done";
|
||||
const SPAWN_CALL_ID: &str = "spawn-call-1";
|
||||
const PROXY_START_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 5);
|
||||
const PROXY_START_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 30);
|
||||
const PROXY_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20);
|
||||
const TURN_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 60);
|
||||
|
||||
struct ResponsesApiProxy {
|
||||
child: Child,
|
||||
@@ -40,8 +48,17 @@ struct ResponsesApiProxy {
|
||||
impl ResponsesApiProxy {
|
||||
fn start(upstream_url: &str, dump_dir: &Path) -> Result<Self> {
|
||||
let server_info = dump_dir.join("server-info.json");
|
||||
let mut child = StdCommand::new(codex_utils_cargo_bin::cargo_bin("codex")?)
|
||||
.args(["responses-api-proxy", "--server-info"])
|
||||
let (proxy_program, use_codex_multitool) =
|
||||
match codex_utils_cargo_bin::cargo_bin("codex-responses-api-proxy") {
|
||||
Ok(path) => (path, false),
|
||||
Err(_) => (codex_utils_cargo_bin::cargo_bin("codex")?, true),
|
||||
};
|
||||
let mut command = StdCommand::new(proxy_program);
|
||||
if use_codex_multitool {
|
||||
command.arg("responses-api-proxy");
|
||||
}
|
||||
let mut child = command
|
||||
.args(["--server-info"])
|
||||
.arg(&server_info)
|
||||
.args(["--upstream-url", upstream_url, "--dump-dir"])
|
||||
.arg(dump_dir)
|
||||
@@ -58,15 +75,27 @@ impl ResponsesApiProxy {
|
||||
|
||||
let deadline = Instant::now() + PROXY_START_TIMEOUT;
|
||||
loop {
|
||||
if let Ok(info) = std::fs::read_to_string(&server_info) {
|
||||
let port = serde_json::from_str::<Value>(&info)?
|
||||
.get("port")
|
||||
.and_then(Value::as_u64)
|
||||
.ok_or_else(|| anyhow!("proxy server info missing port"))?;
|
||||
return Ok(Self {
|
||||
child,
|
||||
port: u16::try_from(port)?,
|
||||
});
|
||||
match std::fs::read_to_string(&server_info) {
|
||||
Ok(info) => {
|
||||
if !info.trim().is_empty() {
|
||||
match serde_json::from_str::<Value>(&info) {
|
||||
Ok(info) => {
|
||||
let port = info
|
||||
.get("port")
|
||||
.and_then(Value::as_u64)
|
||||
.ok_or_else(|| anyhow!("proxy server info missing port"))?;
|
||||
return Ok(Self {
|
||||
child,
|
||||
port: u16::try_from(port)?,
|
||||
});
|
||||
}
|
||||
Err(err) if err.is_eof() => {}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
if let Some(status) = child.try_wait()? {
|
||||
return Err(anyhow!(
|
||||
@@ -144,7 +173,7 @@ async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Res
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
test.submit_turn(PARENT_PROMPT).await?;
|
||||
submit_turn_with_timeout(&test, PARENT_PROMPT, dump_dir.path()).await?;
|
||||
|
||||
let dumps = wait_for_proxy_request_dumps(dump_dir.path())?;
|
||||
let parent = dumps
|
||||
@@ -178,6 +207,85 @@ async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Path) -> Result<()> {
|
||||
let session_model = test.session_configured.model.clone();
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: prompt.into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.config.cwd.to_path_buf(),
|
||||
approval_policy: AskForApproval::OnRequest,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::WorkspaceWrite {
|
||||
writable_roots: Vec::new(),
|
||||
read_only_access: Default::default(),
|
||||
network_access: false,
|
||||
exclude_tmpdir_env_var: false,
|
||||
exclude_slash_tmp: false,
|
||||
},
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let turn_started = wait_for_event_result(test, "turn started", dump_dir, |event| {
|
||||
matches!(event, EventMsg::TurnStarted(_))
|
||||
})
|
||||
.await?;
|
||||
let EventMsg::TurnStarted(turn_started) = turn_started else {
|
||||
unreachable!("event predicate only matches turn started events");
|
||||
};
|
||||
wait_for_event_result(test, "turn complete", dump_dir, |event| match event {
|
||||
EventMsg::TurnComplete(event) => event.turn_id == turn_started.turn_id,
|
||||
_ => false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_event_result<F>(
|
||||
test: &TestCodex,
|
||||
stage: &str,
|
||||
dump_dir: &Path,
|
||||
mut predicate: F,
|
||||
) -> Result<EventMsg>
|
||||
where
|
||||
F: FnMut(&EventMsg) -> bool,
|
||||
{
|
||||
let mut seen_events = Vec::new();
|
||||
tokio::time::timeout(TURN_TIMEOUT, async {
|
||||
loop {
|
||||
let event = test.codex.next_event().await?;
|
||||
seen_events.push(event_summary(&event.msg));
|
||||
if predicate(&event.msg) {
|
||||
return Ok::<EventMsg, anyhow::Error>(event.msg);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
anyhow!(
|
||||
"timed out waiting for {stage}; saw events: {}; {}",
|
||||
seen_events.join(" | "),
|
||||
proxy_dump_summary(dump_dir)
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
fn event_summary(event: &EventMsg) -> String {
|
||||
let mut summary = format!("{event:?}");
|
||||
summary.truncate(240);
|
||||
summary
|
||||
}
|
||||
|
||||
fn request_body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
std::str::from_utf8(&req.body).is_ok_and(|body| body.contains(text))
|
||||
}
|
||||
@@ -212,12 +320,54 @@ fn read_proxy_request_dumps(dump_dir: &Path) -> Result<Vec<Value>> {
|
||||
.and_then(|name| name.to_str())
|
||||
.is_some_and(|name| name.ends_with("-request.json"))
|
||||
{
|
||||
dumps.push(serde_json::from_str(&std::fs::read_to_string(&path)?)?);
|
||||
let contents = std::fs::read_to_string(&path)?;
|
||||
if contents.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
match serde_json::from_str(&contents) {
|
||||
Ok(dump) => dumps.push(dump),
|
||||
Err(err) if err.is_eof() => continue,
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(dumps)
|
||||
}
|
||||
|
||||
fn proxy_dump_summary(dump_dir: &Path) -> String {
|
||||
match read_proxy_request_dumps(dump_dir) {
|
||||
Ok(dumps) => {
|
||||
let bodies = dumps
|
||||
.iter()
|
||||
.filter_map(|dump| dump.get("body"))
|
||||
.map(Value::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
format!("proxy wrote {} request dumps: {bodies}", dumps.len())
|
||||
}
|
||||
Err(err) => format!("failed to read proxy request dumps: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_proxy_request_dumps_ignores_in_progress_files() -> Result<()> {
|
||||
let dump_dir = TempDir::new()?;
|
||||
std::fs::write(dump_dir.path().join("empty-request.json"), "")?;
|
||||
std::fs::write(dump_dir.path().join("partial-request.json"), "{\"body\"")?;
|
||||
std::fs::write(
|
||||
dump_dir.path().join("complete-request.json"),
|
||||
serde_json::to_string(&json!({ "body": "ready" }))?,
|
||||
)?;
|
||||
|
||||
assert_eq!(
|
||||
read_proxy_request_dumps(dump_dir.path())?,
|
||||
vec![json!({ "body": "ready" })]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dump_body_contains(dump: &Value, text: &str) -> bool {
|
||||
dump.get("body")
|
||||
.is_some_and(|body| body.to_string().contains(text))
|
||||
|
||||
Reference in New Issue
Block a user