refactoring for scalability

This commit is contained in:
Roy Han
2026-03-16 16:46:12 -07:00
parent 99e2fec922
commit 414b3f260b
6 changed files with 262 additions and 196 deletions

View File

@@ -987,7 +987,13 @@ fn local_time_context() -> (String, String) {
}
}
fn stamp_user_message_type_on_input_item(item: &mut ResponseInputItem, kind: UserMessageType) {
fn stamp_message_metadata_on_input_item(
item: &mut ResponseInputItem,
patch: &ResponseItemMetadata,
) {
if patch.is_empty() {
return;
}
let ResponseInputItem::Message { role, metadata, .. } = item else {
return;
};
@@ -995,8 +1001,8 @@ fn stamp_user_message_type_on_input_item(item: &mut ResponseInputItem, kind: Use
return;
}
let mut metadata_value = metadata.take().unwrap_or_default();
metadata_value.user_message_type = Some(kind);
*metadata = Some(metadata_value);
metadata_value.merge_from(patch.clone());
*metadata = (!metadata_value.is_empty()).then_some(metadata_value);
}
fn sandbox_policy_to_metadata(policy: &SandboxPolicy) -> SandboxPolicyMetadata {
@@ -1009,19 +1015,51 @@ fn sandbox_policy_to_metadata(policy: &SandboxPolicy) -> SandboxPolicyMetadata {
}
}
fn stamp_sandbox_policy_on_input_item(
item: &mut ResponseInputItem,
sandbox_policy: SandboxPolicyMetadata,
) {
let ResponseInputItem::Message { role, metadata, .. } = item else {
return;
};
if role != "user" {
return;
fn user_message_metadata_patch(
kind: UserMessageType,
sandbox_policy: Option<SandboxPolicyMetadata>,
) -> ResponseItemMetadata {
ResponseItemMetadata {
user_message_type: Some(kind),
sandbox_policy,
..ResponseItemMetadata::default()
}
}
fn stamp_message_metadata_on_response_item(
item: ResponseItem,
patch: Option<ResponseItemMetadata>,
) -> ResponseItem {
let Some(patch) = patch else {
return item;
};
if patch.is_empty() {
return item;
}
match item {
ResponseItem::Message {
id,
role,
content,
metadata,
end_turn,
phase,
} if role == "user" => {
let mut metadata_value = metadata.unwrap_or_default();
metadata_value.merge_from(patch);
let metadata = (!metadata_value.is_empty()).then_some(metadata_value);
ResponseItem::Message {
id,
role,
content,
metadata,
end_turn,
phase,
}
}
other => other,
}
let mut metadata_value = metadata.take().unwrap_or_default();
metadata_value.sandbox_policy = Some(sandbox_policy);
*metadata = Some(metadata_value);
}
fn review_decision_to_metadata(decision: &ReviewDecision) -> ReviewDecisionMetadata {
@@ -1058,55 +1096,41 @@ fn response_item_tool_call_id(item: &ResponseItem) -> Option<&str> {
}
}
fn stamp_tool_metadata_on_response_item(
item: ResponseItem,
metadata: ResponseItemMetadata,
) -> ResponseItem {
fn tool_call_metadata_slot_mut(
item: &mut ResponseItem,
) -> Option<&mut Option<ResponseItemMetadata>> {
match item {
ResponseItem::LocalShellCall {
id,
call_id,
status,
action,
..
} => ResponseItem::LocalShellCall {
id,
call_id,
status,
action,
metadata: Some(metadata),
},
ResponseItem::FunctionCall {
id,
name,
namespace,
arguments,
call_id,
..
} => ResponseItem::FunctionCall {
id,
name,
namespace,
arguments,
call_id,
metadata: Some(metadata),
},
ResponseItem::CustomToolCall {
id,
status,
call_id,
name,
input,
..
} => ResponseItem::CustomToolCall {
id,
status,
call_id,
name,
input,
metadata: Some(metadata),
},
other => other,
ResponseItem::LocalShellCall { metadata, .. }
| ResponseItem::FunctionCall { metadata, .. }
| ResponseItem::CustomToolCall { metadata, .. } => Some(metadata),
_ => None,
}
}
fn stamp_tool_metadata_on_response_item(
mut item: ResponseItem,
patch: ResponseItemMetadata,
) -> ResponseItem {
if patch.is_empty() {
return item;
}
let Some(metadata_slot) = tool_call_metadata_slot_mut(&mut item) else {
return item;
};
let mut metadata = metadata_slot.take().unwrap_or_default();
metadata.merge_from(patch);
*metadata_slot = (!metadata.is_empty()).then_some(metadata);
item
}
fn tool_call_metadata_or_default(item: &ResponseItem) -> Option<ResponseItemMetadata> {
match item {
ResponseItem::LocalShellCall { metadata, .. }
| ResponseItem::FunctionCall { metadata, .. }
| ResponseItem::CustomToolCall { metadata, .. } => {
Some(metadata.clone().unwrap_or_default())
}
_ => None,
}
}
@@ -3416,11 +3440,9 @@ impl Session {
(outcome, has_pending_approval)
};
let mut metadata = match &response_item {
ResponseItem::LocalShellCall { metadata, .. }
| ResponseItem::FunctionCall { metadata, .. }
| ResponseItem::CustomToolCall { metadata, .. } => metadata.clone().unwrap_or_default(),
_ => return response_item,
let mut metadata = match tool_call_metadata_or_default(&response_item) {
Some(metadata) => metadata,
None => return response_item,
};
metadata.sandbox_policy = Some(sandbox_policy_to_metadata(
turn_context.sandbox_policy.get(),
@@ -3957,46 +3979,18 @@ impl Session {
turn_context: &TurnContext,
input: &[UserInput],
response_item: ResponseItem,
user_message_type: Option<UserMessageType>,
message_metadata: Option<ResponseItemMetadata>,
) {
let (user_message_type, sandbox_policy) = if self.enabled(Feature::ItemMetadata) {
(
user_message_type,
Some(sandbox_policy_to_metadata(
turn_context.sandbox_policy.get(),
)),
)
let metadata_patch = if self.enabled(Feature::ItemMetadata) {
let mut patch = message_metadata.unwrap_or_default();
patch.sandbox_policy = Some(sandbox_policy_to_metadata(
turn_context.sandbox_policy.get(),
));
Some(patch)
} else {
(None, None)
};
let response_item = match (response_item, user_message_type.clone(), sandbox_policy) {
(
ResponseItem::Message {
id,
role,
content,
metadata,
end_turn,
phase,
},
user_message_type,
sandbox_policy,
) if role == "user" => {
let mut metadata = metadata.unwrap_or_default();
metadata.user_message_type = user_message_type;
metadata.sandbox_policy = sandbox_policy;
ResponseItem::Message {
id,
role,
content,
metadata: Some(metadata),
end_turn,
phase,
}
}
(response_item, _, _) => response_item,
None
};
let response_item = stamp_message_metadata_on_response_item(response_item, metadata_patch);
// Persist the user message to history, but emit the turn item from `UserInput` so
// UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry
@@ -4097,16 +4091,19 @@ impl Session {
}
let mut input_item: ResponseInputItem = input.into();
if self.enabled(Feature::ItemMetadata) {
stamp_user_message_type_on_input_item(&mut input_item, UserMessageType::PromptSteering);
stamp_sandbox_policy_on_input_item(
&mut input_item,
sandbox_policy_to_metadata(active_task.turn_context.sandbox_policy.get()),
);
let metadata = Some(user_message_metadata_patch(
UserMessageType::PromptSteering,
self.enabled(Feature::ItemMetadata)
.then(|| sandbox_policy_to_metadata(active_task.turn_context.sandbox_policy.get())),
));
if self.enabled(Feature::ItemMetadata)
&& let Some(metadata_patch) = metadata.as_ref()
{
stamp_message_metadata_on_input_item(&mut input_item, metadata_patch);
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input_item, Some(UserMessageType::PromptSteering));
turn_state.push_pending_input(input_item, metadata);
Ok(active_turn_id.to_string())
}
@@ -4123,19 +4120,24 @@ impl Session {
});
let mut ts = at.turn_state.lock().await;
for mut item in input {
let user_message_type = match &item {
ResponseInputItem::Message { .. } => Some(UserMessageType::PromptQueued),
let metadata = match &item {
ResponseInputItem::Message { .. } => Some(user_message_metadata_patch(
UserMessageType::PromptQueued,
if self.enabled(Feature::ItemMetadata) {
sandbox_policy.clone()
} else {
None
},
)),
_ => None,
};
if self.enabled(Feature::ItemMetadata)
&& let Some(kind) = user_message_type.clone()
&& let Some(metadata_patch) = metadata.as_ref()
{
stamp_user_message_type_on_input_item(&mut item, kind);
if let Some(sandbox_policy) = sandbox_policy.clone() {
stamp_sandbox_policy_on_input_item(&mut item, sandbox_policy);
}
stamp_message_metadata_on_input_item(&mut item, metadata_patch);
}
ts.push_pending_input(item, user_message_type);
ts.push_pending_input(item, metadata);
}
Ok(())
}
@@ -4145,14 +4147,14 @@ impl Session {
pub async fn get_pending_input_with_metadata(
&self,
) -> Vec<(ResponseInputItem, Option<UserMessageType>)> {
) -> Vec<(ResponseInputItem, Option<ResponseItemMetadata>)> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.take_pending_input_with_metadata()
.into_iter()
.map(|item| (item.input, item.user_message_type))
.map(|item| (item.input, item.metadata))
.collect()
}
None => Vec::with_capacity(0),
@@ -5917,19 +5919,22 @@ pub(crate) async fn run_turn(
.await;
let mut initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
if sess.enabled(Feature::ItemMetadata) {
stamp_user_message_type_on_input_item(&mut initial_input_for_turn, UserMessageType::Prompt);
stamp_sandbox_policy_on_input_item(
&mut initial_input_for_turn,
sandbox_policy_to_metadata(turn_context.sandbox_policy.get()),
);
let initial_message_metadata = Some(user_message_metadata_patch(
UserMessageType::Prompt,
sess.enabled(Feature::ItemMetadata)
.then(|| sandbox_policy_to_metadata(turn_context.sandbox_policy.get())),
));
if sess.enabled(Feature::ItemMetadata)
&& let Some(metadata_patch) = initial_message_metadata.as_ref()
{
stamp_message_metadata_on_input_item(&mut initial_input_for_turn, metadata_patch);
}
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
&input,
response_item,
Some(UserMessageType::Prompt),
initial_message_metadata,
)
.await;
// Track the previous-turn baseline from the regular user-turn path only so
@@ -6020,7 +6025,7 @@ pub(crate) async fn run_turn(
let pending_response_items = sess.get_pending_input_with_metadata().await;
if !pending_response_items.is_empty() {
for (pending_input, user_message_type) in pending_response_items {
for (pending_input, message_metadata) in pending_response_items {
let response_item = ResponseItem::from(pending_input);
if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) {
// todo(aibrahim): move pending input to be UserInput only to keep TextElements. context: https://github.com/openai/codex/pull/10656#discussion_r2765522480
@@ -6028,7 +6033,7 @@ pub(crate) async fn run_turn(
turn_context.as_ref(),
&user_message.content,
response_item,
user_message_type,
message_metadata,
)
.await;
} else {

View File

@@ -4157,7 +4157,8 @@ async fn steer_input_returns_active_turn_id() {
assert_eq!(
pending_input
.first()
.and_then(|(_, user_message_type)| user_message_type.as_ref()),
.and_then(|(_, metadata)| metadata.as_ref())
.and_then(|metadata| metadata.user_message_type.as_ref()),
Some(&codex_protocol::models::UserMessageType::PromptSteering)
);
}

View File

@@ -376,6 +376,89 @@ pub(crate) struct McpToolApprovalMetadata {
tool_description: Option<String>,
}
#[derive(Default)]
struct McpToolApprovalProjection {
connector_id: Option<String>,
connector_name: Option<String>,
connector_description: Option<String>,
tool_title: Option<String>,
tool_description: Option<String>,
annotations: Option<GuardianMcpAnnotations>,
}
impl McpToolApprovalProjection {
fn from_metadata(metadata: Option<&McpToolApprovalMetadata>) -> Self {
let Some(metadata) = metadata else {
return Self::default();
};
Self {
connector_id: metadata.connector_id.clone(),
connector_name: metadata.connector_name.clone(),
connector_description: metadata.connector_description.clone(),
tool_title: metadata.tool_title.clone(),
tool_description: metadata.tool_description.clone(),
annotations: metadata
.annotations
.as_ref()
.map(|annotations| GuardianMcpAnnotations {
destructive_hint: annotations.destructive_hint,
open_world_hint: annotations.open_world_hint,
read_only_hint: annotations.read_only_hint,
}),
}
}
fn has_connector_source(&self, server: &str) -> bool {
server == CODEX_APPS_MCP_SERVER_NAME
&& (self.connector_id.is_some()
|| self.connector_name.is_some()
|| self.connector_description.is_some())
}
fn add_elicitation_meta(
&self,
server: &str,
meta: &mut serde_json::Map<String, serde_json::Value>,
) {
if let Some(tool_title) = self.tool_title.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_TOOL_TITLE_KEY.to_string(),
serde_json::Value::String(tool_title.clone()),
);
}
if let Some(tool_description) = self.tool_description.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_TOOL_DESCRIPTION_KEY.to_string(),
serde_json::Value::String(tool_description.clone()),
);
}
if self.has_connector_source(server) {
meta.insert(
MCP_TOOL_APPROVAL_SOURCE_KEY.to_string(),
serde_json::Value::String(MCP_TOOL_APPROVAL_SOURCE_CONNECTOR.to_string()),
);
if let Some(connector_id) = self.connector_id.as_deref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_ID_KEY.to_string(),
serde_json::Value::String(connector_id.to_string()),
);
}
if let Some(connector_name) = self.connector_name.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_NAME_KEY.to_string(),
serde_json::Value::String(connector_name.clone()),
);
}
if let Some(connector_description) = self.connector_description.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_DESCRIPTION_KEY.to_string(),
serde_json::Value::String(connector_description.clone()),
);
}
}
}
}
#[derive(Clone, Copy)]
struct McpToolApprovalPromptOptions {
allow_session_remember: bool,
@@ -664,23 +747,18 @@ pub(crate) fn build_guardian_mcp_tool_review_request(
invocation: &McpInvocation,
metadata: Option<&McpToolApprovalMetadata>,
) -> GuardianApprovalRequest {
let projection = McpToolApprovalProjection::from_metadata(metadata);
GuardianApprovalRequest::McpToolCall {
id: call_id.to_string(),
server: invocation.server.clone(),
tool_name: invocation.tool.clone(),
arguments: invocation.arguments.clone(),
connector_id: metadata.and_then(|metadata| metadata.connector_id.clone()),
connector_name: metadata.and_then(|metadata| metadata.connector_name.clone()),
connector_description: metadata.and_then(|metadata| metadata.connector_description.clone()),
tool_title: metadata.and_then(|metadata| metadata.tool_title.clone()),
tool_description: metadata.and_then(|metadata| metadata.tool_description.clone()),
annotations: metadata
.and_then(|metadata| metadata.annotations.as_ref())
.map(|annotations| GuardianMcpAnnotations {
destructive_hint: annotations.destructive_hint,
open_world_hint: annotations.open_world_hint,
read_only_hint: annotations.read_only_hint,
}),
connector_id: projection.connector_id,
connector_name: projection.connector_name,
connector_description: projection.connector_description,
tool_title: projection.tool_title,
tool_description: projection.tool_description,
annotations: projection.annotations,
}
}
@@ -902,6 +980,7 @@ fn build_mcp_tool_approval_elicitation_meta(
prompt_options: McpToolApprovalPromptOptions,
) -> Option<serde_json::Value> {
let mut meta = serde_json::Map::new();
let projection = McpToolApprovalProjection::from_metadata(metadata);
meta.insert(
MCP_TOOL_APPROVAL_KIND_KEY.to_string(),
serde_json::Value::String(MCP_TOOL_APPROVAL_KIND_MCP_TOOL_CALL.to_string()),
@@ -933,48 +1012,7 @@ fn build_mcp_tool_approval_elicitation_meta(
}
(false, false) => {}
}
if let Some(metadata) = metadata {
if let Some(tool_title) = metadata.tool_title.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_TOOL_TITLE_KEY.to_string(),
serde_json::Value::String(tool_title.clone()),
);
}
if let Some(tool_description) = metadata.tool_description.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_TOOL_DESCRIPTION_KEY.to_string(),
serde_json::Value::String(tool_description.clone()),
);
}
if server == CODEX_APPS_MCP_SERVER_NAME
&& (metadata.connector_id.is_some()
|| metadata.connector_name.is_some()
|| metadata.connector_description.is_some())
{
meta.insert(
MCP_TOOL_APPROVAL_SOURCE_KEY.to_string(),
serde_json::Value::String(MCP_TOOL_APPROVAL_SOURCE_CONNECTOR.to_string()),
);
if let Some(connector_id) = metadata.connector_id.as_deref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_ID_KEY.to_string(),
serde_json::Value::String(connector_id.to_string()),
);
}
if let Some(connector_name) = metadata.connector_name.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_NAME_KEY.to_string(),
serde_json::Value::String(connector_name.clone()),
);
}
if let Some(connector_description) = metadata.connector_description.as_ref() {
meta.insert(
MCP_TOOL_APPROVAL_CONNECTOR_DESCRIPTION_KEY.to_string(),
serde_json::Value::String(connector_description.clone()),
);
}
}
}
projection.add_elicitation_meta(server, &mut meta);
if let Some(tool_params) = tool_params {
meta.insert(
MCP_TOOL_APPROVAL_TOOL_PARAMS_KEY.to_string(),

View File

@@ -10,8 +10,8 @@ use tokio_util::task::AbortOnDropHandle;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItemMetadata;
use codex_protocol::models::ReviewDecisionMetadata;
use codex_protocol::models::UserMessageType;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationResponse;
@@ -61,7 +61,7 @@ pub(crate) struct RunningTask {
#[derive(Debug, Clone)]
pub(crate) struct PendingInputItem {
pub(crate) input: ResponseInputItem,
pub(crate) user_message_type: Option<UserMessageType>,
pub(crate) metadata: Option<ResponseItemMetadata>,
}
impl ActiveTurn {
@@ -223,12 +223,10 @@ impl TurnState {
pub(crate) fn push_pending_input(
&mut self,
input: ResponseInputItem,
user_message_type: Option<UserMessageType>,
metadata: Option<ResponseItemMetadata>,
) {
self.pending_input.push(PendingInputItem {
input,
user_message_type,
});
self.pending_input
.push(PendingInputItem { input, metadata });
}
pub(crate) fn take_pending_input_with_metadata(&mut self) -> Vec<PendingInputItem> {

View File

@@ -256,9 +256,9 @@ impl Session {
}
drop(active);
if !pending_input.is_empty() {
for (pending_input, user_message_type) in pending_input
for (pending_input, message_metadata) in pending_input
.into_iter()
.map(|item| (item.input, item.user_message_type))
.map(|item| (item.input, item.metadata))
{
let response_item = ResponseItem::from(pending_input);
if let Some(TurnItem::UserMessage(user_message)) = parse_turn_item(&response_item) {
@@ -269,7 +269,7 @@ impl Session {
turn_context.as_ref(),
&user_message.content,
response_item,
user_message_type,
message_metadata,
)
.await;
} else {

View File

@@ -300,6 +300,30 @@ pub struct ResponseItemMetadata {
pub review_decision: Option<ReviewDecisionMetadata>,
}
impl ResponseItemMetadata {
pub fn is_empty(&self) -> bool {
self.user_message_type.is_none()
&& self.sandbox_policy.is_none()
&& self.is_tool_call_escalated.is_none()
&& self.review_decision.is_none()
}
pub fn merge_from(&mut self, other: Self) {
if other.user_message_type.is_some() {
self.user_message_type = other.user_message_type;
}
if other.sandbox_policy.is_some() {
self.sandbox_policy = other.sandbox_policy;
}
if other.is_tool_call_escalated.is_some() {
self.is_tool_call_escalated = other.is_tool_call_escalated;
}
if other.review_decision.is_some() {
self.review_decision = other.review_decision;
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentItem {