Compare commits

...

7 Commits

Author SHA1 Message Date
Dave Aitel
9c3aa81e12 Revert "Honor OPENAI_API_KEY for auth"
This reverts commit af934ab7ec.
2026-02-06 10:02:20 -05:00
Dave Aitel
af934ab7ec Honor OPENAI_API_KEY for auth 2026-02-06 09:52:59 -05:00
Dave Aitel
263cacc2bb Default agent job concurrency to 64 2026-02-05 22:29:15 -05:00
Dave Aitel
9ceb403655 Add CLI example for agent jobs 2026-02-05 13:44:08 -05:00
Dave Aitel
caddbe161c Auto-export agent job CSV on success 2026-02-05 13:05:12 -05:00
Dave Aitel
001cfb5bba Fix collab tools at max agent depth 2026-02-05 12:45:18 -05:00
Dave Aitel
e112e79b8a Add agent job runner for CSV batches 2026-02-05 12:04:23 -05:00
13 changed files with 2336 additions and 12 deletions

View File

@@ -5,6 +5,7 @@ pub(crate) mod status;
pub(crate) use codex_protocol::protocol::AgentStatus;
pub(crate) use control::AgentControl;
#[cfg(test)]
pub(crate) use guards::MAX_THREAD_SPAWN_DEPTH;
pub(crate) use guards::exceeds_thread_spawn_depth_limit;
pub(crate) use guards::next_thread_spawn_depth;

View File

@@ -10,8 +10,8 @@ use crate::CodexAuth;
use crate::SandboxState;
use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::MAX_THREAD_SPAWN_DEPTH;
use crate::agent::agent_status_from_event;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::analytics_client::AnalyticsEventsClient;
use crate::analytics_client::build_track_events_context;
use crate::compact;
@@ -274,7 +274,7 @@ impl Codex {
}
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source
&& depth >= MAX_THREAD_SPAWN_DEPTH
&& exceeds_thread_spawn_depth_limit(depth)
{
config.features.disable(Feature::Collab);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,8 @@
use crate::agent::AgentStatus;
use crate::agent::exceeds_thread_spawn_depth_limit;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
@@ -585,10 +583,10 @@ fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
}
}
fn build_agent_spawn_config(
pub(crate) fn build_agent_spawn_config(
base_instructions: &BaseInstructions,
turn: &TurnContext,
child_depth: i32,
_child_depth: i32,
) -> Result<Config, FunctionCallError> {
let base_config = turn.config.clone();
let mut config = (*base_config).clone();
@@ -615,11 +613,6 @@ fn build_agent_spawn_config(
FunctionCallError::RespondToModel(format!("sandbox_policy is invalid: {err}"))
})?;
// If the new agent will be at max depth:
if exceeds_thread_spawn_depth_limit(child_depth + 1) {
config.features.disable(Feature::Collab);
}
Ok(config)
}

View File

@@ -1,3 +1,4 @@
pub(crate) mod agent_jobs;
pub mod apply_patch;
pub(crate) mod collab;
mod dynamic;

View File

