mirror of
https://github.com/openai/codex.git
synced 2026-04-21 05:04:49 +00:00
Compare commits
29 Commits
codex/func
...
feat/swarm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81ca23115b | ||
|
|
4afd27ad1c | ||
|
|
7ca99ba4d8 | ||
|
|
8077634a5f | ||
|
|
7bbfcfa3ca | ||
|
|
ef7798b9af | ||
|
|
e27d4712b1 | ||
|
|
993ae29b2a | ||
|
|
2172064556 | ||
|
|
52b08e3106 | ||
|
|
0d24339e75 | ||
|
|
94597f2663 | ||
|
|
9b5e27c359 | ||
|
|
776318e9c3 | ||
|
|
70f2b2ecc4 | ||
|
|
6225a73c35 | ||
|
|
b66f7c2357 | ||
|
|
23dd69ea9c | ||
|
|
4717f0c5b8 | ||
|
|
9f1e7671fb | ||
|
|
903b8f4b0f | ||
|
|
5f850f40c5 | ||
|
|
2126e3ffe6 | ||
|
|
6630587da3 | ||
|
|
263cacc2bb | ||
|
|
9ceb403655 | ||
|
|
caddbe161c | ||
|
|
001cfb5bba | ||
|
|
e112e79b8a |
@@ -5,6 +5,7 @@ pub(crate) mod status;
|
||||
|
||||
pub(crate) use codex_protocol::protocol::AgentStatus;
|
||||
pub(crate) use control::AgentControl;
|
||||
#[cfg(test)]
|
||||
pub(crate) use guards::MAX_THREAD_SPAWN_DEPTH;
|
||||
pub(crate) use guards::exceeds_thread_spawn_depth_limit;
|
||||
pub(crate) use guards::next_thread_spawn_depth;
|
||||
|
||||
@@ -10,8 +10,8 @@ use crate::CodexAuth;
|
||||
use crate::SandboxState;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::MAX_THREAD_SPAWN_DEPTH;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent::exceeds_thread_spawn_depth_limit;
|
||||
use crate::analytics_client::AnalyticsEventsClient;
|
||||
use crate::analytics_client::build_track_events_context;
|
||||
use crate::compact;
|
||||
@@ -274,7 +274,7 @@ impl Codex {
|
||||
}
|
||||
|
||||
if let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) = session_source
|
||||
&& depth >= MAX_THREAD_SPAWN_DEPTH
|
||||
&& exceeds_thread_spawn_depth_limit(depth)
|
||||
{
|
||||
config.features.disable(Feature::Collab);
|
||||
}
|
||||
|
||||
1014
codex-rs/core/src/tools/handlers/agent_jobs.rs
Normal file
1014
codex-rs/core/src/tools/handlers/agent_jobs.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,10 +1,8 @@
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::exceeds_thread_spawn_depth_limit;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
use crate::features::Feature;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::ToolInvocation;
|
||||
use crate::tools::context::ToolOutput;
|
||||
@@ -585,10 +583,10 @@ fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_agent_spawn_config(
|
||||
pub(crate) fn build_agent_spawn_config(
|
||||
base_instructions: &BaseInstructions,
|
||||
turn: &TurnContext,
|
||||
child_depth: i32,
|
||||
_child_depth: i32,
|
||||
) -> Result<Config, FunctionCallError> {
|
||||
let base_config = turn.config.clone();
|
||||
let mut config = (*base_config).clone();
|
||||
@@ -615,11 +613,6 @@ fn build_agent_spawn_config(
|
||||
FunctionCallError::RespondToModel(format!("sandbox_policy is invalid: {err}"))
|
||||
})?;
|
||||
|
||||
// If the new agent will be at max depth:
|
||||
if exceeds_thread_spawn_depth_limit(child_depth + 1) {
|
||||
config.features.disable(Feature::Collab);
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub(crate) mod agent_jobs;
|
||||
pub mod apply_patch;
|
||||
pub(crate) mod collab;
|
||||
mod dynamic;
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::client_common::tools::ToolSpec;
|
||||
use crate::features::Feature;
|
||||
use crate::features::Features;
|
||||
use crate::tools::handlers::PLAN_TOOL;
|
||||
use crate::tools::handlers::agent_jobs::AgentJobsHandler;
|
||||
use crate::tools::handlers::apply_patch::create_apply_patch_freeform_tool;
|
||||
use crate::tools::handlers::apply_patch::create_apply_patch_json_tool;
|
||||
use crate::tools::handlers::collab::DEFAULT_WAIT_TIMEOUT_MS;
|
||||
@@ -481,6 +482,112 @@ fn create_spawn_agent_tool() -> ToolSpec {
|
||||
})
|
||||
}
|
||||
|
||||
fn create_spawn_agents_on_csv_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"csv_path".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Path to the CSV file containing input rows.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"instruction".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Instruction template to apply to each CSV row. Use {column_name} placeholders to inject values from the row."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"id_column".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Optional column name to use as stable item id.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"job_name".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Optional friendly name for the job.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_csv_path".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Optional output CSV path for exported results.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"max_concurrency".to_string(),
|
||||
JsonSchema::Number {
|
||||
description: Some(
|
||||
"Maximum concurrent workers for this job. Defaults to 64 and is capped by config."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"output_schema".to_string(),
|
||||
JsonSchema::Object {
|
||||
properties: BTreeMap::new(),
|
||||
required: None,
|
||||
additional_properties: None,
|
||||
},
|
||||
);
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "spawn_agents_on_csv".to_string(),
|
||||
description:
|
||||
"Run a batch job that spawns subagents to process each CSV row, blocking until completion and auto-exporting results on success."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec!["csv_path".to_string(), "instruction".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn create_report_agent_job_result_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"job_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Identifier of the job.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"item_id".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some("Identifier of the job item.".to_string()),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"result".to_string(),
|
||||
JsonSchema::Object {
|
||||
properties: BTreeMap::new(),
|
||||
required: None,
|
||||
additional_properties: None,
|
||||
},
|
||||
);
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "report_agent_job_result".to_string(),
|
||||
description:
|
||||
"Worker-only tool to report a result for an agent job item. Main agents should not call this."
|
||||
.to_string(),
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: Some(vec![
|
||||
"job_id".to_string(),
|
||||
"item_id".to_string(),
|
||||
"result".to_string(),
|
||||
]),
|
||||
additional_properties: Some(false.into()),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn create_send_input_tool() -> ToolSpec {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
@@ -1386,6 +1493,14 @@ pub(crate) fn build_specs(
|
||||
builder.register_handler("close_agent", collab_handler);
|
||||
}
|
||||
|
||||
if config.collab_tools {
|
||||
let agent_jobs_handler = Arc::new(AgentJobsHandler);
|
||||
builder.push_spec(create_spawn_agents_on_csv_tool());
|
||||
builder.push_spec(create_report_agent_job_result_tool());
|
||||
builder.register_handler("spawn_agents_on_csv", agent_jobs_handler.clone());
|
||||
builder.register_handler("report_agent_job_result", agent_jobs_handler);
|
||||
}
|
||||
|
||||
if let Some(mcp_tools) = mcp_tools {
|
||||
let mut entries: Vec<(String, rmcp::model::Tool)> = mcp_tools.into_iter().collect();
|
||||
entries.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
@@ -1638,7 +1753,14 @@ mod tests {
|
||||
let (tools, _) = build_specs(&tools_config, None, &[]).build();
|
||||
assert_contains_tool_names(
|
||||
&tools,
|
||||
&["spawn_agent", "send_input", "wait", "close_agent"],
|
||||
&[
|
||||
"spawn_agent",
|
||||
"send_input",
|
||||
"wait",
|
||||
"close_agent",
|
||||
"spawn_agents_on_csv",
|
||||
"report_agent_job_result",
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
86
codex-rs/docs/agent_jobs.md
Normal file
86
codex-rs/docs/agent_jobs.md
Normal file
@@ -0,0 +1,86 @@
|
||||
# Agent Jobs
|
||||
|
||||
This document describes the generic batch job engine used for large agentic workloads.
|
||||
Agent jobs are designed to be:
|
||||
|
||||
1. Durable via SQLite for progress tracking and export.
|
||||
2. Bounded by configured concurrency and thread limits.
|
||||
3. Exportable to CSV on successful completion.
|
||||
|
||||
## Tools
|
||||
|
||||
All tools are function-style and gated by the `collab` feature.
|
||||
|
||||
### `spawn_agents_on_csv`
|
||||
|
||||
Create a new job from a CSV input and immediately start it.
|
||||
This tool blocks until the job completes and auto-exports on success.
|
||||
|
||||
Required args:
|
||||
- `csv_path`: path to the CSV file (first row is headers).
|
||||
- `instruction`: instruction template to apply to each row. Use `{column_name}` placeholders to
|
||||
inject values from the CSV row (column names are case-sensitive and may include spaces).
|
||||
Use `{{` and `}}` for literal braces.
|
||||
|
||||
Optional args:
|
||||
- `id_column`: header column name to use as a stable item id.
|
||||
- `job_name`: human-friendly label.
|
||||
- `output_csv_path`: destination for CSV export (defaults to `<input>.agent-job-<id>.csv`).
|
||||
- `output_schema`: JSON schema for result payloads (best-effort guidance).
|
||||
- `max_concurrency`: cap on parallel workers (defaults to 64, then capped by config).
|
||||
|
||||
### `report_agent_job_result`
|
||||
|
||||
Worker-only tool used internally for reporting JSON results. Main agents should not call this.
|
||||
|
||||
## Execution Model
|
||||
|
||||
1. Jobs are stored in SQLite with per-item state (pending/running/completed/failed).
|
||||
2. The job runner spawns subagents up to `max_concurrency`.
|
||||
3. The job instruction is rendered per row by substituting `{column_name}` placeholders.
|
||||
4. Each worker processes one item and reports results through `report_agent_job_result`.
|
||||
5. The runner marks items completed after the worker finishes.
|
||||
6. The runner writes a CSV snapshot on successful completion.
|
||||
|
||||
## CSV Output
|
||||
|
||||
Exports include original input columns plus:
|
||||
- `job_id`
|
||||
- `item_id`
|
||||
- `row_index`
|
||||
- `source_id`
|
||||
- `status`
|
||||
- `attempt_count`
|
||||
- `last_error`
|
||||
- `result_json`
|
||||
- `reported_at`
|
||||
- `completed_at`
|
||||
|
||||
## CLI Example (Auto-Export)
|
||||
|
||||
The example below creates a small CSV, runs a batch job, waits for completion,
|
||||
and prints the auto-exported CSV.
|
||||
|
||||
```bash
|
||||
./codex-rs/target/debug/codex exec \
|
||||
--enable collab \
|
||||
--enable sqlite \
|
||||
--full-auto \
|
||||
-C /path/to/repo \
|
||||
- <<'PROMPT'
|
||||
Create /tmp/security_rank_input_demo.csv with 5 rows using paths:
|
||||
- codex-rs/core/src/tools/handlers/agent_jobs.rs
|
||||
- codex-rs/core/src/tools/handlers/shell.rs
|
||||
- codex-rs/core/src/agent/control.rs
|
||||
- codex-rs/core/src/exec_policy.rs
|
||||
- codex-rs/core/src/tools/handlers/mcp.rs
|
||||
Columns: path, area (use "core" for area).
|
||||
|
||||
Then call spawn_agents_on_csv with:
|
||||
csv_path: /tmp/security_rank_input_demo.csv
|
||||
instruction: read the file (relative to repo root), skim first 200 lines, score security relevance 1-10, output JSON with keys path, score, rationale, signals array.
|
||||
output_csv_path: /tmp/security_rank_output_demo.csv
|
||||
|
||||
After completion, print the output path and `head -n 6` of the CSV.
|
||||
PROMPT
|
||||
```
|
||||
@@ -82,6 +82,10 @@ pub struct Cli {
|
||||
#[arg(long = "color", value_enum, default_value_t = Color::Auto)]
|
||||
pub color: Color,
|
||||
|
||||
/// Force cursor-based progress updates in exec mode.
|
||||
#[arg(long = "progress-cursor", default_value_t = false)]
|
||||
pub progress_cursor: bool,
|
||||
|
||||
/// Print events to stdout as JSONL.
|
||||
#[arg(
|
||||
long = "json",
|
||||
|
||||
@@ -38,9 +38,12 @@ use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::num_format::format_with_separators;
|
||||
use owo_colors::OwoColorize;
|
||||
use owo_colors::Style;
|
||||
use serde::Deserialize;
|
||||
use shlex::try_join;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::event_processor::CodexStatus;
|
||||
@@ -76,11 +79,17 @@ pub(crate) struct EventProcessorWithHumanOutput {
|
||||
last_total_token_usage: Option<codex_core::protocol::TokenUsageInfo>,
|
||||
final_message: Option<String>,
|
||||
last_proposed_plan: Option<String>,
|
||||
progress_active: bool,
|
||||
progress_last_len: usize,
|
||||
use_ansi_cursor: bool,
|
||||
progress_anchor: bool,
|
||||
progress_done: bool,
|
||||
}
|
||||
|
||||
impl EventProcessorWithHumanOutput {
|
||||
pub(crate) fn create_with_ansi(
|
||||
with_ansi: bool,
|
||||
cursor_ansi: bool,
|
||||
config: &Config,
|
||||
last_message_path: Option<PathBuf>,
|
||||
) -> Self {
|
||||
@@ -103,6 +112,11 @@ impl EventProcessorWithHumanOutput {
|
||||
last_total_token_usage: None,
|
||||
final_message: None,
|
||||
last_proposed_plan: None,
|
||||
progress_active: false,
|
||||
progress_last_len: 0,
|
||||
use_ansi_cursor: cursor_ansi,
|
||||
progress_anchor: false,
|
||||
progress_done: false,
|
||||
}
|
||||
} else {
|
||||
Self {
|
||||
@@ -121,11 +135,27 @@ impl EventProcessorWithHumanOutput {
|
||||
last_total_token_usage: None,
|
||||
final_message: None,
|
||||
last_proposed_plan: None,
|
||||
progress_active: false,
|
||||
progress_last_len: 0,
|
||||
use_ansi_cursor: cursor_ansi,
|
||||
progress_anchor: false,
|
||||
progress_done: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct AgentJobProgressMessage {
|
||||
job_id: String,
|
||||
total_items: usize,
|
||||
pending_items: usize,
|
||||
running_items: usize,
|
||||
completed_items: usize,
|
||||
failed_items: usize,
|
||||
eta_seconds: Option<u64>,
|
||||
}
|
||||
|
||||
struct PatchApplyBegin {
|
||||
start_time: Instant,
|
||||
auto_approved: bool,
|
||||
@@ -176,6 +206,18 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
|
||||
fn process_event(&mut self, event: Event) -> CodexStatus {
|
||||
let Event { id: _, msg } = event;
|
||||
if let EventMsg::BackgroundEvent(BackgroundEventEvent { message }) = &msg
|
||||
&& let Some(update) = Self::parse_agent_job_progress(message)
|
||||
{
|
||||
self.render_agent_job_progress(update);
|
||||
return CodexStatus::Running;
|
||||
}
|
||||
if self.progress_active && !Self::should_interrupt_progress(&msg) {
|
||||
return CodexStatus::Running;
|
||||
}
|
||||
if !Self::is_silent_event(&msg) {
|
||||
self.finish_progress_line();
|
||||
}
|
||||
match msg {
|
||||
EventMsg::Error(ErrorEvent { message, .. }) => {
|
||||
let prefix = "ERROR:".style(self.red);
|
||||
@@ -802,6 +844,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
}
|
||||
|
||||
fn print_final_output(&mut self) {
|
||||
self.finish_progress_line();
|
||||
if let Some(usage_info) = &self.last_total_token_usage {
|
||||
eprintln!(
|
||||
"{}\n{}",
|
||||
@@ -825,6 +868,193 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
}
|
||||
}
|
||||
|
||||
impl EventProcessorWithHumanOutput {
|
||||
fn parse_agent_job_progress(message: &str) -> Option<AgentJobProgressMessage> {
|
||||
let payload = message.strip_prefix("agent_job_progress:")?;
|
||||
serde_json::from_str::<AgentJobProgressMessage>(payload).ok()
|
||||
}
|
||||
|
||||
fn is_silent_event(msg: &EventMsg) -> bool {
|
||||
matches!(
|
||||
msg,
|
||||
EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::TurnStarted(_)
|
||||
| EventMsg::ExecApprovalRequest(_)
|
||||
| EventMsg::ApplyPatchApprovalRequest(_)
|
||||
| EventMsg::TerminalInteraction(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::GetHistoryEntryResponse(_)
|
||||
| EventMsg::McpListToolsResponse(_)
|
||||
| EventMsg::ListCustomPromptsResponse(_)
|
||||
| EventMsg::ListSkillsResponse(_)
|
||||
| EventMsg::ListRemoteSkillsResponse(_)
|
||||
| EventMsg::RemoteSkillDownloaded(_)
|
||||
| EventMsg::RawResponseItem(_)
|
||||
| EventMsg::UserMessage(_)
|
||||
| EventMsg::EnteredReviewMode(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
| EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::AgentReasoningRawContentDelta(_)
|
||||
| EventMsg::ItemStarted(_)
|
||||
| EventMsg::ItemCompleted(_)
|
||||
| EventMsg::AgentMessageContentDelta(_)
|
||||
| EventMsg::PlanDelta(_)
|
||||
| EventMsg::ReasoningContentDelta(_)
|
||||
| EventMsg::ReasoningRawContentDelta(_)
|
||||
| EventMsg::SkillsUpdateAvailable
|
||||
| EventMsg::UndoCompleted(_)
|
||||
| EventMsg::UndoStarted(_)
|
||||
| EventMsg::ThreadRolledBack(_)
|
||||
| EventMsg::RequestUserInput(_)
|
||||
| EventMsg::DynamicToolCallRequest(_)
|
||||
)
|
||||
}
|
||||
|
||||
fn should_interrupt_progress(msg: &EventMsg) -> bool {
|
||||
matches!(
|
||||
msg,
|
||||
EventMsg::Error(_)
|
||||
| EventMsg::Warning(_)
|
||||
| EventMsg::DeprecationNotice(_)
|
||||
| EventMsg::StreamError(_)
|
||||
| EventMsg::TurnComplete(_)
|
||||
| EventMsg::ShutdownComplete
|
||||
)
|
||||
}
|
||||
|
||||
fn finish_progress_line(&mut self) {
|
||||
if self.progress_active {
|
||||
self.progress_active = false;
|
||||
self.progress_last_len = 0;
|
||||
self.progress_done = false;
|
||||
if self.use_ansi_cursor {
|
||||
if self.progress_anchor {
|
||||
eprint!("\u{1b}[1A\u{1b}[1G\u{1b}[2K\n");
|
||||
} else {
|
||||
eprint!("\u{1b}[1G\u{1b}[2K\n");
|
||||
}
|
||||
} else {
|
||||
eprintln!();
|
||||
}
|
||||
self.progress_anchor = false;
|
||||
}
|
||||
}
|
||||
|
||||
fn render_agent_job_progress(&mut self, update: AgentJobProgressMessage) {
|
||||
let total = update.total_items.max(1);
|
||||
let processed = update.completed_items + update.failed_items;
|
||||
let percent = (processed as f64 / total as f64 * 100.0).round() as i64;
|
||||
let job_label = update.job_id.chars().take(8).collect::<String>();
|
||||
let eta = update
|
||||
.eta_seconds
|
||||
.map(|secs| format_duration(Duration::from_secs(secs)))
|
||||
.unwrap_or_else(|| "--".to_string());
|
||||
let columns = std::env::var("COLUMNS")
|
||||
.ok()
|
||||
.and_then(|value| value.parse::<usize>().ok())
|
||||
.filter(|value| *value > 0);
|
||||
let line = format_agent_job_progress_line(
|
||||
columns,
|
||||
job_label.as_str(),
|
||||
processed,
|
||||
total,
|
||||
percent,
|
||||
update.failed_items,
|
||||
update.running_items,
|
||||
update.pending_items,
|
||||
eta.as_str(),
|
||||
);
|
||||
let done = processed >= update.total_items;
|
||||
if !self.use_ansi_cursor {
|
||||
eprintln!("{line}");
|
||||
if done {
|
||||
self.progress_active = false;
|
||||
self.progress_last_len = 0;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if done && self.progress_done {
|
||||
return;
|
||||
}
|
||||
if !self.progress_active {
|
||||
eprint!("\n");
|
||||
self.progress_anchor = true;
|
||||
self.progress_done = false;
|
||||
}
|
||||
let mut output = String::new();
|
||||
if self.progress_anchor {
|
||||
output.push_str("\u{1b}[1A\u{1b}[1G\u{1b}[2K");
|
||||
} else {
|
||||
output.push_str("\u{1b}[1G\u{1b}[2K");
|
||||
}
|
||||
output.push_str(&line);
|
||||
if done {
|
||||
output.push('\n');
|
||||
eprint!("{output}");
|
||||
self.progress_active = false;
|
||||
self.progress_last_len = 0;
|
||||
self.progress_anchor = false;
|
||||
self.progress_done = true;
|
||||
return;
|
||||
}
|
||||
eprint!("{output}");
|
||||
let _ = std::io::stderr().flush();
|
||||
self.progress_active = true;
|
||||
self.progress_last_len = line.len();
|
||||
}
|
||||
}
|
||||
|
||||
fn format_agent_job_progress_line(
|
||||
columns: Option<usize>,
|
||||
job_label: &str,
|
||||
processed: usize,
|
||||
total: usize,
|
||||
percent: i64,
|
||||
failed: usize,
|
||||
running: usize,
|
||||
pending: usize,
|
||||
eta: &str,
|
||||
) -> String {
|
||||
let rest = format!("{processed}/{total} {percent}% f{failed} r{running} p{pending} eta {eta}");
|
||||
let prefix = format!("job {job_label}");
|
||||
let base_len = prefix.len() + rest.len() + 4;
|
||||
let mut bar_width = columns
|
||||
.and_then(|columns| columns.checked_sub(base_len))
|
||||
.filter(|available| *available > 0)
|
||||
.unwrap_or(20usize);
|
||||
let with_bar = |width: usize| {
|
||||
let filled = ((processed as f64 / total as f64) * width as f64)
|
||||
.round()
|
||||
.clamp(0.0, width as f64) as usize;
|
||||
let mut bar = "#".repeat(filled);
|
||||
bar.push_str(&"-".repeat(width - filled));
|
||||
format!("{prefix} [{bar}] {rest}")
|
||||
};
|
||||
let mut line = with_bar(bar_width);
|
||||
if let Some(columns) = columns {
|
||||
if line.len() > columns {
|
||||
let min_line = format!("{prefix} {rest}");
|
||||
if min_line.len() > columns {
|
||||
let mut truncated = min_line;
|
||||
if columns > 2 && truncated.len() > columns {
|
||||
truncated.truncate(columns - 2);
|
||||
truncated.push_str("..");
|
||||
}
|
||||
return truncated;
|
||||
}
|
||||
let available = columns.saturating_sub(base_len);
|
||||
if available == 0 {
|
||||
return min_line;
|
||||
}
|
||||
bar_width = available.min(bar_width).max(1);
|
||||
line = with_bar(bar_width);
|
||||
}
|
||||
}
|
||||
line
|
||||
}
|
||||
|
||||
fn escape_command(command: &[String]) -> String {
|
||||
try_join(command.iter().map(String::as_str)).unwrap_or_else(|_| command.join(" "))
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ use codex_core::protocol::ReviewTarget;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_protocol::approvals::ElicitationAction;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use event_processor_with_human_output::EventProcessorWithHumanOutput;
|
||||
@@ -84,6 +85,7 @@ struct ThreadEventEnvelope {
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
thread: Arc<codex_core::CodexThread>,
|
||||
event: Event,
|
||||
suppress_output: bool,
|
||||
}
|
||||
|
||||
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
|
||||
@@ -110,9 +112,10 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
prompt,
|
||||
output_schema: output_schema_path,
|
||||
config_overrides,
|
||||
progress_cursor,
|
||||
} = cli;
|
||||
|
||||
let (stdout_with_ansi, stderr_with_ansi) = match color {
|
||||
let (_stdout_with_ansi, stderr_with_ansi) = match color {
|
||||
cli::Color::Always => (true, true),
|
||||
cli::Color::Never => (false, false),
|
||||
cli::Color::Auto => (
|
||||
@@ -120,6 +123,24 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
supports_color::on_cached(Stream::Stderr).is_some(),
|
||||
),
|
||||
};
|
||||
let cursor_ansi = if progress_cursor {
|
||||
true
|
||||
} else {
|
||||
match color {
|
||||
cli::Color::Never => false,
|
||||
cli::Color::Always => true,
|
||||
cli::Color::Auto => {
|
||||
if stderr_with_ansi || std::io::stderr().is_terminal() {
|
||||
true
|
||||
} else {
|
||||
match std::env::var("TERM") {
|
||||
Ok(term) => !term.is_empty() && term != "dumb",
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Build fmt layer (existing logging) to compose with OTEL layer.
|
||||
let default_level = "error";
|
||||
@@ -298,7 +319,8 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
let mut event_processor: Box<dyn EventProcessor> = match json_mode {
|
||||
true => Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone())),
|
||||
_ => Box::new(EventProcessorWithHumanOutput::create_with_ansi(
|
||||
stdout_with_ansi,
|
||||
stderr_with_ansi,
|
||||
cursor_ansi,
|
||||
&config,
|
||||
last_message_file.clone(),
|
||||
)),
|
||||
@@ -438,7 +460,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ThreadEventEnvelope>();
|
||||
let attached_threads = Arc::new(Mutex::new(HashSet::from([primary_thread_id])));
|
||||
spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone());
|
||||
spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone(), false);
|
||||
|
||||
{
|
||||
let thread = thread.clone();
|
||||
@@ -466,7 +488,14 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
match thread_manager.get_thread(thread_id).await {
|
||||
Ok(thread) => {
|
||||
attached_threads.lock().await.insert(thread_id);
|
||||
spawn_thread_listener(thread_id, thread, tx.clone());
|
||||
let suppress_output =
|
||||
is_agent_job_subagent(&thread.config_snapshot().await);
|
||||
spawn_thread_listener(
|
||||
thread_id,
|
||||
thread,
|
||||
tx.clone(),
|
||||
suppress_output,
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to attach listener for thread {thread_id}: {err}")
|
||||
@@ -520,7 +549,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
thread_id,
|
||||
thread,
|
||||
event,
|
||||
suppress_output,
|
||||
} = envelope;
|
||||
if suppress_output && should_suppress_agent_job_event(&event.msg) {
|
||||
continue;
|
||||
}
|
||||
if let EventMsg::ElicitationRequest(ev) = &event.msg {
|
||||
// Automatically cancel elicitation requests in exec mode.
|
||||
thread
|
||||
@@ -562,6 +595,7 @@ fn spawn_thread_listener(
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
thread: Arc<codex_core::CodexThread>,
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ThreadEventEnvelope>,
|
||||
suppress_output: bool,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
@@ -574,6 +608,7 @@ fn spawn_thread_listener(
|
||||
thread_id,
|
||||
thread: Arc::clone(&thread),
|
||||
event,
|
||||
suppress_output,
|
||||
}) {
|
||||
error!("Error sending event: {err:?}");
|
||||
break;
|
||||
@@ -594,6 +629,29 @@ fn spawn_thread_listener(
|
||||
});
|
||||
}
|
||||
|
||||
fn is_agent_job_subagent(config: &codex_core::ThreadConfigSnapshot) -> bool {
|
||||
match &config.session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::Other(source)) => source.starts_with("agent_job:"),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn should_suppress_agent_job_event(msg: &EventMsg) -> bool {
|
||||
!matches!(
|
||||
msg,
|
||||
EventMsg::ExecApprovalRequest(_)
|
||||
| EventMsg::ApplyPatchApprovalRequest(_)
|
||||
| EventMsg::RequestUserInput(_)
|
||||
| EventMsg::DynamicToolCallRequest(_)
|
||||
| EventMsg::ElicitationRequest(_)
|
||||
| EventMsg::Error(_)
|
||||
| EventMsg::Warning(_)
|
||||
| EventMsg::DeprecationNotice(_)
|
||||
| EventMsg::StreamError(_)
|
||||
| EventMsg::ShutdownComplete
|
||||
)
|
||||
}
|
||||
|
||||
async fn resolve_resume_path(
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
|
||||
37
codex-rs/state/migrations/0009_agent_jobs.sql
Normal file
37
codex-rs/state/migrations/0009_agent_jobs.sql
Normal file
@@ -0,0 +1,37 @@
|
||||
CREATE TABLE agent_jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
instruction TEXT NOT NULL,
|
||||
output_schema_json TEXT,
|
||||
input_headers_json TEXT NOT NULL,
|
||||
input_csv_path TEXT NOT NULL,
|
||||
output_csv_path TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
started_at INTEGER,
|
||||
completed_at INTEGER,
|
||||
last_error TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE agent_job_items (
|
||||
job_id TEXT NOT NULL,
|
||||
item_id TEXT NOT NULL,
|
||||
row_index INTEGER NOT NULL,
|
||||
source_id TEXT,
|
||||
row_json TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
assigned_thread_id TEXT,
|
||||
attempt_count INTEGER NOT NULL DEFAULT 0,
|
||||
result_json TEXT,
|
||||
last_error TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
completed_at INTEGER,
|
||||
reported_at INTEGER,
|
||||
PRIMARY KEY (job_id, item_id),
|
||||
FOREIGN KEY(job_id) REFERENCES agent_jobs(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX idx_agent_jobs_status ON agent_jobs(status, updated_at DESC);
|
||||
CREATE INDEX idx_agent_job_items_status ON agent_job_items(job_id, status, row_index ASC);
|
||||
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE agent_jobs
|
||||
ADD COLUMN auto_export INTEGER NOT NULL DEFAULT 1;
|
||||
@@ -21,6 +21,13 @@ pub use runtime::StateRuntime;
|
||||
///
|
||||
/// Most consumers should prefer [`StateRuntime`].
|
||||
pub use extract::apply_rollout_item;
|
||||
pub use model::AgentJob;
|
||||
pub use model::AgentJobCreateParams;
|
||||
pub use model::AgentJobItem;
|
||||
pub use model::AgentJobItemCreateParams;
|
||||
pub use model::AgentJobItemStatus;
|
||||
pub use model::AgentJobProgress;
|
||||
pub use model::AgentJobStatus;
|
||||
pub use model::Anchor;
|
||||
pub use model::BackfillState;
|
||||
pub use model::BackfillStats;
|
||||
|
||||
290
codex-rs/state/src/model/agent_job.rs
Normal file
290
codex-rs/state/src/model/agent_job.rs
Normal file
@@ -0,0 +1,290 @@
|
||||
use anyhow::Result;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use serde_json::Value;
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqliteRow;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AgentJobStatus {
|
||||
Pending,
|
||||
Running,
|
||||
Completed,
|
||||
Failed,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl AgentJobStatus {
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
AgentJobStatus::Pending => "pending",
|
||||
AgentJobStatus::Running => "running",
|
||||
AgentJobStatus::Completed => "completed",
|
||||
AgentJobStatus::Failed => "failed",
|
||||
AgentJobStatus::Cancelled => "cancelled",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse(value: &str) -> Result<Self> {
|
||||
match value {
|
||||
"pending" => Ok(Self::Pending),
|
||||
"running" => Ok(Self::Running),
|
||||
"completed" => Ok(Self::Completed),
|
||||
"failed" => Ok(Self::Failed),
|
||||
"cancelled" => Ok(Self::Cancelled),
|
||||
_ => Err(anyhow::anyhow!("invalid agent job status: {value}")),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_final(self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
AgentJobStatus::Completed | AgentJobStatus::Failed | AgentJobStatus::Cancelled
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AgentJobItemStatus {
|
||||
Pending,
|
||||
Running,
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
impl AgentJobItemStatus {
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
AgentJobItemStatus::Pending => "pending",
|
||||
AgentJobItemStatus::Running => "running",
|
||||
AgentJobItemStatus::Completed => "completed",
|
||||
AgentJobItemStatus::Failed => "failed",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse(value: &str) -> Result<Self> {
|
||||
match value {
|
||||
"pending" => Ok(Self::Pending),
|
||||
"running" => Ok(Self::Running),
|
||||
"completed" => Ok(Self::Completed),
|
||||
"failed" => Ok(Self::Failed),
|
||||
_ => Err(anyhow::anyhow!("invalid agent job item status: {value}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decoded, public representation of one row of the `agent_jobs` table:
/// a batch job whose work items are rows read from an input CSV.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJob {
    pub id: String,
    pub name: String,
    pub status: AgentJobStatus,
    /// Instruction text associated with the job.
    pub instruction: String,
    /// Whether results should be exported automatically — semantics are
    /// enforced by the caller, not by this model. TODO(review): confirm.
    pub auto_export: bool,
    /// Optional JSON schema for item outputs (decoded from a JSON column).
    pub output_schema_json: Option<Value>,
    /// Column headers of the input CSV (decoded from a JSON column).
    pub input_headers: Vec<String>,
    pub input_csv_path: String,
    pub output_csv_path: String,
    // Timestamps are stored as unix epoch seconds and decoded to UTC.
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Set on the first transition to running; never reset afterwards.
    pub started_at: Option<DateTime<Utc>>,
    /// Set when the job reaches a terminal state.
    pub completed_at: Option<DateTime<Utc>>,
    /// Most recent failure message, cleared on (re)start and completion.
    pub last_error: Option<String>,
}
|
||||
|
||||
/// Decoded, public representation of one row of `agent_job_items`:
/// a single unit of work (one input row) belonging to an [`AgentJob`].
#[derive(Debug, Clone, PartialEq)]
pub struct AgentJobItem {
    pub job_id: String,
    pub item_id: String,
    /// Position of the source row within the input CSV.
    pub row_index: i64,
    /// Optional caller-supplied identifier for the source row.
    pub source_id: Option<String>,
    /// The input row payload (decoded from a JSON column).
    pub row_json: Value,
    pub status: AgentJobItemStatus,
    /// Thread currently working this item, if any; cleared when the item
    /// leaves the running state.
    pub assigned_thread_id: Option<String>,
    /// Number of times this item has been claimed for processing.
    pub attempt_count: i64,
    /// Result payload reported by a worker (decoded from a JSON column).
    pub result_json: Option<Value>,
    pub last_error: Option<String>,
    // Timestamps are stored as unix epoch seconds and decoded to UTC.
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Set when the item reaches completed/failed.
    pub completed_at: Option<DateTime<Utc>>,
    /// Set when a worker reports a result for this item.
    pub reported_at: Option<DateTime<Utc>>,
}
|
||||
|
||||
/// Per-status item counts for one job, as aggregated by
/// `get_agent_job_progress`. `total_items` is the sum of the others.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentJobProgress {
    pub total_items: usize,
    pub pending_items: usize,
    pub running_items: usize,
    pub completed_items: usize,
    pub failed_items: usize,
}
|
||||
|
||||
/// Caller-supplied fields for creating a new agent job. Status and all
/// timestamps are assigned by the store at insert time.
#[derive(Debug, Clone)]
pub struct AgentJobCreateParams {
    pub id: String,
    pub name: String,
    pub instruction: String,
    pub auto_export: bool,
    /// Optional JSON schema for item outputs; serialized to a JSON column.
    pub output_schema_json: Option<Value>,
    /// Input CSV column headers; serialized to a JSON column.
    pub input_headers: Vec<String>,
    pub input_csv_path: String,
    pub output_csv_path: String,
}
|
||||
|
||||
/// Caller-supplied fields for one work item created alongside a job.
/// Status, attempt count, and timestamps are assigned by the store.
#[derive(Debug, Clone)]
pub struct AgentJobItemCreateParams {
    pub item_id: String,
    /// Position of the source row within the input CSV.
    pub row_index: i64,
    /// Optional caller-supplied identifier for the source row.
    pub source_id: Option<String>,
    /// The input row payload; serialized to a JSON column.
    pub row_json: Value,
}
|
||||
|
||||
/// Raw database shape of an `agent_jobs` row, before decoding: JSON
/// columns are still strings, timestamps are epoch seconds, and the
/// boolean flag is an integer (SQLite has no native bool type).
#[derive(Debug)]
pub(crate) struct AgentJobRow {
    pub(crate) id: String,
    pub(crate) name: String,
    pub(crate) status: String,
    pub(crate) instruction: String,
    /// 0/1 flag as stored by SQLite.
    pub(crate) auto_export: i64,
    pub(crate) output_schema_json: Option<String>,
    pub(crate) input_headers_json: String,
    pub(crate) input_csv_path: String,
    pub(crate) output_csv_path: String,
    pub(crate) created_at: i64,
    pub(crate) updated_at: i64,
    pub(crate) started_at: Option<i64>,
    pub(crate) completed_at: Option<i64>,
    pub(crate) last_error: Option<String>,
}

impl AgentJobRow {
    /// Extracts every column by name from a fetched SQLite row, failing
    /// if a column is missing or has an incompatible type.
    pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
        Ok(Self {
            id: row.try_get("id")?,
            name: row.try_get("name")?,
            status: row.try_get("status")?,
            instruction: row.try_get("instruction")?,
            auto_export: row.try_get("auto_export")?,
            output_schema_json: row.try_get("output_schema_json")?,
            input_headers_json: row.try_get("input_headers_json")?,
            input_csv_path: row.try_get("input_csv_path")?,
            output_csv_path: row.try_get("output_csv_path")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
            started_at: row.try_get("started_at")?,
            completed_at: row.try_get("completed_at")?,
            last_error: row.try_get("last_error")?,
        })
    }
}

/// Decodes the raw row into the public [`AgentJob`]: parses the JSON
/// columns, the status string, and the epoch-second timestamps.
impl TryFrom<AgentJobRow> for AgentJob {
    type Error = anyhow::Error;

    fn try_from(value: AgentJobRow) -> Result<Self, Self::Error> {
        // Optional JSON column: NULL stays None; a present value must parse.
        let output_schema_json = value
            .output_schema_json
            .as_deref()
            .map(serde_json::from_str)
            .transpose()?;
        let input_headers = serde_json::from_str(value.input_headers_json.as_str())?;
        Ok(Self {
            id: value.id,
            name: value.name,
            status: AgentJobStatus::parse(value.status.as_str())?,
            instruction: value.instruction,
            // SQLite stores the bool as an integer; any non-zero is true.
            auto_export: value.auto_export != 0,
            output_schema_json,
            input_headers,
            input_csv_path: value.input_csv_path,
            output_csv_path: value.output_csv_path,
            created_at: epoch_seconds_to_datetime(value.created_at)?,
            updated_at: epoch_seconds_to_datetime(value.updated_at)?,
            started_at: value
                .started_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            completed_at: value
                .completed_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            last_error: value.last_error,
        })
    }
}
|
||||
|
||||
/// Raw database shape of an `agent_job_items` row, before decoding:
/// JSON columns are still strings and timestamps are epoch seconds.
#[derive(Debug)]
pub(crate) struct AgentJobItemRow {
    pub(crate) job_id: String,
    pub(crate) item_id: String,
    pub(crate) row_index: i64,
    pub(crate) source_id: Option<String>,
    pub(crate) row_json: String,
    pub(crate) status: String,
    pub(crate) assigned_thread_id: Option<String>,
    pub(crate) attempt_count: i64,
    pub(crate) result_json: Option<String>,
    pub(crate) last_error: Option<String>,
    pub(crate) created_at: i64,
    pub(crate) updated_at: i64,
    pub(crate) completed_at: Option<i64>,
    pub(crate) reported_at: Option<i64>,
}

impl AgentJobItemRow {
    /// Extracts every column by name from a fetched SQLite row, failing
    /// if a column is missing or has an incompatible type.
    pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
        Ok(Self {
            job_id: row.try_get("job_id")?,
            item_id: row.try_get("item_id")?,
            row_index: row.try_get("row_index")?,
            source_id: row.try_get("source_id")?,
            row_json: row.try_get("row_json")?,
            status: row.try_get("status")?,
            assigned_thread_id: row.try_get("assigned_thread_id")?,
            attempt_count: row.try_get("attempt_count")?,
            result_json: row.try_get("result_json")?,
            last_error: row.try_get("last_error")?,
            created_at: row.try_get("created_at")?,
            updated_at: row.try_get("updated_at")?,
            completed_at: row.try_get("completed_at")?,
            reported_at: row.try_get("reported_at")?,
        })
    }
}

/// Decodes the raw row into the public [`AgentJobItem`]: parses the JSON
/// columns, the status string, and the epoch-second timestamps.
impl TryFrom<AgentJobItemRow> for AgentJobItem {
    type Error = anyhow::Error;

    fn try_from(value: AgentJobItemRow) -> Result<Self, Self::Error> {
        Ok(Self {
            job_id: value.job_id,
            item_id: value.item_id,
            row_index: value.row_index,
            source_id: value.source_id,
            row_json: serde_json::from_str(value.row_json.as_str())?,
            status: AgentJobItemStatus::parse(value.status.as_str())?,
            assigned_thread_id: value.assigned_thread_id,
            attempt_count: value.attempt_count,
            // Optional JSON column: NULL stays None; present must parse.
            result_json: value
                .result_json
                .as_deref()
                .map(serde_json::from_str)
                .transpose()?,
            last_error: value.last_error,
            created_at: epoch_seconds_to_datetime(value.created_at)?,
            updated_at: epoch_seconds_to_datetime(value.updated_at)?,
            completed_at: value
                .completed_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
            reported_at: value
                .reported_at
                .map(epoch_seconds_to_datetime)
                .transpose()?,
        })
    }
}
|
||||
|
||||
fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> {
|
||||
DateTime::<Utc>::from_timestamp(secs, 0)
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}"))
|
||||
}
|
||||
@@ -1,8 +1,16 @@
|
||||
mod agent_job;
|
||||
mod backfill_state;
|
||||
mod log;
|
||||
mod thread_memory;
|
||||
mod thread_metadata;
|
||||
|
||||
pub use agent_job::AgentJob;
|
||||
pub use agent_job::AgentJobCreateParams;
|
||||
pub use agent_job::AgentJobItem;
|
||||
pub use agent_job::AgentJobItemCreateParams;
|
||||
pub use agent_job::AgentJobItemStatus;
|
||||
pub use agent_job::AgentJobProgress;
|
||||
pub use agent_job::AgentJobStatus;
|
||||
pub use backfill_state::BackfillState;
|
||||
pub use backfill_state::BackfillStatus;
|
||||
pub use log::LogEntry;
|
||||
@@ -17,6 +25,8 @@ pub use thread_metadata::ThreadMetadata;
|
||||
pub use thread_metadata::ThreadMetadataBuilder;
|
||||
pub use thread_metadata::ThreadsPage;
|
||||
|
||||
pub(crate) use agent_job::AgentJobItemRow;
|
||||
pub(crate) use agent_job::AgentJobRow;
|
||||
pub(crate) use thread_memory::ThreadMemoryRow;
|
||||
pub(crate) use thread_metadata::ThreadRow;
|
||||
pub(crate) use thread_metadata::anchor_from_item;
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
use crate::AgentJob;
|
||||
use crate::AgentJobCreateParams;
|
||||
use crate::AgentJobItem;
|
||||
use crate::AgentJobItemCreateParams;
|
||||
use crate::AgentJobItemStatus;
|
||||
use crate::AgentJobProgress;
|
||||
use crate::AgentJobStatus;
|
||||
use crate::DB_ERROR_METRIC;
|
||||
use crate::LogEntry;
|
||||
use crate::LogQuery;
|
||||
@@ -9,6 +16,8 @@ use crate::ThreadMetadataBuilder;
|
||||
use crate::ThreadsPage;
|
||||
use crate::apply_rollout_item;
|
||||
use crate::migrations::MIGRATOR;
|
||||
use crate::model::AgentJobItemRow;
|
||||
use crate::model::AgentJobRow;
|
||||
use crate::model::ThreadMemoryRow;
|
||||
use crate::model::ThreadRow;
|
||||
use crate::model::anchor_from_item;
|
||||
@@ -712,6 +721,486 @@ ON CONFLICT(thread_id, position) DO NOTHING
|
||||
self.upsert_thread(&metadata).await
|
||||
}
|
||||
|
||||
/// Inserts a new agent job and all of its work items in one transaction,
/// then reloads and returns the persisted job.
///
/// The job starts in `pending` status with `started_at`, `completed_at`
/// and `last_error` unset; every item starts `pending`, unassigned, with
/// an `attempt_count` of 0. All timestamps are unix epoch seconds taken
/// once so job and items share the same creation time.
///
/// # Errors
/// Fails if JSON serialization of headers/schema/row payloads fails, if
/// any INSERT fails (e.g. duplicate id), or if the job cannot be
/// reloaded after commit.
pub async fn create_agent_job(
    &self,
    params: &AgentJobCreateParams,
    items: &[AgentJobItemCreateParams],
) -> anyhow::Result<AgentJob> {
    let now = Utc::now().timestamp();
    let input_headers_json = serde_json::to_string(&params.input_headers)?;
    // Optional schema: None stays NULL; Some must serialize.
    let output_schema_json = params
        .output_schema_json
        .as_ref()
        .map(serde_json::to_string)
        .transpose()?;
    // Job row and item rows commit (or roll back) atomically.
    let mut tx = self.pool.begin().await?;
    sqlx::query(
        r#"
INSERT INTO agent_jobs (
    id,
    name,
    status,
    instruction,
    auto_export,
    output_schema_json,
    input_headers_json,
    input_csv_path,
    output_csv_path,
    created_at,
    updated_at,
    started_at,
    completed_at,
    last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)
"#,
    )
    .bind(params.id.as_str())
    .bind(params.name.as_str())
    .bind(AgentJobStatus::Pending.as_str())
    .bind(params.instruction.as_str())
    // SQLite has no bool column type; store 0/1.
    .bind(i64::from(params.auto_export))
    .bind(output_schema_json)
    .bind(input_headers_json)
    .bind(params.input_csv_path.as_str())
    .bind(params.output_csv_path.as_str())
    .bind(now)
    .bind(now)
    .execute(&mut *tx)
    .await?;

    for item in items {
        let row_json = serde_json::to_string(&item.row_json)?;
        sqlx::query(
            r#"
INSERT INTO agent_job_items (
    job_id,
    item_id,
    row_index,
    source_id,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
) VALUES (?, ?, ?, ?, ?, ?, NULL, 0, NULL, NULL, ?, ?, NULL, NULL)
"#,
        )
        .bind(params.id.as_str())
        .bind(item.item_id.as_str())
        .bind(item.row_index)
        .bind(item.source_id.as_deref())
        .bind(row_json)
        .bind(AgentJobItemStatus::Pending.as_str())
        .bind(now)
        .bind(now)
        .execute(&mut *tx)
        .await?;
    }

    tx.commit().await?;

    // Read back through the normal decode path so the caller gets exactly
    // what any later get_agent_job would return.
    let job_id = params.id.as_str();
    self.get_agent_job(job_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("failed to load created agent job {job_id}"))
}
|
||||
|
||||
/// Loads a single agent job by id, returning `Ok(None)` when no such
/// job exists.
///
/// # Errors
/// Fails on query errors or when the stored row cannot be decoded
/// into an [`AgentJob`].
pub async fn get_agent_job(&self, job_id: &str) -> anyhow::Result<Option<AgentJob>> {
    let row = sqlx::query(
        r#"
SELECT
    id,
    name,
    status,
    instruction,
    auto_export,
    output_schema_json,
    input_headers_json,
    input_csv_path,
    output_csv_path,
    created_at,
    updated_at,
    started_at,
    completed_at,
    last_error
FROM agent_jobs
WHERE id = ?
"#,
    )
    .bind(job_id)
    .fetch_optional(self.pool.as_ref())
    .await?;
    // Decode only when a row was found; propagate decode errors.
    row.map(|row| AgentJobRow::try_from_row(&row).and_then(AgentJob::try_from))
        .transpose()
}
|
||||
|
||||
pub async fn list_agent_job_items(
|
||||
&self,
|
||||
job_id: &str,
|
||||
status: Option<AgentJobItemStatus>,
|
||||
limit: Option<usize>,
|
||||
) -> anyhow::Result<Vec<AgentJobItem>> {
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
SELECT
|
||||
job_id,
|
||||
item_id,
|
||||
row_index,
|
||||
source_id,
|
||||
row_json,
|
||||
status,
|
||||
assigned_thread_id,
|
||||
attempt_count,
|
||||
result_json,
|
||||
last_error,
|
||||
created_at,
|
||||
updated_at,
|
||||
completed_at,
|
||||
reported_at
|
||||
FROM agent_job_items
|
||||
WHERE job_id =
|
||||
"#,
|
||||
);
|
||||
builder.push_bind(job_id);
|
||||
if let Some(status) = status {
|
||||
builder.push(" AND status = ");
|
||||
builder.push_bind(status.as_str());
|
||||
}
|
||||
builder.push(" ORDER BY row_index ASC");
|
||||
if let Some(limit) = limit {
|
||||
builder.push(" LIMIT ");
|
||||
builder.push_bind(limit as i64);
|
||||
}
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
rows.into_iter()
|
||||
.map(|row| AgentJobItemRow::try_from_row(&row).and_then(AgentJobItem::try_from))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Loads one item of a job by its composite key, returning `Ok(None)`
/// when no matching row exists.
///
/// # Errors
/// Fails on query errors or when the stored row cannot be decoded
/// into an [`AgentJobItem`].
pub async fn get_agent_job_item(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<Option<AgentJobItem>> {
    let row = sqlx::query(
        r#"
SELECT
    job_id,
    item_id,
    row_index,
    source_id,
    row_json,
    status,
    assigned_thread_id,
    attempt_count,
    result_json,
    last_error,
    created_at,
    updated_at,
    completed_at,
    reported_at
FROM agent_job_items
WHERE job_id = ? AND item_id = ?
"#,
    )
    .bind(job_id)
    .bind(item_id)
    .fetch_optional(self.pool.as_ref())
    .await?;
    // Decode only when a row was found; propagate decode errors.
    row.map(|row| AgentJobItemRow::try_from_row(&row).and_then(AgentJobItem::try_from))
        .transpose()
}
|
||||
|
||||
/// Transitions a job to `running`.
///
/// `started_at` is set only on the first start (COALESCE keeps an
/// existing value), while `completed_at` and `last_error` are cleared
/// so a restarted job no longer looks finished or failed. A missing
/// `job_id` is a silent no-op (zero rows updated).
pub async fn mark_agent_job_running(&self, job_id: &str) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET
    status = ?,
    updated_at = ?,
    started_at = COALESCE(started_at, ?),
    completed_at = NULL,
    last_error = NULL
WHERE id = ?
"#,
    )
    .bind(AgentJobStatus::Running.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
|
||||
|
||||
/// Transitions a job to `completed`, stamping `completed_at` and
/// clearing any previous error. Unconditional on the current status;
/// a missing `job_id` is a silent no-op.
pub async fn mark_agent_job_completed(&self, job_id: &str) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = NULL
WHERE id = ?
"#,
    )
    .bind(AgentJobStatus::Completed.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
|
||||
|
||||
/// Transitions a job to `failed`, stamping `completed_at` and recording
/// `error_message` in `last_error`. Unconditional on the current
/// status; a missing `job_id` is a silent no-op.
pub async fn mark_agent_job_failed(
    &self,
    job_id: &str,
    error_message: &str,
) -> anyhow::Result<()> {
    let now = Utc::now().timestamp();
    sqlx::query(
        r#"
UPDATE agent_jobs
SET status = ?, updated_at = ?, completed_at = ?, last_error = ?
WHERE id = ?
"#,
    )
    .bind(AgentJobStatus::Failed.as_str())
    .bind(now)
    .bind(now)
    .bind(error_message)
    .bind(job_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(())
}
|
||||
|
||||
/// Atomically claims a pending item: transitions `pending` → `running`,
/// increments `attempt_count`, and clears the thread assignment and
/// last error.
///
/// The `status = pending` guard in the WHERE clause makes the claim a
/// single compare-and-set, so two concurrent callers cannot both claim
/// the same item. Returns `false` when the item was not pending (already
/// claimed, finished, or nonexistent).
pub async fn mark_agent_job_item_running(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    assigned_thread_id = NULL,
    attempt_count = attempt_count + 1,
    updated_at = ?,
    last_error = NULL
WHERE job_id = ? AND item_id = ? AND status = ?
"#,
    )
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Pending.as_str())
    .execute(self.pool.as_ref())
    .await?;
    // rows_affected == 0 means the guard failed, i.e. claim lost.
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Returns a running item to the pending pool (e.g. after its worker
/// gave up), clearing the thread assignment and recording an optional
/// error message in `last_error`.
///
/// Guarded on `status = running`: only an in-flight item can be
/// requeued. Returns `false` when the item was not running.
pub async fn mark_agent_job_item_pending(
    &self,
    job_id: &str,
    item_id: &str,
    error_message: Option<&str>,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    assigned_thread_id = NULL,
    updated_at = ?,
    last_error = ?
WHERE job_id = ? AND item_id = ? AND status = ?
"#,
    )
    .bind(AgentJobItemStatus::Pending.as_str())
    .bind(now)
    .bind(error_message)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Records which thread is working a running item.
///
/// Guarded on `status = running`, so the assignment cannot be attached
/// to a pending or finished item. Returns `false` when the item was not
/// running (assignment not recorded).
pub async fn set_agent_job_item_thread(
    &self,
    job_id: &str,
    item_id: &str,
    thread_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET assigned_thread_id = ?, updated_at = ?
WHERE job_id = ? AND item_id = ? AND status = ?
"#,
    )
    .bind(thread_id)
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Stores the result payload for a running item, stamping `reported_at`
/// and clearing `last_error`.
///
/// Ownership rules enforced by the WHERE clause: the item must be
/// `running`, and either have no assigned thread yet or be assigned to
/// `reporting_thread_id` — so one worker cannot overwrite another's
/// result. If the item was unassigned, the COALESCE adopts the reporter
/// as its assigned thread. Returns `false` when the guard rejected the
/// write (wrong status or wrong thread).
pub async fn report_agent_job_item_result(
    &self,
    job_id: &str,
    item_id: &str,
    reporting_thread_id: &str,
    result_json: &Value,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let serialized = serde_json::to_string(result_json)?;
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    result_json = ?,
    reported_at = ?,
    updated_at = ?,
    assigned_thread_id = COALESCE(assigned_thread_id, ?),
    last_error = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
    AND (assigned_thread_id IS NULL OR assigned_thread_id = ?)
"#,
    )
    .bind(serialized)
    .bind(now)
    .bind(now)
    .bind(reporting_thread_id)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(reporting_thread_id)
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Finalizes a running item as `completed`, stamping `completed_at` and
/// releasing its thread assignment.
///
/// Guarded on `status = running` AND `result_json IS NOT NULL`: an item
/// can only complete after a result was reported via
/// `report_agent_job_item_result`. Returns `false` when either guard
/// fails.
pub async fn mark_agent_job_item_completed(
    &self,
    job_id: &str,
    item_id: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    completed_at = ?,
    updated_at = ?,
    assigned_thread_id = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
    AND result_json IS NOT NULL
"#,
    )
    .bind(AgentJobItemStatus::Completed.as_str())
    .bind(now)
    .bind(now)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Finalizes a running item as `failed`, recording `error_message`,
/// stamping `completed_at`, and releasing its thread assignment.
///
/// Guarded on `status = running`; returns `false` when the item was not
/// running (no transition recorded).
pub async fn mark_agent_job_item_failed(
    &self,
    job_id: &str,
    item_id: &str,
    error_message: &str,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let result = sqlx::query(
        r#"
UPDATE agent_job_items
SET
    status = ?,
    completed_at = ?,
    updated_at = ?,
    last_error = ?,
    assigned_thread_id = NULL
WHERE
    job_id = ?
    AND item_id = ?
    AND status = ?
"#,
    )
    .bind(AgentJobItemStatus::Failed.as_str())
    .bind(now)
    .bind(now)
    .bind(error_message)
    .bind(job_id)
    .bind(item_id)
    .bind(AgentJobItemStatus::Running.as_str())
    .execute(self.pool.as_ref())
    .await?;
    Ok(result.rows_affected() > 0)
}
|
||||
|
||||
/// Aggregates per-status item counts for a job in a single query.
///
/// A job with no items yields all-zero counts: `SUM` over an empty set
/// returns NULL in SQLite, hence the `Option<i64>` decodes with
/// `unwrap_or_default`. Negative counts cannot occur, so the
/// `usize::try_from(..).unwrap_or_default()` fallbacks are defensive
/// only.
pub async fn get_agent_job_progress(&self, job_id: &str) -> anyhow::Result<AgentJobProgress> {
    let row = sqlx::query(
        r#"
SELECT
    COUNT(*) AS total_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS pending_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS running_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS completed_items,
    SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) AS failed_items
FROM agent_job_items
WHERE job_id = ?
"#,
    )
    .bind(AgentJobItemStatus::Pending.as_str())
    .bind(AgentJobItemStatus::Running.as_str())
    .bind(AgentJobItemStatus::Completed.as_str())
    .bind(AgentJobItemStatus::Failed.as_str())
    .bind(job_id)
    .fetch_one(self.pool.as_ref())
    .await?;

    let total_items: i64 = row.try_get("total_items")?;
    let pending_items: Option<i64> = row.try_get("pending_items")?;
    let running_items: Option<i64> = row.try_get("running_items")?;
    let completed_items: Option<i64> = row.try_get("completed_items")?;
    let failed_items: Option<i64> = row.try_get("failed_items")?;
    Ok(AgentJobProgress {
        total_items: usize::try_from(total_items).unwrap_or_default(),
        pending_items: usize::try_from(pending_items.unwrap_or_default()).unwrap_or_default(),
        running_items: usize::try_from(running_items.unwrap_or_default()).unwrap_or_default(),
        completed_items: usize::try_from(completed_items.unwrap_or_default())
            .unwrap_or_default(),
        failed_items: usize::try_from(failed_items.unwrap_or_default()).unwrap_or_default(),
    })
}
|
||||
|
||||
async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
|
||||
Reference in New Issue
Block a user