Compare commits

...

1 Commits

Author SHA1 Message Date
Dave Aitel
0eb7c42ae1 Add dynamic queue agent jobs 2026-03-12 15:54:27 -04:00
10 changed files with 2586 additions and 176 deletions

View File

@@ -50,6 +50,37 @@ struct SpawnAgentsOnCsvArgs {
max_runtime_seconds: Option<u64>,
}
const DEFAULT_AGENT_QUEUE_MAX_ITEMS: usize = 1000;
const CSV_AGENT_JOB_SOURCE_PREFIX: &str = "agent_job:csv:";
const QUEUE_AGENT_JOB_SOURCE_PREFIX: &str = "agent_job:queue:";
/// A single queue work item as supplied by the model, either as a seed for
/// `spawn_agents_on_queue` or via the `enqueue_agent_job_items` worker tool.
#[derive(Debug, Clone, Deserialize, PartialEq)]
struct QueueJobItemArgs {
    // Arbitrary JSON payload for the item; must be a JSON object
    // (enforced later by `queue_item_input_object`).
    input: Value,
    // Optional caller-supplied stable identifier; generated when absent.
    item_id: Option<String>,
    // Optional key used to skip items that were already seen.
    dedupe_key: Option<String>,
}

/// Arguments accepted by the `spawn_agents_on_queue` tool.
#[derive(Debug, Deserialize)]
struct SpawnAgentsOnQueueArgs {
    // Inline seed items; exactly one of `seed_items` / `seed_path` must be set.
    seed_items: Option<Vec<QueueJobItemArgs>>,
    // Path to a JSON array or JSONL file of seed items.
    seed_path: Option<String>,
    // Instruction template applied to each queue item.
    instruction: String,
    // Optional output JSONL path; a default is derived when absent.
    output_jsonl_path: Option<String>,
    // Optional JSON Schema describing the expected per-item result.
    output_schema: Option<Value>,
    // Maximum concurrent workers for the job.
    max_concurrency: Option<usize>,
    // Alias for `max_concurrency`.
    max_workers: Option<usize>,
    // Per-worker runtime cap in seconds.
    max_runtime_seconds: Option<u64>,
    // Hard cap on total accepted items (seeds plus later enqueues).
    max_items: Option<usize>,
}

/// Arguments accepted by the `enqueue_agent_job_items` worker tool.
#[derive(Debug, Deserialize)]
struct EnqueueAgentJobItemsArgs {
    // Target queue job.
    job_id: String,
    // Item the calling worker is processing; recorded as parent of new items.
    parent_item_id: String,
    // Items to append to the queue.
    items: Vec<QueueJobItemArgs>,
}
#[derive(Debug, Deserialize)]
struct ReportAgentJobResultArgs {
job_id: String,
@@ -70,6 +101,25 @@ struct SpawnAgentsOnCsvResult {
failed_item_errors: Option<Vec<AgentJobFailureSummary>>,
}
/// Aggregate result serialized back to the model after a queue job finishes.
#[derive(Debug, Serialize)]
struct SpawnAgentsOnQueueResult {
    job_id: String,
    // Final job status string (e.g. completed/failed) from the state layer.
    status: String,
    // Path of the exported JSONL snapshot.
    output_jsonl_path: String,
    total_items: usize,
    pending_items: usize,
    completed_items: usize,
    failed_items: usize,
    // Job-level error message, if any non-blank error was recorded.
    job_error: Option<String>,
    // Up to a handful of per-item failure details (None when unavailable).
    failed_item_errors: Option<Vec<QueueAgentJobFailureSummary>>,
}

/// Per-item failure detail included in `SpawnAgentsOnQueueResult`.
#[derive(Debug, Serialize)]
struct QueueAgentJobFailureSummary {
    item_id: String,
    dedupe_key: Option<String>,
    last_error: String,
}
#[derive(Debug, Serialize)]
struct AgentJobFailureSummary {
item_id: String,
@@ -93,6 +143,22 @@ struct ReportAgentJobResultToolResult {
accepted: bool,
}
/// Result serialized back to a worker after an `enqueue_agent_job_items` call.
#[derive(Debug, Serialize)]
struct EnqueueAgentJobItemsToolResult {
    // Newly enqueued items.
    accepted_count: usize,
    // Items skipped because their dedupe key was already present.
    duplicate_count: usize,
    // Items rejected (currently only because max_items was reached).
    rejected_count: usize,
    items: Vec<EnqueueAgentJobItemsToolItemResult>,
}

/// Per-item outcome within `EnqueueAgentJobItemsToolResult`.
#[derive(Debug, Serialize)]
struct EnqueueAgentJobItemsToolItemResult {
    // Position of the item in the request's `items` array.
    index: usize,
    // "accepted", "duplicate", or "rejected".
    status: String,
    item_id: Option<String>,
    dedupe_key: Option<String>,
    // Rejection reason (e.g. "max_items_reached"); None otherwise.
    reason: Option<String>,
}
#[derive(Debug, Clone)]
struct JobRunnerOptions {
max_concurrency: usize,
@@ -203,7 +269,15 @@ impl ToolHandler for BatchJobHandler {
};
match tool_name.as_str() {
"spawn_agents_on_csv" => spawn_agents_on_csv::handle(session, turn, arguments).await,
"spawn_agents_on_csv" => {
spawn_agents_on_csv::handle(session.clone(), turn.clone(), arguments).await
}
"spawn_agents_on_queue" => {
spawn_agents_on_queue::handle(session.clone(), turn, arguments).await
}
"enqueue_agent_job_items" => {
enqueue_agent_job_items::handle(session.clone(), arguments).await
}
"report_agent_job_result" => report_agent_job_result::handle(session, arguments).await,
other => Err(FunctionCallError::RespondToModel(format!(
"unsupported agent job tool {other}"
@@ -297,8 +371,10 @@ mod spawn_agents_on_csv {
.collect::<serde_json::Map<_, _>>();
items.push(codex_state::AgentJobItemCreateParams {
item_id,
parent_item_id: None,
row_index: idx as i64,
source_id,
dedupe_key: None,
row_json: Value::Object(row_object),
});
}
@@ -314,89 +390,36 @@ mod spawn_agents_on_csv {
args.max_runtime_seconds
.or(turn.config.agent_job_max_runtime_seconds),
)?;
let _job = db
.create_agent_job(
&codex_state::AgentJobCreateParams {
id: job_id.clone(),
name: job_name,
instruction: args.instruction,
auto_export: true,
max_runtime_seconds,
output_schema_json: args.output_schema,
input_headers: headers,
input_csv_path: input_path.display().to_string(),
output_csv_path: output_csv_path.display().to_string(),
},
items.as_slice(),
)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
})?;
let requested_concurrency = args.max_concurrency.or(args.max_workers);
let options = match build_runner_options(&session, &turn, requested_concurrency).await {
Ok(options) => options,
Err(err) => {
let error_message = err.to_string();
let _ = db
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
.await;
return Err(err);
}
};
db.mark_agent_job_running(job_id.as_str())
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to transition agent job {job_id} to running: {err}"
))
})?;
let max_threads = turn.config.agent_max_threads;
let effective_concurrency = options.max_concurrency;
let message = format!(
"agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
);
let _ = session.notify_background_event(&turn, message).await;
if let Err(err) = run_agent_job_loop(
session.clone(),
turn.clone(),
db.clone(),
job_id.clone(),
options,
db.create_agent_job(
&codex_state::AgentJobCreateParams {
id: job_id.clone(),
name: job_name,
kind: codex_state::AgentJobKind::CsvBatch,
instruction: args.instruction,
auto_export: true,
max_items: None,
max_runtime_seconds,
output_schema_json: args.output_schema,
input_headers: headers,
input_csv_path: input_path.display().to_string(),
output_csv_path: output_csv_path.display().to_string(),
},
items.as_slice(),
)
.await
{
let error_message = format!("job runner failed: {err}");
let _ = db
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
.await;
return Err(FunctionCallError::RespondToModel(format!(
"agent job {job_id} failed: {err}"
)));
}
.map_err(|err| {
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
})?;
let job = db
.get_agent_job(job_id.as_str())
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to load agent job {job_id}: {err}"
))
})?
.ok_or_else(|| {
FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
})?;
let output_path = PathBuf::from(job.output_csv_path.clone());
if !tokio::fs::try_exists(&output_path).await.unwrap_or(false) {
export_job_csv_snapshot(db.clone(), &job)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!(
"failed to export output csv {job_id}: {err}"
))
})?;
}
let requested_concurrency = args.max_concurrency.or(args.max_workers);
let job = run_agent_job_and_load(
session,
turn,
db.clone(),
job_id.clone(),
requested_concurrency,
)
.await?;
let progress = db
.get_agent_job_progress(job_id.as_str())
.await
@@ -461,6 +484,176 @@ mod spawn_agents_on_csv {
}
}
/// Handler for the `spawn_agents_on_queue` tool: creates a dynamic-queue
/// agent job, runs it to completion, and reports aggregate results.
mod spawn_agents_on_queue {
    use super::*;

    /// Validate arguments, create the queue job (seeded inline or from a
    /// file), drive the shared worker loop, and serialize a summary result.
    ///
    /// Errors meant for the model are `FunctionCallError::RespondToModel`;
    /// serialization failures are `Fatal`.
    pub async fn handle(
        session: Arc<Session>,
        turn: Arc<TurnContext>,
        arguments: String,
    ) -> Result<FunctionToolOutput, FunctionCallError> {
        let args: SpawnAgentsOnQueueArgs = parse_arguments(arguments.as_str())?;
        if args.instruction.trim().is_empty() {
            return Err(FunctionCallError::RespondToModel(
                "instruction must be non-empty".to_string(),
            ));
        }
        let db = required_state_db(&session)?;
        let max_items = normalize_max_items(args.max_items)?;
        // Exactly one of seed_items / seed_path is accepted; a file path is
        // resolved and read relative to the turn's working directory.
        let (seed_items, seed_path) =
            parse_queue_seed_inputs(&turn, args.seed_items, args.seed_path).await?;
        let items = build_initial_queue_job_items(seed_items.as_slice(), max_items)?;
        let job_id = Uuid::new_v4().to_string();
        // Output path preference: explicit argument, else derived from the
        // seed file location, else a default in the working directory.
        let output_jsonl_path = args.output_jsonl_path.map_or_else(
            || {
                seed_path.as_ref().map_or_else(
                    || default_output_jsonl_path_for_inline(&turn, job_id.as_str()),
                    |path| default_output_jsonl_path_for_seed(path.as_path(), job_id.as_str()),
                )
            },
            |path| turn.resolve_path(Some(path)),
        );
        // job_id is a freshly generated UUID string, so taking the first 8
        // bytes for a short display name is safe here.
        let job_suffix = &job_id[..8];
        let job_name = format!("agent-queue-{job_suffix}");
        let max_runtime_seconds = normalize_max_runtime_seconds(
            args.max_runtime_seconds
                .or(turn.config.agent_job_max_runtime_seconds),
        )?;
        // Queue jobs reuse the CSV job storage fields: `output_csv_path`
        // holds the JSONL output path and `input_csv_path` the seed path
        // (empty for inline seeds).
        db.create_agent_job(
            &codex_state::AgentJobCreateParams {
                id: job_id.clone(),
                name: job_name,
                kind: codex_state::AgentJobKind::DynamicQueue,
                instruction: args.instruction,
                auto_export: true,
                max_items: Some(max_items as u64),
                max_runtime_seconds,
                output_schema_json: args.output_schema,
                input_headers: Vec::new(),
                input_csv_path: seed_path
                    .as_ref()
                    .map(|path| path.display().to_string())
                    .unwrap_or_default(),
                output_csv_path: output_jsonl_path.display().to_string(),
            },
            items.as_slice(),
        )
        .await
        .map_err(|err| {
            FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
        })?;
        let requested_concurrency = args.max_concurrency.or(args.max_workers);
        // Shared driver: marks the job running, drains the queue, exports a
        // snapshot if needed, and reloads the final job row.
        let job = run_agent_job_and_load(
            session,
            turn,
            db.clone(),
            job_id.clone(),
            requested_concurrency,
        )
        .await?;
        let progress = db
            .get_agent_job_progress(job_id.as_str())
            .await
            .map_err(|err| {
                FunctionCallError::RespondToModel(format!(
                    "failed to load agent job progress {job_id}: {err}"
                ))
            })?;
        // Blank/whitespace-only errors are treated as no error.
        let mut job_error = job.last_error.clone().filter(|err| !err.trim().is_empty());
        // Surface up to 5 failed-item errors, best-effort: a lookup failure
        // is swallowed (unwrap_or_default) rather than failing the tool call.
        let failed_item_errors = if progress.failed_items > 0 {
            let items = db
                .list_agent_job_items(
                    job_id.as_str(),
                    Some(codex_state::AgentJobItemStatus::Failed),
                    Some(5),
                )
                .await
                .unwrap_or_default();
            let summaries: Vec<_> = items
                .into_iter()
                .filter_map(|item| {
                    let last_error = item.last_error.unwrap_or_default();
                    if last_error.trim().is_empty() {
                        return None;
                    }
                    Some(QueueAgentJobFailureSummary {
                        item_id: item.item_id,
                        dedupe_key: item.dedupe_key,
                        last_error,
                    })
                })
                .collect();
            if summaries.is_empty() {
                // Failures exist but carry no messages: synthesize a job-level
                // error so the caller still learns something went wrong.
                if job_error.is_none() {
                    job_error = Some(
                        "agent job has failed items but no error details were recorded".to_string(),
                    );
                }
                None
            } else {
                Some(summaries)
            }
        } else {
            None
        };
        let content = serde_json::to_string(&SpawnAgentsOnQueueResult {
            job_id,
            status: job.status.as_str().to_string(),
            output_jsonl_path: job.output_csv_path,
            total_items: progress.total_items,
            pending_items: progress.pending_items,
            completed_items: progress.completed_items,
            failed_items: progress.failed_items,
            job_error,
            failed_item_errors,
        })
        .map_err(|err| {
            FunctionCallError::Fatal(format!(
                "failed to serialize spawn_agents_on_queue result: {err}"
            ))
        })?;
        Ok(FunctionToolOutput::from_text(content, Some(true)))
    }
}
/// Handler for the `enqueue_agent_job_items` worker tool: lets a queue-job
/// worker append newly discovered items to its job's queue.
mod enqueue_agent_job_items {
    use super::*;

    /// Parse the request, attribute the new items to the calling worker's
    /// conversation, and forward them to the state layer, which decides per
    /// item whether it is accepted, deduped, or rejected.
    pub async fn handle(
        session: Arc<Session>,
        arguments: String,
    ) -> Result<FunctionToolOutput, FunctionCallError> {
        let args: EnqueueAgentJobItemsArgs = parse_arguments(arguments.as_str())?;
        let db = required_state_db(&session)?;
        // Lets the state layer attribute the enqueue to this worker thread.
        let reporting_thread_id = session.conversation_id.to_string();
        let items = build_enqueued_queue_job_items(args.items.as_slice())?;
        let result = db
            .enqueue_agent_job_items(
                args.job_id.as_str(),
                args.parent_item_id.as_str(),
                reporting_thread_id.as_str(),
                items.as_slice(),
            )
            .await
            .map_err(|err| {
                FunctionCallError::RespondToModel(format!(
                    "failed to enqueue queue items for {} / {}: {err}",
                    args.job_id, args.parent_item_id
                ))
            })?;
        // Translate DB outcomes into the model-facing per-item summary.
        let content = serde_json::to_string(&build_enqueue_agent_job_items_tool_result(&result))
            .map_err(|err| {
                FunctionCallError::Fatal(format!(
                    "failed to serialize enqueue_agent_job_items result: {err}"
                ))
            })?;
        Ok(FunctionToolOutput::from_text(content, Some(true)))
    }
}
mod report_agent_job_result {
use super::*;
@@ -507,6 +700,73 @@ mod report_agent_job_result {
}
}
/// Shared driver for CSV-batch and dynamic-queue jobs: builds runner options,
/// marks the job running, executes the worker loop, and returns the final
/// job row.
///
/// On any setup/runner failure the job is best-effort marked failed in the
/// DB (the marking error itself is deliberately ignored) before the error is
/// propagated to the caller.
async fn run_agent_job_and_load(
    session: Arc<Session>,
    turn: Arc<TurnContext>,
    db: Arc<codex_state::StateRuntime>,
    job_id: String,
    requested_concurrency: Option<usize>,
) -> Result<codex_state::AgentJob, FunctionCallError> {
    let options = match build_runner_options(&session, &turn, requested_concurrency).await {
        Ok(options) => options,
        Err(err) => {
            let error_message = err.to_string();
            let _ = db
                .mark_agent_job_failed(job_id.as_str(), error_message.as_str())
                .await;
            return Err(err);
        }
    };
    db.mark_agent_job_running(job_id.as_str())
        .await
        .map_err(|err| {
            FunctionCallError::RespondToModel(format!(
                "failed to transition agent job {job_id} to running: {err}"
            ))
        })?;
    // Surface the effective concurrency as a background notice (best-effort;
    // notification errors are ignored).
    let max_threads = turn.config.agent_max_threads;
    let effective_concurrency = options.max_concurrency;
    let message = format!(
        "agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
    );
    let _ = session.notify_background_event(&turn, message).await;
    if let Err(err) = run_agent_job_loop(
        session.clone(),
        turn.clone(),
        db.clone(),
        job_id.clone(),
        options,
    )
    .await
    {
        let error_message = format!("job runner failed: {err}");
        let _ = db
            .mark_agent_job_failed(job_id.as_str(), error_message.as_str())
            .await;
        return Err(FunctionCallError::RespondToModel(format!(
            "agent job {job_id} failed: {err}"
        )));
    }
    let job = db
        .get_agent_job(job_id.as_str())
        .await
        .map_err(|err| {
            FunctionCallError::RespondToModel(format!("failed to load agent job {job_id}: {err}"))
        })?
        .ok_or_else(|| {
            FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
        })?;
    // If the runner did not already export an output snapshot, write one now
    // so callers always find a file at the reported output path.
    let output_path = PathBuf::from(job.output_csv_path.clone());
    if !tokio::fs::try_exists(&output_path).await.unwrap_or(false) {
        export_job_snapshot(db, &job).await.map_err(|err| {
            FunctionCallError::RespondToModel(format!(
                "failed to export output for agent job {job_id}: {err}"
            ))
        })?;
    }
    Ok(job)
}
fn required_state_db(
session: &Arc<Session>,
) -> Result<Arc<codex_state::StateRuntime>, FunctionCallError> {
@@ -560,6 +820,221 @@ fn normalize_max_runtime_seconds(requested: Option<u64>) -> Result<Option<u64>,
Ok(Some(requested))
}
/// Resolve the requested queue size cap.
///
/// Falls back to `DEFAULT_AGENT_QUEUE_MAX_ITEMS` when unset, and rejects
/// values that are zero or too large to be stored as an `i64`.
fn normalize_max_items(requested: Option<usize>) -> Result<usize, FunctionCallError> {
    let resolved = requested.unwrap_or(DEFAULT_AGENT_QUEUE_MAX_ITEMS);
    match resolved {
        0 => Err(FunctionCallError::RespondToModel(
            "max_items must be >= 1".to_string(),
        )),
        value if value > i64::MAX as usize => Err(FunctionCallError::RespondToModel(
            "max_items is too large".to_string(),
        )),
        value => Ok(value),
    }
}
/// Resolve queue seed input: either inline items or a file path (JSON array
/// or JSONL), but never both and never neither.
///
/// Returns the parsed items plus the resolved seed path (when a file was
/// used) so callers can derive the default output location from it.
async fn parse_queue_seed_inputs(
    turn: &Arc<TurnContext>,
    seed_items: Option<Vec<QueueJobItemArgs>>,
    seed_path: Option<String>,
) -> Result<(Vec<QueueJobItemArgs>, Option<PathBuf>), FunctionCallError> {
    match (seed_items, seed_path) {
        (Some(items), None) => Ok((items, None)),
        (None, Some(seed_path)) => {
            if seed_path.trim().is_empty() {
                return Err(FunctionCallError::RespondToModel(
                    "seed_path must be non-empty".to_string(),
                ));
            }
            // Relative paths are resolved against the turn's working directory.
            let resolved_path = turn.resolve_path(Some(seed_path));
            let display_path = resolved_path.display().to_string();
            let content = tokio::fs::read_to_string(&resolved_path)
                .await
                .map_err(|err| {
                    FunctionCallError::RespondToModel(format!(
                        "failed to read queue seed input {display_path}: {err}"
                    ))
                })?;
            let items = parse_queue_seed_content(content.as_str())?;
            Ok((items, Some(resolved_path)))
        }
        (Some(_), Some(_)) | (None, None) => Err(FunctionCallError::RespondToModel(
            "provide exactly one of seed_items or seed_path".to_string(),
        )),
    }
}
/// Parse seed content as either one JSON array or JSONL (one object per line).
///
/// Blank input yields an empty list; blank JSONL lines are skipped. JSONL
/// error messages reference the 1-based physical line number of the
/// offending line.
fn parse_queue_seed_content(content: &str) -> Result<Vec<QueueJobItemArgs>, FunctionCallError> {
    let trimmed = content.trim();
    if trimmed.is_empty() {
        return Ok(Vec::new());
    }
    // A leading '[' means the whole payload is a single JSON array.
    if trimmed.starts_with('[') {
        return serde_json::from_str(trimmed).map_err(|err| {
            FunctionCallError::RespondToModel(format!(
                "failed to parse queue seed JSON array: {err}"
            ))
        });
    }
    // Otherwise treat the payload as JSONL; enumerate before filtering so
    // reported line numbers match the original file.
    content
        .lines()
        .enumerate()
        .filter(|(_, line)| !line.trim().is_empty())
        .map(|(index, line)| {
            serde_json::from_str::<QueueJobItemArgs>(line.trim()).map_err(|err| {
                let line_number = index + 1;
                FunctionCallError::RespondToModel(format!(
                    "failed to parse queue seed JSONL line {line_number}: {err}"
                ))
            })
        })
        .collect()
}
/// Convert seed items into DB create params, applying dedupe and the
/// `max_items` cap.
///
/// Behavior notes (statement order matters here):
/// - Items whose `dedupe_key` was already seen are silently skipped, and the
///   skip happens BEFORE the capacity check, so duplicates never trip the
///   limit.
/// - Exceeding `max_items` with unique items is an error, not a truncation.
/// - Missing item ids default to `item-<input position>`; colliding ids get
///   a numeric suffix (`-2`, `-3`, ...).
/// - `row_index` is the item's position among ACCEPTED items, not its
///   position in the input slice.
fn build_initial_queue_job_items(
    items: &[QueueJobItemArgs],
    max_items: usize,
) -> Result<Vec<codex_state::AgentJobItemCreateParams>, FunctionCallError> {
    let mut created_items = Vec::with_capacity(items.len().min(max_items));
    let mut seen_item_ids = HashSet::new();
    let mut seen_dedupe_keys = HashSet::new();
    for (index, item) in items.iter().enumerate() {
        // Validates that `input` is a JSON object before anything else.
        let row_json = queue_item_input_object(item, index, "queue seed item")?;
        let dedupe_key = normalize_optional_queue_text(item.dedupe_key.as_deref());
        if let Some(dedupe_key) = dedupe_key.as_ref()
            && !seen_dedupe_keys.insert(dedupe_key.clone())
        {
            continue;
        }
        if created_items.len() >= max_items {
            return Err(FunctionCallError::RespondToModel(format!(
                "queue seeds exceed max_items limit of {max_items}"
            )));
        }
        let base_item_id = normalize_optional_queue_text(item.item_id.as_deref())
            .unwrap_or_else(|| format!("item-{}", index + 1));
        // Disambiguate duplicate ids by appending an increasing suffix.
        let mut item_id = base_item_id.clone();
        let mut suffix = 2usize;
        while !seen_item_ids.insert(item_id.clone()) {
            item_id = format!("{base_item_id}-{suffix}");
            suffix = suffix.saturating_add(1);
        }
        created_items.push(codex_state::AgentJobItemCreateParams {
            item_id,
            parent_item_id: None,
            row_index: created_items.len() as i64,
            source_id: None,
            dedupe_key,
            row_json,
        });
    }
    Ok(created_items)
}
/// Convert enqueue-tool items into DB create params.
///
/// Unlike the seed path, no dedupe or capacity filtering happens here: the
/// state layer decides acceptance when the items are actually enqueued.
fn build_enqueued_queue_job_items(
    items: &[QueueJobItemArgs],
) -> Result<Vec<codex_state::AgentJobItemCreateParams>, FunctionCallError> {
    items
        .iter()
        .enumerate()
        .map(|(index, item)| {
            // Validates that `input` is a JSON object.
            let row_json = queue_item_input_object(item, index, "enqueue item")?;
            // Fall back to a positional id when the caller supplied none.
            let item_id = normalize_optional_queue_text(item.item_id.as_deref())
                .unwrap_or_else(|| format!("item-{}", index + 1));
            Ok(codex_state::AgentJobItemCreateParams {
                item_id,
                parent_item_id: None,
                row_index: index as i64,
                source_id: None,
                dedupe_key: normalize_optional_queue_text(item.dedupe_key.as_deref()),
                row_json,
            })
        })
        .collect()
}
/// Require the item's `input` to be a JSON object and return an owned copy.
///
/// `item_label` and `index` only shape the error message; the reported item
/// number is 1-based for readability.
fn queue_item_input_object(
    item: &QueueJobItemArgs,
    index: usize,
    item_label: &str,
) -> Result<Value, FunctionCallError> {
    match item.input.as_object() {
        Some(object) => Ok(Value::Object(object.clone())),
        None => {
            let item_number = index + 1;
            Err(FunctionCallError::RespondToModel(format!(
                "{item_label} {item_number} input must be a JSON object"
            )))
        }
    }
}
/// Treat `None`, empty, and whitespace-only strings as absent; otherwise
/// return an owned copy of the ORIGINAL (untrimmed) string.
fn normalize_optional_queue_text(value: Option<&str>) -> Option<String> {
    match value {
        Some(text) if !text.trim().is_empty() => Some(text.to_string()),
        _ => None,
    }
}
/// Translate the state layer's per-item enqueue outcomes into the
/// model-facing tool result, tallying accepted / duplicate / rejected counts.
fn build_enqueue_agent_job_items_tool_result(
    result: &codex_state::EnqueueAgentJobItemsResult,
) -> EnqueueAgentJobItemsToolResult {
    let mut accepted_count = 0usize;
    let mut duplicate_count = 0usize;
    let mut rejected_count = 0usize;
    let mut items = Vec::with_capacity(result.outcomes.len());
    // `index` mirrors the position in the caller's `items` array.
    for (index, outcome) in result.outcomes.iter().enumerate() {
        match outcome {
            // Item was added to the queue.
            codex_state::EnqueueAgentJobItemOutcome::Enqueued { item } => {
                accepted_count = accepted_count.saturating_add(1);
                items.push(EnqueueAgentJobItemsToolItemResult {
                    index,
                    status: "accepted".to_string(),
                    item_id: Some(item.item_id.clone()),
                    dedupe_key: item.dedupe_key.clone(),
                    reason: None,
                });
            }
            // Dedupe key matched an existing item; nothing was added.
            codex_state::EnqueueAgentJobItemOutcome::Deduped { item } => {
                duplicate_count = duplicate_count.saturating_add(1);
                items.push(EnqueueAgentJobItemsToolItemResult {
                    index,
                    status: "duplicate".to_string(),
                    item_id: Some(item.item_id.clone()),
                    dedupe_key: item.dedupe_key.clone(),
                    reason: None,
                });
            }
            // Job already holds max_items; the item was turned away.
            codex_state::EnqueueAgentJobItemOutcome::MaxItemsReached {
                requested_item_id,
                dedupe_key,
                ..
            } => {
                rejected_count = rejected_count.saturating_add(1);
                items.push(EnqueueAgentJobItemsToolItemResult {
                    index,
                    status: "rejected".to_string(),
                    item_id: Some(requested_item_id.clone()),
                    dedupe_key: dedupe_key.clone(),
                    reason: Some("max_items_reached".to_string()),
                });
            }
        }
    }
    EnqueueAgentJobItemsToolResult {
        accepted_count,
        duplicate_count,
        rejected_count,
        items,
    }
}
async fn run_agent_job_loop(
session: Arc<Session>,
turn: Arc<TurnContext>,
@@ -588,6 +1063,12 @@ async fn run_agent_job_loop(
.await?;
let mut cancel_requested = db.is_agent_job_cancelled(job_id.as_str()).await?;
let worker_source_label = match job.kind {
codex_state::AgentJobKind::CsvBatch => format!("{CSV_AGENT_JOB_SOURCE_PREFIX}{job_id}"),
codex_state::AgentJobKind::DynamicQueue => {
format!("{QUEUE_AGENT_JOB_SOURCE_PREFIX}{job_id}")
}
};
loop {
let mut progressed = false;
@@ -622,9 +1103,9 @@ async fn run_agent_job_loop(
.spawn_agent(
options.spawn_config.clone(),
items,
Some(SessionSource::SubAgent(SubAgentSource::Other(format!(
"agent_job:{job_id}"
)))),
Some(SessionSource::SubAgent(SubAgentSource::Other(
worker_source_label.clone(),
))),
)
.await
{
@@ -725,7 +1206,7 @@ async fn run_agent_job_loop(
}
let progress = db.get_agent_job_progress(job_id.as_str()).await?;
if let Err(err) = export_job_csv_snapshot(db.clone(), &job).await {
if let Err(err) = export_job_snapshot(db.clone(), &job).await {
let message = format!("auto-export failed: {err}");
db.mark_agent_job_failed(job_id.as_str(), message.as_str())
.await?;
@@ -755,6 +1236,16 @@ async fn run_agent_job_loop(
Ok(())
}
/// Export a job's output snapshot in the format matching its kind:
/// CSV for batch jobs, JSONL for dynamic-queue jobs.
async fn export_job_snapshot(
    db: Arc<codex_state::StateRuntime>,
    job: &codex_state::AgentJob,
) -> anyhow::Result<()> {
    match job.kind {
        codex_state::AgentJobKind::CsvBatch => export_job_csv_snapshot(db, job).await,
        codex_state::AgentJobKind::DynamicQueue => export_job_queue_snapshot(db, job).await,
    }
}
async fn export_job_csv_snapshot(
db: Arc<codex_state::StateRuntime>,
job: &codex_state::AgentJob,
@@ -770,6 +1261,21 @@ async fn export_job_csv_snapshot(
Ok(())
}
/// Write every item of a queue job to its output path as JSONL, creating
/// parent directories as needed.
///
/// NOTE: for queue jobs the `output_csv_path` field actually stores the
/// JSONL output path (field name retained from the CSV job schema).
async fn export_job_queue_snapshot(
    db: Arc<codex_state::StateRuntime>,
    job: &codex_state::AgentJob,
) -> anyhow::Result<()> {
    // No status filter, no limit: the snapshot includes all items.
    let items = db.list_agent_job_items(job.id.as_str(), None, None).await?;
    let jsonl_content = render_job_queue_jsonl(items.as_slice())
        .map_err(|err| anyhow::anyhow!("failed to render job queue snapshot: {err}"))?;
    let output_path = PathBuf::from(job.output_csv_path.clone());
    if let Some(parent) = output_path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(&output_path, jsonl_content).await?;
    Ok(())
}
async fn recover_running_items(
session: Arc<Session>,
db: Arc<codex_state::StateRuntime>,
@@ -944,23 +1450,93 @@ fn build_worker_prompt(
.transpose()?
.unwrap_or_else(|| "{}".to_string());
let row_json = serde_json::to_string_pretty(&item.row_json)?;
Ok(format!(
"You are processing one item for a generic agent job.\n\
Job ID: {job_id}\n\
Item ID: {item_id}\n\n\
Task instruction:\n\
{instruction}\n\n\
Input row (JSON):\n\
{row_json}\n\n\
Expected result schema (JSON Schema or {{}}):\n\
{output_schema}\n\n\
You MUST call the `report_agent_job_result` tool exactly once with:\n\
1. `job_id` = \"{job_id}\"\n\
2. `item_id` = \"{item_id}\"\n\
3. `result` = a JSON object that contains your analysis result for this row.\n\n\
If you need to stop the job early, include `stop` = true in the tool call.\n\n\
match job.kind {
codex_state::AgentJobKind::CsvBatch => Ok(format!(
"You are processing one item for a generic agent job.
\
Job ID: {job_id}
\
Item ID: {item_id}
\
Task instruction:
\
{instruction}
\
Input row (JSON):
\
{row_json}
\
Expected result schema (JSON Schema or {{}}):
\
{output_schema}
\
You MUST call the `report_agent_job_result` tool exactly once with:
\
1. `job_id` = \"{job_id}\"
\
2. `item_id` = \"{item_id}\"
\
3. `result` = a JSON object that contains your analysis result for this row.
\
If you need to stop the job early, include `stop` = true in the tool call.
\
After the tool call succeeds, stop.",
))
)),
codex_state::AgentJobKind::DynamicQueue => Ok(format!(
"You are processing one item in a queue-draining agent job.
\
Job ID: {job_id}
\
Item ID: {item_id}
\
Task instruction:
\
{instruction}
\
Input item (JSON):
\
{row_json}
\
Expected result schema (JSON Schema or {{}}):
\
{output_schema}
\
You may call the `enqueue_agent_job_items` tool zero or more times to add discovered work.
\
When calling `enqueue_agent_job_items`, use:
\
1. `job_id` = \"{job_id}\"
\
2. `parent_item_id` = \"{item_id}\"
\
3. `items` = an array of queue items, each with an `input` object and optional `item_id` and `dedupe_key`.
\
You MUST call the `report_agent_job_result` tool exactly once with:
\
1. `job_id` = \"{job_id}\"
\
2. `item_id` = \"{item_id}\"
\
3. `result` = a JSON object that contains your analysis result for this item.
\
If you need to stop the entire job early, include `stop` = true in the `report_agent_job_result` tool call.
\
After the final `report_agent_job_result` tool call succeeds, stop.",
)),
}
}
fn render_instruction_template(instruction: &str, row_json: &Value) -> String {
@@ -1034,6 +1610,52 @@ fn default_output_csv_path(input_csv_path: &Path, job_id: &str) -> PathBuf {
input_csv_path.with_file_name(format!("{stem}.agent-job-{job_suffix}.csv"))
}
/// Default output path for a queue job seeded inline: a JSONL file in the
/// turn's working directory named after the job id's first 8 characters.
/// NOTE(review): `&job_id[..8]` assumes job ids are ASCII strings of at
/// least 8 bytes (UUIDs satisfy this) — confirm for other callers.
fn default_output_jsonl_path_for_inline(turn: &TurnContext, job_id: &str) -> PathBuf {
    turn.resolve_path(Some(format!("agent-queue-{}.jsonl", &job_id[..8])))
}
/// Derive the default output JSONL path for a queue job seeded from a file.
///
/// The output lives next to the seed file and is named
/// `<seed-stem>.agent-queue-<job-prefix>.jsonl`, where `<job-prefix>` is the
/// first 8 characters of the job id (or the whole id when shorter).
fn default_output_jsonl_path_for_seed(seed_path: &Path, job_id: &str) -> PathBuf {
    let stem = seed_path
        .file_stem()
        .and_then(|value| value.to_str())
        .filter(|value| !value.is_empty())
        .unwrap_or("agent-queue");
    // `str::get` avoids the panic that `&job_id[..8]` would raise for ids
    // shorter than 8 bytes or a slice landing off a char boundary.
    let job_suffix = job_id.get(..8).unwrap_or(job_id);
    let filename = format!("{stem}.agent-queue-{job_suffix}.jsonl");
    seed_path.with_file_name(filename)
}
/// Render queue job items as JSONL, one JSON object per item, preserving
/// input order. An empty slice yields an empty string; otherwise every line,
/// including the last, is newline-terminated.
fn render_job_queue_jsonl(items: &[codex_state::AgentJobItem]) -> serde_json::Result<String> {
    let mut out = String::new();
    for item in items {
        let line = serde_json::to_string(&serde_json::json!({
            "job_id": &item.job_id,
            "item_id": &item.item_id,
            "parent_item_id": &item.parent_item_id,
            "dedupe_key": &item.dedupe_key,
            "row_index": item.row_index,
            "status": item.status.as_str(),
            "attempt_count": item.attempt_count,
            "input": &item.row_json,
            "result": &item.result_json,
            "last_error": &item.last_error,
            // Timestamps serialize as RFC 3339 strings, null when unset.
            "reported_at": item.reported_at.map(|value| value.to_rfc3339()),
            "completed_at": item.completed_at.map(|value| value.to_rfc3339()),
        }))?;
        out.push_str(&line);
        out.push('\n');
    }
    Ok(out)
}
fn parse_csv(content: &str) -> Result<(Vec<String>, Vec<Vec<String>>), String> {
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)

View File

@@ -1,7 +1,65 @@
use super::*;
use chrono::TimeZone;
use chrono::Utc;
use pretty_assertions::assert_eq;
use serde_json::json;
/// Test fixture: a running dynamic-queue job with a fixed timestamp so that
/// assertions are deterministic across runs.
fn sample_queue_job() -> codex_state::AgentJob {
    let now = Utc
        .timestamp_opt(1_700_000_000, 0)
        .single()
        .expect("timestamp");
    codex_state::AgentJob {
        id: "job-1".to_string(),
        name: "queue-job".to_string(),
        kind: codex_state::AgentJobKind::DynamicQueue,
        status: codex_state::AgentJobStatus::Running,
        // Uses a `{url}` placeholder to exercise instruction templating.
        instruction: "Visit {url}".to_string(),
        auto_export: true,
        max_items: Some(1000),
        max_runtime_seconds: Some(60),
        output_schema_json: Some(json!({
            "type": "object",
            "properties": {
                "item_id": { "type": "string" }
            }
        })),
        // Queue jobs have no CSV headers/input; output path holds the JSONL file.
        input_headers: Vec::new(),
        input_csv_path: String::new(),
        output_csv_path: "/tmp/agent-queue.jsonl".to_string(),
        created_at: now,
        updated_at: now,
        started_at: Some(now),
        completed_at: None,
        last_error: None,
    }
}
/// Test fixture: a completed root queue item matching `sample_queue_job`,
/// with fixed timestamps for deterministic assertions.
fn sample_queue_item() -> codex_state::AgentJobItem {
    let now = Utc
        .timestamp_opt(1_700_000_000, 0)
        .single()
        .expect("timestamp");
    codex_state::AgentJobItem {
        job_id: "job-1".to_string(),
        item_id: "root".to_string(),
        parent_item_id: None,
        row_index: 0,
        source_id: None,
        dedupe_key: Some("https://root.test".to_string()),
        row_json: json!({ "url": "https://root.test" }),
        status: codex_state::AgentJobItemStatus::Completed,
        assigned_thread_id: None,
        attempt_count: 1,
        result_json: Some(json!({ "item_id": "root" })),
        last_error: None,
        created_at: now,
        updated_at: now,
        completed_at: Some(now),
        reported_at: Some(now),
    }
}
#[test]
fn parse_csv_supports_quotes_and_commas() {
let input = "id,name\n1,\"alpha, beta\"\n2,gamma\n";
@@ -60,3 +118,131 @@ fn ensure_unique_headers_rejects_duplicates() {
FunctionCallError::RespondToModel("csv header path is duplicated".to_string())
);
}
/// A payload starting with '[' must be parsed as a single JSON array of
/// queue items, preserving order.
#[test]
fn parse_queue_seed_content_supports_json_array() {
    let content = r#"[
{"item_id":"seed-1","input":{"url":"https://seed-1.test"}},
{"item_id":"seed-2","input":{"url":"https://seed-2.test"}}
]"#;
    let parsed = parse_queue_seed_content(content).expect("queue json array");
    assert_eq!(
        parsed,
        vec![
            QueueJobItemArgs {
                input: json!({ "url": "https://seed-1.test" }),
                item_id: Some("seed-1".to_string()),
                dedupe_key: None,
            },
            QueueJobItemArgs {
                input: json!({ "url": "https://seed-2.test" }),
                item_id: Some("seed-2".to_string()),
                dedupe_key: None,
            },
        ]
    );
}
/// JSONL seed parsing must skip blank lines and preserve item order.
#[test]
fn parse_queue_seed_content_supports_jsonl() {
    // Middle line is intentionally blank to exercise the skip behavior.
    let content = concat!(
        "{\"item_id\":\"seed-1\",\"input\":{\"url\":\"https://seed-1.test\"}}\n",
        "\n",
        "{\"item_id\":\"seed-2\",\"input\":{\"url\":\"https://seed-2.test\"}}\n",
    );
    let parsed = parse_queue_seed_content(content).expect("queue jsonl");
    assert_eq!(
        parsed,
        vec![
            QueueJobItemArgs {
                input: json!({ "url": "https://seed-1.test" }),
                item_id: Some("seed-1".to_string()),
                dedupe_key: None,
            },
            QueueJobItemArgs {
                input: json!({ "url": "https://seed-2.test" }),
                item_id: Some("seed-2".to_string()),
                dedupe_key: None,
            },
        ]
    );
}
/// Seeds with a repeated dedupe_key are dropped entirely; seeds with a
/// repeated item_id (but unique dedupe status) are kept with a `-2` suffix.
/// row_index counts accepted items, not input positions.
#[test]
fn build_initial_queue_job_items_dedupes_and_suffixes_item_ids() {
    let items = vec![
        QueueJobItemArgs {
            input: json!({ "url": "https://a.test" }),
            item_id: Some("dup".to_string()),
            dedupe_key: Some("https://a.test".to_string()),
        },
        // Same dedupe_key as above: expected to be skipped.
        QueueJobItemArgs {
            input: json!({ "url": "https://duplicate.test" }),
            item_id: Some("dup".to_string()),
            dedupe_key: Some("https://a.test".to_string()),
        },
        // Same item_id but no dedupe_key: kept, id suffixed.
        QueueJobItemArgs {
            input: json!({ "url": "https://b.test" }),
            item_id: Some("dup".to_string()),
            dedupe_key: None,
        },
    ];
    let built = build_initial_queue_job_items(items.as_slice(), 10).expect("build queue items");
    assert_eq!(
        built,
        vec![
            codex_state::AgentJobItemCreateParams {
                item_id: "dup".to_string(),
                parent_item_id: None,
                row_index: 0,
                source_id: None,
                dedupe_key: Some("https://a.test".to_string()),
                row_json: json!({ "url": "https://a.test" }),
            },
            codex_state::AgentJobItemCreateParams {
                item_id: "dup-2".to_string(),
                parent_item_id: None,
                row_index: 1,
                source_id: None,
                dedupe_key: None,
                row_json: json!({ "url": "https://b.test" }),
            },
        ]
    );
}
/// Queue-worker prompts must mention the enqueue tool (with its
/// parent_item_id contract) as well as the mandatory report tool.
#[test]
fn queue_worker_prompt_mentions_enqueue_tool() {
    let prompt = build_worker_prompt(&sample_queue_job(), &sample_queue_item()).expect("prompt");
    assert!(prompt.contains("queue-draining agent job"));
    assert!(prompt.contains("enqueue_agent_job_items"));
    assert!(prompt.contains("parent_item_id"));
    assert!(prompt.contains("report_agent_job_result"));
}
/// A rendered JSONL line must carry all item fields with timestamps encoded
/// as RFC 3339 strings and absent values as JSON null.
#[test]
fn render_job_queue_jsonl_outputs_expected_fields() {
    let item = sample_queue_item();
    let rendered = render_job_queue_jsonl(std::slice::from_ref(&item)).expect("render jsonl");
    let line: Value = serde_json::from_str(rendered.trim()).expect("parse jsonl line");
    let timestamp = item.reported_at.expect("reported_at").to_rfc3339();
    assert_eq!(
        line,
        json!({
            "job_id": "job-1",
            "item_id": "root",
            "parent_item_id": null,
            "dedupe_key": "https://root.test",
            "row_index": 0,
            "status": "completed",
            "attempt_count": 1,
            "input": { "url": "https://root.test" },
            "result": { "item_id": "root" },
            "last_error": null,
            "reported_at": timestamp,
            "completed_at": timestamp,
        })
    );
}

View File

@@ -128,6 +128,7 @@ pub(crate) struct ToolsConfig {
pub experimental_supported_tools: Vec<String>,
pub agent_jobs_tools: bool,
pub agent_jobs_worker_tools: bool,
pub agent_jobs_queue_worker_tools: bool,
}
pub(crate) struct ToolsConfigParams<'a> {
@@ -206,12 +207,23 @@ impl ToolsConfig {
}
};
let agent_jobs_worker_tools = include_agent_jobs
&& matches!(
session_source,
let (agent_jobs_worker_tools, agent_jobs_queue_worker_tools) = if !include_agent_jobs {
(false, false)
} else {
match session_source {
SessionSource::SubAgent(SubAgentSource::Other(label))
if label.starts_with("agent_job:")
);
if label.starts_with("agent_job:queue:") =>
{
(true, true)
}
SessionSource::SubAgent(SubAgentSource::Other(label))
if label.starts_with("agent_job:csv:") || label.starts_with("agent_job:") =>
{
(true, false)
}
_ => (false, false),
}
};
Self {
available_models: available_models_ref.to_vec(),
@@ -240,6 +252,7 @@ impl ToolsConfig {
experimental_supported_tools: model_info.experimental_supported_tools.clone(),
agent_jobs_tools: include_agent_jobs,
agent_jobs_worker_tools,
agent_jobs_queue_worker_tools,
}
}
@@ -1057,6 +1070,177 @@ fn create_spawn_agents_on_csv_tool() -> ToolSpec {
})
}
/// JSON schema for one queue item argument: a required `input` object plus
/// optional `item_id` and `dedupe_key` strings; extra properties rejected.
fn create_agent_queue_item_schema() -> JsonSchema {
    JsonSchema::Object {
        properties: BTreeMap::from([
            (
                "input".to_string(),
                // Free-form object: no declared properties or requirements.
                JsonSchema::Object {
                    properties: BTreeMap::new(),
                    required: None,
                    additional_properties: None,
                },
            ),
            (
                "item_id".to_string(),
                JsonSchema::String {
                    description: Some(
                        "Optional stable identifier for this queue item.".to_string(),
                    ),
                },
            ),
            (
                "dedupe_key".to_string(),
                JsonSchema::String {
                    description: Some(
                        "Optional deduplication key used to skip already-seen items.".to_string(),
                    ),
                },
            ),
        ]),
        required: Some(vec!["input".to_string()]),
        additional_properties: Some(false.into()),
    }
}
/// Defines the `spawn_agents_on_queue` tool: drain a dynamic queue of work
/// items by fanning out worker sub-agents until no pending items remain.
fn create_spawn_agents_on_queue_tool() -> ToolSpec {
    // Local helpers to cut down on repetitive schema construction.
    let string_prop = |description: &str| JsonSchema::String {
        description: Some(description.to_string()),
    };
    let number_prop = |description: &str| JsonSchema::Number {
        description: Some(description.to_string()),
    };
    let mut properties = BTreeMap::new();
    properties.insert(
        "seed_items".to_string(),
        JsonSchema::Array {
            items: Box::new(create_agent_queue_item_schema()),
            description: Some(
                "Initial queue items as JSON objects. Provide exactly one of `seed_items` or `seed_path`."
                    .to_string(),
            ),
        },
    );
    properties.insert(
        "seed_path".to_string(),
        string_prop(
            "Path to a JSON array or JSONL file containing initial queue items. Provide exactly one of `seed_items` or `seed_path`.",
        ),
    );
    properties.insert(
        "instruction".to_string(),
        string_prop(
            "Instruction template to apply to each queue item. Use {field_name} placeholders to inject values from the item's `input` object.",
        ),
    );
    properties.insert(
        "output_jsonl_path".to_string(),
        string_prop("Optional output JSONL path for exported results."),
    );
    properties.insert(
        "max_concurrency".to_string(),
        number_prop(
            "Maximum concurrent workers for this job. Defaults to 16 and is capped by config.",
        ),
    );
    properties.insert(
        "max_workers".to_string(),
        number_prop("Alias for max_concurrency. Set to 1 to run sequentially."),
    );
    properties.insert(
        "max_runtime_seconds".to_string(),
        number_prop("Maximum runtime per worker before it is failed. Defaults to 1800 seconds."),
    );
    properties.insert(
        "max_items".to_string(),
        number_prop(
            "Hard cap on accepted queue items for this job, including seeds and later enqueues. Defaults to 1000.",
        ),
    );
    // Optional caller-supplied schema for worker results; left free-form here.
    properties.insert(
        "output_schema".to_string(),
        JsonSchema::Object {
            properties: BTreeMap::new(),
            required: None,
            additional_properties: None,
        },
    );
    ToolSpec::Function(ResponsesApiTool {
        name: "spawn_agents_on_queue".to_string(),
        description: "Process a dynamic queue by spawning worker sub-agents for pending items until the queue is drained. Workers may call `enqueue_agent_job_items` zero or more times to append discovered work, and must call `report_agent_job_result` exactly once. This call blocks until the queue finishes and automatically exports a JSONL snapshot to `output_jsonl_path` (or a default path)."
            .to_string(),
        strict: false,
        defer_loading: None,
        parameters: JsonSchema::Object {
            properties,
            // Only `instruction` is mandatory; seeds may come from either source.
            required: Some(vec!["instruction".to_string()]),
            additional_properties: Some(false.into()),
        },
        output_schema: None,
    })
}
/// Defines the worker-only `enqueue_agent_job_items` tool used to append
/// newly discovered work to a running dynamic-queue agent job.
fn create_enqueue_agent_job_items_tool() -> ToolSpec {
    let string_prop = |description: &str| JsonSchema::String {
        description: Some(description.to_string()),
    };
    let mut properties = BTreeMap::new();
    properties.insert("job_id".to_string(), string_prop("Identifier of the job."));
    properties.insert(
        "parent_item_id".to_string(),
        string_prop("Identifier of the running parent queue item."),
    );
    properties.insert(
        "items".to_string(),
        JsonSchema::Array {
            items: Box::new(create_agent_queue_item_schema()),
            description: Some(
                "Queue items to append. Each item requires an `input` object and may include `item_id` and `dedupe_key`."
                    .to_string(),
            ),
        },
    );
    ToolSpec::Function(ResponsesApiTool {
        name: "enqueue_agent_job_items".to_string(),
        description:
            "Worker-only tool to append new pending items to a dynamic agent-job queue. Main agents should not call this."
                .to_string(),
        strict: false,
        defer_loading: None,
        parameters: JsonSchema::Object {
            properties,
            // Every call must name the job, the enqueuing parent, and the items.
            required: Some(vec![
                "job_id".to_string(),
                "parent_item_id".to_string(),
                "items".to_string(),
            ]),
            additional_properties: Some(false.into()),
        },
        output_schema: None,
    })
}
fn create_report_agent_job_result_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
@@ -2724,7 +2908,14 @@ pub(crate) fn build_specs_with_discoverable_tools(
false,
config.code_mode_enabled,
);
push_tool_spec(
&mut builder,
create_spawn_agents_on_queue_tool(),
false,
config.code_mode_enabled,
);
builder.register_handler("spawn_agents_on_csv", agent_jobs_handler.clone());
builder.register_handler("spawn_agents_on_queue", agent_jobs_handler.clone());
if config.agent_jobs_worker_tools {
push_tool_spec(
&mut builder,
@@ -2732,7 +2923,16 @@ pub(crate) fn build_specs_with_discoverable_tools(
false,
config.code_mode_enabled,
);
builder.register_handler("report_agent_job_result", agent_jobs_handler);
builder.register_handler("report_agent_job_result", agent_jobs_handler.clone());
}
if config.agent_jobs_queue_worker_tools {
push_tool_spec(
&mut builder,
create_enqueue_agent_job_items_tool(),
false,
config.code_mode_enabled,
);
builder.register_handler("enqueue_agent_job_items", agent_jobs_handler);
}
}

View File

@@ -443,6 +443,7 @@ fn test_build_specs_collab_tools_enabled() {
&["spawn_agent", "send_input", "wait", "close_agent"],
);
assert_lacks_tool_name(&tools, "spawn_agents_on_csv");
assert_lacks_tool_name(&tools, "spawn_agents_on_queue");
}
#[test]
@@ -469,8 +470,10 @@ fn test_build_specs_enable_fanout_enables_agent_jobs_and_collab_tools() {
"wait",
"close_agent",
"spawn_agents_on_csv",
"spawn_agents_on_queue",
],
);
assert_lacks_tool_name(&tools, "enqueue_agent_job_items");
}
#[test]
@@ -555,7 +558,7 @@ fn test_build_specs_artifact_tool_enabled() {
}
#[test]
fn test_build_specs_agent_job_worker_tools_enabled() {
fn test_build_specs_csv_agent_job_worker_tools_enabled() {
let config = test_config();
let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
let mut features = Features::with_defaults();
@@ -569,7 +572,7 @@ fn test_build_specs_agent_job_worker_tools_enabled() {
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::SubAgent(SubAgentSource::Other(
"agent_job:test".to_string(),
"agent_job:csv:test".to_string(),
)),
});
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
@@ -582,9 +585,47 @@ fn test_build_specs_agent_job_worker_tools_enabled() {
"wait",
"close_agent",
"spawn_agents_on_csv",
"spawn_agents_on_queue",
"report_agent_job_result",
],
);
assert_lacks_tool_name(&tools, "enqueue_agent_job_items");
assert_lacks_tool_name(&tools, "request_user_input");
}
#[test]
fn test_build_specs_queue_agent_job_worker_tools_enabled() {
let config = test_config();
let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
let mut features = Features::with_defaults();
features.enable(Feature::SpawnCsv);
features.normalize_dependencies();
features.enable(Feature::Sqlite);
let available_models = Vec::new();
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
available_models: &available_models,
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::SubAgent(SubAgentSource::Other(
"agent_job:queue:test".to_string(),
)),
});
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
assert_contains_tool_names(
&tools,
&[
"spawn_agent",
"send_input",
"resume_agent",
"wait",
"close_agent",
"spawn_agents_on_csv",
"spawn_agents_on_queue",
"report_agent_job_result",
"enqueue_agent_job_items",
],
);
assert_lacks_tool_name(&tools, "request_user_input");
}

View File

@@ -7,6 +7,7 @@ use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use regex_lite::Regex;
use serde_json::Value;
use serde_json::json;
@@ -22,14 +23,16 @@ use wiremock::matchers::method;
use wiremock::matchers::path_regex;
struct AgentJobsResponder {
spawn_tool_name: String,
spawn_args_json: String,
seen_main: AtomicBool,
call_counter: AtomicUsize,
}
impl AgentJobsResponder {
fn new(spawn_args_json: String) -> Self {
fn new(spawn_tool_name: &str, spawn_args_json: String) -> Self {
Self {
spawn_tool_name: spawn_tool_name.to_string(),
spawn_args_json,
seen_main: AtomicBool::new(false),
call_counter: AtomicUsize::new(0),
@@ -38,14 +41,16 @@ impl AgentJobsResponder {
}
struct StopAfterFirstResponder {
spawn_tool_name: String,
spawn_args_json: String,
seen_main: AtomicBool,
worker_calls: Arc<AtomicUsize>,
}
impl StopAfterFirstResponder {
fn new(spawn_args_json: String, worker_calls: Arc<AtomicUsize>) -> Self {
fn new(spawn_tool_name: &str, spawn_args_json: String, worker_calls: Arc<AtomicUsize>) -> Self {
Self {
spawn_tool_name: spawn_tool_name.to_string(),
spawn_args_json,
seen_main: AtomicBool::new(false),
worker_calls,
@@ -53,50 +58,76 @@ impl StopAfterFirstResponder {
}
}
/// Scripted behaviors for `QueueAgentJobsResponder`, one per queue-job test
/// scenario below.
#[derive(Clone, Copy)]
enum QueueResponderMode {
    // Every worker simply reports its result; the queue never grows.
    ReportOnly,
    // The "root" worker enqueues two children in one call, then reports.
    EnqueueThenReport,
    // The "root" worker enqueues one child per call, twice, then reports.
    EnqueueTwiceThenReport,
    // The "root" worker enqueues items sharing a dedupe_key to exercise dedup.
    EnqueueWithDuplicatesThenReport,
    // The "root" worker enqueues children but never reports its own result.
    EnqueueWithoutReport,
    // The first worker reports with `stop: true`; later workers report normally.
    StopAfterFirst,
}
/// Mock `/responses` responder for `spawn_agents_on_queue` tests: the first
/// main-agent turn is answered with the spawn tool call, and each worker turn
/// is answered according to `mode`.
struct QueueAgentJobsResponder {
    // JSON arguments emitted with the initial `spawn_agents_on_queue` call.
    spawn_args_json: String,
    // Set once the main-agent turn has been answered with the spawn call.
    seen_main: AtomicBool,
    // Monotonic counter used to mint unique tool-call ids.
    call_counter: AtomicUsize,
    // Counts worker turns; `StopAfterFirst` stops only on the first one.
    worker_counter: AtomicUsize,
    // Selects which scripted worker behavior to replay.
    mode: QueueResponderMode,
}
impl QueueAgentJobsResponder {
fn new(spawn_args_json: String, mode: QueueResponderMode) -> Self {
Self {
spawn_args_json,
seen_main: AtomicBool::new(false),
call_counter: AtomicUsize::new(0),
worker_counter: AtomicUsize::new(0),
mode,
}
}
fn next_call_id(&self, prefix: &str) -> String {
let index = self.call_counter.fetch_add(1, Ordering::SeqCst);
format!("{prefix}-{index}")
}
}
impl Respond for StopAfterFirstResponder {
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
let body_bytes = decode_body_bytes(request);
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
if has_function_call_output(&body) {
return sse_response(sse(vec![
ev_response_created("resp-tool"),
ev_completed("resp-tool"),
]));
return completed_response("resp-tool");
}
if let Some((job_id, item_id)) = extract_job_and_item(&body) {
let call_index = self.worker_calls.fetch_add(1, Ordering::SeqCst);
let call_id = format!("call-worker-{call_index}");
let stop = call_index == 0;
let args = json!({
"job_id": job_id,
"item_id": item_id,
"result": { "item_id": item_id },
"stop": stop,
});
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
panic!("worker args serialize: {err}");
});
return sse_response(sse(vec![
ev_response_created("resp-worker"),
ev_function_call(&call_id, "report_agent_job_result", &args_json),
ev_completed("resp-worker"),
]));
return response_with_events(
"resp-worker",
vec![report_result_event(
&call_id,
&job_id,
&item_id,
Some(call_index == 0),
)],
);
}
if !self.seen_main.swap(true, Ordering::SeqCst) {
return sse_response(sse(vec![
ev_response_created("resp-main"),
ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
ev_completed("resp-main"),
]));
return response_with_events(
"resp-main",
vec![ev_function_call(
"call-spawn",
self.spawn_tool_name.as_str(),
&self.spawn_args_json,
)],
);
}
sse_response(sse(vec![
ev_response_created("resp-default"),
ev_completed("resp-default"),
]))
completed_response("resp-default")
}
}
@@ -106,10 +137,7 @@ impl Respond for AgentJobsResponder {
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
if has_function_call_output(&body) {
return sse_response(sse(vec![
ev_response_created("resp-tool"),
ev_completed("resp-tool"),
]));
return completed_response("resp-tool");
}
if let Some((job_id, item_id)) = extract_job_and_item(&body) {
@@ -117,36 +145,242 @@ impl Respond for AgentJobsResponder {
"call-worker-{}",
self.call_counter.fetch_add(1, Ordering::SeqCst)
);
let args = json!({
"job_id": job_id,
"item_id": item_id,
"result": { "item_id": item_id }
});
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
panic!("worker args serialize: {err}");
});
return sse_response(sse(vec![
ev_response_created("resp-worker"),
ev_function_call(&call_id, "report_agent_job_result", &args_json),
ev_completed("resp-worker"),
]));
return response_with_events(
"resp-worker",
vec![report_result_event(&call_id, &job_id, &item_id, None)],
);
}
if !self.seen_main.swap(true, Ordering::SeqCst) {
return sse_response(sse(vec![
ev_response_created("resp-main"),
ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
ev_completed("resp-main"),
]));
return response_with_events(
"resp-main",
vec![ev_function_call(
"call-spawn",
self.spawn_tool_name.as_str(),
&self.spawn_args_json,
)],
);
}
sse_response(sse(vec![
ev_response_created("resp-default"),
ev_completed("resp-default"),
]))
completed_response("resp-default")
}
}
/// Classifies each request as (1) a tool-output follow-up, (2) a worker turn
/// for a specific queue item, or (3) the initial main-agent turn, and answers
/// it with the scripted events for `self.mode`.
impl Respond for QueueAgentJobsResponder {
    fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
        let body_bytes = decode_body_bytes(request);
        // Tolerate unparsable bodies by treating them as JSON null.
        let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
        // Follow-up turns carrying tool output just complete immediately.
        if has_function_call_output(&body) {
            return completed_response("resp-tool");
        }
        // Worker turn: the prompt names a job and item; reply per `mode`.
        if let Some((job_id, item_id)) = extract_job_and_item(&body) {
            let mut events = Vec::new();
            match self.mode {
                QueueResponderMode::ReportOnly => {
                    let call_id = self.next_call_id("call-report");
                    events.push(report_result_event(&call_id, &job_id, &item_id, None));
                }
                QueueResponderMode::EnqueueThenReport => {
                    // Only the root item fans out; two children in one call.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueTwiceThenReport => {
                    // Root enqueues one child per call, across two calls.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![json!({
                                "item_id": "child-1",
                                "dedupe_key": "https://child-1.test",
                                "input": { "url": "https://child-1.test" }
                            })],
                        ));
                        let second_enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &second_enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![json!({
                                "item_id": "child-2",
                                "dedupe_key": "https://child-2.test",
                                "input": { "url": "https://child-2.test" }
                            })],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueWithDuplicatesThenReport => {
                    // "child-dupe" shares child-1's dedupe_key and should be skipped.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-dupe",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-dupe.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueWithoutReport => {
                    // Root enqueues children but never reports its own result;
                    // non-root items report normally.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    } else {
                        let report_call_id = self.next_call_id("call-report");
                        events.push(report_result_event(
                            &report_call_id,
                            &job_id,
                            &item_id,
                            None,
                        ));
                    }
                }
                QueueResponderMode::StopAfterFirst => {
                    // Only the very first worker sets `stop: true`.
                    let worker_index = self.worker_counter.fetch_add(1, Ordering::SeqCst);
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        Some(worker_index == 0),
                    ));
                }
            }
            return response_with_events("resp-worker", events);
        }
        // First main-agent turn: issue the spawn_agents_on_queue tool call.
        if !self.seen_main.swap(true, Ordering::SeqCst) {
            return response_with_events(
                "resp-main",
                vec![ev_function_call(
                    "call-spawn",
                    "spawn_agents_on_queue",
                    &self.spawn_args_json,
                )],
            );
        }
        // Any later main-agent turn just completes with no events.
        completed_response("resp-default")
    }
}
/// Frames `events` as an SSE response: `response.created`, the given payload
/// events, then `response.completed`, all tagged with `response_id`.
fn response_with_events(response_id: &str, events: Vec<Value>) -> ResponseTemplate {
    let framed: Vec<Value> = std::iter::once(ev_response_created(response_id))
        .chain(events)
        .chain(std::iter::once(ev_completed(response_id)))
        .collect();
    sse_response(sse(framed))
}
/// Builds an SSE response containing only the created/completed envelope.
fn completed_response(response_id: &str) -> ResponseTemplate {
    response_with_events(response_id, vec![])
}
/// Builds a `report_agent_job_result` function-call event for the given job
/// and item; when `stop` is provided it is forwarded as the tool's `stop` flag.
fn report_result_event(call_id: &str, job_id: &str, item_id: &str, stop: Option<bool>) -> Value {
    let mut args = json!({
        "job_id": job_id,
        "item_id": item_id,
        "result": { "item_id": item_id },
    });
    if let Some(flag) = stop {
        if let Some(fields) = args.as_object_mut() {
            fields.insert("stop".to_string(), Value::Bool(flag));
        }
    }
    ev_function_call(call_id, "report_agent_job_result", &serialize_json(&args))
}
/// Builds an `enqueue_agent_job_items` function-call event appending `items`
/// to the job's queue on behalf of `parent_item_id`.
fn enqueue_items_event(
    call_id: &str,
    job_id: &str,
    parent_item_id: &str,
    items: Vec<Value>,
) -> Value {
    ev_function_call(
        call_id,
        "enqueue_agent_job_items",
        &serialize_json(&json!({
            "job_id": job_id,
            "parent_item_id": parent_item_id,
            "items": items,
        })),
    )
}
/// Serializes `value` to a JSON string, panicking on failure (test helper).
fn serialize_json(value: &Value) -> String {
    match serde_json::to_string(value) {
        Ok(serialized) => serialized,
        Err(err) => panic!("json serialization failed: {err}"),
    }
}
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
let Some(encoding) = request
.headers
@@ -178,12 +412,17 @@ fn has_function_call_output(body: &Value) -> bool {
fn extract_job_and_item(body: &Value) -> Option<(String, String)> {
let texts = message_input_texts(body);
let mut combined = texts.join("\n");
let mut combined = texts.join(
"
",
);
if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
combined.push('\n');
combined.push_str(instructions);
}
if !combined.contains("You are processing one item for a generic agent job.") {
if !combined.contains("You are processing one item for a generic agent job.")
&& !combined.contains("You are processing one item in a queue-draining agent job.")
{
return None;
}
let job_id = Regex::new(r"Job ID:\s*([^\n]+)")
@@ -218,6 +457,16 @@ fn parse_simple_csv_line(line: &str) -> Vec<String> {
line.split(',').map(str::to_string).collect()
}
/// Reads a JSONL file and parses every non-blank line into a JSON value,
/// failing on the first unreadable file or unparsable line.
fn parse_jsonl_lines(path: &std::path::Path) -> Result<Vec<Value>> {
    let content = fs::read_to_string(path)?;
    let mut values = Vec::new();
    for line in content.lines() {
        // Skip blank lines (e.g. a trailing newline at end of file).
        if line.trim().is_empty() {
            continue;
        }
        values.push(serde_json::from_str(line)?);
    }
    Ok(values)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn report_agent_job_result_rejects_wrong_thread() -> Result<()> {
let server = start_mock_server().await;
@@ -244,7 +493,7 @@ async fn report_agent_job_result_rejects_wrong_thread() -> Result<()> {
});
let args_json = serde_json::to_string(&args)?;
let responder = AgentJobsResponder::new(args_json);
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(responder)
@@ -310,7 +559,7 @@ async fn spawn_agents_on_csv_runs_and_exports() -> Result<()> {
});
let args_json = serde_json::to_string(&args)?;
let responder = AgentJobsResponder::new(args_json);
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(responder)
@@ -354,7 +603,7 @@ async fn spawn_agents_on_csv_dedupes_item_ids() -> Result<()> {
});
let args_json = serde_json::to_string(&args)?;
let responder = AgentJobsResponder::new(args_json);
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(responder)
@@ -413,7 +662,8 @@ async fn spawn_agents_on_csv_stop_halts_future_items() -> Result<()> {
let args_json = serde_json::to_string(&args)?;
let worker_calls = Arc::new(AtomicUsize::new(0));
let responder = StopAfterFirstResponder::new(args_json, worker_calls.clone());
let responder =
StopAfterFirstResponder::new("spawn_agents_on_csv", args_json, worker_calls.clone());
Mock::given(method("POST"))
.and(path_regex(".*/responses$"))
.respond_with(responder)
@@ -446,3 +696,432 @@ async fn spawn_agents_on_csv_stop_halts_future_items() -> Result<()> {
assert_eq!(worker_calls.load(Ordering::SeqCst), 1);
Ok(())
}
// End-to-end happy path: two seed items, workers report only, and the job
// exports both results to the requested JSONL path and records a
// DynamicQueue job row in the state DB.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_queue_runs_and_exports_jsonl() -> Result<()> {
    let server = start_mock_server().await;
    // SpawnCsv gates the agent-job fan-out tools; Sqlite provides the state
    // DB that the assertions below read back.
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_output.jsonl");
    let args = json!({
        "seed_items": [
            {
                "item_id": "seed-1",
                "dedupe_key": "https://seed-1.test",
                "input": { "url": "https://seed-1.test" }
            },
            {
                "item_id": "seed-2",
                "dedupe_key": "https://seed-2.test",
                "input": { "url": "https://seed-2.test" }
            }
        ],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run queue job").await?;
    // One exported line per seed item, in seed order, all completed.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 2);
    let item_ids: Vec<_> = lines
        .iter()
        .map(|line| line["item_id"].as_str().expect("item_id"))
        .collect();
    assert_eq!(item_ids, vec!["seed-1", "seed-2"]);
    let statuses: Vec<_> = lines
        .iter()
        .map(|line| line["status"].as_str().expect("status"))
        .collect();
    assert_eq!(statuses, vec!["completed", "completed"]);
    // The persisted job must be tagged with the dynamic-queue kind.
    let job_id = lines[0]["job_id"].as_str().expect("job_id");
    let db = test.codex.state_db().expect("state db");
    let job = db.get_agent_job(job_id).await?.expect("job");
    assert_eq!(job.kind, codex_state::AgentJobKind::DynamicQueue);
    Ok(())
}
// Seeds can also be loaded from a file: here a JSON array with one item.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_queue_supports_seed_path_json_array() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    // Write the seed file as a JSON array (as opposed to JSONL, tested below).
    let seed_path = test.cwd_path().join("agent_queue_seed.json");
    fs::write(
        &seed_path,
        serde_json::to_string(&json!([
            {
                "item_id": "seed-json",
                "input": { "url": "https://seed-json.test" }
            }
        ]))?,
    )?;
    let output_path = test.cwd_path().join("agent_queue_seed_array.jsonl");
    // Note: `seed_path` is used instead of `seed_items`.
    let args = json!({
        "seed_path": seed_path.display().to_string(),
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run queue job from json array").await?;
    // The single file-provided seed is processed and exported.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 1);
    assert_eq!(lines[0]["item_id"].as_str(), Some("seed-json"));
    Ok(())
}
// Same as the JSON-array test, but the seed file is JSONL (one item per line).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_agents_on_queue_supports_seed_path_jsonl() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let seed_path = test.cwd_path().join("agent_queue_seed.jsonl");
    // Two newline-separated JSON objects with a trailing newline.
    fs::write(
        &seed_path,
        format!(
            "{}
{}
",
            serialize_json(&json!({
                "item_id": "seed-jsonl-1",
                "input": { "url": "https://seed-jsonl-1.test" }
            })),
            serialize_json(&json!({
                "item_id": "seed-jsonl-2",
                "input": { "url": "https://seed-jsonl-2.test" }
            })),
        ),
    )?;
    let output_path = test.cwd_path().join("agent_queue_seed_jsonl.jsonl");
    let args = json!({
        "seed_path": seed_path.display().to_string(),
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run queue job from jsonl").await?;
    // Both JSONL seeds are processed in file order.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 2);
    let item_ids: Vec<_> = lines
        .iter()
        .map(|line| line["item_id"].as_str().expect("item_id"))
        .collect();
    assert_eq!(item_ids, vec!["seed-jsonl-1", "seed-jsonl-2"]);
    Ok(())
}
// Dynamic growth: the root worker enqueues two children, and all three items
// end up completed in the export.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_worker_can_enqueue_more_items() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_children.jsonl");
    // One seed; the responder enqueues child-1/child-2 when it sees "root".
    let args = json!({
        "seed_items": [{
            "item_id": "root",
            "input": { "url": "https://root.test" }
        }],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueThenReport);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run expanding queue job").await?;
    // Export contains root plus both enqueued children, all completed.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 3);
    let item_ids: Vec<_> = lines
        .iter()
        .map(|line| line["item_id"].as_str().expect("item_id"))
        .collect();
    assert_eq!(item_ids, vec!["root", "child-1", "child-2"]);
    let statuses: Vec<_> = lines
        .iter()
        .map(|line| line["status"].as_str().expect("status"))
        .collect();
    assert_eq!(statuses, vec!["completed", "completed", "completed"]);
    Ok(())
}
// A single worker turn may issue several enqueue_agent_job_items calls; both
// children from the two separate calls are accepted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_worker_multiple_enqueue_calls_are_supported() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_multiple_enqueue.jsonl");
    let args = json!({
        "seed_items": [{
            "item_id": "root",
            "input": { "url": "https://root.test" }
        }],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder =
        QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueTwiceThenReport);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run multi-enqueue queue job").await?;
    // Sorted comparison: only membership matters here, not processing order.
    let lines = parse_jsonl_lines(&output_path)?;
    let mut item_ids: Vec<_> = lines
        .iter()
        .map(|line| line["item_id"].as_str().expect("item_id"))
        .collect();
    item_ids.sort_unstable();
    assert_eq!(item_ids, vec!["child-1", "child-2", "root"]);
    Ok(())
}
// Deduplication: "child-dupe" reuses child-1's dedupe_key and must be
// silently skipped, leaving exactly root + child-1 + child-2 in the export.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_dedupe_key_skips_duplicate_urls() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_dedupe.jsonl");
    let args = json!({
        "seed_items": [{
            "item_id": "root",
            "input": { "url": "https://root.test" }
        }],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(
        args_json,
        QueueResponderMode::EnqueueWithDuplicatesThenReport,
    );
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run deduping queue job").await?;
    // Three items exported; the duplicate never entered the queue.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 3);
    let item_ids: Vec<_> = lines
        .iter()
        .map(|line| line["item_id"].as_str().expect("item_id"))
        .collect();
    assert_eq!(item_ids, vec!["root", "child-1", "child-2"]);
    Ok(())
}
// A parent that enqueues children but never reports is marked failed, while
// the children it enqueued are still processed to completion.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_parent_failure_keeps_children() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_parent_failure.jsonl");
    let args = json!({
        "seed_items": [{
            "item_id": "root",
            "input": { "url": "https://root.test" }
        }],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
    });
    let args_json = serde_json::to_string(&args)?;
    // Root enqueues two children but never calls report_agent_job_result.
    let responder =
        QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueWithoutReport);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run queue parent failure job").await?;
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 3);
    // Root is failed with the missing-report error message.
    let root = lines
        .iter()
        .find(|line| line["item_id"].as_str() == Some("root"))
        .expect("root item");
    assert_eq!(root["status"].as_str(), Some("failed"));
    assert_eq!(
        root["last_error"].as_str(),
        Some("worker finished without calling report_agent_job_result")
    );
    // The children it enqueued still completed.
    let child_statuses: Vec<_> = lines
        .iter()
        .filter(|line| line["item_id"].as_str() != Some("root"))
        .map(|line| line["status"].as_str().expect("status"))
        .collect();
    assert_eq!(child_statuses, vec!["completed", "completed"]);
    Ok(())
}
// A worker reporting `stop: true` cancels the job: with max_concurrency 1 the
// first seed completes and the remaining two stay pending, with the job
// recorded as Cancelled in the state DB.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queue_stop_cancels_remaining_pending_items() -> Result<()> {
    let server = start_mock_server().await;
    let mut builder = test_codex().with_config(|config| {
        config
            .features
            .enable(Feature::SpawnCsv)
            .expect("test config should allow feature update");
        config
            .features
            .enable(Feature::Sqlite)
            .expect("test config should allow feature update");
    });
    let test = builder.build(&server).await?;
    let output_path = test.cwd_path().join("agent_queue_stop.jsonl");
    // max_concurrency: 1 makes processing sequential so exactly one item
    // runs before the stop takes effect.
    let args = json!({
        "seed_items": [
            { "item_id": "seed-1", "input": { "url": "https://seed-1.test" } },
            { "item_id": "seed-2", "input": { "url": "https://seed-2.test" } },
            { "item_id": "seed-3", "input": { "url": "https://seed-3.test" } }
        ],
        "instruction": "Visit {url}",
        "output_jsonl_path": output_path.display().to_string(),
        "max_concurrency": 1,
    });
    let args_json = serde_json::to_string(&args)?;
    let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::StopAfterFirst);
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(responder)
        .mount(&server)
        .await;
    test.submit_turn("run stopping queue job").await?;
    // All three items are still exported, but only the first completed.
    let lines = parse_jsonl_lines(&output_path)?;
    assert_eq!(lines.len(), 3);
    let statuses: Vec<_> = lines
        .iter()
        .map(|line| line["status"].as_str().expect("status"))
        .collect();
    assert_eq!(statuses, vec!["completed", "pending", "pending"]);
    // Persisted job status and per-item progress counters reflect the stop.
    let job_id = lines[0]["job_id"].as_str().expect("job_id");
    let db = test.codex.state_db().expect("state db");
    let job = db.get_agent_job(job_id).await?.expect("job");
    assert_eq!(job.status, codex_state::AgentJobStatus::Cancelled);
    let progress = db.get_agent_job_progress(job_id).await?;
    assert_eq!(progress.total_items, 3);
    assert_eq!(progress.completed_items, 1);
    assert_eq!(progress.failed_items, 0);
    assert_eq!(progress.running_items, 0);
    assert_eq!(progress.pending_items, 2);
    Ok(())
}

View File

@@ -0,0 +1,21 @@
-- Migration (state DB v6): extend the agent-job tables for dynamic-queue
-- jobs. Pre-existing rows keep their original behavior via the defaults.
-- Job execution model; all existing jobs default to the CSV-batch kind.
ALTER TABLE agent_jobs
ADD COLUMN kind TEXT NOT NULL DEFAULT 'csv_batch';
-- Optional cap on the total item count a queue job may accumulate
-- (NULL = unlimited).
ALTER TABLE agent_jobs
ADD COLUMN max_items INTEGER;
-- Item that enqueued this item (NULL for seed items created with the job).
ALTER TABLE agent_job_items
ADD COLUMN parent_item_id TEXT;
-- Caller-supplied key used to skip enqueueing duplicate work items.
ALTER TABLE agent_job_items
ADD COLUMN dedupe_key TEXT;
-- Ordered scans of a job's items by insertion order.
CREATE INDEX idx_agent_job_items_row_index
ON agent_job_items(job_id, row_index ASC);
-- Listing a parent item's children in enqueue order.
CREATE INDEX idx_agent_job_items_parent
ON agent_job_items(job_id, parent_item_id, row_index ASC);
-- Partial unique index: at most one item per (job_id, dedupe_key); rows
-- without a dedupe key are exempt from the constraint.
CREATE UNIQUE INDEX idx_agent_job_items_dedupe_key
ON agent_job_items(job_id, dedupe_key)
WHERE dedupe_key IS NOT NULL;

View File

@@ -29,12 +29,15 @@ pub use model::AgentJobCreateParams;
pub use model::AgentJobItem;
pub use model::AgentJobItemCreateParams;
pub use model::AgentJobItemStatus;
pub use model::AgentJobKind;
pub use model::AgentJobProgress;
pub use model::AgentJobStatus;
pub use model::Anchor;
pub use model::BackfillState;
pub use model::BackfillStats;
pub use model::BackfillStatus;
pub use model::EnqueueAgentJobItemOutcome;
pub use model::EnqueueAgentJobItemsResult;
pub use model::ExtractionOutcome;
pub use model::SortKey;
pub use model::Stage1JobClaim;
@@ -56,7 +59,7 @@ pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
pub const LOGS_DB_FILENAME: &str = "logs";
pub const LOGS_DB_VERSION: u32 = 1;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 5;
pub const STATE_DB_VERSION: u32 = 6;
/// Errors encountered during DB operations. Tags: [stage]
pub const DB_ERROR_METRIC: &str = "codex.db.error";

View File

@@ -12,6 +12,29 @@ pub enum AgentJobStatus {
Cancelled,
}
/// Execution model of an agent job: a fixed CSV batch, or a dynamic queue
/// whose workers may enqueue further items while the job runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentJobKind {
    CsvBatch,
    DynamicQueue,
}
impl AgentJobKind {
    /// Stable string form persisted in the `agent_jobs.kind` column.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::CsvBatch => "csv_batch",
            Self::DynamicQueue => "dynamic_queue",
        }
    }
    /// Inverse of [`Self::as_str`]; errors on any unknown value.
    pub fn parse(value: &str) -> Result<Self> {
        [Self::CsvBatch, Self::DynamicQueue]
            .into_iter()
            .find(|kind| kind.as_str() == value)
            .ok_or_else(|| anyhow::anyhow!("invalid agent job kind: {value}"))
    }
}
impl AgentJobStatus {
pub const fn as_str(self) -> &'static str {
match self {
@@ -75,9 +98,11 @@ impl AgentJobItemStatus {
pub struct AgentJob {
pub id: String,
pub name: String,
pub kind: AgentJobKind,
pub status: AgentJobStatus,
pub instruction: String,
pub auto_export: bool,
pub max_items: Option<u64>,
pub max_runtime_seconds: Option<u64>,
// TODO(jif-oai): Convert to JSON Schema and enforce structured outputs.
pub output_schema_json: Option<Value>,
@@ -95,8 +120,10 @@ pub struct AgentJob {
pub struct AgentJobItem {
pub job_id: String,
pub item_id: String,
pub parent_item_id: Option<String>,
pub row_index: i64,
pub source_id: Option<String>,
pub dedupe_key: Option<String>,
pub row_json: Value,
pub status: AgentJobItemStatus,
pub assigned_thread_id: Option<String>,
@@ -118,12 +145,14 @@ pub struct AgentJobProgress {
pub failed_items: usize,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJobCreateParams {
pub id: String,
pub name: String,
pub kind: AgentJobKind,
pub instruction: String,
pub auto_export: bool,
pub max_items: Option<u64>,
pub max_runtime_seconds: Option<u64>,
pub output_schema_json: Option<Value>,
pub input_headers: Vec<String>,
@@ -131,21 +160,45 @@ pub struct AgentJobCreateParams {
pub output_csv_path: String,
}
#[derive(Debug, Clone)]
/// Caller-supplied parameters for creating one agent-job item.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJobItemCreateParams {
    /// Proposed item id; on queue enqueue it may be suffixed ("-2", "-3", …)
    /// to stay unique within the job.
    pub item_id: String,
    /// Item that spawned this one; `None` for seed items.
    pub parent_item_id: Option<String>,
    /// Position in the job's item ordering.
    pub row_index: i64,
    /// Optional identifier of the item's source record.
    pub source_id: Option<String>,
    /// Optional de-duplication key; an item whose key matches an existing
    /// item in the same job is not enqueued again.
    pub dedupe_key: Option<String>,
    /// Arbitrary JSON input payload for the item.
    pub row_json: Value,
}
/// Result of a bulk enqueue: one outcome per requested item, in request
/// order.
#[derive(Debug, Clone, PartialEq)]
pub struct EnqueueAgentJobItemsResult {
    pub outcomes: Vec<EnqueueAgentJobItemOutcome>,
}
/// Per-item outcome of enqueueing into a dynamic-queue agent job.
#[derive(Debug, Clone, PartialEq)]
pub enum EnqueueAgentJobItemOutcome {
    /// The item was inserted as a new pending item.
    Enqueued {
        item: AgentJobItem,
    },
    /// An item with the same `dedupe_key` already existed in the job; that
    /// existing item is returned instead of inserting a duplicate.
    Deduped {
        item: AgentJobItem,
    },
    /// The job's `max_items` cap was already reached, so nothing was
    /// inserted for this request.
    MaxItemsReached {
        requested_item_id: String,
        parent_item_id: Option<String>,
        dedupe_key: Option<String>,
    },
}
#[derive(Debug, sqlx::FromRow)]
pub(crate) struct AgentJobRow {
pub(crate) id: String,
pub(crate) name: String,
pub(crate) kind: String,
pub(crate) status: String,
pub(crate) instruction: String,
pub(crate) auto_export: i64,
pub(crate) max_items: Option<i64>,
pub(crate) max_runtime_seconds: Option<i64>,
pub(crate) output_schema_json: Option<String>,
pub(crate) input_headers_json: String,
@@ -168,6 +221,11 @@ impl TryFrom<AgentJobRow> for AgentJob {
.map(serde_json::from_str)
.transpose()?;
let input_headers = serde_json::from_str(value.input_headers_json.as_str())?;
let max_items = value
.max_items
.map(u64::try_from)
.transpose()
.map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
let max_runtime_seconds = value
.max_runtime_seconds
.map(u64::try_from)
@@ -176,9 +234,11 @@ impl TryFrom<AgentJobRow> for AgentJob {
Ok(Self {
id: value.id,
name: value.name,
kind: AgentJobKind::parse(value.kind.as_str())?,
status: AgentJobStatus::parse(value.status.as_str())?,
instruction: value.instruction,
auto_export: value.auto_export != 0,
max_items,
max_runtime_seconds,
output_schema_json,
input_headers,
@@ -203,8 +263,10 @@ impl TryFrom<AgentJobRow> for AgentJob {
pub(crate) struct AgentJobItemRow {
pub(crate) job_id: String,
pub(crate) item_id: String,
pub(crate) parent_item_id: Option<String>,
pub(crate) row_index: i64,
pub(crate) source_id: Option<String>,
pub(crate) dedupe_key: Option<String>,
pub(crate) row_json: String,
pub(crate) status: String,
pub(crate) assigned_thread_id: Option<String>,
@@ -224,8 +286,10 @@ impl TryFrom<AgentJobItemRow> for AgentJobItem {
Ok(Self {
job_id: value.job_id,
item_id: value.item_id,
parent_item_id: value.parent_item_id,
row_index: value.row_index,
source_id: value.source_id,
dedupe_key: value.dedupe_key,
row_json: serde_json::from_str(value.row_json.as_str())?,
status: AgentJobItemStatus::parse(value.status.as_str())?,
assigned_thread_id: value.assigned_thread_id,

View File

@@ -9,8 +9,11 @@ pub use agent_job::AgentJobCreateParams;
pub use agent_job::AgentJobItem;
pub use agent_job::AgentJobItemCreateParams;
pub use agent_job::AgentJobItemStatus;
pub use agent_job::AgentJobKind;
pub use agent_job::AgentJobProgress;
pub use agent_job::AgentJobStatus;
pub use agent_job::EnqueueAgentJobItemOutcome;
pub use agent_job::EnqueueAgentJobItemsResult;
pub use backfill_state::BackfillState;
pub use backfill_state::BackfillStatus;
pub use log::LogEntry;

View File

@@ -1,5 +1,8 @@
use super::*;
use crate::model::AgentJobItemRow;
use crate::model::AgentJobKind;
use crate::model::EnqueueAgentJobItemOutcome;
use crate::model::EnqueueAgentJobItemsResult;
impl StateRuntime {
pub async fn create_agent_job(
@@ -14,6 +17,11 @@ impl StateRuntime {
.as_ref()
.map(serde_json::to_string)
.transpose()?;
let max_items = params
.max_items
.map(i64::try_from)
.transpose()
.map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
let max_runtime_seconds = params
.max_runtime_seconds
.map(i64::try_from)
@@ -25,9 +33,11 @@ impl StateRuntime {
INSERT INTO agent_jobs (
id,
name,
kind,
status,
instruction,
auto_export,
max_items,
max_runtime_seconds,
output_schema_json,
input_headers_json,
@@ -38,14 +48,16 @@ INSERT INTO agent_jobs (
started_at,
completed_at,
last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
"#,
)
.bind(params.id.as_str())
.bind(params.name.as_str())
.bind(params.kind.as_str())
.bind(AgentJobStatus::Pending.as_str())
.bind(params.instruction.as_str())
.bind(i64::from(params.auto_export))
.bind(max_items)
.bind(max_runtime_seconds)
.bind(output_schema_json)
.bind(input_headers_json)
@@ -63,8 +75,10 @@ INSERT INTO agent_jobs (
INSERT INTO agent_job_items (
job_id,
item_id,
parent_item_id,
row_index,
source_id,
dedupe_key,
row_json,
status,
assigned_thread_id,
@@ -75,13 +89,15 @@ INSERT INTO agent_job_items (
updated_at,
completed_at,
reported_at
) VALUES (?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
"#,
)
.bind(params.id.as_str())
.bind(item.item_id.as_str())
.bind(item.parent_item_id.as_deref())
.bind(item.row_index)
.bind(item.source_id.as_deref())
.bind(item.dedupe_key.as_deref())
.bind(row_json)
.bind(AgentJobItemStatus::Pending.as_str())
.bind(now)
@@ -104,9 +120,11 @@ INSERT INTO agent_job_items (
SELECT
id,
name,
kind,
status,
instruction,
auto_export,
max_items,
max_runtime_seconds,
output_schema_json,
input_headers_json,
@@ -138,8 +156,10 @@ WHERE id = ?
SELECT
job_id,
item_id,
parent_item_id,
row_index,
source_id,
dedupe_key,
row_json,
status,
assigned_thread_id,
@@ -181,8 +201,10 @@ WHERE job_id =
SELECT
job_id,
item_id,
parent_item_id,
row_index,
source_id,
dedupe_key,
row_json,
status,
assigned_thread_id,
@@ -204,6 +226,288 @@ WHERE job_id = ? AND item_id = ?
row.map(AgentJobItem::try_from).transpose()
}
    /// Enqueues additional items onto a dynamic-queue agent job on behalf of
    /// the worker thread that currently owns `parent_item_id`.
    ///
    /// Everything runs inside a single `BEGIN IMMEDIATE` transaction so the
    /// validation (job kind/status, parent ownership), dedupe lookups,
    /// `max_items` cap, and item-id uniqueness checks are atomic with the
    /// inserts. Returns one outcome per requested item, in request order.
    ///
    /// # Errors
    /// Fails when the job does not exist, is not a dynamic-queue job, is in
    /// a final state, when the parent item is not Running under
    /// `reporting_thread_id`, or when a requested item carries a
    /// `parent_item_id` that disagrees with `parent_item_id`.
    pub async fn enqueue_agent_job_items(
        &self,
        job_id: &str,
        parent_item_id: &str,
        reporting_thread_id: &str,
        items: &[AgentJobItemCreateParams],
    ) -> anyhow::Result<EnqueueAgentJobItemsResult> {
        let now = Utc::now().timestamp();
        // BEGIN IMMEDIATE acquires the write lock up front so concurrent
        // enqueues against the same job serialize instead of deadlocking.
        let mut tx = self.pool.begin_with("BEGIN IMMEDIATE").await?;
        let job_row = sqlx::query_as::<_, AgentJobRow>(
            r#"
SELECT
    id,
    name,
    kind,
    status,
    instruction,
    auto_export,
    max_items,
    max_runtime_seconds,
    output_schema_json,
    input_headers_json,
    input_csv_path,
    output_csv_path,
    created_at,
    updated_at,
    started_at,
    completed_at,
    last_error
FROM agent_jobs
WHERE id = ?
"#,
        )
        .bind(job_id)
        .fetch_optional(&mut *tx)
        .await?
        .ok_or_else(|| anyhow::anyhow!("agent job {job_id} not found"))?;
        let job = AgentJob::try_from(job_row)?;
        // Only dynamic-queue jobs accept post-creation enqueues.
        if job.kind != AgentJobKind::DynamicQueue {
            return Err(anyhow::anyhow!(
                "agent job {job_id} does not support queue item enqueue"
            ));
        }
        if job.status.is_final() {
            return Err(anyhow::anyhow!(
                "agent job {job_id} is already {}",
                job.status.as_str()
            ));
        }
        // The caller must own the parent: it has to be a Running item
        // assigned to the reporting thread.
        let parent_owner = sqlx::query(
            r#"
SELECT 1
FROM agent_job_items
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
    AND assigned_thread_id = ?
"#,
        )
        .bind(job_id)
        .bind(parent_item_id)
        .bind(AgentJobItemStatus::Running.as_str())
        .bind(reporting_thread_id)
        .fetch_optional(&mut *tx)
        .await?;
        if parent_owner.is_none() {
            return Err(anyhow::anyhow!(
                "agent job parent item {parent_item_id} is not owned by thread {reporting_thread_id}"
            ));
        }
        // Current item count, used below to enforce the max_items cap.
        let mut total_items: i64 = sqlx::query_scalar(
            r#"
SELECT COUNT(*)
FROM agent_job_items
WHERE job_id = ?
"#,
        )
        .bind(job_id)
        .fetch_one(&mut *tx)
        .await?;
        // Next row_index: one past the current maximum (-1 when empty).
        let mut next_row_index: i64 = sqlx::query_scalar(
            r#"
SELECT COALESCE(MAX(row_index), -1)
FROM agent_job_items
WHERE job_id = ?
"#,
        )
        .bind(job_id)
        .fetch_one(&mut *tx)
        .await?;
        next_row_index = next_row_index.saturating_add(1);
        let max_items = job
            .max_items
            .map(i64::try_from)
            .transpose()
            .map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
        let mut inserted_any = false;
        // Item ids claimed earlier in this batch; lets in-batch duplicates
        // get suffixed without re-querying rows we just inserted.
        let mut reserved_item_ids = BTreeSet::new();
        let mut outcomes = Vec::with_capacity(items.len());
        for item in items {
            // An explicit parent_item_id must match the reporting parent; a
            // missing one defaults to it.
            let resolved_parent_item_id = match item.parent_item_id.as_deref() {
                Some(item_parent_item_id) if item_parent_item_id != parent_item_id => {
                    return Err(anyhow::anyhow!(
                        "enqueue item {} parent_item_id mismatch: expected {parent_item_id}, got {item_parent_item_id}",
                        item.item_id
                    ));
                }
                Some(_) | None => Some(parent_item_id.to_string()),
            };
            // Dedupe check runs before the cap check: a deduped item never
            // consumes a max_items slot.
            if let Some(dedupe_key) = item.dedupe_key.as_deref() {
                let existing_row = sqlx::query_as::<_, AgentJobItemRow>(
                    r#"
SELECT
    job_id,
    item_id,
    parent_item_id,
    row_index,
    source_id,
    dedupe_key,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
FROM agent_job_items
WHERE job_id = ? AND dedupe_key = ?
"#,
                )
                .bind(job_id)
                .bind(dedupe_key)
                .fetch_optional(&mut *tx)
                .await?;
                if let Some(existing_row) = existing_row {
                    outcomes.push(EnqueueAgentJobItemOutcome::Deduped {
                        item: AgentJobItem::try_from(existing_row)?,
                    });
                    continue;
                }
            }
            if max_items.is_some_and(|max_items| total_items >= max_items) {
                outcomes.push(EnqueueAgentJobItemOutcome::MaxItemsReached {
                    requested_item_id: item.item_id.clone(),
                    parent_item_id: resolved_parent_item_id,
                    dedupe_key: item.dedupe_key.clone(),
                });
                continue;
            }
            // Resolve a unique item id: append "-2", "-3", … until the id
            // collides with neither this batch nor an existing row.
            let base_item_id = item.item_id.as_str();
            let mut resolved_item_id = base_item_id.to_string();
            let mut suffix = 2usize;
            loop {
                if reserved_item_ids.contains(resolved_item_id.as_str()) {
                    resolved_item_id = format!("{base_item_id}-{suffix}");
                    suffix = suffix.saturating_add(1);
                    continue;
                }
                let existing_item = sqlx::query(
                    r#"
SELECT 1
FROM agent_job_items
WHERE job_id = ? AND item_id = ?
"#,
                )
                .bind(job_id)
                .bind(resolved_item_id.as_str())
                .fetch_optional(&mut *tx)
                .await?;
                if existing_item.is_none() {
                    break;
                }
                resolved_item_id = format!("{base_item_id}-{suffix}");
                suffix = suffix.saturating_add(1);
            }
            reserved_item_ids.insert(resolved_item_id.clone());
            let row_json = serde_json::to_string(&item.row_json)?;
            sqlx::query(
                r#"
INSERT INTO agent_job_items (
    job_id,
    item_id,
    parent_item_id,
    row_index,
    source_id,
    dedupe_key,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
"#,
            )
            .bind(job_id)
            .bind(resolved_item_id.as_str())
            .bind(resolved_parent_item_id.as_deref())
            .bind(next_row_index)
            .bind(item.source_id.as_deref())
            .bind(item.dedupe_key.as_deref())
            .bind(row_json)
            .bind(AgentJobItemStatus::Pending.as_str())
            .bind(now)
            .bind(now)
            .execute(&mut *tx)
            .await?;
            // Read the row back so the outcome carries the stored item
            // exactly as persisted.
            let inserted_row = sqlx::query_as::<_, AgentJobItemRow>(
                r#"
SELECT
    job_id,
    item_id,
    parent_item_id,
    row_index,
    source_id,
    dedupe_key,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
FROM agent_job_items
WHERE job_id = ? AND item_id = ?
"#,
            )
            .bind(job_id)
            .bind(resolved_item_id.as_str())
            .fetch_one(&mut *tx)
            .await?;
            outcomes.push(EnqueueAgentJobItemOutcome::Enqueued {
                item: AgentJobItem::try_from(inserted_row)?,
            });
            inserted_any = true;
            total_items = total_items.saturating_add(1);
            next_row_index = next_row_index.saturating_add(1);
        }
        // Touch the job's updated_at only when something was inserted.
        if inserted_any {
            sqlx::query(
                r#"
UPDATE agent_jobs
SET updated_at = ?
WHERE id = ?
"#,
            )
            .bind(now)
            .bind(job_id)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
        Ok(EnqueueAgentJobItemsResult { outcomes })
    }
pub async fn mark_agent_job_running(&self, job_id: &str) -> anyhow::Result<()> {
let now = Utc::now().timestamp();
sqlx::query(
@@ -560,3 +864,290 @@ WHERE job_id = ?
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::test_support::unique_temp_dir;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    /// Builds create-params for a dynamic-queue job with the given id and
    /// optional item cap; all other fields get fixed test values.
    fn queue_job(job_id: &str, max_items: Option<u64>) -> AgentJobCreateParams {
        AgentJobCreateParams {
            id: job_id.to_string(),
            name: format!("queue-job-{job_id}"),
            kind: AgentJobKind::DynamicQueue,
            instruction: "process {value}".to_string(),
            auto_export: false,
            max_items,
            max_runtime_seconds: None,
            output_schema_json: None,
            input_headers: vec!["value".to_string()],
            input_csv_path: "seed.csv".to_string(),
            output_csv_path: "seed-out.csv".to_string(),
        }
    }
    /// Builds item create-params whose `row_json` wraps `value` under the
    /// job's single "value" header.
    fn item(
        item_id: &str,
        row_index: i64,
        parent_item_id: Option<&str>,
        dedupe_key: Option<&str>,
        value: &str,
    ) -> AgentJobItemCreateParams {
        AgentJobItemCreateParams {
            item_id: item_id.to_string(),
            parent_item_id: parent_item_id.map(str::to_string),
            row_index,
            source_id: Some(format!("source-{item_id}")),
            dedupe_key: dedupe_key.map(str::to_string),
            row_json: json!({"value": value}),
        }
    }
    // Round-trips the queue-specific job columns (kind, max_items) and item
    // columns (parent_item_id, dedupe_key) through create + list.
    #[tokio::test]
    async fn create_agent_job_persists_dynamic_queue_fields() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
            .await
            .expect("state db should initialize");
        let job = runtime
            .create_agent_job(
                &queue_job("job-queue-persist", Some(5)),
                &[
                    item("root", 0, None, None, "root"),
                    item("child", 1, Some("root"), Some("child-dedupe"), "child"),
                ],
            )
            .await
            .expect("create agent job should succeed");
        let items = runtime
            .list_agent_job_items(job.id.as_str(), None, None)
            .await
            .expect("list agent job items should succeed");
        assert_eq!(job.kind, AgentJobKind::DynamicQueue);
        assert_eq!(job.max_items, Some(5));
        let item_summary: Vec<_> = items
            .iter()
            .map(|item| {
                (
                    item.item_id.clone(),
                    item.parent_item_id.clone(),
                    item.dedupe_key.clone(),
                    item.row_index,
                )
            })
            .collect();
        assert_eq!(
            item_summary,
            vec![
                ("root".to_string(), None, None, 0),
                (
                    "child".to_string(),
                    Some("root".to_string()),
                    Some("child-dedupe".to_string()),
                    1,
                ),
            ]
        );
    }
    // Covers all three enqueue outcomes in one batch: a fresh insert, a
    // dedupe hit on an existing key, an item-id collision resolved with a
    // "-2" suffix, and finally the max_items cap rejecting the last item.
    #[tokio::test]
    async fn enqueue_agent_job_items_dedupes_suffixes_and_caps() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
            .await
            .expect("state db should initialize");
        let job = runtime
            .create_agent_job(
                &queue_job("job-queue-enqueue", Some(4)),
                &[
                    item("root", 0, None, None, "root"),
                    item(
                        "existing",
                        1,
                        Some("root"),
                        Some("existing-dedupe"),
                        "existing",
                    ),
                ],
            )
            .await
            .expect("create queue job should succeed");
        // Enqueue requires the parent item to be Running and owned by the
        // reporting thread.
        assert!(
            runtime
                .mark_agent_job_item_running_with_thread(
                    job.id.as_str(),
                    "root",
                    "thread-owned-parent",
                )
                .await
                .expect("mark parent running should succeed"),
            "parent item should become running"
        );
        let result = runtime
            .enqueue_agent_job_items(
                job.id.as_str(),
                "root",
                "thread-owned-parent",
                &[
                    item("foo", -1, None, Some("new-a"), "foo-a"),
                    item(
                        "ignored",
                        -1,
                        Some("root"),
                        Some("existing-dedupe"),
                        "deduped",
                    ),
                    item("foo", -1, None, Some("new-b"), "foo-b"),
                    item("baz", -1, None, Some("new-c"), "baz"),
                ],
            )
            .await
            .expect("enqueue agent job items should succeed");
        // Flatten the outcome enum to comparable tuples of
        // (kind, item_id, parent, dedupe_key, row_index).
        let outcome_summary: Vec<_> = result
            .outcomes
            .iter()
            .map(|outcome| match outcome {
                EnqueueAgentJobItemOutcome::Enqueued { item } => (
                    "enqueued",
                    item.item_id.clone(),
                    item.parent_item_id.clone(),
                    item.dedupe_key.clone(),
                    Some(item.row_index),
                ),
                EnqueueAgentJobItemOutcome::Deduped { item } => (
                    "deduped",
                    item.item_id.clone(),
                    item.parent_item_id.clone(),
                    item.dedupe_key.clone(),
                    Some(item.row_index),
                ),
                EnqueueAgentJobItemOutcome::MaxItemsReached {
                    requested_item_id,
                    parent_item_id,
                    dedupe_key,
                } => (
                    "max_items_reached",
                    requested_item_id.clone(),
                    parent_item_id.clone(),
                    dedupe_key.clone(),
                    None,
                ),
            })
            .collect();
        assert_eq!(
            outcome_summary,
            vec![
                (
                    "enqueued",
                    "foo".to_string(),
                    Some("root".to_string()),
                    Some("new-a".to_string()),
                    Some(2),
                ),
                (
                    "deduped",
                    "existing".to_string(),
                    Some("root".to_string()),
                    Some("existing-dedupe".to_string()),
                    Some(1),
                ),
                (
                    "enqueued",
                    "foo-2".to_string(),
                    Some("root".to_string()),
                    Some("new-b".to_string()),
                    Some(3),
                ),
                (
                    "max_items_reached",
                    "baz".to_string(),
                    Some("root".to_string()),
                    Some("new-c".to_string()),
                    None,
                ),
            ]
        );
        // Only the two enqueued items were persisted; the deduped request
        // mapped onto "existing" and the capped request inserted nothing.
        let persisted_items = runtime
            .list_agent_job_items(job.id.as_str(), None, None)
            .await
            .expect("list agent job items should succeed");
        let persisted_summary: Vec<_> = persisted_items
            .iter()
            .map(|item| {
                (
                    item.item_id.clone(),
                    item.parent_item_id.clone(),
                    item.dedupe_key.clone(),
                    item.row_index,
                )
            })
            .collect();
        assert_eq!(
            persisted_summary,
            vec![
                ("root".to_string(), None, None, 0),
                (
                    "existing".to_string(),
                    Some("root".to_string()),
                    Some("existing-dedupe".to_string()),
                    1,
                ),
                (
                    "foo".to_string(),
                    Some("root".to_string()),
                    Some("new-a".to_string()),
                    2,
                ),
                (
                    "foo-2".to_string(),
                    Some("root".to_string()),
                    Some("new-b".to_string()),
                    3,
                ),
            ]
        );
    }
    // A thread that does not own the Running parent item must not be able to
    // enqueue children under it.
    #[tokio::test]
    async fn enqueue_agent_job_items_rejects_unowned_parent() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
            .await
            .expect("state db should initialize");
        let job = runtime
            .create_agent_job(
                &queue_job("job-queue-parent-check", Some(8)),
                &[item("root", 0, None, None, "root")],
            )
            .await
            .expect("create queue job should succeed");
        assert!(
            runtime
                .mark_agent_job_item_running_with_thread(job.id.as_str(), "root", "thread-a")
                .await
                .expect("mark parent running should succeed"),
            "parent item should become running"
        );
        // "root" is owned by thread-a, so thread-b's enqueue must fail.
        let err = runtime
            .enqueue_agent_job_items(
                job.id.as_str(),
                "root",
                "thread-b",
                &[item("child", -1, None, Some("child-a"), "child")],
            )
            .await
            .expect_err("enqueue should reject unowned parent");
        assert!(
            err.to_string().contains("not owned by thread thread-b"),
            "unexpected error: {err:?}"
        );
    }
}