mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
1 Commits
main
...
dev/cc/dyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4176ec4192 |
@@ -1,7 +1,9 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
use std::process::ChildStdin;
|
||||
use std::process::ChildStdout;
|
||||
@@ -24,6 +26,7 @@ use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::FileChangeApprovalDecision;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalParams;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
@@ -82,6 +85,15 @@ struct Cli {
|
||||
)]
|
||||
config_overrides: Vec<String>,
|
||||
|
||||
/// JSON array of dynamic tool specs or a single tool object.
|
||||
/// Prefix a filename with '@' to read from a file.
|
||||
///
|
||||
/// Example:
|
||||
/// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]'
|
||||
/// --dynamic-tools @/path/to/tools.json
|
||||
#[arg(long, value_name = "json-or-@file", global = true)]
|
||||
dynamic_tools: Option<String>,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: CliCommand,
|
||||
}
|
||||
@@ -139,23 +151,29 @@ fn main() -> Result<()> {
|
||||
let Cli {
|
||||
codex_bin,
|
||||
config_overrides,
|
||||
dynamic_tools,
|
||||
command,
|
||||
} = Cli::parse();
|
||||
|
||||
let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?;
|
||||
|
||||
match command {
|
||||
CliCommand::SendMessage { user_message } => {
|
||||
ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?;
|
||||
send_message(&codex_bin, &config_overrides, user_message)
|
||||
}
|
||||
CliCommand::SendMessageV2 { user_message } => {
|
||||
send_message_v2(&codex_bin, &config_overrides, user_message)
|
||||
send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools)
|
||||
}
|
||||
CliCommand::TriggerCmdApproval { user_message } => {
|
||||
trigger_cmd_approval(&codex_bin, &config_overrides, user_message)
|
||||
trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
|
||||
}
|
||||
CliCommand::TriggerPatchApproval { user_message } => {
|
||||
trigger_patch_approval(&codex_bin, &config_overrides, user_message)
|
||||
trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
|
||||
}
|
||||
CliCommand::NoTriggerCmdApproval => {
|
||||
no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools)
|
||||
}
|
||||
CliCommand::NoTriggerCmdApproval => no_trigger_cmd_approval(&codex_bin, &config_overrides),
|
||||
CliCommand::SendFollowUpV2 {
|
||||
first_message,
|
||||
follow_up_message,
|
||||
@@ -164,10 +182,20 @@ fn main() -> Result<()> {
|
||||
&config_overrides,
|
||||
first_message,
|
||||
follow_up_message,
|
||||
&dynamic_tools,
|
||||
),
|
||||
CliCommand::TestLogin => test_login(&codex_bin, &config_overrides),
|
||||
CliCommand::GetAccountRateLimits => get_account_rate_limits(&codex_bin, &config_overrides),
|
||||
CliCommand::ModelList => model_list(&codex_bin, &config_overrides),
|
||||
CliCommand::TestLogin => {
|
||||
ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?;
|
||||
test_login(&codex_bin, &config_overrides)
|
||||
}
|
||||
CliCommand::GetAccountRateLimits => {
|
||||
ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?;
|
||||
get_account_rate_limits(&codex_bin, &config_overrides)
|
||||
}
|
||||
CliCommand::ModelList => {
|
||||
ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?;
|
||||
model_list(&codex_bin, &config_overrides)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,14 +225,23 @@ fn send_message_v2(
|
||||
codex_bin: &str,
|
||||
config_overrides: &[String],
|
||||
user_message: String,
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
send_message_v2_with_policies(codex_bin, config_overrides, user_message, None, None)
|
||||
send_message_v2_with_policies(
|
||||
codex_bin,
|
||||
config_overrides,
|
||||
user_message,
|
||||
None,
|
||||
None,
|
||||
dynamic_tools,
|
||||
)
|
||||
}
|
||||
|
||||
fn trigger_cmd_approval(
|
||||
codex_bin: &str,
|
||||
config_overrides: &[String],
|
||||
user_message: Option<String>,
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let default_prompt =
|
||||
"Run `touch /tmp/should-trigger-approval` so I can confirm the file exists.";
|
||||
@@ -215,6 +252,7 @@ fn trigger_cmd_approval(
|
||||
message,
|
||||
Some(AskForApproval::OnRequest),
|
||||
Some(SandboxPolicy::ReadOnly),
|
||||
dynamic_tools,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -222,6 +260,7 @@ fn trigger_patch_approval(
|
||||
codex_bin: &str,
|
||||
config_overrides: &[String],
|
||||
user_message: Option<String>,
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let default_prompt =
|
||||
"Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch.";
|
||||
@@ -232,12 +271,24 @@ fn trigger_patch_approval(
|
||||
message,
|
||||
Some(AskForApproval::OnRequest),
|
||||
Some(SandboxPolicy::ReadOnly),
|
||||
dynamic_tools,
|
||||
)
|
||||
}
|
||||
|
||||
fn no_trigger_cmd_approval(codex_bin: &str, config_overrides: &[String]) -> Result<()> {
|
||||
fn no_trigger_cmd_approval(
|
||||
codex_bin: &str,
|
||||
config_overrides: &[String],
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let prompt = "Run `touch should_not_trigger_approval.txt`";
|
||||
send_message_v2_with_policies(codex_bin, config_overrides, prompt.to_string(), None, None)
|
||||
send_message_v2_with_policies(
|
||||
codex_bin,
|
||||
config_overrides,
|
||||
prompt.to_string(),
|
||||
None,
|
||||
None,
|
||||
dynamic_tools,
|
||||
)
|
||||
}
|
||||
|
||||
fn send_message_v2_with_policies(
|
||||
@@ -246,13 +297,17 @@ fn send_message_v2_with_policies(
|
||||
user_message: String,
|
||||
approval_policy: Option<AskForApproval>,
|
||||
sandbox_policy: Option<SandboxPolicy>,
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let thread_response = client.thread_start(ThreadStartParams::default())?;
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
let mut turn_params = TurnStartParams {
|
||||
thread_id: thread_response.thread.id.clone(),
|
||||
@@ -279,13 +334,17 @@ fn send_follow_up_v2(
|
||||
config_overrides: &[String],
|
||||
first_message: String,
|
||||
follow_up_message: String,
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
) -> Result<()> {
|
||||
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
|
||||
|
||||
let initialize = client.initialize()?;
|
||||
println!("< initialize response: {initialize:?}");
|
||||
|
||||
let thread_response = client.thread_start(ThreadStartParams::default())?;
|
||||
let thread_response = client.thread_start(ThreadStartParams {
|
||||
dynamic_tools: dynamic_tools.clone(),
|
||||
..Default::default()
|
||||
})?;
|
||||
println!("< thread/start response: {thread_response:?}");
|
||||
|
||||
let first_turn_params = TurnStartParams {
|
||||
@@ -371,6 +430,40 @@ fn model_list(codex_bin: &str, config_overrides: &[String]) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_dynamic_tools_unused(
|
||||
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
|
||||
command: &str,
|
||||
) -> Result<()> {
|
||||
if dynamic_tools.is_some() {
|
||||
bail!(
|
||||
"dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2"
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_dynamic_tools_arg(dynamic_tools: &Option<String>) -> Result<Option<Vec<DynamicToolSpec>>> {
|
||||
let Some(raw_arg) = dynamic_tools.as_deref() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let raw_json = if let Some(path) = raw_arg.strip_prefix('@') {
|
||||
fs::read_to_string(Path::new(path))
|
||||
.with_context(|| format!("read dynamic tools file {path}"))?
|
||||
} else {
|
||||
raw_arg.to_string()
|
||||
};
|
||||
|
||||
let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?;
|
||||
let tools = match value {
|
||||
Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?,
|
||||
Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?],
|
||||
_ => bail!("dynamic tools JSON must be an object or array"),
|
||||
};
|
||||
|
||||
Ok(Some(tools))
|
||||
}
|
||||
|
||||
struct CodexClient {
|
||||
child: Child,
|
||||
stdin: Option<ChildStdin>,
|
||||
|
||||
@@ -331,9 +331,35 @@ impl Codex {
|
||||
.clone()
|
||||
.or_else(|| conversation_history.get_base_instructions().map(|s| s.text))
|
||||
.unwrap_or_else(|| model_info.get_model_instructions(config.model_personality));
|
||||
// Respect explicit thread-start tools; fall back to persisted tools when resuming a thread.
|
||||
// Respect thread-start tools. When missing (resumed/forked threads), read from the db
|
||||
// first, then fall back to rollout-file tools.
|
||||
let persisted_tools = if dynamic_tools.is_empty()
|
||||
&& config.features.enabled(Feature::Sqlite)
|
||||
{
|
||||
let thread_id = match &conversation_history {
|
||||
InitialHistory::Resumed(resumed) => Some(resumed.conversation_id),
|
||||
InitialHistory::Forked(_) => conversation_history.forked_from_id(),
|
||||
InitialHistory::New => None,
|
||||
};
|
||||
match thread_id {
|
||||
Some(thread_id) => {
|
||||
let state_db_ctx = state_db::open_if_present(
|
||||
config.codex_home.as_path(),
|
||||
config.model_provider_id.as_str(),
|
||||
)
|
||||
.await;
|
||||
state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn")
|
||||
.await
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let dynamic_tools = if dynamic_tools.is_empty() {
|
||||
conversation_history.get_dynamic_tools().unwrap_or_default()
|
||||
persisted_tools
|
||||
.or_else(|| conversation_history.get_dynamic_tools())
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
dynamic_tools
|
||||
};
|
||||
|
||||
@@ -187,6 +187,22 @@ pub(crate) async fn backfill_sessions(
|
||||
warn!("failed to upsert rollout {}: {err}", path.display());
|
||||
} else {
|
||||
stats.upserted = stats.upserted.saturating_add(1);
|
||||
if let Ok(meta_line) = rollout::list::read_session_meta_line(&path).await {
|
||||
if let Err(err) = runtime
|
||||
.replace_dynamic_tools(
|
||||
meta_line.meta.id,
|
||||
meta_line.meta.dynamic_tools.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("failed to backfill dynamic tools {}: {err}", path.display());
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"failed to read session meta for dynamic tools {}",
|
||||
path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -9,6 +9,7 @@ use chrono::Timelike;
|
||||
use chrono::Utc;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::DB_METRIC_COMPARE_ERROR;
|
||||
@@ -196,6 +197,37 @@ pub async fn find_rollout_path_by_id(
|
||||
})
|
||||
}
|
||||
|
||||
/// Get dynamic tools for a thread id using SQLite.
|
||||
pub async fn get_dynamic_tools(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
thread_id: ThreadId,
|
||||
stage: &str,
|
||||
) -> Option<Vec<DynamicToolSpec>> {
|
||||
let ctx = context?;
|
||||
match ctx.get_dynamic_tools(thread_id).await {
|
||||
Ok(tools) => tools,
|
||||
Err(err) => {
|
||||
warn!("state db get_dynamic_tools failed during {stage}: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace dynamic tools for a thread id using SQLite.
|
||||
pub async fn replace_dynamic_tools(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
thread_id: ThreadId,
|
||||
tools: Option<&[DynamicToolSpec]>,
|
||||
stage: &str,
|
||||
) {
|
||||
let Some(ctx) = context else {
|
||||
return;
|
||||
};
|
||||
if let Err(err) = ctx.replace_dynamic_tools(thread_id, tools).await {
|
||||
warn!("state db replace_dynamic_tools failed during {stage}: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
|
||||
pub async fn reconcile_rollout(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
@@ -235,6 +267,21 @@ pub async fn reconcile_rollout(
|
||||
"state db reconcile_rollout upsert failed {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await {
|
||||
replace_dynamic_tools(
|
||||
Some(ctx),
|
||||
meta_line.meta.id,
|
||||
meta_line.meta.dynamic_tools.as_deref(),
|
||||
"reconcile_rollout",
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
warn!(
|
||||
"state db reconcile_rollout missing session meta {}",
|
||||
rollout_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
@@ -74,6 +75,28 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
||||
let rollout_rel_path = format!("sessions/2026/01/27/rollout-2026-01-27T12-00-00-{uuid}.jsonl");
|
||||
let rollout_rel_path_for_hook = rollout_rel_path.clone();
|
||||
|
||||
let dynamic_tools = vec![
|
||||
DynamicToolSpec {
|
||||
name: "geo_lookup".to_string(),
|
||||
description: "lookup a city".to_string(),
|
||||
input_schema: json!({
|
||||
"type": "object",
|
||||
"required": ["city"],
|
||||
"properties": { "city": { "type": "string" } }
|
||||
}),
|
||||
},
|
||||
DynamicToolSpec {
|
||||
name: "weather_lookup".to_string(),
|
||||
description: "lookup weather".to_string(),
|
||||
input_schema: json!({
|
||||
"type": "object",
|
||||
"required": ["zip"],
|
||||
"properties": { "zip": { "type": "string" } }
|
||||
}),
|
||||
},
|
||||
];
|
||||
let dynamic_tools_for_hook = dynamic_tools.clone();
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_pre_build_hook(move |codex_home| {
|
||||
let rollout_path = codex_home.join(&rollout_rel_path_for_hook);
|
||||
@@ -81,7 +104,6 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
||||
.parent()
|
||||
.expect("rollout path should have parent");
|
||||
fs::create_dir_all(parent).expect("should create rollout directory");
|
||||
|
||||
let session_meta_line = SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
@@ -93,7 +115,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
||||
source: SessionSource::default(),
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
dynamic_tools: Some(dynamic_tools_for_hook),
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
@@ -155,6 +177,12 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
||||
assert_eq!(metadata.model_provider, default_provider);
|
||||
assert!(metadata.has_user_event);
|
||||
|
||||
let stored_tools = db
|
||||
.get_dynamic_tools(thread_id)
|
||||
.await?
|
||||
.expect("dynamic tools should be stored");
|
||||
assert_eq!(stored_tools, dynamic_tools);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
11
codex-rs/state/migrations/0004_thread_dynamic_tools.sql
Normal file
11
codex-rs/state/migrations/0004_thread_dynamic_tools.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
CREATE TABLE thread_dynamic_tools (
|
||||
thread_id TEXT NOT NULL,
|
||||
position INTEGER NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL,
|
||||
input_schema TEXT NOT NULL,
|
||||
PRIMARY KEY(thread_id, position),
|
||||
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX idx_thread_dynamic_tools_thread ON thread_dynamic_tools(thread_id);
|
||||
@@ -16,7 +16,9 @@ use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use serde_json::Value;
|
||||
use sqlx::QueryBuilder;
|
||||
use sqlx::Row;
|
||||
use sqlx::Sqlite;
|
||||
@@ -115,6 +117,38 @@ WHERE id = ?
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Get dynamic tools for a thread, if present.
|
||||
pub async fn get_dynamic_tools(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> anyhow::Result<Option<Vec<DynamicToolSpec>>> {
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT name, description, input_schema
|
||||
FROM thread_dynamic_tools
|
||||
WHERE thread_id = ?
|
||||
ORDER BY position ASC
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_all(self.pool.as_ref())
|
||||
.await?;
|
||||
if rows.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut tools = Vec::with_capacity(rows.len());
|
||||
for row in rows {
|
||||
let input_schema: String = row.try_get("input_schema")?;
|
||||
let input_schema = serde_json::from_str::<Value>(input_schema.as_str())?;
|
||||
tools.push(DynamicToolSpec {
|
||||
name: row.try_get("name")?,
|
||||
description: row.try_get("description")?,
|
||||
input_schema,
|
||||
});
|
||||
}
|
||||
Ok(Some(tools))
|
||||
}
|
||||
|
||||
/// Find a rollout path by thread id using the underlying database.
|
||||
pub async fn find_rollout_path_by_id(
|
||||
&self,
|
||||
@@ -367,6 +401,46 @@ ON CONFLICT(id) DO UPDATE SET
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Replace dynamic tools for a thread.
|
||||
pub async fn replace_dynamic_tools(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
tools: Option<&[DynamicToolSpec]>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let thread_id = thread_id.to_string();
|
||||
sqlx::query("DELETE FROM thread_dynamic_tools WHERE thread_id = ?")
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
if let Some(tools) = tools {
|
||||
for (idx, tool) in tools.iter().enumerate() {
|
||||
let position = i64::try_from(idx).unwrap_or(i64::MAX);
|
||||
let input_schema = serde_json::to_string(&tool.input_schema)?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_dynamic_tools (
|
||||
thread_id,
|
||||
position,
|
||||
name,
|
||||
description,
|
||||
input_schema
|
||||
) VALUES (?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.as_str())
|
||||
.bind(position)
|
||||
.bind(tool.name.as_str())
|
||||
.bind(tool.description.as_str())
|
||||
.bind(input_schema)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply rollout items incrementally using the underlying database.
|
||||
pub async fn apply_rollout_items(
|
||||
&self,
|
||||
@@ -388,12 +462,25 @@ ON CONFLICT(id) DO UPDATE SET
|
||||
if let Some(updated_at) = file_modified_time_utc(builder.rollout_path.as_path()).await {
|
||||
metadata.updated_at = updated_at;
|
||||
}
|
||||
// Keep the thread upsert before dynamic tools to satisfy the foreign key constraint:
|
||||
// thread_dynamic_tools.thread_id -> threads.id.
|
||||
if let Err(err) = self.upsert_thread(&metadata).await {
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
let dynamic_tools = extract_dynamic_tools(items);
|
||||
if let Some(dynamic_tools) = dynamic_tools
|
||||
&& let Err(err) = self
|
||||
.replace_dynamic_tools(builder.id, dynamic_tools.as_deref())
|
||||
.await
|
||||
{
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "replace_dynamic_tools")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -505,6 +592,16 @@ fn push_like_filters<'a>(
|
||||
builder.push(")");
|
||||
}
|
||||
|
||||
fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<DynamicToolSpec>>> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
|
||||
Reference in New Issue
Block a user