Auto-export agent job CSV on success

This commit is contained in:
Dave Aitel
2026-02-05 13:05:12 -05:00
parent 001cfb5bba
commit caddbe161c
6 changed files with 76 additions and 12 deletions

View File

@@ -53,6 +53,7 @@ struct SpawnAgentsOnCsvArgs {
output_csv_path: Option<String>,
output_schema: Option<Value>,
max_concurrency: Option<usize>,
auto_export: Option<bool>,
}
#[derive(Debug, Deserialize)]
@@ -64,6 +65,7 @@ struct JobIdArgs {
struct RunAgentJobArgs {
job_id: String,
max_concurrency: Option<usize>,
auto_export: Option<bool>,
}
#[derive(Debug, Deserialize)]
@@ -129,6 +131,7 @@ struct JobRunnerOptions {
max_concurrency: usize,
spawn_config: Config,
child_depth: i32,
auto_export: bool,
}
#[async_trait]
@@ -265,12 +268,14 @@ mod spawn_agents_on_csv {
let job_name = args
.job_name
.unwrap_or_else(|| format!("agent-job-{job_suffix}"));
let auto_export = args.auto_export.unwrap_or(true);
let job = db
.create_agent_job(
&codex_state::AgentJobCreateParams {
id: job_id.clone(),
name: job_name,
instruction: args.instruction,
auto_export,
output_schema_json: args.output_schema,
input_headers: headers,
input_csv_path: input_path.display().to_string(),
@@ -283,7 +288,8 @@ mod spawn_agents_on_csv {
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
})?;
let options = build_runner_options(&session, &turn, args.max_concurrency).await?;
let options =
build_runner_options(&session, &turn, args.max_concurrency, Some(auto_export)).await?;
let started = start_job_runner(session, job_id.clone(), options).await?;
let content = serde_json::to_string(&SpawnAgentsOnCsvResult {
@@ -315,21 +321,24 @@ mod run_agent_job {
let args: RunAgentJobArgs = parse_arguments(arguments.as_str())?;
let job_id = args.job_id;
let db = required_state_db(&session)?;
if db
let job = db
.get_agent_job(job_id.as_str())
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to look up agent job {job_id}: {err}"
"failed to load agent job {job_id}: {err}"
))
})?
.is_none()
{
return Err(FunctionCallError::RespondToModel(format!(
"agent job {job_id} not found"
)));
}
let options = build_runner_options(&session, &turn, args.max_concurrency).await?;
.ok_or_else(|| {
FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
})?;
let options = build_runner_options(
&session,
&turn,
args.max_concurrency,
args.auto_export.or(Some(job.auto_export)),
)
.await?;
let started = start_job_runner(session, job_id.clone(), options).await?;
let status = render_job_status(db, job_id.as_str()).await?;
let content = serde_json::to_string(&json!({
@@ -552,6 +561,7 @@ async fn build_runner_options(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
requested_concurrency: Option<usize>,
requested_auto_export: Option<bool>,
) -> Result<JobRunnerOptions, FunctionCallError> {
let session_source = turn.session_source.clone();
let child_depth = next_thread_spawn_depth(&session_source);
@@ -568,6 +578,7 @@ async fn build_runner_options(
max_concurrency,
spawn_config,
child_depth,
auto_export: requested_auto_export.unwrap_or(true),
})
}
@@ -770,11 +781,34 @@ async fn run_agent_job_loop(
db.mark_agent_job_failed(job_id.as_str(), message.as_str())
.await?;
} else {
if options.auto_export {
if let Err(err) = export_job_csv_snapshot(db.clone(), &job).await {
let message = format!("auto-export failed: {err}");
db.mark_agent_job_failed(job_id.as_str(), message.as_str())
.await?;
return Ok(());
}
}
db.mark_agent_job_completed(job_id.as_str()).await?;
}
Ok(())
}
/// Write a CSV snapshot of a finished agent job to its configured output path.
///
/// Loads every item for the job (no pagination bounds, so the snapshot is
/// complete), renders them against the job's input headers, and writes the
/// result to `job.output_csv_path`, creating parent directories as needed.
///
/// # Errors
/// Returns an error if the item query fails, CSV rendering fails, or any
/// filesystem operation (mkdir / write) fails.
async fn export_job_csv_snapshot(
    db: Arc<codex_state::StateRuntime>,
    job: &codex_state::AgentJob,
) -> anyhow::Result<()> {
    let items = db.list_agent_job_items(job.id.as_str(), None, None).await?;
    let csv_content = render_job_csv(job.input_headers.as_slice(), items.as_slice())
        .map_err(|err| anyhow::anyhow!("failed to render job csv for auto-export: {err}"))?;
    // Borrow the stored path directly; PathBuf::from(&str) avoids cloning the String.
    let output_path = PathBuf::from(job.output_csv_path.as_str());
    if let Some(parent) = output_path.parent() {
        // Ensure missing parent directories exist so the write below cannot fail on ENOENT.
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(&output_path, csv_content).await?;
    Ok(())
}
async fn recover_running_items(
session: Arc<Session>,
db: Arc<codex_state::StateRuntime>,

View File

@@ -531,6 +531,15 @@ fn create_spawn_agents_on_csv_tool() -> ToolSpec {
),
},
);
properties.insert(
"auto_export".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to auto-export CSV output on successful completion (default true)."
.to_string(),
),
},
);
ToolSpec::Function(ResponsesApiTool {
name: "spawn_agents_on_csv".to_string(),
description: "Create and run a batch agent job over a CSV file.".to_string(),
@@ -560,6 +569,15 @@ fn create_run_agent_job_tool() -> ToolSpec {
),
},
);
properties.insert(
"auto_export".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to auto-export CSV output on successful completion (default true)."
.to_string(),
),
},
);
ToolSpec::Function(ResponsesApiTool {
name: "run_agent_job".to_string(),
description: "Start or resume execution of an existing agent job.".to_string(),

View File

@@ -26,6 +26,7 @@ Optional args:
- `output_csv_path`: destination for CSV export (defaults to `<input>.agent-job-<id>.csv`).
- `output_schema`: JSON schema for result payloads (best-effort guidance).
- `max_concurrency`: cap on parallel workers.
- `auto_export`: auto-export CSV on successful completion (default true).
### `run_agent_job`
@@ -53,7 +54,8 @@ Workers must call this exactly once to report a JSON object for their assigned i
2. The job runner spawns subagents up to `max_concurrency`.
3. Each worker processes one item and calls `report_agent_job_result`.
4. The runner marks items completed after the worker finishes.
5. CSV export is generated by a single writer from the SQLite store.
5. If `auto_export` is true, the runner writes a CSV snapshot on successful completion.
6. CSV export can also be generated manually by a single writer from the SQLite store.
## CSV Output

View File

@@ -0,0 +1,2 @@
-- Add a per-job flag controlling automatic CSV export on successful completion.
-- Stored as INTEGER (SQLite has no native boolean); non-zero means enabled.
-- DEFAULT 1 keeps auto-export on for rows that existed before this migration.
ALTER TABLE agent_jobs
ADD COLUMN auto_export INTEGER NOT NULL DEFAULT 1;

View File

@@ -79,6 +79,7 @@ pub struct AgentJob {
pub name: String,
pub status: AgentJobStatus,
pub instruction: String,
pub auto_export: bool,
pub output_schema_json: Option<Value>,
pub input_headers: Vec<String>,
pub input_csv_path: String,
@@ -122,6 +123,7 @@ pub struct AgentJobCreateParams {
pub id: String,
pub name: String,
pub instruction: String,
pub auto_export: bool,
pub output_schema_json: Option<Value>,
pub input_headers: Vec<String>,
pub input_csv_path: String,
@@ -142,6 +144,7 @@ pub(crate) struct AgentJobRow {
pub(crate) name: String,
pub(crate) status: String,
pub(crate) instruction: String,
pub(crate) auto_export: i64,
pub(crate) output_schema_json: Option<String>,
pub(crate) input_headers_json: String,
pub(crate) input_csv_path: String,
@@ -160,6 +163,7 @@ impl AgentJobRow {
name: row.try_get("name")?,
status: row.try_get("status")?,
instruction: row.try_get("instruction")?,
auto_export: row.try_get("auto_export")?,
output_schema_json: row.try_get("output_schema_json")?,
input_headers_json: row.try_get("input_headers_json")?,
input_csv_path: row.try_get("input_csv_path")?,
@@ -188,6 +192,7 @@ impl TryFrom<AgentJobRow> for AgentJob {
name: value.name,
status: AgentJobStatus::parse(value.status.as_str())?,
instruction: value.instruction,
auto_export: value.auto_export != 0,
output_schema_json,
input_headers,
input_csv_path: value.input_csv_path,

View File

@@ -741,6 +741,7 @@ INSERT INTO agent_jobs (
name,
status,
instruction,
auto_export,
output_schema_json,
input_headers_json,
input_csv_path,
@@ -750,13 +751,14 @@ INSERT INTO agent_jobs (
started_at,
completed_at,
last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
"#,
)
.bind(params.id.as_str())
.bind(params.name.as_str())
.bind(AgentJobStatus::Pending.as_str())
.bind(params.instruction.as_str())
.bind(i64::from(params.auto_export))
.bind(output_schema_json)
.bind(input_headers_json)
.bind(params.input_csv_path.as_str())
@@ -816,6 +818,7 @@ SELECT
name,
status,
instruction,
auto_export,
output_schema_json,
input_headers_json,
input_csv_path,