Compare commits

...

1 Commits

Author SHA1 Message Date
celia-oai
4176ec4192 changes 2026-01-30 14:35:38 -08:00
7 changed files with 334 additions and 16 deletions

View File

@@ -1,7 +1,9 @@
use std::collections::VecDeque;
use std::fs;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::path::Path;
use std::process::Child;
use std::process::ChildStdin;
use std::process::ChildStdout;
@@ -24,6 +26,7 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
@@ -82,6 +85,15 @@ struct Cli {
)]
config_overrides: Vec<String>,
/// JSON array of dynamic tool specs or a single tool object.
/// Prefix a filename with '@' to read from a file.
///
/// Example:
/// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]'
/// --dynamic-tools @/path/to/tools.json
#[arg(long, value_name = "json-or-@file", global = true)]
dynamic_tools: Option<String>,
#[command(subcommand)]
command: CliCommand,
}
@@ -139,23 +151,29 @@ fn main() -> Result<()> {
let Cli {
codex_bin,
config_overrides,
dynamic_tools,
command,
} = Cli::parse();
let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?;
match command {
CliCommand::SendMessage { user_message } => {
ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?;
send_message(&codex_bin, &config_overrides, user_message)
}
CliCommand::SendMessageV2 { user_message } => {
send_message_v2(&codex_bin, &config_overrides, user_message)
send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::TriggerCmdApproval { user_message } => {
trigger_cmd_approval(&codex_bin, &config_overrides, user_message)
trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::TriggerPatchApproval { user_message } => {
trigger_patch_approval(&codex_bin, &config_overrides, user_message)
trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools)
}
CliCommand::NoTriggerCmdApproval => {
no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools)
}
CliCommand::NoTriggerCmdApproval => no_trigger_cmd_approval(&codex_bin, &config_overrides),
CliCommand::SendFollowUpV2 {
first_message,
follow_up_message,
@@ -164,10 +182,20 @@ fn main() -> Result<()> {
&config_overrides,
first_message,
follow_up_message,
&dynamic_tools,
),
CliCommand::TestLogin => test_login(&codex_bin, &config_overrides),
CliCommand::GetAccountRateLimits => get_account_rate_limits(&codex_bin, &config_overrides),
CliCommand::ModelList => model_list(&codex_bin, &config_overrides),
CliCommand::TestLogin => {
ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?;
test_login(&codex_bin, &config_overrides)
}
CliCommand::GetAccountRateLimits => {
ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?;
get_account_rate_limits(&codex_bin, &config_overrides)
}
CliCommand::ModelList => {
ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?;
model_list(&codex_bin, &config_overrides)
}
}
}
@@ -197,14 +225,23 @@ fn send_message_v2(
codex_bin: &str,
config_overrides: &[String],
user_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
send_message_v2_with_policies(codex_bin, config_overrides, user_message, None, None)
send_message_v2_with_policies(
codex_bin,
config_overrides,
user_message,
None,
None,
dynamic_tools,
)
}
fn trigger_cmd_approval(
codex_bin: &str,
config_overrides: &[String],
user_message: Option<String>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let default_prompt =
"Run `touch /tmp/should-trigger-approval` so I can confirm the file exists.";
@@ -215,6 +252,7 @@ fn trigger_cmd_approval(
message,
Some(AskForApproval::OnRequest),
Some(SandboxPolicy::ReadOnly),
dynamic_tools,
)
}
@@ -222,6 +260,7 @@ fn trigger_patch_approval(
codex_bin: &str,
config_overrides: &[String],
user_message: Option<String>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let default_prompt =
"Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch.";
@@ -232,12 +271,24 @@ fn trigger_patch_approval(
message,
Some(AskForApproval::OnRequest),
Some(SandboxPolicy::ReadOnly),
dynamic_tools,
)
}
fn no_trigger_cmd_approval(codex_bin: &str, config_overrides: &[String]) -> Result<()> {
fn no_trigger_cmd_approval(
codex_bin: &str,
config_overrides: &[String],
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let prompt = "Run `touch should_not_trigger_approval.txt`";
send_message_v2_with_policies(codex_bin, config_overrides, prompt.to_string(), None, None)
send_message_v2_with_policies(
codex_bin,
config_overrides,
prompt.to_string(),
None,
None,
dynamic_tools,
)
}
fn send_message_v2_with_policies(
@@ -246,13 +297,17 @@ fn send_message_v2_with_policies(
user_message: String,
approval_policy: Option<AskForApproval>,
sandbox_policy: Option<SandboxPolicy>,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams::default())?;
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let mut turn_params = TurnStartParams {
thread_id: thread_response.thread.id.clone(),
@@ -279,13 +334,17 @@ fn send_follow_up_v2(
config_overrides: &[String],
first_message: String,
follow_up_message: String,
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
) -> Result<()> {
let mut client = CodexClient::spawn(codex_bin, config_overrides)?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams::default())?;
let thread_response = client.thread_start(ThreadStartParams {
dynamic_tools: dynamic_tools.clone(),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let first_turn_params = TurnStartParams {
@@ -371,6 +430,40 @@ fn model_list(codex_bin: &str, config_overrides: &[String]) -> Result<()> {
Ok(())
}
fn ensure_dynamic_tools_unused(
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
command: &str,
) -> Result<()> {
if dynamic_tools.is_some() {
bail!(
"dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2"
);
}
Ok(())
}
fn parse_dynamic_tools_arg(dynamic_tools: &Option<String>) -> Result<Option<Vec<DynamicToolSpec>>> {
let Some(raw_arg) = dynamic_tools.as_deref() else {
return Ok(None);
};
let raw_json = if let Some(path) = raw_arg.strip_prefix('@') {
fs::read_to_string(Path::new(path))
.with_context(|| format!("read dynamic tools file {path}"))?
} else {
raw_arg.to_string()
};
let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?;
let tools = match value {
Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?,
Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?],
_ => bail!("dynamic tools JSON must be an object or array"),
};
Ok(Some(tools))
}
struct CodexClient {
child: Child,
stdin: Option<ChildStdin>,

View File

@@ -331,9 +331,35 @@ impl Codex {
.clone()
.or_else(|| conversation_history.get_base_instructions().map(|s| s.text))
.unwrap_or_else(|| model_info.get_model_instructions(config.model_personality));
// Respect explicit thread-start tools; fall back to persisted tools when resuming a thread.
// Respect thread-start tools. When missing (resumed/forked threads), read from the db
// first, then fall back to rollout-file tools.
let persisted_tools = if dynamic_tools.is_empty()
&& config.features.enabled(Feature::Sqlite)
{
let thread_id = match &conversation_history {
InitialHistory::Resumed(resumed) => Some(resumed.conversation_id),
InitialHistory::Forked(_) => conversation_history.forked_from_id(),
InitialHistory::New => None,
};
match thread_id {
Some(thread_id) => {
let state_db_ctx = state_db::open_if_present(
config.codex_home.as_path(),
config.model_provider_id.as_str(),
)
.await;
state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn")
.await
}
None => None,
}
} else {
None
};
let dynamic_tools = if dynamic_tools.is_empty() {
conversation_history.get_dynamic_tools().unwrap_or_default()
persisted_tools
.or_else(|| conversation_history.get_dynamic_tools())
.unwrap_or_default()
} else {
dynamic_tools
};

View File

@@ -187,6 +187,22 @@ pub(crate) async fn backfill_sessions(
warn!("failed to upsert rollout {}: {err}", path.display());
} else {
stats.upserted = stats.upserted.saturating_add(1);
if let Ok(meta_line) = rollout::list::read_session_meta_line(&path).await {
if let Err(err) = runtime
.replace_dynamic_tools(
meta_line.meta.id,
meta_line.meta.dynamic_tools.as_deref(),
)
.await
{
warn!("failed to backfill dynamic tools {}: {err}", path.display());
}
} else {
warn!(
"failed to read session meta for dynamic tools {}",
path.display()
);
}
}
}
Err(err) => {

View File

@@ -9,6 +9,7 @@ use chrono::Timelike;
use chrono::Utc;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_state::DB_METRIC_COMPARE_ERROR;
@@ -196,6 +197,37 @@ pub async fn find_rollout_path_by_id(
})
}
/// Get dynamic tools for a thread id using SQLite.
pub async fn get_dynamic_tools(
context: Option<&codex_state::StateRuntime>,
thread_id: ThreadId,
stage: &str,
) -> Option<Vec<DynamicToolSpec>> {
let ctx = context?;
match ctx.get_dynamic_tools(thread_id).await {
Ok(tools) => tools,
Err(err) => {
warn!("state db get_dynamic_tools failed during {stage}: {err}");
None
}
}
}
/// Replace dynamic tools for a thread id using SQLite.
pub async fn replace_dynamic_tools(
context: Option<&codex_state::StateRuntime>,
thread_id: ThreadId,
tools: Option<&[DynamicToolSpec]>,
stage: &str,
) {
let Some(ctx) = context else {
return;
};
if let Err(err) = ctx.replace_dynamic_tools(thread_id, tools).await {
warn!("state db replace_dynamic_tools failed during {stage}: {err}");
}
}
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
pub async fn reconcile_rollout(
context: Option<&codex_state::StateRuntime>,
@@ -235,6 +267,21 @@ pub async fn reconcile_rollout(
"state db reconcile_rollout upsert failed {}: {err}",
rollout_path.display()
);
return;
}
if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await {
replace_dynamic_tools(
Some(ctx),
meta_line.meta.id,
meta_line.meta.dynamic_tools.as_deref(),
"reconcile_rollout",
)
.await;
} else {
warn!(
"state db reconcile_rollout missing session meta {}",
rollout_path.display()
);
}
}

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use codex_core::features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
@@ -74,6 +75,28 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
let rollout_rel_path = format!("sessions/2026/01/27/rollout-2026-01-27T12-00-00-{uuid}.jsonl");
let rollout_rel_path_for_hook = rollout_rel_path.clone();
let dynamic_tools = vec![
DynamicToolSpec {
name: "geo_lookup".to_string(),
description: "lookup a city".to_string(),
input_schema: json!({
"type": "object",
"required": ["city"],
"properties": { "city": { "type": "string" } }
}),
},
DynamicToolSpec {
name: "weather_lookup".to_string(),
description: "lookup weather".to_string(),
input_schema: json!({
"type": "object",
"required": ["zip"],
"properties": { "zip": { "type": "string" } }
}),
},
];
let dynamic_tools_for_hook = dynamic_tools.clone();
let mut builder = test_codex()
.with_pre_build_hook(move |codex_home| {
let rollout_path = codex_home.join(&rollout_rel_path_for_hook);
@@ -81,7 +104,6 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
.parent()
.expect("rollout path should have parent");
fs::create_dir_all(parent).expect("should create rollout directory");
let session_meta_line = SessionMetaLine {
meta: SessionMeta {
id: thread_id,
@@ -93,7 +115,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
source: SessionSource::default(),
model_provider: None,
base_instructions: None,
dynamic_tools: None,
dynamic_tools: Some(dynamic_tools_for_hook),
},
git: None,
};
@@ -155,6 +177,12 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
assert_eq!(metadata.model_provider, default_provider);
assert!(metadata.has_user_event);
let stored_tools = db
.get_dynamic_tools(thread_id)
.await?
.expect("dynamic tools should be stored");
assert_eq!(stored_tools, dynamic_tools);
Ok(())
}

View File

@@ -0,0 +1,11 @@
CREATE TABLE thread_dynamic_tools (
thread_id TEXT NOT NULL,
position INTEGER NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL,
input_schema TEXT NOT NULL,
PRIMARY KEY(thread_id, position),
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
);
CREATE INDEX idx_thread_dynamic_tools_thread ON thread_dynamic_tools(thread_id);

View File

@@ -16,7 +16,9 @@ use chrono::DateTime;
use chrono::Utc;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use serde_json::Value;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
@@ -115,6 +117,38 @@ WHERE id = ?
.transpose()
}
/// Get dynamic tools for a thread, if present.
pub async fn get_dynamic_tools(
&self,
thread_id: ThreadId,
) -> anyhow::Result<Option<Vec<DynamicToolSpec>>> {
let rows = sqlx::query(
r#"
SELECT name, description, input_schema
FROM thread_dynamic_tools
WHERE thread_id = ?
ORDER BY position ASC
"#,
)
.bind(thread_id.to_string())
.fetch_all(self.pool.as_ref())
.await?;
if rows.is_empty() {
return Ok(None);
}
let mut tools = Vec::with_capacity(rows.len());
for row in rows {
let input_schema: String = row.try_get("input_schema")?;
let input_schema = serde_json::from_str::<Value>(input_schema.as_str())?;
tools.push(DynamicToolSpec {
name: row.try_get("name")?,
description: row.try_get("description")?,
input_schema,
});
}
Ok(Some(tools))
}
/// Find a rollout path by thread id using the underlying database.
pub async fn find_rollout_path_by_id(
&self,
@@ -367,6 +401,46 @@ ON CONFLICT(id) DO UPDATE SET
Ok(())
}
/// Replace dynamic tools for a thread.
pub async fn replace_dynamic_tools(
&self,
thread_id: ThreadId,
tools: Option<&[DynamicToolSpec]>,
) -> anyhow::Result<()> {
let mut tx = self.pool.begin().await?;
let thread_id = thread_id.to_string();
sqlx::query("DELETE FROM thread_dynamic_tools WHERE thread_id = ?")
.bind(thread_id.as_str())
.execute(&mut *tx)
.await?;
if let Some(tools) = tools {
for (idx, tool) in tools.iter().enumerate() {
let position = i64::try_from(idx).unwrap_or(i64::MAX);
let input_schema = serde_json::to_string(&tool.input_schema)?;
sqlx::query(
r#"
INSERT INTO thread_dynamic_tools (
thread_id,
position,
name,
description,
input_schema
) VALUES (?, ?, ?, ?, ?)
"#,
)
.bind(thread_id.as_str())
.bind(position)
.bind(tool.name.as_str())
.bind(tool.description.as_str())
.bind(input_schema)
.execute(&mut *tx)
.await?;
}
}
tx.commit().await?;
Ok(())
}
/// Apply rollout items incrementally using the underlying database.
pub async fn apply_rollout_items(
&self,
@@ -388,12 +462,25 @@ ON CONFLICT(id) DO UPDATE SET
if let Some(updated_at) = file_modified_time_utc(builder.rollout_path.as_path()).await {
metadata.updated_at = updated_at;
}
// Keep the thread upsert before dynamic tools to satisfy the foreign key constraint:
// thread_dynamic_tools.thread_id -> threads.id.
if let Err(err) = self.upsert_thread(&metadata).await {
if let Some(otel) = otel {
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]);
}
return Err(err);
}
let dynamic_tools = extract_dynamic_tools(items);
if let Some(dynamic_tools) = dynamic_tools
&& let Err(err) = self
.replace_dynamic_tools(builder.id, dynamic_tools.as_deref())
.await
{
if let Some(otel) = otel {
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "replace_dynamic_tools")]);
}
return Err(err);
}
Ok(())
}
@@ -505,6 +592,16 @@ fn push_like_filters<'a>(
builder.push(")");
}
fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<DynamicToolSpec>>> {
items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
})
}
async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
let options = SqliteConnectOptions::new()
.filename(path)