Compare commits

...

7 Commits

Author SHA1 Message Date
Charlie Weems
d624f6a521 Clean up command outputs 2025-07-23 10:17:50 -07:00
pap
903de87833 adding best of n 2025-07-22 18:42:35 -07:00
pap
8b9f09a5ba adding secs ago instead of start date 2025-07-22 18:36:45 -07:00
pap
20db1497f3 missing dependencies 2025-07-22 16:06:30 -07:00
pap
ed9de08465 adding a test and renaming jobs to tasks 2025-07-22 16:06:21 -07:00
pap
bb0befc8db adding codex jobs command, including jobs ls, inspect, logs and -a options 2025-07-22 13:42:47 -07:00
pap
2ca44b46d6 adding concurrent option using worktree 2025-07-22 12:03:52 -07:00
13 changed files with 1283 additions and 11 deletions

5
codex-rs/Cargo.lock generated
View File

@@ -626,6 +626,8 @@ name = "codex-cli"
version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"chrono",
"clap",
"clap_complete",
"codex-chatgpt",
@@ -636,10 +638,13 @@ dependencies = [
"codex-login",
"codex-mcp-server",
"codex-tui",
"serde",
"serde_json",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]

View File

@@ -27,6 +27,8 @@ codex-linux-sandbox = { path = "../linux-sandbox" }
codex-mcp-server = { path = "../mcp-server" }
codex-tui = { path = "../tui" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tokio = { version = "1", features = [
"io-std",
"macros",
@@ -36,3 +38,10 @@ tokio = { version = "1", features = [
] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
assert_cmd = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tempfile = "3"

View File

@@ -0,0 +1,357 @@
use std::fs::File;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::io::Write; // added for write_all / flush
use anyhow::Context;
use codex_common::ApprovalModeCliArg;
use codex_tui::Cli as TuiCli;
/// Attempt to handle a concurrent background run. Returns Ok(true) if a background exec
/// process was spawned (in which case the caller should NOT start the TUI), or Ok(false)
/// to proceed with normal interactive execution.
pub fn maybe_spawn_concurrent(
tui_cli: &mut TuiCli,
root_raw_overrides: &[String],
concurrent: bool,
concurrent_automerge: Option<bool>,
concurrent_branch_name: &Option<String>,
) -> anyhow::Result<bool> {
if !concurrent { return Ok(false); }
// Enforce autonomous execution conditions when running interactive mode.
// Validate git repository presence (required for --concurrent) only if we're in interactive path.
{
let dir_to_check = tui_cli
.cwd
.clone()
.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
let status = Command::new("git")
.arg("-C")
.arg(&dir_to_check)
.arg("rev-parse")
.arg("--git-dir")
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
if status.as_ref().map(|s| !s.success()).unwrap_or(true) {
eprintln!(
"Error: --concurrent requires a git repository (directory {:?} is not managed by git).",
dir_to_check
);
std::process::exit(2);
}
}
let ap = tui_cli.approval_policy;
let approval_on_failure = matches!(ap, Some(ApprovalModeCliArg::OnFailure));
let autonomous = tui_cli.full_auto
|| tui_cli.dangerously_bypass_approvals_and_sandbox
|| approval_on_failure;
if !autonomous {
eprintln!(
"Error: --concurrent requires autonomous mode. Use one of: --full-auto, --ask-for-approval on-failure, or --dangerously-bypass-approvals-and-sandbox."
);
std::process::exit(2);
}
if tui_cli.prompt.is_none() {
eprintln!(
"Error: --concurrent requires a prompt argument so the agent does not wait for interactive input."
);
std::process::exit(2);
}
// Build exec args from interactive CLI for autonomous run without TUI (background).
let mut exec_args: Vec<String> = Vec::new();
if !tui_cli.images.is_empty() {
exec_args.push("--image".into());
exec_args.push(tui_cli.images.iter().map(|p| p.display().to_string()).collect::<Vec<_>>().join(","));
}
if let Some(model) = &tui_cli.model { exec_args.push("--model".into()); exec_args.push(model.clone()); }
if let Some(profile) = &tui_cli.config_profile { exec_args.push("--profile".into()); exec_args.push(profile.clone()); }
if let Some(sandbox) = &tui_cli.sandbox_mode { exec_args.push("--sandbox".into()); exec_args.push(format!("{sandbox:?}").to_lowercase().replace('_', "-")); }
if tui_cli.full_auto { exec_args.push("--full-auto".into()); }
if tui_cli.dangerously_bypass_approvals_and_sandbox { exec_args.push("--dangerously-bypass-approvals-and-sandbox".into()); }
if tui_cli.skip_git_repo_check { exec_args.push("--skip-git-repo-check".into()); }
for raw in root_raw_overrides { exec_args.push("-c".into()); exec_args.push(raw.clone()); }
// Derive a single slug (shared by worktree branch & log filename) from the prompt.
let raw_prompt = tui_cli.prompt.as_deref().unwrap_or("");
let snippet = raw_prompt.chars().take(32).collect::<String>();
let mut slug: String = snippet
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c.to_ascii_lowercase() } else { '-' })
.collect();
while slug.contains("--") { slug = slug.replace("--", "-"); }
slug = slug.trim_matches('-').to_string();
if slug.is_empty() { slug = "prompt".into(); }
// Determine concurrent defaults from env (no config file), then apply CLI precedence.
let env_automerge = parse_env_bool("CONCURRENT_AUTOMERGE");
let env_branch_name = std::env::var("CONCURRENT_BRANCH_NAME").ok();
let effective_automerge = concurrent_automerge.or(env_automerge).unwrap_or(true);
let user_branch_name_opt = concurrent_branch_name.clone().or(env_branch_name);
let branch_name_effective = if let Some(bn_raw) = user_branch_name_opt.as_ref() {
let bn_trim = bn_raw.trim();
if bn_trim.is_empty() { format!("codex/{slug}") } else { bn_trim.to_string() }
} else {
format!("codex/{slug}")
};
// Unique job id for this concurrent run (used for log file naming instead of slug).
let task_id = uuid::Uuid::new_v4().to_string();
// Prepare log file path early so we can write pre-spawn logs (e.g. worktree creation output) into it.
let log_dir = match codex_base_dir() {
Ok(base) => {
let d = base.join("log");
let _ = std::fs::create_dir_all(&d);
d
}
Err(_) => PathBuf::from("/tmp"),
};
let log_path = log_dir.join(format!("codex-logs-{}.log", task_id));
// If user did NOT specify an explicit cwd, create an isolated git worktree.
let mut created_worktree: Option<(PathBuf, String)> = None; // (path, branch)
let mut original_branch: Option<String> = None;
let mut original_commit: Option<String> = None;
let mut pre_spawn_logs = String::new();
if tui_cli.cwd.is_none() {
original_branch = git_capture(["rev-parse", "--abbrev-ref", "HEAD"]).ok();
original_commit = git_capture(["rev-parse", "HEAD"]).ok();
match create_concurrent_worktree(&branch_name_effective) {
Ok(Some(info)) => {
exec_args.push("--cd".into());
exec_args.push(info.worktree_path.display().to_string());
created_worktree = Some((info.worktree_path, info.branch_name.clone()));
// Keep the original git output plus a concise created line (for log file only).
pre_spawn_logs.push_str(&info.logs);
pre_spawn_logs.push_str(&format!(
"Created git worktree at {} (branch {}) for concurrent run\n",
created_worktree.as_ref().unwrap().0.display(), info.branch_name
));
}
Ok(None) => {
// Silence console noise: do not warn here to keep stdout clean; we still proceed.
}
Err(e) => {
eprintln!("Error: failed to create git worktree for --concurrent: {e}");
eprintln!("Hint: remove or rename existing branch '{branch_name_effective}', or pass --concurrent-branch-name to choose a unique name.");
std::process::exit(3);
}
}
} else if let Some(explicit) = &tui_cli.cwd {
exec_args.push("--cd".into());
exec_args.push(explicit.display().to_string());
}
// Prompt (safe to unwrap due to earlier validation).
if let Some(prompt) = tui_cli.prompt.clone() { exec_args.push(prompt); }
// Create (or truncate) the log file and write any pre-spawn logs we captured.
let file = match File::create(&log_path) {
Ok(mut f) => {
if !pre_spawn_logs.is_empty() {
let _ = f.write_all(pre_spawn_logs.as_bytes());
let _ = f.flush();
}
f
}
Err(e) => {
eprintln!("Failed to create log file {}: {e}. Falling back to interactive mode.", log_path.display());
return Ok(false);
}
};
match File::create(&log_path) {
Ok(file) => {
let file_err = file.try_clone().ok();
let mut cmd = Command::new(
std::env::current_exe().unwrap_or_else(|_| PathBuf::from("codex"))
);
cmd.arg("exec");
for a in &exec_args { cmd.arg(a); }
// Provide metadata for auto merge if we created a worktree.
if let Some((wt_path, branch)) = &created_worktree {
if effective_automerge { cmd.env("CODEX_CONCURRENT_AUTOMERGE", "1"); }
cmd.env("CODEX_CONCURRENT_BRANCH", branch);
cmd.env("CODEX_CONCURRENT_WORKTREE", wt_path);
if let Some(ob) = &original_branch { cmd.env("CODEX_ORIGINAL_BRANCH", ob); }
if let Some(oc) = &original_commit { cmd.env("CODEX_ORIGINAL_COMMIT", oc); }
if let Ok(orig_root) = std::env::current_dir() { cmd.env("CODEX_ORIGINAL_ROOT", orig_root); }
}
// Provide task id so child process can emit token_count updates to tasks.jsonl.
cmd.env("CODEX_TASK_ID", &task_id);
cmd.stdout(Stdio::from(file));
if let Some(f2) = file_err { cmd.stderr(Stdio::from(f2)); }
match cmd.spawn() {
Ok(child) => {
// Human-friendly multi-line output with bold headers.
let branch_val = created_worktree.as_ref().map(|(_, b)| b.as_str()).unwrap_or("(none)");
let worktree_val = created_worktree
.as_ref()
.map(|(p, _)| p.display().to_string())
.unwrap_or_else(|| "(original cwd)".to_string());
// ANSI escape for bold: \x1b[1m ... \x1b[0m
println!("\x1b[1mTask ID:\x1b[0m {}", task_id);
println!("\x1b[1mPID:\x1b[0m {}", child.id());
println!("\x1b[1mBranch:\x1b[0m {}", branch_val);
println!("\x1b[1mWorktree:\x1b[0m {}", worktree_val);
println!("\x1b[1mState:\x1b[0m started");
// Use bold bright magenta (95) for actionable follow-up commands.
println!("\nMonitor all tasks: \x1b[1;95mcodex tasks ls\x1b[0m");
println!("Watch this task: \x1b[1;95mcodex logs {} -f\x1b[0m", task_id);
// Record task metadata to CODEX_HOME/tasks.jsonl (JSON Lines file).
let record_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if let Ok(base) = codex_base_dir() {
let tasks_path = base.join("tasks.jsonl");
let record = serde_json::json!({
"task_id": task_id,
"pid": child.id(),
"worktree": created_worktree.as_ref().map(|(p, _)| p.display().to_string()),
"branch": created_worktree.as_ref().map(|(_, b)| b.clone()),
"original_branch": original_branch,
"original_commit": original_commit,
"log_path": log_path.display().to_string(),
"prompt": raw_prompt,
"model": tui_cli.model.clone(),
"start_time": record_time,
"automerge": effective_automerge,
"explicit_branch_name": user_branch_name_opt,
"token_count": serde_json::Value::Null,
"state": "started",
});
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&tasks_path) {
use std::io::Write;
if let Err(e) = writeln!(f, "{}", record.to_string()) {
eprintln!("Warning: failed writing task record to {}: {e}", tasks_path.display());
}
} else {
eprintln!("Warning: could not open tasks log file at {}", tasks_path.display());
}
}
return Ok(true); // background spawned
}
Err(e) => {
eprintln!("Failed to start background exec: {e}. Falling back to interactive mode.");
}
}
}
Err(e) => {
eprintln!(
"Failed to create log file {}: {e}. Falling back to interactive mode.",
log_path.display()
);
}
}
Ok(false)
}
/// Return the base Codex directory under the user's home (~/.codex), creating it if necessary.
fn codex_base_dir() -> anyhow::Result<PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") {
if !val.is_empty() {
return Ok(PathBuf::from(val).canonicalize()?);
}
}
let home = std::env::var_os("HOME").ok_or_else(|| anyhow::anyhow!("Could not find home directory"))?;
let base = PathBuf::from(home).join(".codex");
std::fs::create_dir_all(&base)?;
Ok(base)
}
/// Attempt to create a git worktree for an isolated concurrent run capturing git output.
struct WorktreeInfo { worktree_path: PathBuf, branch_name: String, logs: String }
fn create_concurrent_worktree(branch_name: &str) -> anyhow::Result<Option<WorktreeInfo>> {
// Determine repository root.
let output = Command::new("git").arg("rev-parse").arg("--show-toplevel").output();
let repo_root = match output {
Ok(out) if out.status.success() => {
let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
if s.is_empty() { return Ok(None); }
PathBuf::from(s)
}
_ => return Ok(None),
};
// Derive repo name from root directory.
let repo_name = repo_root
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("repo");
// Fast-fail if branch already exists.
if Command::new("git")
.current_dir(&repo_root)
.arg("rev-parse")
.arg("--verify")
.arg(branch_name)
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false) {
anyhow::bail!("branch '{branch_name}' already exists");
}
// Construct worktree directory under ~/.codex/worktrees/<repo_name>/.
let base_dir = codex_base_dir()?.join("worktrees").join(repo_name);
std::fs::create_dir_all(&base_dir)?;
let mut worktree_path = base_dir.join(branch_name.replace('/', "-"));
if worktree_path.exists() {
for i in 1..1000 {
let candidate = base_dir.join(format!("{}-{}", branch_name.replace('/', "-"), i));
if !candidate.exists() { worktree_path = candidate; break; }
}
}
// Run git worktree add capturing output (stdout+stderr).
let add_out = Command::new("git")
.current_dir(&repo_root)
.arg("worktree")
.arg("add")
.arg("-b")
.arg(&branch_name)
.arg(&worktree_path)
.arg("HEAD")
.output()?;
if !add_out.status.success() {
anyhow::bail!("git worktree add failed with status {}", add_out.status);
}
let mut logs = String::new();
if !add_out.stdout.is_empty() { logs.push_str(&String::from_utf8_lossy(&add_out.stdout)); }
if !add_out.stderr.is_empty() { logs.push_str(&String::from_utf8_lossy(&add_out.stderr)); }
Ok(Some(WorktreeInfo { worktree_path, branch_name: branch_name.to_string(), logs }))
}
/// Helper: capture trimmed stdout of a git command.
fn git_capture<I, S>(args: I) -> anyhow::Result<String>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut cmd = Command::new("git");
for a in args { cmd.arg(a.as_ref()); }
let out = cmd.output().context("running git command")?;
if !out.status.success() { anyhow::bail!("git command failed"); }
Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
}
/// Parse common boolean environment variable representations.
fn parse_env_bool(name: &str) -> Option<bool> {
let raw = std::env::var(name).ok()?;
let lower = raw.to_ascii_lowercase();
match lower.as_str() {
"1" | "true" | "yes" | "on" => Some(true),
"0" | "false" | "no" | "off" => Some(false),
_ => None,
}
}

