mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
stabilize zsh-fork approvals and resume --last
This commit is contained in:
@@ -472,10 +472,15 @@ async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2()
|
||||
first_file.display(),
|
||||
second_file.display()
|
||||
);
|
||||
// Login shells can emit an extra approval for system startup helpers
|
||||
// (for example `/usr/libexec/path_helper -s` on macOS) before the target
|
||||
// `rm` subcommands. Give the command enough budget to exercise the full
|
||||
// approval sequence on slower CI shards.
|
||||
let tool_timeout_ms = 15_000;
|
||||
let tool_call_arguments = serde_json::to_string(&serde_json::json!({
|
||||
"command": shell_command,
|
||||
"workdir": serde_json::Value::Null,
|
||||
"timeout_ms": 5000
|
||||
"timeout_ms": tool_timeout_ms
|
||||
}))?;
|
||||
let response = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
|
||||
@@ -1099,18 +1099,12 @@ fn turn_items_for_thread(
|
||||
.map(|turn| turn.items.clone())
|
||||
}
|
||||
|
||||
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
|
||||
fn resumable_thread_source_kinds() -> Vec<ThreadSourceKind> {
|
||||
vec![
|
||||
ThreadSourceKind::Cli,
|
||||
ThreadSourceKind::VsCode,
|
||||
ThreadSourceKind::Exec,
|
||||
ThreadSourceKind::AppServer,
|
||||
ThreadSourceKind::SubAgent,
|
||||
ThreadSourceKind::SubAgentReview,
|
||||
ThreadSourceKind::SubAgentCompact,
|
||||
ThreadSourceKind::SubAgentThreadSpawn,
|
||||
ThreadSourceKind::SubAgentOther,
|
||||
ThreadSourceKind::Unknown,
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1169,7 +1163,7 @@ async fn resolve_resume_thread_id(
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers: model_providers.clone(),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
source_kinds: Some(resumable_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
@@ -1209,7 +1203,7 @@ async fn resolve_resume_thread_id(
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers: model_providers.clone(),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
source_kinds: Some(resumable_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
// Thread names are attached separately from rollout titles, so name
|
||||
@@ -1898,6 +1892,19 @@ mod tests {
|
||||
assert_eq!(resume_lookup_model_providers(&config, &named_args), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resumable_thread_source_kinds_exclude_internal_threads() {
|
||||
assert_eq!(
|
||||
resumable_thread_source_kinds(),
|
||||
vec![
|
||||
ThreadSourceKind::Cli,
|
||||
ThreadSourceKind::VsCode,
|
||||
ThreadSourceKind::Exec,
|
||||
ThreadSourceKind::AppServer,
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_items_for_thread_returns_matching_turn_items() {
|
||||
let thread = AppServerThread {
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
||||
use anyhow::Context;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::test_codex_exec::test_codex_exec;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::string::ToString;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
@@ -220,6 +227,118 @@ fn exec_resume_last_accepts_prompt_after_flag_in_json_mode() -> anyhow::Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exec_resume_last_ignores_newer_internal_thread() -> anyhow::Result<()> {
|
||||
let test = test_codex_exec();
|
||||
let fixture = exec_fixture()?;
|
||||
let repo_root = exec_repo_root()?;
|
||||
|
||||
let marker = format!("resume-last-visible-{}", Uuid::new_v4());
|
||||
let prompt = format!("echo {marker}");
|
||||
|
||||
test.cmd()
|
||||
.env("CODEX_RS_SSE_FIXTURE", &fixture)
|
||||
.env("OPENAI_BASE_URL", "http://unused.local")
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg(&prompt)
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
let sessions_dir = test.home_path().join("sessions");
|
||||
let path = find_session_file_containing_marker(&sessions_dir, &marker)
|
||||
.expect("no session file found after first run");
|
||||
|
||||
// `updated_at` is second-granularity, so make the injected internal thread
|
||||
// deterministically newer than the visible exec session.
|
||||
std::thread::sleep(Duration::from_millis(1100));
|
||||
|
||||
let internal_thread_id = Uuid::new_v4();
|
||||
let internal_rollout_path = test.home_path().join("sessions/2026/03/27").join(format!(
|
||||
"rollout-2026-03-27T00-00-00-{internal_thread_id}.jsonl"
|
||||
));
|
||||
std::fs::create_dir_all(
|
||||
internal_rollout_path
|
||||
.parent()
|
||||
.expect("internal rollout parent directory"),
|
||||
)?;
|
||||
|
||||
let internal_thread_id_str = internal_thread_id.to_string();
|
||||
let internal_payload = serde_json::to_value(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: ThreadId::from_string(&internal_thread_id_str)?,
|
||||
forked_from_id: None,
|
||||
timestamp: "2026-03-27T00:00:00.000Z".to_string(),
|
||||
cwd: repo_root.clone(),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::SubAgent(SubAgentSource::MemoryConsolidation),
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
})?;
|
||||
let internal_lines = [
|
||||
json!({
|
||||
"timestamp": "2026-03-27T00:00:00.000Z",
|
||||
"type": "session_meta",
|
||||
"payload": internal_payload,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": "2026-03-27T00:00:00.000Z",
|
||||
"type": "response_item",
|
||||
"payload": {
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
"content": [{"type": "input_text", "text": "internal memory sweep"}],
|
||||
},
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": "2026-03-27T00:00:00.000Z",
|
||||
"type": "event_msg",
|
||||
"payload": {
|
||||
"type": "user_message",
|
||||
"message": "internal memory sweep",
|
||||
"kind": "plain",
|
||||
},
|
||||
})
|
||||
.to_string(),
|
||||
];
|
||||
std::fs::write(&internal_rollout_path, internal_lines.join("\n") + "\n")?;
|
||||
|
||||
let marker2 = format!("resume-last-visible-2-{}", Uuid::new_v4());
|
||||
let prompt2 = format!("echo {marker2}");
|
||||
|
||||
test.cmd()
|
||||
.env("CODEX_RS_SSE_FIXTURE", &fixture)
|
||||
.env("OPENAI_BASE_URL", "http://unused.local")
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg(&prompt2)
|
||||
.arg("resume")
|
||||
.arg("--last")
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
let resumed_path = find_session_file_containing_marker(&sessions_dir, &marker2)
|
||||
.expect("no resumed session file containing marker2");
|
||||
assert_eq!(
|
||||
resumed_path, path,
|
||||
"resume --last should ignore newer internal threads"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exec_resume_last_respects_cwd_filter_and_all_flag() -> anyhow::Result<()> {
|
||||
let test = test_codex_exec();
|
||||
|
||||
@@ -33,17 +33,31 @@ fn duplicate_fd_for_transfer(fd: impl AsFd, name: &str) -> anyhow::Result<OwnedF
|
||||
.with_context(|| format!("failed to duplicate {name} for escalation transfer"))
|
||||
}
|
||||
|
||||
async fn connect_escalation_stream(
|
||||
handshake_client: AsyncDatagramSocket,
|
||||
) -> anyhow::Result<(AsyncSocket, OwnedFd)> {
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
let server_stream_guard: OwnedFd = server.into_inner().into();
|
||||
let transferred_server_stream =
|
||||
duplicate_fd_for_transfer(&server_stream_guard, "handshake stream")?;
|
||||
const HANDSHAKE_MESSAGE: [u8; 1] = [0];
|
||||
// Keep one local reference to the transferred stream alive until the server
|
||||
// answers the first request. On macOS, dropping the sender's last local copy
|
||||
// immediately after the datagram handshake can make the peer observe EOF
|
||||
// before the received fd is fully servicing the stream.
|
||||
handshake_client
|
||||
.send_with_fds(&HANDSHAKE_MESSAGE, &[transferred_server_stream])
|
||||
.await
|
||||
.context("failed to send handshake datagram")?;
|
||||
Ok((client, server_stream_guard))
|
||||
}
|
||||
|
||||
pub async fn run_shell_escalation_execve_wrapper(
|
||||
file: String,
|
||||
argv: Vec<String>,
|
||||
) -> anyhow::Result<i32> {
|
||||
let handshake_client = get_escalate_client()?;
|
||||
let (server, client) = AsyncSocket::pair()?;
|
||||
const HANDSHAKE_MESSAGE: [u8; 1] = [0];
|
||||
handshake_client
|
||||
.send_with_fds(&HANDSHAKE_MESSAGE, &[server.into_inner().into()])
|
||||
.await
|
||||
.context("failed to send handshake datagram")?;
|
||||
let (client, server_stream_guard) = connect_escalation_stream(handshake_client).await?;
|
||||
let env = std::env::vars()
|
||||
.filter(|(k, _)| !matches!(k.as_str(), ESCALATE_SOCKET_ENV_VAR | EXEC_WRAPPER_ENV_VAR))
|
||||
.collect();
|
||||
@@ -56,6 +70,11 @@ pub async fn run_shell_escalation_execve_wrapper(
|
||||
})
|
||||
.await
|
||||
.context("failed to send EscalateRequest")?;
|
||||
// Once the first request has been written into the stream, the local guard
|
||||
// is no longer needed to bridge the datagram handoff. Dropping it here
|
||||
// lets client-side reads still observe EOF if the server exits before
|
||||
// replying.
|
||||
drop(server_stream_guard);
|
||||
let message = client
|
||||
.receive::<EscalateResponse>()
|
||||
.await
|
||||
@@ -128,6 +147,12 @@ mod tests {
|
||||
use super::*;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[test]
|
||||
fn duplicate_fd_for_transfer_does_not_close_original() {
|
||||
@@ -141,4 +166,83 @@ mod tests {
|
||||
|
||||
assert_ne!(unsafe { libc::fcntl(original_fd, libc::F_GETFD) }, -1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_escalation_stream_keeps_sender_alive_until_first_request_write()
|
||||
-> anyhow::Result<()> {
|
||||
let (server_datagram, client_datagram) = AsyncDatagramSocket::pair()?;
|
||||
let client_task = tokio::spawn(async move {
|
||||
let (client_stream, server_stream_guard) =
|
||||
connect_escalation_stream(client_datagram).await?;
|
||||
let guard_fd = server_stream_guard.as_raw_fd();
|
||||
assert_ne!(unsafe { libc::fcntl(guard_fd, libc::F_GETFD) }, -1);
|
||||
client_stream
|
||||
.send(EscalateRequest {
|
||||
file: PathBuf::from("/bin/echo"),
|
||||
argv: vec!["echo".to_string(), "hello".to_string()],
|
||||
workdir: AbsolutePathBuf::current_dir()?,
|
||||
env: Default::default(),
|
||||
})
|
||||
.await?;
|
||||
drop(server_stream_guard);
|
||||
assert_eq!(-1, unsafe { libc::fcntl(guard_fd, libc::F_GETFD) });
|
||||
let response = client_stream.receive::<EscalateResponse>().await?;
|
||||
Ok::<EscalateResponse, anyhow::Error>(response)
|
||||
});
|
||||
|
||||
let (_, mut fds) = server_datagram.receive_with_fds().await?;
|
||||
assert_eq!(fds.len(), 1);
|
||||
sleep(Duration::from_millis(20)).await;
|
||||
let server_stream = AsyncSocket::from_fd(fds.remove(0))?;
|
||||
let request = server_stream.receive::<EscalateRequest>().await?;
|
||||
assert_eq!(request.file, PathBuf::from("/bin/echo"));
|
||||
assert_eq!(request.argv, vec!["echo".to_string(), "hello".to_string()]);
|
||||
|
||||
let expected = EscalateResponse {
|
||||
action: EscalateAction::Deny {
|
||||
reason: Some("not now".to_string()),
|
||||
},
|
||||
};
|
||||
server_stream.send(expected.clone()).await?;
|
||||
let response = client_task.await??;
|
||||
assert_eq!(response, expected);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dropping_guard_after_request_write_preserves_server_eof() -> anyhow::Result<()> {
|
||||
let (server_datagram, client_datagram) = AsyncDatagramSocket::pair()?;
|
||||
let client_task = tokio::spawn(async move {
|
||||
let (client_stream, server_stream_guard) =
|
||||
connect_escalation_stream(client_datagram).await?;
|
||||
client_stream
|
||||
.send(EscalateRequest {
|
||||
file: PathBuf::from("/bin/echo"),
|
||||
argv: vec!["echo".to_string()],
|
||||
workdir: AbsolutePathBuf::current_dir()?,
|
||||
env: Default::default(),
|
||||
})
|
||||
.await?;
|
||||
drop(server_stream_guard);
|
||||
let err = timeout(
|
||||
Duration::from_millis(250),
|
||||
client_stream.receive::<EscalateResponse>(),
|
||||
)
|
||||
.await
|
||||
.expect("server close should not hang the client")
|
||||
.expect_err("expected EOF after server closes without replying");
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
Ok::<(), anyhow::Error>(())
|
||||
});
|
||||
|
||||
let (_, mut fds) = server_datagram.receive_with_fds().await?;
|
||||
assert_eq!(fds.len(), 1);
|
||||
let server_stream = AsyncSocket::from_fd(fds.remove(0))?;
|
||||
let request = server_stream.receive::<EscalateRequest>().await?;
|
||||
assert_eq!(request.file, PathBuf::from("/bin/echo"));
|
||||
drop(server_stream);
|
||||
|
||||
client_task.await??;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user