Merge branch 'dev/codex/add-fork-option-to-codex-exec' into dev/friel/collab-stack

This commit is contained in:
Friel
2026-03-14 15:20:25 -07:00
8 changed files with 369 additions and 21 deletions

View File

@@ -51,6 +51,7 @@ You can enable notifications by configuring a script that is run whenever the ag
### `codex exec` to run Codex programmatically/non-interactively
To run Codex non-interactively, run `codex exec PROMPT` (you can also pass the prompt via `stdin`) and Codex will work on your task until it decides that it is done and exits. Output is printed to the terminal directly. You can set the `RUST_LOG` environment variable to see more about what's going on.
Use `codex exec --fork <SESSION_ID> PROMPT` to fork an existing session (identified by session ID or thread name) into a new session and run `PROMPT` against it, without launching the interactive picker/UI.
Use `codex exec --ephemeral ...` to run without persisting session rollout files to disk.
### Experimenting with the Codex Sandbox

View File

@@ -578,7 +578,11 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
let exit_info = run_interactive_tui(interactive, arg0_paths.clone()).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Exec(mut exec_cli)) => {
Some(Subcommand::Exec(exec_cli)) => {
let mut exec_cli = match exec_cli.validate() {
Ok(exec_cli) => exec_cli,
Err(err) => err.exit(),
};
prepend_config_flags(
&mut exec_cli.config_overrides,
root_config_overrides.clone(),
@@ -1203,6 +1207,40 @@ mod tests {
assert_eq!(args.session_id.as_deref(), Some("session-123"));
assert_eq!(args.prompt.as_deref(), Some("re-review"));
}
/// `codex exec --fork <id> <prompt>` must parse the trailing positional as
/// the prompt and leave the exec subcommand slot empty.
#[test]
fn exec_fork_accepts_prompt_positional() {
    let argv = ["codex", "exec", "--json", "--fork", "session-123", "2+2"];
    let parsed = MultitoolCli::try_parse_from(argv).expect("parse should succeed");
    let exec_cli = match parsed.subcommand {
        Some(Subcommand::Exec(exec_cli)) => exec_cli,
        _ => panic!("expected exec subcommand"),
    };

    assert_eq!(exec_cli.fork_session_id.as_deref(), Some("session-123"));
    assert!(exec_cli.command.is_none());
    assert_eq!(exec_cli.prompt.as_deref(), Some("2+2"));
}
/// `--fork` combined with the `resume` subcommand is accepted by the parser
/// itself but must be rejected by `validate()` as an argument conflict.
#[test]
fn exec_fork_conflicts_with_resume_subcommand() {
    let cli =
        MultitoolCli::try_parse_from(["codex", "exec", "--fork", "session-123", "resume"])
            .expect("parse should succeed");
    let Some(Subcommand::Exec(exec)) = cli.subcommand else {
        panic!("expected exec subcommand");
    };
    // Pin the error kind rather than just "some error", so an unrelated
    // validation failure cannot make this test pass by accident.
    let Err(err) = exec.validate() else {
        panic!("--fork should conflict with the resume subcommand");
    };
    assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
}
fn app_server_from_args(args: &[&str]) -> AppServerCommand {
let cli = MultitoolCli::try_parse_from(args).expect("parse");

View File

@@ -12,6 +12,12 @@ pub struct Cli {
#[command(subcommand)]
pub command: Option<Command>,
/// Fork from an existing session id (or thread name) before sending the prompt.
///
/// This creates a new session with copied history, similar to `codex fork`.
#[arg(long = "fork", value_name = "SESSION_ID")]
pub fork_session_id: Option<String>,
/// Optional image(s) to attach to the initial prompt.
#[arg(
long = "image",
@@ -114,6 +120,19 @@ pub struct Cli {
pub prompt: Option<String>,
}
impl Cli {
pub fn validate(self) -> Result<Self, clap::Error> {
if self.fork_session_id.is_some() && self.command.is_some() {
return Err(clap::Error::raw(
clap::error::ErrorKind::ArgumentConflict,
"--fork cannot be used with subcommands",
));
}
Ok(self)
}
}
#[derive(Debug, clap::Subcommand)]
pub enum Command {
/// Resume a previous session by id or pick the most recent with --last.
@@ -315,4 +334,23 @@ mod tests {
assert_eq!(args.session_id.as_deref(), Some("session-123"));
assert_eq!(args.prompt.as_deref(), Some(PROMPT));
}
/// `--fork <id>` followed by other flags and a positional yields the fork id,
/// the prompt, and no subcommand.
#[test]
fn fork_option_parses_prompt() {
    const PROMPT: &str = "echo fork-non-interactive";

    let argv = ["codex-exec", "--fork", "session-123", "--json", PROMPT];
    let parsed = Cli::parse_from(argv);

    assert_eq!(parsed.fork_session_id.as_deref(), Some("session-123"));
    assert_eq!(parsed.prompt.as_deref(), Some(PROMPT));
    assert!(parsed.command.is_none());
}
/// Parsing followed by `Cli::validate` must report an argument conflict when
/// `--fork` is combined with the `resume` subcommand.
#[test]
fn fork_option_conflicts_with_subcommands() {
    let argv = ["codex-exec", "--fork", "session-123", "resume"];
    let validated = Cli::try_parse_from(argv).and_then(Cli::validate);

    let conflict = validated.expect_err("fork should conflict with subcommands");
    assert_eq!(conflict.kind(), clap::error::ErrorKind::ArgumentConflict);
}
}

View File

@@ -30,6 +30,8 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
@@ -138,6 +140,7 @@ struct ExecRunArgs {
cursor_ansi: bool,
dangerously_bypass_approvals_and_sandbox: bool,
exec_span: tracing::Span,
fork_session_id: Option<String>,
images: Vec<PathBuf>,
json_mode: bool,
last_message_file: Option<PathBuf>,
@@ -165,6 +168,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
let Cli {
command,
fork_session_id,
images,
model: model_cli_arg,
oss,
@@ -452,6 +456,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
cursor_ansi,
dangerously_bypass_approvals_and_sandbox,
exec_span: exec_span.clone(),
fork_session_id,
images,
json_mode,
last_message_file,
@@ -474,6 +479,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
cursor_ansi,
dangerously_bypass_approvals_and_sandbox,
exec_span,
fork_session_id,
images,
json_mode,
last_message_file,
@@ -541,9 +547,10 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
})?;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let (primary_thread_id, fallback_session_configured) =
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
// Handle resume/fork/start using the explicit in-process app-server APIs so
// exec stays aligned with the app-server bootstrap path on main.
let (primary_thread_id, fallback_session_configured) = match command.as_ref() {
Some(ExecCommand::Resume(args)) => {
let resume_path = resolve_resume_path(&config, args).await?;
if let Some(path) = resume_path {
@@ -575,22 +582,44 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
}
} else {
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
request_id: request_ids.next(),
params: thread_start_params_from_config(&config),
},
"thread/start",
)
.await
.map_err(anyhow::Error::msg)?;
let session_configured = session_configured_from_thread_start_response(&response)
}
Some(ExecCommand::Review(_)) | None => {
if let Some(session_id) = fork_session_id.as_deref() {
let fork_path = resolve_fork_path(&config, session_id).await?;
let response: ThreadForkResponse = send_request_with_response(
&client,
ClientRequest::ThreadFork {
request_id: request_ids.next(),
params: thread_fork_params_from_config(
&config,
session_id,
Some(fork_path),
),
},
"thread/fork",
)
.await
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
};
let session_configured = session_configured_from_thread_fork_response(&response)
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
} else {
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
request_id: request_ids.next(),
params: thread_start_params_from_config(&config),
},
"thread/start",
)
.await
.map_err(anyhow::Error::msg)?;
let session_configured = session_configured_from_thread_start_response(&response)
.map_err(anyhow::Error::msg)?;
(session_configured.session_id, session_configured)
}
}
};
let primary_thread_id_for_span = primary_thread_id.to_string();
let mut buffered_events = VecDeque::new();
// Use the start/resume/fork response as the authoritative bootstrap payload.
@@ -952,6 +981,23 @@ fn approvals_reviewer_override_from_config(
Some(config.approvals_reviewer.into())
}
/// Build the `thread/fork` request parameters from the effective config.
///
/// `thread_id` and `path` are forwarded as given. Model, provider, working
/// directory, approval policy and sandbox mode are taken from `config`; every
/// remaining field keeps its `ThreadForkParams::default()` value.
fn thread_fork_params_from_config(
    config: &Config,
    thread_id: &str,
    path: Option<PathBuf>,
) -> ThreadForkParams {
    let permissions = &config.permissions;
    let working_dir = config.cwd.to_string_lossy().to_string();

    ThreadForkParams {
        thread_id: thread_id.to_owned(),
        path,
        model: config.model.clone(),
        model_provider: Some(config.model_provider_id.clone()),
        cwd: Some(working_dir),
        approval_policy: Some(permissions.approval_policy.value().into()),
        sandbox: sandbox_mode_from_policy(permissions.sandbox_policy.get()),
        ..ThreadForkParams::default()
    }
}
async fn send_request_with_response<T>(
client: &InProcessAppServerClient,
request: ClientRequest,
@@ -1005,6 +1051,24 @@ fn session_configured_from_thread_resume_response(
)
}
/// Map a `thread/fork` response onto a `SessionConfiguredEvent` by delegating
/// to `session_configured_from_thread_response`.
///
/// Thread identity (id, name, path) comes from `response.thread`; the
/// remaining settings come from the top-level response fields, with the
/// policy values converted through their `to_core()` methods.
fn session_configured_from_thread_fork_response(
    response: &ThreadForkResponse,
) -> Result<SessionConfiguredEvent, String> {
    let forked_thread = &response.thread;
    let approval_policy = response.approval_policy.to_core();
    let approvals_reviewer = response.approvals_reviewer.to_core();
    let sandbox = response.sandbox.to_core();

    session_configured_from_thread_response(
        &forked_thread.id,
        forked_thread.name.clone(),
        forked_thread.path.clone(),
        response.model.clone(),
        response.model_provider.clone(),
        response.service_tier,
        approval_policy,
        approvals_reviewer,
        sandbox,
        response.cwd.clone(),
        response.reasoning_effort,
    )
}
#[expect(
clippy::too_many_arguments,
reason = "session mapping keeps explicit fields"
@@ -1437,6 +1501,27 @@ async fn resolve_resume_path(
}
}
/// Resolve the rollout path of the session to fork from.
///
/// # Errors
///
/// Propagates any lookup error, and fails with "No saved session found" when
/// the lookup succeeds but finds nothing for `session_id`.
async fn resolve_fork_path(config: &Config, session_id: &str) -> anyhow::Result<PathBuf> {
    match resolve_thread_path_by_id_or_name(config, session_id).await? {
        Some(rollout_path) => Ok(rollout_path),
        None => Err(anyhow::anyhow!(
            "No saved session found with ID {session_id}"
        )),
    }
}
/// Look up a saved thread's rollout path under `config.codex_home`.
///
/// Input that parses as a UUID is looked up by id; anything else is looked
/// up by thread name. Returns `Ok(None)` when the lookup finds nothing.
async fn resolve_thread_path_by_id_or_name(
    config: &Config,
    id_or_name: &str,
) -> anyhow::Result<Option<PathBuf>> {
    let looks_like_id = Uuid::parse_str(id_or_name).is_ok();

    let located = if looks_like_id {
        find_thread_path_by_id_str(&config.codex_home, id_or_name).await?
    } else {
        find_thread_path_by_name_str(&config.codex_home, id_or_name).await?
    };
    Ok(located)
}
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
let path = path?;