185
codex-rs/cli/src/inspect.rs Normal file
View File

@@ -0,0 +1,185 @@
use clap::Parser;
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::fs;
#[derive(Debug, Parser)]
pub struct InspectCli {
/// Task identifier (full/short task id or exact branch name)
pub id: String,
/// Output JSON instead of human table
#[arg(long)]
pub json: bool,
}
#[derive(Debug, Deserialize)]
struct RawRecord {
task_id: Option<String>,
pid: Option<u64>,
worktree: Option<String>,
branch: Option<String>,
original_branch: Option<String>,
original_commit: Option<String>,
log_path: Option<String>,
prompt: Option<String>,
model: Option<String>,
start_time: Option<u64>,
update_time: Option<u64>,
token_count: Option<serde_json::Value>,
state: Option<String>,
completion_time: Option<u64>,
end_time: Option<u64>,
automerge: Option<bool>,
explicit_branch_name: Option<String>,
}
#[derive(Debug, serde::Serialize, Default, Clone)]
struct TaskFull {
task_id: String,
pid: Option<u64>,
branch: Option<String>,
worktree: Option<String>,
original_branch: Option<String>,
original_commit: Option<String>,
log_path: Option<String>,
prompt: Option<String>,
model: Option<String>,
start_time: Option<u64>,
end_time: Option<u64>,
state: Option<String>,
total_tokens: Option<u64>,
input_tokens: Option<u64>,
output_tokens: Option<u64>,
reasoning_output_tokens: Option<u64>,
automerge: Option<bool>,
explicit_branch_name: Option<String>,
last_update_time: Option<u64>,
duration_secs: Option<u64>,
}
pub fn run_inspect(cli: InspectCli) -> anyhow::Result<()> {
let id = cli.id.to_lowercase();
let tasks = load_task_records()?;
let matches: Vec<TaskFull> = tasks
.into_iter()
.filter(|t| t.task_id.starts_with(&id) || t.branch.as_deref().map(|b| b == id).unwrap_or(false))
.collect();
if matches.is_empty() {
eprintln!("No task matches identifier '{}'.", id);
return Ok(());
}
if matches.len() > 1 {
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.task_id[..8]).collect::<Vec<_>>().join(", "));
return Ok(());
}
let task = &matches[0];
if cli.json {
println!("{}", serde_json::to_string_pretty(task)?);
return Ok(());
}
print_human(task);
Ok(())
}
fn base_dir() -> Option<PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
let home = std::env::var_os("HOME")?;
Some(PathBuf::from(home).join(".codex"))
}
fn load_task_records() -> anyhow::Result<Vec<TaskFull>> {
let mut map: std::collections::HashMap<String, TaskFull> = std::collections::HashMap::new();
let Some(base) = base_dir() else { return Ok(vec![]); };
let tasks = base.join("tasks.jsonl");
if !tasks.exists() { return Ok(vec![]); }
let f = File::open(tasks)?;
let reader = BufReader::new(f);
for line in reader.lines() {
let Ok(line) = line else { continue };
if line.trim().is_empty() { continue; }
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
let Some(task_id) = rec.task_id.clone() else { continue };
let entry = map.entry(task_id.clone()).or_insert_with(|| TaskFull { task_id: task_id.clone(), ..Default::default() });
// Initial metadata fields
if rec.start_time.is_some() {
entry.pid = rec.pid.or(entry.pid);
entry.branch = rec.branch.or(entry.branch.clone());
entry.worktree = rec.worktree.or(entry.worktree.clone());
entry.original_branch = rec.original_branch.or(entry.original_branch.clone());
entry.original_commit = rec.original_commit.or(entry.original_commit.clone());
entry.log_path = rec.log_path.or(entry.log_path.clone());
entry.prompt = rec.prompt.or(entry.prompt.clone());
entry.model = rec.model.or(entry.model.clone());
entry.start_time = rec.start_time.or(entry.start_time);
entry.automerge = rec.automerge.or(entry.automerge);
entry.explicit_branch_name = rec.explicit_branch_name.or(entry.explicit_branch_name.clone());
}
if let Some(state) = rec.state { entry.state = Some(state); }
if rec.update_time.is_some() { entry.last_update_time = rec.update_time; }
if rec.end_time.is_some() || rec.completion_time.is_some() {
entry.end_time = rec.end_time.or(rec.completion_time).or(entry.end_time);
}
if let Some(tc) = rec.token_count.as_ref() {
if let Some(total) = tc.get("total_tokens").and_then(|v| v.as_u64()) { entry.total_tokens = Some(total); }
if let Some(inp) = tc.get("input_tokens").and_then(|v| v.as_u64()) { entry.input_tokens = Some(inp); }
if let Some(out) = tc.get("output_tokens").and_then(|v| v.as_u64()) { entry.output_tokens = Some(out); }
if let Some(rout) = tc.get("reasoning_output_tokens").and_then(|v| v.as_u64()) { entry.reasoning_output_tokens = Some(rout); }
}
}
// Compute duration
for t in map.values_mut() {
if let (Some(s), Some(e)) = (t.start_time, t.end_time) { t.duration_secs = Some(e.saturating_sub(s)); }
}
Ok(map.into_values().collect())
}
fn print_human(task: &TaskFull) {
println!("Task {}", task.task_id);
println!("State: {}", task.state.as_deref().unwrap_or("?"));
if let Some(model) = &task.model { println!("Model: {}", model); } else { println!("Model: {}", resolve_default_model()); }
if let Some(branch) = &task.branch { println!("Branch: {}", branch); }
if let Some(wt) = &task.worktree { println!("Worktree: {}", wt); }
if let Some(ob) = &task.original_branch { println!("Original branch: {}", ob); }
if let Some(oc) = &task.original_commit { println!("Original commit: {}", oc); }
if let Some(start) = task.start_time { println!("Start: {}", format_epoch(start)); }
if let Some(end) = task.end_time { println!("End: {}", format_epoch(end)); }
if let Some(d) = task.duration_secs { println!("Duration: {}s", d); }
if let Some(pid) = task.pid { println!("PID: {}", pid); }
if let Some(log) = &task.log_path { println!("Log: {}", log); }
if let Some(am) = task.automerge { println!("Automerge: {}", am); }
if let Some(exp) = &task.explicit_branch_name { println!("Explicit branch name: {}", exp); }
if let Some(total) = task.total_tokens { println!("Total tokens: {}", total); }
if task.input_tokens.is_some() || task.output_tokens.is_some() {
println!(" Input: {:?} Output: {:?} Reasoning: {:?}", task.input_tokens, task.output_tokens, task.reasoning_output_tokens);
}
if let Some(p) = &task.prompt { println!("Prompt:\n{}", p); }
}
fn format_epoch(secs: u64) -> String {
use chrono::{TimeZone, Utc};
if let Some(dt) = Utc.timestamp_opt(secs as i64, 0).single() { dt.to_rfc3339() } else { secs.to_string() }
}
fn resolve_default_model() -> String {
if let Some(base) = base_dir() {
let candidates = ["config.json", "config.yaml", "config.yml"];
for name in candidates {
let p = base.join(name);
if p.exists() {
if let Ok(raw) = fs::read_to_string(&p) {
if name.ends_with(".json") {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
if let Some(m) = v.get("model").and_then(|x| x.as_str()) { if !m.trim().is_empty() { return m.to_string(); } }
}
} else {
for line in raw.lines() { if let Some(rest) = line.trim().strip_prefix("model:") { let val = rest.trim().trim_matches('"'); if !val.is_empty() { return val.to_string(); } } }
}
}
}
}
}
"codex-mini-latest".to_string()
}

