mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
share experimental thread item filtering
Move generic thread-item experimental filtering into app-server-protocol so transport can reuse typed helpers instead of carrying guardian-specific logic. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,8 +1,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use crate::JSONRPCNotification;
|
||||
use crate::JSONRPCRequest;
|
||||
use crate::RequestId;
|
||||
use crate::experimental_api::ExperimentalEnumVariantEncoding;
|
||||
use crate::experimental_api::experimental_enum_variants;
|
||||
use crate::export::GeneratedSchema;
|
||||
use crate::export::write_json_schema;
|
||||
use crate::protocol::v1;
|
||||
@@ -11,6 +15,7 @@ use codex_experimental_api_macros::ExperimentalApi;
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use strum_macros::Display;
|
||||
use ts_rs::TS;
|
||||
|
||||
@@ -639,6 +644,89 @@ macro_rules! client_notification_definitions {
|
||||
};
|
||||
}
|
||||
|
||||
pub fn filter_experimental_thread_items_in_server_notification(
|
||||
notification: ServerNotification,
|
||||
) -> Option<ServerNotification> {
|
||||
match notification {
|
||||
ServerNotification::ThreadStarted(mut notification) => {
|
||||
notification.thread.strip_experimental_thread_items();
|
||||
Some(ServerNotification::ThreadStarted(notification))
|
||||
}
|
||||
ServerNotification::ItemStarted(notification) if notification.item.is_experimental() => {
|
||||
None
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) if notification.item.is_experimental() => {
|
||||
None
|
||||
}
|
||||
ServerNotification::TurnStarted(mut notification) => {
|
||||
notification.turn.strip_experimental_thread_items();
|
||||
Some(ServerNotification::TurnStarted(notification))
|
||||
}
|
||||
ServerNotification::TurnCompleted(mut notification) => {
|
||||
notification.turn.strip_experimental_thread_items();
|
||||
Some(ServerNotification::TurnCompleted(notification))
|
||||
}
|
||||
_ => Some(notification),
|
||||
}
|
||||
}
|
||||
|
||||
/// Transport serializes app-server responses before per-connection filtering,
|
||||
/// so stable clients need a last-mile scrub pass over the response JSON.
|
||||
pub fn strip_experimental_thread_items_from_serialized_response(result: &mut Value) {
|
||||
match result {
|
||||
Value::Object(map) => {
|
||||
for (key, value) in map {
|
||||
if key == "items" {
|
||||
if let Value::Array(items) = value {
|
||||
items.retain(|item| !is_experimental_thread_item_value(item));
|
||||
for item in items {
|
||||
strip_experimental_thread_items_from_serialized_response(item);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
strip_experimental_thread_items_from_serialized_response(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
Value::Array(values) => {
|
||||
for value in values {
|
||||
strip_experimental_thread_items_from_serialized_response(value);
|
||||
}
|
||||
}
|
||||
Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_experimental_thread_item_value(value: &Value) -> bool {
|
||||
matches!(
|
||||
value,
|
||||
Value::Object(map)
|
||||
if matches!(
|
||||
map.get("type"),
|
||||
Some(Value::String(kind))
|
||||
if experimental_thread_item_variant_tags().contains(kind.as_str())
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
fn experimental_thread_item_variant_tags() -> &'static HashSet<&'static str> {
|
||||
static TAGS: OnceLock<HashSet<&'static str>> = OnceLock::new();
|
||||
TAGS.get_or_init(|| {
|
||||
experimental_enum_variants()
|
||||
.into_iter()
|
||||
.filter_map(|variant| match variant {
|
||||
crate::experimental_api::ExperimentalEnumVariant {
|
||||
type_name: "ThreadItem",
|
||||
serialized_name,
|
||||
encoding: ExperimentalEnumVariantEncoding::TaggedObject { tag_name: "type" },
|
||||
..
|
||||
} => Some(*serialized_name),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
impl TryFrom<JSONRPCRequest> for ServerRequest {
|
||||
type Error = serde_json::Error;
|
||||
|
||||
|
||||
@@ -3633,6 +3633,24 @@ impl ThreadItem {
|
||||
| ThreadItem::ContextCompaction { id, .. } => id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_experimental(&self) -> bool {
|
||||
crate::experimental_api::ExperimentalApi::experimental_reason(self).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl Thread {
|
||||
pub fn strip_experimental_thread_items(&mut self) {
|
||||
for turn in &mut self.turns {
|
||||
turn.strip_experimental_thread_items();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Turn {
|
||||
pub fn strip_experimental_thread_items(&mut self) {
|
||||
self.items.retain(|item| !item.is_experimental());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
||||
@@ -7,6 +7,8 @@ use crate::outgoing_message::OutgoingMessage;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::filter_experimental_thread_items_in_server_notification;
|
||||
use codex_app_server_protocol::strip_experimental_thread_items_from_serialized_response;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use owo_colors::OwoColorize;
|
||||
@@ -584,7 +586,9 @@ async fn send_message_to_connection(
|
||||
warn!("dropping message for disconnected connection: {connection_id:?}");
|
||||
return false;
|
||||
};
|
||||
let message = filter_outgoing_message_for_connection(connection_state, message);
|
||||
let Some(message) = filter_outgoing_message_for_connection(connection_state, message) else {
|
||||
return false;
|
||||
};
|
||||
if should_skip_notification_for_connection(connection_state, &message) {
|
||||
return false;
|
||||
}
|
||||
@@ -613,7 +617,7 @@ async fn send_message_to_connection(
|
||||
fn filter_outgoing_message_for_connection(
|
||||
connection_state: &OutboundConnectionState,
|
||||
message: OutgoingMessage,
|
||||
) -> OutgoingMessage {
|
||||
) -> Option<OutgoingMessage> {
|
||||
let experimental_api_enabled = connection_state
|
||||
.experimental_api_enabled
|
||||
.load(Ordering::Acquire);
|
||||
@@ -625,12 +629,25 @@ fn filter_outgoing_message_for_connection(
|
||||
if !experimental_api_enabled {
|
||||
params.strip_experimental_fields();
|
||||
}
|
||||
OutgoingMessage::Request(ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id,
|
||||
params,
|
||||
})
|
||||
Some(OutgoingMessage::Request(
|
||||
ServerRequest::CommandExecutionRequestApproval { request_id, params },
|
||||
))
|
||||
}
|
||||
_ => message,
|
||||
OutgoingMessage::AppServerNotification(notification) => {
|
||||
if experimental_api_enabled {
|
||||
Some(OutgoingMessage::AppServerNotification(notification))
|
||||
} else {
|
||||
filter_experimental_thread_items_in_server_notification(notification)
|
||||
.map(OutgoingMessage::AppServerNotification)
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Response(mut response) => {
|
||||
if !experimental_api_enabled {
|
||||
strip_experimental_thread_items_from_serialized_response(&mut response.result);
|
||||
}
|
||||
Some(OutgoingMessage::Response(response))
|
||||
}
|
||||
_ => Some(message),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user