View File

@@ -29,7 +29,10 @@ fn main() -> anyhow::Result<()> {
arg0_dispatch_or_else(|arg0_paths: Arg0DispatchPaths| async move {
let top_cli = TopCli::parse();
// Merge root-level overrides into inner CLI struct so downstream logic remains unchanged.
let mut inner = top_cli.inner;
let mut inner = match top_cli.inner.validate() {
Ok(inner) => inner,
Err(err) => err.exit(),
};
inner
.config_overrides
.raw_overrides
@@ -79,4 +82,24 @@ mod tests {
"reasoning_level=xhigh"
);
}
/// A root-level `--config` override and the inner `--fork` option must both
/// be captured when given on the same command line.
#[test]
fn top_cli_parses_fork_option_with_root_config() {
    let argv = [
        "codex-exec",
        "--config",
        "reasoning_level=xhigh",
        "--fork",
        "session-123",
        "echo fork",
    ];
    let top = TopCli::parse_from(argv);

    // Inner exec CLI: fork id and prompt captured, no subcommand selected.
    let inner = &top.inner;
    assert_eq!(inner.fork_session_id.as_deref(), Some("session-123"));
    assert!(inner.command.is_none());
    assert_eq!(inner.prompt.as_deref(), Some("echo fork"));

    // Root-level overrides: exactly the one `--config` entry.
    let root_overrides = &top.config_overrides.raw_overrides;
    assert_eq!(root_overrides.len(), 1);
    assert_eq!(root_overrides[0], "reasoning_level=xhigh");
}
}

View File

@@ -0,0 +1,162 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use anyhow::Context;
use codex_utils_cargo_bin::find_resource;
use core_test_support::test_codex_exec::test_codex_exec;
use serde_json::Value;
use std::string::ToString;
use uuid::Uuid;
use walkdir::WalkDir;
/// Walk `sessions_dir` and return the path of the first `.jsonl` rollout in
/// which some line after the first (meta) line is a `response_item` whose
/// payload is a `message` with `marker` inside its serialized `content`.
///
/// Unreadable directory entries, unreadable files and unparsable lines are
/// skipped. Returns `None` when no file matches.
fn find_session_file_containing_marker(
    sessions_dir: &std::path::Path,
    marker: &str,
) -> Option<std::path::PathBuf> {
    // Decide whether one JSONL line is a message item mentioning the marker.
    let line_mentions_marker = |line: &str| -> bool {
        if line.trim().is_empty() {
            return false;
        }
        let Ok(item) = serde_json::from_str::<Value>(line) else {
            return false;
        };
        if item.get("type").and_then(|t| t.as_str()) != Some("response_item") {
            return false;
        }
        let Some(payload) = item.get("payload") else {
            return false;
        };
        if payload.get("type").and_then(|t| t.as_str()) != Some("message") {
            return false;
        }
        payload
            .get("content")
            .map(ToString::to_string)
            .unwrap_or_default()
            .contains(marker)
    };

    WalkDir::new(sessions_dir)
        .into_iter()
        .filter_map(Result::ok)
        .filter(|entry| {
            entry.file_type().is_file()
                && entry.file_name().to_string_lossy().ends_with(".jsonl")
        })
        .find_map(|entry| {
            let content = std::fs::read_to_string(entry.path()).ok()?;
            // `skip(1)` drops the leading meta line; an empty file yields nothing.
            content
                .lines()
                .skip(1)
                .any(|line| line_mentions_marker(line))
                .then(|| entry.path().to_path_buf())
        })
}
/// Extract the conversation UUID from the first SessionMeta line in the rollout file.
fn extract_conversation_id(path: &std::path::Path) -> String {
let content = std::fs::read_to_string(path).unwrap();
let mut lines = content.lines();
let meta_line = lines.next().expect("missing meta line");
let meta: Value = serde_json::from_str(meta_line).expect("invalid meta json");
meta.get("payload")
.and_then(|p| p.get("id"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string()
}
/// Read `payload.forked_from_id` from the first (meta) line of a rollout file.
///
/// Returns `None` when the field is absent or not a string. Panics if the
/// file cannot be read, is empty, or its first line is not valid JSON.
fn extract_forked_from_id(path: &std::path::Path) -> Option<String> {
    let content = std::fs::read_to_string(path).unwrap();
    let first_line = content.lines().next().expect("missing meta line");
    let meta: Value = serde_json::from_str(first_line).expect("invalid meta json");

    let parent_id = meta.get("payload")?.get("forked_from_id")?.as_str()?;
    Some(parent_id.to_string())
}
/// Report whether any line after the first in the rollout file is a JSON
/// object whose `type` is `"fork_reference"`.
///
/// An unreadable file counts as "no reference"; lines that are not valid
/// JSON are ignored.
fn rollout_contains_fork_reference(path: &std::path::Path) -> bool {
    let content = match std::fs::read_to_string(path) {
        Ok(content) => content,
        Err(_) => return false,
    };

    for line in content.lines().skip(1) {
        let Ok(item) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        if item.get("type").and_then(Value::as_str) == Some("fork_reference") {
            return true;
        }
    }
    false
}
/// Locate the SSE fixture file that the exec tests in this file replay.
fn exec_fixture() -> anyhow::Result<std::path::PathBuf> {
    let fixture_path = find_resource!("tests/fixtures/cli_responses_fixture.sse")?;
    Ok(fixture_path)
}
/// End-to-end check of `--fork <SESSION_ID>`:
///
/// 1. Run exec once with a unique marker prompt and locate its rollout file.
/// 2. Run exec again with `--fork <id of that session>` and a second marker.
/// 3. Verify the second run wrote a different rollout that records the
///    parent id, carries (or references) the parent history, and contains the
///    follow-up prompt, while the parent rollout did not receive that prompt.
#[test]
fn exec_fork_by_id_creates_new_session_with_copied_history() -> anyhow::Result<()> {
    let test = test_codex_exec();
    let fixture = exec_fixture()?;

    // --- Step 1: create the parent session. ---
    let base_marker = format!("fork-base-{}", Uuid::new_v4());
    let base_prompt = format!("echo {base_marker}");
    test.cmd()
        .env("CODEX_RS_SSE_FIXTURE", &fixture)
        .env("OPENAI_BASE_URL", "http://unused.local")
        .arg("--skip-git-repo-check")
        .arg(&base_prompt)
        .assert()
        .success();

    let sessions_dir = test.home_path().join("sessions");
    let parent_rollout = find_session_file_containing_marker(&sessions_dir, &base_marker)
        .context("no session file found after first run")?;
    let parent_id = extract_conversation_id(&parent_rollout);

    // --- Step 2: fork it with a follow-up prompt. ---
    let follow_up_marker = format!("fork-follow-up-{}", Uuid::new_v4());
    let follow_up_prompt = format!("echo {follow_up_marker}");
    test.cmd()
        .env("CODEX_RS_SSE_FIXTURE", &fixture)
        .env("OPENAI_BASE_URL", "http://unused.local")
        .arg("--skip-git-repo-check")
        .arg("--fork")
        .arg(&parent_id)
        .arg(&follow_up_prompt)
        .assert()
        .success();

    // --- Step 3: inspect the forked rollout. ---
    let fork_rollout = find_session_file_containing_marker(&sessions_dir, &follow_up_marker)
        .context("no forked session file found for second marker")?;
    assert_ne!(
        fork_rollout, parent_rollout,
        "fork should create a new session file"
    );

    let fork_text = std::fs::read_to_string(&fork_rollout)?;
    let recorded_parent = extract_forked_from_id(&fork_rollout);
    assert_eq!(recorded_parent.as_deref(), Some(parent_id.as_str()));

    let carries_parent_history =
        fork_text.contains(&base_marker) || rollout_contains_fork_reference(&fork_rollout);
    assert!(
        carries_parent_history,
        "forked rollout should either inline parent history or record a fork reference"
    );
    assert!(fork_text.contains(&follow_up_marker));

    // The parent rollout keeps its own prompt and must not see the forked one.
    let parent_text = std::fs::read_to_string(&parent_rollout)?;
    assert!(parent_text.contains(&base_marker));
    assert!(
        !parent_text.contains(&follow_up_marker),
        "original session should not receive the forked prompt"
    );

    Ok(())
}

View File

@@ -3,6 +3,7 @@ mod add_dir;
mod apply_patch;
mod auth_env;
mod ephemeral;
mod fork;
mod mcp_required_exit;
mod originator;
mod output_schema;

View File

@@ -134,7 +134,7 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
assert_eq!(stdout, b"response");
let received = rx
.recv_timeout(Duration::from_secs(1))
.recv_timeout(Duration::from_secs(5))
.context("server did not receive data in time")?;
assert_eq!(received, request);