View File

@@ -1,7 +1,11 @@
pub mod concurrent;
pub mod debug_sandbox;
mod exit_status;
pub mod login;
pub mod proto;
pub mod tasks;
pub mod logs;
pub mod inspect;
use clap::Parser;
use codex_common::CliConfigOverrides;

145
codex-rs/cli/src/logs.rs Normal file
View File

@@ -0,0 +1,145 @@
use clap::Parser;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::thread;
use std::time::Duration;
#[derive(Debug, Parser)]
pub struct LogsCli {
/// Task identifier: full/short task UUID or branch name
pub id: String,
/// Follow log output (stream new lines)
#[arg(short = 'f', long = "follow")]
pub follow: bool,
/// Show only the last N lines (like tail -n). If omitted, show full file.
#[arg(short = 'n', long = "lines")]
pub lines: Option<usize>,
}
#[derive(Debug, Deserialize)]
struct RawRecord {
task_id: Option<String>,
branch: Option<String>,
log_path: Option<String>,
start_time: Option<u64>,
}
#[derive(Debug, Clone)]
struct TaskMeta {
task_id: String,
branch: Option<String>,
log_path: String,
start_time: Option<u64>,
}
pub fn run_logs(cli: LogsCli) -> anyhow::Result<()> {
let id = cli.id.to_lowercase();
let tasks = load_tasks_index()?;
if tasks.is_empty() {
eprintln!("No tasks found in tasks.jsonl");
return Ok(());
}
let matches: Vec<&TaskMeta> = tasks
.values()
.filter(|meta| {
meta.task_id.starts_with(&id) || meta.branch.as_deref().map(|b| b == id).unwrap_or(false)
})
.collect();
if matches.is_empty() {
eprintln!("No task matches identifier '{}'.", id);
return Ok(());
}
if matches.len() > 1 {
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.task_id[..8]).collect::<Vec<_>>().join(", "));
return Ok(());
}
let task = matches[0];
let path = PathBuf::from(&task.log_path);
if !path.exists() {
eprintln!("Log file not found at {}", path.display());
return Ok(());
}
if cli.follow {
tail_file(&path, cli.lines)?;
} else {
print_file(&path, cli.lines)?;
}
Ok(())
}
fn base_dir() -> Option<PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
let home = std::env::var_os("HOME")?;
Some(PathBuf::from(home).join(".codex"))
}
fn load_tasks_index() -> anyhow::Result<HashMap<String, TaskMeta>> {
let mut map: HashMap<String, TaskMeta> = HashMap::new();
let Some(base) = base_dir() else { return Ok(map); };
let tasks = base.join("tasks.jsonl");
if !tasks.exists() { return Ok(map); }
let f = File::open(tasks)?;
let reader = BufReader::new(f);
for line in reader.lines() {
let Ok(line) = line else { continue };
if line.trim().is_empty() { continue; }
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
let (Some(task_id), Some(log_path)) = (rec.task_id.clone(), rec.log_path.clone()) else { continue };
// Insert or update only if not already present (we just need initial metadata)
map.entry(task_id.clone()).or_insert(TaskMeta {
task_id,
branch: rec.branch,
log_path,
start_time: rec.start_time,
});
}
Ok(map)
}
fn print_file(path: &PathBuf, last_lines: Option<usize>) -> anyhow::Result<()> {
if let Some(n) = last_lines {
let f = File::open(path)?;
let reader = BufReader::new(f);
let mut buf: std::collections::VecDeque<String> = std::collections::VecDeque::with_capacity(n);
for line in reader.lines() {
if let Ok(l) = line { if buf.len() == n { buf.pop_front(); } buf.push_back(l); }
}
for l in buf { println!("{}", l); }
return Ok(());
}
// Full file
let mut f = File::open(path)?;
let mut contents = String::new();
f.read_to_string(&mut contents)?;
print!("{}", contents);
Ok(())
}
fn tail_file(path: &PathBuf, last_lines: Option<usize>) -> anyhow::Result<()> {
use std::io::{self};
// Initial output
if let Some(n) = last_lines { print_file(path, Some(n))?; } else { print_file(path, None)?; }
let mut f = File::open(path)?;
let mut pos = f.metadata()?.len();
loop {
thread::sleep(Duration::from_millis(500));
let meta = match f.metadata() { Ok(m) => m, Err(_) => break };
let len = meta.len();
if len < pos { // truncated
pos = 0;
}
if len > pos {
f.seek(SeekFrom::Start(pos))?;
let mut buf = String::new();
f.read_to_string(&mut buf)?;
if !buf.is_empty() { print!("{}", buf); io::Write::flush(&mut std::io::stdout())?; }
pos = len;
}
}
Ok(())
}

