mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
merge upstream/dev/codex/add-fork-option-to-codex-exec into collab stack
This commit is contained in:
@@ -50,7 +50,8 @@ You can enable notifications by configuring a script that is run whenever the ag
|
||||
|
||||
### `codex exec` to run Codex programmatically/non-interactively
|
||||
|
||||
To run Codex non-interactively, run `codex exec PROMPT` (you can also pass the prompt via `stdin`) and Codex will work on your task until it decides that it is done and exits. If you provide both a prompt argument and piped stdin, Codex appends stdin as a `<stdin>` block after the prompt so patterns like `echo "my output" | codex exec "Summarize this concisely"` work naturally. Output is printed to the terminal directly. You can set the `RUST_LOG` environment variable to see more about what's going on.
|
||||
To run Codex non-interactively, run `codex exec PROMPT` (you can also pass the prompt via `stdin`) and Codex will work on your task until it decides that it is done and exits. Output is printed to the terminal directly. You can set the `RUST_LOG` environment variable to see more about what's going on.
|
||||
Use `codex exec --fork <SESSION_ID> PROMPT` to fork an existing session without launching the interactive picker/UI.
|
||||
Use `codex exec --ephemeral ...` to run without persisting session rollout files to disk.
|
||||
|
||||
### Experimenting with the Codex Sandbox
|
||||
|
||||
@@ -629,12 +629,16 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
.await?;
|
||||
handle_app_exit(exit_info)?;
|
||||
}
|
||||
Some(Subcommand::Exec(mut exec_cli)) => {
|
||||
Some(Subcommand::Exec(exec_cli)) => {
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
"exec",
|
||||
)?;
|
||||
let mut exec_cli = match exec_cli.validate() {
|
||||
Ok(exec_cli) => exec_cli,
|
||||
Err(err) => err.exit(),
|
||||
};
|
||||
prepend_config_flags(
|
||||
&mut exec_cli.config_overrides,
|
||||
root_config_overrides.clone(),
|
||||
@@ -1478,6 +1482,40 @@ mod tests {
|
||||
assert_eq!(args.session_id.as_deref(), Some("session-123"));
|
||||
assert_eq!(args.prompt.as_deref(), Some("re-review"));
|
||||
}
|
||||
/// With `--fork <id>` present, the trailing positional of `codex exec` is
/// parsed as the prompt and no exec subcommand is selected.
#[test]
fn exec_fork_accepts_prompt_positional() {
    let argv = ["codex", "exec", "--json", "--fork", "session-123", "2+2"];
    let cli = MultitoolCli::try_parse_from(argv).expect("parse should succeed");

    let exec = match cli.subcommand {
        Some(Subcommand::Exec(exec)) => exec,
        _ => panic!("expected exec subcommand"),
    };

    assert_eq!(exec.fork_session_id.as_deref(), Some("session-123"));
    assert!(exec.command.is_none());
    assert_eq!(exec.prompt.as_deref(), Some("2+2"));
}
|
||||
|
||||
/// `--fork` together with the `resume` subcommand is accepted by the parser
/// itself, but `validate()` must reject the combination as an argument
/// conflict.
#[test]
fn exec_fork_conflicts_with_resume_subcommand() {
    let cli =
        MultitoolCli::try_parse_from(["codex", "exec", "--fork", "session-123", "resume"])
            .expect("parse should succeed");

    let Some(Subcommand::Exec(exec)) = cli.subcommand else {
        panic!("expected exec subcommand");
    };

    let err = exec
        .validate()
        .expect_err("fork should conflict with subcommands");
    // Pin the error kind so an unrelated validation failure cannot make this
    // test pass by accident.
    assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
}
|
||||
|
||||
fn app_server_from_args(args: &[&str]) -> AppServerCommand {
|
||||
let cli = MultitoolCli::try_parse_from(args).expect("parse");
|
||||
|
||||
@@ -12,6 +12,12 @@ pub struct Cli {
|
||||
#[command(subcommand)]
|
||||
pub command: Option<Command>,
|
||||
|
||||
/// Fork from an existing session id (or thread name) before sending the prompt.
|
||||
///
|
||||
/// This creates a new session with copied history, similar to `codex fork`.
|
||||
#[arg(long = "fork", value_name = "SESSION_ID")]
|
||||
pub fork_session_id: Option<String>,
|
||||
|
||||
/// Optional image(s) to attach to the initial prompt.
|
||||
#[arg(
|
||||
long = "image",
|
||||
@@ -111,6 +117,19 @@ pub struct Cli {
|
||||
pub prompt: Option<String>,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
pub fn validate(self) -> Result<Self, clap::Error> {
|
||||
if self.fork_session_id.is_some() && self.command.is_some() {
|
||||
return Err(clap::Error::raw(
|
||||
clap::error::ErrorKind::ArgumentConflict,
|
||||
"--fork cannot be used with subcommands",
|
||||
));
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, clap::Subcommand)]
|
||||
pub enum Command {
|
||||
/// Resume a previous session by id or pick the most recent with --last.
|
||||
@@ -312,4 +331,23 @@ mod tests {
|
||||
assert_eq!(args.session_id.as_deref(), Some("session-123"));
|
||||
assert_eq!(args.prompt.as_deref(), Some(PROMPT));
|
||||
}
|
||||
|
||||
/// `--fork` consumes the session id as its value, leaving the trailing
/// positional as the prompt with no subcommand selected.
#[test]
fn fork_option_parses_prompt() {
    const PROMPT: &str = "echo fork-non-interactive";
    let argv = ["codex-exec", "--fork", "session-123", "--json", PROMPT];

    let Cli {
        fork_session_id,
        prompt,
        command,
        ..
    } = Cli::parse_from(argv);

    assert_eq!(fork_session_id.as_deref(), Some("session-123"));
    assert_eq!(prompt.as_deref(), Some(PROMPT));
    assert!(command.is_none());
}
|
||||
|
||||
/// Parsing `--fork` followed by `resume` and then validating must yield an
/// `ArgumentConflict` error.
#[test]
fn fork_option_conflicts_with_subcommands() {
    let parsed = Cli::try_parse_from(["codex-exec", "--fork", "session-123", "resume"]);
    let err = parsed
        .and_then(|cli| cli.validate())
        .expect_err("fork should conflict with subcommands");

    let expected_kind = clap::error::ErrorKind::ArgumentConflict;
    assert_eq!(err.kind(), expected_kind);
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::Thread as AppServerThread;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem as AppServerThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -120,18 +122,6 @@ enum InitialOperation {
|
||||
},
|
||||
}
|
||||
|
||||
/// Selects how stdin is consulted when resolving the prompt for `codex exec`.
///
/// The enum carries no data, so it derives `Copy` and the comparison/debug
/// traits to make it cheap to pass by value, compare, and log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StdinPromptBehavior {
    /// Read stdin only when there is no positional prompt, which is the legacy
    /// `codex exec` behavior for `codex exec` with piped input.
    RequiredIfPiped,
    /// Always treat stdin as the prompt, used for the explicit `codex exec -`
    /// sentinel and similar forced-stdin call sites.
    Forced,
    /// If stdin is piped alongside a positional prompt, treat stdin as
    /// additional context to append rather than as the primary prompt.
    OptionalAppend,
}
|
||||
|
||||
struct RequestIdSequencer {
|
||||
next: i64,
|
||||
}
|
||||
@@ -154,6 +144,7 @@ struct ExecRunArgs {
|
||||
config: Config,
|
||||
dangerously_bypass_approvals_and_sandbox: bool,
|
||||
exec_span: tracing::Span,
|
||||
fork_session_id: Option<String>,
|
||||
images: Vec<PathBuf>,
|
||||
json_mode: bool,
|
||||
last_message_file: Option<PathBuf>,
|
||||
@@ -181,6 +172,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
|
||||
let Cli {
|
||||
command,
|
||||
fork_session_id,
|
||||
images,
|
||||
model: model_cli_arg,
|
||||
oss,
|
||||
@@ -452,6 +444,7 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
config,
|
||||
dangerously_bypass_approvals_and_sandbox,
|
||||
exec_span: exec_span.clone(),
|
||||
fork_session_id,
|
||||
images,
|
||||
json_mode,
|
||||
last_message_file,
|
||||
@@ -473,6 +466,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
config,
|
||||
dangerously_bypass_approvals_and_sandbox,
|
||||
exec_span,
|
||||
fork_session_id,
|
||||
images,
|
||||
json_mode,
|
||||
last_message_file,
|
||||
@@ -531,10 +525,10 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
|
||||
})?;
|
||||
|
||||
// Handle resume subcommand through existing `thread/list` + `thread/resume`
|
||||
// APIs so exec no longer reaches into rollout storage directly.
|
||||
let (primary_thread_id, fallback_session_configured) =
|
||||
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
|
||||
// Handle resume/fork/start through app-server APIs so exec no longer reaches into
|
||||
// rollout storage directly for normal bootstrap.
|
||||
let (primary_thread_id, fallback_session_configured) = match command.as_ref() {
|
||||
Some(ExecCommand::Resume(args)) => {
|
||||
if let Some(thread_id) = resolve_resume_thread_id(&client, &config, args).await? {
|
||||
let response: ThreadResumeResponse = send_request_with_response(
|
||||
&client,
|
||||
@@ -564,22 +558,41 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
(session_configured.session_id, session_configured)
|
||||
}
|
||||
} else {
|
||||
let response: ThreadStartResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadStart {
|
||||
request_id: request_ids.next(),
|
||||
params: thread_start_params_from_config(&config),
|
||||
},
|
||||
"thread/start",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
let session_configured = session_configured_from_thread_start_response(&response)
|
||||
}
|
||||
Some(ExecCommand::Review(_)) | None => {
|
||||
if let Some(session_id) = fork_session_id.as_deref() {
|
||||
let response: ThreadForkResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadFork {
|
||||
request_id: request_ids.next(),
|
||||
params: thread_fork_params_from_config(
|
||||
&config, session_id, /*path*/ None,
|
||||
),
|
||||
},
|
||||
"thread/fork",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
(session_configured.session_id, session_configured)
|
||||
};
|
||||
|
||||
let session_configured = session_configured_from_thread_fork_response(&response)
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
(session_configured.session_id, session_configured)
|
||||
} else {
|
||||
let response: ThreadStartResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadStart {
|
||||
request_id: request_ids.next(),
|
||||
params: thread_start_params_from_config(&config),
|
||||
},
|
||||
"thread/start",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
let session_configured = session_configured_from_thread_start_response(&response)
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
(session_configured.session_id, session_configured)
|
||||
}
|
||||
}
|
||||
};
|
||||
let primary_thread_id_for_span = primary_thread_id.to_string();
|
||||
// Use the start/resume response as the authoritative bootstrap payload.
|
||||
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
|
||||
@@ -627,7 +640,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
)
|
||||
}
|
||||
(None, root_prompt, imgs) => {
|
||||
let prompt_text = resolve_root_prompt(root_prompt);
|
||||
let prompt_text = resolve_prompt(root_prompt);
|
||||
let mut items: Vec<UserInput> = imgs
|
||||
.into_iter()
|
||||
.map(|path| UserInput::LocalImage { path })
|
||||
@@ -889,6 +902,23 @@ fn approvals_reviewer_override_from_config(
|
||||
Some(config.approvals_reviewer.into())
|
||||
}
|
||||
|
||||
fn thread_fork_params_from_config(
|
||||
config: &Config,
|
||||
thread_id: &str,
|
||||
path: Option<PathBuf>,
|
||||
) -> ThreadForkParams {
|
||||
ThreadForkParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
path,
|
||||
model: config.model.clone(),
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
cwd: Some(config.cwd.to_string_lossy().to_string()),
|
||||
approval_policy: Some(config.permissions.approval_policy.value().into()),
|
||||
sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get()),
|
||||
..ThreadForkParams::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_request_with_response<T>(
|
||||
client: &InProcessAppServerClient,
|
||||
request: ClientRequest,
|
||||
@@ -942,6 +972,24 @@ fn session_configured_from_thread_resume_response(
|
||||
)
|
||||
}
|
||||
|
||||
fn session_configured_from_thread_fork_response(
|
||||
response: &ThreadForkResponse,
|
||||
) -> Result<SessionConfiguredEvent, String> {
|
||||
session_configured_from_thread_response(
|
||||
&response.thread.id,
|
||||
response.thread.name.clone(),
|
||||
response.thread.path.clone(),
|
||||
response.model.clone(),
|
||||
response.model_provider.clone(),
|
||||
response.service_tier,
|
||||
response.approval_policy.to_core(),
|
||||
response.approvals_reviewer.to_core(),
|
||||
response.sandbox.to_core(),
|
||||
response.cwd.clone(),
|
||||
response.reasoning_effort,
|
||||
)
|
||||
}
|
||||
|
||||
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
|
||||
match target {
|
||||
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
|
||||
@@ -1540,89 +1588,43 @@ fn decode_utf16(
|
||||
String::from_utf16(&units).map_err(|_| PromptDecodeError::InvalidUtf16 { encoding })
|
||||
}
|
||||
|
||||
fn read_prompt_from_stdin(behavior: StdinPromptBehavior) -> Option<String> {
|
||||
let stdin_is_terminal = std::io::stdin().is_terminal();
|
||||
|
||||
match behavior {
|
||||
StdinPromptBehavior::RequiredIfPiped if stdin_is_terminal => {
|
||||
eprintln!(
|
||||
"No prompt provided. Either specify one as an argument or pipe the prompt into stdin."
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
StdinPromptBehavior::RequiredIfPiped => {
|
||||
eprintln!("Reading prompt from stdin...");
|
||||
}
|
||||
StdinPromptBehavior::Forced => {}
|
||||
StdinPromptBehavior::OptionalAppend if stdin_is_terminal => return None,
|
||||
StdinPromptBehavior::OptionalAppend => {
|
||||
eprintln!("Reading additional input from stdin...");
|
||||
}
|
||||
}
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
if let Err(e) = std::io::stdin().read_to_end(&mut bytes) {
|
||||
eprintln!("Failed to read prompt from stdin: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let buffer = match decode_prompt_bytes(&bytes) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to read prompt from stdin: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
if buffer.trim().is_empty() {
|
||||
match behavior {
|
||||
StdinPromptBehavior::OptionalAppend => None,
|
||||
StdinPromptBehavior::RequiredIfPiped | StdinPromptBehavior::Forced => {
|
||||
eprintln!("No prompt provided via stdin.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
/// Appends `stdin_text` to `prompt` inside a `<stdin>` ... `</stdin>` block.
///
/// A newline is inserted before the closing tag only when `stdin_text` does
/// not already end with one, so the closing tag always starts its own line.
fn prompt_with_stdin_context(prompt: &str, stdin_text: &str) -> String {
    let closing_separator = if stdin_text.ends_with('\n') { "" } else { "\n" };
    format!("{prompt}\n\n<stdin>\n{stdin_text}{closing_separator}</stdin>")
}
|
||||
|
||||
fn resolve_prompt(prompt_arg: Option<String>) -> String {
|
||||
match prompt_arg {
|
||||
Some(p) if p != "-" => p,
|
||||
maybe_dash => {
|
||||
let behavior = if matches!(maybe_dash.as_deref(), Some("-")) {
|
||||
StdinPromptBehavior::Forced
|
||||
} else {
|
||||
StdinPromptBehavior::RequiredIfPiped
|
||||
};
|
||||
let Some(prompt) = read_prompt_from_stdin(behavior) else {
|
||||
unreachable!("required stdin prompt should produce content");
|
||||
};
|
||||
prompt
|
||||
}
|
||||
}
|
||||
}
|
||||
let force_stdin = matches!(maybe_dash.as_deref(), Some("-"));
|
||||
|
||||
fn resolve_root_prompt(prompt_arg: Option<String>) -> String {
|
||||
match prompt_arg {
|
||||
Some(prompt) if prompt != "-" => {
|
||||
if let Some(stdin_text) = read_prompt_from_stdin(StdinPromptBehavior::OptionalAppend) {
|
||||
prompt_with_stdin_context(&prompt, &stdin_text)
|
||||
} else {
|
||||
prompt
|
||||
if std::io::stdin().is_terminal() && !force_stdin {
|
||||
eprintln!(
|
||||
"No prompt provided. Either specify one as an argument or pipe the prompt into stdin."
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
if !force_stdin {
|
||||
eprintln!("Reading prompt from stdin...");
|
||||
}
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
if let Err(e) = std::io::stdin().read_to_end(&mut bytes) {
|
||||
eprintln!("Failed to read prompt from stdin: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let buffer = match decode_prompt_bytes(&bytes) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to read prompt from stdin: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
if buffer.trim().is_empty() {
|
||||
eprintln!("No prompt provided via stdin.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
buffer
|
||||
}
|
||||
maybe_dash => resolve_prompt(maybe_dash),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1836,26 +1838,6 @@ mod tests {
|
||||
assert_eq!(err, PromptDecodeError::InvalidUtf8 { valid_up_to: 0 });
|
||||
}
|
||||
|
||||
/// Stdin text without a trailing newline gets one added before `</stdin>`.
#[test]
fn prompt_with_stdin_context_wraps_stdin_block() {
    let expected = "Summarize this concisely\n\n<stdin>\nmy output\n</stdin>";
    let actual = prompt_with_stdin_context("Summarize this concisely", "my output");

    assert_eq!(actual, expected);
}
|
||||
|
||||
/// Stdin text that already ends with a newline is not given a second one.
#[test]
fn prompt_with_stdin_context_preserves_trailing_newline() {
    let expected = "Summarize this concisely\n\n<stdin>\nmy output\n</stdin>";
    let actual = prompt_with_stdin_context("Summarize this concisely", "my output\n");

    assert_eq!(actual, expected);
}
|
||||
|
||||
#[test]
|
||||
fn lagged_event_warning_message_is_explicit() {
|
||||
assert_eq!(
|
||||
|
||||
@@ -29,7 +29,10 @@ fn main() -> anyhow::Result<()> {
|
||||
arg0_dispatch_or_else(|arg0_paths: Arg0DispatchPaths| async move {
|
||||
let top_cli = TopCli::parse();
|
||||
// Merge root-level overrides into inner CLI struct so downstream logic remains unchanged.
|
||||
let mut inner = top_cli.inner;
|
||||
let mut inner = match top_cli.inner.validate() {
|
||||
Ok(inner) => inner,
|
||||
Err(err) => err.exit(),
|
||||
};
|
||||
inner
|
||||
.config_overrides
|
||||
.raw_overrides
|
||||
@@ -79,4 +82,24 @@ mod tests {
|
||||
"reasoning_level=xhigh"
|
||||
);
|
||||
}
|
||||
/// A root-level `--config` override and `--fork` can be given together: the
/// override lands on the top-level CLI while the fork id and prompt land on
/// the inner CLI.
#[test]
fn top_cli_parses_fork_option_with_root_config() {
    let argv = [
        "codex-exec",
        "--config",
        "reasoning_level=xhigh",
        "--fork",
        "session-123",
        "echo fork",
    ];
    let cli = TopCli::parse_from(argv);

    let inner = &cli.inner;
    assert_eq!(inner.fork_session_id.as_deref(), Some("session-123"));
    assert!(inner.command.is_none());
    assert_eq!(inner.prompt.as_deref(), Some("echo fork"));

    let overrides = &cli.config_overrides.raw_overrides;
    assert_eq!(overrides.len(), 1);
    assert_eq!(overrides[0], "reasoning_level=xhigh");
}
|
||||
}
|
||||
|
||||
162
codex-rs/exec/tests/suite/fork.rs
Normal file
162
codex-rs/exec/tests/suite/fork.rs
Normal file
@@ -0,0 +1,162 @@
|
||||
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
||||
|
||||
use anyhow::Context;
|
||||
use codex_utils_cargo_bin::find_resource;
|
||||
use core_test_support::test_codex_exec::test_codex_exec;
|
||||
use serde_json::Value;
|
||||
use std::string::ToString;
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
/// Utility: scan the sessions dir for a rollout file that contains `marker`
|
||||
/// in any response_item.message.content entry. Returns the absolute path.
|
||||
fn find_session_file_containing_marker(
|
||||
sessions_dir: &std::path::Path,
|
||||
marker: &str,
|
||||
) -> Option<std::path::PathBuf> {
|
||||
for entry in WalkDir::new(sessions_dir) {
|
||||
let entry = match entry {
|
||||
Ok(e) => e,
|
||||
Err(_) => continue,
|
||||
};
|
||||
if !entry.file_type().is_file() {
|
||||
continue;
|
||||
}
|
||||
if !entry.file_name().to_string_lossy().ends_with(".jsonl") {
|
||||
continue;
|
||||
}
|
||||
let path = entry.path();
|
||||
let Ok(content) = std::fs::read_to_string(path) else {
|
||||
continue;
|
||||
};
|
||||
// Skip the first meta line and scan remaining JSONL entries.
|
||||
let mut lines = content.lines();
|
||||
if lines.next().is_none() {
|
||||
continue;
|
||||
}
|
||||
for line in lines {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(item): Result<Value, _> = serde_json::from_str(line) else {
|
||||
continue;
|
||||
};
|
||||
if item.get("type").and_then(|t| t.as_str()) == Some("response_item")
|
||||
&& let Some(payload) = item.get("payload")
|
||||
&& payload.get("type").and_then(|t| t.as_str()) == Some("message")
|
||||
&& payload
|
||||
.get("content")
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default()
|
||||
.contains(marker)
|
||||
{
|
||||
return Some(path.to_path_buf());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Extract the conversation UUID from the first SessionMeta line in the rollout file.
|
||||
fn extract_conversation_id(path: &std::path::Path) -> String {
|
||||
let content = std::fs::read_to_string(path).unwrap();
|
||||
let mut lines = content.lines();
|
||||
let meta_line = lines.next().expect("missing meta line");
|
||||
let meta: Value = serde_json::from_str(meta_line).expect("invalid meta json");
|
||||
meta.get("payload")
|
||||
.and_then(|p| p.get("id"))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or_default()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn extract_forked_from_id(path: &std::path::Path) -> Option<String> {
|
||||
let content = std::fs::read_to_string(path).unwrap();
|
||||
let mut lines = content.lines();
|
||||
let meta_line = lines.next().expect("missing meta line");
|
||||
let meta: Value = serde_json::from_str(meta_line).expect("invalid meta json");
|
||||
meta.get("payload")
|
||||
.and_then(|payload| payload.get("forked_from_id"))
|
||||
.and_then(Value::as_str)
|
||||
.map(ToString::to_string)
|
||||
}
|
||||
|
||||
fn rollout_contains_fork_reference(path: &std::path::Path) -> bool {
|
||||
let Ok(content) = std::fs::read_to_string(path) else {
|
||||
return false;
|
||||
};
|
||||
content.lines().skip(1).any(|line| {
|
||||
serde_json::from_str::<Value>(line)
|
||||
.ok()
|
||||
.and_then(|item| item.get("type").and_then(Value::as_str).map(str::to_string))
|
||||
.as_deref()
|
||||
== Some("fork_reference")
|
||||
})
|
||||
}
|
||||
|
||||
fn exec_fixture() -> anyhow::Result<std::path::PathBuf> {
|
||||
Ok(find_resource!("tests/fixtures/cli_responses_fixture.sse")?)
|
||||
}
|
||||
|
||||
/// Runs `codex exec` once to create a session, then again with `--fork <id>`,
/// and checks that the fork is written to a new rollout file that points back
/// at the original session while the original rollout is left without the
/// follow-up prompt.
#[test]
fn exec_fork_by_id_creates_new_session_with_copied_history() -> anyhow::Result<()> {
    let test = test_codex_exec();
    let fixture = exec_fixture()?;

    // Runs the exec binary against the SSE fixture with the given trailing
    // arguments and requires a successful exit.
    let run_exec = |extra_args: &[&str]| {
        test.cmd()
            .env("CODEX_RS_SSE_FIXTURE", &fixture)
            .env("OPENAI_BASE_URL", "http://unused.local")
            .arg("--skip-git-repo-check")
            .args(extra_args)
            .assert()
            .success();
    };

    // First run: create the base session, tagged with a unique marker.
    let base_marker = format!("fork-base-{}", Uuid::new_v4());
    let base_prompt = format!("echo {base_marker}");
    run_exec(&[base_prompt.as_str()]);

    let sessions_dir = test.home_path().join("sessions");
    let original_path = find_session_file_containing_marker(&sessions_dir, &base_marker)
        .context("no session file found after first run")?;
    let session_id = extract_conversation_id(&original_path);

    // Second run: fork the base session with a different unique marker.
    let follow_up_marker = format!("fork-follow-up-{}", Uuid::new_v4());
    let follow_up_prompt = format!("echo {follow_up_marker}");
    run_exec(&["--fork", session_id.as_str(), follow_up_prompt.as_str()]);

    let forked_path = find_session_file_containing_marker(&sessions_dir, &follow_up_marker)
        .context("no forked session file found for second marker")?;

    assert_ne!(
        forked_path, original_path,
        "fork should create a new session file"
    );

    let forked_content = std::fs::read_to_string(&forked_path)?;
    assert_eq!(
        extract_forked_from_id(&forked_path).as_deref(),
        Some(session_id.as_str())
    );
    let carries_parent_history = forked_content.contains(base_marker.as_str())
        || rollout_contains_fork_reference(&forked_path);
    assert!(
        carries_parent_history,
        "forked rollout should either inline parent history or record a fork reference"
    );
    assert!(forked_content.contains(follow_up_marker.as_str()));

    let original_content = std::fs::read_to_string(&original_path)?;
    assert!(original_content.contains(base_marker.as_str()));
    assert!(
        !original_content.contains(follow_up_marker.as_str()),
        "original session should not receive the forked prompt"
    );

    Ok(())
}
|
||||
@@ -3,6 +3,7 @@ mod add_dir;
|
||||
mod apply_patch;
|
||||
mod auth_env;
|
||||
mod ephemeral;
|
||||
mod fork;
|
||||
mod mcp_required_exit;
|
||||
mod originator;
|
||||
mod output_schema;
|
||||
|
||||
@@ -134,7 +134,7 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
assert_eq!(stdout, b"response");
|
||||
|
||||
let received = rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.recv_timeout(Duration::from_secs(5))
|
||||
.context("server did not receive data in time")?;
|
||||
assert_eq!(received, request);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user