mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Remove agent job resume tool
This commit is contained in:
@@ -22,7 +22,6 @@ use once_cell::sync::Lazy;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
@@ -61,13 +60,6 @@ struct JobIdArgs {
|
||||
job_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RunAgentJobArgs {
|
||||
job_id: String,
|
||||
max_concurrency: Option<usize>,
|
||||
auto_export: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct WaitAgentJobArgs {
|
||||
job_id: String,
|
||||
@@ -165,7 +157,6 @@ impl ToolHandler for AgentJobsHandler {
|
||||
|
||||
match tool_name.as_str() {
|
||||
"spawn_agents_on_csv" => spawn_agents_on_csv::handle(session, turn, arguments).await,
|
||||
"run_agent_job" => run_agent_job::handle(session, turn, arguments).await,
|
||||
"get_agent_job_status" => get_agent_job_status::handle(session, arguments).await,
|
||||
"wait_agent_job" => wait_agent_job::handle(session, arguments).await,
|
||||
"export_agent_job_csv" => export_agent_job_csv::handle(session, turn, arguments).await,
|
||||
@@ -311,60 +302,6 @@ mod spawn_agents_on_csv {
|
||||
}
|
||||
}
|
||||
|
||||
mod run_agent_job {
|
||||
use super::*;
|
||||
|
||||
pub async fn handle(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
arguments: String,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let args: RunAgentJobArgs = parse_arguments(arguments.as_str())?;
|
||||
let job_id = args.job_id;
|
||||
let db = required_state_db(&session)?;
|
||||
let job = db
|
||||
.get_agent_job(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to load agent job {job_id}: {err}"
|
||||
))
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(format!("agent job {job_id} not found"))
|
||||
})?;
|
||||
if !job_runner_active(job_id.as_str()).await {
|
||||
db.reset_agent_job_running_items(job_id.as_str())
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!(
|
||||
"failed to reset running items for agent job {job_id}: {err}"
|
||||
))
|
||||
})?;
|
||||
}
|
||||
let options = build_runner_options(
|
||||
&session,
|
||||
&turn,
|
||||
args.max_concurrency,
|
||||
args.auto_export.or(Some(job.auto_export)),
|
||||
)
|
||||
.await?;
|
||||
let started = start_job_runner(session, job_id.clone(), options).await?;
|
||||
let status = render_job_status(db, job_id.as_str()).await?;
|
||||
let content = serde_json::to_string(&json!({
|
||||
"started": started,
|
||||
"status": status,
|
||||
}))
|
||||
.map_err(|err| {
|
||||
FunctionCallError::Fatal(format!("failed to serialize run_agent_job result: {err}"))
|
||||
})?;
|
||||
Ok(ToolOutput::Function {
|
||||
body: FunctionCallOutputBody::Text(content),
|
||||
success: Some(true),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
mod get_agent_job_status {
|
||||
use super::*;
|
||||
|
||||
@@ -1194,6 +1131,7 @@ fn csv_escape(value: &str) -> String {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn parse_csv_supports_quotes_and_commas() {
|
||||
|
||||
@@ -556,44 +556,6 @@ fn create_spawn_agents_on_csv_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
fn create_run_agent_job_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"job_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Identifier of the job to resume.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_concurrency".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum concurrent workers for this job. Defaults to 64 and is capped by config."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"auto_export".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"Whether to auto-export CSV output on successful completion (default true)."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "run_agent_job".to_string(),
|
||||
description: "Resume execution of an existing agent job.".to_string(),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["job_id".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn create_get_agent_job_status_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
@@ -1614,13 +1576,11 @@ pub(crate) fn build_specs(
|
||||
if config.collab_tools {
|
||||
let agent_jobs_handler = Arc::new(AgentJobsHandler);
|
||||
builder.push_spec(create_spawn_agents_on_csv_tool());
|
||||
builder.push_spec(create_run_agent_job_tool());
|
||||
builder.push_spec(create_get_agent_job_status_tool());
|
||||
builder.push_spec(create_wait_agent_job_tool());
|
||||
builder.push_spec(create_export_agent_job_csv_tool());
|
||||
builder.push_spec(create_report_agent_job_result_tool());
|
||||
builder.register_handler("spawn_agents_on_csv", agent_jobs_handler.clone());
|
||||
builder.register_handler("run_agent_job", agent_jobs_handler.clone());
|
||||
builder.register_handler("get_agent_job_status", agent_jobs_handler.clone());
|
||||
builder.register_handler("wait_agent_job", agent_jobs_handler.clone());
|
||||
builder.register_handler("export_agent_job_csv", agent_jobs_handler.clone());
|
||||
@@ -1885,7 +1845,6 @@ mod tests {
|
||||
"wait",
|
||||
"close_agent",
|
||||
"spawn_agents_on_csv",
|
||||
"run_agent_job",
|
||||
"get_agent_job_status",
|
||||
"wait_agent_job",
|
||||
"export_agent_job_csv",
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
This document describes the generic batch job engine used for large agentic workloads.
|
||||
Agent jobs are designed to be:
|
||||
|
||||
1. Resumable and durable via SQLite.
|
||||
1. Durable via SQLite for progress tracking and export.
|
||||
2. Bounded by configured concurrency and thread limits.
|
||||
3. Observable via explicit status/progress tools.
|
||||
4. Exportable to CSV at stage boundaries.
|
||||
@@ -30,11 +30,6 @@ Optional args:
|
||||
- `max_concurrency`: cap on parallel workers (defaults to 64, then capped by config).
|
||||
- `auto_export`: auto-export CSV on successful completion (default true).
|
||||
|
||||
### `run_agent_job`
|
||||
|
||||
Resume an existing job by id. Jobs auto-start when created. When resuming, any items
|
||||
left in `running` state are reset to `pending` unless they already reported a result.
|
||||
|
||||
### `get_agent_job_status`
|
||||
|
||||
Return job status and progress counters. Most flows should prefer `wait_agent_job`
|
||||
|
||||
@@ -1165,54 +1165,6 @@ WHERE
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn reset_agent_job_running_items(&self, job_id: &str) -> anyhow::Result<()> {
|
||||
let now = Utc::now().timestamp();
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_job_items
|
||||
SET
|
||||
status = ?,
|
||||
completed_at = ?,
|
||||
updated_at = ?,
|
||||
assigned_thread_id = NULL,
|
||||
last_error = NULL
|
||||
WHERE
|
||||
job_id = ?
|
||||
AND status = ?
|
||||
AND result_json IS NOT NULL
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobItemStatus::Completed.as_str())
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(job_id)
|
||||
.bind(AgentJobItemStatus::Running.as_str())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE agent_job_items
|
||||
SET
|
||||
status = ?,
|
||||
updated_at = ?,
|
||||
assigned_thread_id = NULL,
|
||||
last_error = ?
|
||||
WHERE
|
||||
job_id = ?
|
||||
AND status = ?
|
||||
"#,
|
||||
)
|
||||
.bind(AgentJobItemStatus::Pending.as_str())
|
||||
.bind(now)
|
||||
.bind("reset to pending after resume")
|
||||
.bind(job_id)
|
||||
.bind(AgentJobItemStatus::Running.as_str())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_agent_job_progress(&self, job_id: &str) -> anyhow::Result<AgentJobProgress> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
@@ -1499,16 +1451,12 @@ mod tests {
|
||||
use super::StateRuntime;
|
||||
use super::ThreadMetadata;
|
||||
use super::state_db_filename;
|
||||
use crate::AgentJobCreateParams;
|
||||
use crate::AgentJobItemCreateParams;
|
||||
use crate::AgentJobItemStatus;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use sqlx::Row;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
@@ -1601,88 +1549,6 @@ mod tests {
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reset_running_items_for_resume_updates_statuses() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex_home");
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let job_id = Uuid::new_v4().to_string();
|
||||
let job = runtime
|
||||
.create_agent_job(
|
||||
&AgentJobCreateParams {
|
||||
id: job_id.clone(),
|
||||
name: "resume-test".to_string(),
|
||||
instruction: "do work".to_string(),
|
||||
auto_export: true,
|
||||
output_schema_json: None,
|
||||
input_headers: vec!["path".to_string()],
|
||||
input_csv_path: "/tmp/input.csv".to_string(),
|
||||
output_csv_path: "/tmp/output.csv".to_string(),
|
||||
},
|
||||
&[
|
||||
AgentJobItemCreateParams {
|
||||
item_id: "row-1".to_string(),
|
||||
row_index: 0,
|
||||
source_id: None,
|
||||
row_json: json!({ "path": "a.txt" }),
|
||||
},
|
||||
AgentJobItemCreateParams {
|
||||
item_id: "row-2".to_string(),
|
||||
row_index: 1,
|
||||
source_id: None,
|
||||
row_json: json!({ "path": "b.txt" }),
|
||||
},
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("create job");
|
||||
|
||||
runtime
|
||||
.mark_agent_job_item_running(job.id.as_str(), "row-1")
|
||||
.await
|
||||
.expect("mark running");
|
||||
runtime
|
||||
.report_agent_job_item_result(job.id.as_str(), "row-1", "thread-1", &json!({"ok":true}))
|
||||
.await
|
||||
.expect("report result");
|
||||
|
||||
runtime
|
||||
.mark_agent_job_item_running(job.id.as_str(), "row-2")
|
||||
.await
|
||||
.expect("mark running");
|
||||
runtime
|
||||
.set_agent_job_item_thread(job.id.as_str(), "row-2", "thread-2")
|
||||
.await
|
||||
.expect("set thread");
|
||||
|
||||
runtime
|
||||
.reset_agent_job_running_items(job.id.as_str())
|
||||
.await
|
||||
.expect("reset running items");
|
||||
|
||||
let completed = runtime
|
||||
.list_agent_job_items(job.id.as_str(), Some(AgentJobItemStatus::Completed), None)
|
||||
.await
|
||||
.expect("list completed");
|
||||
let pending = runtime
|
||||
.list_agent_job_items(job.id.as_str(), Some(AgentJobItemStatus::Pending), None)
|
||||
.await
|
||||
.expect("list pending");
|
||||
|
||||
assert_eq!(completed.len(), 1);
|
||||
assert_eq!(completed[0].item_id, "row-1");
|
||||
assert_eq!(completed[0].assigned_thread_id, None);
|
||||
|
||||
assert_eq!(pending.len(), 1);
|
||||
assert_eq!(pending[0].item_id, "row-2");
|
||||
assert_eq!(pending[0].assigned_thread_id, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn backfill_state_persists_progress_and_completion() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
Reference in New Issue
Block a user