Compare commits

...

8 Commits

Author SHA1 Message Date
Eric Traut
57f89d3c39 codex: address dynamic tool cwd race 2026-03-17 16:56:10 -06:00
Eric Traut
c422ccd890 codex: fix CI failure on PR #14954 2026-03-17 16:40:49 -06:00
Eric Traut
383fee24fc codex: fix CI failure on PR #14954 2026-03-17 16:40:49 -06:00
Eric Traut
806addef65 codex: fix rebased tui_app_server request handling 2026-03-17 16:40:49 -06:00
Eric Traut
6e97a343ab codex: address PR review feedback (#14954) 2026-03-17 16:40:49 -06:00
Eric Traut
eb11b49fd2 codex: fix CI failure on PR #14954 2026-03-17 16:40:49 -06:00
Eric Traut
eb493a8876 codex: address PR review feedback (#14954) 2026-03-17 16:40:49 -06:00
Eric Traut
0c267abf83 Add dynamic tool support to tui_app_server 2026-03-17 16:40:49 -06:00
10 changed files with 1437 additions and 29 deletions

3
codex-rs/Cargo.lock generated
View File

@@ -2585,6 +2585,7 @@ name = "codex-tui-app-server"
version = "0.0.0"
dependencies = [
"anyhow",
"app_test_support",
"arboard",
"assert_matches",
"base64 0.22.1",
@@ -2619,6 +2620,7 @@ dependencies = [
"codex-utils-string",
"codex-windows-sandbox",
"color-eyre",
"core_test_support",
"cpal",
"crossterm",
"derive_more 2.1.1",
@@ -2668,6 +2670,7 @@ dependencies = [
"which",
"windows-sys 0.52.0",
"winsplit",
"wiremock",
]
[[package]]

View File

@@ -737,6 +737,60 @@ impl InProcessAppServerRequestHandle {
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
/// Forwards a `result` payload for the server request identified by
/// `request_id` to the in-process worker and waits for its reply.
///
/// # Errors
///
/// Returns `ErrorKind::BrokenPipe` when either the worker command channel or
/// the one-shot reply channel is closed; otherwise returns the outcome that
/// was delivered on the reply channel.
pub async fn resolve_server_request(
    &self,
    request_id: RequestId,
    result: JsonRpcResult,
) -> IoResult<()> {
    let (response_tx, reply_rx) = oneshot::channel();
    let command = ClientCommand::ResolveServerRequest {
        request_id,
        result,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match reply_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server resolve channel is closed",
        )),
    }
}
/// Forwards a JSON-RPC `error` for the server request identified by
/// `request_id` to the in-process worker and waits for its reply.
///
/// # Errors
///
/// Returns `ErrorKind::BrokenPipe` when either the worker command channel or
/// the one-shot reply channel is closed; otherwise returns the outcome that
/// was delivered on the reply channel.
pub async fn reject_server_request(
    &self,
    request_id: RequestId,
    error: JSONRPCErrorError,
) -> IoResult<()> {
    let (response_tx, reply_rx) = oneshot::channel();
    let command = ClientCommand::RejectServerRequest {
        request_id,
        error,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server worker channel is closed",
        ));
    }
    match reply_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "in-process app-server reject channel is closed",
        )),
    }
}
}
impl AppServerRequestHandle {
@@ -756,6 +810,28 @@ impl AppServerRequestHandle {
Self::Remote(handle) => handle.request_typed(request).await,
}
}
/// Resolves a pending server request by delegating to whichever transport
/// handle (in-process or remote) this value wraps.
pub async fn resolve_server_request(
    &self,
    request_id: RequestId,
    result: JsonRpcResult,
) -> IoResult<()> {
    match self {
        Self::InProcess(in_process) => {
            in_process.resolve_server_request(request_id, result).await
        }
        Self::Remote(remote) => remote.resolve_server_request(request_id, result).await,
    }
}
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
match self {
Self::InProcess(handle) => handle.reject_server_request(request_id, error).await,
Self::Remote(handle) => handle.reject_server_request(request_id, error).await,
}
}
}
impl AppServerClient {

View File

@@ -631,6 +631,60 @@ impl RemoteAppServerRequestHandle {
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
/// Forwards a `result` payload for the server request identified by
/// `request_id` to the remote worker and waits for its reply.
///
/// # Errors
///
/// Returns `ErrorKind::BrokenPipe` when either the worker command channel or
/// the one-shot reply channel is closed; otherwise returns the outcome that
/// was delivered on the reply channel.
pub async fn resolve_server_request(
    &self,
    request_id: RequestId,
    result: JsonRpcResult,
) -> IoResult<()> {
    let (response_tx, reply_rx) = oneshot::channel();
    let command = RemoteClientCommand::ResolveServerRequest {
        request_id,
        result,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "remote app-server worker channel is closed",
        ));
    }
    match reply_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "remote app-server resolve channel is closed",
        )),
    }
}
/// Forwards a JSON-RPC `error` for the server request identified by
/// `request_id` to the remote worker and waits for its reply.
///
/// # Errors
///
/// Returns `ErrorKind::BrokenPipe` when either the worker command channel or
/// the one-shot reply channel is closed; otherwise returns the outcome that
/// was delivered on the reply channel.
pub async fn reject_server_request(
    &self,
    request_id: RequestId,
    error: JSONRPCErrorError,
) -> IoResult<()> {
    let (response_tx, reply_rx) = oneshot::channel();
    let command = RemoteClientCommand::RejectServerRequest {
        request_id,
        error,
        response_tx,
    };
    if self.command_tx.send(command).await.is_err() {
        return Err(IoError::new(
            ErrorKind::BrokenPipe,
            "remote app-server worker channel is closed",
        ));
    }
    match reply_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(IoError::new(
            ErrorKind::BrokenPipe,
            "remote app-server reject channel is closed",
        )),
    }
}
}
async fn initialize_remote_connection(

View File

@@ -135,8 +135,10 @@ arboard = { workspace = true }
[dev-dependencies]
app_test_support = { path = "../app-server/tests/common" }
codex-cli = { workspace = true }
codex-core = { workspace = true }
core_test_support = { path = "../core/tests/common" }
codex-utils-cargo-bin = { workspace = true }
codex-utils-pty = { workspace = true }
assert_matches = { workspace = true }
@@ -147,3 +149,4 @@ rand = { workspace = true }
serial_test = { workspace = true }
vt100 = { workspace = true }
uuid = { workspace = true }
wiremock = { workspace = true }

View File

@@ -4680,6 +4680,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tempfile::tempdir;
use tokio::time;

View File

@@ -16,6 +16,7 @@ use crate::app_event::AppEvent;
use crate::app_server_session::AppServerSession;
use crate::app_server_session::app_server_rate_limit_snapshot_to_core;
use crate::app_server_session::status_account_display_from_auth_mode;
use crate::dynamic_tools::spawn_dynamic_tool_call_task;
use crate::local_chatgpt_auth::load_local_chatgpt_auth;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
@@ -163,6 +164,13 @@ impl App {
.await;
return;
}
if let ServerRequest::DynamicToolCall { request_id, params } = request {
let registry = app_server_client.dynamic_tool_registry();
let context =
app_server_client.dynamic_tool_execution_context(&params.thread_id);
spawn_dynamic_tool_call_task(registry, context, request_id, params);
return;
}
if let Some(unsupported) = self
.pending_app_server_requests
.note_server_request(&request)
@@ -858,14 +866,31 @@ fn app_server_codex_error_info_to_core(
#[cfg(test)]
mod tests {
use super::App;
use super::server_notification_thread_events;
use super::thread_snapshot_events;
use super::turn_snapshot_events;
use crate::app::AgentNavigationState;
use crate::app::PendingAppServerRequests;
use crate::app::WindowsSandboxState;
use crate::app_backtrack::BacktrackState;
use crate::chatwidget::tests::make_chatwidget_manual_with_sender;
use crate::dynamic_tools::DynamicToolRegistration;
use crate::dynamic_tools::DynamicToolRegistry;
use crate::file_search::FileSearchManager;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use codex_app_server_client::AppServerClient;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::AgentMessageDeltaNotification;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallResponse;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStatus;
@@ -873,6 +898,13 @@ mod tests {
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnStatus;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_core::config::ConfigOverrides;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
@@ -882,8 +914,294 @@ mod tests {
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::MockServer;
use crate::app::FeedbackAudience;
/// Builds an `App` for tests on top of a manually constructed chat widget.
///
/// The config is cloned from the chat widget, the model and model info come
/// from the offline `codex_core::test_support` helpers, and every remaining
/// field is initialised to an empty, `None`, `false`, or `Default` value.
async fn make_test_app() -> App {
// The two receivers returned alongside the widget are not used by this helper.
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await;
let config = chat_widget.config_ref().clone();
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
let model_info =
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
let session_telemetry = SessionTelemetry::new(
ThreadId::new(),
model.as_str(),
model_info.slug.as_str(),
None,
None,
None,
codex_core::default_client::originator().value,
config.otel.log_user_prompt,
codex_core::terminal::user_agent(),
SessionSource::Cli,
);
App {
// `model_catalog` is read from the widget before `chat_widget` itself is moved in below.
model_catalog: chat_widget.model_catalog(),
session_telemetry,
app_event_tx,
chat_widget,
config,
active_profile: None,
cli_kv_overrides: Vec::new(),
harness_overrides: ConfigOverrides::default(),
runtime_approval_policy_override: None,
runtime_sandbox_policy_override: None,
file_search,
transcript_cells: Vec::new(),
overlay: None,
deferred_history_lines: Vec::new(),
has_emitted_history_lines: false,
enhanced_keys_supported: false,
commit_anim_running: Arc::new(AtomicBool::new(false)),
status_line_invalid_items_warned: Arc::new(AtomicBool::new(false)),
backtrack: BacktrackState::default(),
backtrack_render_pending: false,
feedback: codex_feedback::CodexFeedback::new(),
feedback_audience: FeedbackAudience::External,
remote_app_server_url: None,
pending_update_action: None,
suppress_shutdown_complete: false,
pending_shutdown_exit_thread_id: None,
windows_sandbox: WindowsSandboxState::default(),
thread_event_channels: HashMap::new(),
thread_event_listener_tasks: HashMap::new(),
agent_navigation: AgentNavigationState::default(),
active_thread_id: None,
active_thread_rx: None,
primary_thread_id: None,
primary_session_configured: None,
pending_primary_events: VecDeque::new(),
pending_app_server_requests: PendingAppServerRequests::default(),
}
}
/// Returns the CLI config overrides that select `mock-model` on a
/// `mock_provider` whose base URL is `{server_uri}/v1`, with approval policy
/// `never`, sandbox mode `read-only`, the `responses` wire API, and both retry
/// limits set to zero.
fn mock_provider_cli_overrides(server_uri: &str) -> Vec<(String, toml::Value)> {
    /// Builds one override whose value is a TOML string.
    fn string_entry(key: &str, value: &str) -> (String, toml::Value) {
        (key.to_string(), toml::Value::String(value.to_string()))
    }

    /// Builds one override whose value is a TOML integer.
    fn integer_entry(key: &str, value: i64) -> (String, toml::Value) {
        (key.to_string(), toml::Value::Integer(value))
    }

    let base_url = format!("{server_uri}/v1");
    vec![
        string_entry("model", "mock-model"),
        string_entry("model_provider", "mock_provider"),
        string_entry("approval_policy", "never"),
        string_entry("sandbox_mode", "read-only"),
        string_entry(
            "model_providers.mock_provider.name",
            "Mock provider for test",
        ),
        string_entry("model_providers.mock_provider.base_url", &base_url),
        string_entry("model_providers.mock_provider.wire_api", "responses"),
        integer_entry("model_providers.mock_provider.request_max_retries", 0),
        integer_entry("model_providers.mock_provider.stream_max_retries", 0),
    ]
}
/// Builds a config rooted at `temp_dir`, starts an embedded app server with
/// it, and wraps the resulting in-process client in an `AppServerSession`
/// that uses the supplied dynamic tool registry.
///
/// Returns the session together with the config it was started from.
async fn build_dynamic_tool_test_session(
    temp_dir: &TempDir,
    cli_overrides: Vec<(String, toml::Value)>,
    dynamic_tools: Arc<DynamicToolRegistry>,
) -> color_eyre::Result<(crate::app_server_session::AppServerSession, Config)> {
    let codex_home = temp_dir.path().to_path_buf();
    let config = ConfigBuilder::default()
        .codex_home(codex_home)
        .cli_overrides(cli_overrides.clone())
        .build()
        .await?;

    let embedded_client = crate::start_embedded_app_server(
        Arg0DispatchPaths::default(),
        config.clone(),
        cli_overrides,
        LoaderOverrides::default(),
        CloudRequirementsLoader::default(),
        codex_feedback::CodexFeedback::new(),
    )
    .await?;

    let session = crate::app_server_session::AppServerSession::new_with_dynamic_tools(
        AppServerClient::InProcess(embedded_client),
        dynamic_tools,
    );
    Ok((session, config))
}
/// Collects the JSON bodies of every request the mock server received on a
/// path ending in `/responses`, in the order the server reports them.
///
/// # Errors
///
/// Returns an error when the server yields no recorded requests
/// (`received_requests` returns `None`) or when a matching request body does
/// not parse as JSON. These conditions are reported through the declared
/// `Result` instead of panicking, so callers using `?` see them as ordinary
/// test failures with context.
async fn responses_bodies(server: &MockServer) -> color_eyre::Result<Vec<Value>> {
    let requests = server
        .received_requests()
        .await
        .ok_or_else(|| color_eyre::eyre::eyre!("requests should be readable"))?;

    requests
        .iter()
        .filter(|request| request.url.path().ends_with("/responses"))
        .map(|request| {
            // Fully qualified so the helper does not depend on `WrapErr` being imported here.
            color_eyre::eyre::WrapErr::wrap_err(
                request.body_json::<Value>(),
                "request body should be json",
            )
        })
        .collect()
}
/// Extracts the text output of the `function_call_output` item with the given
/// `call_id` from a request body's `input` array.
///
/// Returns `None` when the body has no `input` array, no matching item, no
/// `output` field, an output that does not deserialize into a
/// `FunctionCallOutputPayload`, or a payload whose body is content items
/// rather than text.
fn function_call_output_text(body: &Value, call_id: &str) -> Option<String> {
    let input_items = body.get("input")?.as_array()?;
    let matching_item = input_items.iter().find(|item| {
        item.get("type").and_then(Value::as_str) == Some("function_call_output")
            && item.get("call_id").and_then(Value::as_str) == Some(call_id)
    })?;
    let raw_output = matching_item.get("output")?.clone();
    let payload = serde_json::from_value::<codex_protocol::models::FunctionCallOutputPayload>(
        raw_output,
    )
    .ok()?;
    match payload.body {
        codex_protocol::models::FunctionCallOutputBody::Text(text) => Some(text),
        codex_protocol::models::FunctionCallOutputBody::ContentItems(_) => None,
    }
}
// End-to-end check that a `DynamicToolCall` server request routed through
// `App::handle_app_server_event` produces a `function_call_output` in a later
// request to the mock model server, carrying the text returned by the
// registered tool handler.
#[tokio::test]
async fn handle_app_server_event_resolves_dynamic_tool_calls_via_request_handle()
-> color_eyre::Result<()> {
let call_id = "dyn-call-1";
let tool_name = "demo_tool";
let tool_args = json!({ "city": "Paris" });
let tool_call_arguments = serde_json::to_string(&tool_args)?;
// Scripted model responses: first a function call for the dynamic tool, then a
// final assistant message.
let responses = vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
responses::ev_completed("resp-1"),
]),
create_final_assistant_message_sse_response("Done")
.map_err(|err| color_eyre::eyre::eyre!(err))?,
];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let temp_dir = TempDir::new()?;
let cli_overrides = mock_provider_cli_overrides(&server.uri());
// Register a single tool whose handler replies with the cwd exposed by the
// execution context, rendered as text.
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
DynamicToolRegistration::new(
DynamicToolSpec {
name: tool_name.to_string(),
description: "dynamic tool".to_string(),
input_schema: json!({
"type": "object",
"properties": {
"city": { "type": "string" }
},
"required": ["city"],
"additionalProperties": false,
}),
defer_loading: false,
},
|_context, _params| async move {
Ok(DynamicToolCallResponse {
content_items: vec![DynamicToolCallOutputContentItem::InputText {
text: _context
.cwd()
.expect("thread cwd should be present")
.display()
.to_string(),
}],
success: true,
})
},
),
]));
let (mut session, config) =
build_dynamic_tool_test_session(&temp_dir, cli_overrides, registry).await?;
let started = session.start_thread(&config).await?;
session
.turn_start(
started.session_configured.session_id,
vec![UserInput::Text {
text: "Run the tool".to_string(),
text_elements: Vec::new(),
}],
config.cwd.clone(),
config.permissions.approval_policy.value(),
config.approvals_reviewer,
config.permissions.sandbox_policy.get().clone(),
config
.model
.clone()
.expect("mock model should be configured"),
None,
None,
None,
None,
None,
None,
)
.await?;
let mut app = make_test_app().await;
let mut saw_dynamic_tool_request = false;
// Feed each app-server event to the app, then poll the mock server; stop as
// soon as a request containing the tool's output for `call_id` shows up.
loop {
// Each wait is bounded to 10 seconds; a timeout or a closed stream fails the test.
let event = timeout(Duration::from_secs(10), session.next_event())
.await?
.expect("app-server event stream should stay open");
if matches!(
&event,
AppServerEvent::ServerRequest(ServerRequest::DynamicToolCall { .. })
) {
saw_dynamic_tool_request = true;
}
app.handle_app_server_event(&session, event).await;
if let Some(text) = responses_bodies(&server)
.await?
.iter()
.find_map(|body| function_call_output_text(body, call_id))
{
// The handler echoed the context cwd, so it must equal the config cwd.
assert_eq!(text, config.cwd.display().to_string());
break;
}
}
assert!(saw_dynamic_tool_request);
Ok(())
}
#[test]
fn bridges_completed_agent_messages_from_server_notifications() {

View File

@@ -92,13 +92,7 @@ impl PendingAppServerRequests {
}
None
}
ServerRequest::DynamicToolCall { request_id, .. } => {
Some(UnsupportedAppServerRequest {
request_id: request_id.clone(),
message: "Dynamic tool calls are not available in app-server TUI yet."
.to_string(),
})
}
ServerRequest::DynamicToolCall { .. } => None,
ServerRequest::ChatgptAuthTokensRefresh { .. } => None,
ServerRequest::ApplyPatchApproval { request_id, .. } => {
Some(UnsupportedAppServerRequest {
@@ -580,10 +574,10 @@ mod tests {
}
#[test]
fn rejects_dynamic_tool_calls_as_unsupported() {
fn does_not_track_dynamic_tool_calls_in_pending_request_map() {
let mut pending = PendingAppServerRequests::default();
let unsupported = pending
.note_server_request(&ServerRequest::DynamicToolCall {
assert_eq!(
pending.note_server_request(&ServerRequest::DynamicToolCall {
request_id: AppServerRequestId::Integer(99),
params: codex_app_server_protocol::DynamicToolCallParams {
thread_id: "thread-1".to_string(),
@@ -592,13 +586,8 @@ mod tests {
tool: "tool".to_string(),
arguments: json!({}),
},
})
.expect("dynamic tool calls should be rejected");
assert_eq!(unsupported.request_id, AppServerRequestId::Integer(99));
assert_eq!(
unsupported.message,
"Dynamic tool calls are not available in app-server TUI yet."
}),
None
);
}

View File

@@ -4,6 +4,7 @@ use codex_app_server_client::AppServerRequestHandle;
use codex_app_server_protocol::Account;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::GetAccountParams;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAccountResponse;
@@ -75,8 +76,12 @@ use color_eyre::eyre::Result;
use color_eyre::eyre::WrapErr;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use crate::bottom_pane::FeedbackAudience;
use crate::dynamic_tools::DynamicToolExecutionContext;
use crate::dynamic_tools::DynamicToolRegistry;
use crate::status::StatusAccountDisplay;
pub(crate) struct AppServerBootstrap {
@@ -94,6 +99,8 @@ pub(crate) struct AppServerBootstrap {
pub(crate) struct AppServerSession {
client: AppServerClient,
dynamic_tools: Arc<DynamicToolRegistry>,
thread_cwds: RwLock<HashMap<String, PathBuf>>,
next_request_id: i64,
}
@@ -128,8 +135,17 @@ pub(crate) struct AppServerStartedThread {
impl AppServerSession {
/// Creates a session that uses the TUI-owned dynamic tool registry.
pub(crate) fn new(client: AppServerClient) -> Self {
    let tui_registry = Arc::new(DynamicToolRegistry::tui_owned());
    Self::new_with_dynamic_tools(client, tui_registry)
}
/// Creates a session around `client` with an explicit dynamic tool registry.
///
/// The per-thread cwd map starts empty and request ids start at 1.
pub(crate) fn new_with_dynamic_tools(
    client: AppServerClient,
    dynamic_tools: Arc<DynamicToolRegistry>,
) -> Self {
    let thread_cwds = RwLock::new(HashMap::new());
    Self {
        client,
        dynamic_tools,
        thread_cwds,
        next_request_id: 1,
    }
}
@@ -262,11 +278,19 @@ impl AppServerSession {
.client
.request_typed(ClientRequest::ThreadStart {
request_id,
params: thread_start_params_from_config(config, self.thread_params_mode()),
params: thread_start_params_from_config(
config,
self.thread_params_mode(),
self.dynamic_tools.specs(),
),
})
.await
.wrap_err("thread/start failed during TUI bootstrap")?;
started_thread_from_start_response(response)
let thread_id = response.thread.id.clone();
let cwd = response.cwd.clone();
let started = started_thread_from_start_response(response)?;
self.remember_thread_cwd(thread_id, cwd);
Ok(started)
}
pub(crate) async fn resume_thread(
@@ -288,7 +312,11 @@ impl AppServerSession {
})
.await
.wrap_err("thread/resume failed during TUI bootstrap")?;
started_thread_from_resume_response(response, show_raw_agent_reasoning)
let thread_id = response.thread.id.clone();
let cwd = response.cwd.clone();
let started = started_thread_from_resume_response(response, show_raw_agent_reasoning)?;
self.remember_thread_cwd(thread_id, cwd);
Ok(started)
}
pub(crate) async fn fork_thread(
@@ -310,7 +338,11 @@ impl AppServerSession {
})
.await
.wrap_err("thread/fork failed during TUI bootstrap")?;
started_thread_from_fork_response(response, show_raw_agent_reasoning)
let thread_id = response.thread.id.clone();
let cwd = response.cwd.clone();
let started = started_thread_from_fork_response(response, show_raw_agent_reasoning)?;
self.remember_thread_cwd(thread_id, cwd);
Ok(started)
}
fn thread_params_mode(&self) -> ThreadParamsMode {
@@ -369,13 +401,16 @@ impl AppServerSession {
output_schema: Option<serde_json::Value>,
) -> Result<TurnStartResponse> {
let request_id = self.next_request_id();
self.client
let request_cwd = cwd.clone();
self.remember_thread_cwd(thread_id.to_string(), cwd);
let response = self
.client
.request_typed(ClientRequest::TurnStart {
request_id,
params: TurnStartParams {
thread_id: thread_id.to_string(),
input: items.into_iter().map(Into::into).collect(),
cwd: Some(cwd),
cwd: Some(request_cwd),
approval_policy: Some(approval_policy.into()),
approvals_reviewer: Some(approvals_reviewer.into()),
sandbox_policy: Some(sandbox_policy.into()),
@@ -389,7 +424,8 @@ impl AppServerSession {
},
})
.await
.wrap_err("turn/start failed in app-server TUI")
.wrap_err("turn/start failed in app-server TUI")?;
Ok(response)
}
pub(crate) async fn turn_interrupt(
@@ -648,11 +684,38 @@ impl AppServerSession {
self.client.request_handle()
}
/// Returns another shared handle to this session's dynamic tool registry.
pub(crate) fn dynamic_tool_registry(&self) -> Arc<DynamicToolRegistry> {
    let registry = &self.dynamic_tools;
    Arc::clone(registry)
}
/// Builds the execution context for a dynamic tool call on `thread_id`: the
/// session's request handle plus the cwd remembered for that thread, if any.
pub(crate) fn dynamic_tool_execution_context(
    &self,
    thread_id: &str,
) -> DynamicToolExecutionContext {
    let handle = self.request_handle();
    let remembered_cwd = self.thread_cwd(thread_id);
    DynamicToolExecutionContext::new(handle, remembered_cwd)
}
/// Returns the current request id as `RequestId::Integer` and advances the
/// counter by one for the next call.
fn next_request_id(&mut self) -> RequestId {
    let current = self.next_request_id;
    self.next_request_id = current + 1;
    RequestId::Integer(current)
}
/// Records `cwd` as the working directory for `thread_id`, replacing any
/// previously remembered value.
///
/// # Panics
///
/// Panics if the cwd map lock is poisoned.
fn remember_thread_cwd(&self, thread_id: String, cwd: PathBuf) {
    let mut cwd_map = self
        .thread_cwds
        .write()
        .unwrap_or_else(|err| panic!("thread cwd map lock should not be poisoned: {err}"));
    cwd_map.insert(thread_id, cwd);
}
/// Returns a copy of the working directory remembered for `thread_id`, or
/// `None` when nothing has been recorded for it.
///
/// # Panics
///
/// Panics if the cwd map lock is poisoned.
fn thread_cwd(&self, thread_id: &str) -> Option<PathBuf> {
    let cwd_map = self
        .thread_cwds
        .read()
        .unwrap_or_else(|err| panic!("thread cwd map lock should not be poisoned: {err}"));
    cwd_map.get(thread_id).cloned()
}
}
fn title_case(s: &str) -> String {
@@ -773,6 +836,7 @@ fn sandbox_mode_from_policy(
fn thread_start_params_from_config(
config: &Config,
thread_params_mode: ThreadParamsMode,
dynamic_tools: Option<Vec<DynamicToolSpec>>,
) -> ThreadStartParams {
ThreadStartParams {
model: config.model.clone(),
@@ -782,6 +846,7 @@ fn thread_start_params_from_config(
approvals_reviewer: approvals_reviewer_override_from_config(config),
sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get().clone()),
config: config_request_overrides_from_config(config),
dynamic_tools,
ephemeral: Some(config.ephemeral),
persist_extended_history: true,
..ThreadStartParams::default()
@@ -1038,39 +1103,259 @@ fn app_server_credits_snapshot_to_core(
#[cfg(test)]
mod tests {
use super::*;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use codex_app_server_client::AppServerClient;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::DynamicToolCallResponse;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnStatus;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::ConfigBuilder;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::MockServer;
async fn build_config(temp_dir: &TempDir) -> Config {
use crate::dynamic_tools::DynamicToolExecutionError;
use crate::dynamic_tools::DynamicToolRegistration;
use crate::dynamic_tools::DynamicToolRegistry;
use crate::dynamic_tools::handle_dynamic_tool_call_request;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
/// Returns the CLI config overrides that select `mock-model` on a
/// `mock_provider` whose base URL is `{server_uri}/v1`, with approval policy
/// `never`, sandbox mode `read-only`, the `responses` wire API, and both retry
/// limits set to zero.
fn mock_provider_cli_overrides(server_uri: &str) -> Vec<(String, toml::Value)> {
    /// Builds one override whose value is a TOML string.
    fn string_entry(key: &str, value: &str) -> (String, toml::Value) {
        (key.to_string(), toml::Value::String(value.to_string()))
    }

    /// Builds one override whose value is a TOML integer.
    fn integer_entry(key: &str, value: i64) -> (String, toml::Value) {
        (key.to_string(), toml::Value::Integer(value))
    }

    let base_url = format!("{server_uri}/v1");
    vec![
        string_entry("model", "mock-model"),
        string_entry("model_provider", "mock_provider"),
        string_entry("approval_policy", "never"),
        string_entry("sandbox_mode", "read-only"),
        string_entry(
            "model_providers.mock_provider.name",
            "Mock provider for test",
        ),
        string_entry("model_providers.mock_provider.base_url", &base_url),
        string_entry("model_providers.mock_provider.wire_api", "responses"),
        integer_entry("model_providers.mock_provider.request_max_retries", 0),
        integer_entry("model_providers.mock_provider.stream_max_retries", 0),
    ]
}
/// Builds a `Config` whose codex home is `temp_dir` with the given CLI
/// overrides applied.
///
/// # Panics
///
/// Panics if the config fails to build.
async fn build_config(temp_dir: &TempDir, cli_overrides: Vec<(String, toml::Value)>) -> Config {
    let codex_home = temp_dir.path().to_path_buf();
    let built = ConfigBuilder::default()
        .codex_home(codex_home)
        .cli_overrides(cli_overrides)
        .build()
        .await;
    built.expect("config should build")
}
/// Starts an embedded app server for `config` and wraps the in-process client
/// in an `AppServerSession` that uses the supplied dynamic tool registry.
async fn start_test_session(
    config: Config,
    cli_overrides: Vec<(String, toml::Value)>,
    dynamic_tools: Arc<DynamicToolRegistry>,
) -> Result<AppServerSession> {
    let embedded_client = crate::start_embedded_app_server(
        Arg0DispatchPaths::default(),
        config,
        cli_overrides,
        LoaderOverrides::default(),
        CloudRequirementsLoader::default(),
        codex_feedback::CodexFeedback::new(),
    )
    .await?;

    let session = AppServerSession::new_with_dynamic_tools(
        AppServerClient::InProcess(embedded_client),
        dynamic_tools,
    );
    Ok(session)
}
/// Returns a non-deferred dynamic tool spec called `name` whose input schema
/// is an object with one required string property, `city`, and no additional
/// properties.
fn demo_tool_spec(name: &str) -> DynamicToolSpec {
    let description = format!("dynamic tool {name}");
    let input_schema = json!({
        "type": "object",
        "properties": {
            "city": { "type": "string" }
        },
        "required": ["city"],
        "additionalProperties": false,
    });
    DynamicToolSpec {
        name: name.to_string(),
        description,
        input_schema,
        defer_loading: false,
    }
}
async fn responses_bodies(server: &MockServer) -> Result<Vec<Value>> {
let mut bodies = Vec::new();
for request in server
.received_requests()
.await
.expect("requests should be readable")
{
if request.url.path().ends_with("/responses") {
bodies.push(
request
.body_json::<Value>()
.expect("request body should be json"),
);
}
}
Ok(bodies)
}
/// Finds the `function_call_output` item with the given `call_id` in a request
/// body's `input` array and deserializes its `output` field.
///
/// Returns `None` when the array, the matching item, or the `output` field is
/// missing, or when the output does not deserialize into a
/// `FunctionCallOutputPayload`.
fn function_call_output_payload(
    body: &Value,
    call_id: &str,
) -> Option<FunctionCallOutputPayload> {
    let input_items = body.get("input")?.as_array()?;
    let matching_item = input_items.iter().find(|item| {
        item.get("type").and_then(Value::as_str) == Some("function_call_output")
            && item.get("call_id").and_then(Value::as_str) == Some(call_id)
    })?;
    let raw_output = matching_item.get("output")?.clone();
    serde_json::from_value(raw_output).ok()
}
/// Starts a turn on `thread_id` with `prompt` as the only user input, then
/// pumps session events until the turn finishes.
///
/// Every `DynamicToolCall` server request seen along the way is answered
/// through `handle_dynamic_tool_call_request` using the session's registry and
/// execution context. The loop ends on a `TurnCompleted` notification or a
/// legacy notification whose method ends with `task_complete`; all other
/// events are ignored.
///
/// # Errors
///
/// Fails if starting the turn fails, if no event arrives within
/// `DEFAULT_READ_TIMEOUT`, or if handling a dynamic tool call fails.
///
/// # Panics
///
/// Panics if the config has no model or the event stream closes.
async fn run_dynamic_tool_turn(
    session: &mut AppServerSession,
    config: &Config,
    thread_id: ThreadId,
    prompt: &str,
) -> Result<()> {
    let user_input = vec![UserInput::Text {
        text: prompt.to_string(),
        text_elements: Vec::new(),
    }];
    session
        .turn_start(
            thread_id,
            user_input,
            config.cwd.clone(),
            config.permissions.approval_policy.value(),
            config.approvals_reviewer,
            config.permissions.sandbox_policy.get().clone(),
            config
                .model
                .clone()
                .expect("mock model should be configured"),
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .await?;

    loop {
        let next = timeout(DEFAULT_READ_TIMEOUT, session.next_event()).await?;
        let event = next.expect("app-server event stream should stay open");
        match event {
            AppServerEvent::ServerRequest(ServerRequest::DynamicToolCall {
                request_id,
                params,
            }) => {
                let registry = session.dynamic_tool_registry();
                let context = session.dynamic_tool_execution_context(&params.thread_id);
                handle_dynamic_tool_call_request(registry, context, request_id, params)
                    .await
                    .map_err(|err| color_eyre::eyre::eyre!(err))?;
            }
            AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => break,
            AppServerEvent::LegacyNotification(notification)
                if notification.method.ends_with("task_complete") =>
            {
                break;
            }
            _ => {}
        }
    }
    Ok(())
}
#[tokio::test]
async fn thread_start_params_include_cwd_for_embedded_sessions() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let config = build_config(&temp_dir).await;
let config = build_config(&temp_dir, Vec::new()).await;
let params = thread_start_params_from_config(&config, ThreadParamsMode::Embedded);
let params = thread_start_params_from_config(&config, ThreadParamsMode::Embedded, None);
assert_eq!(params.cwd, Some(config.cwd.to_string_lossy().to_string()));
assert_eq!(params.model_provider, Some(config.model_provider_id));
}
// Dynamic tool specs handed to `thread_start_params_from_config` must come
// back unchanged in the resulting params.
#[tokio::test]
async fn thread_start_params_include_registered_dynamic_tools() {
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let config = build_config(&temp_dir, Vec::new()).await;
    let expected_tools = vec![demo_tool_spec("demo_tool")];

    let params = thread_start_params_from_config(
        &config,
        ThreadParamsMode::Embedded,
        Some(expected_tools.clone()),
    );

    assert_eq!(params.dynamic_tools, Some(expected_tools));
}
#[tokio::test]
async fn thread_lifecycle_params_omit_local_overrides_for_remote_sessions() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let config = build_config(&temp_dir).await;
let config = build_config(&temp_dir, Vec::new()).await;
let thread_id = ThreadId::new();
let start = thread_start_params_from_config(&config, ThreadParamsMode::Remote);
let start = thread_start_params_from_config(&config, ThreadParamsMode::Remote, None);
let resume =
thread_resume_params_from_config(config.clone(), thread_id, ThreadParamsMode::Remote);
let fork = thread_fork_params_from_config(config, thread_id, ThreadParamsMode::Remote);
@@ -1141,4 +1426,240 @@ mod tests {
assert_eq!(started.thread.turns.len(), 1);
assert_eq!(started.thread.turns[0].items.len(), 2);
}
// Runs one turn in which the mock model calls a registered dynamic tool, and
// checks that the follow-up request to the model carries the tool's text
// output (the thread cwd) as the `function_call_output` payload.
#[tokio::test]
async fn dynamic_tool_turn_round_trip_resolves_registered_tools() -> Result<()> {
    let call_id = "dyn-call-1";
    let tool_name = "demo_tool";
    let tool_args = json!({ "city": "Paris" });
    let tool_call_arguments = serde_json::to_string(&tool_args)?;

    // Scripted model output: a function call for the tool, then a final message.
    let tool_call_response = responses::sse(vec![
        responses::ev_response_created("resp-1"),
        responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
        responses::ev_completed("resp-1"),
    ]);
    let final_response = create_final_assistant_message_sse_response("Done")
        .map_err(|err| color_eyre::eyre::eyre!(err))?;
    let server =
        create_mock_responses_server_sequence_unchecked(vec![tool_call_response, final_response])
            .await;

    let codex_home = TempDir::new()?;
    let cli_overrides = mock_provider_cli_overrides(&server.uri());
    let config = build_config(&codex_home, cli_overrides.clone()).await;

    // The handler replies with the cwd exposed by its execution context.
    let registration = DynamicToolRegistration::new(
        demo_tool_spec(tool_name),
        |context, _params| async move {
            Ok(DynamicToolCallResponse {
                content_items: vec![DynamicToolCallOutputContentItem::InputText {
                    text: context
                        .cwd()
                        .expect("thread cwd should be present")
                        .display()
                        .to_string(),
                }],
                success: true,
            })
        },
    );
    let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![registration]));

    let mut session = start_test_session(config.clone(), cli_overrides, registry).await?;
    let started = session.start_thread(&config).await?;
    let thread_id = started.session_configured.session_id;
    run_dynamic_tool_turn(&mut session, &config, thread_id, "Run the tool").await?;

    let bodies = responses_bodies(&server).await?;
    let payload = bodies
        .iter()
        .find_map(|body| function_call_output_payload(body, call_id))
        .expect("expected function_call_output payload");
    let expected = FunctionCallOutputPayload::from_text(config.cwd.display().to_string());
    assert_eq!(payload, expected);
    Ok(())
}
/// Runs a dynamic tool turn in one session, shuts that session down, resumes the same thread in
/// a second session that shares the same registry, and runs a second tool turn. Both
/// `function_call_output` payloads must carry the cwd of the config used by their session.
#[tokio::test]
async fn resumed_threads_can_reuse_registered_dynamic_tools() -> Result<()> {
let first_call_id = "dyn-call-1";
let second_call_id = "dyn-call-2";
let tool_name = "demo_tool";
let tool_args = json!({ "city": "Paris" });
let tool_call_arguments = serde_json::to_string(&tool_args)?;
// Four mocked responses, consumed in order: tool call + final message for the first turn,
// then tool call + final message for the turn that runs after the resume.
let responses = vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call(first_call_id, tool_name, &tool_call_arguments),
responses::ev_completed("resp-1"),
]),
create_final_assistant_message_sse_response("Done")
.map_err(|err| color_eyre::eyre::eyre!(err))?,
responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_function_call(second_call_id, tool_name, &tool_call_arguments),
responses::ev_completed("resp-2"),
]),
create_final_assistant_message_sse_response("Done again")
.map_err(|err| color_eyre::eyre::eyre!(err))?,
];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let cli_overrides = mock_provider_cli_overrides(&server.uri());
let config = build_config(&codex_home, cli_overrides.clone()).await;
// The registered tool answers with the cwd found in its execution context.
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
DynamicToolRegistration::new(
demo_tool_spec(tool_name),
|context, _params| async move {
Ok(DynamicToolCallResponse {
content_items: vec![DynamicToolCallOutputContentItem::InputText {
text: context
.cwd()
.expect("thread cwd should be present")
.display()
.to_string(),
}],
success: true,
})
},
),
]));
// First session: start a thread and run the first tool turn.
let mut first_session =
start_test_session(config.clone(), cli_overrides.clone(), Arc::clone(&registry))
.await?;
let started = first_session.start_thread(&config).await?;
let thread_id = started.session_configured.session_id;
run_dynamic_tool_turn(&mut first_session, &config, thread_id, "Run the tool").await?;
// Capture the rollout path before shutting down; it is passed to the resume request below.
let rollout_path = first_session
.thread_read(thread_id, /* include_turns */ false)
.await?
.path
.expect("thread should expose a persisted rollout path");
first_session.shutdown().await?;
// Second session: rebuilt config from the same codex home, same shared registry.
let resumed_config = build_config(&codex_home, cli_overrides.clone()).await;
let mut resumed_session =
start_test_session(resumed_config.clone(), cli_overrides, Arc::clone(&registry))
.await?;
let request_id = resumed_session.next_request_id();
let mut params = thread_resume_params_from_config(
resumed_config.clone(),
thread_id,
ThreadParamsMode::Embedded,
);
params.path = Some(rollout_path);
// Only success of the resume request matters here; the typed response is discarded.
let _: ThreadResumeResponse = resumed_session
.client
.request_typed(ClientRequest::ThreadResume { request_id, params })
.await?;
run_dynamic_tool_turn(
&mut resumed_session,
&resumed_config,
thread_id,
"Run the tool again",
)
.await?;
// Both tool outputs are looked up in the request bodies recorded by the mock server.
let bodies = responses_bodies(&server).await?;
let first_payload = bodies
.iter()
.find_map(|body| function_call_output_payload(body, first_call_id))
.expect("expected first function_call_output payload");
let second_payload = bodies
.iter()
.find_map(|body| function_call_output_payload(body, second_call_id))
.expect("expected second function_call_output payload");
assert_eq!(
first_payload,
FunctionCallOutputPayload::from_text(config.cwd.display().to_string())
);
assert_eq!(
second_payload,
FunctionCallOutputPayload::from_text(resumed_config.cwd.display().to_string())
);
Ok(())
}
/// Checks that `handle_dynamic_tool_call_request` returns an error when the execution context
/// carries no app-server request handle.
///
/// NOTE(review): despite the test name, the asserted error is the missing-request-handle one;
/// the unknown-tool rejection path does not appear to be reached with this context. Confirm
/// whether a separate test with a real request handle is intended.
#[tokio::test]
async fn unknown_dynamic_tool_requests_are_rejected_cleanly() -> Result<()> {
    let empty_registry = Arc::new(DynamicToolRegistry::default());
    let call = DynamicToolCallParams {
        thread_id: "thread-1".to_string(),
        turn_id: "turn-1".to_string(),
        call_id: "call-1".to_string(),
        tool: "missing_tool".to_string(),
        arguments: json!({}),
    };

    let outcome = handle_dynamic_tool_call_request(
        empty_registry,
        DynamicToolExecutionContext::for_tests(),
        RequestId::Integer(1),
        call,
    )
    .await;

    let error = outcome.expect_err("missing request handle should fail before dispatch");
    assert_eq!(
        error,
        "dynamic tool execution context is missing an app-server request handle"
    );
    Ok(())
}
/// Registers a dynamic tool whose executor always fails and checks that the failure message is
/// what gets sent back to the model as the `function_call_output` payload.
#[tokio::test]
async fn failing_dynamic_tools_return_failed_responses() -> Result<()> {
    let call_id = "dyn-call-1";
    let tool_name = "demo_tool";
    let tool_call_arguments = serde_json::to_string(&json!({ "city": "Paris" }))?;

    // The first mocked response requests the tool call; the second one finishes the turn.
    let tool_call_response = responses::sse(vec![
        responses::ev_response_created("resp-1"),
        responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
        responses::ev_completed("resp-1"),
    ]);
    let final_response = create_final_assistant_message_sse_response("Done")
        .map_err(|err| color_eyre::eyre::eyre!(err))?;
    let server =
        create_mock_responses_server_sequence_unchecked(vec![tool_call_response, final_response])
            .await;

    let codex_home = TempDir::new()?;
    let cli_overrides = mock_provider_cli_overrides(&server.uri());
    let config = build_config(&codex_home, cli_overrides.clone()).await;

    let always_fails = DynamicToolRegistration::new(
        demo_tool_spec(tool_name),
        move |_context, _params| async move {
            Err(DynamicToolExecutionError::failed(
                tool_name,
                "dynamic tool failed",
            ))
        },
    );
    let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![always_fails]));

    let mut session = start_test_session(config.clone(), cli_overrides, registry).await?;
    let started_thread = session.start_thread(&config).await?;
    let thread_id = started_thread.session_configured.session_id;
    run_dynamic_tool_turn(&mut session, &config, thread_id, "Run the tool").await?;

    let request_bodies = responses_bodies(&server).await?;
    let actual_payload = request_bodies
        .iter()
        .find_map(|body| function_call_output_payload(body, call_id))
        .expect("expected function_call_output payload");
    let expected_payload = FunctionCallOutputPayload::from_text("dynamic tool failed".to_string());
    assert_eq!(actual_payload, expected_payload);
    Ok(())
}
}

View File

@@ -0,0 +1,442 @@
use codex_app_server_client::AppServerRequestHandle;
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::DynamicToolCallResponse;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use tokio::task::JoinError;
/// Boxed, pinned, `Send` future returned by a dynamic tool executor. It resolves to the tool's
/// `DynamicToolCallResponse` or to a `DynamicToolExecutionError`.
type DynamicToolFuture = Pin<
Box<dyn Future<Output = Result<DynamicToolCallResponse, DynamicToolExecutionError>> + Send>,
>;
/// Trait-object type of a stored executor: a `Send + Sync` callable that takes the execution
/// context and the call parameters by value and returns a `DynamicToolFuture`.
type DynamicToolExecutor =
dyn Fn(DynamicToolExecutionContext, DynamicToolCallParams) -> DynamicToolFuture + Send + Sync;
pub(crate) struct DynamicToolRegistration {
spec: DynamicToolSpec,
executor: Arc<DynamicToolExecutor>,
}
impl DynamicToolRegistration {
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn new<F, Fut>(spec: DynamicToolSpec, executor: F) -> Self
where
F: Fn(DynamicToolExecutionContext, DynamicToolCallParams) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<DynamicToolCallResponse, DynamicToolExecutionError>>
+ Send
+ 'static,
{
let executor = Arc::new(move |context, params| Box::pin(executor(context, params)) as _);
Self { spec, executor }
}
}
/// Collection of dynamic tool registrations, indexed by tool name.
#[derive(Default)]
pub(crate) struct DynamicToolRegistry {
    tools: HashMap<String, DynamicToolRegistration>,
}

impl DynamicToolRegistry {
    /// Registry for the TUI's own tools; built from an empty registration list.
    pub(crate) fn tui_owned() -> Self {
        let registrations: Vec<DynamicToolRegistration> = Vec::new();
        Self::from_registrations(registrations)
    }

    /// Indexes `registrations` by their spec name.
    ///
    /// When two registrations share a name, the later one replaces the earlier one.
    pub(crate) fn from_registrations(registrations: Vec<DynamicToolRegistration>) -> Self {
        let mut tools = HashMap::with_capacity(registrations.len());
        for registration in registrations {
            tools.insert(registration.spec.name.clone(), registration);
        }
        Self { tools }
    }

    /// Returns the specs of all registered tools sorted by name, or `None` when the registry is
    /// empty.
    pub(crate) fn specs(&self) -> Option<Vec<DynamicToolSpec>> {
        if self.tools.is_empty() {
            return None;
        }
        let mut sorted: Vec<DynamicToolSpec> = Vec::with_capacity(self.tools.len());
        sorted.extend(self.tools.values().map(|entry| entry.spec.clone()));
        // Map iteration order is unspecified, so sort to give callers a stable order.
        sorted.sort_by(|a, b| a.name.cmp(&b.name));
        Some(sorted)
    }

    /// Runs the executor registered under `params.tool`.
    ///
    /// Returns `DynamicToolExecutionError::UnknownTool` when no tool with that name is
    /// registered; otherwise returns whatever the executor's future resolves to.
    pub(crate) async fn execute(
        &self,
        context: DynamicToolExecutionContext,
        params: DynamicToolCallParams,
    ) -> Result<DynamicToolCallResponse, DynamicToolExecutionError> {
        let executor = match self.tools.get(&params.tool) {
            Some(registration) => Arc::clone(&registration.executor),
            None => {
                return Err(DynamicToolExecutionError::UnknownTool {
                    tool: params.tool.clone(),
                });
            }
        };
        executor(context, params).await
    }
}
/// Per-call data handed to a dynamic tool executor: an optional app-server request handle and
/// an optional working directory.
#[derive(Clone)]
pub(crate) struct DynamicToolExecutionContext {
    request_handle: Option<AppServerRequestHandle>,
    #[cfg_attr(not(test), allow(dead_code))]
    cwd: Option<PathBuf>,
}

impl DynamicToolExecutionContext {
    /// Creates a context that carries `request_handle` and the given `cwd`.
    pub(crate) fn new(request_handle: AppServerRequestHandle, cwd: Option<PathBuf>) -> Self {
        let request_handle = Some(request_handle);
        Self {
            request_handle,
            cwd,
        }
    }

    /// Test-only context with neither a request handle nor a cwd.
    #[cfg(test)]
    pub(crate) fn for_tests() -> Self {
        let (request_handle, cwd) = (None, None);
        Self {
            request_handle,
            cwd,
        }
    }

    /// Borrows the request handle, if this context has one.
    pub(crate) fn request_handle(&self) -> Option<&AppServerRequestHandle> {
        self.request_handle.as_ref()
    }

    /// Borrows the working directory, if this context has one.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(crate) fn cwd(&self) -> Option<&Path> {
        self.cwd.as_deref()
    }
}
/// Ways a dynamic tool call can fail inside the registry.
#[cfg_attr(not(test), allow(dead_code))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum DynamicToolExecutionError {
    /// No tool is registered under the requested name.
    UnknownTool { tool: String },
    /// The tool's executor reported a failure described by `message`.
    ExecutionFailed { tool: String, message: String },
}

impl DynamicToolExecutionError {
    /// Convenience constructor for `ExecutionFailed` that accepts anything convertible into
    /// `String` for both fields.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(crate) fn failed(tool: impl Into<String>, message: impl Into<String>) -> Self {
        let tool = tool.into();
        let message = message.into();
        Self::ExecutionFailed { tool, message }
    }
}
/// Executes the dynamic tool named in `params` and answers the server request `request_id`
/// with the outcome.
///
/// * A successful execution resolves the request with the serialized tool response.
/// * An unknown tool rejects the request with a JSON-RPC error (code `-32000`).
/// * An executor failure resolves the request with a `success: false` response that carries
///   the failure message, and logs a warning.
///
/// # Errors
///
/// Returns `Err` with a human-readable message when `context` has no app-server request
/// handle, when a response cannot be serialized, or when the resolve/reject call itself fails.
/// Tool-level failures that were delivered to the server are reported as `Ok(())`.
pub(crate) async fn handle_dynamic_tool_call_request(
    registry: Arc<DynamicToolRegistry>,
    context: DynamicToolExecutionContext,
    request_id: RequestId,
    params: DynamicToolCallParams,
) -> Result<(), String> {
    let Some(request_handle) = context.request_handle().cloned() else {
        return Err(
            "dynamic tool execution context is missing an app-server request handle".to_string(),
        );
    };
    // Only the tool name is needed after dispatch (for error messages), so keep a copy of just
    // that and move `params` into the executor instead of cloning the whole request, JSON
    // arguments included.
    let tool_name = params.tool.clone();
    match registry.execute(context, params).await {
        Ok(response) => {
            let result = serde_json::to_value(response).map_err(|err| {
                format!("failed to serialize dynamic tool response for `{tool_name}`: {err}")
            })?;
            request_handle
                .resolve_server_request(request_id, result)
                .await
                .map_err(|err| {
                    format!("failed to resolve dynamic tool request for `{tool_name}`: {err}")
                })
        }
        Err(DynamicToolExecutionError::UnknownTool { tool }) => {
            let message = format!("unknown dynamic tool `{tool}` for this TUI client");
            request_handle
                .reject_server_request(
                    request_id,
                    JSONRPCErrorError {
                        code: -32000,
                        message,
                        data: None,
                    },
                )
                .await
                .map_err(|err| format!("failed to reject dynamic tool request for `{tool}`: {err}"))
        }
        Err(DynamicToolExecutionError::ExecutionFailed { tool, message }) => {
            tracing::warn!(tool, %message, "dynamic tool executor failed");
            // The failure is reported as a normal (resolved) response with `success: false`
            // rather than as a JSON-RPC error.
            let result =
                serde_json::to_value(dynamic_tool_failure_response(&message)).map_err(|err| {
                    format!("failed to serialize fallback response for `{tool}`: {err}")
                })?;
            request_handle
                .resolve_server_request(request_id, result)
                .await
                .map_err(|err| {
                    format!("failed to resolve fallback dynamic tool response for `{tool}`: {err}")
                })
        }
    }
}
/// Starts handling a dynamic tool call on a detached Tokio task.
///
/// The task runs the supervised handler and logs a warning if it returns an error; the task's
/// join handle is dropped, so callers get no completion signal.
pub(crate) fn spawn_dynamic_tool_call_task(
    registry: Arc<DynamicToolRegistry>,
    context: DynamicToolExecutionContext,
    request_id: RequestId,
    params: DynamicToolCallParams,
) {
    let supervised = async move {
        let outcome =
            supervise_dynamic_tool_call_task(registry, context, request_id, params).await;
        if let Err(err) = outcome {
            tracing::warn!("{err}");
        }
    };
    tokio::spawn(supervised);
}
async fn supervise_dynamic_tool_call_task(
registry: Arc<DynamicToolRegistry>,
context: DynamicToolExecutionContext,
request_id: RequestId,
params: DynamicToolCallParams,
) -> Result<(), String> {
let panic_context = context.clone();
let panic_request_id = request_id.clone();
let panic_tool = params.tool.clone();
supervise_spawned_task(
handle_dynamic_tool_call_request(registry, context, request_id, params),
move |join_err| async move {
reject_panicked_dynamic_tool_request(
panic_context,
panic_request_id,
panic_tool,
join_err,
)
.await
},
)
.await
}
/// Spawns `future` on its own Tokio task and waits for it.
///
/// Returns the task's own result when it completes. When joining the task fails, the
/// `JoinError` is passed to `on_join_error` and that fallback's result is returned instead.
async fn supervise_spawned_task<F, P, PF>(future: F, on_join_error: P) -> Result<(), String>
where
    F: Future<Output = Result<(), String>> + Send + 'static,
    P: FnOnce(JoinError) -> PF,
    PF: Future<Output = Result<(), String>> + Send,
{
    let task = tokio::spawn(future);
    match task.await {
        Err(join_err) => on_join_error(join_err).await,
        Ok(outcome) => outcome,
    }
}
/// Rejects the server request for a dynamic tool whose handler task ended with a join error.
///
/// The rejection message says whether the task panicked or was cancelled. When `context` has no
/// request handle there is nothing to reject, so the join error is returned as a message.
///
/// # Errors
///
/// Returns `Err` when the request handle is missing or when sending the rejection fails.
async fn reject_panicked_dynamic_tool_request(
    context: DynamicToolExecutionContext,
    request_id: RequestId,
    tool: String,
    join_err: JoinError,
) -> Result<(), String> {
    let request_handle = match context.request_handle() {
        Some(handle) => handle.clone(),
        None => {
            return Err(format!(
                "dynamic tool `{tool}` task failed before producing a response: {join_err}"
            ));
        }
    };

    let message = if join_err.is_panic() {
        format!("dynamic tool `{tool}` panicked before producing a response")
    } else {
        format!("dynamic tool `{tool}` was cancelled before producing a response")
    };
    tracing::warn!(tool, error = %join_err, "dynamic tool task failed");

    let rejection = JSONRPCErrorError {
        code: -32000,
        message,
        data: None,
    };
    let sent = request_handle
        .reject_server_request(request_id, rejection)
        .await;
    sent.map_err(|err| format!("failed to reject dynamic tool request for `{tool}`: {err}"))
}
/// Builds the response sent for a failed tool call: a single text item containing `message`,
/// with `success` set to `false`.
pub(crate) fn dynamic_tool_failure_response(message: &str) -> DynamicToolCallResponse {
    let failure_text = DynamicToolCallOutputContentItem::InputText {
        text: message.to_string(),
    };
    DynamicToolCallResponse {
        content_items: vec![failure_text],
        success: false,
    }
}
#[cfg(test)]
mod tests {
    use super::DynamicToolExecutionContext;
    use super::DynamicToolExecutionError;
    use super::DynamicToolRegistration;
    use super::DynamicToolRegistry;
    use codex_app_server_protocol::DynamicToolCallOutputContentItem;
    use codex_app_server_protocol::DynamicToolCallParams;
    use codex_app_server_protocol::DynamicToolCallResponse;
    use codex_app_server_protocol::DynamicToolSpec;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use tokio::sync::oneshot;

    /// Spec for a tool called `name` that accepts an object with no extra properties.
    fn demo_spec(name: &str) -> DynamicToolSpec {
        let input_schema = json!({
            "type": "object",
            "additionalProperties": false,
        });
        DynamicToolSpec {
            name: name.to_string(),
            description: format!("dynamic tool {name}"),
            input_schema,
            defer_loading: false,
        }
    }

    /// Call parameters for `tool` with fixed thread, turn and call ids.
    fn call_params(tool: &str, arguments: serde_json::Value) -> DynamicToolCallParams {
        DynamicToolCallParams {
            thread_id: "thread-1".to_string(),
            turn_id: "turn-1".to_string(),
            call_id: "call-1".to_string(),
            tool: tool.to_string(),
            arguments,
        }
    }

    /// A registered tool receives the call parameters and its response is returned unchanged.
    #[tokio::test]
    async fn dispatches_registered_dynamic_tools() {
        let echo_arguments =
            DynamicToolRegistration::new(demo_spec("demo_tool"), |_context, params| async move {
                Ok(DynamicToolCallResponse {
                    content_items: vec![DynamicToolCallOutputContentItem::InputText {
                        text: params.arguments.to_string(),
                    }],
                    success: true,
                })
            });
        let registry = DynamicToolRegistry::from_registrations(vec![echo_arguments]);

        let response = registry
            .execute(
                DynamicToolExecutionContext::for_tests(),
                call_params("demo_tool", json!({ "city": "Paris" })),
            )
            .await
            .expect("dynamic tool should execute");

        let expected = DynamicToolCallResponse {
            content_items: vec![DynamicToolCallOutputContentItem::InputText {
                text: json!({ "city": "Paris" }).to_string(),
            }],
            success: true,
        };
        assert_eq!(response, expected);
    }

    /// Executing a tool name that was never registered yields `UnknownTool`.
    #[tokio::test]
    async fn rejects_unknown_dynamic_tools() {
        let registry = DynamicToolRegistry::default();

        let error = registry
            .execute(
                DynamicToolExecutionContext::for_tests(),
                call_params("missing_tool", json!({})),
            )
            .await
            .expect_err("unknown tool should be rejected");

        let expected = DynamicToolExecutionError::UnknownTool {
            tool: "missing_tool".to_string(),
        };
        assert_eq!(error, expected);
    }

    /// An executor's `ExecutionFailed` error is passed through by the registry as is.
    #[tokio::test]
    async fn surfaces_executor_failures() {
        let always_fails =
            DynamicToolRegistration::new(demo_spec("demo_tool"), |_context, _params| async move {
                Err(DynamicToolExecutionError::failed(
                    "demo_tool",
                    "dynamic tool failed",
                ))
            });
        let registry = DynamicToolRegistry::from_registrations(vec![always_fails]);

        let error = registry
            .execute(
                DynamicToolExecutionContext::for_tests(),
                call_params("demo_tool", json!({})),
            )
            .await
            .expect_err("executor failure should surface");

        let expected = DynamicToolExecutionError::ExecutionFailed {
            tool: "demo_tool".to_string(),
            message: "dynamic tool failed".to_string(),
        };
        assert_eq!(error, expected);
    }

    /// A panic inside the supervised task triggers the fallback, whose result is returned.
    #[tokio::test]
    async fn supervisor_runs_fallback_when_task_panics() {
        let (panic_seen_tx, panic_seen_rx) = oneshot::channel();

        let panicking_task = async move {
            panic!("boom");
            // Never reached; present so the block's output type is `Result<(), String>`.
            #[allow(unreachable_code)]
            Ok(())
        };
        let fallback = move |join_err: tokio::task::JoinError| async move {
            assert!(join_err.is_panic());
            panic_seen_tx
                .send(())
                .expect("panic notification should send");
            Ok(())
        };

        let result = super::supervise_spawned_task(panicking_task, fallback).await;

        assert_eq!(result, Ok(()));
        panic_seen_rx
            .await
            .expect("panic fallback should be invoked");
    }
}

View File

@@ -89,6 +89,7 @@ pub mod custom_terminal;
mod cwd_prompt;
mod debug_config;
mod diff_render;
mod dynamic_tools;
mod exec_cell;
mod exec_command;
mod external_editor;