mirror of
https://github.com/openai/codex.git
synced 2026-04-26 07:35:29 +00:00
feat(app-server): add ThreadItem::DynamicToolCall (#12732)
Previously, clients would call `thread/start` with dynamic_tools set, and when a model invokes a dynamic tool, it would just make the server->client `item/tool/call` request and wait for the client's response to complete the tool call. This works, but it doesn't have an `item/started` or `item/completed` event. Now we are doing this: - [new] emit `item/started` with `DynamicToolCall` populated with the call arguments - send an `item/tool/call` server request - [new] once the client responds, emit `item/completed` with `DynamicToolCall` populated with the response. Also, with `persistExtendedHistory: true`, dynamic tool calls are now reconstructable in `thread/read` and `thread/resume` as `ThreadItem::DynamicToolCall`.
This commit is contained in:
@@ -7,10 +7,15 @@ use app_test_support::to_response;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallParams;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::DynamicToolCallStatus;
|
||||
use codex_app_server_protocol::DynamicToolSpec;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
@@ -163,11 +168,12 @@ async fn dynamic_tool_call_round_trip_sends_text_content_items_to_model() -> Res
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
let thread_id = thread.id.clone();
|
||||
|
||||
// Start a turn so the tool call is emitted.
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
thread_id: thread_id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Run the tool".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
@@ -181,6 +187,30 @@ async fn dynamic_tool_call_round_trip_sends_text_content_items_to_model() -> Res
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
let turn_id = turn.id.clone();
|
||||
|
||||
let started = wait_for_dynamic_tool_started(&mut mcp, call_id).await?;
|
||||
assert_eq!(started.thread_id, thread_id);
|
||||
assert_eq!(started.turn_id, turn_id.clone());
|
||||
let ThreadItem::DynamicToolCall {
|
||||
id,
|
||||
tool,
|
||||
arguments,
|
||||
status,
|
||||
content_items,
|
||||
success,
|
||||
duration_ms,
|
||||
} = started.item
|
||||
else {
|
||||
panic!("expected dynamic tool call item");
|
||||
};
|
||||
assert_eq!(id, call_id);
|
||||
assert_eq!(tool, tool_name);
|
||||
assert_eq!(arguments, tool_args);
|
||||
assert_eq!(status, DynamicToolCallStatus::InProgress);
|
||||
assert_eq!(content_items, None);
|
||||
assert_eq!(success, None);
|
||||
assert_eq!(duration_ms, None);
|
||||
|
||||
// Read the tool call request from the app server.
|
||||
let request = timeout(
|
||||
@@ -194,8 +224,8 @@ async fn dynamic_tool_call_round_trip_sends_text_content_items_to_model() -> Res
|
||||
};
|
||||
|
||||
let expected = DynamicToolCallParams {
|
||||
thread_id: thread.id,
|
||||
turn_id: turn.id,
|
||||
thread_id: thread_id.clone(),
|
||||
turn_id: turn_id.clone(),
|
||||
call_id: call_id.to_string(),
|
||||
tool: tool_name.to_string(),
|
||||
arguments: tool_args.clone(),
|
||||
@@ -212,6 +242,34 @@ async fn dynamic_tool_call_round_trip_sends_text_content_items_to_model() -> Res
|
||||
mcp.send_response(request_id, serde_json::to_value(response)?)
|
||||
.await?;
|
||||
|
||||
let completed = wait_for_dynamic_tool_completed(&mut mcp, call_id).await?;
|
||||
assert_eq!(completed.thread_id, thread_id);
|
||||
assert_eq!(completed.turn_id, turn_id);
|
||||
let ThreadItem::DynamicToolCall {
|
||||
id,
|
||||
tool,
|
||||
arguments,
|
||||
status,
|
||||
content_items,
|
||||
success,
|
||||
duration_ms,
|
||||
} = completed.item
|
||||
else {
|
||||
panic!("expected dynamic tool call item");
|
||||
};
|
||||
assert_eq!(id, call_id);
|
||||
assert_eq!(tool, tool_name);
|
||||
assert_eq!(arguments, tool_args);
|
||||
assert_eq!(status, DynamicToolCallStatus::Completed);
|
||||
assert_eq!(
|
||||
content_items,
|
||||
Some(vec![DynamicToolCallOutputContentItem::InputText {
|
||||
text: "dynamic-ok".to_string(),
|
||||
}])
|
||||
);
|
||||
assert_eq!(success, Some(true));
|
||||
assert!(duration_ms.is_some());
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
@@ -282,10 +340,11 @@ async fn dynamic_tool_call_round_trip_sends_content_items_to_model() -> Result<(
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
let thread_id = thread.id.clone();
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
thread_id: thread_id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Run the tool".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
@@ -299,6 +358,11 @@ async fn dynamic_tool_call_round_trip_sends_content_items_to_model() -> Result<(
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
let turn_id = turn.id.clone();
|
||||
|
||||
let started = wait_for_dynamic_tool_started(&mut mcp, call_id).await?;
|
||||
assert_eq!(started.thread_id, thread_id.clone());
|
||||
assert_eq!(started.turn_id, turn_id.clone());
|
||||
|
||||
let request = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
@@ -311,8 +375,8 @@ async fn dynamic_tool_call_round_trip_sends_content_items_to_model() -> Result<(
|
||||
};
|
||||
|
||||
let expected = DynamicToolCallParams {
|
||||
thread_id: thread.id,
|
||||
turn_id: turn.id,
|
||||
thread_id,
|
||||
turn_id: turn_id.clone(),
|
||||
call_id: call_id.to_string(),
|
||||
tool: tool_name.to_string(),
|
||||
arguments: tool_args,
|
||||
@@ -346,6 +410,32 @@ async fn dynamic_tool_call_round_trip_sends_content_items_to_model() -> Result<(
|
||||
mcp.send_response(request_id, serde_json::to_value(response)?)
|
||||
.await?;
|
||||
|
||||
let completed = wait_for_dynamic_tool_completed(&mut mcp, call_id).await?;
|
||||
assert_eq!(completed.thread_id, expected.thread_id.clone());
|
||||
assert_eq!(completed.turn_id, turn_id);
|
||||
let ThreadItem::DynamicToolCall {
|
||||
status,
|
||||
content_items: completed_content_items,
|
||||
success,
|
||||
..
|
||||
} = completed.item
|
||||
else {
|
||||
panic!("expected dynamic tool call item");
|
||||
};
|
||||
assert_eq!(status, DynamicToolCallStatus::Completed);
|
||||
assert_eq!(
|
||||
completed_content_items,
|
||||
Some(vec![
|
||||
DynamicToolCallOutputContentItem::InputText {
|
||||
text: "dynamic-ok".to_string(),
|
||||
},
|
||||
DynamicToolCallOutputContentItem::InputImage {
|
||||
image_url: "data:image/png;base64,AAA".to_string(),
|
||||
},
|
||||
])
|
||||
);
|
||||
assert_eq!(success, Some(true));
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
@@ -432,6 +522,46 @@ fn function_call_output_raw_output(body: &Value, call_id: &str) -> Option<Value>
|
||||
.cloned()
|
||||
}
|
||||
|
||||
async fn wait_for_dynamic_tool_started(
|
||||
mcp: &mut McpProcess,
|
||||
call_id: &str,
|
||||
) -> Result<ItemStartedNotification> {
|
||||
loop {
|
||||
let notification: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("item/started"),
|
||||
)
|
||||
.await??;
|
||||
let Some(params) = notification.params else {
|
||||
continue;
|
||||
};
|
||||
let started: ItemStartedNotification = serde_json::from_value(params)?;
|
||||
if matches!(&started.item, ThreadItem::DynamicToolCall { id, .. } if id == call_id) {
|
||||
return Ok(started);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_dynamic_tool_completed(
|
||||
mcp: &mut McpProcess,
|
||||
call_id: &str,
|
||||
) -> Result<ItemCompletedNotification> {
|
||||
loop {
|
||||
let notification: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("item/completed"),
|
||||
)
|
||||
.await??;
|
||||
let Some(params) = notification.params else {
|
||||
continue;
|
||||
};
|
||||
let completed: ItemCompletedNotification = serde_json::from_value(params)?;
|
||||
if matches!(&completed.item, ThreadItem::DynamicToolCall { id, .. } if id == call_id) {
|
||||
return Ok(completed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
|
||||
Reference in New Issue
Block a user