View File

@@ -4,6 +4,7 @@ use clap_complete::Shell;
use clap_complete::generate;
use codex_chatgpt::apply_command::ApplyCommand;
use codex_chatgpt::apply_command::run_apply_command;
use codex_cli::concurrent::maybe_spawn_concurrent;
use codex_cli::LandlockCommand;
use codex_cli::SeatbeltCommand;
use codex_cli::login::run_login_with_chatgpt;
@@ -32,6 +33,25 @@ struct MultitoolCli {
#[clap(flatten)]
interactive: TuiCli,
/// Autonomous mode: run the command in the background & concurrently using a git worktree.
/// Requires the current directory (or --cd provided path) to be a git repository.
#[clap(long)]
concurrent: bool,
/// Control whether the concurrent run auto-merges the worktree branch back into the original branch.
/// Defaults to true (may also be set via CONCURRENT_AUTOMERGE env var).
#[clap(long = "concurrent-automerge", value_name = "BOOL")]
concurrent_automerge: Option<bool>,
/// Explicit branch name to use for the concurrent worktree instead of the default `codex/<slug>`.
/// May also be set via CONCURRENT_BRANCH_NAME env var.
#[clap(long = "concurrent-branch-name", value_name = "BRANCH")]
concurrent_branch_name: Option<String>,
/// Best-of-n: run n concurrent worktrees (1-4) and let user pick the best result. Implies --concurrent and disables automerge.
#[clap(long = "best-of-n", short = 'n', value_name = "N", default_value_t = 1)]
pub best_of_n: u8,
#[clap(subcommand)]
subcommand: Option<Subcommand>,
}
@@ -61,6 +81,15 @@ enum Subcommand {
/// Apply the latest diff produced by Codex agent as a `git apply` to your local working tree.
#[clap(visible_alias = "a")]
Apply(ApplyCommand),
/// Manage / inspect concurrent background tasks.
Tasks(codex_cli::tasks::TasksCli),
/// Show or follow logs for a specific task.
Logs(codex_cli::logs::LogsCli),
/// Inspect full metadata for a task.
Inspect(codex_cli::inspect::InspectCli),
}
#[derive(Debug, Parser)]
@@ -104,8 +133,64 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
match cli.subcommand {
None => {
let mut tui_cli = cli.interactive;
let root_raw_overrides = cli.config_overrides.raw_overrides.clone();
prepend_config_flags(&mut tui_cli.config_overrides, cli.config_overrides);
codex_tui::run_main(tui_cli, codex_linux_sandbox_exe)?;
// Best-of-n logic
if cli.best_of_n > 1 {
let n = cli.best_of_n.min(4).max(1);
let mut spawned_any = false;
let base_branch = if let Some(ref name) = cli.concurrent_branch_name {
name.trim().to_string()
} else {
// Derive slug from prompt (copied from maybe_spawn_concurrent)
let raw_prompt = tui_cli.prompt.as_deref().unwrap_or("");
let snippet = raw_prompt.chars().take(32).collect::<String>();
let mut slug: String = snippet
.chars()
.map(|c| if c.is_ascii_alphanumeric() { c.to_ascii_lowercase() } else { '-' })
.collect();
while slug.contains("--") { slug = slug.replace("--", "-"); }
slug = slug.trim_matches('-').to_string();
if slug.is_empty() { slug = "prompt".into(); }
format!("codex/{}", slug)
};
for i in 1..=n {
let mut tui_cli_n = tui_cli.clone();
// Suffix branch name with -01, -02, etc.
let branch_name = format!("{}-{:02}", base_branch, i);
let branch_name_opt = Some(branch_name);
// Always automerge = false for best-of-n
match maybe_spawn_concurrent(
&mut tui_cli_n,
&root_raw_overrides,
true, // force concurrent
Some(false),
&branch_name_opt,
) {
Ok(true) => { spawned_any = true; },
Ok(false) => {},
Err(e) => { eprintln!("Error spawning best-of-n run {}: {e}", i); },
}
}
if !spawned_any {
codex_tui::run_main(tui_cli, codex_linux_sandbox_exe)?;
}
// If any spawned, do not run TUI (user will see task IDs)
} else {
// Attempt concurrent background spawn; if it returns true we skip launching the TUI.
if let Ok(spawned) = maybe_spawn_concurrent(
&mut tui_cli,
&root_raw_overrides,
cli.concurrent,
cli.concurrent_automerge,
&cli.concurrent_branch_name,
) {
if !spawned { codex_tui::run_main(tui_cli, codex_linux_sandbox_exe)?; }
} else {
// On error fallback to interactive.
codex_tui::run_main(tui_cli, codex_linux_sandbox_exe)?;
}
}
}
Some(Subcommand::Exec(mut exec_cli)) => {
prepend_config_flags(&mut exec_cli.config_overrides, cli.config_overrides);
@@ -147,6 +232,15 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
prepend_config_flags(&mut apply_cli.config_overrides, cli.config_overrides);
run_apply_command(apply_cli).await?;
}
Some(Subcommand::Tasks(tasks_cli)) => {
codex_cli::tasks::run_tasks(tasks_cli)?;
}
Some(Subcommand::Logs(logs_cli)) => {
codex_cli::logs::run_logs(logs_cli)?;
}
Some(Subcommand::Inspect(inspect_cli)) => {
codex_cli::inspect::run_inspect(inspect_cli)?;
}
}
Ok(())