@@ -4,6 +4,7 @@ use crate::client_common::tools::ToolSpec;
use crate::features::Feature;
use crate::features::Features;
use crate::tools::handlers::PLAN_TOOL;
use crate::tools::handlers::agent_jobs::AgentJobsHandler;
use crate::tools::handlers::apply_patch::create_apply_patch_freeform_tool;
use crate::tools::handlers::apply_patch::create_apply_patch_json_tool;
use crate::tools::handlers::collab::DEFAULT_WAIT_TIMEOUT_MS;
@@ -481,6 +482,226 @@ fn create_spawn_agent_tool() -> ToolSpec {
})
}
/// Builds the `spawn_agents_on_csv` tool spec: creates and immediately starts
/// a batch agent job driven by the rows of a CSV file.
fn create_spawn_agents_on_csv_tool() -> ToolSpec {
    // Small helper so each plain string property is a one-liner below.
    let string_prop = |text: &str| JsonSchema::String {
        description: Some(text.to_string()),
    };
    let mut properties = BTreeMap::new();
    properties.insert(
        "csv_path".to_string(),
        string_prop("Path to the CSV file containing input rows."),
    );
    properties.insert(
        "instruction".to_string(),
        string_prop("Instruction to apply to each CSV row."),
    );
    properties.insert(
        "id_column".to_string(),
        string_prop("Optional column name to use as stable item id."),
    );
    properties.insert(
        "job_name".to_string(),
        string_prop("Optional friendly name for the job."),
    );
    properties.insert(
        "output_csv_path".to_string(),
        string_prop("Optional output CSV path for exported results."),
    );
    // Free-form JSON object — the per-row result schema is caller-defined.
    properties.insert(
        "output_schema".to_string(),
        JsonSchema::Object {
            properties: BTreeMap::new(),
            required: None,
            additional_properties: None,
        },
    );
    properties.insert(
        "max_concurrency".to_string(),
        JsonSchema::Number {
            description: Some(
                "Maximum concurrent workers for this job. Defaults to a safe value capped by config."
                    .to_string(),
            ),
        },
    );
    properties.insert(
        "auto_export".to_string(),
        JsonSchema::Boolean {
            description: Some(
                "Whether to auto-export CSV output on successful completion (default true)."
                    .to_string(),
            ),
        },
    );
    // Only the CSV path and the per-row instruction are mandatory.
    ToolSpec::Function(ResponsesApiTool {
        name: "spawn_agents_on_csv".to_string(),
        description: "Create and run a batch agent job over a CSV file.".to_string(),
        strict: false,
        parameters: JsonSchema::Object {
            properties,
            required: Some(vec!["csv_path".to_string(), "instruction".to_string()]),
            additional_properties: Some(false.into()),
        },
    })
}
/// Builds the `run_agent_job` tool spec: starts (or resumes) a stored job.
fn create_run_agent_job_tool() -> ToolSpec {
    let mut properties = BTreeMap::new();
    // The only required argument — identifies which persisted job to run.
    properties.insert(
        "job_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job to start.".to_string()),
        },
    );
    properties.insert(
        "max_concurrency".to_string(),
        JsonSchema::Number {
            description: Some(
                "Maximum concurrent workers for this job. Defaults to a safe value capped by config."
                    .to_string(),
            ),
        },
    );
    properties.insert(
        "auto_export".to_string(),
        JsonSchema::Boolean {
            description: Some(
                "Whether to auto-export CSV output on successful completion (default true)."
                    .to_string(),
            ),
        },
    );
    let parameters = JsonSchema::Object {
        properties,
        required: Some(vec!["job_id".to_string()]),
        additional_properties: Some(false.into()),
    };
    ToolSpec::Function(ResponsesApiTool {
        name: "run_agent_job".to_string(),
        description: "Start or resume execution of an existing agent job.".to_string(),
        strict: false,
        parameters,
    })
}
/// Builds the `get_agent_job_status` tool spec: read-only status/progress lookup.
fn create_get_agent_job_status_tool() -> ToolSpec {
    let mut properties = BTreeMap::new();
    properties.insert(
        "job_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job to inspect.".to_string()),
        },
    );
    let parameters = JsonSchema::Object {
        properties,
        required: Some(vec!["job_id".to_string()]),
        additional_properties: Some(false.into()),
    };
    ToolSpec::Function(ResponsesApiTool {
        name: "get_agent_job_status".to_string(),
        description: "Fetch job status and progress counters.".to_string(),
        strict: false,
        parameters,
    })
}
/// Builds the `wait_agent_job` tool spec: blocks until a job finishes or
/// the optional timeout elapses.
fn create_wait_agent_job_tool() -> ToolSpec {
    let mut properties = BTreeMap::new();
    properties.insert(
        "job_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job to wait on.".to_string()),
        },
    );
    // Optional — when absent the handler decides how long to wait.
    properties.insert(
        "timeout_ms".to_string(),
        JsonSchema::Number {
            description: Some(
                "Maximum time in milliseconds to wait for job completion.".to_string(),
            ),
        },
    );
    let parameters = JsonSchema::Object {
        properties,
        required: Some(vec!["job_id".to_string()]),
        additional_properties: Some(false.into()),
    };
    ToolSpec::Function(ResponsesApiTool {
        name: "wait_agent_job".to_string(),
        description: "Wait for an agent job to complete or time out.".to_string(),
        strict: false,
        parameters,
    })
}
/// Builds the `export_agent_job_csv` tool spec: writes job results to CSV,
/// optionally at a caller-supplied path.
fn create_export_agent_job_csv_tool() -> ToolSpec {
    let mut properties = BTreeMap::new();
    properties.insert(
        "job_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job to export.".to_string()),
        },
    );
    properties.insert(
        "path".to_string(),
        JsonSchema::String {
            description: Some("Optional output CSV path override.".to_string()),
        },
    );
    let parameters = JsonSchema::Object {
        properties,
        required: Some(vec!["job_id".to_string()]),
        additional_properties: Some(false.into()),
    };
    ToolSpec::Function(ResponsesApiTool {
        name: "export_agent_job_csv".to_string(),
        description: "Export job results to a CSV file.".to_string(),
        strict: false,
        parameters,
    })
}
/// Builds the `report_agent_job_result` tool spec: the call a worker makes to
/// hand back the JSON result for its assigned item. All three args required.
fn create_report_agent_job_result_tool() -> ToolSpec {
    let mut properties = BTreeMap::new();
    properties.insert(
        "job_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job.".to_string()),
        },
    );
    properties.insert(
        "item_id".to_string(),
        JsonSchema::String {
            description: Some("Identifier of the job item.".to_string()),
        },
    );
    // Free-form JSON object — shape is guided by the job's output_schema.
    properties.insert(
        "result".to_string(),
        JsonSchema::Object {
            properties: BTreeMap::new(),
            required: None,
            additional_properties: None,
        },
    );
    let required = vec![
        "job_id".to_string(),
        "item_id".to_string(),
        "result".to_string(),
    ];
    ToolSpec::Function(ResponsesApiTool {
        name: "report_agent_job_result".to_string(),
        description: "Report a worker result for an agent job item.".to_string(),
        strict: false,
        parameters: JsonSchema::Object {
            properties,
            required: Some(required),
            additional_properties: Some(false.into()),
        },
    })
}
fn create_send_input_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
@@ -1386,6 +1607,22 @@ pub(crate) fn build_specs(
builder.register_handler("close_agent", collab_handler);
}
if config.collab_tools {
let agent_jobs_handler = Arc::new(AgentJobsHandler);
builder.push_spec(create_spawn_agents_on_csv_tool());
builder.push_spec(create_run_agent_job_tool());
builder.push_spec(create_get_agent_job_status_tool());
builder.push_spec(create_wait_agent_job_tool());
builder.push_spec(create_export_agent_job_csv_tool());
builder.push_spec(create_report_agent_job_result_tool());
builder.register_handler("spawn_agents_on_csv", agent_jobs_handler.clone());
builder.register_handler("run_agent_job", agent_jobs_handler.clone());
builder.register_handler("get_agent_job_status", agent_jobs_handler.clone());
builder.register_handler("wait_agent_job", agent_jobs_handler.clone());
builder.register_handler("export_agent_job_csv", agent_jobs_handler.clone());
builder.register_handler("report_agent_job_result", agent_jobs_handler);
}
if let Some(mcp_tools) = mcp_tools {
let mut entries: Vec<(String, rmcp::model::Tool)> = mcp_tools.into_iter().collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
@@ -1638,7 +1875,18 @@ mod tests {
let (tools, _) = build_specs(&tools_config, None, &[]).build();
assert_contains_tool_names(
&tools,
&["spawn_agent", "send_input", "wait", "close_agent"],
&[
"spawn_agent",
"send_input",
"wait",
"close_agent",
"spawn_agents_on_csv",
"run_agent_job",
"get_agent_job_status",
"wait_agent_job",
"export_agent_job_csv",
"report_agent_job_result",
],
);
}

102
codex-rs/docs/agent_jobs.md Normal file
View File

@@ -0,0 +1,102 @@
# Agent Jobs
This document describes the generic batch job engine used for large agentic workloads.
Agent jobs are designed to be:
1. Resumable and durable via SQLite.
2. Bounded by configured concurrency and thread limits.
3. Observable via explicit status/progress tools.
4. Exportable to CSV at stage boundaries.
## Tools
All tools are function-style and gated by the `collab` feature.
### `spawn_agents_on_csv`
Create a new job from a CSV input and immediately start it.
Required args:
- `csv_path`: path to the CSV file (first row is headers).
- `instruction`: instruction to apply to each row.
Optional args:
- `id_column`: header column name to use as a stable item id.
- `job_name`: human-friendly label.
- `output_csv_path`: destination for CSV export (defaults to `<input>.agent-job-<id>.csv`).
- `output_schema`: JSON schema for result payloads (best-effort guidance).
- `max_concurrency`: cap on parallel workers (defaults to 64, then capped by config).
- `auto_export`: auto-export CSV on successful completion (default true).
### `run_agent_job`
Start or resume an existing job by id.
### `get_agent_job_status`
Return job status and progress counters.
### `wait_agent_job`
Wait for a job to complete, or return after a timeout.
### `export_agent_job_csv`
Export the current job results to CSV using the stored headers and results.
### `report_agent_job_result`
Workers must call this exactly once to report a JSON object for their assigned item.
## Execution Model
1. Jobs are stored in SQLite with per-item state (pending/running/completed/failed).
2. The job runner spawns subagents up to `max_concurrency`.
3. Each worker processes one item and calls `report_agent_job_result`.
4. The runner marks an item completed only after its worker finishes and a result has been reported for that item.
5. If `auto_export` is true, the runner writes a CSV snapshot on successful completion.
6. CSV export can also be generated manually by a single writer from the SQLite store.
## CSV Output
Exports include original input columns plus:
- `job_id`
- `item_id`
- `row_index`
- `source_id`
- `status`
- `attempt_count`
- `last_error`
- `result_json`
- `reported_at`
- `completed_at`
## CLI Example (Auto-Export)
The example below creates a small CSV, runs a batch job, waits for completion,
and prints the auto-exported CSV.
```bash
./codex-rs/target/debug/codex exec \
--enable collab \
--enable sqlite \
--full-auto \
-C /path/to/repo \
- <<'PROMPT'
Create /tmp/security_rank_input_demo.csv with 5 rows using paths:
- codex-rs/core/src/tools/handlers/agent_jobs.rs
- codex-rs/core/src/tools/handlers/shell.rs
- codex-rs/core/src/agent/control.rs
- codex-rs/core/src/exec_policy.rs
- codex-rs/core/src/tools/handlers/mcp.rs
Columns: path, area (use "core" for area).
Then call spawn_agents_on_csv with:
csv_path: /tmp/security_rank_input_demo.csv
instruction: read the file (relative to repo root), skim first 200 lines, score security relevance 1-10, output JSON with keys path, score, rationale, signals array.
output_csv_path: /tmp/security_rank_output_demo.csv
Do NOT call export_agent_job_csv manually.
Wait for completion and then print the output path and `head -n 6` of the CSV.
PROMPT
```

View File

@@ -0,0 +1,37 @@
-- One row per batch agent job. Timestamps are unix epoch seconds.
CREATE TABLE agent_jobs (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    status TEXT NOT NULL,              -- pending/running/completed/failed/cancelled
    instruction TEXT NOT NULL,         -- per-row instruction applied by workers
    output_schema_json TEXT,           -- optional JSON schema for result payloads
    input_headers_json TEXT NOT NULL,  -- JSON array of input CSV column names
    input_csv_path TEXT NOT NULL,
    output_csv_path TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    started_at INTEGER,
    completed_at INTEGER,
    last_error TEXT
);
-- One row per CSV input row of a job; composite key (job_id, item_id).
CREATE TABLE agent_job_items (
    job_id TEXT NOT NULL,
    item_id TEXT NOT NULL,
    row_index INTEGER NOT NULL,        -- 0-based position in the input CSV
    source_id TEXT,                    -- optional stable id taken from id_column
    row_json TEXT NOT NULL,            -- original row as a JSON object
    status TEXT NOT NULL,              -- pending/running/completed/failed
    assigned_thread_id TEXT,           -- worker thread currently holding the item
    attempt_count INTEGER NOT NULL DEFAULT 0,
    result_json TEXT,                  -- worker-reported result payload
    last_error TEXT,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    completed_at INTEGER,
    reported_at INTEGER,
    PRIMARY KEY (job_id, item_id),
    FOREIGN KEY(job_id) REFERENCES agent_jobs(id) ON DELETE CASCADE
);
-- Status dashboards list jobs newest-first within a status.
CREATE INDEX idx_agent_jobs_status ON agent_jobs(status, updated_at DESC);
-- The runner scans a job's items by status in row order.
CREATE INDEX idx_agent_job_items_status ON agent_job_items(job_id, status, row_index ASC);

View File

@@ -0,0 +1,2 @@
-- Whether the runner writes a CSV snapshot automatically on successful
-- completion (1 = true). Existing jobs default to auto-exporting.
ALTER TABLE agent_jobs
ADD COLUMN auto_export INTEGER NOT NULL DEFAULT 1;

View File

@@ -21,6 +21,13 @@ pub use runtime::StateRuntime;
///
/// Most consumers should prefer [`StateRuntime`].
pub use extract::apply_rollout_item;
pub use model::AgentJob;
pub use model::AgentJobCreateParams;
pub use model::AgentJobItem;
pub use model::AgentJobItemCreateParams;
pub use model::AgentJobItemStatus;
pub use model::AgentJobProgress;
pub use model::AgentJobStatus;
pub use model::Anchor;
pub use model::BackfillState;
pub use model::BackfillStats;

View File

@@ -0,0 +1,290 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use serde_json::Value;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
/// Lifecycle state of a batch agent job as persisted in SQLite.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentJobStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
}

