mirror of
https://github.com/openai/codex.git
synced 2026-03-13 02:03:59 +00:00
Compare commits
1 Commits
dev/cc/mul
...
codex/dyna
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0eb7c42ae1 |
@@ -50,6 +50,37 @@ struct SpawnAgentsOnCsvArgs {
|
||||
max_runtime_seconds: Option<u64>,
|
||||
}
|
||||
|
||||
const DEFAULT_AGENT_QUEUE_MAX_ITEMS: usize = 1000;
|
||||
const CSV_AGENT_JOB_SOURCE_PREFIX: &str = "agent_job:csv:";
|
||||
const QUEUE_AGENT_JOB_SOURCE_PREFIX: &str = "agent_job:queue:";
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, PartialEq)]
|
||||
struct QueueJobItemArgs {
|
||||
input: Value,
|
||||
item_id: Option<String>,
|
||||
dedupe_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct SpawnAgentsOnQueueArgs {
|
||||
seed_items: Option<Vec<QueueJobItemArgs>>,
|
||||
seed_path: Option<String>,
|
||||
instruction: String,
|
||||
output_jsonl_path: Option<String>,
|
||||
output_schema: Option<Value>,
|
||||
max_concurrency: Option<usize>,
|
||||
max_workers: Option<usize>,
|
||||
max_runtime_seconds: Option<u64>,
|
||||
max_items: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct EnqueueAgentJobItemsArgs {
|
||||
job_id: String,
|
||||
parent_item_id: String,
|
||||
items: Vec<QueueJobItemArgs>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ReportAgentJobResultArgs {
|
||||
job_id: String,
|
||||
@@ -70,6 +101,25 @@ struct SpawnAgentsOnCsvResult {
|
||||
failed_item_errors: Option<Vec<AgentJobFailureSummary>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SpawnAgentsOnQueueResult {
|
||||
job_id: String,
|
||||
status: String,
|
||||
output_jsonl_path: String,
|
||||
total_items: usize,
|
||||
pending_items: usize,
|
||||
completed_items: usize,
|
||||
failed_items: usize,
|
||||
job_error: Option<String>,
|
||||
failed_item_errors: Option<Vec<QueueAgentJobFailureSummary>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct QueueAgentJobFailureSummary {
|
||||
item_id: String,
|
||||
dedupe_key: Option<String>,
|
||||
last_error: String,
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
struct AgentJobFailureSummary {
|
||||
item_id: String,
|
||||
@@ -93,6 +143,22 @@ struct ReportAgentJobResultToolResult {
|
||||
accepted: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct EnqueueAgentJobItemsToolResult {
|
||||
accepted_count: usize,
|
||||
duplicate_count: usize,
|
||||
rejected_count: usize,
|
||||
items: Vec<EnqueueAgentJobItemsToolItemResult>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct EnqueueAgentJobItemsToolItemResult {
|
||||
index: usize,
|
||||
status: String,
|
||||
item_id: Option<String>,
|
||||
dedupe_key: Option<String>,
|
||||
reason: Option<String>,
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
struct JobRunnerOptions {
|
||||
max_concurrency: usize,
|
||||
@@ -203,7 +269,15 @@ impl ToolHandler for BatchJobHandler {
|
||||
};
|
||||
|
||||
match tool_name.as_str() {
|
||||
"spawn_agents_on_csv" => spawn_agents_on_csv::handle(session, turn, arguments).await,
|
||||
"spawn_agents_on_csv" => {
|
||||
spawn_agents_on_csv::handle(session.clone(), turn.clone(), arguments).await
|
||||
}
|
||||
"spawn_agents_on_queue" => {
|
||||
spawn_agents_on_queue::handle(session.clone(), turn, arguments).await
|
||||
}
|
||||
"enqueue_agent_job_items" => {
|
||||
enqueue_agent_job_items::handle(session.clone(), arguments).await
|
||||
}
|
||||
"report_agent_job_result" => report_agent_job_result::handle(session, arguments).await,
|
||||
other => Err(FunctionCallError::RespondToModel(format!(
|
||||
"unsupported agent job tool {other}"
|
||||
@@ -297,8 +371,10 @@ mod spawn_agents_on_csv {
|
||||
.collect::<serde_json::Map<_, _>>();
|
||||
items.push(codex_state::AgentJobItemCreateParams {
|
||||
item_id,
|
||||
parent_item_id: None,
|
||||
row_index: idx as i64,
|
||||
source_id,
|
||||
dedupe_key: None,
|
||||
row_json: Value::Object(row_object),
|
||||
});
|
||||
}
|
||||
@@ -314,89 +390,36 @@ mod spawn_agents_on_csv {
|
||||
args.max_runtime_seconds
|
||||
.or(turn.config.agent_job_max_runtime_seconds),
|
||||
)?;
|
||||
let _job = db
|
||||
.create_agent_job(
|
||||
&codex_state::AgentJobCreateParams {
|
||||
id: job_id.clone(),
|
||||
name: job_name,
|
||||
instruction: args.instruction,
|
||||
auto_export: true,
|
||||
max_runtime_seconds,
|
||||
output_schema_json: args.output_schema,
|
||||
input_headers: headers,
|
||||
input_csv_path: input_path.display().to_string(),
|
||||
output_csv_path: output_csv_path.display().to_string(),
|
||||
},
|
||||
items.as_slice(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
|
||||
})?;
|
||||
|
||||
let requested_concurrency = args.max_concurrency.or(args.max_workers);
|
||||
let options = match build_runner_options(&session, &turn, requested_concurrency).await {
|
||||
Ok(options) => options,
|
||||
Err(err) => {
|
||||
let error_message = err.to_string();
|
||||
let _ = db
|
||||
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
db.mark_agent_job_running(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to transition agent job {job_id} to running: {err}"
|
||||
))
|
||||
})?;
|
||||
let max_threads = turn.config.agent_max_threads;
|
||||
let effective_concurrency = options.max_concurrency;
|
||||
let message = format!(
|
||||
"agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
|
||||
);
|
||||
let _ = session.notify_background_event(&turn, message).await;
|
||||
if let Err(err) = run_agent_job_loop(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
db.clone(),
|
||||
job_id.clone(),
|
||||
options,
|
||||
db.create_agent_job(
|
||||
&codex_state::AgentJobCreateParams {
|
||||
id: job_id.clone(),
|
||||
name: job_name,
|
||||
kind: codex_state::AgentJobKind::CsvBatch,
|
||||
instruction: args.instruction,
|
||||
auto_export: true,
|
||||
max_items: None,
|
||||
max_runtime_seconds,
|
||||
output_schema_json: args.output_schema,
|
||||
input_headers: headers,
|
||||
input_csv_path: input_path.display().to_string(),
|
||||
output_csv_path: output_csv_path.display().to_string(),
|
||||
},
|
||||
items.as_slice(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let error_message = format!("job runner failed: {err}");
|
||||
let _ = db
|
||||
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
|
||||
.await;
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"agent job {job_id} failed: {err}"
|
||||
)));
|
||||
}
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
|
||||
})?;
|
||||
|
||||
let job = db
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to load agent job {job_id}: {err}"
|
||||
))
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
|
||||
})?;
|
||||
let output_path = PathBuf::from(job.output_csv_path.clone());
|
||||
if !tokio::fs::try_exists(&output_path).await.unwrap_or(false) {
|
||||
export_job_csv_snapshot(db.clone(), &job)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to export output csv {job_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
}
|
||||
let requested_concurrency = args.max_concurrency.or(args.max_workers);
|
||||
let job = run_agent_job_and_load(
|
||||
session,
|
||||
turn,
|
||||
db.clone(),
|
||||
job_id.clone(),
|
||||
requested_concurrency,
|
||||
)
|
||||
.await?;
|
||||
let progress = db
|
||||
.get_agent_job_progress(job_id.as_str())
|
||||
.await
|
||||
@@ -461,6 +484,176 @@ mod spawn_agents_on_csv {
|
||||
}
|
||||
}
|
||||
|
||||
mod spawn_agents_on_queue {
|
||||
use super::*;
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
arguments: String,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let args: SpawnAgentsOnQueueArgs = parse_arguments(arguments.as_str())?;
|
||||
if args.instruction.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"instruction must be non-empty".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let db = required_state_db(&session)?;
|
||||
let max_items = normalize_max_items(args.max_items)?;
|
||||
let (seed_items, seed_path) =
|
||||
parse_queue_seed_inputs(&turn, args.seed_items, args.seed_path).await?;
|
||||
let items = build_initial_queue_job_items(seed_items.as_slice(), max_items)?;
|
||||
|
||||
let job_id = Uuid::new_v4().to_string();
|
||||
let output_jsonl_path = args.output_jsonl_path.map_or_else(
|
||||
|| {
|
||||
seed_path.as_ref().map_or_else(
|
||||
|| default_output_jsonl_path_for_inline(&turn, job_id.as_str()),
|
||||
|path| default_output_jsonl_path_for_seed(path.as_path(), job_id.as_str()),
|
||||
)
|
||||
},
|
||||
|path| turn.resolve_path(Some(path)),
|
||||
);
|
||||
let job_suffix = &job_id[..8];
|
||||
let job_name = format!("agent-queue-{job_suffix}");
|
||||
let max_runtime_seconds = normalize_max_runtime_seconds(
|
||||
args.max_runtime_seconds
|
||||
.or(turn.config.agent_job_max_runtime_seconds),
|
||||
)?;
|
||||
db.create_agent_job(
|
||||
&codex_state::AgentJobCreateParams {
|
||||
id: job_id.clone(),
|
||||
name: job_name,
|
||||
kind: codex_state::AgentJobKind::DynamicQueue,
|
||||
instruction: args.instruction,
|
||||
auto_export: true,
|
||||
max_items: Some(max_items as u64),
|
||||
max_runtime_seconds,
|
||||
output_schema_json: args.output_schema,
|
||||
input_headers: Vec::new(),
|
||||
input_csv_path: seed_path
|
||||
.as_ref()
|
||||
.map(|path| path.display().to_string())
|
||||
.unwrap_or_default(),
|
||||
output_csv_path: output_jsonl_path.display().to_string(),
|
||||
},
|
||||
items.as_slice(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
|
||||
})?;
|
||||
|
||||
let requested_concurrency = args.max_concurrency.or(args.max_workers);
|
||||
let job = run_agent_job_and_load(
|
||||
session,
|
||||
turn,
|
||||
db.clone(),
|
||||
job_id.clone(),
|
||||
requested_concurrency,
|
||||
)
|
||||
.await?;
|
||||
let progress = db
|
||||
.get_agent_job_progress(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to load agent job progress {job_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
let mut job_error = job.last_error.clone().filter(|err| !err.trim().is_empty());
|
||||
let failed_item_errors = if progress.failed_items > 0 {
|
||||
let items = db
|
||||
.list_agent_job_items(
|
||||
job_id.as_str(),
|
||||
Some(codex_state::AgentJobItemStatus::Failed),
|
||||
Some(5),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let summaries: Vec<_> = items
|
||||
.into_iter()
|
||||
.filter_map(|item| {
|
||||
let last_error = item.last_error.unwrap_or_default();
|
||||
if last_error.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(QueueAgentJobFailureSummary {
|
||||
item_id: item.item_id,
|
||||
dedupe_key: item.dedupe_key,
|
||||
last_error,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
if summaries.is_empty() {
|
||||
if job_error.is_none() {
|
||||
job_error = Some(
|
||||
"agent job has failed items but no error details were recorded".to_string(),
|
||||
);
|
||||
}
|
||||
None
|
||||
} else {
|
||||
Some(summaries)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let content = serde_json::to_string(&SpawnAgentsOnQueueResult {
|
||||
job_id,
|
||||
status: job.status.as_str().to_string(),
|
||||
output_jsonl_path: job.output_csv_path,
|
||||
total_items: progress.total_items,
|
||||
pending_items: progress.pending_items,
|
||||
completed_items: progress.completed_items,
|
||||
failed_items: progress.failed_items,
|
||||
job_error,
|
||||
failed_item_errors,
|
||||
})
|
||||
.map_err(|err| {
|
||||
FunctionCallError::Fatal(format!(
|
||||
"failed to serialize spawn_agents_on_queue result: {err}"
|
||||
))
|
||||
})?;
|
||||
Ok(FunctionToolOutput::from_text(content, Some(true)))
|
||||
}
|
||||
}
|
||||
|
||||
mod enqueue_agent_job_items {
|
||||
use super::*;
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
arguments: String,
|
||||
) -> Result<FunctionToolOutput, FunctionCallError> {
|
||||
let args: EnqueueAgentJobItemsArgs = parse_arguments(arguments.as_str())?;
|
||||
let db = required_state_db(&session)?;
|
||||
let reporting_thread_id = session.conversation_id.to_string();
|
||||
let items = build_enqueued_queue_job_items(args.items.as_slice())?;
|
||||
let result = db
|
||||
.enqueue_agent_job_items(
|
||||
args.job_id.as_str(),
|
||||
args.parent_item_id.as_str(),
|
||||
reporting_thread_id.as_str(),
|
||||
items.as_slice(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to enqueue queue items for {} / {}: {err}",
|
||||
args.job_id, args.parent_item_id
|
||||
))
|
||||
})?;
|
||||
let content = serde_json::to_string(&build_enqueue_agent_job_items_tool_result(&result))
|
||||
.map_err(|err| {
|
||||
FunctionCallError::Fatal(format!(
|
||||
"failed to serialize enqueue_agent_job_items result: {err}"
|
||||
))
|
||||
})?;
|
||||
Ok(FunctionToolOutput::from_text(content, Some(true)))
|
||||
}
|
||||
}
|
||||
|
||||
mod report_agent_job_result {
|
||||
use super::*;
|
||||
|
||||
@@ -507,6 +700,73 @@ mod report_agent_job_result {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_agent_job_and_load(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
job_id: String,
|
||||
requested_concurrency: Option<usize>,
|
||||
) -> Result<codex_state::AgentJob, FunctionCallError> {
|
||||
let options = match build_runner_options(&session, &turn, requested_concurrency).await {
|
||||
Ok(options) => options,
|
||||
Err(err) => {
|
||||
let error_message = err.to_string();
|
||||
let _ = db
|
||||
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
db.mark_agent_job_running(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to transition agent job {job_id} to running: {err}"
|
||||
))
|
||||
})?;
|
||||
let max_threads = turn.config.agent_max_threads;
|
||||
let effective_concurrency = options.max_concurrency;
|
||||
let message = format!(
|
||||
"agent job concurrency: job_id={job_id} requested={requested_concurrency:?} max_threads={max_threads:?} effective={effective_concurrency}"
|
||||
);
|
||||
let _ = session.notify_background_event(&turn, message).await;
|
||||
if let Err(err) = run_agent_job_loop(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
db.clone(),
|
||||
job_id.clone(),
|
||||
options,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let error_message = format!("job runner failed: {err}");
|
||||
let _ = db
|
||||
.mark_agent_job_failed(job_id.as_str(), error_message.as_str())
|
||||
.await;
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"agent job {job_id} failed: {err}"
|
||||
)));
|
||||
}
|
||||
|
||||
let job = db
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("failed to load agent job {job_id}: {err}"))
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
|
||||
})?;
|
||||
let output_path = PathBuf::from(job.output_csv_path.clone());
|
||||
if !tokio::fs::try_exists(&output_path).await.unwrap_or(false) {
|
||||
export_job_snapshot(db, &job).await.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to export output for agent job {job_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
}
|
||||
Ok(job)
|
||||
}
|
||||
fn required_state_db(
|
||||
session: &Arc<Session>,
|
||||
) -> Result<Arc<codex_state::StateRuntime>, FunctionCallError> {
|
||||
@@ -560,6 +820,221 @@ fn normalize_max_runtime_seconds(requested: Option<u64>) -> Result<Option<u64>,
|
||||
Ok(Some(requested))
|
||||
}
|
||||
|
||||
fn normalize_max_items(requested: Option<usize>) -> Result<usize, FunctionCallError> {
|
||||
let max_items = requested.unwrap_or(DEFAULT_AGENT_QUEUE_MAX_ITEMS);
|
||||
if max_items == 0 {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"max_items must be >= 1".to_string(),
|
||||
));
|
||||
}
|
||||
if max_items > i64::MAX as usize {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"max_items is too large".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(max_items)
|
||||
}
|
||||
|
||||
async fn parse_queue_seed_inputs(
|
||||
turn: &Arc<TurnContext>,
|
||||
seed_items: Option<Vec<QueueJobItemArgs>>,
|
||||
seed_path: Option<String>,
|
||||
) -> Result<(Vec<QueueJobItemArgs>, Option<PathBuf>), FunctionCallError> {
|
||||
match (seed_items, seed_path) {
|
||||
(Some(items), None) => Ok((items, None)),
|
||||
(None, Some(seed_path)) => {
|
||||
if seed_path.trim().is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"seed_path must be non-empty".to_string(),
|
||||
));
|
||||
}
|
||||
let resolved_path = turn.resolve_path(Some(seed_path));
|
||||
let display_path = resolved_path.display().to_string();
|
||||
let content = tokio::fs::read_to_string(&resolved_path)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to read queue seed input {display_path}: {err}"
|
||||
))
|
||||
})?;
|
||||
let items = parse_queue_seed_content(content.as_str())?;
|
||||
Ok((items, Some(resolved_path)))
|
||||
}
|
||||
(Some(_), Some(_)) | (None, None) => Err(FunctionCallError::RespondToModel(
|
||||
"provide exactly one of seed_items or seed_path".to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_queue_seed_content(content: &str) -> Result<Vec<QueueJobItemArgs>, FunctionCallError> {
|
||||
let trimmed = content.trim();
|
||||
if trimmed.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
if trimmed.starts_with('[') {
|
||||
return serde_json::from_str(trimmed).map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to parse queue seed JSON array: {err}"
|
||||
))
|
||||
});
|
||||
}
|
||||
|
||||
let mut items = Vec::new();
|
||||
for (index, line) in content.lines().enumerate() {
|
||||
let trimmed_line = line.trim();
|
||||
if trimmed_line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let item = serde_json::from_str::<QueueJobItemArgs>(trimmed_line).map_err(|err| {
|
||||
let line_number = index + 1;
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to parse queue seed JSONL line {line_number}: {err}"
|
||||
))
|
||||
})?;
|
||||
items.push(item);
|
||||
}
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
fn build_initial_queue_job_items(
|
||||
items: &[QueueJobItemArgs],
|
||||
max_items: usize,
|
||||
) -> Result<Vec<codex_state::AgentJobItemCreateParams>, FunctionCallError> {
|
||||
let mut created_items = Vec::with_capacity(items.len().min(max_items));
|
||||
let mut seen_item_ids = HashSet::new();
|
||||
let mut seen_dedupe_keys = HashSet::new();
|
||||
|
||||
for (index, item) in items.iter().enumerate() {
|
||||
let row_json = queue_item_input_object(item, index, "queue seed item")?;
|
||||
let dedupe_key = normalize_optional_queue_text(item.dedupe_key.as_deref());
|
||||
if let Some(dedupe_key) = dedupe_key.as_ref()
|
||||
&& !seen_dedupe_keys.insert(dedupe_key.clone())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if created_items.len() >= max_items {
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"queue seeds exceed max_items limit of {max_items}"
|
||||
)));
|
||||
}
|
||||
|
||||
let base_item_id = normalize_optional_queue_text(item.item_id.as_deref())
|
||||
.unwrap_or_else(|| format!("item-{}", index + 1));
|
||||
let mut item_id = base_item_id.clone();
|
||||
let mut suffix = 2usize;
|
||||
while !seen_item_ids.insert(item_id.clone()) {
|
||||
item_id = format!("{base_item_id}-{suffix}");
|
||||
suffix = suffix.saturating_add(1);
|
||||
}
|
||||
|
||||
created_items.push(codex_state::AgentJobItemCreateParams {
|
||||
item_id,
|
||||
parent_item_id: None,
|
||||
row_index: created_items.len() as i64,
|
||||
source_id: None,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(created_items)
|
||||
}
|
||||
|
||||
fn build_enqueued_queue_job_items(
|
||||
items: &[QueueJobItemArgs],
|
||||
) -> Result<Vec<codex_state::AgentJobItemCreateParams>, FunctionCallError> {
|
||||
let mut created_items = Vec::with_capacity(items.len());
|
||||
for (index, item) in items.iter().enumerate() {
|
||||
let row_json = queue_item_input_object(item, index, "enqueue item")?;
|
||||
let item_id = normalize_optional_queue_text(item.item_id.as_deref())
|
||||
.unwrap_or_else(|| format!("item-{}", index + 1));
|
||||
created_items.push(codex_state::AgentJobItemCreateParams {
|
||||
item_id,
|
||||
parent_item_id: None,
|
||||
row_index: index as i64,
|
||||
source_id: None,
|
||||
dedupe_key: normalize_optional_queue_text(item.dedupe_key.as_deref()),
|
||||
row_json,
|
||||
});
|
||||
}
|
||||
Ok(created_items)
|
||||
}
|
||||
|
||||
fn queue_item_input_object(
|
||||
item: &QueueJobItemArgs,
|
||||
index: usize,
|
||||
item_label: &str,
|
||||
) -> Result<Value, FunctionCallError> {
|
||||
let Some(object) = item.input.as_object() else {
|
||||
let item_number = index + 1;
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"{item_label} {item_number} input must be a JSON object"
|
||||
)));
|
||||
};
|
||||
Ok(Value::Object(object.clone()))
|
||||
}
|
||||
|
||||
fn normalize_optional_queue_text(value: Option<&str>) -> Option<String> {
|
||||
value
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
fn build_enqueue_agent_job_items_tool_result(
|
||||
result: &codex_state::EnqueueAgentJobItemsResult,
|
||||
) -> EnqueueAgentJobItemsToolResult {
|
||||
let mut accepted_count = 0usize;
|
||||
let mut duplicate_count = 0usize;
|
||||
let mut rejected_count = 0usize;
|
||||
let mut items = Vec::with_capacity(result.outcomes.len());
|
||||
|
||||
for (index, outcome) in result.outcomes.iter().enumerate() {
|
||||
match outcome {
|
||||
codex_state::EnqueueAgentJobItemOutcome::Enqueued { item } => {
|
||||
accepted_count = accepted_count.saturating_add(1);
|
||||
items.push(EnqueueAgentJobItemsToolItemResult {
|
||||
index,
|
||||
status: "accepted".to_string(),
|
||||
item_id: Some(item.item_id.clone()),
|
||||
dedupe_key: item.dedupe_key.clone(),
|
||||
reason: None,
|
||||
});
|
||||
}
|
||||
codex_state::EnqueueAgentJobItemOutcome::Deduped { item } => {
|
||||
duplicate_count = duplicate_count.saturating_add(1);
|
||||
items.push(EnqueueAgentJobItemsToolItemResult {
|
||||
index,
|
||||
status: "duplicate".to_string(),
|
||||
item_id: Some(item.item_id.clone()),
|
||||
dedupe_key: item.dedupe_key.clone(),
|
||||
reason: None,
|
||||
});
|
||||
}
|
||||
codex_state::EnqueueAgentJobItemOutcome::MaxItemsReached {
|
||||
requested_item_id,
|
||||
dedupe_key,
|
||||
..
|
||||
} => {
|
||||
rejected_count = rejected_count.saturating_add(1);
|
||||
items.push(EnqueueAgentJobItemsToolItemResult {
|
||||
index,
|
||||
status: "rejected".to_string(),
|
||||
item_id: Some(requested_item_id.clone()),
|
||||
dedupe_key: dedupe_key.clone(),
|
||||
reason: Some("max_items_reached".to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EnqueueAgentJobItemsToolResult {
|
||||
accepted_count,
|
||||
duplicate_count,
|
||||
rejected_count,
|
||||
items,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_agent_job_loop(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
@@ -588,6 +1063,12 @@ async fn run_agent_job_loop(
|
||||
.await?;
|
||||
|
||||
let mut cancel_requested = db.is_agent_job_cancelled(job_id.as_str()).await?;
|
||||
let worker_source_label = match job.kind {
|
||||
codex_state::AgentJobKind::CsvBatch => format!("{CSV_AGENT_JOB_SOURCE_PREFIX}{job_id}"),
|
||||
codex_state::AgentJobKind::DynamicQueue => {
|
||||
format!("{QUEUE_AGENT_JOB_SOURCE_PREFIX}{job_id}")
|
||||
}
|
||||
};
|
||||
loop {
|
||||
let mut progressed = false;
|
||||
|
||||
@@ -622,9 +1103,9 @@ async fn run_agent_job_loop(
|
||||
.spawn_agent(
|
||||
options.spawn_config.clone(),
|
||||
items,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::Other(format!(
|
||||
"agent_job:{job_id}"
|
||||
)))),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::Other(
|
||||
worker_source_label.clone(),
|
||||
))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -725,7 +1206,7 @@ async fn run_agent_job_loop(
|
||||
}
|
||||
|
||||
let progress = db.get_agent_job_progress(job_id.as_str()).await?;
|
||||
if let Err(err) = export_job_csv_snapshot(db.clone(), &job).await {
|
||||
if let Err(err) = export_job_snapshot(db.clone(), &job).await {
|
||||
let message = format!("auto-export failed: {err}");
|
||||
db.mark_agent_job_failed(job_id.as_str(), message.as_str())
|
||||
.await?;
|
||||
@@ -755,6 +1236,16 @@ async fn run_agent_job_loop(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn export_job_snapshot(
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
job: &codex_state::AgentJob,
|
||||
) -> anyhow::Result<()> {
|
||||
match job.kind {
|
||||
codex_state::AgentJobKind::CsvBatch => export_job_csv_snapshot(db, job).await,
|
||||
codex_state::AgentJobKind::DynamicQueue => export_job_queue_snapshot(db, job).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn export_job_csv_snapshot(
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
job: &codex_state::AgentJob,
|
||||
@@ -770,6 +1261,21 @@ async fn export_job_csv_snapshot(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn export_job_queue_snapshot(
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
job: &codex_state::AgentJob,
|
||||
) -> anyhow::Result<()> {
|
||||
let items = db.list_agent_job_items(job.id.as_str(), None, None).await?;
|
||||
let jsonl_content = render_job_queue_jsonl(items.as_slice())
|
||||
.map_err(|err| anyhow::anyhow!("failed to render job queue snapshot: {err}"))?;
|
||||
let output_path = PathBuf::from(job.output_csv_path.clone());
|
||||
if let Some(parent) = output_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
tokio::fs::write(&output_path, jsonl_content).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recover_running_items(
|
||||
session: Arc<Session>,
|
||||
db: Arc<codex_state::StateRuntime>,
|
||||
@@ -944,23 +1450,93 @@ fn build_worker_prompt(
|
||||
.transpose()?
|
||||
.unwrap_or_else(|| "{}".to_string());
|
||||
let row_json = serde_json::to_string_pretty(&item.row_json)?;
|
||||
Ok(format!(
|
||||
"You are processing one item for a generic agent job.\n\
|
||||
Job ID: {job_id}\n\
|
||||
Item ID: {item_id}\n\n\
|
||||
Task instruction:\n\
|
||||
{instruction}\n\n\
|
||||
Input row (JSON):\n\
|
||||
{row_json}\n\n\
|
||||
Expected result schema (JSON Schema or {{}}):\n\
|
||||
{output_schema}\n\n\
|
||||
You MUST call the `report_agent_job_result` tool exactly once with:\n\
|
||||
1. `job_id` = \"{job_id}\"\n\
|
||||
2. `item_id` = \"{item_id}\"\n\
|
||||
3. `result` = a JSON object that contains your analysis result for this row.\n\n\
|
||||
If you need to stop the job early, include `stop` = true in the tool call.\n\n\
|
||||
match job.kind {
|
||||
codex_state::AgentJobKind::CsvBatch => Ok(format!(
|
||||
"You are processing one item for a generic agent job.
|
||||
\
|
||||
Job ID: {job_id}
|
||||
\
|
||||
Item ID: {item_id}
|
||||
|
||||
\
|
||||
Task instruction:
|
||||
\
|
||||
{instruction}
|
||||
|
||||
\
|
||||
Input row (JSON):
|
||||
\
|
||||
{row_json}
|
||||
|
||||
\
|
||||
Expected result schema (JSON Schema or {{}}):
|
||||
\
|
||||
{output_schema}
|
||||
|
||||
\
|
||||
You MUST call the `report_agent_job_result` tool exactly once with:
|
||||
\
|
||||
1. `job_id` = \"{job_id}\"
|
||||
\
|
||||
2. `item_id` = \"{item_id}\"
|
||||
\
|
||||
3. `result` = a JSON object that contains your analysis result for this row.
|
||||
|
||||
\
|
||||
If you need to stop the job early, include `stop` = true in the tool call.
|
||||
|
||||
\
|
||||
After the tool call succeeds, stop.",
|
||||
))
|
||||
)),
|
||||
codex_state::AgentJobKind::DynamicQueue => Ok(format!(
|
||||
"You are processing one item in a queue-draining agent job.
|
||||
\
|
||||
Job ID: {job_id}
|
||||
\
|
||||
Item ID: {item_id}
|
||||
|
||||
\
|
||||
Task instruction:
|
||||
\
|
||||
{instruction}
|
||||
|
||||
\
|
||||
Input item (JSON):
|
||||
\
|
||||
{row_json}
|
||||
|
||||
\
|
||||
Expected result schema (JSON Schema or {{}}):
|
||||
\
|
||||
{output_schema}
|
||||
|
||||
\
|
||||
You may call the `enqueue_agent_job_items` tool zero or more times to add discovered work.
|
||||
\
|
||||
When calling `enqueue_agent_job_items`, use:
|
||||
\
|
||||
1. `job_id` = \"{job_id}\"
|
||||
\
|
||||
2. `parent_item_id` = \"{item_id}\"
|
||||
\
|
||||
3. `items` = an array of queue items, each with an `input` object and optional `item_id` and `dedupe_key`.
|
||||
|
||||
\
|
||||
You MUST call the `report_agent_job_result` tool exactly once with:
|
||||
\
|
||||
1. `job_id` = \"{job_id}\"
|
||||
\
|
||||
2. `item_id` = \"{item_id}\"
|
||||
\
|
||||
3. `result` = a JSON object that contains your analysis result for this item.
|
||||
|
||||
\
|
||||
If you need to stop the entire job early, include `stop` = true in the `report_agent_job_result` tool call.
|
||||
|
||||
\
|
||||
After the final `report_agent_job_result` tool call succeeds, stop.",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn render_instruction_template(instruction: &str, row_json: &Value) -> String {
|
||||
@@ -1034,6 +1610,52 @@ fn default_output_csv_path(input_csv_path: &Path, job_id: &str) -> PathBuf {
|
||||
input_csv_path.with_file_name(format!("{stem}.agent-job-{job_suffix}.csv"))
|
||||
}
|
||||
|
||||
fn default_output_jsonl_path_for_inline(turn: &TurnContext, job_id: &str) -> PathBuf {
|
||||
turn.resolve_path(Some(format!("agent-queue-{}.jsonl", &job_id[..8])))
|
||||
}
|
||||
|
||||
fn default_output_jsonl_path_for_seed(seed_path: &Path, job_id: &str) -> PathBuf {
|
||||
let stem = seed_path
|
||||
.file_stem()
|
||||
.and_then(|value| value.to_str())
|
||||
.filter(|value| !value.is_empty())
|
||||
.unwrap_or("agent-queue");
|
||||
let filename = format!("{stem}.agent-queue-{}.jsonl", &job_id[..8]);
|
||||
seed_path.with_file_name(filename)
|
||||
}
|
||||
|
||||
fn render_job_queue_jsonl(items: &[codex_state::AgentJobItem]) -> serde_json::Result<String> {
|
||||
let mut lines = Vec::with_capacity(items.len());
|
||||
for item in items {
|
||||
lines.push(serde_json::to_string(&serde_json::json!({
|
||||
"job_id": &item.job_id,
|
||||
"item_id": &item.item_id,
|
||||
"parent_item_id": &item.parent_item_id,
|
||||
"dedupe_key": &item.dedupe_key,
|
||||
"row_index": item.row_index,
|
||||
"status": item.status.as_str(),
|
||||
"attempt_count": item.attempt_count,
|
||||
"input": &item.row_json,
|
||||
"result": &item.result_json,
|
||||
"last_error": &item.last_error,
|
||||
"reported_at": item.reported_at.map(|value| value.to_rfc3339()),
|
||||
"completed_at": item.completed_at.map(|value| value.to_rfc3339()),
|
||||
}))?);
|
||||
}
|
||||
if lines.is_empty() {
|
||||
Ok(String::new())
|
||||
} else {
|
||||
Ok(format!(
|
||||
"{}
|
||||
",
|
||||
lines.join(
|
||||
"
|
||||
"
|
||||
)
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_csv(content: &str) -> Result<(Vec<String>, Vec<Vec<String>>), String> {
|
||||
let mut reader = csv::ReaderBuilder::new()
|
||||
.has_headers(true)
|
||||
|
||||
@@ -1,7 +1,65 @@
|
||||
use super::*;
|
||||
use chrono::TimeZone;
|
||||
use chrono::Utc;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
fn sample_queue_job() -> codex_state::AgentJob {
|
||||
let now = Utc
|
||||
.timestamp_opt(1_700_000_000, 0)
|
||||
.single()
|
||||
.expect("timestamp");
|
||||
codex_state::AgentJob {
|
||||
id: "job-1".to_string(),
|
||||
name: "queue-job".to_string(),
|
||||
kind: codex_state::AgentJobKind::DynamicQueue,
|
||||
status: codex_state::AgentJobStatus::Running,
|
||||
instruction: "Visit {url}".to_string(),
|
||||
auto_export: true,
|
||||
max_items: Some(1000),
|
||||
max_runtime_seconds: Some(60),
|
||||
output_schema_json: Some(json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"item_id": { "type": "string" }
|
||||
}
|
||||
})),
|
||||
input_headers: Vec::new(),
|
||||
input_csv_path: String::new(),
|
||||
output_csv_path: "/tmp/agent-queue.jsonl".to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
started_at: Some(now),
|
||||
completed_at: None,
|
||||
last_error: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_queue_item() -> codex_state::AgentJobItem {
|
||||
let now = Utc
|
||||
.timestamp_opt(1_700_000_000, 0)
|
||||
.single()
|
||||
.expect("timestamp");
|
||||
codex_state::AgentJobItem {
|
||||
job_id: "job-1".to_string(),
|
||||
item_id: "root".to_string(),
|
||||
parent_item_id: None,
|
||||
row_index: 0,
|
||||
source_id: None,
|
||||
dedupe_key: Some("https://root.test".to_string()),
|
||||
row_json: json!({ "url": "https://root.test" }),
|
||||
status: codex_state::AgentJobItemStatus::Completed,
|
||||
assigned_thread_id: None,
|
||||
attempt_count: 1,
|
||||
result_json: Some(json!({ "item_id": "root" })),
|
||||
last_error: None,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
completed_at: Some(now),
|
||||
reported_at: Some(now),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_csv_supports_quotes_and_commas() {
|
||||
let input = "id,name\n1,\"alpha, beta\"\n2,gamma\n";
|
||||
@@ -60,3 +118,131 @@ fn ensure_unique_headers_rejects_duplicates() {
|
||||
FunctionCallError::RespondToModel("csv header path is duplicated".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn parse_queue_seed_content_supports_json_array() {
    // A JSON array of queue items is one accepted seed format.
    let content = r#"[
        {"item_id":"seed-1","input":{"url":"https://seed-1.test"}},
        {"item_id":"seed-2","input":{"url":"https://seed-2.test"}}
    ]"#;
    let expected: Vec<QueueJobItemArgs> = (1..=2)
        .map(|index| QueueJobItemArgs {
            input: json!({ "url": format!("https://seed-{index}.test") }),
            item_id: Some(format!("seed-{index}")),
            dedupe_key: None,
        })
        .collect();
    let parsed = parse_queue_seed_content(content).expect("queue json array");
    assert_eq!(parsed, expected);
}
|
||||
|
||||
#[test]
fn parse_queue_seed_content_supports_jsonl() {
    // JSONL input with a blank line in the middle: blank lines are skipped.
    let content = concat!(
        "{\"item_id\":\"seed-1\",\"input\":{\"url\":\"https://seed-1.test\"}}\n",
        "\n",
        "{\"item_id\":\"seed-2\",\"input\":{\"url\":\"https://seed-2.test\"}}\n",
    );
    let expected: Vec<QueueJobItemArgs> = (1..=2)
        .map(|index| QueueJobItemArgs {
            input: json!({ "url": format!("https://seed-{index}.test") }),
            item_id: Some(format!("seed-{index}")),
            dedupe_key: None,
        })
        .collect();
    let parsed = parse_queue_seed_content(content).expect("queue jsonl");
    assert_eq!(parsed, expected);
}
|
||||
|
||||
#[test]
fn build_initial_queue_job_items_dedupes_and_suffixes_item_ids() {
    // All three requests reuse the item id "dup"; the second also repeats the
    // first's dedupe key and must be dropped entirely, while the third keeps
    // its id by gaining a "-2" suffix.
    let seed = |url: &str, dedupe: Option<&str>| QueueJobItemArgs {
        input: json!({ "url": url }),
        item_id: Some("dup".to_string()),
        dedupe_key: dedupe.map(str::to_string),
    };
    let items = vec![
        seed("https://a.test", Some("https://a.test")),
        seed("https://duplicate.test", Some("https://a.test")),
        seed("https://b.test", None),
    ];

    let built = build_initial_queue_job_items(items.as_slice(), 10).expect("build queue items");

    assert_eq!(
        built,
        vec![
            codex_state::AgentJobItemCreateParams {
                item_id: "dup".to_string(),
                parent_item_id: None,
                row_index: 0,
                source_id: None,
                dedupe_key: Some("https://a.test".to_string()),
                row_json: json!({ "url": "https://a.test" }),
            },
            codex_state::AgentJobItemCreateParams {
                item_id: "dup-2".to_string(),
                parent_item_id: None,
                row_index: 1,
                source_id: None,
                dedupe_key: None,
                row_json: json!({ "url": "https://b.test" }),
            },
        ]
    );
}
|
||||
|
||||
#[test]
fn queue_worker_prompt_mentions_enqueue_tool() {
    let prompt = build_worker_prompt(&sample_queue_job(), &sample_queue_item()).expect("prompt");
    // The queue-worker prompt must advertise both worker tools and explain
    // how to attribute enqueued children to the running item.
    let expected_snippets = [
        "queue-draining agent job",
        "enqueue_agent_job_items",
        "parent_item_id",
        "report_agent_job_result",
    ];
    for snippet in expected_snippets {
        assert!(prompt.contains(snippet), "prompt missing {snippet:?}");
    }
}
|
||||
|
||||
#[test]
fn render_job_queue_jsonl_outputs_expected_fields() {
    let item = sample_queue_item();
    // The fixture's reported_at and completed_at share the same instant.
    let timestamp = item.reported_at.expect("reported_at").to_rfc3339();
    let rendered = render_job_queue_jsonl(std::slice::from_ref(&item)).expect("render jsonl");
    let line: Value = serde_json::from_str(rendered.trim()).expect("parse jsonl line");
    let expected = json!({
        "job_id": "job-1",
        "item_id": "root",
        "parent_item_id": null,
        "dedupe_key": "https://root.test",
        "row_index": 0,
        "status": "completed",
        "attempt_count": 1,
        "input": { "url": "https://root.test" },
        "result": { "item_id": "root" },
        "last_error": null,
        "reported_at": timestamp,
        "completed_at": timestamp,
    });
    assert_eq!(line, expected);
}
|
||||
|
||||
@@ -128,6 +128,7 @@ pub(crate) struct ToolsConfig {
|
||||
pub experimental_supported_tools: Vec<String>,
|
||||
pub agent_jobs_tools: bool,
|
||||
pub agent_jobs_worker_tools: bool,
|
||||
pub agent_jobs_queue_worker_tools: bool,
|
||||
}
|
||||
|
||||
pub(crate) struct ToolsConfigParams<'a> {
|
||||
@@ -206,12 +207,23 @@ impl ToolsConfig {
|
||||
}
|
||||
};
|
||||
|
||||
let agent_jobs_worker_tools = include_agent_jobs
|
||||
&& matches!(
|
||||
session_source,
|
||||
let (agent_jobs_worker_tools, agent_jobs_queue_worker_tools) = if !include_agent_jobs {
|
||||
(false, false)
|
||||
} else {
|
||||
match session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::Other(label))
|
||||
if label.starts_with("agent_job:")
|
||||
);
|
||||
if label.starts_with("agent_job:queue:") =>
|
||||
{
|
||||
(true, true)
|
||||
}
|
||||
SessionSource::SubAgent(SubAgentSource::Other(label))
|
||||
if label.starts_with("agent_job:csv:") || label.starts_with("agent_job:") =>
|
||||
{
|
||||
(true, false)
|
||||
}
|
||||
_ => (false, false),
|
||||
}
|
||||
};
|
||||
|
||||
Self {
|
||||
available_models: available_models_ref.to_vec(),
|
||||
@@ -240,6 +252,7 @@ impl ToolsConfig {
|
||||
experimental_supported_tools: model_info.experimental_supported_tools.clone(),
|
||||
agent_jobs_tools: include_agent_jobs,
|
||||
agent_jobs_worker_tools,
|
||||
agent_jobs_queue_worker_tools,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1057,6 +1070,177 @@ fn create_spawn_agents_on_csv_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
fn create_agent_queue_item_schema() -> JsonSchema {
|
||||
JsonSchema::Object {
|
||||
properties: BTreeMap::from([
|
||||
(
|
||||
"input".to_string(),
|
||||
JsonSchema::Object {
|
||||
properties: BTreeMap::new(),
|
||||
required: None,
|
||||
additional_properties: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
"item_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Optional stable identifier for this queue item.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"dedupe_key".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Optional deduplication key used to skip already-seen items.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
]),
|
||||
required: Some(vec!["input".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_spawn_agents_on_queue_tool() -> ToolSpec {
|
||||
let queue_item_schema = create_agent_queue_item_schema();
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"seed_items".to_string(),
|
||||
JsonSchema::Array {
|
||||
items: Box::new(queue_item_schema),
|
||||
description: Some(
|
||||
"Initial queue items as JSON objects. Provide exactly one of `seed_items` or `seed_path`."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"seed_path".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Path to a JSON array or JSONL file containing initial queue items. Provide exactly one of `seed_items` or `seed_path`."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"instruction".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Instruction template to apply to each queue item. Use {field_name} placeholders to inject values from the item's `input` object."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_jsonl_path".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Optional output JSONL path for exported results.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_concurrency".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum concurrent workers for this job. Defaults to 16 and is capped by config."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_workers".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Alias for max_concurrency. Set to 1 to run sequentially.".to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_runtime_seconds".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum runtime per worker before it is failed. Defaults to 1800 seconds."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_items".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Hard cap on accepted queue items for this job, including seeds and later enqueues. Defaults to 1000."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_schema".to_string(),
|
||||
JsonSchema::Object {
|
||||
properties: BTreeMap::new(),
|
||||
required: None,
|
||||
additional_properties: None,
|
||||
},
|
||||
);
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "spawn_agents_on_queue".to_string(),
|
||||
description: "Process a dynamic queue by spawning worker sub-agents for pending items until the queue is drained. Workers may call `enqueue_agent_job_items` zero or more times to append discovered work, and must call `report_agent_job_result` exactly once. This call blocks until the queue finishes and automatically exports a JSONL snapshot to `output_jsonl_path` (or a default path)."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
defer_loading: None,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["instruction".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
output_schema: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn create_enqueue_agent_job_items_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"job_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Identifier of the job.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"parent_item_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Identifier of the running parent queue item.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"items".to_string(),
|
||||
JsonSchema::Array {
|
||||
items: Box::new(create_agent_queue_item_schema()),
|
||||
description: Some(
|
||||
"Queue items to append. Each item requires an `input` object and may include `item_id` and `dedupe_key`."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "enqueue_agent_job_items".to_string(),
|
||||
description:
|
||||
"Worker-only tool to append new pending items to a dynamic agent-job queue. Main agents should not call this."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
defer_loading: None,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec![
|
||||
"job_id".to_string(),
|
||||
"parent_item_id".to_string(),
|
||||
"items".to_string(),
|
||||
]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
output_schema: None,
|
||||
})
|
||||
}
|
||||
fn create_report_agent_job_result_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
@@ -2724,7 +2908,14 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
push_tool_spec(
|
||||
&mut builder,
|
||||
create_spawn_agents_on_queue_tool(),
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler("spawn_agents_on_csv", agent_jobs_handler.clone());
|
||||
builder.register_handler("spawn_agents_on_queue", agent_jobs_handler.clone());
|
||||
if config.agent_jobs_worker_tools {
|
||||
push_tool_spec(
|
||||
&mut builder,
|
||||
@@ -2732,7 +2923,16 @@ pub(crate) fn build_specs_with_discoverable_tools(
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler("report_agent_job_result", agent_jobs_handler);
|
||||
builder.register_handler("report_agent_job_result", agent_jobs_handler.clone());
|
||||
}
|
||||
if config.agent_jobs_queue_worker_tools {
|
||||
push_tool_spec(
|
||||
&mut builder,
|
||||
create_enqueue_agent_job_items_tool(),
|
||||
false,
|
||||
config.code_mode_enabled,
|
||||
);
|
||||
builder.register_handler("enqueue_agent_job_items", agent_jobs_handler);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -443,6 +443,7 @@ fn test_build_specs_collab_tools_enabled() {
|
||||
&["spawn_agent", "send_input", "wait", "close_agent"],
|
||||
);
|
||||
assert_lacks_tool_name(&tools, "spawn_agents_on_csv");
|
||||
assert_lacks_tool_name(&tools, "spawn_agents_on_queue");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -469,8 +470,10 @@ fn test_build_specs_enable_fanout_enables_agent_jobs_and_collab_tools() {
|
||||
"wait",
|
||||
"close_agent",
|
||||
"spawn_agents_on_csv",
|
||||
"spawn_agents_on_queue",
|
||||
],
|
||||
);
|
||||
assert_lacks_tool_name(&tools, "enqueue_agent_job_items");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -555,7 +558,7 @@ fn test_build_specs_artifact_tool_enabled() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_specs_agent_job_worker_tools_enabled() {
|
||||
fn test_build_specs_csv_agent_job_worker_tools_enabled() {
|
||||
let config = test_config();
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
|
||||
let mut features = Features::with_defaults();
|
||||
@@ -569,7 +572,7 @@ fn test_build_specs_agent_job_worker_tools_enabled() {
|
||||
features: &features,
|
||||
web_search_mode: Some(WebSearchMode::Cached),
|
||||
session_source: SessionSource::SubAgent(SubAgentSource::Other(
|
||||
"agent_job:test".to_string(),
|
||||
"agent_job:csv:test".to_string(),
|
||||
)),
|
||||
});
|
||||
let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
|
||||
@@ -582,9 +585,47 @@ fn test_build_specs_agent_job_worker_tools_enabled() {
|
||||
"wait",
|
||||
"close_agent",
|
||||
"spawn_agents_on_csv",
|
||||
"spawn_agents_on_queue",
|
||||
"report_agent_job_result",
|
||||
],
|
||||
);
|
||||
assert_lacks_tool_name(&tools, "enqueue_agent_job_items");
|
||||
assert_lacks_tool_name(&tools, "request_user_input");
|
||||
}
|
||||
|
||||
#[test]
fn test_build_specs_queue_agent_job_worker_tools_enabled() {
    // A sub-agent whose session-source label starts with "agent_job:queue:"
    // must receive both worker tools (report + enqueue) on top of the normal
    // agent-job tool set.
    let config = test_config();
    let model_info = ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config);
    let mut features = Features::with_defaults();
    features.enable(Feature::SpawnCsv);
    features.normalize_dependencies();
    features.enable(Feature::Sqlite);
    let available_models = Vec::new();
    let tools_config = ToolsConfig::new(&ToolsConfigParams {
        model_info: &model_info,
        available_models: &available_models,
        features: &features,
        web_search_mode: Some(WebSearchMode::Cached),
        // Queue-worker label: the "agent_job:queue:" prefix is what unlocks
        // `enqueue_agent_job_items` (csv workers only get the report tool).
        session_source: SessionSource::SubAgent(SubAgentSource::Other(
            "agent_job:queue:test".to_string(),
        )),
    });
    let (tools, _) = build_specs(&tools_config, None, None, &[]).build();
    assert_contains_tool_names(
        &tools,
        &[
            "spawn_agent",
            "send_input",
            "resume_agent",
            "wait",
            "close_agent",
            "spawn_agents_on_csv",
            "spawn_agents_on_queue",
            "report_agent_job_result",
            "enqueue_agent_job_items",
        ],
    );
    // Workers never get the interactive user-input tool.
    assert_lacks_tool_name(&tools, "request_user_input");
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use core_test_support::responses::sse;
|
||||
use core_test_support::responses::sse_response;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use regex_lite::Regex;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
@@ -22,14 +23,16 @@ use wiremock::matchers::method;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
struct AgentJobsResponder {
|
||||
spawn_tool_name: String,
|
||||
spawn_args_json: String,
|
||||
seen_main: AtomicBool,
|
||||
call_counter: AtomicUsize,
|
||||
}
|
||||
|
||||
impl AgentJobsResponder {
|
||||
fn new(spawn_args_json: String) -> Self {
|
||||
fn new(spawn_tool_name: &str, spawn_args_json: String) -> Self {
|
||||
Self {
|
||||
spawn_tool_name: spawn_tool_name.to_string(),
|
||||
spawn_args_json,
|
||||
seen_main: AtomicBool::new(false),
|
||||
call_counter: AtomicUsize::new(0),
|
||||
@@ -38,14 +41,16 @@ impl AgentJobsResponder {
|
||||
}
|
||||
|
||||
struct StopAfterFirstResponder {
|
||||
spawn_tool_name: String,
|
||||
spawn_args_json: String,
|
||||
seen_main: AtomicBool,
|
||||
worker_calls: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl StopAfterFirstResponder {
|
||||
fn new(spawn_args_json: String, worker_calls: Arc<AtomicUsize>) -> Self {
|
||||
fn new(spawn_tool_name: &str, spawn_args_json: String, worker_calls: Arc<AtomicUsize>) -> Self {
|
||||
Self {
|
||||
spawn_tool_name: spawn_tool_name.to_string(),
|
||||
spawn_args_json,
|
||||
seen_main: AtomicBool::new(false),
|
||||
worker_calls,
|
||||
@@ -53,50 +58,76 @@ impl StopAfterFirstResponder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Scripted behaviors for `QueueAgentJobsResponder` worker turns.
/// `Debug` is derived so assertion failures can print the active mode.
#[derive(Debug, Clone, Copy)]
enum QueueResponderMode {
    /// Every worker only reports its result.
    ReportOnly,
    /// The root worker enqueues two children in a single call, then reports.
    EnqueueThenReport,
    /// The root worker enqueues children across two separate calls, then reports.
    EnqueueTwiceThenReport,
    /// The root worker enqueues three children, one repeating a dedupe key.
    EnqueueWithDuplicatesThenReport,
    /// The root worker enqueues children but never reports its own result.
    EnqueueWithoutReport,
    /// The first worker to run reports with `stop: true`.
    StopAfterFirst,
}
|
||||
|
||||
/// Wiremock responder that scripts an entire `spawn_agents_on_queue` run:
/// the first main-agent turn is answered with the spawn call, and each
/// worker turn is answered according to `mode`.
struct QueueAgentJobsResponder {
    // JSON-encoded arguments for the initial `spawn_agents_on_queue` call.
    spawn_args_json: String,
    // Set once the main-agent turn has been answered with the spawn call.
    seen_main: AtomicBool,
    // Monotonic counter used to mint unique tool-call ids.
    call_counter: AtomicUsize,
    // Counts worker turns; `StopAfterFirst` uses it to flag the first worker.
    worker_counter: AtomicUsize,
    // Scripted worker behavior for this responder instance.
    mode: QueueResponderMode,
}
|
||||
|
||||
impl QueueAgentJobsResponder {
|
||||
fn new(spawn_args_json: String, mode: QueueResponderMode) -> Self {
|
||||
Self {
|
||||
spawn_args_json,
|
||||
seen_main: AtomicBool::new(false),
|
||||
call_counter: AtomicUsize::new(0),
|
||||
worker_counter: AtomicUsize::new(0),
|
||||
mode,
|
||||
}
|
||||
}
|
||||
|
||||
fn next_call_id(&self, prefix: &str) -> String {
|
||||
let index = self.call_counter.fetch_add(1, Ordering::SeqCst);
|
||||
format!("{prefix}-{index}")
|
||||
}
|
||||
}
|
||||
|
||||
impl Respond for StopAfterFirstResponder {
|
||||
fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
|
||||
let body_bytes = decode_body_bytes(request);
|
||||
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
|
||||
|
||||
if has_function_call_output(&body) {
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-tool"),
|
||||
ev_completed("resp-tool"),
|
||||
]));
|
||||
return completed_response("resp-tool");
|
||||
}
|
||||
|
||||
if let Some((job_id, item_id)) = extract_job_and_item(&body) {
|
||||
let call_index = self.worker_calls.fetch_add(1, Ordering::SeqCst);
|
||||
let call_id = format!("call-worker-{call_index}");
|
||||
let stop = call_index == 0;
|
||||
let args = json!({
|
||||
"job_id": job_id,
|
||||
"item_id": item_id,
|
||||
"result": { "item_id": item_id },
|
||||
"stop": stop,
|
||||
});
|
||||
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
|
||||
panic!("worker args serialize: {err}");
|
||||
});
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-worker"),
|
||||
ev_function_call(&call_id, "report_agent_job_result", &args_json),
|
||||
ev_completed("resp-worker"),
|
||||
]));
|
||||
return response_with_events(
|
||||
"resp-worker",
|
||||
vec![report_result_event(
|
||||
&call_id,
|
||||
&job_id,
|
||||
&item_id,
|
||||
Some(call_index == 0),
|
||||
)],
|
||||
);
|
||||
}
|
||||
|
||||
if !self.seen_main.swap(true, Ordering::SeqCst) {
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-main"),
|
||||
ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
|
||||
ev_completed("resp-main"),
|
||||
]));
|
||||
return response_with_events(
|
||||
"resp-main",
|
||||
vec![ev_function_call(
|
||||
"call-spawn",
|
||||
self.spawn_tool_name.as_str(),
|
||||
&self.spawn_args_json,
|
||||
)],
|
||||
);
|
||||
}
|
||||
|
||||
sse_response(sse(vec![
|
||||
ev_response_created("resp-default"),
|
||||
ev_completed("resp-default"),
|
||||
]))
|
||||
completed_response("resp-default")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,10 +137,7 @@ impl Respond for AgentJobsResponder {
|
||||
let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
|
||||
|
||||
if has_function_call_output(&body) {
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-tool"),
|
||||
ev_completed("resp-tool"),
|
||||
]));
|
||||
return completed_response("resp-tool");
|
||||
}
|
||||
|
||||
if let Some((job_id, item_id)) = extract_job_and_item(&body) {
|
||||
@@ -117,36 +145,242 @@ impl Respond for AgentJobsResponder {
|
||||
"call-worker-{}",
|
||||
self.call_counter.fetch_add(1, Ordering::SeqCst)
|
||||
);
|
||||
let args = json!({
|
||||
"job_id": job_id,
|
||||
"item_id": item_id,
|
||||
"result": { "item_id": item_id }
|
||||
});
|
||||
let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
|
||||
panic!("worker args serialize: {err}");
|
||||
});
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-worker"),
|
||||
ev_function_call(&call_id, "report_agent_job_result", &args_json),
|
||||
ev_completed("resp-worker"),
|
||||
]));
|
||||
return response_with_events(
|
||||
"resp-worker",
|
||||
vec![report_result_event(&call_id, &job_id, &item_id, None)],
|
||||
);
|
||||
}
|
||||
|
||||
if !self.seen_main.swap(true, Ordering::SeqCst) {
|
||||
return sse_response(sse(vec![
|
||||
ev_response_created("resp-main"),
|
||||
ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
|
||||
ev_completed("resp-main"),
|
||||
]));
|
||||
return response_with_events(
|
||||
"resp-main",
|
||||
vec![ev_function_call(
|
||||
"call-spawn",
|
||||
self.spawn_tool_name.as_str(),
|
||||
&self.spawn_args_json,
|
||||
)],
|
||||
);
|
||||
}
|
||||
|
||||
sse_response(sse(vec![
|
||||
ev_response_created("resp-default"),
|
||||
ev_completed("resp-default"),
|
||||
]))
|
||||
completed_response("resp-default")
|
||||
}
|
||||
}
|
||||
|
||||
/// Scripted model backend for queue-job integration tests. Turn routing:
/// tool-output follow-ups get an empty completion, worker turns get the
/// mode-specific tool calls, and the first main-agent turn gets the spawn.
impl Respond for QueueAgentJobsResponder {
    fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
        let body_bytes = decode_body_bytes(request);
        let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);

        // Follow-up turn after a tool call: acknowledge with no new events.
        if has_function_call_output(&body) {
            return completed_response("resp-tool");
        }

        // Worker turn: the prompt carries the job/item ids. Emit the tool
        // calls the current mode prescribes for this item.
        if let Some((job_id, item_id)) = extract_job_and_item(&body) {
            let mut events = Vec::new();
            match self.mode {
                QueueResponderMode::ReportOnly => {
                    let call_id = self.next_call_id("call-report");
                    events.push(report_result_event(&call_id, &job_id, &item_id, None));
                }
                QueueResponderMode::EnqueueThenReport => {
                    // Only the root item fans out; children just report.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueTwiceThenReport => {
                    // Same fan-out as above, but split across two enqueue
                    // calls to exercise repeated enqueues from one worker.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![json!({
                                "item_id": "child-1",
                                "dedupe_key": "https://child-1.test",
                                "input": { "url": "https://child-1.test" }
                            })],
                        ));
                        let second_enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &second_enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![json!({
                                "item_id": "child-2",
                                "dedupe_key": "https://child-2.test",
                                "input": { "url": "https://child-2.test" }
                            })],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueWithDuplicatesThenReport => {
                    // "child-dupe" reuses child-1's dedupe key and should be
                    // dropped by the queue's deduplication.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-dupe",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-dupe.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    }
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        None,
                    ));
                }
                QueueResponderMode::EnqueueWithoutReport => {
                    // The root worker enqueues but never reports its own
                    // result; only the children report.
                    if item_id == "root" {
                        let enqueue_call_id = self.next_call_id("call-enqueue");
                        events.push(enqueue_items_event(
                            &enqueue_call_id,
                            &job_id,
                            &item_id,
                            vec![
                                json!({
                                    "item_id": "child-1",
                                    "dedupe_key": "https://child-1.test",
                                    "input": { "url": "https://child-1.test" }
                                }),
                                json!({
                                    "item_id": "child-2",
                                    "dedupe_key": "https://child-2.test",
                                    "input": { "url": "https://child-2.test" }
                                }),
                            ],
                        ));
                    } else {
                        let report_call_id = self.next_call_id("call-report");
                        events.push(report_result_event(
                            &report_call_id,
                            &job_id,
                            &item_id,
                            None,
                        ));
                    }
                }
                QueueResponderMode::StopAfterFirst => {
                    // Only the first worker sets `stop: true`.
                    let worker_index = self.worker_counter.fetch_add(1, Ordering::SeqCst);
                    let report_call_id = self.next_call_id("call-report");
                    events.push(report_result_event(
                        &report_call_id,
                        &job_id,
                        &item_id,
                        Some(worker_index == 0),
                    ));
                }
            }
            return response_with_events("resp-worker", events);
        }

        // First main-agent turn: emit the spawn_agents_on_queue call.
        if !self.seen_main.swap(true, Ordering::SeqCst) {
            return response_with_events(
                "resp-main",
                vec![ev_function_call(
                    "call-spawn",
                    "spawn_agents_on_queue",
                    &self.spawn_args_json,
                )],
            );
        }

        // Any later main-agent turn just completes.
        completed_response("resp-default")
    }
}
|
||||
|
||||
fn response_with_events(response_id: &str, mut events: Vec<Value>) -> ResponseTemplate {
|
||||
let mut full_events = Vec::with_capacity(events.len() + 2);
|
||||
full_events.push(ev_response_created(response_id));
|
||||
full_events.append(&mut events);
|
||||
full_events.push(ev_completed(response_id));
|
||||
sse_response(sse(full_events))
|
||||
}
|
||||
|
||||
fn completed_response(response_id: &str) -> ResponseTemplate {
|
||||
response_with_events(response_id, Vec::new())
|
||||
}
|
||||
|
||||
fn report_result_event(call_id: &str, job_id: &str, item_id: &str, stop: Option<bool>) -> Value {
|
||||
let mut args = json!({
|
||||
"job_id": job_id,
|
||||
"item_id": item_id,
|
||||
"result": { "item_id": item_id },
|
||||
});
|
||||
if let Some(stop) = stop
|
||||
&& let Some(object) = args.as_object_mut()
|
||||
{
|
||||
object.insert("stop".to_string(), Value::Bool(stop));
|
||||
}
|
||||
ev_function_call(call_id, "report_agent_job_result", &serialize_json(&args))
|
||||
}
|
||||
|
||||
fn enqueue_items_event(
|
||||
call_id: &str,
|
||||
job_id: &str,
|
||||
parent_item_id: &str,
|
||||
items: Vec<Value>,
|
||||
) -> Value {
|
||||
let args = json!({
|
||||
"job_id": job_id,
|
||||
"parent_item_id": parent_item_id,
|
||||
"items": items,
|
||||
});
|
||||
ev_function_call(call_id, "enqueue_agent_job_items", &serialize_json(&args))
|
||||
}
|
||||
|
||||
fn serialize_json(value: &Value) -> String {
|
||||
serde_json::to_string(value).unwrap_or_else(|err| panic!("json serialization failed: {err}"))
|
||||
}
|
||||
|
||||
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
|
||||
let Some(encoding) = request
|
||||
.headers
|
||||
@@ -178,12 +412,17 @@ fn has_function_call_output(body: &Value) -> bool {
|
||||
|
||||
fn extract_job_and_item(body: &Value) -> Option<(String, String)> {
|
||||
let texts = message_input_texts(body);
|
||||
let mut combined = texts.join("\n");
|
||||
let mut combined = texts.join(
|
||||
"
|
||||
",
|
||||
);
|
||||
if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
|
||||
combined.push('\n');
|
||||
combined.push_str(instructions);
|
||||
}
|
||||
if !combined.contains("You are processing one item for a generic agent job.") {
|
||||
if !combined.contains("You are processing one item for a generic agent job.")
|
||||
&& !combined.contains("You are processing one item in a queue-draining agent job.")
|
||||
{
|
||||
return None;
|
||||
}
|
||||
let job_id = Regex::new(r"Job ID:\s*([^\n]+)")
|
||||
@@ -218,6 +457,16 @@ fn parse_simple_csv_line(line: &str) -> Vec<String> {
|
||||
line.split(',').map(str::to_string).collect()
|
||||
}
|
||||
|
||||
fn parse_jsonl_lines(path: &std::path::Path) -> Result<Vec<Value>> {
|
||||
let content = fs::read_to_string(path)?;
|
||||
content
|
||||
.lines()
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.map(serde_json::from_str)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn report_agent_job_result_rejects_wrong_thread() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
@@ -244,7 +493,7 @@ async fn report_agent_job_result_rejects_wrong_thread() -> Result<()> {
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = AgentJobsResponder::new(args_json);
|
||||
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
@@ -310,7 +559,7 @@ async fn spawn_agents_on_csv_runs_and_exports() -> Result<()> {
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = AgentJobsResponder::new(args_json);
|
||||
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
@@ -354,7 +603,7 @@ async fn spawn_agents_on_csv_dedupes_item_ids() -> Result<()> {
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = AgentJobsResponder::new(args_json);
|
||||
let responder = AgentJobsResponder::new("spawn_agents_on_csv", args_json);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
@@ -413,7 +662,8 @@ async fn spawn_agents_on_csv_stop_halts_future_items() -> Result<()> {
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let worker_calls = Arc::new(AtomicUsize::new(0));
|
||||
let responder = StopAfterFirstResponder::new(args_json, worker_calls.clone());
|
||||
let responder =
|
||||
StopAfterFirstResponder::new("spawn_agents_on_csv", args_json, worker_calls.clone());
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
@@ -446,3 +696,432 @@ async fn spawn_agents_on_csv_stop_halts_future_items() -> Result<()> {
|
||||
assert_eq!(worker_calls.load(Ordering::SeqCst), 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn spawn_agents_on_queue_runs_and_exports_jsonl() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_output.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [
|
||||
{
|
||||
"item_id": "seed-1",
|
||||
"dedupe_key": "https://seed-1.test",
|
||||
"input": { "url": "https://seed-1.test" }
|
||||
},
|
||||
{
|
||||
"item_id": "seed-2",
|
||||
"dedupe_key": "https://seed-2.test",
|
||||
"input": { "url": "https://seed-2.test" }
|
||||
}
|
||||
],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run queue job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 2);
|
||||
let item_ids: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["item_id"].as_str().expect("item_id"))
|
||||
.collect();
|
||||
assert_eq!(item_ids, vec!["seed-1", "seed-2"]);
|
||||
let statuses: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["status"].as_str().expect("status"))
|
||||
.collect();
|
||||
assert_eq!(statuses, vec!["completed", "completed"]);
|
||||
|
||||
let job_id = lines[0]["job_id"].as_str().expect("job_id");
|
||||
let db = test.codex.state_db().expect("state db");
|
||||
let job = db.get_agent_job(job_id).await?.expect("job");
|
||||
assert_eq!(job.kind, codex_state::AgentJobKind::DynamicQueue);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn spawn_agents_on_queue_supports_seed_path_json_array() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let seed_path = test.cwd_path().join("agent_queue_seed.json");
|
||||
fs::write(
|
||||
&seed_path,
|
||||
serde_json::to_string(&json!([
|
||||
{
|
||||
"item_id": "seed-json",
|
||||
"input": { "url": "https://seed-json.test" }
|
||||
}
|
||||
]))?,
|
||||
)?;
|
||||
let output_path = test.cwd_path().join("agent_queue_seed_array.jsonl");
|
||||
let args = json!({
|
||||
"seed_path": seed_path.display().to_string(),
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run queue job from json array").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 1);
|
||||
assert_eq!(lines[0]["item_id"].as_str(), Some("seed-json"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn spawn_agents_on_queue_supports_seed_path_jsonl() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let seed_path = test.cwd_path().join("agent_queue_seed.jsonl");
|
||||
fs::write(
|
||||
&seed_path,
|
||||
format!(
|
||||
"{}
|
||||
|
||||
{}
|
||||
",
|
||||
serialize_json(&json!({
|
||||
"item_id": "seed-jsonl-1",
|
||||
"input": { "url": "https://seed-jsonl-1.test" }
|
||||
})),
|
||||
serialize_json(&json!({
|
||||
"item_id": "seed-jsonl-2",
|
||||
"input": { "url": "https://seed-jsonl-2.test" }
|
||||
})),
|
||||
),
|
||||
)?;
|
||||
let output_path = test.cwd_path().join("agent_queue_seed_jsonl.jsonl");
|
||||
let args = json!({
|
||||
"seed_path": seed_path.display().to_string(),
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::ReportOnly);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run queue job from jsonl").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 2);
|
||||
let item_ids: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["item_id"].as_str().expect("item_id"))
|
||||
.collect();
|
||||
assert_eq!(item_ids, vec!["seed-jsonl-1", "seed-jsonl-2"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queue_worker_can_enqueue_more_items() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_children.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [{
|
||||
"item_id": "root",
|
||||
"input": { "url": "https://root.test" }
|
||||
}],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueThenReport);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run expanding queue job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 3);
|
||||
let item_ids: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["item_id"].as_str().expect("item_id"))
|
||||
.collect();
|
||||
assert_eq!(item_ids, vec!["root", "child-1", "child-2"]);
|
||||
let statuses: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["status"].as_str().expect("status"))
|
||||
.collect();
|
||||
assert_eq!(statuses, vec!["completed", "completed", "completed"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queue_worker_multiple_enqueue_calls_are_supported() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_multiple_enqueue.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [{
|
||||
"item_id": "root",
|
||||
"input": { "url": "https://root.test" }
|
||||
}],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder =
|
||||
QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueTwiceThenReport);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run multi-enqueue queue job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
let mut item_ids: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["item_id"].as_str().expect("item_id"))
|
||||
.collect();
|
||||
item_ids.sort_unstable();
|
||||
assert_eq!(item_ids, vec!["child-1", "child-2", "root"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queue_dedupe_key_skips_duplicate_urls() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_dedupe.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [{
|
||||
"item_id": "root",
|
||||
"input": { "url": "https://root.test" }
|
||||
}],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(
|
||||
args_json,
|
||||
QueueResponderMode::EnqueueWithDuplicatesThenReport,
|
||||
);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run deduping queue job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 3);
|
||||
let item_ids: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["item_id"].as_str().expect("item_id"))
|
||||
.collect();
|
||||
assert_eq!(item_ids, vec!["root", "child-1", "child-2"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queue_parent_failure_keeps_children() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_parent_failure.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [{
|
||||
"item_id": "root",
|
||||
"input": { "url": "https://root.test" }
|
||||
}],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder =
|
||||
QueueAgentJobsResponder::new(args_json, QueueResponderMode::EnqueueWithoutReport);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run queue parent failure job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 3);
|
||||
let root = lines
|
||||
.iter()
|
||||
.find(|line| line["item_id"].as_str() == Some("root"))
|
||||
.expect("root item");
|
||||
assert_eq!(root["status"].as_str(), Some("failed"));
|
||||
assert_eq!(
|
||||
root["last_error"].as_str(),
|
||||
Some("worker finished without calling report_agent_job_result")
|
||||
);
|
||||
let child_statuses: Vec<_> = lines
|
||||
.iter()
|
||||
.filter(|line| line["item_id"].as_str() != Some("root"))
|
||||
.map(|line| line["status"].as_str().expect("status"))
|
||||
.collect();
|
||||
assert_eq!(child_statuses, vec!["completed", "completed"]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queue_stop_cancels_remaining_pending_items() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::SpawnCsv)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let output_path = test.cwd_path().join("agent_queue_stop.jsonl");
|
||||
let args = json!({
|
||||
"seed_items": [
|
||||
{ "item_id": "seed-1", "input": { "url": "https://seed-1.test" } },
|
||||
{ "item_id": "seed-2", "input": { "url": "https://seed-2.test" } },
|
||||
{ "item_id": "seed-3", "input": { "url": "https://seed-3.test" } }
|
||||
],
|
||||
"instruction": "Visit {url}",
|
||||
"output_jsonl_path": output_path.display().to_string(),
|
||||
"max_concurrency": 1,
|
||||
});
|
||||
let args_json = serde_json::to_string(&args)?;
|
||||
|
||||
let responder = QueueAgentJobsResponder::new(args_json, QueueResponderMode::StopAfterFirst);
|
||||
Mock::given(method("POST"))
|
||||
.and(path_regex(".*/responses$"))
|
||||
.respond_with(responder)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
test.submit_turn("run stopping queue job").await?;
|
||||
|
||||
let lines = parse_jsonl_lines(&output_path)?;
|
||||
assert_eq!(lines.len(), 3);
|
||||
let statuses: Vec<_> = lines
|
||||
.iter()
|
||||
.map(|line| line["status"].as_str().expect("status"))
|
||||
.collect();
|
||||
assert_eq!(statuses, vec!["completed", "pending", "pending"]);
|
||||
|
||||
let job_id = lines[0]["job_id"].as_str().expect("job_id");
|
||||
let db = test.codex.state_db().expect("state db");
|
||||
let job = db.get_agent_job(job_id).await?.expect("job");
|
||||
assert_eq!(job.status, codex_state::AgentJobStatus::Cancelled);
|
||||
let progress = db.get_agent_job_progress(job_id).await?;
|
||||
assert_eq!(progress.total_items, 3);
|
||||
assert_eq!(progress.completed_items, 1);
|
||||
assert_eq!(progress.failed_items, 0);
|
||||
assert_eq!(progress.running_items, 0);
|
||||
assert_eq!(progress.pending_items, 2);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
21
codex-rs/state/migrations/0019_agent_jobs_dynamic_queue.sql
Normal file
21
codex-rs/state/migrations/0019_agent_jobs_dynamic_queue.sql
Normal file
@@ -0,0 +1,21 @@
|
||||
ALTER TABLE agent_jobs
|
||||
ADD COLUMN kind TEXT NOT NULL DEFAULT 'csv_batch';
|
||||
|
||||
ALTER TABLE agent_jobs
|
||||
ADD COLUMN max_items INTEGER;
|
||||
|
||||
ALTER TABLE agent_job_items
|
||||
ADD COLUMN parent_item_id TEXT;
|
||||
|
||||
ALTER TABLE agent_job_items
|
||||
ADD COLUMN dedupe_key TEXT;
|
||||
|
||||
CREATE INDEX idx_agent_job_items_row_index
|
||||
ON agent_job_items(job_id, row_index ASC);
|
||||
|
||||
CREATE INDEX idx_agent_job_items_parent
|
||||
ON agent_job_items(job_id, parent_item_id, row_index ASC);
|
||||
|
||||
CREATE UNIQUE INDEX idx_agent_job_items_dedupe_key
|
||||
ON agent_job_items(job_id, dedupe_key)
|
||||
WHERE dedupe_key IS NOT NULL;
|
||||
@@ -29,12 +29,15 @@ pub use model::AgentJobCreateParams;
|
||||
pub use model::AgentJobItem;
|
||||
pub use model::AgentJobItemCreateParams;
|
||||
pub use model::AgentJobItemStatus;
|
||||
pub use model::AgentJobKind;
|
||||
pub use model::AgentJobProgress;
|
||||
pub use model::AgentJobStatus;
|
||||
pub use model::Anchor;
|
||||
pub use model::BackfillState;
|
||||
pub use model::BackfillStats;
|
||||
pub use model::BackfillStatus;
|
||||
pub use model::EnqueueAgentJobItemOutcome;
|
||||
pub use model::EnqueueAgentJobItemsResult;
|
||||
pub use model::ExtractionOutcome;
|
||||
pub use model::SortKey;
|
||||
pub use model::Stage1JobClaim;
|
||||
@@ -56,7 +59,7 @@ pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
|
||||
pub const LOGS_DB_FILENAME: &str = "logs";
|
||||
pub const LOGS_DB_VERSION: u32 = 1;
|
||||
pub const STATE_DB_FILENAME: &str = "state";
|
||||
pub const STATE_DB_VERSION: u32 = 5;
|
||||
pub const STATE_DB_VERSION: u32 = 6;
|
||||
|
||||
/// Errors encountered during DB operations. Tags: [stage]
|
||||
pub const DB_ERROR_METRIC: &str = "codex.db.error";
|
||||
|
||||
@@ -12,6 +12,29 @@ pub enum AgentJobStatus {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AgentJobKind {
|
||||
CsvBatch,
|
||||
DynamicQueue,
|
||||
}
|
||||
|
||||
impl AgentJobKind {
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
AgentJobKind::CsvBatch => "csv_batch",
|
||||
AgentJobKind::DynamicQueue => "dynamic_queue",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse(value: &str) -> Result<Self> {
|
||||
match value {
|
||||
"csv_batch" => Ok(Self::CsvBatch),
|
||||
"dynamic_queue" => Ok(Self::DynamicQueue),
|
||||
_ => Err(anyhow::anyhow!("invalid agent job kind: {value}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentJobStatus {
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
@@ -75,9 +98,11 @@ impl AgentJobItemStatus {
|
||||
pub struct AgentJob {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
pub kind: AgentJobKind,
|
||||
pub status: AgentJobStatus,
|
||||
pub instruction: String,
|
||||
pub auto_export: bool,
|
||||
pub max_items: Option<u64>,
|
||||
pub max_runtime_seconds: Option<u64>,
|
||||
// TODO(jif-oai): Convert to JSON Schema and enforce structured outputs.
|
||||
pub output_schema_json: Option<Value>,
|
||||
@@ -95,8 +120,10 @@ pub struct AgentJob {
|
||||
pub struct AgentJobItem {
|
||||
pub job_id: String,
|
||||
pub item_id: String,
|
||||
pub parent_item_id: Option<String>,
|
||||
pub row_index: i64,
|
||||
pub source_id: Option<String>,
|
||||
pub dedupe_key: Option<String>,
|
||||
pub row_json: Value,
|
||||
pub status: AgentJobItemStatus,
|
||||
pub assigned_thread_id: Option<String>,
|
||||
@@ -118,12 +145,14 @@ pub struct AgentJobProgress {
|
||||
pub failed_items: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AgentJobCreateParams {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
pub kind: AgentJobKind,
|
||||
pub instruction: String,
|
||||
pub auto_export: bool,
|
||||
pub max_items: Option<u64>,
|
||||
pub max_runtime_seconds: Option<u64>,
|
||||
pub output_schema_json: Option<Value>,
|
||||
pub input_headers: Vec<String>,
|
||||
@@ -131,21 +160,45 @@ pub struct AgentJobCreateParams {
|
||||
pub output_csv_path: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AgentJobItemCreateParams {
|
||||
pub item_id: String,
|
||||
pub parent_item_id: Option<String>,
|
||||
pub row_index: i64,
|
||||
pub source_id: Option<String>,
|
||||
pub dedupe_key: Option<String>,
|
||||
pub row_json: Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct EnqueueAgentJobItemsResult {
|
||||
pub outcomes: Vec<EnqueueAgentJobItemOutcome>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum EnqueueAgentJobItemOutcome {
|
||||
Enqueued {
|
||||
item: AgentJobItem,
|
||||
},
|
||||
Deduped {
|
||||
item: AgentJobItem,
|
||||
},
|
||||
MaxItemsReached {
|
||||
requested_item_id: String,
|
||||
parent_item_id: Option<String>,
|
||||
dedupe_key: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub(crate) struct AgentJobRow {
|
||||
pub(crate) id: String,
|
||||
pub(crate) name: String,
|
||||
pub(crate) kind: String,
|
||||
pub(crate) status: String,
|
||||
pub(crate) instruction: String,
|
||||
pub(crate) auto_export: i64,
|
||||
pub(crate) max_items: Option<i64>,
|
||||
pub(crate) max_runtime_seconds: Option<i64>,
|
||||
pub(crate) output_schema_json: Option<String>,
|
||||
pub(crate) input_headers_json: String,
|
||||
@@ -168,6 +221,11 @@ impl TryFrom<AgentJobRow> for AgentJob {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()?;
|
||||
let input_headers = serde_json::from_str(value.input_headers_json.as_str())?;
|
||||
let max_items = value
|
||||
.max_items
|
||||
.map(u64::try_from)
|
||||
.transpose()
|
||||
.map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
|
||||
let max_runtime_seconds = value
|
||||
.max_runtime_seconds
|
||||
.map(u64::try_from)
|
||||
@@ -176,9 +234,11 @@ impl TryFrom<AgentJobRow> for AgentJob {
|
||||
Ok(Self {
|
||||
id: value.id,
|
||||
name: value.name,
|
||||
kind: AgentJobKind::parse(value.kind.as_str())?,
|
||||
status: AgentJobStatus::parse(value.status.as_str())?,
|
||||
instruction: value.instruction,
|
||||
auto_export: value.auto_export != 0,
|
||||
max_items,
|
||||
max_runtime_seconds,
|
||||
output_schema_json,
|
||||
input_headers,
|
||||
@@ -203,8 +263,10 @@ impl TryFrom<AgentJobRow> for AgentJob {
|
||||
pub(crate) struct AgentJobItemRow {
|
||||
pub(crate) job_id: String,
|
||||
pub(crate) item_id: String,
|
||||
pub(crate) parent_item_id: Option<String>,
|
||||
pub(crate) row_index: i64,
|
||||
pub(crate) source_id: Option<String>,
|
||||
pub(crate) dedupe_key: Option<String>,
|
||||
pub(crate) row_json: String,
|
||||
pub(crate) status: String,
|
||||
pub(crate) assigned_thread_id: Option<String>,
|
||||
@@ -224,8 +286,10 @@ impl TryFrom<AgentJobItemRow> for AgentJobItem {
|
||||
Ok(Self {
|
||||
job_id: value.job_id,
|
||||
item_id: value.item_id,
|
||||
parent_item_id: value.parent_item_id,
|
||||
row_index: value.row_index,
|
||||
source_id: value.source_id,
|
||||
dedupe_key: value.dedupe_key,
|
||||
row_json: serde_json::from_str(value.row_json.as_str())?,
|
||||
status: AgentJobItemStatus::parse(value.status.as_str())?,
|
||||
assigned_thread_id: value.assigned_thread_id,
|
||||
|
||||
@@ -9,8 +9,11 @@ pub use agent_job::AgentJobCreateParams;
|
||||
pub use agent_job::AgentJobItem;
|
||||
pub use agent_job::AgentJobItemCreateParams;
|
||||
pub use agent_job::AgentJobItemStatus;
|
||||
pub use agent_job::AgentJobKind;
|
||||
pub use agent_job::AgentJobProgress;
|
||||
pub use agent_job::AgentJobStatus;
|
||||
pub use agent_job::EnqueueAgentJobItemOutcome;
|
||||
pub use agent_job::EnqueueAgentJobItemsResult;
|
||||
pub use backfill_state::BackfillState;
|
||||
pub use backfill_state::BackfillStatus;
|
||||
pub use log::LogEntry;
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use super::*;
|
||||
use crate::model::AgentJobItemRow;
|
||||
use crate::model::AgentJobKind;
|
||||
use crate::model::EnqueueAgentJobItemOutcome;
|
||||
use crate::model::EnqueueAgentJobItemsResult;
|
||||
|
||||
impl StateRuntime {
|
||||
pub async fn create_agent_job(
|
||||
@@ -14,6 +17,11 @@ impl StateRuntime {
|
||||
.as_ref()
|
||||
.map(serde_json::to_string)
|
||||
.transpose()?;
|
||||
let max_items = params
|
||||
.max_items
|
||||
.map(i64::try_from)
|
||||
.transpose()
|
||||
.map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
|
||||
let max_runtime_seconds = params
|
||||
.max_runtime_seconds
|
||||
.map(i64::try_from)
|
||||
@@ -25,9 +33,11 @@ impl StateRuntime {
|
||||
INSERT INTO agent_jobs (
|
||||
id,
|
||||
name,
|
||||
kind,
|
||||
status,
|
||||
instruction,
|
||||
auto_export,
|
||||
max_items,
|
||||
max_runtime_seconds,
|
||||
output_schema_json,
|
||||
input_headers_json,
|
||||
@@ -38,14 +48,16 @@ INSERT INTO agent_jobs (
|
||||
started_at,
|
||||
completed_at,
|
||||
last_error
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
|
||||
"#,
|
||||
)
|
||||
.bind(params.id.as_str())
|
||||
.bind(params.name.as_str())
|
||||
.bind(params.kind.as_str())
|
||||
.bind(AgentJobStatus::Pending.as_str())
|
||||
.bind(params.instruction.as_str())
|
||||
.bind(i64::from(params.auto_export))
|
||||
.bind(max_items)
|
||||
.bind(max_runtime_seconds)
|
||||
.bind(output_schema_json)
|
||||
.bind(input_headers_json)
|
||||
@@ -63,8 +75,10 @@ INSERT INTO agent_jobs (
|
||||
INSERT INTO agent_job_items (
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
@@ -75,13 +89,15 @@ INSERT INTO agent_job_items (
|
||||
updated_at,
|
||||
completed_at,
|
||||
reported_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
|
||||
"#,
|
||||
)
|
||||
.bind(params.id.as_str())
|
||||
.bind(item.item_id.as_str())
|
||||
.bind(item.parent_item_id.as_deref())
|
||||
.bind(item.row_index)
|
||||
.bind(item.source_id.as_deref())
|
||||
.bind(item.dedupe_key.as_deref())
|
||||
.bind(row_json)
|
||||
.bind(AgentJobItemStatus::Pending.as_str())
|
||||
.bind(now)
|
||||
@@ -104,9 +120,11 @@ INSERT INTO agent_job_items (
|
||||
SELECT
|
||||
id,
|
||||
name,
|
||||
kind,
|
||||
status,
|
||||
instruction,
|
||||
auto_export,
|
||||
max_items,
|
||||
max_runtime_seconds,
|
||||
output_schema_json,
|
||||
input_headers_json,
|
||||
@@ -138,8 +156,10 @@ WHERE id = ?
|
||||
SELECT
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
@@ -181,8 +201,10 @@ WHERE job_id =
|
||||
SELECT
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
@@ -204,6 +226,288 @@ WHERE job_id = ? AND item_id = ?
|
||||
row.map(AgentJobItem::try_from).transpose()
|
||||
}
|
||||
|
||||
pub async fn enqueue_agent_job_items(
|
||||
&self,
|
||||
job_id: &str,
|
||||
parent_item_id: &str,
|
||||
reporting_thread_id: &str,
|
||||
items: &[AgentJobItemCreateParams],
|
||||
) -> anyhow::Result<EnqueueAgentJobItemsResult> {
|
||||
let now = Utc::now().timestamp();
|
||||
let mut tx = self.pool.begin_with("BEGIN IMMEDIATE").await?;
|
||||
|
||||
let job_row = sqlx::query_as::<_, AgentJobRow>(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
name,
|
||||
kind,
|
||||
status,
|
||||
instruction,
|
||||
auto_export,
|
||||
max_items,
|
||||
max_runtime_seconds,
|
||||
output_schema_json,
|
||||
input_headers_json,
|
||||
input_csv_path,
|
||||
output_csv_path,
|
||||
created_at,
|
||||
updated_at,
|
||||
started_at,
|
||||
completed_at,
|
||||
last_error
|
||||
FROM agent_jobs
|
||||
WHERE id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("agent job {job_id} not found"))?;
|
||||
let job = AgentJob::try_from(job_row)?;
|
||||
|
||||
if job.kind != AgentJobKind::DynamicQueue {
|
||||
return Err(anyhow::anyhow!(
|
||||
"agent job {job_id} does not support queue item enqueue"
|
||||
));
|
||||
}
|
||||
if job.status.is_final() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"agent job {job_id} is already {}",
|
||||
job.status.as_str()
|
||||
));
|
||||
}
|
||||
|
||||
let parent_owner = sqlx::query(
|
||||
r#"
|
||||
SELECT 1
|
||||
FROM agent_job_items
|
||||
WHERE
|
||||
job_id = ?
|
||||
AND item_id = ?
|
||||
AND status = ?
|
||||
AND assigned_thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(parent_item_id)
|
||||
.bind(AgentJobItemStatus::Running.as_str())
|
||||
.bind(reporting_thread_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if parent_owner.is_none() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"agent job parent item {parent_item_id} is not owned by thread {reporting_thread_id}"
|
||||
));
|
||||
}
|
||||
|
||||
let mut total_items: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COUNT(*)
|
||||
FROM agent_job_items
|
||||
WHERE job_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
let mut next_row_index: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COALESCE(MAX(row_index), -1)
|
||||
FROM agent_job_items
|
||||
WHERE job_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
next_row_index = next_row_index.saturating_add(1);
|
||||
|
||||
let max_items = job
|
||||
.max_items
|
||||
.map(i64::try_from)
|
||||
.transpose()
|
||||
.map_err(|_| anyhow::anyhow!("invalid max_items value"))?;
|
||||
let mut inserted_any = false;
|
||||
let mut reserved_item_ids = BTreeSet::new();
|
||||
let mut outcomes = Vec::with_capacity(items.len());
|
||||
|
||||
for item in items {
|
||||
let resolved_parent_item_id = match item.parent_item_id.as_deref() {
|
||||
Some(item_parent_item_id) if item_parent_item_id != parent_item_id => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"enqueue item {} parent_item_id mismatch: expected {parent_item_id}, got {item_parent_item_id}",
|
||||
item.item_id
|
||||
));
|
||||
}
|
||||
Some(_) | None => Some(parent_item_id.to_string()),
|
||||
};
|
||||
|
||||
if let Some(dedupe_key) = item.dedupe_key.as_deref() {
|
||||
let existing_row = sqlx::query_as::<_, AgentJobItemRow>(
|
||||
r#"
|
||||
SELECT
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
attempt_count,
|
||||
result_json,
|
||||
last_error,
|
||||
created_at,
|
||||
updated_at,
|
||||
completed_at,
|
||||
reported_at
|
||||
FROM agent_job_items
|
||||
WHERE job_id = ? AND dedupe_key = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(dedupe_key)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if let Some(existing_row) = existing_row {
|
||||
outcomes.push(EnqueueAgentJobItemOutcome::Deduped {
|
||||
item: AgentJobItem::try_from(existing_row)?,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if max_items.is_some_and(|max_items| total_items >= max_items) {
|
||||
outcomes.push(EnqueueAgentJobItemOutcome::MaxItemsReached {
|
||||
requested_item_id: item.item_id.clone(),
|
||||
parent_item_id: resolved_parent_item_id,
|
||||
dedupe_key: item.dedupe_key.clone(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
let base_item_id = item.item_id.as_str();
|
||||
let mut resolved_item_id = base_item_id.to_string();
|
||||
let mut suffix = 2usize;
|
||||
loop {
|
||||
if reserved_item_ids.contains(resolved_item_id.as_str()) {
|
||||
resolved_item_id = format!("{base_item_id}-{suffix}");
|
||||
suffix = suffix.saturating_add(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
let existing_item = sqlx::query(
|
||||
r#"
|
||||
SELECT 1
|
||||
FROM agent_job_items
|
||||
WHERE job_id = ? AND item_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(resolved_item_id.as_str())
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if existing_item.is_none() {
|
||||
break;
|
||||
}
|
||||
|
||||
resolved_item_id = format!("{base_item_id}-{suffix}");
|
||||
suffix = suffix.saturating_add(1);
|
||||
}
|
||||
reserved_item_ids.insert(resolved_item_id.clone());
|
||||
|
||||
let row_json = serde_json::to_string(&item.row_json)?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO agent_job_items (
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
attempt_count,
|
||||
result_json,
|
||||
last_error,
|
||||
created_at,
|
||||
updated_at,
|
||||
completed_at,
|
||||
reported_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(resolved_item_id.as_str())
|
||||
.bind(resolved_parent_item_id.as_deref())
|
||||
.bind(next_row_index)
|
||||
.bind(item.source_id.as_deref())
|
||||
.bind(item.dedupe_key.as_deref())
|
||||
.bind(row_json)
|
||||
.bind(AgentJobItemStatus::Pending.as_str())
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let inserted_row = sqlx::query_as::<_, AgentJobItemRow>(
|
||||
r#"
|
||||
SELECT
|
||||
job_id,
|
||||
item_id,
|
||||
parent_item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
dedupe_key,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
attempt_count,
|
||||
result_json,
|
||||
last_error,
|
||||
created_at,
|
||||
updated_at,
|
||||
completed_at,
|
||||
reported_at
|
||||
FROM agent_job_items
|
||||
WHERE job_id = ? AND item_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(resolved_item_id.as_str())
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
outcomes.push(EnqueueAgentJobItemOutcome::Enqueued {
|
||||
item: AgentJobItem::try_from(inserted_row)?,
|
||||
});
|
||||
|
||||
inserted_any = true;
|
||||
total_items = total_items.saturating_add(1);
|
||||
next_row_index = next_row_index.saturating_add(1);
|
||||
}
|
||||
|
||||
if inserted_any {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_jobs
|
||||
SET updated_at = ?
|
||||
WHERE id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(now)
|
||||
.bind(job_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(EnqueueAgentJobItemsResult { outcomes })
|
||||
}
|
||||
|
||||
pub async fn mark_agent_job_running(&self, job_id: &str) -> anyhow::Result<()> {
|
||||
let now = Utc::now().timestamp();
|
||||
sqlx::query(
|
||||
@@ -560,3 +864,290 @@ WHERE job_id = ?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::runtime::test_support::unique_temp_dir;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
fn queue_job(job_id: &str, max_items: Option<u64>) -> AgentJobCreateParams {
|
||||
AgentJobCreateParams {
|
||||
id: job_id.to_string(),
|
||||
name: format!("queue-job-{job_id}"),
|
||||
kind: AgentJobKind::DynamicQueue,
|
||||
instruction: "process {value}".to_string(),
|
||||
auto_export: false,
|
||||
max_items,
|
||||
max_runtime_seconds: None,
|
||||
output_schema_json: None,
|
||||
input_headers: vec!["value".to_string()],
|
||||
input_csv_path: "seed.csv".to_string(),
|
||||
output_csv_path: "seed-out.csv".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn item(
|
||||
item_id: &str,
|
||||
row_index: i64,
|
||||
parent_item_id: Option<&str>,
|
||||
dedupe_key: Option<&str>,
|
||||
value: &str,
|
||||
) -> AgentJobItemCreateParams {
|
||||
AgentJobItemCreateParams {
|
||||
item_id: item_id.to_string(),
|
||||
parent_item_id: parent_item_id.map(str::to_string),
|
||||
row_index,
|
||||
source_id: Some(format!("source-{item_id}")),
|
||||
dedupe_key: dedupe_key.map(str::to_string),
|
||||
row_json: json!({"value": value}),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_agent_job_persists_dynamic_queue_fields() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
|
||||
let job = runtime
|
||||
.create_agent_job(
|
||||
&queue_job("job-queue-persist", Some(5)),
|
||||
&[
|
||||
item("root", 0, None, None, "root"),
|
||||
item("child", 1, Some("root"), Some("child-dedupe"), "child"),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("create agent job should succeed");
|
||||
let items = runtime
|
||||
.list_agent_job_items(job.id.as_str(), None, None)
|
||||
.await
|
||||
.expect("list agent job items should succeed");
|
||||
|
||||
assert_eq!(job.kind, AgentJobKind::DynamicQueue);
|
||||
assert_eq!(job.max_items, Some(5));
|
||||
let item_summary: Vec<_> = items
|
||||
.iter()
|
||||
.map(|item| {
|
||||
(
|
||||
item.item_id.clone(),
|
||||
item.parent_item_id.clone(),
|
||||
item.dedupe_key.clone(),
|
||||
item.row_index,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
item_summary,
|
||||
vec![
|
||||
("root".to_string(), None, None, 0),
|
||||
(
|
||||
"child".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("child-dedupe".to_string()),
|
||||
1,
|
||||
),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_agent_job_items_dedupes_suffixes_and_caps() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let job = runtime
|
||||
.create_agent_job(
|
||||
&queue_job("job-queue-enqueue", Some(4)),
|
||||
&[
|
||||
item("root", 0, None, None, "root"),
|
||||
item(
|
||||
"existing",
|
||||
1,
|
||||
Some("root"),
|
||||
Some("existing-dedupe"),
|
||||
"existing",
|
||||
),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("create queue job should succeed");
|
||||
assert!(
|
||||
runtime
|
||||
.mark_agent_job_item_running_with_thread(
|
||||
job.id.as_str(),
|
||||
"root",
|
||||
"thread-owned-parent",
|
||||
)
|
||||
.await
|
||||
.expect("mark parent running should succeed"),
|
||||
"parent item should become running"
|
||||
);
|
||||
|
||||
let result = runtime
|
||||
.enqueue_agent_job_items(
|
||||
job.id.as_str(),
|
||||
"root",
|
||||
"thread-owned-parent",
|
||||
&[
|
||||
item("foo", -1, None, Some("new-a"), "foo-a"),
|
||||
item(
|
||||
"ignored",
|
||||
-1,
|
||||
Some("root"),
|
||||
Some("existing-dedupe"),
|
||||
"deduped",
|
||||
),
|
||||
item("foo", -1, None, Some("new-b"), "foo-b"),
|
||||
item("baz", -1, None, Some("new-c"), "baz"),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("enqueue agent job items should succeed");
|
||||
|
||||
let outcome_summary: Vec<_> = result
|
||||
.outcomes
|
||||
.iter()
|
||||
.map(|outcome| match outcome {
|
||||
EnqueueAgentJobItemOutcome::Enqueued { item } => (
|
||||
"enqueued",
|
||||
item.item_id.clone(),
|
||||
item.parent_item_id.clone(),
|
||||
item.dedupe_key.clone(),
|
||||
Some(item.row_index),
|
||||
),
|
||||
EnqueueAgentJobItemOutcome::Deduped { item } => (
|
||||
"deduped",
|
||||
item.item_id.clone(),
|
||||
item.parent_item_id.clone(),
|
||||
item.dedupe_key.clone(),
|
||||
Some(item.row_index),
|
||||
),
|
||||
EnqueueAgentJobItemOutcome::MaxItemsReached {
|
||||
requested_item_id,
|
||||
parent_item_id,
|
||||
dedupe_key,
|
||||
} => (
|
||||
"max_items_reached",
|
||||
requested_item_id.clone(),
|
||||
parent_item_id.clone(),
|
||||
dedupe_key.clone(),
|
||||
None,
|
||||
),
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
outcome_summary,
|
||||
vec![
|
||||
(
|
||||
"enqueued",
|
||||
"foo".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("new-a".to_string()),
|
||||
Some(2),
|
||||
),
|
||||
(
|
||||
"deduped",
|
||||
"existing".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("existing-dedupe".to_string()),
|
||||
Some(1),
|
||||
),
|
||||
(
|
||||
"enqueued",
|
||||
"foo-2".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("new-b".to_string()),
|
||||
Some(3),
|
||||
),
|
||||
(
|
||||
"max_items_reached",
|
||||
"baz".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("new-c".to_string()),
|
||||
None,
|
||||
),
|
||||
]
|
||||
);
|
||||
|
||||
let persisted_items = runtime
|
||||
.list_agent_job_items(job.id.as_str(), None, None)
|
||||
.await
|
||||
.expect("list agent job items should succeed");
|
||||
let persisted_summary: Vec<_> = persisted_items
|
||||
.iter()
|
||||
.map(|item| {
|
||||
(
|
||||
item.item_id.clone(),
|
||||
item.parent_item_id.clone(),
|
||||
item.dedupe_key.clone(),
|
||||
item.row_index,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
persisted_summary,
|
||||
vec![
|
||||
("root".to_string(), None, None, 0),
|
||||
(
|
||||
"existing".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("existing-dedupe".to_string()),
|
||||
1,
|
||||
),
|
||||
(
|
||||
"foo".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("new-a".to_string()),
|
||||
2,
|
||||
),
|
||||
(
|
||||
"foo-2".to_string(),
|
||||
Some("root".to_string()),
|
||||
Some("new-b".to_string()),
|
||||
3,
|
||||
),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enqueue_agent_job_items_rejects_unowned_parent() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let job = runtime
|
||||
.create_agent_job(
|
||||
&queue_job("job-queue-parent-check", Some(8)),
|
||||
&[item("root", 0, None, None, "root")],
|
||||
)
|
||||
.await
|
||||
.expect("create queue job should succeed");
|
||||
assert!(
|
||||
runtime
|
||||
.mark_agent_job_item_running_with_thread(job.id.as_str(), "root", "thread-a")
|
||||
.await
|
||||
.expect("mark parent running should succeed"),
|
||||
"parent item should become running"
|
||||
);
|
||||
|
||||
let err = runtime
|
||||
.enqueue_agent_job_items(
|
||||
job.id.as_str(),
|
||||
"root",
|
||||
"thread-b",
|
||||
&[item("child", -1, None, Some("child-a"), "child")],
|
||||
)
|
||||
.await
|
||||
.expect_err("enqueue should reject unowned parent");
|
||||
assert!(
|
||||
err.to_string().contains("not owned by thread thread-b"),
|
||||
"unexpected error: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user