diff --git a/codex-rs/core/src/tools/handlers/agent_jobs.rs b/codex-rs/core/src/tools/handlers/agent_jobs.rs index 4c3cd34c2e..f11fdc53b3 100644 --- a/codex-rs/core/src/tools/handlers/agent_jobs.rs +++ b/codex-rs/core/src/tools/handlers/agent_jobs.rs @@ -196,6 +196,12 @@ async fn run_agent_job_loop( ) .await?; for item in pending_items { + let claimed = db + .mark_agent_job_item_running(job_id.as_str(), item.item_id.as_str()) + .await?; + if !claimed { + continue; + } let prompt = build_worker_prompt(&job, &item)?; let items = vec![UserInput::Text { text: prompt, @@ -240,7 +246,7 @@ async fn run_agent_job_loop( } }; let assigned = db - .mark_agent_job_item_running_with_thread( + .set_agent_job_item_thread( job_id.as_str(), item.item_id.as_str(), thread_id.to_string().as_str(), diff --git a/codex-rs/state/src/runtime/agent_jobs.rs b/codex-rs/state/src/runtime/agent_jobs.rs index fc0e75640e..e72b80d5f7 100644 --- a/codex-rs/state/src/runtime/agent_jobs.rs +++ b/codex-rs/state/src/runtime/agent_jobs.rs @@ -484,7 +484,7 @@ WHERE job_id = ? AND item_id = ? AND status = ? - AND assigned_thread_id = ? + AND (assigned_thread_id = ? OR assigned_thread_id IS NULL) "#, ) .bind(AgentJobItemStatus::Completed.as_str()) @@ -756,6 +756,67 @@ mod tests { Ok(()) } + #[tokio::test] + async fn report_agent_job_item_result_accepts_unassigned_running_item() -> anyhow::Result<()> { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home, "test-provider".to_string()).await?; + let job_id = "job-1".to_string(); + let item_id = "item-1".to_string(); + let thread_id = "thread-1".to_string(); + runtime + .create_agent_job( + &AgentJobCreateParams { + id: job_id.clone(), + name: "test-job".to_string(), + instruction: "Return a result".to_string(), + auto_export: true, + max_runtime_seconds: None, + output_schema_json: None, + input_headers: vec!["path".to_string()], + input_csv_path: "/tmp/in.csv".to_string(), + output_csv_path: "/tmp/out.csv".to_string(), + }, + &[AgentJobItemCreateParams { + item_id: item_id.clone(), + row_index: 0, + source_id: None, + row_json: json!({"path":"file-1"}), + }], + ) + .await?; + runtime.mark_agent_job_running(job_id.as_str()).await?; + let marked_running = runtime + .mark_agent_job_item_running(job_id.as_str(), item_id.as_str()) + .await?; + assert!(marked_running); + + let accepted = runtime + .report_agent_job_item_result_and_cancel_job( + job_id.as_str(), + item_id.as_str(), + thread_id.as_str(), + &json!({"ok": true}), + "cancelled by worker request", + ) + .await?; + assert!(accepted); + + let job = runtime + .get_agent_job(job_id.as_str()) + .await? + .expect("job should exist"); + assert_eq!(job.status, AgentJobStatus::Cancelled); + let item = runtime + .get_agent_job_item(job_id.as_str(), item_id.as_str()) + .await? + .expect("job item should exist"); + assert_eq!(item.status, AgentJobItemStatus::Completed); + assert_eq!(item.result_json, Some(json!({"ok": true}))); + assert_eq!(item.assigned_thread_id, None); + + Ok(()) + } + #[tokio::test] async fn report_agent_job_item_result_rejects_late_reports() -> anyhow::Result<()> { let codex_home = unique_temp_dir();