impl AgentJobStatus {
    /// Canonical lowercase label stored in the `status` column.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
        }
    }

    /// Parses a stored label back into a status; unknown labels are an error.
    pub fn parse(value: &str) -> Result<Self> {
        match value {
            "pending" => Ok(Self::Pending),
            "running" => Ok(Self::Running),
            "completed" => Ok(Self::Completed),
            "failed" => Ok(Self::Failed),
            "cancelled" => Ok(Self::Cancelled),
            _ => Err(anyhow::anyhow!("invalid agent job status: {value}")),
        }
    }

    /// Whether the job has reached a terminal state and cannot progress further.
    pub fn is_final(self) -> bool {
        matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
    }
}
/// Per-item state within a job; items have no `cancelled` state of their own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentJobItemStatus {
    Pending,
    Running,
    Completed,
    Failed,
}

impl AgentJobItemStatus {
    /// Canonical lowercase label stored in the `status` column.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }

    /// Parses a stored label back into a status; unknown labels are an error.
    pub fn parse(value: &str) -> Result<Self> {
        match value {
            "pending" => Ok(Self::Pending),
            "running" => Ok(Self::Running),
            "completed" => Ok(Self::Completed),
            "failed" => Ok(Self::Failed),
            _ => Err(anyhow::anyhow!("invalid agent job item status: {value}")),
        }
    }
}
/// Fully-hydrated agent job, decoded from its SQLite row.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJob {
    pub id: String,
    pub name: String,
    pub status: AgentJobStatus,
    pub instruction: String,
    // Write a CSV snapshot automatically on successful completion.
    pub auto_export: bool,
    // Optional JSON schema guiding worker result payloads.
    pub output_schema_json: Option<Value>,
    // Column headers of the input CSV, decoded from input_headers_json.
    pub input_headers: Vec<String>,
    pub input_csv_path: String,
    pub output_csv_path: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub last_error: Option<String>,
}
/// One work item of a job — a single decoded CSV row plus its processing state.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJobItem {
    pub job_id: String,
    pub item_id: String,
    // Position of the row in the input CSV.
    pub row_index: i64,
    // Optional stable id taken from the configured id column.
    pub source_id: Option<String>,
    // The original row, decoded from its stored JSON form.
    pub row_json: Value,
    pub status: AgentJobItemStatus,
    // Worker thread currently assigned to this item, if any.
    pub assigned_thread_id: Option<String>,
    pub attempt_count: i64,
    // Worker-reported result payload, if one has been reported.
    pub result_json: Option<Value>,
    pub last_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    pub reported_at: Option<DateTime<Utc>>,
}
/// Per-status item counts for a job; the four status counts sum to `total_items`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentJobProgress {
    pub total_items: usize,
    pub pending_items: usize,
    pub running_items: usize,
    pub completed_items: usize,
    pub failed_items: usize,
}
/// Inputs for creating a new job row (items are supplied separately).
#[derive(Debug, Clone)]
pub struct AgentJobCreateParams {
    pub id: String,
    pub name: String,
    pub instruction: String,
    // Auto-export CSV on successful completion.
    pub auto_export: bool,
    // Optional JSON schema guiding worker result payloads.
    pub output_schema_json: Option<Value>,
    // Input CSV column headers, serialized to JSON on insert.
    pub input_headers: Vec<String>,
    pub input_csv_path: String,
    pub output_csv_path: String,
}
/// Inputs for creating one job item; the owning job id is passed alongside.
#[derive(Debug, Clone)]
pub struct AgentJobItemCreateParams {
    pub item_id: String,
    pub row_index: i64,
    pub source_id: Option<String>,
    // The CSV row as a JSON object; serialized to text on insert.
    pub row_json: Value,
}
/// Raw `agent_jobs` row as stored in SQLite, before decoding JSON columns,
/// status labels, and epoch-second timestamps into [`AgentJob`].
#[derive(Debug)]
pub(crate) struct AgentJobRow {
    pub(crate) id: String,
    pub(crate) name: String,
    pub(crate) status: String,
    pub(crate) instruction: String,
    // Stored as INTEGER; nonzero means true.
    pub(crate) auto_export: i64,
    pub(crate) output_schema_json: Option<String>,
    pub(crate) input_headers_json: String,
    pub(crate) input_csv_path: String,
    pub(crate) output_csv_path: String,
    // Unix epoch seconds.
    pub(crate) created_at: i64,
    pub(crate) updated_at: i64,
    pub(crate) started_at: Option<i64>,
    pub(crate) completed_at: Option<i64>,
    pub(crate) last_error: Option<String>,
}
impl AgentJobRow {
    /// Extracts every column of an `agent_jobs` SELECT into the raw row type.
    /// Column names here must match the SELECT lists used by the store.
    pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
        Ok(Self {
            id: row.try_get("id")?,
            name: row.try_get("name")?,
            status: row.try_get("status")?,
            instruction: row.try_get("instruction")?,
            auto_export: row.try_get("auto_export")?,
            output_schema_json: row.try_get("output_schema_json")?,
            input_headers_json: row.try_get("input_headers_json")?,
            input_csv_path: row.try_get("input_csv_path")?,
            output_csv_path: row.try_get("output_csv_path")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
            started_at: row.try_get("started_at")?,
            completed_at: row.try_get("completed_at")?,
            last_error: row.try_get("last_error")?,
        })
    }
}
/// Decodes a raw row into the domain type: parses the status label, the JSON
/// columns, and the epoch-second timestamps. Fails on any malformed column.
impl TryFrom<AgentJobRow> for AgentJob {
    type Error = anyhow::Error;
    fn try_from(value: AgentJobRow) -> Result<Self, Self::Error> {
        // Optional JSON column: absent stays None, present must parse.
        let output_schema_json = value
            .output_schema_json
            .as_deref()
            .map(serde_json::from_str)
            .transpose()?;
        let input_headers = serde_json::from_str(value.input_headers_json.as_str())?;
        Ok(Self {
            id: value.id,
            name: value.name,
            status: AgentJobStatus::parse(value.status.as_str())?,
            instruction: value.instruction,
            // INTEGER column: any nonzero value counts as true.
            auto_export: value.auto_export != 0,
            output_schema_json,
            input_headers,
            input_csv_path: value.input_csv_path,
            output_csv_path: value.output_csv_path,
            created_at: epoch_seconds_to_datetime(value.created_at)?,
            updated_at: epoch_seconds_to_datetime(value.updated_at)?,
            started_at: value
                .started_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            completed_at: value
                .completed_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            last_error: value.last_error,
        })
    }
}
/// Raw `agent_job_items` row as stored in SQLite, before decoding into
/// [`AgentJobItem`].
#[derive(Debug)]
pub(crate) struct AgentJobItemRow {
    pub(crate) job_id: String,
    pub(crate) item_id: String,
    pub(crate) row_index: i64,
    pub(crate) source_id: Option<String>,
    // The CSV row serialized as a JSON object.
    pub(crate) row_json: String,
    pub(crate) status: String,
    pub(crate) assigned_thread_id: Option<String>,
    pub(crate) attempt_count: i64,
    pub(crate) result_json: Option<String>,
    pub(crate) last_error: Option<String>,
    // Unix epoch seconds.
    pub(crate) created_at: i64,
    pub(crate) updated_at: i64,
    pub(crate) completed_at: Option<i64>,
    pub(crate) reported_at: Option<i64>,
}
impl AgentJobItemRow {
    /// Extracts every column of an `agent_job_items` SELECT into the raw row
    /// type. Column names here must match the SELECT lists used by the store.
    pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
        Ok(Self {
            job_id: row.try_get("job_id")?,
            item_id: row.try_get("item_id")?,
            row_index: row.try_get("row_index")?,
            source_id: row.try_get("source_id")?,
            row_json: row.try_get("row_json")?,
            status: row.try_get("status")?,
            assigned_thread_id: row.try_get("assigned_thread_id")?,
            attempt_count: row.try_get("attempt_count")?,
            result_json: row.try_get("result_json")?,
            last_error: row.try_get("last_error")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
            completed_at: row.try_get("completed_at")?,
            reported_at: row.try_get("reported_at")?,
        })
    }
}
/// Decodes a raw item row into the domain type: parses the status label, the
/// JSON columns, and the epoch-second timestamps. Fails on malformed columns.
impl TryFrom<AgentJobItemRow> for AgentJobItem {
    type Error = anyhow::Error;
    fn try_from(value: AgentJobItemRow) -> Result<Self, Self::Error> {
        Ok(Self {
            job_id: value.job_id,
            item_id: value.item_id,
            row_index: value.row_index,
            source_id: value.source_id,
            row_json: serde_json::from_str(value.row_json.as_str())?,
            status: AgentJobItemStatus::parse(value.status.as_str())?,
            assigned_thread_id: value.assigned_thread_id,
            attempt_count: value.attempt_count,
            // Optional JSON column: absent stays None, present must parse.
            result_json: value
                .result_json
                .as_deref()
                .map(serde_json::from_str)
                .transpose()?,
            last_error: value.last_error,
            created_at: epoch_seconds_to_datetime(value.created_at)?,
            updated_at: epoch_seconds_to_datetime(value.updated_at)?,
            completed_at: value
                .completed_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            reported_at: value
                .reported_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
        })
    }
}
/// Converts unix epoch seconds to a UTC timestamp, rejecting values outside
/// chrono's representable range.
fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> {
    match DateTime::<Utc>::from_timestamp(secs, 0) {
        Some(timestamp) => Ok(timestamp),
        None => Err(anyhow::anyhow!("invalid unix timestamp: {secs}")),
    }
}