212
codex-rs/cli/src/tasks.rs Normal file
View File

@@ -0,0 +1,212 @@
use clap::{Parser, Subcommand};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::fs;
use chrono::Local;
use codex_common::elapsed::format_duration;
#[derive(Debug, Parser)]
pub struct TasksCli {
#[command(subcommand)]
pub cmd: TasksCommand,
}
#[derive(Debug, Subcommand)]
pub enum TasksCommand {
/// List background concurrent tasks (from ~/.codex/tasks.jsonl)
Ls(TasksListArgs),
}
#[derive(Debug, Parser)]
pub struct TasksListArgs {
/// Output raw JSON instead of table
#[arg(long)]
pub json: bool,
/// Limit number of tasks displayed (most recent first)
#[arg(long)]
pub limit: Option<usize>,
/// Show completed tasks as well (by default only running tasks)
#[arg(short = 'a', long = "all")]
pub all: bool,
/// Show all columns including prompt text
#[arg(long = "all-columns")]
pub all_columns: bool,
}
#[derive(Debug, Deserialize)]
struct RawRecord {
task_id: Option<String>,
pid: Option<u64>,
worktree: Option<String>,
branch: Option<String>,
original_branch: Option<String>,
original_commit: Option<String>,
log_path: Option<String>,
prompt: Option<String>,
model: Option<String>,
start_time: Option<u64>,
update_time: Option<u64>,
token_count: Option<serde_json::Value>,
state: Option<String>,
completion_time: Option<u64>,
end_time: Option<u64>,
}
#[derive(Debug, Serialize, Default, Clone)]
struct TaskAggregate {
task_id: String,
pid: Option<u64>,
branch: Option<String>,
worktree: Option<String>,
prompt: Option<String>,
model: Option<String>,
start_time: Option<u64>,
last_update_time: Option<u64>,
total_tokens: Option<u64>,
state: Option<String>,
end_time: Option<u64>,
}
pub fn run_tasks(cmd: TasksCli) -> anyhow::Result<()> {
match cmd.cmd {
TasksCommand::Ls(args) => list_tasks(args),
}
}
fn base_dir() -> Option<std::path::PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
let home = std::env::var_os("HOME")?;
let base = std::path::PathBuf::from(home).join(".codex");
Some(base)
}
fn list_tasks(args: TasksListArgs) -> anyhow::Result<()> {
let Some(base) = base_dir() else {
println!("No home directory found; cannot locate tasks.jsonl");
return Ok(());
};
let path = base.join("tasks.jsonl");
if !path.exists() {
println!("No tasks.jsonl found (no concurrent tasks recorded yet)");
return Ok(());
}
let f = File::open(&path)?;
let reader = BufReader::new(f);
let mut agg: HashMap<String, TaskAggregate> = HashMap::new();
for line_res in reader.lines() {
let line = match line_res { Ok(l) => l, Err(_) => continue };
if line.trim().is_empty() { continue; }
let raw: serde_json::Value = match serde_json::from_str(&line) { Ok(v) => v, Err(_) => continue };
let rec: RawRecord = match serde_json::from_value(raw) { Ok(r) => r, Err(_) => continue };
let Some(task_id) = rec.task_id.clone() else { continue }; // must have task_id
let entry = agg.entry(task_id.clone()).or_insert_with(|| TaskAggregate { task_id: task_id.clone(), ..Default::default() });
if rec.start_time.is_some() { // initial metadata line
entry.pid = rec.pid.or(entry.pid);
entry.branch = rec.branch.or(entry.branch.clone());
entry.worktree = rec.worktree.or(entry.worktree.clone());
entry.prompt = rec.prompt.or(entry.prompt.clone());
entry.model = rec.model.or(entry.model.clone());
entry.start_time = rec.start_time.or(entry.start_time);
}
if let Some(tc_val) = rec.token_count.as_ref() { if tc_val.is_object() { if let Some(total) = tc_val.get("total_tokens").and_then(|v| v.as_u64()) { entry.total_tokens = Some(total); } } }
if rec.update_time.is_some() { entry.last_update_time = rec.update_time; }
if let Some(state) = rec.state { entry.state = Some(state); }
if rec.completion_time.is_some() || rec.end_time.is_some() {
entry.end_time = rec.end_time.or(rec.completion_time).or(entry.end_time);
}
}
// Collect and sort by start_time desc
let mut tasks: Vec<TaskAggregate> = agg.into_values().collect();
tasks.sort_by_key(|j| std::cmp::Reverse(j.start_time.unwrap_or(0)));
if !args.all { tasks.retain(|j| j.state.as_deref() != Some("done")); }
if let Some(limit) = args.limit { tasks.truncate(limit); }
if args.json {
println!("{}", serde_json::to_string_pretty(&tasks)?);
return Ok(());
}
if tasks.is_empty() {
println!("No tasks found");
return Ok(());
}
// Table header
if args.all_columns {
println!("\x1b[1m{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}\x1b[0m", "TASK_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL", "PROMPT");
} else {
// Widened branch column to 22 chars for better readability.
println!("\x1b[1m{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}\x1b[0m", "TASK_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL");
}
for t in tasks {
let task_short = if t.task_id.len() > 8 { &t.task_id[..8] } else { &t.task_id };
let pid_str = t.pid.map(|p| p.to_string()).unwrap_or_default();
let mut branch = t.branch.clone().unwrap_or_default();
let branch_limit = if args.all_columns { 22 } else { 22 }; // unified width
if branch.len() > branch_limit { branch.truncate(branch_limit); }
let start = t.start_time.map(|start_secs| {
let now = Local::now().timestamp() as u64;
if now > start_secs {
let elapsed = std::time::Duration::from_secs(now - start_secs);
format!("{} ago", format_duration(elapsed))
} else {
"just now".to_string()
}
}).unwrap_or_default();
let tokens = t.total_tokens.map(|t| t.to_string()).unwrap_or_default();
let state = t.state.clone().unwrap_or_else(|| "?".into());
let mut model = t.model.clone().unwrap_or_default();
if model.trim().is_empty() { model = resolve_default_model(); }
if model.is_empty() { model.push('-'); }
if model.len() > 12 { model.truncate(12); }
if args.all_columns {
let mut prompt = t.prompt.clone().unwrap_or_default().replace('\n', " ");
if prompt.len() > 60 { prompt.truncate(60); }
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", task_short, pid_str, branch, start, state, tokens, model, prompt);
} else {
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", task_short, pid_str, branch, start, state, tokens, model);
}
}
Ok(())
}
fn resolve_default_model() -> String {
// Attempt to read config json/yaml for model, otherwise fallback to hardcoded default.
if let Some(base) = base_dir() {
let candidates = ["config.json", "config.yaml", "config.yml"];
for name in candidates {
let p = base.join(name);
if p.exists() {
if let Ok(raw) = fs::read_to_string(&p) {
// Try JSON first.
if name.ends_with(".json") {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
if let Some(m) = v.get("model").and_then(|x| x.as_str()) {
if !m.trim().is_empty() { return m.to_string(); }
}
}
} else {
// Very lightweight YAML parse: look for line starting with model:
for line in raw.lines() {
if let Some(rest) = line.trim().strip_prefix("model:") {
let val = rest.trim().trim_matches('"');
if !val.is_empty() {
return val.to_string();
}
}
}
}
}
}
}
}
// Fallback default agentic model used elsewhere.
"codex-mini-latest".to_string()
}

