Compare commits

...

1 Commits

Author SHA1 Message Date
Vasili Anton
687543f401 Support prefetched tool results for turn start 2026-04-22 14:50:44 -07:00
24 changed files with 700 additions and 146 deletions

View File

@@ -2694,6 +2694,27 @@ export type Config = { stableField: Keep, unstableField: string | null } & ({ [k
Ok(())
}
/// Checks that filtering the exported `TurnStartParams` TypeScript removes the
/// experimental fields while leaving the stable `cwd` field in place.
#[test]
fn experimental_type_fields_ts_filter_removes_turn_start_params_fields() -> Result<()> {
    let registered_fields = experimental_fields();
    assert!(
        registered_fields.iter().any(|field| {
            field.type_name == "TurnStartParams"
                && field.field_name == "responsesapiClientMetadata"
        }),
        "TurnStartParams.responsesapiClientMetadata should be registered as experimental"
    );

    let turn_start_ts = v2::TurnStartParams::export_to_string()?;
    assert!(
        type_body_brace_span(&turn_start_ts).is_some(),
        "exported TurnStartParams.ts should contain a type body"
    );

    let mut tree = BTreeMap::from([(PathBuf::from("v2/TurnStartParams.ts"), turn_start_ts)]);
    filter_experimental_ts_tree(&mut tree)?;
    let filtered = tree
        .get(Path::new("v2/TurnStartParams.ts"))
        .context("missing filtered TurnStartParams.ts")?;

    // Plain boolean assertions (instead of `assert_eq!(.., false)`) with the
    // filtered output attached so a failure shows what was actually produced.
    for removed in [
        "responsesapiClientMetadata",
        "prefetchedToolResults",
        "environments",
    ] {
        assert!(
            !filtered.contains(removed),
            "experimental field `{removed}` should be filtered out of:\n{filtered}"
        );
    }
    assert!(
        filtered.contains("cwd"),
        "stable field `cwd` should survive filtering in:\n{filtered}"
    );
    Ok(())
}
#[test]
fn stable_schema_filter_removes_mock_experimental_method() -> Result<()> {
let output_dir = std::env::temp_dir().join(format!("codex_schema_{}", Uuid::now_v7()));

View File

@@ -4810,6 +4810,11 @@ pub struct TurnStartParams {
#[experimental("turn/start.responsesapiClientMetadata")]
#[ts(optional = nullable)]
pub responsesapi_client_metadata: Option<HashMap<String, String>>,
/// EXPERIMENTAL - Raw Responses API `function_call` and matching
/// `function_call_output` items to place before the user input for this turn.
#[experimental("turn/start.prefetchedToolResults")]
#[ts(optional = nullable)]
pub prefetched_tool_results: Option<Vec<JsonValue>>,
/// Optional turn-scoped environment selections.
#[experimental("turn/start.environments")]
#[ts(optional = nullable)]
@@ -10018,6 +10023,7 @@ mod tests {
thread_id: "thread_123".to_string(),
input: vec![],
responsesapi_client_metadata: None,
prefetched_tool_results: None,
environments: None,
cwd: None,
approval_policy: None,

View File

@@ -309,6 +309,7 @@ use codex_protocol::dynamic_tools::DynamicToolSpec as CoreDynamicToolSpec;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::ConversationAudioParams;
@@ -6108,6 +6109,101 @@ impl CodexMessageProcessor {
Ok(())
}
/// Builds the JSON-RPC error used to reject invalid `prefetchedToolResults`.
///
/// The error carries `INVALID_REQUEST_ERROR_CODE`, the given `message`, and no
/// additional data.
fn invalid_prefetched_tool_results_error(message: String) -> JSONRPCErrorError {
    let code = INVALID_REQUEST_ERROR_CODE;
    JSONRPCErrorError {
        code,
        message,
        data: None,
    }
}
/// Validates the raw `prefetchedToolResults` values and converts them into
/// `ResponseInputItem`s, preserving their order.
///
/// `None` yields an empty vector. Otherwise every value must deserialize into
/// a `ResponseItem` that is a `FunctionCall` or a `FunctionCallOutput` with a
/// non-empty `call_id`, and additionally:
/// - a `call_id` may be used by at most one call and at most one output,
/// - every output must refer to a call that appears earlier in the list,
/// - every call must have an output.
///
/// # Errors
/// Returns an error built by `invalid_prefetched_tool_results_error` for the
/// first violation found. Per-item problems are reported in list order; the
/// pairing checks that follow are also evaluated in list order so the reported
/// `call_id` does not depend on hash-map iteration order.
fn parse_prefetched_tool_results(
    values: Option<Vec<serde_json::Value>>,
) -> Result<Vec<ResponseInputItem>, JSONRPCErrorError> {
    let Some(values) = values else {
        return Ok(Vec::new());
    };
    let mut parsed = Vec::with_capacity(values.len());
    // call_id -> position in the request, tracked separately per item kind.
    let mut calls = HashMap::<String, usize>::new();
    let mut outputs = HashMap::<String, usize>::new();
    for (index, value) in values.into_iter().enumerate() {
        let item = serde_json::from_value::<ResponseItem>(value).map_err(|err| {
            Self::invalid_prefetched_tool_results_error(format!(
                "prefetchedToolResults[{index}] is not a valid response item: {err}"
            ))
        })?;
        match item {
            ResponseItem::FunctionCall {
                name,
                namespace,
                arguments,
                call_id,
                ..
            } => {
                if call_id.is_empty() {
                    return Err(Self::invalid_prefetched_tool_results_error(format!(
                        "prefetchedToolResults[{index}] function_call call_id must not be empty"
                    )));
                }
                if calls.insert(call_id.clone(), index).is_some() {
                    return Err(Self::invalid_prefetched_tool_results_error(format!(
                        "prefetchedToolResults contains duplicate function_call call_id `{call_id}`"
                    )));
                }
                parsed.push(ResponseInputItem::FunctionCall {
                    name,
                    namespace,
                    arguments,
                    call_id,
                });
            }
            ResponseItem::FunctionCallOutput { call_id, output } => {
                if call_id.is_empty() {
                    return Err(Self::invalid_prefetched_tool_results_error(format!(
                        "prefetchedToolResults[{index}] function_call_output call_id must not be empty"
                    )));
                }
                if outputs.insert(call_id.clone(), index).is_some() {
                    return Err(Self::invalid_prefetched_tool_results_error(format!(
                        "prefetchedToolResults contains duplicate function_call_output call_id `{call_id}`"
                    )));
                }
                parsed.push(ResponseInputItem::FunctionCallOutput { call_id, output });
            }
            _ => {
                return Err(Self::invalid_prefetched_tool_results_error(format!(
                    "prefetchedToolResults[{index}] must be a function_call or function_call_output response item"
                )));
            }
        }
    }

    // Check outputs in request order: each needs a call, and that call must
    // come first. Sorting by index keeps the reported call_id deterministic.
    let mut outputs_in_order: Vec<(&String, usize)> = outputs
        .iter()
        .map(|(call_id, index)| (call_id, *index))
        .collect();
    outputs_in_order.sort_unstable_by_key(|&(_, index)| index);
    for (call_id, output_index) in outputs_in_order {
        let Some(&call_index) = calls.get(call_id) else {
            return Err(Self::invalid_prefetched_tool_results_error(format!(
                "prefetchedToolResults contains function_call_output for missing call_id `{call_id}`"
            )));
        };
        if output_index < call_index {
            return Err(Self::invalid_prefetched_tool_results_error(format!(
                "prefetchedToolResults function_call_output for call_id `{call_id}` must appear after its function_call"
            )));
        }
    }

    // Check calls in request order: each must have been answered by an output.
    let mut calls_in_order: Vec<(&String, usize)> = calls
        .iter()
        .map(|(call_id, index)| (call_id, *index))
        .collect();
    calls_in_order.sort_unstable_by_key(|&(_, index)| index);
    for (call_id, _) in calls_in_order {
        if !outputs.contains_key(call_id) {
            return Err(Self::invalid_prefetched_tool_results_error(format!(
                "prefetchedToolResults contains function_call without function_call_output for call_id `{call_id}`"
            )));
        }
    }
    Ok(parsed)
}
async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
@@ -6783,6 +6879,29 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
return;
}
let prefetched_tool_results =
match Self::parse_prefetched_tool_results(params.prefetched_tool_results) {
Ok(items) => items,
Err(error) => {
self.track_error_response(&request_id, &error, /*error_type*/ None);
self.outgoing.send_error(request_id, error).await;
return;
}
};
if params.input.is_empty() && prefetched_tool_results.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "input or prefetchedToolResults must not be empty".to_string(),
data: None,
};
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
@@ -6894,6 +7013,7 @@ impl CodexMessageProcessor {
let turn_op = if has_any_overrides {
Op::UserInputWithTurnContext {
items: mapped_items,
prefetched_tool_results,
environments,
final_output_json_schema: params.output_schema,
responsesapi_client_metadata: params.responsesapi_client_metadata,
@@ -6910,13 +7030,21 @@ impl CodexMessageProcessor {
collaboration_mode,
personality,
}
} else {
} else if prefetched_tool_results.is_empty() {
Op::UserInput {
items: mapped_items,
environments,
final_output_json_schema: params.output_schema,
responsesapi_client_metadata: params.responsesapi_client_metadata,
}
} else {
Op::UserInputWithPrefetchedToolResults {
items: mapped_items,
prefetched_tool_results,
environments,
final_output_json_schema: params.output_schema,
responsesapi_client_metadata: params.responsesapi_client_metadata,
}
};
let turn_id = self
.submit_core_op(&request_id, thread.as_ref(), turn_op)
@@ -7055,6 +7183,20 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
return;
}
if params.input.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "input must not be empty".to_string(),
data: None,
};
self.track_error_response(
&request_id,
&error,
Some(AnalyticsJsonRpcError::Input(InputError::Empty)),
);
self.outgoing.send_error(request_id, error).await;
return;
}
let mapped_items: Vec<CoreInputItem> = params
.input
@@ -7062,10 +7204,11 @@ impl CodexMessageProcessor {
.map(V2UserInput::into_core)
.collect();
let expected_turn_id = params.expected_turn_id;
match thread
.steer_input(
mapped_items,
Some(&params.expected_turn_id),
Some(&expected_turn_id),
params.responsesapi_client_metadata,
)
.await

View File

@@ -48,6 +48,7 @@ use opentelemetry_sdk::trace::SpanData;
use pretty_assertions::assert_eq;
use serial_test::serial;
use std::collections::BTreeMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
@@ -57,6 +58,7 @@ use tracing_subscriber::layer::SubscriberExt;
use wiremock::MockServer;
const TEST_CONNECTION_ID: ConnectionId = ConnectionId(7);
const TRACING_TEST_STACK_SIZE_BYTES: usize = 4 * 1024 * 1024;
struct TestTracing {
exporter: InMemorySpanExporter,
@@ -69,6 +71,27 @@ struct RemoteTrace {
context: W3cTraceContext,
}
/// Runs an async tracing test to completion on a dedicated thread.
///
/// A thread named `name` is spawned with a stack of
/// `TRACING_TEST_STACK_SIZE_BYTES`; on it a current-thread Tokio runtime is
/// built and used to drive the future produced by `test`.
///
/// Returns the test's own result. Fails if the thread cannot be spawned, the
/// runtime cannot be built, or the thread panics (reported as
/// `"{name} thread panicked"`).
fn run_tracing_test<F, Fut>(name: &'static str, test: F) -> Result<()>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = Result<()>> + 'static,
{
    let thread_builder = std::thread::Builder::new()
        .name(name.to_string())
        .stack_size(TRACING_TEST_STACK_SIZE_BYTES);
    let worker = thread_builder.spawn(move || -> Result<()> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        // The future is heap-allocated via `Box::pin` before being driven.
        let pinned_test = Box::pin(test());
        runtime.block_on(pinned_test)
    })?;
    worker
        .join()
        .unwrap_or_else(|_| Err(anyhow::anyhow!("{name} thread panicked")))
}
impl RemoteTrace {
fn new(trace_id: &str, parent_span_id: &str) -> Self {
let trace_id = TraceId::from_hex(trace_id).expect("trace id");
@@ -568,9 +591,16 @@ where
spans.into_iter().skip(baseline_len).collect()
}
#[tokio::test(flavor = "current_thread")]
#[test]
#[serial(app_server_tracing)]
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
fn thread_start_jsonrpc_span_exports_server_span_and_parents_children() -> Result<()> {
run_tracing_test(
"thread_start_jsonrpc_span_exports_server_span_and_parents_children",
thread_start_jsonrpc_span_exports_server_span_and_parents_children_impl,
)
}
async fn thread_start_jsonrpc_span_exports_server_span_and_parents_children_impl() -> Result<()> {
let mut harness = TracingHarness::new().await?;
let RemoteTrace {
@@ -688,9 +718,16 @@ async fn remote_control_origin_rejects_device_key_requests() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "current_thread")]
#[test]
#[serial(app_server_tracing)]
async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
run_tracing_test(
"turn_start_jsonrpc_span_parents_core_turn_spans",
turn_start_jsonrpc_span_parents_core_turn_spans_impl,
)
}
async fn turn_start_jsonrpc_span_parents_core_turn_spans_impl() -> Result<()> {
let mut harness = TracingHarness::new().await?;
let thread_start_response = harness.start_thread(/*request_id*/ 2, /*trace*/ None).await;
let thread_id = thread_start_response.thread.id.clone();
@@ -714,6 +751,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
text_elements: Vec::new(),
}],
responsesapi_client_metadata: None,
prefetched_tool_results: None,
cwd: None,
approval_policy: None,
sandbox_policy: None,

View File

@@ -19,6 +19,7 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_protocol::protocol::RealtimeOutputModality;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
@@ -280,6 +281,55 @@ async fn thread_start_granular_approval_policy_requires_experimental_api_capabil
Ok(())
}
#[tokio::test]
async fn turn_start_prefetched_tool_results_requires_experimental_api_capability() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
let init = mcp
.initialize_with_capabilities(
default_client_info(),
Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
)
.await?;
let JSONRPCMessage::Response(_) = init else {
anyhow::bail!("expected initialize response, got {init:?}");
};
let request_id = mcp
.send_raw_request(
"turn/start",
Some(json!({
"threadId": "thr_123",
"input": [],
"prefetchedToolResults": [
{
"type": "function_call",
"name": "lookup",
"arguments": "{}",
"call_id": "prefetch-call-1"
},
{
"type": "function_call_output",
"call_id": "prefetch-call-1",
"output": "cached result"
}
]
})),
)
.await?;
let error = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_experimental_capability_error(error, "turn/start.prefetchedToolResults");
Ok(())
}
fn default_client_info() -> ClientInfo {
ClientInfo {
name: DEFAULT_CLIENT_NAME.to_string(),

View File

@@ -254,6 +254,161 @@ async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_prefetched_tool_results_precede_user_input() -> Result<()> {
let server = responses::start_mock_server().await;
let body = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]);
let response_mock = responses::mount_sse_once(&server, body).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Personality, true)]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Use the cached result".to_string(),
text_elements: Vec::new(),
}],
prefetched_tool_results: Some(vec![
json!({
"type": "function_call",
"name": "lookup",
"arguments": "{\"query\":\"cached\"}",
"call_id": "prefetch-call-1"
}),
json!({
"type": "function_call_output",
"call_id": "prefetch-call-1",
"output": "cached result"
}),
]),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let payload = response_mock.single_request().body_json();
let input = payload["input"].as_array().expect("model input array");
let function_call_index = input
.iter()
.position(|item| {
item.get("type").and_then(serde_json::Value::as_str) == Some("function_call")
&& item.get("call_id").and_then(serde_json::Value::as_str)
== Some("prefetch-call-1")
})
.expect("prefetched function_call in model input");
let output_index = input
.iter()
.position(|item| {
item.get("type").and_then(serde_json::Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(serde_json::Value::as_str)
== Some("prefetch-call-1")
})
.expect("prefetched function_call_output in model input");
let user_index = input
.iter()
.position(|item| {
item.get("role").and_then(serde_json::Value::as_str) == Some("user")
&& item
.get("content")
.and_then(serde_json::Value::as_array)
.is_some_and(|content| {
content.iter().any(|part| {
part.get("type").and_then(serde_json::Value::as_str)
== Some("input_text")
&& part.get("text").and_then(serde_json::Value::as_str)
== Some("Use the cached result")
})
})
})
.expect("user input in model input");
assert_eq!(
input[function_call_index]["arguments"],
"{\"query\":\"cached\"}"
);
assert!(function_call_index < output_index);
assert!(output_index < user_index);
Ok(())
}
/// A `turn/start` whose `prefetchedToolResults` contains a `function_call`
/// with no matching `function_call_output` must fail with code `-32600` and a
/// message naming the missing output.
#[tokio::test]
async fn turn_start_rejects_unpaired_prefetched_tool_results() -> Result<()> {
    let codex_home = TempDir::new()?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // A lone function_call: there is no output for `missing-output`.
    let unpaired_call = json!({
        "type": "function_call",
        "name": "lookup",
        "arguments": "{}",
        "call_id": "missing-output"
    });
    let params = TurnStartParams {
        thread_id: "thread-does-not-matter".to_string(),
        input: vec![V2UserInput::Text {
            text: "Hello".to_string(),
            text_elements: Vec::new(),
        }],
        prefetched_tool_results: Some(vec![unpaired_call]),
        ..Default::default()
    };
    let request_id = mcp.send_turn_start_request(params).await?;

    let rejection: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
    )
    .await??;

    assert_eq!(rejection.error.code, -32600);
    let mentions_missing_output = rejection
        .error
        .message
        .contains("prefetchedToolResults contains function_call without function_call_output");
    assert!(mentions_missing_output);
    Ok(())
}
#[tokio::test]
async fn turn_start_emits_thread_scoped_warning_notification_for_trimmed_skills() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
@@ -1824,6 +1979,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
text_elements: Vec::new(),
}],
responsesapi_client_metadata: None,
prefetched_tool_results: None,
cwd: Some(first_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
approvals_reviewer: None,
@@ -1866,6 +2022,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
text_elements: Vec::new(),
}],
responsesapi_client_metadata: None,
prefetched_tool_results: None,
cwd: Some(second_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
approvals_reviewer: None,

View File

@@ -30,6 +30,7 @@ use crate::review_prompts::resolve_review_request;
use crate::rollout::RolloutRecorder;
use crate::rollout::read_session_meta_line;
use crate::tasks::CompactTask;
use crate::tasks::TurnTaskInput;
use crate::tasks::UndoTask;
use crate::tasks::UserShellCommandMode;
use crate::tasks::UserShellCommandTask;
@@ -129,119 +130,140 @@ pub(super) async fn user_input_or_turn_inner(
op: Op,
mirror_user_text_to_realtime: Option<()>,
) {
let (items, updates, responsesapi_client_metadata, environments) = match op {
Op::UserTurn {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
model,
effort,
summary,
service_tier,
final_output_json_schema,
items,
collaboration_mode,
personality,
environments,
} => {
let collaboration_mode = collaboration_mode.or_else(|| {
Some(CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model: model.clone(),
reasoning_effort: effort,
developer_instructions: None,
},
})
});
(
let (items, prefetched_tool_results, updates, responsesapi_client_metadata, environments) =
match op {
Op::UserTurn {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
model,
effort,
summary,
service_tier,
final_output_json_schema,
items,
SessionSettingsUpdate {
cwd: Some(cwd),
approval_policy: Some(approval_policy),
approvals_reviewer,
sandbox_policy: Some(sandbox_policy),
permission_profile: None,
windows_sandbox_level: None,
collaboration_mode,
reasoning_summary: summary,
service_tier,
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
app_server_client_version: None,
},
None,
collaboration_mode,
personality,
environments,
)
}
Op::UserInputWithTurnContext {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
windows_sandbox_level,
model,
effort,
summary,
service_tier,
final_output_json_schema,
items,
responsesapi_client_metadata,
collaboration_mode,
personality,
environments,
} => {
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
Some(collab_mode)
} else {
let state = sess.state.lock().await;
Some(
state
.session_configuration
.collaboration_mode
.with_updates(model, effort, /*developer_instructions*/ None),
} => {
let collaboration_mode = collaboration_mode.or_else(|| {
Some(CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model: model.clone(),
reasoning_effort: effort,
developer_instructions: None,
},
})
});
(
items,
Vec::new(),
SessionSettingsUpdate {
cwd: Some(cwd),
approval_policy: Some(approval_policy),
approvals_reviewer,
sandbox_policy: Some(sandbox_policy),
permission_profile: None,
windows_sandbox_level: None,
collaboration_mode,
reasoning_summary: summary,
service_tier,
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
app_server_client_version: None,
},
None,
environments,
)
};
(
}
Op::UserInputWithTurnContext {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
windows_sandbox_level,
model,
effort,
summary,
service_tier,
final_output_json_schema,
items,
prefetched_tool_results,
responsesapi_client_metadata,
collaboration_mode,
personality,
environments,
} => {
let collaboration_mode = if let Some(collab_mode) = collaboration_mode {
Some(collab_mode)
} else {
let state = sess.state.lock().await;
Some(
state
.session_configuration
.collaboration_mode
.with_updates(model, effort, /*developer_instructions*/ None),
)
};
(
items,
prefetched_tool_results,
SessionSettingsUpdate {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
windows_sandbox_level,
collaboration_mode,
reasoning_summary: summary,
service_tier,
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
app_server_client_version: None,
},
responsesapi_client_metadata,
environments,
)
}
Op::UserInput {
items,
environments,
final_output_json_schema,
responsesapi_client_metadata,
} => (
items,
Vec::new(),
SessionSettingsUpdate {
cwd,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
windows_sandbox_level,
collaboration_mode,
reasoning_summary: summary,
service_tier,
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
app_server_client_version: None,
..Default::default()
},
responsesapi_client_metadata,
environments,
)
}
Op::UserInput {
items,
environments,
final_output_json_schema,
responsesapi_client_metadata,
} => (
items,
SessionSettingsUpdate {
final_output_json_schema: Some(final_output_json_schema),
..Default::default()
},
responsesapi_client_metadata,
environments,
),
_ => unreachable!(),
};
),
Op::UserInputWithPrefetchedToolResults {
items,
prefetched_tool_results,
environments,
final_output_json_schema,
responsesapi_client_metadata,
} => (
items,
prefetched_tool_results,
SessionSettingsUpdate {
final_output_json_schema: Some(final_output_json_schema),
..Default::default()
},
responsesapi_client_metadata,
environments,
),
_ => unreachable!(),
};
let Ok(current_context) = sess
.new_turn_with_sub_id(sub_id.clone(), updates, environments)
@@ -252,14 +274,17 @@ pub(super) async fn user_input_or_turn_inner(
};
sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref())
.await;
let accepted_items = match sess
.steer_input(
let steer_result = if prefetched_tool_results.is_empty() {
sess.steer_input(
items.clone(),
/*expected_turn_id*/ None,
responsesapi_client_metadata.clone(),
)
.await
{
} else {
Err(SteerInputError::NoActiveTurn(items.clone()))
};
let accepted_items = match steer_result {
Ok(_) => {
current_context.session_telemetry.user_prompt(&items);
Some(items)
@@ -276,7 +301,10 @@ pub(super) async fn user_input_or_turn_inner(
let accepted_items = items.clone();
sess.spawn_task(
Arc::clone(&current_context),
items,
TurnTaskInput {
user_input: items,
prefetched_tool_results,
},
crate::tasks::RegularTask::new(),
)
.await;
@@ -1153,6 +1181,7 @@ pub(super) async fn submission_loop(
false
}
Op::UserInput { .. }
| Op::UserInputWithPrefetchedToolResults { .. }
| Op::UserInputWithTurnContext { .. }
| Op::UserTurn { .. } => {
user_input_or_turn(&sess, sub.id.clone(), sub.op).await;

View File

@@ -2839,7 +2839,7 @@ impl Session {
.run(
Arc::new(SessionTaskContext::new(self.clone())),
turn_context.clone(),
Vec::new(),
crate::tasks::TurnTaskInput::default(),
cancellation_token,
)
.await;

View File

@@ -49,6 +49,7 @@ use crate::rollout::recorder::RolloutRecorder;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tasks::TurnTaskInput;
use crate::tasks::UserShellCommandMode;
use crate::tasks::execute_user_shell_command;
use crate::tools::ToolRouter;
@@ -3790,6 +3791,7 @@ fn op_kind_distinguishes_turn_ops() {
Op::UserInputWithTurnContext {
environments: None,
items: vec![],
prefetched_tool_results: vec![],
final_output_json_schema: None,
responsesapi_client_metadata: None,
cwd: None,
@@ -3992,7 +3994,7 @@ async fn spawn_task_turn_span_inherits_dispatch_trace_context() {
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
_cancellation_token: CancellationToken,
) -> Option<String> {
let mut trace = self
@@ -5608,7 +5610,7 @@ impl SessionTask for NeverEndingTask {
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
if self.listen_to_cancellation_token {
@@ -5637,7 +5639,7 @@ impl SessionTask for GuardianDeniedApprovalTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();

View File

@@ -48,6 +48,7 @@ use crate::stream_events_utils::last_assistant_message_from_item;
use crate::stream_events_utils::mark_thread_memory_mode_polluted_if_external_context;
use crate::stream_events_utils::raw_assistant_output_text_from_item;
use crate::stream_events_utils::record_completed_response_item;
use crate::tasks::TurnTaskInput;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::parallel::ToolCallRuntime;
@@ -137,11 +138,15 @@ use tracing::warn;
pub(crate) async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
prewarmed_client_session: Option<ModelClientSession>,
cancellation_token: CancellationToken,
) -> Option<String> {
if input.is_empty() && !sess.has_pending_input().await {
let TurnTaskInput {
user_input: input,
prefetched_tool_results,
} = input;
if input.is_empty() && prefetched_tool_results.is_empty() && !sess.has_pending_input().await {
return None;
}
@@ -300,11 +305,13 @@ pub(crate) async fn run_turn(
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
let prefetched_tool_result_items = prefetched_tool_results
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<_>>();
let additional_contexts = if input.is_empty() {
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
@@ -320,10 +327,18 @@ pub(crate) async fn run_turn(
.await;
return None;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
user_prompt_submit_outcome.additional_contexts
};
if !prefetched_tool_result_items.is_empty() {
sess.record_conversation_items(&turn_context, &prefetched_tool_result_items)
.await;
}
if !input.is_empty() {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
}
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);

View File

@@ -2,9 +2,9 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use super::TurnTaskInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
#[derive(Clone, Copy, Default)]
@@ -23,7 +23,7 @@ impl SessionTask for CompactTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
@@ -40,7 +40,7 @@ impl SessionTask for CompactTask {
/*inc*/ 1,
&[("type", "local")],
);
crate::compact::run_compact_task(session.clone(), ctx, input).await
crate::compact::run_compact_task(session.clone(), ctx, input.user_input).await
};
None
}

View File

@@ -1,3 +1,4 @@
use super::TurnTaskInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
@@ -9,7 +10,6 @@ use codex_git_utils::create_ghost_commit_with_report;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::Token;
use std::sync::Arc;
@@ -38,7 +38,7 @@ impl SessionTask for GhostSnapshotTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
tokio::task::spawn(async move {

View File

@@ -61,6 +61,21 @@ pub(crate) use user_shell::execute_user_shell_command;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
/// Input handed to a session task when it is started.
///
/// Bundles the user's input items with any prefetched tool results for the
/// turn. The derived `Default` is the empty input (both vectors empty).
#[derive(Debug, Clone, Default)]
pub(crate) struct TurnTaskInput {
/// User-provided input items for the turn.
pub(crate) user_input: Vec<UserInput>,
/// Prefetched tool call/output items supplied with the turn.
// NOTE(review): presumably already validated as paired calls/outputs by the
// caller that builds this value — confirm at the construction site.
pub(crate) prefetched_tool_results: Vec<ResponseInputItem>,
}
/// Wraps plain user input as task input that carries no prefetched tool
/// results.
impl From<Vec<UserInput>> for TurnTaskInput {
    fn from(user_input: Vec<UserInput>) -> Self {
        // Every field other than `user_input` takes its (empty) default.
        Self {
            user_input,
            ..Self::default()
        }
    }
}
/// Shared model-visible marker used by both the real interrupt path and
/// interrupted fork snapshots.
pub(crate) fn interrupted_turn_history_marker() -> ResponseItem {
@@ -161,7 +176,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = Option<String>> + Send;
@@ -190,7 +205,7 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>>;
@@ -217,7 +232,7 @@ where
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> BoxFuture<'static, Option<String>> {
Box::pin(SessionTask::run(
@@ -242,9 +257,10 @@ impl Session {
pub async fn spawn_task<T: SessionTask>(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
input: impl Into<TurnTaskInput>,
task: T,
) {
let input = input.into();
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.clear_connector_selection().await;
self.start_task(turn_context, input, task).await;
@@ -253,7 +269,7 @@ impl Session {
async fn start_task<T: SessionTask>(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
task: T,
) {
let task: Arc<dyn AnySessionTask> = Arc::new(task);
@@ -398,7 +414,7 @@ impl Session {
let turn_context = self.new_default_turn_with_sub_id(sub_id).await;
self.maybe_emit_unknown_model_warning_for_turn(turn_context.as_ref())
.await;
self.start_task(turn_context, Vec::new(), RegularTask::new())
self.start_task(turn_context, TurnTaskInput::default(), RegularTask::new())
.await;
}

View File

@@ -8,12 +8,12 @@ use crate::session_startup_prewarm::SessionStartupPrewarmResolution;
use crate::state::TaskKind;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::UserInput;
use tracing::Instrument;
use tracing::trace_span;
use super::SessionTask;
use super::SessionTaskContext;
use super::TurnTaskInput;
/// Field-less task type; all of its behavior lives in the `SessionTask`
/// implementation for this type below.
#[derive(Default)]
pub(crate) struct RegularTask;
@@ -37,7 +37,7 @@ impl SessionTask for RegularTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();
@@ -77,7 +77,7 @@ impl SessionTask for RegularTask {
if !sess.has_pending_input().await {
return last_agent_message;
}
next_input = Vec::new();
next_input = TurnTaskInput::default();
}
}
}

View File

@@ -14,6 +14,7 @@ use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use codex_utils_template::Template;
use tokio_util::sync::CancellationToken;
@@ -25,11 +26,11 @@ use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use codex_features::Feature;
use codex_protocol::user_input::UserInput;
use std::sync::LazyLock;
use super::SessionTask;
use super::SessionTaskContext;
use super::TurnTaskInput;
static REVIEW_EXIT_SUCCESS_TEMPLATE: LazyLock<Template> = LazyLock::new(|| {
let normalized =
@@ -60,7 +61,7 @@ impl SessionTask for ReviewTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
input: Vec<UserInput>,
input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
session.session.services.session_telemetry.counter(
@@ -73,7 +74,7 @@ impl SessionTask for ReviewTask {
let output = match start_review_conversation(
session.clone(),
ctx.clone(),
input,
input.user_input,
cancellation_token.clone(),
)
.await

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use super::TurnTaskInput;
use crate::session::turn_context::TurnContext;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
@@ -10,7 +11,6 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::UndoCompletedEvent;
use codex_protocol::protocol::UndoStartedEvent;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
@@ -37,7 +37,7 @@ impl SessionTask for UndoTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
session

View File

@@ -3,7 +3,6 @@ use std::time::Duration;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::error;
use uuid::Uuid;
@@ -32,6 +31,7 @@ use codex_shell_command::parse_command::parse_command;
use super::SessionTask;
use super::SessionTaskContext;
use super::TurnTaskInput;
use crate::session::session::Session;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -74,7 +74,7 @@ impl SessionTask for UserShellCommandTask {
self: Arc<Self>,
session: Arc<SessionTaskContext>,
turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
execute_user_shell_command(

View File

@@ -493,6 +493,18 @@ pub(crate) fn response_input_to_code_mode_result(response: ResponseInputItem) ->
content_items_to_code_mode_result(&items)
}
},
// A function call is returned as a JSON object tagged `"type": "function_call"`.
// `namespace` is an `Option`, so `json!` writes JSON `null` for it when it is
// `None` rather than omitting the key.
ResponseInputItem::FunctionCall {
name,
namespace,
arguments,
call_id,
} => serde_json::json!({
"type": "function_call",
"name": name,
"namespace": namespace,
"arguments": arguments,
"call_id": call_id,
}),
ResponseInputItem::ToolSearchOutput { tools, .. } => JsonValue::Array(tools),
ResponseInputItem::McpToolCallOutput { output, .. } => {
output.code_mode_result(&ToolPayload::Mcp {

View File

@@ -9,6 +9,7 @@ use crate::session_prefix::format_subagent_notification_message;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tasks::TurnTaskInput;
use crate::tools::context::ToolOutput;
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
use crate::tools::handlers::multi_agents_v2::FollowupTaskHandler as FollowupTaskHandlerV2;
@@ -234,7 +235,7 @@ impl SessionTask for NeverEndingTask {
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
_input: TurnTaskInput,
cancellation_token: CancellationToken,
) -> Option<String> {
cancellation_token.cancelled().await;

View File

@@ -743,6 +743,22 @@ impl JsReplManager {
fn summarize_tool_call_response(response: &ResponseInputItem) -> JsReplToolCallResponseSummary {
match response {
ResponseInputItem::Message { content, .. } => Self::summarize_message_payload(content),
// A function call is summarized from its raw `arguments` string only.
ResponseInputItem::FunctionCall { arguments, .. } => {
// No preview for an empty argument string; otherwise the text is truncated
// under a byte-based policy capped at
// JS_REPL_TOOL_RESPONSE_TEXT_PREVIEW_MAX_BYTES.
let payload_text_preview = (!arguments.is_empty()).then(|| {
truncate_text(
arguments,
TruncationPolicy::Bytes(JS_REPL_TOOL_RESPONSE_TEXT_PREVIEW_MAX_BYTES),
)
});
JsReplToolCallResponseSummary {
response_type: Some("function_call".to_string()),
payload_kind: Some(JsReplToolCallPayloadKind::FunctionText),
payload_text_preview,
// `len()` on the string counts bytes, not characters.
payload_text_length: Some(arguments.len()),
payload_item_count: Some(1),
..Default::default()
}
}
ResponseInputItem::FunctionCallOutput { output, .. } => {
let payload_kind = if output.content_items().is_some() {
JsReplToolCallPayloadKind::FunctionContentItems

View File

@@ -746,6 +746,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
thread_id: primary_thread_id_for_span.clone(),
input: items.into_iter().map(Into::into).collect(),
responsesapi_client_metadata: None,
prefetched_tool_results: None,
environments: None,
cwd: Some(default_cwd),
approval_policy: Some(default_approval_policy.into()),

View File

@@ -370,6 +370,14 @@ pub enum ResponseInputItem {
role: String,
content: Vec<ContentItem>,
},
// Function-call input item: `name`, optional `namespace`, `arguments`, `call_id`.
// NOTE(review): `arguments` is presumably the JSON-encoded argument text of a
// Responses API `function_call` item — confirm against the producer.
FunctionCall {
name: String,
// Optional: defaults to `None` when absent on input and is left out of the
// serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
namespace: Option<String>,
arguments: String,
call_id: String,
},
FunctionCallOutput {
call_id: String,
#[ts(as = "FunctionCallOutputBody")]
@@ -811,6 +819,18 @@ impl From<ResponseInputItem> for ResponseItem {
end_turn: None,
phase: None,
},
// Field-for-field mapping. The input variant has no `id` field, so the
// resulting item's `id` is set to `None`.
ResponseInputItem::FunctionCall {
name,
namespace,
arguments,
call_id,
} => Self::FunctionCall {
id: None,
name,
namespace,
arguments,
call_id,
},
ResponseInputItem::FunctionCallOutput { call_id, output } => {
Self::FunctionCallOutput { call_id, output }
}

View File

@@ -443,12 +443,34 @@ pub enum Op {
responsesapi_client_metadata: Option<HashMap<String, String>>,
},
/// User input plus prefetched tool-call/result pairs to place immediately
/// before the user-visible input in the model-visible turn history.
UserInputWithPrefetchedToolResults {
/// User input items, see `InputItem`.
items: Vec<UserInput>,
// Required in this variant: no serde `default`, unlike the same-named
// field on `UserInputWithTurnContext`.
/// Synthetic Responses API function_call/function_call_output items.
prefetched_tool_results: Vec<ResponseInputItem>,
/// Optional turn-scoped environment selections.
#[serde(default, skip_serializing_if = "Option::is_none")]
environments: Option<Vec<TurnEnvironmentSelection>>,
// No explicit `default` on the next field; serde's derive already treats a
// missing `Option` field as `None`.
/// Optional JSON Schema used to constrain the final assistant message for this turn.
#[serde(skip_serializing_if = "Option::is_none")]
final_output_json_schema: Option<Value>,
/// Optional turn-scoped Responses API `client_metadata`.
#[serde(default, skip_serializing_if = "Option::is_none")]
responsesapi_client_metadata: Option<HashMap<String, String>>,
},
/// Similar to [`Op::UserInput`], but first applies persistent turn-context
/// overrides in the same queued operation. This preserves submission order
/// and prevents the input from starting if the overrides are rejected.
UserInputWithTurnContext {
/// User input items, see `InputItem`
items: Vec<UserInput>,
/// Synthetic Responses API function_call/function_call_output items to
/// place before the user-visible input in the model-visible turn history.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
prefetched_tool_results: Vec<ResponseInputItem>,
/// Optional turn-scoped environment selections.
#[serde(default, skip_serializing_if = "Option::is_none")]
environments: Option<Vec<TurnEnvironmentSelection>>,
@@ -878,6 +900,9 @@ impl Op {
Self::RealtimeConversationClose => "realtime_conversation_close",
Self::RealtimeConversationListVoices => "realtime_conversation_list_voices",
Self::UserInput { .. } => "user_input",
Self::UserInputWithPrefetchedToolResults { .. } => {
"user_input_with_prefetched_tool_results"
}
Self::UserInputWithTurnContext { .. } => "user_input_with_turn_context",
Self::UserTurn { .. } => "user_turn",
Self::InterAgentCommunication { .. } => "inter_agent_communication",

View File

@@ -535,6 +535,7 @@ impl AppServerSession {
thread_id: thread_id.to_string(),
input: items.into_iter().map(Into::into).collect(),
responsesapi_client_metadata: None,
prefetched_tool_results: None,
environments: None,
cwd: Some(cwd),
approval_policy: Some(approval_policy.into()),