View File

@@ -1,8 +1,16 @@
mod agent_job;
mod backfill_state;
mod log;
mod thread_memory;
mod thread_metadata;
pub use agent_job::AgentJob;
pub use agent_job::AgentJobCreateParams;
pub use agent_job::AgentJobItem;
pub use agent_job::AgentJobItemCreateParams;
pub use agent_job::AgentJobItemStatus;
pub use agent_job::AgentJobProgress;
pub use agent_job::AgentJobStatus;
pub use backfill_state::BackfillState;
pub use backfill_state::BackfillStatus;
pub use log::LogEntry;
@@ -17,6 +25,8 @@ pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;
pub(crate) use agent_job::AgentJobItemRow;
pub(crate) use agent_job::AgentJobRow;
pub(crate) use thread_memory::ThreadMemoryRow;
pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;

View File

@@ -1,3 +1,10 @@
use crate::AgentJob;
use crate::AgentJobCreateParams;
use crate::AgentJobItem;
use crate::AgentJobItemCreateParams;
use crate::AgentJobItemStatus;
use crate::AgentJobProgress;
use crate::AgentJobStatus;
use crate::DB_ERROR_METRIC;
use crate::LogEntry;
use crate::LogQuery;
@@ -9,6 +16,8 @@ use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::MIGRATOR;
use crate::model::AgentJobItemRow;
use crate::model::AgentJobRow;
use crate::model::ThreadMemoryRow;
use crate::model::ThreadRow;
use crate::model::anchor_from_item;
@@ -712,6 +721,457 @@ ON CONFLICT(thread_id, position) DO NOTHING
self.upsert_thread(&metadata).await
}
/// Inserts a job row and all of its items atomically, then reloads and
/// returns the persisted job. The job and every item start as `pending`.
/// Bind order below must match the column list of each INSERT exactly.
pub async fn create_agent_job(
    &self,
    params: &AgentJobCreateParams,
    items: &[AgentJobItemCreateParams],
) -> anyhow::Result<AgentJob> {
    let now = Utc::now().timestamp();
    let input_headers_json = serde_json::to_string(&params.input_headers)?;
    // Optional schema: serialize only when present.
    let output_schema_json = params
        .output_schema_json
        .as_ref()
        .map(serde_json::to_string)
        .transpose()?;
    // Single transaction so a failed item insert rolls back the job row too.
    let mut tx = self.pool.begin().await?;
    sqlx::query(
        r#"
INSERT INTO agent_jobs (
    id,
    name,
    status,
    instruction,
    auto_export,
    output_schema_json,
    input_headers_json,
    input_csv_path,
    output_csv_path,
    created_at,
    updated_at,
    started_at,
    completed_at,
    last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
"#,
    )
    .bind(params.id.as_str())
    .bind(params.name.as_str())
    .bind(AgentJobStatus::Pending.as_str())
    .bind(params.instruction.as_str())
    .bind(i64::from(params.auto_export))
    .bind(output_schema_json)
    .bind(input_headers_json)
    .bind(params.input_csv_path.as_str())
    .bind(params.output_csv_path.as_str())
    .bind(now)
    .bind(now)
    .execute(&mut *tx)
    .await?;
    for item in items {
        let row_json = serde_json::to_string(&item.row_json)?;
        sqlx::query(
            r#"
INSERT INTO agent_job_items (
    job_id,
    item_id,
    row_index,
    source_id,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
) VALUES (?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
"#,
        )
        .bind(params.id.as_str())
        .bind(item.item_id.as_str())
        .bind(item.row_index)
        .bind(item.source_id.as_deref())
        .bind(row_json)
        .bind(AgentJobItemStatus::Pending.as_str())
        .bind(now)
        .bind(now)
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    // Read back through the normal path so callers get the decoded job.
    let job_id = params.id.as_str();
    self.get_agent_job(job_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("failed to load created agent job {job_id}"))
}
/// Fetches a single job by id, decoded into [`AgentJob`]; `None` when the id
/// is unknown. Errors surface malformed stored data or DB failures.
pub async fn get_agent_job(&self, job_id: &str) -> anyhow::Result<Option<AgentJob>> {
    let row = sqlx::query(
        r#"
SELECT
    id,
    name,
    status,
    instruction,
    auto_export,
    output_schema_json,
    input_headers_json,
    input_csv_path,
    output_csv_path,
    created_at,
    updated_at,
    started_at,
    completed_at,
    last_error
FROM agent_jobs
WHERE id = ?
"#,
    )
    .bind(job_id)
    .fetch_optional(self.pool.as_ref())
    .await?;
    // Decode only when a row was found; propagate decode errors.
    row.map(|row| AgentJobRow::try_from_row(&row).and_then(AgentJob::try_from))
        .transpose()
}
pub async fn list_agent_job_items(
&self,
job_id: &str,
status: Option<AgentJobItemStatus>,
limit: Option<usize>,
) -> anyhow::Result<Vec<AgentJobItem>> {
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
SELECT
job_id,
item_id,
row_index,
source_id,
row_json,
status,
assigned_thread_id,
attempt_count,
result_json,
last_error,
created_at,
updated_at,
completed_at,
reported_at
FROM agent_job_items
WHERE job_id =
"#,
);
builder.push_bind(job_id);
if let Some(status) = status {
builder.push(" AND status = ");
builder.push_bind(status.as_str());
}
builder.push(" ORDER BY row_index ASC");
if let Some(limit) = limit {
builder.push(" LIMIT ");
builder.push_bind(limit as i64);
}
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
rows.into_iter()
.map(|row| AgentJobItemRow::try_from_row(&row).and_then(AgentJobItem::try_from))
.collect()
}
/// Fetches one item by its composite key; `None` when the pair is unknown.
pub async fn get_agent_job_item(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<Option<AgentJobItem>> {
    let row = sqlx::query(
        r#"
SELECT
    job_id,
    item_id,
    row_index,
    source_id,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
FROM agent_job_items
WHERE job_id = ? AND item_id = ?
"#,
    )
    .bind(job_id)
    .bind(item_id)
    .fetch_optional(self.pool.as_ref())
    .await?;
    // Decode only when a row was found; propagate decode errors.
    row.map(|row| AgentJobItemRow::try_from_row(&row).and_then(AgentJobItem::try_from))
        .transpose()
}
/// Moves a job to `running`. Preserves an existing `started_at` (resume case),
/// and clears `completed_at`/`last_error` so a resumed job looks in-flight.
/// No-op (no error) when the id does not exist.
pub async fn mark_agent_job_running(&self, job_id: &str) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET
    status = ?,
    updated_at = ?,
    started_at = COALESCE(started_at, ?),
    completed_at = NULL,
    last_error = NULL
WHERE id = ?
"#,
    )
    .bind(AgentJobStatus::Running.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
/// Moves a job to `completed`, stamping `completed_at` and clearing any
/// stale `last_error`. No-op (no error) when the id does not exist.
pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = NULL
WHERE id = ?
"#,
    )
    .bind(AgentJobStatus::Completed.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
/// Moves a job to `failed`, recording `error_message` in `last_error` and
/// stamping `completed_at`. No-op (no error) when the id does not exist.
pub async fn mark_agent_job_failed(
    &self,
    job_id: &str,
    error_message: &str,
) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = ?
WHERE id = ?
"#,
    )
    .bind(AgentJobStStatus_failed_placeholder())
    .bind(now)
    .bind(now)
    .bind(error_message)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
/// Atomically claims a `pending` item: moves it to `running`, bumps
/// `attempt_count`, and clears the previous assignment/error. The
/// `status = pending` guard makes this a compare-and-swap, so concurrent
/// runners cannot double-claim; returns `true` only for the winner.
pub async fn mark_agent_job_item_running(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    assigned_thread_id = NULL,
    attempt_count = attempt_count + 1,
    updated_at = ?,
    last_error = NULL
WHERE job_id = ? AND item_id = ? AND status = ?
"#,
    )
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Pending.as_str())
    .execute(self.pool.as_ref())
    .await?;
    // rows_affected == 0 means the item was not pending (or does not exist).
    Ok(result.rows_affected() > 0)
}
/// Records the worker thread assigned to a `running` item. Returns `false`
/// when the item is not currently running (or does not exist), so stale
/// assignments cannot overwrite a finished item.
pub async fn set_agent_job_item_thread(
    &self,
    job_id: &str,
    item_id: &str,
    thread_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET assigned_thread_id = ?, updated_at = ?
WHERE job_id = ? AND item_id = ? AND status = ?
"#,
    )
    .bind(thread_id)
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
/// Stores a worker's JSON result for a `running` item. Only the assigned
/// thread may report; an unassigned item is claimed by the reporter via the
/// COALESCE. Returns `false` when the item is not running, does not exist,
/// or is assigned to a different thread.
pub async fn report_agent_job_item_result(
    &self,
    job_id: &str,
    item_id: &str,
    reporting_thread_id: &str,
    result_json: &Value,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let serialized = serde_json::to_string(result_json)?;
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    result_json = ?,
    reported_at = ?,
    updated_at = ?,
    assigned_thread_id = COALESCE(assigned_thread_id, ?),
    last_error = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
    AND (assigned_thread_id IS NULL OR assigned_thread_id = ?)
"#,
    )
    .bind(serialized)
    .bind(now)
    .bind(now)
    .bind(reporting_thread_id)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(reporting_thread_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
/// Moves a `running` item to `completed` and releases its thread assignment.
/// The `result_json IS NOT NULL` guard means an item can only complete after
/// a result was reported; returns `false` otherwise (or when not running).
pub async fn mark_agent_job_item_completed(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    completed_at = ?,
    updated_at = ?,
    assigned_thread_id = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
    AND result_json IS NOT NULL
"#,
    )
    .bind(AgentJobItemStatus::Completed.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
/// Moves a `running` item to `failed`, recording the error and releasing the
/// thread assignment. Returns `false` when the item is not running (or does
/// not exist), so a late failure cannot clobber a completed item.
pub async fn mark_agent_job_item_failed(
    &self,
    job_id: &str,
    item_id: &str,
    error_message: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    completed_at = ?,
    updated_at = ?,
    last_error = ?,
    assigned_thread_id = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
"#,
    )
    .bind(AgentJobItemStatus::Failed.as_str())
    .bind(now)
    .bind(now)
    .bind(error_message)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
/// Counts a job's items by status in one aggregate query. SQLite's SUM
/// yields NULL when the job has no items, hence the `Option<i64>` reads;
/// an unknown job id therefore returns all-zero progress rather than an error.
pub async fn get_agent_job_progress(&self, job_id: &str) -> anyhow::Result<AgentJobProgress> {
    let row = sqlx::query(
        r#"
SELECT
    COUNT(*) AS total_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS pending_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS running_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS completed_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS failed_items
FROM agent_job_items
WHERE job_id = ?
"#,
    )
    .bind(AgentJobItemStatus::Pending.as_str())
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(AgentJobItemStatus::Completed.as_str())
    .bind(AgentJobItemStatus::Failed.as_str())
    .bind(job_id)
    .fetch_one(self.pool.as_ref())
    .await?;
    let total_items: i64 = row.try_get("total_items")?;
    let pending_items: Option<i64> = row.try_get("pending_items")?;
    let running_items: Option<i64> = row.try_get("running_items")?;
    let completed_items: Option<i64> = row.try_get("completed_items")?;
    let failed_items: Option<i64> = row.try_get("failed_items")?;
    // Counts are non-negative in practice; try_from only fails on negatives.
    Ok(AgentJobProgress {
        total_items: usize::try_from(total_items).unwrap_or_default(),
        pending_items: usize::try_from(pending_items.unwrap_or_default()).unwrap_or_default(),
        running_items: usize::try_from(running_items.unwrap_or_default()).unwrap_or_default(),
        completed_items: usize::try_from(completed_items.unwrap_or_default())
            .unwrap_or_default(),
        failed_items: usize::try_from(failed_items.unwrap_or_default()).unwrap_or_default(),
    })
}
async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> {
sqlx::query(
r#"