View File

@@ -0,0 +1,101 @@
// Minimal integration test for --concurrent background spawning.
// Verifies that invoking the top-level CLI with --concurrent records a task entry
// in CODEX_HOME/tasks.jsonl and that multiple invocations append distinct task_ids.
use std::fs;
use std::io::Write;
use std::process::Command;
use std::time::{Duration, Instant};
use tempfile::TempDir;
// Skip helper when sandbox network disabled (mirrors existing tests' behavior).
fn network_disabled() -> bool {
std::env::var(codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
}
#[test]
fn concurrent_creates_task_records() {
if network_disabled() {
eprintln!("Skipping concurrent_creates_task_records due to sandbox network-disabled env");
return;
}
// Temp home (CODEX_HOME) and separate temp git repo.
let home = TempDir::new().expect("temp home");
let repo = TempDir::new().expect("temp repo");
// Initialize a minimal git repository (needed for --concurrent worktree logic).
assert!(Command::new("git").arg("init").current_dir(repo.path()).status().unwrap().success());
fs::write(repo.path().join("README.md"), "# temp\n").unwrap();
assert!(Command::new("git").arg("add").arg(".").current_dir(repo.path()).status().unwrap().success());
assert!(Command::new("git")
.args(["commit", "-m", "init"]) // may warn about user/email; allow non-zero if commit already exists
.current_dir(repo.path())
.status()
.map(|s| s.success())
.unwrap_or(true));
// SSE fixture so the spawned background exec does not perform a real network call.
let fixture = home.path().join("fixture.sse");
let mut f = fs::File::create(&fixture).unwrap();
writeln!(f, "data: {{\"choices\":[{{\"delta\":{{\"content\":\"ok\"}}}}]}}\n").unwrap();
writeln!(f, "data: {{\"choices\":[{{\"delta\":{{}}}}]}}\n").unwrap();
writeln!(f, "data: [DONE]\n").unwrap();
// Helper to run one concurrent invocation with a given prompt.
let run_once = |prompt: &str| {
let mut cmd = Command::new("cargo");
cmd.arg("run")
.arg("-p")
.arg("codex-cli")
.arg("--quiet")
.arg("--")
.arg("--concurrent")
.arg("--full-auto")
.arg("-C")
.arg(repo.path())
.arg(prompt);
cmd.env("CODEX_HOME", home.path())
.env("OPENAI_API_KEY", "dummy")
.env("CODEX_RS_SSE_FIXTURE", &fixture)
.env("OPENAI_BASE_URL", "http://unused.local");
let output = cmd.output().expect("spawn codex");
assert!(output.status.success(), "concurrent codex run failed: stderr={}", String::from_utf8_lossy(&output.stderr));
};
run_once("Add a cat in ASCII");
run_once("Add hello world comment");
// Wait for tasks.jsonl to contain at least two lines with task records.
let tasks_path = home.path().join("tasks.jsonl");
let deadline = Instant::now() + Duration::from_secs(10);
let mut lines: Vec<String> = Vec::new();
while Instant::now() < deadline {
if tasks_path.exists() {
let content = fs::read_to_string(&tasks_path).unwrap_or_default();
lines = content.lines().filter(|l| !l.trim().is_empty()).map(|s| s.to_string()).collect();
if lines.len() >= 2 { break; }
}
std::thread::sleep(Duration::from_millis(100));
}
assert!(lines.len() >= 2, "Expected at least 2 task records, got {}", lines.len());
// Parse JSON and ensure distinct task_ids and prompts present.
let mut task_ids = std::collections::HashSet::new();
let mut saw_cat = false;
let mut saw_hello = false;
for line in &lines {
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) {
if let Some(tid) = val.get("task_id").and_then(|v| v.as_str()) { task_ids.insert(tid.to_string()); }
if let Some(p) = val.get("prompt").and_then(|v| v.as_str()) {
if p.contains("cat") { saw_cat = true; }
if p.contains("hello") { saw_hello = true; }
}
assert_eq!(val.get("state").and_then(|v| v.as_str()), Some("started"), "task record missing started state");
}
}
assert!(task_ids.len() >= 2, "Expected distinct task_ids, got {:?}", task_ids);
assert!(saw_cat, "Did not find cat prompt in tasks.jsonl");
assert!(saw_hello, "Did not find hello prompt in tasks.jsonl");
}

