Fix agent job worker assignment race

Claim job items before spawning workers and allow reports to complete unassigned running items, so fast workers cannot lose stop=true reports before the parent records their thread id.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-05-07 03:47:55 -07:00
parent 75302e7b29
commit 95e20370e8
2 changed files with 69 additions and 2 deletions

View File

@@ -196,6 +196,12 @@ async fn run_agent_job_loop(
)
.await?;
for item in pending_items {
let claimed = db
.mark_agent_job_item_running(job_id.as_str(), item.item_id.as_str())
.await?;
if !claimed {
continue;
}
let prompt = build_worker_prompt(&job, &item)?;
let items = vec![UserInput::Text {
text: prompt,
@@ -240,7 +246,7 @@ async fn run_agent_job_loop(
}
};
let assigned = db
.mark_agent_job_item_running_with_thread(
.set_agent_job_item_thread(
job_id.as_str(),
item.item_id.as_str(),
thread_id.to_string().as_str(),

View File

@@ -484,7 +484,7 @@ WHERE
job_id = ?
AND item_id = ?
AND status = ?
AND assigned_thread_id = ?
AND (assigned_thread_id = ? OR assigned_thread_id IS NULL)
"#,
)
.bind(AgentJobItemStatus::Completed.as_str())
@@ -756,6 +756,67 @@ mod tests {
Ok(())
}
/// Checks that a worker's result report is accepted for an item that has been
/// marked running but has no thread id recorded on it, and that the report's
/// cancel request moves the job to `Cancelled`.
#[tokio::test]
async fn report_agent_job_item_result_accepts_unassigned_running_item() -> anyhow::Result<()> {
    let job_id = "job-1";
    let item_id = "item-1";
    let thread_id = "thread-1";
    let expected_result = json!({"ok": true});

    let runtime = StateRuntime::init(unique_temp_dir(), "test-provider".to_string()).await?;

    // Register one job with exactly one item.
    let job_params = AgentJobCreateParams {
        id: job_id.to_string(),
        name: "test-job".to_string(),
        instruction: "Return a result".to_string(),
        auto_export: true,
        max_runtime_seconds: None,
        output_schema_json: None,
        input_headers: vec!["path".to_string()],
        input_csv_path: "/tmp/in.csv".to_string(),
        output_csv_path: "/tmp/out.csv".to_string(),
    };
    let item_params = [AgentJobItemCreateParams {
        item_id: item_id.to_string(),
        row_index: 0,
        source_id: None,
        row_json: json!({"path":"file-1"}),
    }];
    runtime.create_agent_job(&job_params, &item_params).await?;

    // Start the job and mark the item running; no thread id is attached here.
    runtime.mark_agent_job_running(job_id).await?;
    let item_claimed = runtime
        .mark_agent_job_item_running(job_id, item_id)
        .await?;
    assert!(item_claimed);

    // The report carries a thread id even though the item row has none.
    let report_accepted = runtime
        .report_agent_job_item_result_and_cancel_job(
            job_id,
            item_id,
            thread_id,
            &expected_result,
            "cancelled by worker request",
        )
        .await?;
    assert!(report_accepted);

    let stored_job = runtime
        .get_agent_job(job_id)
        .await?
        .expect("job should exist");
    assert_eq!(stored_job.status, AgentJobStatus::Cancelled);

    let stored_item = runtime
        .get_agent_job_item(job_id, item_id)
        .await?
        .expect("job item should exist");
    assert_eq!(stored_item.status, AgentJobItemStatus::Completed);
    assert_eq!(stored_item.result_json, Some(expected_result));
    assert_eq!(stored_item.assigned_thread_id, None);

    Ok(())
}
#[tokio::test]
async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> {
let codex_home = unique_temp_dir();