mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
adding codex jobs command, including jobs ls, inspect, logs and -a options
This commit is contained in:
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -626,6 +626,7 @@ name = "codex-cli"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap",
|
||||
"clap_complete",
|
||||
"codex-chatgpt",
|
||||
@@ -636,6 +637,7 @@ dependencies = [
|
||||
"codex-login",
|
||||
"codex-mcp-server",
|
||||
"codex-tui",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
||||
@@ -27,6 +27,8 @@ codex-linux-sandbox = { path = "../linux-sandbox" }
|
||||
codex-mcp-server = { path = "../mcp-server" }
|
||||
codex-tui = { path = "../tui" }
|
||||
serde_json = "1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
tokio = { version = "1", features = [
|
||||
"io-std",
|
||||
"macros",
|
||||
|
||||
@@ -164,6 +164,8 @@ pub fn maybe_spawn_concurrent(
|
||||
if let Some(oc) = &original_commit { cmd.env("CODEX_ORIGINAL_COMMIT", oc); }
|
||||
if let Ok(orig_root) = std::env::current_dir() { cmd.env("CODEX_ORIGINAL_ROOT", orig_root); }
|
||||
}
|
||||
// Provide job id so child process can emit token_count updates to tasks.jsonl.
|
||||
cmd.env("CODEX_JOB_ID", &job_id);
|
||||
cmd.stdout(Stdio::from(file));
|
||||
if let Some(f2) = file_err { cmd.stderr(Stdio::from(f2)); }
|
||||
match cmd.spawn() {
|
||||
@@ -187,7 +189,7 @@ pub fn maybe_spawn_concurrent(
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
if let Ok(base) = codex_base_dir() {
|
||||
let jobs_path = base.join("jobs.jsonl");
|
||||
let tasks_path = base.join("tasks.jsonl");
|
||||
let record = serde_json::json!({
|
||||
"job_id": job_id,
|
||||
"pid": child.id(),
|
||||
@@ -197,17 +199,20 @@ pub fn maybe_spawn_concurrent(
|
||||
"original_commit": original_commit,
|
||||
"log_path": log_path.display().to_string(),
|
||||
"prompt": raw_prompt,
|
||||
"model": tui_cli.model.clone(),
|
||||
"start_time": record_time,
|
||||
"automerge": effective_automerge,
|
||||
"explicit_branch_name": user_branch_name_opt,
|
||||
"token_count": serde_json::Value::Null,
|
||||
"state": "started",
|
||||
});
|
||||
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&jobs_path) {
|
||||
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&tasks_path) {
|
||||
use std::io::Write;
|
||||
if let Err(e) = writeln!(f, "{}", record.to_string()) {
|
||||
eprintln!("Warning: failed writing job record to {}: {e}", jobs_path.display());
|
||||
eprintln!("Warning: failed writing task record to {}: {e}", tasks_path.display());
|
||||
}
|
||||
} else {
|
||||
eprintln!("Warning: could not open jobs log file at {}", jobs_path.display());
|
||||
eprintln!("Warning: could not open tasks log file at {}", tasks_path.display());
|
||||
}
|
||||
}
|
||||
return Ok(true); // background spawned
|
||||
|
||||
185
codex-rs/cli/src/inspect.rs
Normal file
185
codex-rs/cli/src/inspect.rs
Normal file
@@ -0,0 +1,185 @@
|
||||
use clap::Parser;
|
||||
use serde::Deserialize;
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::path::PathBuf;
|
||||
use std::fs;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct InspectCli {
|
||||
/// Job identifier (full/short job id or exact branch name)
|
||||
pub id: String,
|
||||
/// Output JSON instead of human table
|
||||
#[arg(long)]
|
||||
pub json: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RawRecord {
|
||||
job_id: Option<String>,
|
||||
pid: Option<u64>,
|
||||
worktree: Option<String>,
|
||||
branch: Option<String>,
|
||||
original_branch: Option<String>,
|
||||
original_commit: Option<String>,
|
||||
log_path: Option<String>,
|
||||
prompt: Option<String>,
|
||||
model: Option<String>,
|
||||
start_time: Option<u64>,
|
||||
update_time: Option<u64>,
|
||||
token_count: Option<serde_json::Value>,
|
||||
state: Option<String>,
|
||||
completion_time: Option<u64>,
|
||||
end_time: Option<u64>,
|
||||
automerge: Option<bool>,
|
||||
explicit_branch_name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize, Default, Clone)]
|
||||
struct JobFull {
|
||||
job_id: String,
|
||||
pid: Option<u64>,
|
||||
branch: Option<String>,
|
||||
worktree: Option<String>,
|
||||
original_branch: Option<String>,
|
||||
original_commit: Option<String>,
|
||||
log_path: Option<String>,
|
||||
prompt: Option<String>,
|
||||
model: Option<String>,
|
||||
start_time: Option<u64>,
|
||||
end_time: Option<u64>,
|
||||
state: Option<String>,
|
||||
total_tokens: Option<u64>,
|
||||
input_tokens: Option<u64>,
|
||||
output_tokens: Option<u64>,
|
||||
reasoning_output_tokens: Option<u64>,
|
||||
automerge: Option<bool>,
|
||||
explicit_branch_name: Option<String>,
|
||||
last_update_time: Option<u64>,
|
||||
duration_secs: Option<u64>,
|
||||
}
|
||||
|
||||
pub fn run_inspect(cli: InspectCli) -> anyhow::Result<()> {
|
||||
let id = cli.id.to_lowercase();
|
||||
let jobs = load_job_records()?;
|
||||
let matches: Vec<JobFull> = jobs
|
||||
.into_iter()
|
||||
.filter(|j| j.job_id.starts_with(&id) || j.branch.as_deref().map(|b| b == id).unwrap_or(false))
|
||||
.collect();
|
||||
if matches.is_empty() {
|
||||
eprintln!("No job matches identifier '{}'.", id);
|
||||
return Ok(());
|
||||
}
|
||||
if matches.len() > 1 {
|
||||
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.job_id[..8]).collect::<Vec<_>>().join(", "));
|
||||
return Ok(());
|
||||
}
|
||||
let job = &matches[0];
|
||||
if cli.json {
|
||||
println!("{}", serde_json::to_string_pretty(job)?);
|
||||
return Ok(());
|
||||
}
|
||||
print_human(job);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn base_dir() -> Option<PathBuf> {
|
||||
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
|
||||
let home = std::env::var_os("HOME")?;
|
||||
Some(PathBuf::from(home).join(".codex"))
|
||||
}
|
||||
|
||||
fn load_job_records() -> anyhow::Result<Vec<JobFull>> {
|
||||
let mut map: std::collections::HashMap<String, JobFull> = std::collections::HashMap::new();
|
||||
let Some(base) = base_dir() else { return Ok(vec![]); };
|
||||
let tasks = base.join("tasks.jsonl");
|
||||
if !tasks.exists() { return Ok(vec![]); }
|
||||
let f = File::open(tasks)?;
|
||||
let reader = BufReader::new(f);
|
||||
for line in reader.lines() {
|
||||
let Ok(line) = line else { continue };
|
||||
if line.trim().is_empty() { continue; }
|
||||
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
|
||||
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
|
||||
let Some(job_id) = rec.job_id.clone() else { continue };
|
||||
let entry = map.entry(job_id.clone()).or_insert_with(|| JobFull { job_id: job_id.clone(), ..Default::default() });
|
||||
// Initial metadata fields
|
||||
if rec.start_time.is_some() {
|
||||
entry.pid = rec.pid.or(entry.pid);
|
||||
entry.branch = rec.branch.or(entry.branch.clone());
|
||||
entry.worktree = rec.worktree.or(entry.worktree.clone());
|
||||
entry.original_branch = rec.original_branch.or(entry.original_branch.clone());
|
||||
entry.original_commit = rec.original_commit.or(entry.original_commit.clone());
|
||||
entry.log_path = rec.log_path.or(entry.log_path.clone());
|
||||
entry.prompt = rec.prompt.or(entry.prompt.clone());
|
||||
entry.model = rec.model.or(entry.model.clone());
|
||||
entry.start_time = rec.start_time.or(entry.start_time);
|
||||
entry.automerge = rec.automerge.or(entry.automerge);
|
||||
entry.explicit_branch_name = rec.explicit_branch_name.or(entry.explicit_branch_name.clone());
|
||||
}
|
||||
if let Some(state) = rec.state { entry.state = Some(state); }
|
||||
if rec.update_time.is_some() { entry.last_update_time = rec.update_time; }
|
||||
if rec.end_time.is_some() || rec.completion_time.is_some() {
|
||||
entry.end_time = rec.end_time.or(rec.completion_time).or(entry.end_time);
|
||||
}
|
||||
if let Some(tc) = rec.token_count.as_ref() {
|
||||
if let Some(total) = tc.get("total_tokens").and_then(|v| v.as_u64()) { entry.total_tokens = Some(total); }
|
||||
if let Some(inp) = tc.get("input_tokens").and_then(|v| v.as_u64()) { entry.input_tokens = Some(inp); }
|
||||
if let Some(out) = tc.get("output_tokens").and_then(|v| v.as_u64()) { entry.output_tokens = Some(out); }
|
||||
if let Some(rout) = tc.get("reasoning_output_tokens").and_then(|v| v.as_u64()) { entry.reasoning_output_tokens = Some(rout); }
|
||||
}
|
||||
}
|
||||
// Compute duration
|
||||
for j in map.values_mut() {
|
||||
if let (Some(s), Some(e)) = (j.start_time, j.end_time) { j.duration_secs = Some(e.saturating_sub(s)); }
|
||||
}
|
||||
Ok(map.into_values().collect())
|
||||
}
|
||||
|
||||
fn print_human(job: &JobFull) {
|
||||
println!("Job {}", job.job_id);
|
||||
println!("State: {}", job.state.as_deref().unwrap_or("?"));
|
||||
if let Some(model) = &job.model { println!("Model: {}", model); } else { println!("Model: {}", resolve_default_model()); }
|
||||
if let Some(branch) = &job.branch { println!("Branch: {}", branch); }
|
||||
if let Some(wt) = &job.worktree { println!("Worktree: {}", wt); }
|
||||
if let Some(ob) = &job.original_branch { println!("Original branch: {}", ob); }
|
||||
if let Some(oc) = &job.original_commit { println!("Original commit: {}", oc); }
|
||||
if let Some(start) = job.start_time { println!("Start: {}", format_epoch(start)); }
|
||||
if let Some(end) = job.end_time { println!("End: {}", format_epoch(end)); }
|
||||
if let Some(d) = job.duration_secs { println!("Duration: {}s", d); }
|
||||
if let Some(pid) = job.pid { println!("PID: {}", pid); }
|
||||
if let Some(log) = &job.log_path { println!("Log: {}", log); }
|
||||
if let Some(am) = job.automerge { println!("Automerge: {}", am); }
|
||||
if let Some(exp) = &job.explicit_branch_name { println!("Explicit branch name: {}", exp); }
|
||||
if let Some(total) = job.total_tokens { println!("Total tokens: {}", total); }
|
||||
if job.input_tokens.is_some() || job.output_tokens.is_some() {
|
||||
println!(" Input: {:?} Output: {:?} Reasoning: {:?}", job.input_tokens, job.output_tokens, job.reasoning_output_tokens);
|
||||
}
|
||||
if let Some(p) = &job.prompt { println!("Prompt:\n{}", p); }
|
||||
}
|
||||
|
||||
fn format_epoch(secs: u64) -> String {
|
||||
use chrono::{TimeZone, Utc};
|
||||
if let Some(dt) = Utc.timestamp_opt(secs as i64, 0).single() { dt.to_rfc3339() } else { secs.to_string() }
|
||||
}
|
||||
|
||||
fn resolve_default_model() -> String {
|
||||
if let Some(base) = base_dir() {
|
||||
let candidates = ["config.json", "config.yaml", "config.yml"];
|
||||
for name in candidates {
|
||||
let p = base.join(name);
|
||||
if p.exists() {
|
||||
if let Ok(raw) = fs::read_to_string(&p) {
|
||||
if name.ends_with(".json") {
|
||||
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
|
||||
if let Some(m) = v.get("model").and_then(|x| x.as_str()) { if !m.trim().is_empty() { return m.to_string(); } }
|
||||
}
|
||||
} else {
|
||||
for line in raw.lines() { if let Some(rest) = line.trim().strip_prefix("model:") { let val = rest.trim().trim_matches('"'); if !val.is_empty() { return val.to_string(); } } }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"codex-mini-latest".to_string()
|
||||
}
|
||||
217
codex-rs/cli/src/jobs.rs
Normal file
217
codex-rs/cli/src/jobs.rs
Normal file
@@ -0,0 +1,217 @@
|
||||
use clap::{Parser, Subcommand};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::fs;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct JobsCli {
|
||||
#[command(subcommand)]
|
||||
pub cmd: JobsCommand,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum JobsCommand {
|
||||
/// List background concurrent jobs (from ~/.codex/tasks.jsonl)
|
||||
Ls(JobsListArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct JobsListArgs {
|
||||
/// Output raw JSON instead of table
|
||||
#[arg(long)]
|
||||
pub json: bool,
|
||||
/// Limit number of jobs displayed (most recent first)
|
||||
#[arg(long)]
|
||||
pub limit: Option<usize>,
|
||||
/// Show completed jobs as well (by default only running jobs)
|
||||
#[arg(short = 'a', long = "all")]
|
||||
pub all: bool,
|
||||
/// Show all columns including prompt text
|
||||
#[arg(long = "all-columns")]
|
||||
pub all_columns: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RawRecord {
|
||||
job_id: Option<String>,
|
||||
pid: Option<u64>,
|
||||
worktree: Option<String>,
|
||||
branch: Option<String>,
|
||||
original_branch: Option<String>,
|
||||
original_commit: Option<String>,
|
||||
log_path: Option<String>,
|
||||
prompt: Option<String>,
|
||||
model: Option<String>,
|
||||
start_time: Option<u64>,
|
||||
update_time: Option<u64>,
|
||||
token_count: Option<serde_json::Value>,
|
||||
state: Option<String>,
|
||||
completion_time: Option<u64>,
|
||||
end_time: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default, Clone)]
|
||||
struct JobAggregate {
|
||||
job_id: String,
|
||||
pid: Option<u64>,
|
||||
branch: Option<String>,
|
||||
worktree: Option<String>,
|
||||
prompt: Option<String>,
|
||||
model: Option<String>,
|
||||
start_time: Option<u64>,
|
||||
last_update_time: Option<u64>,
|
||||
total_tokens: Option<u64>,
|
||||
state: Option<String>,
|
||||
end_time: Option<u64>,
|
||||
}
|
||||
|
||||
pub fn run_jobs(cmd: JobsCli) -> anyhow::Result<()> {
|
||||
match cmd.cmd {
|
||||
JobsCommand::Ls(args) => list_jobs(args),
|
||||
}
|
||||
}
|
||||
|
||||
fn base_dir() -> Option<std::path::PathBuf> {
|
||||
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
|
||||
let home = std::env::var_os("HOME")?;
|
||||
let base = std::path::PathBuf::from(home).join(".codex");
|
||||
Some(base)
|
||||
}
|
||||
|
||||
fn list_jobs(args: JobsListArgs) -> anyhow::Result<()> {
|
||||
let Some(base) = base_dir() else {
|
||||
println!("No home directory found; cannot locate tasks.jsonl");
|
||||
return Ok(());
|
||||
};
|
||||
let path = base.join("tasks.jsonl");
|
||||
if !path.exists() {
|
||||
println!("No tasks.jsonl found (no concurrent jobs recorded yet)");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let f = File::open(&path)?;
|
||||
let reader = BufReader::new(f);
|
||||
|
||||
let mut agg: HashMap<String, JobAggregate> = HashMap::new();
|
||||
for line_res in reader.lines() {
|
||||
let line = match line_res { Ok(l) => l, Err(_) => continue };
|
||||
if line.trim().is_empty() { continue; }
|
||||
let raw: serde_json::Value = match serde_json::from_str(&line) { Ok(v) => v, Err(_) => continue };
|
||||
let rec: RawRecord = match serde_json::from_value(raw) { Ok(r) => r, Err(_) => continue };
|
||||
let Some(job_id) = rec.job_id.clone() else { continue }; // must have job_id
|
||||
let entry = agg.entry(job_id.clone()).or_insert_with(|| JobAggregate { job_id: job_id.clone(), ..Default::default() });
|
||||
if rec.start_time.is_some() { // initial metadata line
|
||||
entry.pid = rec.pid.or(entry.pid);
|
||||
entry.branch = rec.branch.or(entry.branch.clone());
|
||||
entry.worktree = rec.worktree.or(entry.worktree.clone());
|
||||
entry.prompt = rec.prompt.or(entry.prompt.clone());
|
||||
entry.model = rec.model.or(entry.model.clone());
|
||||
entry.start_time = rec.start_time.or(entry.start_time);
|
||||
}
|
||||
if let Some(tc_val) = rec.token_count.as_ref() { if tc_val.is_object() { if let Some(total) = tc_val.get("total_tokens").and_then(|v| v.as_u64()) { entry.total_tokens = Some(total); } } }
|
||||
if rec.update_time.is_some() { entry.last_update_time = rec.update_time; }
|
||||
if let Some(state) = rec.state { entry.state = Some(state); }
|
||||
if rec.completion_time.is_some() || rec.end_time.is_some() {
|
||||
entry.end_time = rec.end_time.or(rec.completion_time).or(entry.end_time);
|
||||
}
|
||||
}
|
||||
|
||||
// Collect and sort by start_time desc
|
||||
let mut jobs: Vec<JobAggregate> = agg.into_values().collect();
|
||||
jobs.sort_by_key(|j| std::cmp::Reverse(j.start_time.unwrap_or(0)));
|
||||
|
||||
if !args.all { jobs.retain(|j| j.state.as_deref() != Some("done")); }
|
||||
if let Some(limit) = args.limit { jobs.truncate(limit); }
|
||||
|
||||
if args.json {
|
||||
println!("{}", serde_json::to_string_pretty(&jobs)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if jobs.is_empty() {
|
||||
println!("No jobs found");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Table header
|
||||
if args.all_columns {
|
||||
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", "JOB_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL", "PROMPT");
|
||||
} else {
|
||||
// Widened branch column to 22 chars for better readability.
|
||||
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", "JOB_ID", "PID", "BRANCH", "START", "STATE", "TOKENS", "MODEL");
|
||||
}
|
||||
for j in jobs {
|
||||
let job_short = if j.job_id.len() > 8 { &j.job_id[..8] } else { &j.job_id };
|
||||
let pid_str = j.pid.map(|p| p.to_string()).unwrap_or_default();
|
||||
let mut branch = j.branch.clone().unwrap_or_default();
|
||||
let branch_limit = if args.all_columns { 22 } else { 22 }; // unified width
|
||||
if branch.len() > branch_limit { branch.truncate(branch_limit); }
|
||||
let start = j.start_time.map(format_epoch_short).unwrap_or_default();
|
||||
let tokens = j.total_tokens.map(|t| t.to_string()).unwrap_or_default();
|
||||
let state = j.state.clone().unwrap_or_else(|| "?".into());
|
||||
let mut model = j.model.clone().unwrap_or_default();
|
||||
if model.trim().is_empty() { model = resolve_default_model(); }
|
||||
if model.is_empty() { model.push('-'); }
|
||||
if model.len() > 12 { model.truncate(12); }
|
||||
if args.all_columns {
|
||||
let mut prompt = j.prompt.clone().unwrap_or_default().replace('\n', " ");
|
||||
if prompt.len() > 60 { prompt.truncate(60); }
|
||||
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12} {}", job_short, pid_str, branch, start, state, tokens, model, prompt);
|
||||
} else {
|
||||
println!("{:<8} {:>6} {:<22} {:<12} {:<8} {:>8} {:<12}", job_short, pid_str, branch, start, state, tokens, model);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn format_epoch_short(secs: u64) -> String {
|
||||
use chrono::{Datelike, Local, TimeZone};
|
||||
let dt = Local.timestamp_opt(secs as i64, 0).single();
|
||||
if let Some(dt) = dt {
|
||||
let now = Local::now();
|
||||
if dt.year() == now.year() {
|
||||
dt.format("%d %b %H:%M").to_string() // e.g. 22 Jul 11:56
|
||||
} else {
|
||||
dt.format("%d %b %Y").to_string() // older year
|
||||
}
|
||||
} else {
|
||||
String::new()
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_default_model() -> String {
|
||||
// Attempt to read config json/yaml for model, otherwise fallback to hardcoded default.
|
||||
if let Some(base) = base_dir() {
|
||||
let candidates = ["config.json", "config.yaml", "config.yml"];
|
||||
for name in candidates {
|
||||
let p = base.join(name);
|
||||
if p.exists() {
|
||||
if let Ok(raw) = fs::read_to_string(&p) {
|
||||
// Try JSON first.
|
||||
if name.ends_with(".json") {
|
||||
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
|
||||
if let Some(m) = v.get("model").and_then(|x| x.as_str()) {
|
||||
if !m.trim().is_empty() { return m.to_string(); }
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Very lightweight YAML parse: look for line starting with model:
|
||||
for line in raw.lines() {
|
||||
if let Some(rest) = line.trim().strip_prefix("model:") {
|
||||
let val = rest.trim().trim_matches('"');
|
||||
if !val.is_empty() {
|
||||
return val.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Fallback default agentic model used elsewhere.
|
||||
"codex-mini-latest".to_string()
|
||||
}
|
||||
@@ -3,6 +3,9 @@ pub mod debug_sandbox;
|
||||
mod exit_status;
|
||||
pub mod login;
|
||||
pub mod proto;
|
||||
pub mod jobs;
|
||||
pub mod logs;
|
||||
pub mod inspect;
|
||||
|
||||
use clap::Parser;
|
||||
use codex_common::CliConfigOverrides;
|
||||
|
||||
145
codex-rs/cli/src/logs.rs
Normal file
145
codex-rs/cli/src/logs.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use clap::Parser;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
|
||||
use std::path::PathBuf;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct LogsCli {
|
||||
/// Job identifier: full/short job UUID or branch name
|
||||
pub id: String,
|
||||
/// Follow log output (stream new lines)
|
||||
#[arg(short = 'f', long = "follow")]
|
||||
pub follow: bool,
|
||||
/// Show only the last N lines (like tail -n). If omitted, show full file.
|
||||
#[arg(short = 'n', long = "lines")]
|
||||
pub lines: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RawRecord {
|
||||
job_id: Option<String>,
|
||||
branch: Option<String>,
|
||||
log_path: Option<String>,
|
||||
start_time: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct JobMeta {
|
||||
job_id: String,
|
||||
branch: Option<String>,
|
||||
log_path: String,
|
||||
start_time: Option<u64>,
|
||||
}
|
||||
|
||||
pub fn run_logs(cli: LogsCli) -> anyhow::Result<()> {
|
||||
let id = cli.id.to_lowercase();
|
||||
let jobs = load_jobs_index()?;
|
||||
if jobs.is_empty() {
|
||||
eprintln!("No jobs found in tasks.jsonl");
|
||||
return Ok(());
|
||||
}
|
||||
let matches: Vec<&JobMeta> = jobs
|
||||
.values()
|
||||
.filter(|meta| {
|
||||
meta.job_id.starts_with(&id) || meta.branch.as_deref().map(|b| b == id).unwrap_or(false)
|
||||
})
|
||||
.collect();
|
||||
if matches.is_empty() {
|
||||
eprintln!("No job matches identifier '{}'.", id);
|
||||
return Ok(());
|
||||
}
|
||||
if matches.len() > 1 {
|
||||
eprintln!("Identifier '{}' is ambiguous; matches: {}", id, matches.iter().map(|m| &m.job_id[..8]).collect::<Vec<_>>().join(", "));
|
||||
return Ok(());
|
||||
}
|
||||
let job = matches[0];
|
||||
let path = PathBuf::from(&job.log_path);
|
||||
if !path.exists() {
|
||||
eprintln!("Log file not found at {}", path.display());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if cli.follow {
|
||||
tail_file(&path, cli.lines)?;
|
||||
} else {
|
||||
print_file(&path, cli.lines)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn base_dir() -> Option<PathBuf> {
|
||||
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
|
||||
let home = std::env::var_os("HOME")?;
|
||||
Some(PathBuf::from(home).join(".codex"))
|
||||
}
|
||||
|
||||
fn load_jobs_index() -> anyhow::Result<HashMap<String, JobMeta>> {
|
||||
let mut map: HashMap<String, JobMeta> = HashMap::new();
|
||||
let Some(base) = base_dir() else { return Ok(map); };
|
||||
let tasks = base.join("tasks.jsonl");
|
||||
if !tasks.exists() { return Ok(map); }
|
||||
let f = File::open(tasks)?;
|
||||
let reader = BufReader::new(f);
|
||||
for line in reader.lines() {
|
||||
let Ok(line) = line else { continue };
|
||||
if line.trim().is_empty() { continue; }
|
||||
let Ok(val) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
|
||||
let Ok(rec) = serde_json::from_value::<RawRecord>(val) else { continue };
|
||||
let (Some(job_id), Some(log_path)) = (rec.job_id.clone(), rec.log_path.clone()) else { continue };
|
||||
// Insert or update only if not already present (we just need initial metadata)
|
||||
map.entry(job_id.clone()).or_insert(JobMeta {
|
||||
job_id,
|
||||
branch: rec.branch,
|
||||
log_path,
|
||||
start_time: rec.start_time,
|
||||
});
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
fn print_file(path: &PathBuf, last_lines: Option<usize>) -> anyhow::Result<()> {
|
||||
if let Some(n) = last_lines {
|
||||
let f = File::open(path)?;
|
||||
let reader = BufReader::new(f);
|
||||
let mut buf: std::collections::VecDeque<String> = std::collections::VecDeque::with_capacity(n);
|
||||
for line in reader.lines() {
|
||||
if let Ok(l) = line { if buf.len() == n { buf.pop_front(); } buf.push_back(l); }
|
||||
}
|
||||
for l in buf { println!("{}", l); }
|
||||
return Ok(());
|
||||
}
|
||||
// Full file
|
||||
let mut f = File::open(path)?;
|
||||
let mut contents = String::new();
|
||||
f.read_to_string(&mut contents)?;
|
||||
print!("{}", contents);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn tail_file(path: &PathBuf, last_lines: Option<usize>) -> anyhow::Result<()> {
|
||||
use std::io::{self};
|
||||
// Initial output
|
||||
if let Some(n) = last_lines { print_file(path, Some(n))?; } else { print_file(path, None)?; }
|
||||
let mut f = File::open(path)?;
|
||||
let mut pos = f.metadata()?.len();
|
||||
loop {
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
let meta = match f.metadata() { Ok(m) => m, Err(_) => break };
|
||||
let len = meta.len();
|
||||
if len < pos { // truncated
|
||||
pos = 0;
|
||||
}
|
||||
if len > pos {
|
||||
f.seek(SeekFrom::Start(pos))?;
|
||||
let mut buf = String::new();
|
||||
f.read_to_string(&mut buf)?;
|
||||
if !buf.is_empty() { print!("{}", buf); io::Write::flush(&mut std::io::stdout())?; }
|
||||
pos = len;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -77,6 +77,15 @@ enum Subcommand {
|
||||
/// Apply the latest diff produced by Codex agent as a `git apply` to your local working tree.
|
||||
#[clap(visible_alias = "a")]
|
||||
Apply(ApplyCommand),
|
||||
|
||||
/// Manage / inspect concurrent background jobs.
|
||||
Jobs(codex_cli::jobs::JobsCli),
|
||||
|
||||
/// Show or follow logs for a specific job.
|
||||
Logs(codex_cli::logs::LogsCli),
|
||||
|
||||
/// Inspect full metadata for a job.
|
||||
Inspect(codex_cli::inspect::InspectCli),
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
@@ -176,6 +185,15 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
|
||||
prepend_config_flags(&mut apply_cli.config_overrides, cli.config_overrides);
|
||||
run_apply_command(apply_cli).await?;
|
||||
}
|
||||
Some(Subcommand::Jobs(jobs_cli)) => {
|
||||
codex_cli::jobs::run_jobs(jobs_cli)?;
|
||||
}
|
||||
Some(Subcommand::Logs(logs_cli)) => {
|
||||
codex_cli::logs::run_logs(logs_cli)?;
|
||||
}
|
||||
Some(Subcommand::Inspect(inspect_cli)) => {
|
||||
codex_cli::inspect::run_inspect(inspect_cli)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -22,10 +22,26 @@ use shlex::try_join;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::time::Instant;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::event_processor::EventProcessor;
|
||||
use crate::event_processor::create_config_summary_entries;
|
||||
|
||||
// Helper: determine base ~/.codex directory similar to concurrent module.
|
||||
fn codex_base_dir_for_logging() -> Option<std::path::PathBuf> {
|
||||
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
|
||||
let home = std::env::var_os("HOME")?;
|
||||
let base = std::path::PathBuf::from(home).join(".codex");
|
||||
let _ = std::fs::create_dir_all(&base);
|
||||
Some(base)
|
||||
}
|
||||
|
||||
fn append_json_line(path: &Path, value: &serde_json::Value) -> std::io::Result<()> {
|
||||
use std::io::Write as _;
|
||||
let mut f = std::fs::OpenOptions::new().create(true).append(true).open(path)?;
|
||||
writeln!(f, "{}", value.to_string())
|
||||
}
|
||||
|
||||
/// This should be configurable. When used in CI, users may not want to impose
|
||||
/// a limit so they can see the full transcript.
|
||||
const MAX_OUTPUT_LINES_FOR_EXEC_TOOL_CALL: usize = 20;
|
||||
@@ -54,6 +70,7 @@ pub(crate) struct EventProcessorWithHumanOutput {
|
||||
show_agent_reasoning: bool,
|
||||
answer_started: bool,
|
||||
reasoning_started: bool,
|
||||
last_token_usage: Option<TokenUsage>,
|
||||
}
|
||||
|
||||
impl EventProcessorWithHumanOutput {
|
||||
@@ -77,6 +94,7 @@ impl EventProcessorWithHumanOutput {
|
||||
show_agent_reasoning: !config.hide_agent_reasoning,
|
||||
answer_started: false,
|
||||
reasoning_started: false,
|
||||
last_token_usage: None,
|
||||
}
|
||||
} else {
|
||||
Self {
|
||||
@@ -93,6 +111,7 @@ impl EventProcessorWithHumanOutput {
|
||||
show_agent_reasoning: !config.hide_agent_reasoning,
|
||||
answer_started: false,
|
||||
reasoning_started: false,
|
||||
last_token_usage: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -168,11 +187,60 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
|
||||
ts_println!(self, "{}", message.style(self.dimmed));
|
||||
}
|
||||
EventMsg::TaskStarted | EventMsg::TaskComplete(_) => {
|
||||
// Ignore.
|
||||
EventMsg::TaskStarted => {
|
||||
// no-op
|
||||
}
|
||||
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
|
||||
ts_println!(self, "tokens used: {total_tokens}");
|
||||
EventMsg::TaskComplete(_) => {
|
||||
// On completion, append a final state entry with last token count snapshot.
|
||||
if let Ok(job_id) = std::env::var("CODEX_JOB_ID") {
|
||||
if let Some(base) = codex_base_dir_for_logging() {
|
||||
let tasks_path = base.join("tasks.jsonl");
|
||||
let ts = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
let token_json = self.last_token_usage.as_ref().map(|u| serde_json::json!({
|
||||
"input_tokens": u.input_tokens,
|
||||
"cached_input_tokens": u.cached_input_tokens,
|
||||
"output_tokens": u.output_tokens,
|
||||
"reasoning_output_tokens": u.reasoning_output_tokens,
|
||||
"total_tokens": u.total_tokens,
|
||||
}));
|
||||
let mut obj = serde_json::json!({
|
||||
"job_id": job_id,
|
||||
"completion_time": ts,
|
||||
"end_time": ts,
|
||||
"state": "done",
|
||||
});
|
||||
if let Some(tj) = token_json { if let serde_json::Value::Object(ref mut map) = obj { map.insert("token_count".to_string(), tj); } }
|
||||
let _ = append_json_line(&tasks_path, &obj);
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::TokenCount(token_usage_full) => {
|
||||
self.last_token_usage = Some(token_usage_full.clone());
|
||||
ts_println!(self, "tokens used: {}", token_usage_full.total_tokens);
|
||||
if let Ok(job_id) = std::env::var("CODEX_JOB_ID") {
|
||||
if let Some(base) = codex_base_dir_for_logging() {
|
||||
let tasks_path = base.join("tasks.jsonl");
|
||||
let ts = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_secs())
|
||||
.unwrap_or(0);
|
||||
let full = serde_json::json!({
|
||||
"job_id": job_id,
|
||||
"update_time": ts,
|
||||
"token_count": {
|
||||
"input_tokens": token_usage_full.input_tokens,
|
||||
"cached_input_tokens": token_usage_full.cached_input_tokens,
|
||||
"output_tokens": token_usage_full.output_tokens,
|
||||
"reasoning_output_tokens": token_usage_full.reasoning_output_tokens,
|
||||
"total_tokens": token_usage_full.total_tokens,
|
||||
}
|
||||
});
|
||||
let _ = append_json_line(&tasks_path, &full);
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
|
||||
if !self.answer_started {
|
||||
|
||||
Reference in New Issue
Block a user