mirror of
https://github.com/openai/codex.git
synced 2026-05-15 00:32:51 +00:00
Compare commits
1 Commits
pr20375
...
dev/vasili
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
687543f401 |
@@ -2694,6 +2694,27 @@ export type Config = { stableField: Keep, unstableField: string | null } & ({ [k
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn experimental_type_fields_ts_filter_removes_turn_start_params_fields() -> Result<()> {
|
||||
let registered_fields = experimental_fields();
|
||||
assert!(registered_fields.iter().any(|field| {
|
||||
field.type_name == "TurnStartParams" && field.field_name == "responsesapiClientMetadata"
|
||||
}));
|
||||
let turn_start_ts = v2::TurnStartParams::export_to_string()?;
|
||||
assert!(type_body_brace_span(&turn_start_ts).is_some());
|
||||
let mut tree = BTreeMap::from([(PathBuf::from("v2/TurnStartParams.ts"), turn_start_ts)]);
|
||||
filter_experimental_ts_tree(&mut tree)?;
|
||||
|
||||
let filtered = tree
|
||||
.get(Path::new("v2/TurnStartParams.ts"))
|
||||
.context("missing filtered TurnStartParams.ts")?;
|
||||
assert_eq!(filtered.contains("responsesapiClientMetadata"), false);
|
||||
assert_eq!(filtered.contains("prefetchedToolResults"), false);
|
||||
assert_eq!(filtered.contains("environments"), false);
|
||||
assert_eq!(filtered.contains("cwd"), true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stable_schema_filter_removes_mock_experimental_method() -> Result<()> {
|
||||
let output_dir = std::env::temp_dir().join(format!("codex_schema_{}", Uuid::now_v7()));
|
||||
|
||||
@@ -4810,6 +4810,11 @@ pub struct TurnStartParams {
|
||||
#[experimental("turn/start.responsesapiClientMetadata")]
|
||||
#[ts(optional = nullable)]
|
||||
pub responsesapi_client_metadata: Option<HashMap<String, String>>,
|
||||
/// EXPERIMENTAL - Raw Responses API `function_call` and matching
|
||||
/// `function_call_output` items to place before the user input for this turn.
|
||||
#[experimental("turn/start.prefetchedToolResults")]
|
||||
#[ts(optional = nullable)]
|
||||
pub prefetched_tool_results: Option<Vec<JsonValue>>,
|
||||
/// Optional turn-scoped environment selections.
|
||||
#[experimental("turn/start.environments")]
|
||||
#[ts(optional = nullable)]
|
||||
@@ -10018,6 +10023,7 @@ mod tests {
|
||||
thread_id: "thread_123".to_string(),
|
||||
input: vec![],
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
environments: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
|
||||
@@ -309,6 +309,7 @@ use codex_protocol::dynamic_tools::DynamicToolSpec as CoreDynamicToolSpec;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::ConversationAudioParams;
|
||||
@@ -6108,6 +6109,101 @@ impl CodexMessageProcessor {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn invalid_prefetched_tool_results_error(message: String) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message,
|
||||
data: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_prefetched_tool_results(
|
||||
values: Option<Vec<serde_json::Value>>,
|
||||
) -> Result<Vec<ResponseInputItem>, JSONRPCErrorError> {
|
||||
let Some(values) = values else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
|
||||
let mut parsed = Vec::with_capacity(values.len());
|
||||
let mut calls = HashMap::<String, usize>::new();
|
||||
let mut outputs = HashMap::<String, usize>::new();
|
||||
|
||||
for (index, value) in values.into_iter().enumerate() {
|
||||
let item = serde_json::from_value::<ResponseItem>(value).map_err(|err| {
|
||||
Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults[{index}] is not a valid response item: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
match item {
|
||||
ResponseItem::FunctionCall {
|
||||
name,
|
||||
namespace,
|
||||
arguments,
|
||||
call_id,
|
||||
..
|
||||
} => {
|
||||
if call_id.is_empty() {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults[{index}] function_call call_id must not be empty"
|
||||
)));
|
||||
}
|
||||
if calls.insert(call_id.clone(), index).is_some() {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults contains duplicate function_call call_id `{call_id}`"
|
||||
)));
|
||||
}
|
||||
parsed.push(ResponseInputItem::FunctionCall {
|
||||
name,
|
||||
namespace,
|
||||
arguments,
|
||||
call_id,
|
||||
});
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { call_id, output } => {
|
||||
if call_id.is_empty() {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults[{index}] function_call_output call_id must not be empty"
|
||||
)));
|
||||
}
|
||||
if outputs.insert(call_id.clone(), index).is_some() {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults contains duplicate function_call_output call_id `{call_id}`"
|
||||
)));
|
||||
}
|
||||
parsed.push(ResponseInputItem::FunctionCallOutput { call_id, output });
|
||||
}
|
||||
_ => {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults[{index}] must be a function_call or function_call_output response item"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (call_id, output_index) in &outputs {
|
||||
let Some(call_index) = calls.get(call_id) else {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults contains function_call_output for missing call_id `{call_id}`"
|
||||
)));
|
||||
};
|
||||
if output_index < call_index {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults function_call_output for call_id `{call_id}` must appear after its function_call"
|
||||
)));
|
||||
}
|
||||
}
|
||||
for call_id in calls.keys() {
|
||||
if !outputs.contains_key(call_id) {
|
||||
return Err(Self::invalid_prefetched_tool_results_error(format!(
|
||||
"prefetchedToolResults contains function_call without function_call_output for call_id `{call_id}`"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(parsed)
|
||||
}
|
||||
|
||||
async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
@@ -6783,6 +6879,29 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
let prefetched_tool_results =
|
||||
match Self::parse_prefetched_tool_results(params.prefetched_tool_results) {
|
||||
Ok(items) => items,
|
||||
Err(error) => {
|
||||
self.track_error_response(&request_id, &error, /*error_type*/ None);
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if params.input.is_empty() && prefetched_tool_results.is_empty() {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "input or prefetchedToolResults must not be empty".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.track_error_response(
|
||||
&request_id,
|
||||
&error,
|
||||
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
|
||||
);
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
let (_, thread) = match self.load_thread(¶ms.thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
@@ -6894,6 +7013,7 @@ impl CodexMessageProcessor {
|
||||
let turn_op = if has_any_overrides {
|
||||
Op::UserInputWithTurnContext {
|
||||
items: mapped_items,
|
||||
prefetched_tool_results,
|
||||
environments,
|
||||
final_output_json_schema: params.output_schema,
|
||||
responsesapi_client_metadata: params.responsesapi_client_metadata,
|
||||
@@ -6910,13 +7030,21 @@ impl CodexMessageProcessor {
|
||||
collaboration_mode,
|
||||
personality,
|
||||
}
|
||||
} else {
|
||||
} else if prefetched_tool_results.is_empty() {
|
||||
Op::UserInput {
|
||||
items: mapped_items,
|
||||
environments,
|
||||
final_output_json_schema: params.output_schema,
|
||||
responsesapi_client_metadata: params.responsesapi_client_metadata,
|
||||
}
|
||||
} else {
|
||||
Op::UserInputWithPrefetchedToolResults {
|
||||
items: mapped_items,
|
||||
prefetched_tool_results,
|
||||
environments,
|
||||
final_output_json_schema: params.output_schema,
|
||||
responsesapi_client_metadata: params.responsesapi_client_metadata,
|
||||
}
|
||||
};
|
||||
let turn_id = self
|
||||
.submit_core_op(&request_id, thread.as_ref(), turn_op)
|
||||
@@ -7055,6 +7183,20 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
if params.input.is_empty() {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "input must not be empty".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.track_error_response(
|
||||
&request_id,
|
||||
&error,
|
||||
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
|
||||
);
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let mapped_items: Vec<CoreInputItem> = params
|
||||
.input
|
||||
@@ -7062,10 +7204,11 @@ impl CodexMessageProcessor {
|
||||
.map(V2UserInput::into_core)
|
||||
.collect();
|
||||
|
||||
let expected_turn_id = params.expected_turn_id;
|
||||
match thread
|
||||
.steer_input(
|
||||
mapped_items,
|
||||
Some(¶ms.expected_turn_id),
|
||||
Some(&expected_turn_id),
|
||||
params.responsesapi_client_metadata,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -48,6 +48,7 @@ use opentelemetry_sdk::trace::SpanData;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serial_test::serial;
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
@@ -57,6 +58,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use wiremock::MockServer;
|
||||
|
||||
const TEST_CONNECTION_ID: ConnectionId = ConnectionId(7);
|
||||
const TRACING_TEST_STACK_SIZE_BYTES: usize = 4 * 1024 * 1024;
|
||||
|
||||
struct TestTracing {
|
||||
exporter: InMemorySpanExporter,
|
||||
@@ -69,6 +71,27 @@ struct RemoteTrace {
|
||||
context: W3cTraceContext,
|
||||
}
|
||||
|
||||
fn run_tracing_test<F, Fut>(name: &'static str, test: F) -> Result<()>
|
||||
where
|
||||
F: FnOnce() -> Fut + Send + 'static,
|
||||
Fut: Future<Output = Result<()>> + 'static,
|
||||
{
|
||||
let handle = std::thread::Builder::new()
|
||||
.name(name.to_string())
|
||||
.stack_size(TRACING_TEST_STACK_SIZE_BYTES)
|
||||
.spawn(move || -> Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
runtime.block_on(Box::pin(test()))
|
||||
})?;
|
||||
|
||||
match handle.join() {
|
||||
Ok(result) => result,
|
||||
Err(_) => Err(anyhow::anyhow!("{name} thread panicked")),
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTrace {
|
||||
fn new(trace_id: &str, parent_span_id: &str) -> Self {
|
||||
let trace_id = TraceId::from_hex(trace_id).expect("trace id");
|
||||
@@ -568,9 +591,16 @@ where
|
||||
spans.into_iter().skip(baseline_len).collect()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
#[test]
|
||||
#[serial(app_server_tracing)]
|
||||
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
|
||||
run_tracing_test(
|
||||
"thread_start_jsonrpc_span_exports_server_span_and_parents_children",
|
||||
thread_start_jsonrpc_span_exports_server_span_and_parents_children_impl,
|
||||
)
|
||||
}
|
||||
|
||||
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children_impl() -> Result<()> {
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
|
||||
let RemoteTrace {
|
||||
@@ -688,9 +718,16 @@ async fn remote_control_origin_rejects_device_key_requests() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
#[test]
|
||||
#[serial(app_server_tracing)]
|
||||
async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
run_tracing_test(
|
||||
"turn_start_jsonrpc_span_parents_core_turn_spans",
|
||||
turn_start_jsonrpc_span_parents_core_turn_spans_impl,
|
||||
)
|
||||
}
|
||||
|
||||
async fn turn_start_jsonrpc_span_parents_core_turn_spans_impl() -> Result<()> {
|
||||
let mut harness = TracingHarness::new().await?;
|
||||
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
|
||||
let thread_id = thread_start_response.thread.id.clone();
|
||||
@@ -714,6 +751,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_protocol::protocol::RealtimeOutputModality;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
@@ -280,6 +281,55 @@ async fn thread_start_granular_approval_policy_requires_experimental_api_capabil
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_prefetched_tool_results_requires_experimental_api_capability() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
|
||||
let init = mcp
|
||||
.initialize_with_capabilities(
|
||||
default_client_info(),
|
||||
Some(InitializeCapabilities {
|
||||
experimental_api: false,
|
||||
opt_out_notification_methods: None,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let JSONRPCMessage::Response(_) = init else {
|
||||
anyhow::bail!("expected initialize response, got {init:?}");
|
||||
};
|
||||
|
||||
let request_id = mcp
|
||||
.send_raw_request(
|
||||
"turn/start",
|
||||
Some(json!({
|
||||
"threadId": "thr_123",
|
||||
"input": [],
|
||||
"prefetchedToolResults": [
|
||||
{
|
||||
"type": "function_call",
|
||||
"name": "lookup",
|
||||
"arguments": "{}",
|
||||
"call_id": "prefetch-call-1"
|
||||
},
|
||||
{
|
||||
"type": "function_call_output",
|
||||
"call_id": "prefetch-call-1",
|
||||
"output": "cached result"
|
||||
}
|
||||
]
|
||||
})),
|
||||
)
|
||||
.await?;
|
||||
let error = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
assert_experimental_capability_error(error, "turn/start.prefetchedToolResults");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn default_client_info() -> ClientInfo {
|
||||
ClientInfo {
|
||||
name: DEFAULT_CLIENT_NAME.to_string(),
|
||||
|
||||
@@ -254,6 +254,161 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_prefetched_tool_results_precede_user_input() -> Result<()> {
|
||||
let server = responses::start_mock_server().await;
|
||||
let body = responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]);
|
||||
let response_mock = responses::mount_sse_once(&server, body).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([(Feature::Personality, true)]),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Use the cached result".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
prefetched_tool_results: Some(vec![
|
||||
json!({
|
||||
"type": "function_call",
|
||||
"name": "lookup",
|
||||
"arguments": "{\"query\":\"cached\"}",
|
||||
"call_id": "prefetch-call-1"
|
||||
}),
|
||||
json!({
|
||||
"type": "function_call_output",
|
||||
"call_id": "prefetch-call-1",
|
||||
"output": "cached result"
|
||||
}),
|
||||
]),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let _turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let payload = response_mock.single_request().body_json();
|
||||
let input = payload["input"].as_array().expect("model input array");
|
||||
let function_call_index = input
|
||||
.iter()
|
||||
.position(|item| {
|
||||
item.get("type").and_then(serde_json::Value::as_str) == Some("function_call")
|
||||
&& item.get("call_id").and_then(serde_json::Value::as_str)
|
||||
== Some("prefetch-call-1")
|
||||
})
|
||||
.expect("prefetched function_call in model input");
|
||||
let output_index = input
|
||||
.iter()
|
||||
.position(|item| {
|
||||
item.get("type").and_then(serde_json::Value::as_str) == Some("function_call_output")
|
||||
&& item.get("call_id").and_then(serde_json::Value::as_str)
|
||||
== Some("prefetch-call-1")
|
||||
})
|
||||
.expect("prefetched function_call_output in model input");
|
||||
let user_index = input
|
||||
.iter()
|
||||
.position(|item| {
|
||||
item.get("role").and_then(serde_json::Value::as_str) == Some("user")
|
||||
&& item
|
||||
.get("content")
|
||||
.and_then(serde_json::Value::as_array)
|
||||
.is_some_and(|content| {
|
||||
content.iter().any(|part| {
|
||||
part.get("type").and_then(serde_json::Value::as_str)
|
||||
== Some("input_text")
|
||||
&& part.get("text").and_then(serde_json::Value::as_str)
|
||||
== Some("Use the cached result")
|
||||
})
|
||||
})
|
||||
})
|
||||
.expect("user input in model input");
|
||||
|
||||
assert_eq!(
|
||||
input[function_call_index]["arguments"],
|
||||
"{\"query\":\"cached\"}"
|
||||
);
|
||||
assert!(function_call_index < output_index);
|
||||
assert!(output_index < user_index);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_rejects_unpaired_prefetched_tool_results() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: "thread-does-not-matter".to_string(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
prefetched_tool_results: Some(vec![json!({
|
||||
"type": "function_call",
|
||||
"name": "lookup",
|
||||
"arguments": "{}",
|
||||
"call_id": "missing-output"
|
||||
})]),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let error: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
assert_eq!(error.error.code, -32600);
|
||||
assert!(
|
||||
error
|
||||
.error
|
||||
.message
|
||||
.contains("prefetchedToolResults contains function_call without function_call_output")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_emits_thread_scoped_warning_notification_for_trimmed_skills() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
@@ -1824,6 +1979,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
cwd: Some(first_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
approvals_reviewer: None,
|
||||
@@ -1866,6 +2022,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
cwd: Some(second_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
approvals_reviewer: None,
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::review_prompts::resolve_review_request;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::read_session_meta_line;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::TurnTaskInput;
|
||||
use crate::tasks::UndoTask;
|
||||
use crate::tasks::UserShellCommandMode;
|
||||
use crate::tasks::UserShellCommandTask;
|
||||
@@ -129,119 +130,140 @@ pub(super) async fn user_input_or_turn_inner(
|
||||
op: Op,
|
||||
mirror_user_text_to_realtime: Option<()>,
|
||||
) {
|
||||
let (items, updates, responsesapi_client_metadata, environments) = match op {
|
||||
Op::UserTurn {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
final_output_json_schema,
|
||||
items,
|
||||
collaboration_mode,
|
||||
personality,
|
||||
environments,
|
||||
} => {
|
||||
let collaboration_mode = collaboration_mode.or_else(|| {
|
||||
Some(CollaborationMode {
|
||||
mode: ModeKind::Default,
|
||||
settings: Settings {
|
||||
model: model.clone(),
|
||||
reasoning_effort: effort,
|
||||
developer_instructions: None,
|
||||
},
|
||||
})
|
||||
});
|
||||
(
|
||||
let (items, prefetched_tool_results, updates, responsesapi_client_metadata, environments) =
|
||||
match op {
|
||||
Op::UserTurn {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
final_output_json_schema,
|
||||
items,
|
||||
SessionSettingsUpdate {
|
||||
cwd: Some(cwd),
|
||||
approval_policy: Some(approval_policy),
|
||||
approvals_reviewer,
|
||||
sandbox_policy: Some(sandbox_policy),
|
||||
permission_profile: None,
|
||||
windows_sandbox_level: None,
|
||||
collaboration_mode,
|
||||
reasoning_summary: summary,
|
||||
service_tier,
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
app_server_client_version: None,
|
||||
},
|
||||
None,
|
||||
collaboration_mode,
|
||||
personality,
|
||||
environments,
|
||||
)
|
||||
}
|
||||
Op::UserInputWithTurnContext {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
permission_profile,
|
||||
windows_sandbox_level,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
final_output_json_schema,
|
||||
items,
|
||||
responsesapi_client_metadata,
|
||||
collaboration_mode,
|
||||
personality,
|
||||
environments,
|
||||
} => {
|
||||
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
|
||||
Some(collab_mode)
|
||||
} else {
|
||||
let state = sess.state.lock().await;
|
||||
Some(
|
||||
state
|
||||
.session_configuration
|
||||
.collaboration_mode
|
||||
.with_updates(model, effort, /*developer_instructions*/ None),
|
||||
} => {
|
||||
let collaboration_mode = collaboration_mode.or_else(|| {
|
||||
Some(CollaborationMode {
|
||||
mode: ModeKind::Default,
|
||||
settings: Settings {
|
||||
model: model.clone(),
|
||||
reasoning_effort: effort,
|
||||
developer_instructions: None,
|
||||
},
|
||||
})
|
||||
});
|
||||
(
|
||||
items,
|
||||
Vec::new(),
|
||||
SessionSettingsUpdate {
|
||||
cwd: Some(cwd),
|
||||
approval_policy: Some(approval_policy),
|
||||
approvals_reviewer,
|
||||
sandbox_policy: Some(sandbox_policy),
|
||||
permission_profile: None,
|
||||
windows_sandbox_level: None,
|
||||
collaboration_mode,
|
||||
reasoning_summary: summary,
|
||||
service_tier,
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
app_server_client_version: None,
|
||||
},
|
||||
None,
|
||||
environments,
|
||||
)
|
||||
};
|
||||
(
|
||||
}
|
||||
Op::UserInputWithTurnContext {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
permission_profile,
|
||||
windows_sandbox_level,
|
||||
model,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
final_output_json_schema,
|
||||
items,
|
||||
prefetched_tool_results,
|
||||
responsesapi_client_metadata,
|
||||
collaboration_mode,
|
||||
personality,
|
||||
environments,
|
||||
} => {
|
||||
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
|
||||
Some(collab_mode)
|
||||
} else {
|
||||
let state = sess.state.lock().await;
|
||||
Some(
|
||||
state
|
||||
.session_configuration
|
||||
.collaboration_mode
|
||||
.with_updates(model, effort, /*developer_instructions*/ None),
|
||||
)
|
||||
};
|
||||
(
|
||||
items,
|
||||
prefetched_tool_results,
|
||||
SessionSettingsUpdate {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
permission_profile,
|
||||
windows_sandbox_level,
|
||||
collaboration_mode,
|
||||
reasoning_summary: summary,
|
||||
service_tier,
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
app_server_client_version: None,
|
||||
},
|
||||
responsesapi_client_metadata,
|
||||
environments,
|
||||
)
|
||||
}
|
||||
Op::UserInput {
|
||||
items,
|
||||
environments,
|
||||
final_output_json_schema,
|
||||
responsesapi_client_metadata,
|
||||
} => (
|
||||
items,
|
||||
Vec::new(),
|
||||
SessionSettingsUpdate {
|
||||
cwd,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
permission_profile,
|
||||
windows_sandbox_level,
|
||||
collaboration_mode,
|
||||
reasoning_summary: summary,
|
||||
service_tier,
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
app_server_client_version: None,
|
||||
..Default::default()
|
||||
},
|
||||
responsesapi_client_metadata,
|
||||
environments,
|
||||
)
|
||||
}
|
||||
Op::UserInput {
|
||||
items,
|
||||
environments,
|
||||
final_output_json_schema,
|
||||
responsesapi_client_metadata,
|
||||
} => (
|
||||
items,
|
||||
SessionSettingsUpdate {
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
..Default::default()
|
||||
},
|
||||
responsesapi_client_metadata,
|
||||
environments,
|
||||
),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
),
|
||||
Op::UserInputWithPrefetchedToolResults {
|
||||
items,
|
||||
prefetched_tool_results,
|
||||
environments,
|
||||
final_output_json_schema,
|
||||
responsesapi_client_metadata,
|
||||
} => (
|
||||
items,
|
||||
prefetched_tool_results,
|
||||
SessionSettingsUpdate {
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
..Default::default()
|
||||
},
|
||||
responsesapi_client_metadata,
|
||||
environments,
|
||||
),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let Ok(current_context) = sess
|
||||
.new_turn_with_sub_id(sub_id.clone(), updates, environments)
|
||||
@@ -252,14 +274,17 @@ pub(super) async fn user_input_or_turn_inner(
|
||||
};
|
||||
sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref())
|
||||
.await;
|
||||
let accepted_items = match sess
|
||||
.steer_input(
|
||||
let steer_result = if prefetched_tool_results.is_empty() {
|
||||
sess.steer_input(
|
||||
items.clone(),
|
||||
/*expected_turn_id*/ None,
|
||||
responsesapi_client_metadata.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
} else {
|
||||
Err(SteerInputError::NoActiveTurn(items.clone()))
|
||||
};
|
||||
let accepted_items = match steer_result {
|
||||
Ok(_) => {
|
||||
current_context.session_telemetry.user_prompt(&items);
|
||||
Some(items)
|
||||
@@ -276,7 +301,10 @@ pub(super) async fn user_input_or_turn_inner(
|
||||
let accepted_items = items.clone();
|
||||
sess.spawn_task(
|
||||
Arc::clone(¤t_context),
|
||||
items,
|
||||
TurnTaskInput {
|
||||
user_input: items,
|
||||
prefetched_tool_results,
|
||||
},
|
||||
crate::tasks::RegularTask::new(),
|
||||
)
|
||||
.await;
|
||||
@@ -1153,6 +1181,7 @@ pub(super) async fn submission_loop(
|
||||
false
|
||||
}
|
||||
Op::UserInput { .. }
|
||||
| Op::UserInputWithPrefetchedToolResults { .. }
|
||||
| Op::UserInputWithTurnContext { .. }
|
||||
| Op::UserTurn { .. } => {
|
||||
user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
|
||||
|
||||
@@ -2839,7 +2839,7 @@ impl Session {
|
||||
.run(
|
||||
Arc::new(SessionTaskContext::new(self.clone())),
|
||||
turn_context.clone(),
|
||||
Vec::new(),
|
||||
crate::tasks::TurnTaskInput::default(),
|
||||
cancellation_token,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -49,6 +49,7 @@ use crate::rollout::recorder::RolloutRecorder;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
use crate::tasks::SessionTaskContext;
|
||||
use crate::tasks::TurnTaskInput;
|
||||
use crate::tasks::UserShellCommandMode;
|
||||
use crate::tasks::execute_user_shell_command;
|
||||
use crate::tools::ToolRouter;
|
||||
@@ -3790,6 +3791,7 @@ fn op_kind_distinguishes_turn_ops() {
|
||||
Op::UserInputWithTurnContext {
|
||||
environments: None,
|
||||
items: vec![],
|
||||
prefetched_tool_results: vec![],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
cwd: None,
|
||||
@@ -3992,7 +3994,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let mut trace = self
|
||||
@@ -5608,7 +5610,7 @@ impl SessionTask for NeverEndingTask {
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
if self.listen_to_cancellation_token {
|
||||
@@ -5637,7 +5639,7 @@ impl SessionTask for GuardianDeniedApprovalTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let session = session.clone_session();
|
||||
|
||||
@@ -48,6 +48,7 @@ use crate::stream_events_utils::last_assistant_message_from_item;
|
||||
use crate::stream_events_utils::mark_thread_memory_mode_polluted_if_external_context;
|
||||
use crate::stream_events_utils::raw_assistant_output_text_from_item;
|
||||
use crate::stream_events_utils::record_completed_response_item;
|
||||
use crate::tasks::TurnTaskInput;
|
||||
use crate::tools::ToolRouter;
|
||||
use crate::tools::context::SharedTurnDiffTracker;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
@@ -137,11 +138,15 @@ use tracing::warn;
|
||||
pub(crate) async fn run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
prewarmed_client_session: Option<ModelClientSession>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
if input.is_empty() && !sess.has_pending_input().await {
|
||||
let TurnTaskInput {
|
||||
user_input: input,
|
||||
prefetched_tool_results,
|
||||
} = input;
|
||||
if input.is_empty() && prefetched_tool_results.is_empty() && !sess.has_pending_input().await {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -300,11 +305,13 @@ pub(crate) async fn run_turn(
|
||||
if run_pending_session_start_hooks(&sess, &turn_context).await {
|
||||
return None;
|
||||
}
|
||||
let prefetched_tool_result_items = prefetched_tool_results
|
||||
.into_iter()
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<_>>();
|
||||
let additional_contexts = if input.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.clone().into();
|
||||
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
|
||||
&sess,
|
||||
&turn_context,
|
||||
@@ -320,10 +327,18 @@ pub(crate) async fn run_turn(
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
user_prompt_submit_outcome.additional_contexts
|
||||
};
|
||||
if !prefetched_tool_result_items.is_empty() {
|
||||
sess.record_conversation_items(&turn_context, &prefetched_tool_result_items)
|
||||
.await;
|
||||
}
|
||||
if !input.is_empty() {
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.into();
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
}
|
||||
sess.services
|
||||
.analytics_events_client
|
||||
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
|
||||
|
||||
@@ -2,9 +2,9 @@ use std::sync::Arc;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use super::TurnTaskInput;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::TaskKind;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
@@ -23,7 +23,7 @@ impl SessionTask for CompactTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
_cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let session = session.clone_session();
|
||||
@@ -40,7 +40,7 @@ impl SessionTask for CompactTask {
|
||||
/*inc*/ 1,
|
||||
&[("type", "local")],
|
||||
);
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input).await
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input.user_input).await
|
||||
};
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::TurnTaskInput;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
@@ -9,7 +10,6 @@ use codex_git_utils::create_ghost_commit_with_report;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::WarningEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::Token;
|
||||
use std::sync::Arc;
|
||||
@@ -38,7 +38,7 @@ impl SessionTask for GhostSnapshotTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
tokio::task::spawn(async move {
|
||||
|
||||
@@ -61,6 +61,21 @@ pub(crate) use user_shell::execute_user_shell_command;
|
||||
|
||||
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct TurnTaskInput {
|
||||
pub(crate) user_input: Vec<UserInput>,
|
||||
pub(crate) prefetched_tool_results: Vec<ResponseInputItem>,
|
||||
}
|
||||
|
||||
impl From<Vec<UserInput>> for TurnTaskInput {
|
||||
fn from(user_input: Vec<UserInput>) -> Self {
|
||||
Self {
|
||||
user_input,
|
||||
prefetched_tool_results: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared model-visible marker used by both the real interrupt path and
|
||||
/// interrupted fork snapshots.
|
||||
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
|
||||
@@ -161,7 +176,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> impl std::future::Future<Output = Option<String>> + Send;
|
||||
|
||||
@@ -190,7 +205,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> BoxFuture<'static, Option<String>>;
|
||||
|
||||
@@ -217,7 +232,7 @@ where
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> BoxFuture<'static, Option<String>> {
|
||||
Box::pin(SessionTask::run(
|
||||
@@ -242,9 +257,10 @@ impl Session {
|
||||
pub async fn spawn_task<T: SessionTask>(
|
||||
self: &Arc<Self>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: impl Into<TurnTaskInput>,
|
||||
task: T,
|
||||
) {
|
||||
let input = input.into();
|
||||
self.abort_all_tasks(TurnAbortReason::Replaced).await;
|
||||
self.clear_connector_selection().await;
|
||||
self.start_task(turn_context, input, task).await;
|
||||
@@ -253,7 +269,7 @@ impl Session {
|
||||
async fn start_task<T: SessionTask>(
|
||||
self: &Arc<Self>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
task: T,
|
||||
) {
|
||||
let task: Arc<dyn AnySessionTask> = Arc::new(task);
|
||||
@@ -398,7 +414,7 @@ impl Session {
|
||||
let turn_context = self.new_default_turn_with_sub_id(sub_id).await;
|
||||
self.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
|
||||
.await;
|
||||
self.start_task(turn_context, Vec::new(), RegularTask::new())
|
||||
self.start_task(turn_context, TurnTaskInput::default(), RegularTask::new())
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
@@ -8,12 +8,12 @@ use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
|
||||
use crate::state::TaskKind;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tracing::Instrument;
|
||||
use tracing::trace_span;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use super::TurnTaskInput;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct RegularTask;
|
||||
@@ -37,7 +37,7 @@ impl SessionTask for RegularTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let sess = session.clone_session();
|
||||
@@ -77,7 +77,7 @@ impl SessionTask for RegularTask {
|
||||
if !sess.has_pending_input().await {
|
||||
return last_agent_message;
|
||||
}
|
||||
next_input = Vec::new();
|
||||
next_input = TurnTaskInput::default();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_template::Template;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -25,11 +26,11 @@ use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::TaskKind;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use super::TurnTaskInput;
|
||||
|
||||
static REVIEW_EXIT_SUCCESS_TEMPLATE: LazyLock<Template> = LazyLock::new(|| {
|
||||
let normalized =
|
||||
@@ -60,7 +61,7 @@ impl SessionTask for ReviewTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
session.session.services.session_telemetry.counter(
|
||||
@@ -73,7 +74,7 @@ impl SessionTask for ReviewTask {
|
||||
let output = match start_review_conversation(
|
||||
session.clone(),
|
||||
ctx.clone(),
|
||||
input,
|
||||
input.user_input,
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::TurnTaskInput;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
@@ -10,7 +11,6 @@ use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::UndoCompletedEvent;
|
||||
use codex_protocol::protocol::UndoStartedEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
@@ -37,7 +37,7 @@ impl SessionTask for UndoTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
session
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::time::Duration;
|
||||
|
||||
use codex_async_utils::CancelErr;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
@@ -32,6 +31,7 @@ use codex_shell_command::parse_command::parse_command;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use super::TurnTaskInput;
|
||||
use crate::session::session::Session;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -74,7 +74,7 @@ impl SessionTask for UserShellCommandTask {
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
execute_user_shell_command(
|
||||
|
||||
@@ -493,6 +493,18 @@ pub(crate) fn response_input_to_code_mode_result(response: ResponseInputItem) ->
|
||||
content_items_to_code_mode_result(&items)
|
||||
}
|
||||
},
|
||||
ResponseInputItem::FunctionCall {
|
||||
name,
|
||||
namespace,
|
||||
arguments,
|
||||
call_id,
|
||||
} => serde_json::json!({
|
||||
"type": "function_call",
|
||||
"name": name,
|
||||
"namespace": namespace,
|
||||
"arguments": arguments,
|
||||
"call_id": call_id,
|
||||
}),
|
||||
ResponseInputItem::ToolSearchOutput { tools, .. } => JsonValue::Array(tools),
|
||||
ResponseInputItem::McpToolCallOutput { output, .. } => {
|
||||
output.code_mode_result(&ToolPayload::Mcp {
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::state::TaskKind;
|
||||
use crate::tasks::SessionTask;
|
||||
use crate::tasks::SessionTaskContext;
|
||||
use crate::tasks::TurnTaskInput;
|
||||
use crate::tools::context::ToolOutput;
|
||||
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
|
||||
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
|
||||
@@ -234,7 +235,7 @@ impl SessionTask for NeverEndingTask {
|
||||
self: Arc<Self>,
|
||||
_session: Arc<SessionTaskContext>,
|
||||
_ctx: Arc<TurnContext>,
|
||||
_input: Vec<UserInput>,
|
||||
_input: TurnTaskInput,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
cancellation_token.cancelled().await;
|
||||
|
||||
@@ -743,6 +743,22 @@ impl JsReplManager {
|
||||
fn summarize_tool_call_response(response: &ResponseInputItem) -> JsReplToolCallResponseSummary {
|
||||
match response {
|
||||
ResponseInputItem::Message { content, .. } => Self::summarize_message_payload(content),
|
||||
ResponseInputItem::FunctionCall { arguments, .. } => {
|
||||
let payload_text_preview = (!arguments.is_empty()).then(|| {
|
||||
truncate_text(
|
||||
arguments,
|
||||
TruncationPolicy::Bytes(JS_REPL_TOOL_RESPONSE_TEXT_PREVIEW_MAX_BYTES),
|
||||
)
|
||||
});
|
||||
JsReplToolCallResponseSummary {
|
||||
response_type: Some("function_call".to_string()),
|
||||
payload_kind: Some(JsReplToolCallPayloadKind::FunctionText),
|
||||
payload_text_preview,
|
||||
payload_text_length: Some(arguments.len()),
|
||||
payload_item_count: Some(1),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
ResponseInputItem::FunctionCallOutput { output, .. } => {
|
||||
let payload_kind = if output.content_items().is_some() {
|
||||
JsReplToolCallPayloadKind::FunctionContentItems
|
||||
|
||||
@@ -746,6 +746,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
thread_id: primary_thread_id_for_span.clone(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
environments: None,
|
||||
cwd: Some(default_cwd),
|
||||
approval_policy: Some(default_approval_policy.into()),
|
||||
|
||||
@@ -370,6 +370,14 @@ pub enum ResponseInputItem {
|
||||
role: String,
|
||||
content: Vec<ContentItem>,
|
||||
},
|
||||
FunctionCall {
|
||||
name: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
namespace: Option<String>,
|
||||
arguments: String,
|
||||
call_id: String,
|
||||
},
|
||||
FunctionCallOutput {
|
||||
call_id: String,
|
||||
#[ts(as = "FunctionCallOutputBody")]
|
||||
@@ -811,6 +819,18 @@ impl From<ResponseInputItem> for ResponseItem {
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
ResponseInputItem::FunctionCall {
|
||||
name,
|
||||
namespace,
|
||||
arguments,
|
||||
call_id,
|
||||
} => Self::FunctionCall {
|
||||
id: None,
|
||||
name,
|
||||
namespace,
|
||||
arguments,
|
||||
call_id,
|
||||
},
|
||||
ResponseInputItem::FunctionCallOutput { call_id, output } => {
|
||||
Self::FunctionCallOutput { call_id, output }
|
||||
}
|
||||
|
||||
@@ -443,12 +443,34 @@ pub enum Op {
|
||||
responsesapi_client_metadata: Option<HashMap<String, String>>,
|
||||
},
|
||||
|
||||
/// User input plus prefetched tool-call/result pairs to place immediately
|
||||
/// before the user-visible input in the model-visible turn history.
|
||||
UserInputWithPrefetchedToolResults {
|
||||
/// User input items, see `InputItem`.
|
||||
items: Vec<UserInput>,
|
||||
/// Synthetic Responses API function_call/function_call_output items.
|
||||
prefetched_tool_results: Vec<ResponseInputItem>,
|
||||
/// Optional turn-scoped environment selections.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
environments: Option<Vec<TurnEnvironmentSelection>>,
|
||||
/// Optional JSON Schema used to constrain the final assistant message for this turn.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
final_output_json_schema: Option<Value>,
|
||||
/// Optional turn-scoped Responses API `client_metadata`.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
responsesapi_client_metadata: Option<HashMap<String, String>>,
|
||||
},
|
||||
|
||||
/// Similar to [`Op::UserInput`], but first applies persistent turn-context
|
||||
/// overrides in the same queued operation. This preserves submission order
|
||||
/// and prevents the input from starting if the overrides are rejected.
|
||||
UserInputWithTurnContext {
|
||||
/// User input items, see `InputItem`
|
||||
items: Vec<UserInput>,
|
||||
/// Synthetic Responses API function_call/function_call_output items to
|
||||
/// place before the user-visible input in the model-visible turn history.
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
prefetched_tool_results: Vec<ResponseInputItem>,
|
||||
/// Optional turn-scoped environment selections.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
environments: Option<Vec<TurnEnvironmentSelection>>,
|
||||
@@ -878,6 +900,9 @@ impl Op {
|
||||
Self::RealtimeConversationClose => "realtime_conversation_close",
|
||||
Self::RealtimeConversationListVoices => "realtime_conversation_list_voices",
|
||||
Self::UserInput { .. } => "user_input",
|
||||
Self::UserInputWithPrefetchedToolResults { .. } => {
|
||||
"user_input_with_prefetched_tool_results"
|
||||
}
|
||||
Self::UserInputWithTurnContext { .. } => "user_input_with_turn_context",
|
||||
Self::UserTurn { .. } => "user_turn",
|
||||
Self::InterAgentCommunication { .. } => "inter_agent_communication",
|
||||
|
||||
@@ -535,6 +535,7 @@ impl AppServerSession {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
responsesapi_client_metadata: None,
|
||||
prefetched_tool_results: None,
|
||||
environments: None,
|
||||
cwd: Some(cwd),
|
||||
approval_policy: Some(approval_policy.into()),
|
||||
|
||||
Reference in New Issue
Block a user