View File

@@ -22,7 +22,8 @@ fn format_elapsed_millis(millis: i64) -> String {
if millis < 1000 {
format!("{millis}ms")
} else if millis < 60_000 {
format!("{:.2}s", millis as f64 / 1000.0)
let secs = millis / 1000;
format!("{secs}s")
} else {
let minutes = millis / 60_000;
let seconds = (millis % 60_000) / 1000;
@@ -48,13 +49,12 @@ mod tests {
#[test]
fn test_format_duration_seconds() {
// Durations between 1s (inclusive) and 60s (exclusive) should be
// printed with 2-decimal-place seconds.
// printed as whole seconds.
let dur = Duration::from_millis(1_500); // 1.5s
assert_eq!(format_duration(dur), "1.50s");
assert_eq!(format_duration(dur), "1s");
// 59.999s rounds to 60.00s
let dur2 = Duration::from_millis(59_999);
assert_eq!(format_duration(dur2), "60.00s");
assert_eq!(format_duration(dur2), "59s");
}
#[test]

View File

@@ -22,10 +22,26 @@ use shlex::try_join;
use std::collections::HashMap;
use std::io::Write;
use std::time::Instant;
use std::path::Path;
use crate::event_processor::EventProcessor;
use crate::event_processor::create_config_summary_entries;
// Helper: determine base ~/.codex directory similar to concurrent module.
fn codex_base_dir_for_logging() -> Option<std::path::PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
let home = std::env::var_os("HOME")?;
let base = std::path::PathBuf::from(home).join(".codex");
let _ = std::fs::create_dir_all(&base);
Some(base)
}
fn append_json_line(path: &Path, value: &serde_json::Value) -> std::io::Result<()> {
use std::io::Write as _;
let mut f = std::fs::OpenOptions::new().create(true).append(true).open(path)?;
writeln!(f, "{}", value.to_string())
}
/// This should be configurable. When used in CI, users may not want to impose
/// a limit so they can see the full transcript.
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
@@ -54,6 +70,7 @@ pub(crate) struct EventProcessorWithHumanOutput {
show_agent_reasoning: bool,
answer_started: bool,
reasoning_started: bool,
last_token_usage: Option<TokenUsage>,
}
impl EventProcessorWithHumanOutput {
@@ -77,6 +94,7 @@ impl EventProcessorWithHumanOutput {
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
last_token_usage: None,
}
} else {
Self {
@@ -93,6 +111,7 @@ impl EventProcessorWithHumanOutput {
show_agent_reasoning: !config.hide_agent_reasoning,
answer_started: false,
reasoning_started: false,
last_token_usage: None,
}
}
}
@@ -168,11 +187,60 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
// Ignore.
EventMsg::TaskStarted => {
// no-op
}
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
EventMsg::TaskComplete(_) => {
// On completion, append a final state entry with last token count snapshot.
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let token_json = self.last_token_usage.as_ref().map(|u| serde_json::json!({
"input_tokens": u.input_tokens,
"cached_input_tokens": u.cached_input_tokens,
"output_tokens": u.output_tokens,
"reasoning_output_tokens": u.reasoning_output_tokens,
"total_tokens": u.total_tokens,
}));
let mut obj = serde_json::json!({
"task_id": task_id,
"completion_time": ts,
"end_time": ts,
"state": "done",
});
if let Some(tj) = token_json { if let serde_json::Value::Object(ref mut map) = obj { map.insert("token_count".to_string(), tj); } }
let _ = append_json_line(&tasks_path, &obj);
}
}
}
EventMsg::TokenCount(token_usage_full) => {
self.last_token_usage = Some(token_usage_full.clone());
ts_println!(self, "tokens used: {}", token_usage_full.total_tokens);
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let full = serde_json::json!({
"task_id": task_id,
"update_time": ts,
"token_count": {
"input_tokens": token_usage_full.input_tokens,
"cached_input_tokens": token_usage_full.cached_input_tokens,
"output_tokens": token_usage_full.output_tokens,
"reasoning_output_tokens": token_usage_full.reasoning_output_tokens,
"total_tokens": token_usage_full.total_tokens,
}
});
let _ = append_json_line(&tasks_path, &full);
}
}
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if !self.answer_started {

View File

@@ -237,6 +237,13 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
}
}
// If running in concurrent auto-merge mode, attempt to commit and merge original branch.
if std::env::var("CODEX_CONCURRENT_AUTOMERGE").ok().as_deref() == Some("1") {
if let Err(e) = auto_commit_and_fast_forward_original_branch() {
eprintln!("[codex-concurrent] Auto-merge skipped: {e}");
}
}
Ok(())
}
@@ -261,3 +268,88 @@ fn handle_last_message(
}
Ok(())
}
/// Auto-commit changes in the concurrent worktree branch and integrate them back into the original branch.
/// Strategy:
/// 1. Commit any pending changes on the concurrent branch.
/// 2. Checkout the original branch in the original root and perform a --no-ff merge.
/// Safety: Only performs merge operations if repository state allows; on conflicts it aborts and reports.
fn auto_commit_and_fast_forward_original_branch() -> anyhow::Result<()> {
use std::process::Command;
let concurrent_branch = std::env::var("CODEX_CONCURRENT_BRANCH").ok().ok_or_else(|| anyhow::anyhow!("missing concurrent branch env"))?;
let original_branch = std::env::var("CODEX_ORIGINAL_BRANCH").ok().ok_or_else(|| anyhow::anyhow!("missing original branch env"))?;
let original_commit = std::env::var("CODEX_ORIGINAL_COMMIT").ok().ok_or_else(|| anyhow::anyhow!("missing original commit env"))?;
let worktree_dir_env = std::env::var("CODEX_CONCURRENT_WORKTREE").ok();
let original_root_env = std::env::var("CODEX_ORIGINAL_ROOT").ok();
// Determine directory to run git commit for concurrent branch (worktree if provided, else repo root from rev-parse).
let worktree_dir = if let Some(wt) = worktree_dir_env.clone() {
std::path::PathBuf::from(wt)
} else {
let repo_root = Command::new("git").args(["rev-parse", "--show-toplevel"]).output()?;
if !repo_root.status.success() { anyhow::bail!("not a git repo"); }
std::path::PathBuf::from(String::from_utf8_lossy(&repo_root.stdout).trim().to_string())
};
// Commit pending changes (git add ., git commit -m ...).
let status_out = Command::new("git")
.current_dir(&worktree_dir)
.args(["status", "--porcelain"]).output()?;
if !status_out.status.success() { anyhow::bail!("git status failed"); }
if !status_out.stdout.is_empty() {
let add_status = Command::new("git")
.current_dir(&worktree_dir)
.args(["add", "."]).status()?;
if !add_status.success() { anyhow::bail!("git add failed"); }
let commit_msg = format!("Codex concurrent run auto-commit on branch {concurrent_branch}");
let commit_status = Command::new("git")
.current_dir(&worktree_dir)
.args(["commit", "-m", &commit_msg]).status()?;
if !commit_status.success() { anyhow::bail!("git commit failed"); }
eprintln!("[codex-concurrent] Created commit in {concurrent_branch}.");
} else {
eprintln!("[codex-concurrent] No changes to commit in {concurrent_branch}.");
}
// Capture head of concurrent branch (for potential future use / diagnostics).
let concurrent_head_out = Command::new("git")
.current_dir(&worktree_dir)
.args(["rev-parse", &concurrent_branch]).output()?;
if !concurrent_head_out.status.success() { anyhow::bail!("failed to rev-parse concurrent branch"); }
// Determine where to integrate (original root if known, else worktree).
let integration_dir = if let Some(root) = original_root_env.clone() { std::path::PathBuf::from(root) } else { worktree_dir.clone() };
// Checkout original branch.
let co_status = Command::new("git")
.current_dir(&integration_dir)
.args(["checkout", &original_branch])
.status()?;
if !co_status.success() { anyhow::bail!("git checkout {original_branch} failed in original root"); }
// Check if concurrent branch already merged (ancestor test).
let ancestor_status = Command::new("git")
.current_dir(&integration_dir)
.args(["merge-base", "--is-ancestor", &concurrent_branch, &original_branch])
.status();
if let Ok(code) = ancestor_status {
if code.success() {
eprintln!("[codex-concurrent] {concurrent_branch} already merged into {original_branch}; skipping.");
return Ok(());
}
}
// Perform a --no-ff merge.
let merge_msg = format!("Merge concurrent Codex branch {concurrent_branch} (base {original_commit})");
let merge_status = Command::new("git")
.current_dir(&integration_dir)
.args(["merge", "--no-ff", &concurrent_branch, "-m", &merge_msg])
.status()?;
if !merge_status.success() {
let _ = Command::new("git").current_dir(&integration_dir).args(["merge", "--abort"]).status();
anyhow::bail!("git merge --no-ff failed (conflicts?)");
}
eprintln!("[codex-concurrent] Merged {concurrent_branch} into {original_branch} in original root: {}", integration_dir.display());
Ok(())
}

View File

@@ -3,7 +3,7 @@ use codex_common::ApprovalModeCliArg;
use codex_common::CliConfigOverrides;
use std::path::PathBuf;
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[command(version)]
pub struct Cli {
/// Optional user prompt to start the session.