mirror of
https://github.com/openai/codex.git
synced 2026-04-20 12:44:47 +00:00
Compare commits
8 Commits
dev/shaqay
...
etraut/dyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57f89d3c39 | ||
|
|
c422ccd890 | ||
|
|
383fee24fc | ||
|
|
806addef65 | ||
|
|
6e97a343ab | ||
|
|
eb11b49fd2 | ||
|
|
eb493a8876 | ||
|
|
0c267abf83 |
3
codex-rs/Cargo.lock
generated
3
codex-rs/Cargo.lock
generated
@@ -2585,6 +2585,7 @@ name = "codex-tui-app-server"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"app_test_support",
|
||||
"arboard",
|
||||
"assert_matches",
|
||||
"base64 0.22.1",
|
||||
@@ -2619,6 +2620,7 @@ dependencies = [
|
||||
"codex-utils-string",
|
||||
"codex-windows-sandbox",
|
||||
"color-eyre",
|
||||
"core_test_support",
|
||||
"cpal",
|
||||
"crossterm",
|
||||
"derive_more 2.1.1",
|
||||
@@ -2668,6 +2670,7 @@ dependencies = [
|
||||
"which",
|
||||
"windows-sys 0.52.0",
|
||||
"winsplit",
|
||||
"wiremock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -737,6 +737,60 @@ impl InProcessAppServerRequestHandle {
|
||||
serde_json::from_value(result)
|
||||
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
||||
}
|
||||
|
||||
pub async fn resolve_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(ClientCommand::ResolveServerRequest {
|
||||
request_id,
|
||||
result,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server resolve channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn reject_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(ClientCommand::RejectServerRequest {
|
||||
request_id,
|
||||
error,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server reject channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
impl AppServerRequestHandle {
|
||||
@@ -756,6 +810,28 @@ impl AppServerRequestHandle {
|
||||
Self::Remote(handle) => handle.request_typed(request).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn resolve_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(handle) => handle.resolve_server_request(request_id, result).await,
|
||||
Self::Remote(handle) => handle.resolve_server_request(request_id, result).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reject_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
match self {
|
||||
Self::InProcess(handle) => handle.reject_server_request(request_id, error).await,
|
||||
Self::Remote(handle) => handle.reject_server_request(request_id, error).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AppServerClient {
|
||||
|
||||
@@ -631,6 +631,60 @@ impl RemoteAppServerRequestHandle {
|
||||
serde_json::from_value(result)
|
||||
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
||||
}
|
||||
|
||||
pub async fn resolve_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
result: JsonRpcResult,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::ResolveServerRequest {
|
||||
request_id,
|
||||
result,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server resolve channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
pub async fn reject_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.command_tx
|
||||
.send(RemoteClientCommand::RejectServerRequest {
|
||||
request_id,
|
||||
error,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server worker channel is closed",
|
||||
)
|
||||
})?;
|
||||
response_rx.await.map_err(|_| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"remote app-server reject channel is closed",
|
||||
)
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialize_remote_connection(
|
||||
|
||||
@@ -135,8 +135,10 @@ arboard = { workspace = true }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
app_test_support = { path = "../app-server/tests/common" }
|
||||
codex-cli = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
core_test_support = { path = "../core/tests/common" }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
assert_matches = { workspace = true }
|
||||
@@ -147,3 +149,4 @@ rand = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
vt100 = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
wiremock = { workspace = true }
|
||||
|
||||
@@ -4680,6 +4680,7 @@ mod tests {
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::Duration;
|
||||
use tempfile::tempdir;
|
||||
use tokio::time;
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ use crate::app_event::AppEvent;
|
||||
use crate::app_server_session::AppServerSession;
|
||||
use crate::app_server_session::app_server_rate_limit_snapshot_to_core;
|
||||
use crate::app_server_session::status_account_display_from_auth_mode;
|
||||
use crate::dynamic_tools::spawn_dynamic_tool_call_task;
|
||||
use crate::local_chatgpt_auth::load_local_chatgpt_auth;
|
||||
use codex_app_server_client::AppServerEvent;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
@@ -163,6 +164,13 @@ impl App {
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
if let ServerRequest::DynamicToolCall { request_id, params } = request {
|
||||
let registry = app_server_client.dynamic_tool_registry();
|
||||
let context =
|
||||
app_server_client.dynamic_tool_execution_context(¶ms.thread_id);
|
||||
spawn_dynamic_tool_call_task(registry, context, request_id, params);
|
||||
return;
|
||||
}
|
||||
if let Some(unsupported) = self
|
||||
.pending_app_server_requests
|
||||
.note_server_request(&request)
|
||||
@@ -858,14 +866,31 @@ fn app_server_codex_error_info_to_core(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::App;
|
||||
use super::server_notification_thread_events;
|
||||
use super::thread_snapshot_events;
|
||||
use super::turn_snapshot_events;
|
||||
use crate::app::AgentNavigationState;
|
||||
use crate::app::PendingAppServerRequests;
|
||||
use crate::app::WindowsSandboxState;
|
||||
use crate::app_backtrack::BacktrackState;
|
||||
use crate::chatwidget::tests::make_chatwidget_manual_with_sender;
|
||||
use crate::dynamic_tools::DynamicToolRegistration;
|
||||
use crate::dynamic_tools::DynamicToolRegistry;
|
||||
use crate::file_search::FileSearchManager;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use codex_app_server_client::AppServerClient;
|
||||
use codex_app_server_client::AppServerEvent;
|
||||
use codex_app_server_protocol::AgentMessageDeltaNotification;
|
||||
use codex_app_server_protocol::CodexErrorInfo;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
@@ -873,6 +898,13 @@ mod tests {
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
use codex_protocol::items::AgentMessageItem;
|
||||
@@ -882,8 +914,294 @@ mod tests {
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::MockServer;
|
||||
|
||||
use crate::app::FeedbackAudience;
|
||||
|
||||
async fn make_test_app() -> App {
|
||||
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await;
|
||||
let config = chat_widget.config_ref().clone();
|
||||
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
|
||||
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
|
||||
let model_info =
|
||||
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
|
||||
let session_telemetry = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
model.as_str(),
|
||||
model_info.slug.as_str(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
codex_core::default_client::originator().value,
|
||||
config.otel.log_user_prompt,
|
||||
codex_core::terminal::user_agent(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
|
||||
App {
|
||||
model_catalog: chat_widget.model_catalog(),
|
||||
session_telemetry,
|
||||
app_event_tx,
|
||||
chat_widget,
|
||||
config,
|
||||
active_profile: None,
|
||||
cli_kv_overrides: Vec::new(),
|
||||
harness_overrides: ConfigOverrides::default(),
|
||||
runtime_approval_policy_override: None,
|
||||
runtime_sandbox_policy_override: None,
|
||||
file_search,
|
||||
transcript_cells: Vec::new(),
|
||||
overlay: None,
|
||||
deferred_history_lines: Vec::new(),
|
||||
has_emitted_history_lines: false,
|
||||
enhanced_keys_supported: false,
|
||||
commit_anim_running: Arc::new(AtomicBool::new(false)),
|
||||
status_line_invalid_items_warned: Arc::new(AtomicBool::new(false)),
|
||||
backtrack: BacktrackState::default(),
|
||||
backtrack_render_pending: false,
|
||||
feedback: codex_feedback::CodexFeedback::new(),
|
||||
feedback_audience: FeedbackAudience::External,
|
||||
remote_app_server_url: None,
|
||||
pending_update_action: None,
|
||||
suppress_shutdown_complete: false,
|
||||
pending_shutdown_exit_thread_id: None,
|
||||
windows_sandbox: WindowsSandboxState::default(),
|
||||
thread_event_channels: HashMap::new(),
|
||||
thread_event_listener_tasks: HashMap::new(),
|
||||
agent_navigation: AgentNavigationState::default(),
|
||||
active_thread_id: None,
|
||||
active_thread_rx: None,
|
||||
primary_thread_id: None,
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
pending_app_server_requests: PendingAppServerRequests::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn mock_provider_cli_overrides(server_uri: &str) -> Vec<(String, toml::Value)> {
|
||||
vec![
|
||||
(
|
||||
"model".to_string(),
|
||||
toml::Value::String("mock-model".to_string()),
|
||||
),
|
||||
(
|
||||
"model_provider".to_string(),
|
||||
toml::Value::String("mock_provider".to_string()),
|
||||
),
|
||||
(
|
||||
"approval_policy".to_string(),
|
||||
toml::Value::String("never".to_string()),
|
||||
),
|
||||
(
|
||||
"sandbox_mode".to_string(),
|
||||
toml::Value::String("read-only".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.name".to_string(),
|
||||
toml::Value::String("Mock provider for test".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.base_url".to_string(),
|
||||
toml::Value::String(format!("{server_uri}/v1")),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.wire_api".to_string(),
|
||||
toml::Value::String("responses".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.request_max_retries".to_string(),
|
||||
toml::Value::Integer(0),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.stream_max_retries".to_string(),
|
||||
toml::Value::Integer(0),
|
||||
),
|
||||
]
|
||||
}
|
||||
|
||||
async fn build_dynamic_tool_test_session(
|
||||
temp_dir: &TempDir,
|
||||
cli_overrides: Vec<(String, toml::Value)>,
|
||||
dynamic_tools: Arc<DynamicToolRegistry>,
|
||||
) -> color_eyre::Result<(crate::app_server_session::AppServerSession, Config)> {
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(temp_dir.path().to_path_buf())
|
||||
.cli_overrides(cli_overrides.clone())
|
||||
.build()
|
||||
.await?;
|
||||
let client = crate::start_embedded_app_server(
|
||||
Arg0DispatchPaths::default(),
|
||||
config.clone(),
|
||||
cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
codex_feedback::CodexFeedback::new(),
|
||||
)
|
||||
.await?;
|
||||
Ok((
|
||||
crate::app_server_session::AppServerSession::new_with_dynamic_tools(
|
||||
AppServerClient::InProcess(client),
|
||||
dynamic_tools,
|
||||
),
|
||||
config,
|
||||
))
|
||||
}
|
||||
|
||||
async fn responses_bodies(server: &MockServer) -> color_eyre::Result<Vec<Value>> {
|
||||
let mut bodies = Vec::new();
|
||||
for request in server
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("requests should be readable")
|
||||
{
|
||||
if request.url.path().ends_with("/responses") {
|
||||
bodies.push(
|
||||
request
|
||||
.body_json::<Value>()
|
||||
.expect("request body should be json"),
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(bodies)
|
||||
}
|
||||
|
||||
fn function_call_output_text(body: &Value, call_id: &str) -> Option<String> {
|
||||
body.get("input")
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|items| {
|
||||
items.iter().find(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
|
||||
})
|
||||
})
|
||||
.and_then(|item| item.get("output"))
|
||||
.and_then(|output| {
|
||||
serde_json::from_value::<codex_protocol::models::FunctionCallOutputPayload>(
|
||||
output.clone(),
|
||||
)
|
||||
.ok()
|
||||
})
|
||||
.and_then(|payload| match payload.body {
|
||||
codex_protocol::models::FunctionCallOutputBody::Text(text) => Some(text),
|
||||
codex_protocol::models::FunctionCallOutputBody::ContentItems(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_app_server_event_resolves_dynamic_tool_calls_via_request_handle()
|
||||
-> color_eyre::Result<()> {
|
||||
let call_id = "dyn-call-1";
|
||||
let tool_name = "demo_tool";
|
||||
let tool_args = json!({ "city": "Paris" });
|
||||
let tool_call_arguments = serde_json::to_string(&tool_args)?;
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
create_final_assistant_message_sse_response("Done")
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
let temp_dir = TempDir::new()?;
|
||||
let cli_overrides = mock_provider_cli_overrides(&server.uri());
|
||||
|
||||
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
|
||||
DynamicToolRegistration::new(
|
||||
DynamicToolSpec {
|
||||
name: tool_name.to_string(),
|
||||
description: "dynamic tool".to_string(),
|
||||
input_schema: json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"city": { "type": "string" }
|
||||
},
|
||||
"required": ["city"],
|
||||
"additionalProperties": false,
|
||||
}),
|
||||
defer_loading: false,
|
||||
},
|
||||
|_context, _params| async move {
|
||||
Ok(DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: _context
|
||||
.cwd()
|
||||
.expect("thread cwd should be present")
|
||||
.display()
|
||||
.to_string(),
|
||||
}],
|
||||
success: true,
|
||||
})
|
||||
},
|
||||
),
|
||||
]));
|
||||
let (mut session, config) =
|
||||
build_dynamic_tool_test_session(&temp_dir, cli_overrides, registry).await?;
|
||||
let started = session.start_thread(&config).await?;
|
||||
session
|
||||
.turn_start(
|
||||
started.session_configured.session_id,
|
||||
vec![UserInput::Text {
|
||||
text: "Run the tool".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
config.cwd.clone(),
|
||||
config.permissions.approval_policy.value(),
|
||||
config.approvals_reviewer,
|
||||
config.permissions.sandbox_policy.get().clone(),
|
||||
config
|
||||
.model
|
||||
.clone()
|
||||
.expect("mock model should be configured"),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut app = make_test_app().await;
|
||||
let mut saw_dynamic_tool_request = false;
|
||||
loop {
|
||||
let event = timeout(Duration::from_secs(10), session.next_event())
|
||||
.await?
|
||||
.expect("app-server event stream should stay open");
|
||||
if matches!(
|
||||
&event,
|
||||
AppServerEvent::ServerRequest(ServerRequest::DynamicToolCall { .. })
|
||||
) {
|
||||
saw_dynamic_tool_request = true;
|
||||
}
|
||||
app.handle_app_server_event(&session, event).await;
|
||||
|
||||
if let Some(text) = responses_bodies(&server)
|
||||
.await?
|
||||
.iter()
|
||||
.find_map(|body| function_call_output_text(body, call_id))
|
||||
{
|
||||
assert_eq!(text, config.cwd.display().to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert!(saw_dynamic_tool_request);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bridges_completed_agent_messages_from_server_notifications() {
|
||||
|
||||
@@ -92,13 +92,7 @@ impl PendingAppServerRequests {
|
||||
}
|
||||
None
|
||||
}
|
||||
ServerRequest::DynamicToolCall { request_id, .. } => {
|
||||
Some(UnsupportedAppServerRequest {
|
||||
request_id: request_id.clone(),
|
||||
message: "Dynamic tool calls are not available in app-server TUI yet."
|
||||
.to_string(),
|
||||
})
|
||||
}
|
||||
ServerRequest::DynamicToolCall { .. } => None,
|
||||
ServerRequest::ChatgptAuthTokensRefresh { .. } => None,
|
||||
ServerRequest::ApplyPatchApproval { request_id, .. } => {
|
||||
Some(UnsupportedAppServerRequest {
|
||||
@@ -580,10 +574,10 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rejects_dynamic_tool_calls_as_unsupported() {
|
||||
fn does_not_track_dynamic_tool_calls_in_pending_request_map() {
|
||||
let mut pending = PendingAppServerRequests::default();
|
||||
let unsupported = pending
|
||||
.note_server_request(&ServerRequest::DynamicToolCall {
|
||||
assert_eq!(
|
||||
pending.note_server_request(&ServerRequest::DynamicToolCall {
|
||||
request_id: AppServerRequestId::Integer(99),
|
||||
params: codex_app_server_protocol::DynamicToolCallParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
@@ -592,13 +586,8 @@ mod tests {
|
||||
tool: "tool".to_string(),
|
||||
arguments: json!({}),
|
||||
},
|
||||
})
|
||||
.expect("dynamic tool calls should be rejected");
|
||||
|
||||
assert_eq!(unsupported.request_id, AppServerRequestId::Integer(99));
|
||||
assert_eq!(
|
||||
unsupported.message,
|
||||
"Dynamic tool calls are not available in app-server TUI yet."
|
||||
}),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ use codex_app_server_client::AppServerRequestHandle;
|
||||
use codex_app_server_protocol::Account;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::GetAccountParams;
|
||||
use codex_app_server_protocol::GetAccountRateLimitsResponse;
|
||||
use codex_app_server_protocol::GetAccountResponse;
|
||||
@@ -75,8 +76,12 @@ use color_eyre::eyre::Result;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use crate::bottom_pane::FeedbackAudience;
|
||||
use crate::dynamic_tools::DynamicToolExecutionContext;
|
||||
use crate::dynamic_tools::DynamicToolRegistry;
|
||||
use crate::status::StatusAccountDisplay;
|
||||
|
||||
pub(crate) struct AppServerBootstrap {
|
||||
@@ -94,6 +99,8 @@ pub(crate) struct AppServerBootstrap {
|
||||
|
||||
pub(crate) struct AppServerSession {
|
||||
client: AppServerClient,
|
||||
dynamic_tools: Arc<DynamicToolRegistry>,
|
||||
thread_cwds: RwLock<HashMap<String, PathBuf>>,
|
||||
next_request_id: i64,
|
||||
}
|
||||
|
||||
@@ -128,8 +135,17 @@ pub(crate) struct AppServerStartedThread {
|
||||
|
||||
impl AppServerSession {
|
||||
pub(crate) fn new(client: AppServerClient) -> Self {
|
||||
Self::new_with_dynamic_tools(client, Arc::new(DynamicToolRegistry::tui_owned()))
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_dynamic_tools(
|
||||
client: AppServerClient,
|
||||
dynamic_tools: Arc<DynamicToolRegistry>,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
dynamic_tools,
|
||||
thread_cwds: RwLock::new(HashMap::new()),
|
||||
next_request_id: 1,
|
||||
}
|
||||
}
|
||||
@@ -262,11 +278,19 @@ impl AppServerSession {
|
||||
.client
|
||||
.request_typed(ClientRequest::ThreadStart {
|
||||
request_id,
|
||||
params: thread_start_params_from_config(config, self.thread_params_mode()),
|
||||
params: thread_start_params_from_config(
|
||||
config,
|
||||
self.thread_params_mode(),
|
||||
self.dynamic_tools.specs(),
|
||||
),
|
||||
})
|
||||
.await
|
||||
.wrap_err("thread/start failed during TUI bootstrap")?;
|
||||
started_thread_from_start_response(response)
|
||||
let thread_id = response.thread.id.clone();
|
||||
let cwd = response.cwd.clone();
|
||||
let started = started_thread_from_start_response(response)?;
|
||||
self.remember_thread_cwd(thread_id, cwd);
|
||||
Ok(started)
|
||||
}
|
||||
|
||||
pub(crate) async fn resume_thread(
|
||||
@@ -288,7 +312,11 @@ impl AppServerSession {
|
||||
})
|
||||
.await
|
||||
.wrap_err("thread/resume failed during TUI bootstrap")?;
|
||||
started_thread_from_resume_response(response, show_raw_agent_reasoning)
|
||||
let thread_id = response.thread.id.clone();
|
||||
let cwd = response.cwd.clone();
|
||||
let started = started_thread_from_resume_response(response, show_raw_agent_reasoning)?;
|
||||
self.remember_thread_cwd(thread_id, cwd);
|
||||
Ok(started)
|
||||
}
|
||||
|
||||
pub(crate) async fn fork_thread(
|
||||
@@ -310,7 +338,11 @@ impl AppServerSession {
|
||||
})
|
||||
.await
|
||||
.wrap_err("thread/fork failed during TUI bootstrap")?;
|
||||
started_thread_from_fork_response(response, show_raw_agent_reasoning)
|
||||
let thread_id = response.thread.id.clone();
|
||||
let cwd = response.cwd.clone();
|
||||
let started = started_thread_from_fork_response(response, show_raw_agent_reasoning)?;
|
||||
self.remember_thread_cwd(thread_id, cwd);
|
||||
Ok(started)
|
||||
}
|
||||
|
||||
fn thread_params_mode(&self) -> ThreadParamsMode {
|
||||
@@ -369,13 +401,16 @@ impl AppServerSession {
|
||||
output_schema: Option<serde_json::Value>,
|
||||
) -> Result<TurnStartResponse> {
|
||||
let request_id = self.next_request_id();
|
||||
self.client
|
||||
let request_cwd = cwd.clone();
|
||||
self.remember_thread_cwd(thread_id.to_string(), cwd);
|
||||
let response = self
|
||||
.client
|
||||
.request_typed(ClientRequest::TurnStart {
|
||||
request_id,
|
||||
params: TurnStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
cwd: Some(cwd),
|
||||
cwd: Some(request_cwd),
|
||||
approval_policy: Some(approval_policy.into()),
|
||||
approvals_reviewer: Some(approvals_reviewer.into()),
|
||||
sandbox_policy: Some(sandbox_policy.into()),
|
||||
@@ -389,7 +424,8 @@ impl AppServerSession {
|
||||
},
|
||||
})
|
||||
.await
|
||||
.wrap_err("turn/start failed in app-server TUI")
|
||||
.wrap_err("turn/start failed in app-server TUI")?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn turn_interrupt(
|
||||
@@ -648,11 +684,38 @@ impl AppServerSession {
|
||||
self.client.request_handle()
|
||||
}
|
||||
|
||||
pub(crate) fn dynamic_tool_registry(&self) -> Arc<DynamicToolRegistry> {
|
||||
Arc::clone(&self.dynamic_tools)
|
||||
}
|
||||
|
||||
pub(crate) fn dynamic_tool_execution_context(
|
||||
&self,
|
||||
thread_id: &str,
|
||||
) -> DynamicToolExecutionContext {
|
||||
DynamicToolExecutionContext::new(self.request_handle(), self.thread_cwd(thread_id))
|
||||
}
|
||||
|
||||
fn next_request_id(&mut self) -> RequestId {
|
||||
let request_id = self.next_request_id;
|
||||
self.next_request_id += 1;
|
||||
RequestId::Integer(request_id)
|
||||
}
|
||||
|
||||
fn remember_thread_cwd(&self, thread_id: String, cwd: PathBuf) {
|
||||
match self.thread_cwds.write() {
|
||||
Ok(mut thread_cwds) => {
|
||||
thread_cwds.insert(thread_id, cwd);
|
||||
}
|
||||
Err(err) => panic!("thread cwd map lock should not be poisoned: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_cwd(&self, thread_id: &str) -> Option<PathBuf> {
|
||||
match self.thread_cwds.read() {
|
||||
Ok(thread_cwds) => thread_cwds.get(thread_id).cloned(),
|
||||
Err(err) => panic!("thread cwd map lock should not be poisoned: {err}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn title_case(s: &str) -> String {
|
||||
@@ -773,6 +836,7 @@ fn sandbox_mode_from_policy(
|
||||
fn thread_start_params_from_config(
|
||||
config: &Config,
|
||||
thread_params_mode: ThreadParamsMode,
|
||||
dynamic_tools: Option<Vec<DynamicToolSpec>>,
|
||||
) -> ThreadStartParams {
|
||||
ThreadStartParams {
|
||||
model: config.model.clone(),
|
||||
@@ -782,6 +846,7 @@ fn thread_start_params_from_config(
|
||||
approvals_reviewer: approvals_reviewer_override_from_config(config),
|
||||
sandbox: sandbox_mode_from_policy(config.permissions.sandbox_policy.get().clone()),
|
||||
config: config_request_overrides_from_config(config),
|
||||
dynamic_tools,
|
||||
ephemeral: Some(config.ephemeral),
|
||||
persist_extended_history: true,
|
||||
..ThreadStartParams::default()
|
||||
@@ -1038,39 +1103,259 @@ fn app_server_credits_snapshot_to_core(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use codex_app_server_client::AppServerClient;
|
||||
use codex_app_server_client::AppServerEvent;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::MockServer;
|
||||
|
||||
async fn build_config(temp_dir: &TempDir) -> Config {
|
||||
use crate::dynamic_tools::DynamicToolExecutionError;
|
||||
use crate::dynamic_tools::DynamicToolRegistration;
|
||||
use crate::dynamic_tools::DynamicToolRegistry;
|
||||
use crate::dynamic_tools::handle_dynamic_tool_call_request;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
fn mock_provider_cli_overrides(server_uri: &str) -> Vec<(String, toml::Value)> {
|
||||
vec![
|
||||
(
|
||||
"model".to_string(),
|
||||
toml::Value::String("mock-model".to_string()),
|
||||
),
|
||||
(
|
||||
"model_provider".to_string(),
|
||||
toml::Value::String("mock_provider".to_string()),
|
||||
),
|
||||
(
|
||||
"approval_policy".to_string(),
|
||||
toml::Value::String("never".to_string()),
|
||||
),
|
||||
(
|
||||
"sandbox_mode".to_string(),
|
||||
toml::Value::String("read-only".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.name".to_string(),
|
||||
toml::Value::String("Mock provider for test".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.base_url".to_string(),
|
||||
toml::Value::String(format!("{server_uri}/v1")),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.wire_api".to_string(),
|
||||
toml::Value::String("responses".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.request_max_retries".to_string(),
|
||||
toml::Value::Integer(0),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.stream_max_retries".to_string(),
|
||||
toml::Value::Integer(0),
|
||||
),
|
||||
]
|
||||
}
|
||||
|
||||
async fn build_config(temp_dir: &TempDir, cli_overrides: Vec<(String, toml::Value)>) -> Config {
|
||||
ConfigBuilder::default()
|
||||
.codex_home(temp_dir.path().to_path_buf())
|
||||
.cli_overrides(cli_overrides)
|
||||
.build()
|
||||
.await
|
||||
.expect("config should build")
|
||||
}
|
||||
|
||||
async fn start_test_session(
|
||||
config: Config,
|
||||
cli_overrides: Vec<(String, toml::Value)>,
|
||||
dynamic_tools: Arc<DynamicToolRegistry>,
|
||||
) -> Result<AppServerSession> {
|
||||
let client = crate::start_embedded_app_server(
|
||||
Arg0DispatchPaths::default(),
|
||||
config,
|
||||
cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
codex_feedback::CodexFeedback::new(),
|
||||
)
|
||||
.await?;
|
||||
Ok(AppServerSession::new_with_dynamic_tools(
|
||||
AppServerClient::InProcess(client),
|
||||
dynamic_tools,
|
||||
))
|
||||
}
|
||||
|
||||
fn demo_tool_spec(name: &str) -> DynamicToolSpec {
|
||||
DynamicToolSpec {
|
||||
name: name.to_string(),
|
||||
description: format!("dynamic tool {name}"),
|
||||
input_schema: json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"city": { "type": "string" }
|
||||
},
|
||||
"required": ["city"],
|
||||
"additionalProperties": false,
|
||||
}),
|
||||
defer_loading: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn responses_bodies(server: &MockServer) -> Result<Vec<Value>> {
|
||||
let mut bodies = Vec::new();
|
||||
for request in server
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("requests should be readable")
|
||||
{
|
||||
if request.url.path().ends_with("/responses") {
|
||||
bodies.push(
|
||||
request
|
||||
.body_json::<Value>()
|
||||
.expect("request body should be json"),
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(bodies)
|
||||
}
|
||||
|
||||
fn function_call_output_payload(
|
||||
body: &Value,
|
||||
call_id: &str,
|
||||
) -> Option<FunctionCallOutputPayload> {
|
||||
body.get("input")
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|items| {
|
||||
items.iter().find(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
|
||||
})
|
||||
})
|
||||
.and_then(|item| item.get("output"))
|
||||
.cloned()
|
||||
.and_then(|output| serde_json::from_value(output).ok())
|
||||
}
|
||||
|
||||
async fn run_dynamic_tool_turn(
|
||||
session: &mut AppServerSession,
|
||||
config: &Config,
|
||||
thread_id: ThreadId,
|
||||
prompt: &str,
|
||||
) -> Result<()> {
|
||||
session
|
||||
.turn_start(
|
||||
thread_id,
|
||||
vec![UserInput::Text {
|
||||
text: prompt.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
config.cwd.clone(),
|
||||
config.permissions.approval_policy.value(),
|
||||
config.approvals_reviewer,
|
||||
config.permissions.sandbox_policy.get().clone(),
|
||||
config
|
||||
.model
|
||||
.clone()
|
||||
.expect("mock model should be configured"),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
let event = timeout(DEFAULT_READ_TIMEOUT, session.next_event())
|
||||
.await?
|
||||
.expect("app-server event stream should stay open");
|
||||
match event {
|
||||
AppServerEvent::ServerRequest(ServerRequest::DynamicToolCall {
|
||||
request_id,
|
||||
params,
|
||||
}) => {
|
||||
handle_dynamic_tool_call_request(
|
||||
session.dynamic_tool_registry(),
|
||||
session.dynamic_tool_execution_context(¶ms.thread_id),
|
||||
request_id,
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?;
|
||||
}
|
||||
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => {
|
||||
return Ok(());
|
||||
}
|
||||
AppServerEvent::LegacyNotification(notification)
|
||||
if notification.method.ends_with("task_complete") =>
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_params_include_cwd_for_embedded_sessions() {
|
||||
let temp_dir = tempfile::tempdir().expect("tempdir");
|
||||
let config = build_config(&temp_dir).await;
|
||||
let config = build_config(&temp_dir, Vec::new()).await;
|
||||
|
||||
let params = thread_start_params_from_config(&config, ThreadParamsMode::Embedded);
|
||||
let params = thread_start_params_from_config(&config, ThreadParamsMode::Embedded, None);
|
||||
|
||||
assert_eq!(params.cwd, Some(config.cwd.to_string_lossy().to_string()));
|
||||
assert_eq!(params.model_provider, Some(config.model_provider_id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_params_include_registered_dynamic_tools() {
|
||||
let temp_dir = tempfile::tempdir().expect("tempdir");
|
||||
let config = build_config(&temp_dir, Vec::new()).await;
|
||||
let spec = demo_tool_spec("demo_tool");
|
||||
|
||||
let params = thread_start_params_from_config(
|
||||
&config,
|
||||
ThreadParamsMode::Embedded,
|
||||
Some(vec![spec.clone()]),
|
||||
);
|
||||
|
||||
assert_eq!(params.dynamic_tools, Some(vec![spec]));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_lifecycle_params_omit_local_overrides_for_remote_sessions() {
|
||||
let temp_dir = tempfile::tempdir().expect("tempdir");
|
||||
let config = build_config(&temp_dir).await;
|
||||
let config = build_config(&temp_dir, Vec::new()).await;
|
||||
let thread_id = ThreadId::new();
|
||||
|
||||
let start = thread_start_params_from_config(&config, ThreadParamsMode::Remote);
|
||||
let start = thread_start_params_from_config(&config, ThreadParamsMode::Remote, None);
|
||||
let resume =
|
||||
thread_resume_params_from_config(config.clone(), thread_id, ThreadParamsMode::Remote);
|
||||
let fork = thread_fork_params_from_config(config, thread_id, ThreadParamsMode::Remote);
|
||||
@@ -1141,4 +1426,240 @@ mod tests {
|
||||
assert_eq!(started.thread.turns.len(), 1);
|
||||
assert_eq!(started.thread.turns[0].items.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dynamic_tool_turn_round_trip_resolves_registered_tools() -> Result<()> {
|
||||
let call_id = "dyn-call-1";
|
||||
let tool_name = "demo_tool";
|
||||
let tool_args = json!({ "city": "Paris" });
|
||||
let tool_call_arguments = serde_json::to_string(&tool_args)?;
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
create_final_assistant_message_sse_response("Done")
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let cli_overrides = mock_provider_cli_overrides(&server.uri());
|
||||
let config = build_config(&codex_home, cli_overrides.clone()).await;
|
||||
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
|
||||
DynamicToolRegistration::new(
|
||||
demo_tool_spec(tool_name),
|
||||
|context, _params| async move {
|
||||
Ok(DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: context
|
||||
.cwd()
|
||||
.expect("thread cwd should be present")
|
||||
.display()
|
||||
.to_string(),
|
||||
}],
|
||||
success: true,
|
||||
})
|
||||
},
|
||||
),
|
||||
]));
|
||||
let mut session = start_test_session(config.clone(), cli_overrides, registry).await?;
|
||||
let started = session.start_thread(&config).await?;
|
||||
let thread_id = started.session_configured.session_id;
|
||||
|
||||
run_dynamic_tool_turn(&mut session, &config, thread_id, "Run the tool").await?;
|
||||
|
||||
let bodies = responses_bodies(&server).await?;
|
||||
let payload = bodies
|
||||
.iter()
|
||||
.find_map(|body| function_call_output_payload(body, call_id))
|
||||
.expect("expected function_call_output payload");
|
||||
assert_eq!(
|
||||
payload,
|
||||
FunctionCallOutputPayload::from_text(config.cwd.display().to_string())
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resumed_threads_can_reuse_registered_dynamic_tools() -> Result<()> {
|
||||
let first_call_id = "dyn-call-1";
|
||||
let second_call_id = "dyn-call-2";
|
||||
let tool_name = "demo_tool";
|
||||
let tool_args = json!({ "city": "Paris" });
|
||||
let tool_call_arguments = serde_json::to_string(&tool_args)?;
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(first_call_id, tool_name, &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
create_final_assistant_message_sse_response("Done")
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_function_call(second_call_id, tool_name, &tool_call_arguments),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
create_final_assistant_message_sse_response("Done again")
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let cli_overrides = mock_provider_cli_overrides(&server.uri());
|
||||
let config = build_config(&codex_home, cli_overrides.clone()).await;
|
||||
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
|
||||
DynamicToolRegistration::new(
|
||||
demo_tool_spec(tool_name),
|
||||
|context, _params| async move {
|
||||
Ok(DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: context
|
||||
.cwd()
|
||||
.expect("thread cwd should be present")
|
||||
.display()
|
||||
.to_string(),
|
||||
}],
|
||||
success: true,
|
||||
})
|
||||
},
|
||||
),
|
||||
]));
|
||||
|
||||
let mut first_session =
|
||||
start_test_session(config.clone(), cli_overrides.clone(), Arc::clone(®istry))
|
||||
.await?;
|
||||
let started = first_session.start_thread(&config).await?;
|
||||
let thread_id = started.session_configured.session_id;
|
||||
run_dynamic_tool_turn(&mut first_session, &config, thread_id, "Run the tool").await?;
|
||||
let rollout_path = first_session
|
||||
.thread_read(thread_id, /* include_turns */ false)
|
||||
.await?
|
||||
.path
|
||||
.expect("thread should expose a persisted rollout path");
|
||||
first_session.shutdown().await?;
|
||||
|
||||
let resumed_config = build_config(&codex_home, cli_overrides.clone()).await;
|
||||
let mut resumed_session =
|
||||
start_test_session(resumed_config.clone(), cli_overrides, Arc::clone(®istry))
|
||||
.await?;
|
||||
let request_id = resumed_session.next_request_id();
|
||||
let mut params = thread_resume_params_from_config(
|
||||
resumed_config.clone(),
|
||||
thread_id,
|
||||
ThreadParamsMode::Embedded,
|
||||
);
|
||||
params.path = Some(rollout_path);
|
||||
let _: ThreadResumeResponse = resumed_session
|
||||
.client
|
||||
.request_typed(ClientRequest::ThreadResume { request_id, params })
|
||||
.await?;
|
||||
run_dynamic_tool_turn(
|
||||
&mut resumed_session,
|
||||
&resumed_config,
|
||||
thread_id,
|
||||
"Run the tool again",
|
||||
)
|
||||
.await?;
|
||||
|
||||
let bodies = responses_bodies(&server).await?;
|
||||
let first_payload = bodies
|
||||
.iter()
|
||||
.find_map(|body| function_call_output_payload(body, first_call_id))
|
||||
.expect("expected first function_call_output payload");
|
||||
let second_payload = bodies
|
||||
.iter()
|
||||
.find_map(|body| function_call_output_payload(body, second_call_id))
|
||||
.expect("expected second function_call_output payload");
|
||||
assert_eq!(
|
||||
first_payload,
|
||||
FunctionCallOutputPayload::from_text(config.cwd.display().to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
second_payload,
|
||||
FunctionCallOutputPayload::from_text(resumed_config.cwd.display().to_string())
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_dynamic_tool_requests_are_rejected_cleanly() -> Result<()> {
|
||||
let registry = Arc::new(DynamicToolRegistry::default());
|
||||
let error = handle_dynamic_tool_call_request(
|
||||
registry,
|
||||
DynamicToolExecutionContext::for_tests(),
|
||||
RequestId::Integer(1),
|
||||
DynamicToolCallParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
tool: "missing_tool".to_string(),
|
||||
arguments: json!({}),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("missing request handle should fail before dispatch");
|
||||
|
||||
assert_eq!(
|
||||
error,
|
||||
"dynamic tool execution context is missing an app-server request handle"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn failing_dynamic_tools_return_failed_responses() -> Result<()> {
|
||||
let call_id = "dyn-call-1";
|
||||
let tool_name = "demo_tool";
|
||||
let tool_args = json!({ "city": "Paris" });
|
||||
let tool_call_arguments = serde_json::to_string(&tool_args)?;
|
||||
let responses = vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
create_final_assistant_message_sse_response("Done")
|
||||
.map_err(|err| color_eyre::eyre::eyre!(err))?,
|
||||
];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let cli_overrides = mock_provider_cli_overrides(&server.uri());
|
||||
let config = build_config(&codex_home, cli_overrides.clone()).await;
|
||||
let registry = Arc::new(DynamicToolRegistry::from_registrations(vec![
|
||||
DynamicToolRegistration::new(
|
||||
demo_tool_spec(tool_name),
|
||||
move |_context, _params| async move {
|
||||
Err(DynamicToolExecutionError::failed(
|
||||
tool_name,
|
||||
"dynamic tool failed",
|
||||
))
|
||||
},
|
||||
),
|
||||
]));
|
||||
let mut session = start_test_session(config.clone(), cli_overrides, registry).await?;
|
||||
let started = session.start_thread(&config).await?;
|
||||
|
||||
run_dynamic_tool_turn(
|
||||
&mut session,
|
||||
&config,
|
||||
started.session_configured.session_id,
|
||||
"Run the tool",
|
||||
)
|
||||
.await?;
|
||||
|
||||
let bodies = responses_bodies(&server).await?;
|
||||
let payload = bodies
|
||||
.iter()
|
||||
.find_map(|body| function_call_output_payload(body, call_id))
|
||||
.expect("expected function_call_output payload");
|
||||
assert_eq!(
|
||||
payload,
|
||||
FunctionCallOutputPayload::from_text("dynamic tool failed".to_string())
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
442
codex-rs/tui_app_server/src/dynamic_tools.rs
Normal file
442
codex-rs/tui_app_server/src/dynamic_tools.rs
Normal file
@@ -0,0 +1,442 @@
|
||||
use codex_app_server_client::AppServerRequestHandle;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
type DynamicToolFuture = Pin<
|
||||
Box<dyn Future<Output = Result<DynamicToolCallResponse, DynamicToolExecutionError>> + Send>,
|
||||
>;
|
||||
type DynamicToolExecutor =
|
||||
dyn Fn(DynamicToolExecutionContext, DynamicToolCallParams) -> DynamicToolFuture + Send + Sync;
|
||||
|
||||
pub(crate) struct DynamicToolRegistration {
|
||||
spec: DynamicToolSpec,
|
||||
executor: Arc<DynamicToolExecutor>,
|
||||
}
|
||||
|
||||
impl DynamicToolRegistration {
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn new<F, Fut>(spec: DynamicToolSpec, executor: F) -> Self
|
||||
where
|
||||
F: Fn(DynamicToolExecutionContext, DynamicToolCallParams) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Result<DynamicToolCallResponse, DynamicToolExecutionError>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
let executor = Arc::new(move |context, params| Box::pin(executor(context, params)) as _);
|
||||
Self { spec, executor }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct DynamicToolRegistry {
|
||||
tools: HashMap<String, DynamicToolRegistration>,
|
||||
}
|
||||
|
||||
impl DynamicToolRegistry {
|
||||
pub(crate) fn tui_owned() -> Self {
|
||||
Self::from_registrations(Vec::new())
|
||||
}
|
||||
|
||||
pub(crate) fn from_registrations(registrations: Vec<DynamicToolRegistration>) -> Self {
|
||||
let tools = registrations
|
||||
.into_iter()
|
||||
.map(|registration| (registration.spec.name.clone(), registration))
|
||||
.collect();
|
||||
Self { tools }
|
||||
}
|
||||
|
||||
pub(crate) fn specs(&self) -> Option<Vec<DynamicToolSpec>> {
|
||||
if self.tools.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut specs = self
|
||||
.tools
|
||||
.values()
|
||||
.map(|registration| registration.spec.clone())
|
||||
.collect::<Vec<_>>();
|
||||
specs.sort_by(|left, right| left.name.cmp(&right.name));
|
||||
Some(specs)
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(
|
||||
&self,
|
||||
context: DynamicToolExecutionContext,
|
||||
params: DynamicToolCallParams,
|
||||
) -> Result<DynamicToolCallResponse, DynamicToolExecutionError> {
|
||||
let executor = self
|
||||
.tools
|
||||
.get(¶ms.tool)
|
||||
.map(|registration| Arc::clone(®istration.executor))
|
||||
.ok_or_else(|| DynamicToolExecutionError::UnknownTool {
|
||||
tool: params.tool.clone(),
|
||||
})?;
|
||||
executor(context, params).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct DynamicToolExecutionContext {
|
||||
request_handle: Option<AppServerRequestHandle>,
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
cwd: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl DynamicToolExecutionContext {
|
||||
pub(crate) fn new(request_handle: AppServerRequestHandle, cwd: Option<PathBuf>) -> Self {
|
||||
Self {
|
||||
request_handle: Some(request_handle),
|
||||
cwd,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn for_tests() -> Self {
|
||||
Self {
|
||||
request_handle: None,
|
||||
cwd: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn request_handle(&self) -> Option<&AppServerRequestHandle> {
|
||||
self.request_handle.as_ref()
|
||||
}
|
||||
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn cwd(&self) -> Option<&Path> {
|
||||
self.cwd.as_deref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum DynamicToolExecutionError {
|
||||
UnknownTool { tool: String },
|
||||
ExecutionFailed { tool: String, message: String },
|
||||
}
|
||||
|
||||
impl DynamicToolExecutionError {
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(crate) fn failed(tool: impl Into<String>, message: impl Into<String>) -> Self {
|
||||
Self::ExecutionFailed {
|
||||
tool: tool.into(),
|
||||
message: message.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_dynamic_tool_call_request(
|
||||
registry: Arc<DynamicToolRegistry>,
|
||||
context: DynamicToolExecutionContext,
|
||||
request_id: RequestId,
|
||||
params: DynamicToolCallParams,
|
||||
) -> Result<(), String> {
|
||||
let Some(request_handle) = context.request_handle().cloned() else {
|
||||
return Err(
|
||||
"dynamic tool execution context is missing an app-server request handle".to_string(),
|
||||
);
|
||||
};
|
||||
|
||||
match registry.execute(context, params.clone()).await {
|
||||
Ok(response) => {
|
||||
let result = serde_json::to_value(response).map_err(|err| {
|
||||
format!(
|
||||
"failed to serialize dynamic tool response for `{}`: {err}",
|
||||
params.tool
|
||||
)
|
||||
})?;
|
||||
request_handle
|
||||
.resolve_server_request(request_id, result)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
format!(
|
||||
"failed to resolve dynamic tool request for `{}`: {err}",
|
||||
params.tool
|
||||
)
|
||||
})
|
||||
}
|
||||
Err(DynamicToolExecutionError::UnknownTool { tool }) => {
|
||||
let message = format!("unknown dynamic tool `{tool}` for this TUI client");
|
||||
request_handle
|
||||
.reject_server_request(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: -32000,
|
||||
message,
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("failed to reject dynamic tool request for `{tool}`: {err}"))
|
||||
}
|
||||
Err(DynamicToolExecutionError::ExecutionFailed { tool, message }) => {
|
||||
tracing::warn!(tool, %message, "dynamic tool executor failed");
|
||||
let result =
|
||||
serde_json::to_value(dynamic_tool_failure_response(&message)).map_err(|err| {
|
||||
format!("failed to serialize fallback response for `{tool}`: {err}")
|
||||
})?;
|
||||
request_handle
|
||||
.resolve_server_request(request_id, result)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
format!("failed to resolve fallback dynamic tool response for `{tool}`: {err}")
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_dynamic_tool_call_task(
|
||||
registry: Arc<DynamicToolRegistry>,
|
||||
context: DynamicToolExecutionContext,
|
||||
request_id: RequestId,
|
||||
params: DynamicToolCallParams,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
supervise_dynamic_tool_call_task(registry, context, request_id, params).await
|
||||
{
|
||||
tracing::warn!("{err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn supervise_dynamic_tool_call_task(
|
||||
registry: Arc<DynamicToolRegistry>,
|
||||
context: DynamicToolExecutionContext,
|
||||
request_id: RequestId,
|
||||
params: DynamicToolCallParams,
|
||||
) -> Result<(), String> {
|
||||
let panic_context = context.clone();
|
||||
let panic_request_id = request_id.clone();
|
||||
let panic_tool = params.tool.clone();
|
||||
|
||||
supervise_spawned_task(
|
||||
handle_dynamic_tool_call_request(registry, context, request_id, params),
|
||||
move |join_err| async move {
|
||||
reject_panicked_dynamic_tool_request(
|
||||
panic_context,
|
||||
panic_request_id,
|
||||
panic_tool,
|
||||
join_err,
|
||||
)
|
||||
.await
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn supervise_spawned_task<F, P, PF>(future: F, on_join_error: P) -> Result<(), String>
|
||||
where
|
||||
F: Future<Output = Result<(), String>> + Send + 'static,
|
||||
P: FnOnce(JoinError) -> PF,
|
||||
PF: Future<Output = Result<(), String>> + Send,
|
||||
{
|
||||
match tokio::spawn(future).await {
|
||||
Ok(result) => result,
|
||||
Err(err) => on_join_error(err).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn reject_panicked_dynamic_tool_request(
|
||||
context: DynamicToolExecutionContext,
|
||||
request_id: RequestId,
|
||||
tool: String,
|
||||
join_err: JoinError,
|
||||
) -> Result<(), String> {
|
||||
let Some(request_handle) = context.request_handle().cloned() else {
|
||||
return Err(format!(
|
||||
"dynamic tool `{tool}` task failed before producing a response: {join_err}"
|
||||
));
|
||||
};
|
||||
|
||||
let message = if join_err.is_panic() {
|
||||
format!("dynamic tool `{tool}` panicked before producing a response")
|
||||
} else {
|
||||
format!("dynamic tool `{tool}` was cancelled before producing a response")
|
||||
};
|
||||
tracing::warn!(tool, error = %join_err, "dynamic tool task failed");
|
||||
request_handle
|
||||
.reject_server_request(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: -32000,
|
||||
message,
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|err| format!("failed to reject dynamic tool request for `{tool}`: {err}"))
|
||||
}
|
||||
|
||||
pub(crate) fn dynamic_tool_failure_response(message: &str) -> DynamicToolCallResponse {
|
||||
DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: message.to_string(),
|
||||
}],
|
||||
success: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::DynamicToolExecutionContext;
|
||||
use super::DynamicToolExecutionError;
|
||||
use super::DynamicToolRegistration;
|
||||
use super::DynamicToolRegistry;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
fn demo_spec(name: &str) -> DynamicToolSpec {
|
||||
DynamicToolSpec {
|
||||
name: name.to_string(),
|
||||
description: format!("dynamic tool {name}"),
|
||||
input_schema: json!({
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
}),
|
||||
defer_loading: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dispatches_registered_dynamic_tools() {
|
||||
let registry = DynamicToolRegistry::from_registrations(vec![DynamicToolRegistration::new(
|
||||
demo_spec("demo_tool"),
|
||||
|_context, params| async move {
|
||||
Ok(DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: params.arguments.to_string(),
|
||||
}],
|
||||
success: true,
|
||||
})
|
||||
},
|
||||
)]);
|
||||
|
||||
let response = registry
|
||||
.execute(
|
||||
DynamicToolExecutionContext::for_tests(),
|
||||
DynamicToolCallParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
tool: "demo_tool".to_string(),
|
||||
arguments: json!({ "city": "Paris" }),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("dynamic tool should execute");
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
DynamicToolCallResponse {
|
||||
content_items: vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: json!({ "city": "Paris" }).to_string(),
|
||||
}],
|
||||
success: true,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_unknown_dynamic_tools() {
|
||||
let registry = DynamicToolRegistry::default();
|
||||
|
||||
let error = registry
|
||||
.execute(
|
||||
DynamicToolExecutionContext::for_tests(),
|
||||
DynamicToolCallParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
tool: "missing_tool".to_string(),
|
||||
arguments: json!({}),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("unknown tool should be rejected");
|
||||
|
||||
assert_eq!(
|
||||
error,
|
||||
DynamicToolExecutionError::UnknownTool {
|
||||
tool: "missing_tool".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn surfaces_executor_failures() {
|
||||
let registry = DynamicToolRegistry::from_registrations(vec![DynamicToolRegistration::new(
|
||||
demo_spec("demo_tool"),
|
||||
|_context, _params| async move {
|
||||
Err(DynamicToolExecutionError::failed(
|
||||
"demo_tool",
|
||||
"dynamic tool failed",
|
||||
))
|
||||
},
|
||||
)]);
|
||||
|
||||
let error = registry
|
||||
.execute(
|
||||
DynamicToolExecutionContext::for_tests(),
|
||||
DynamicToolCallParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
tool: "demo_tool".to_string(),
|
||||
arguments: json!({}),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect_err("executor failure should surface");
|
||||
|
||||
assert_eq!(
|
||||
error,
|
||||
DynamicToolExecutionError::ExecutionFailed {
|
||||
tool: "demo_tool".to_string(),
|
||||
message: "dynamic tool failed".to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn supervisor_runs_fallback_when_task_panics() {
|
||||
let (panic_seen_tx, panic_seen_rx) = oneshot::channel();
|
||||
|
||||
let result = super::supervise_spawned_task(
|
||||
async move {
|
||||
panic!("boom");
|
||||
#[allow(unreachable_code)]
|
||||
Ok(())
|
||||
},
|
||||
move |join_err: tokio::task::JoinError| async move {
|
||||
assert!(join_err.is_panic());
|
||||
panic_seen_tx
|
||||
.send(())
|
||||
.expect("panic notification should send");
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(result, Ok(()));
|
||||
panic_seen_rx
|
||||
.await
|
||||
.expect("panic fallback should be invoked");
|
||||
}
|
||||
}
|
||||
@@ -89,6 +89,7 @@ pub mod custom_terminal;
|
||||
mod cwd_prompt;
|
||||
mod debug_config;
|
||||
mod diff_render;
|
||||
mod dynamic_tools;
|
||||
mod exec_cell;
|
||||
mod exec_command;
|
||||
mod external_editor;
|
||||
|
||||
Reference in New Issue
Block a user