reorganize logic + integration test

This commit is contained in:
Sayan Sisodiya
2026-05-21 21:59:45 -07:00
parent d34820282c
commit 68aef25aac
9 changed files with 385 additions and 87 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -4141,7 +4141,6 @@ dependencies = [
"codex-model-provider-info",
"codex-protocol",
"codex-tools",
"codex-utils-output-truncation",
"http 1.4.0",
"pretty_assertions",
"schemars 0.8.22",

View File

@@ -68,4 +68,5 @@ mod turn_interrupt;
mod turn_start;
mod turn_start_zsh_fork;
mod turn_steer;
mod web_search;
mod windows_sandbox_setup;

View File

@@ -0,0 +1,216 @@
use std::path::Path;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_config::types::AuthCredentialsStoreMode;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
// macOS and Windows Bazel CI can spend tens of seconds starting app-server
// subprocesses or processing test RPCs under load.
#[cfg(any(target_os = "macos", windows))]
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(60);
#[cfg(not(any(target_os = "macos", windows)))]
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test]
async fn standalone_web_search_round_trips_encrypted_output() -> Result<()> {
let call_id = "web-run-1";
let server = responses::start_mock_server().await;
mount_search_response(&server).await;
let response_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call_with_namespace(
call_id,
"web",
"run",
&json!({
"search_query": [{"q": "standalone web search"}],
})
.to_string(),
),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("access-chatgpt"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new_with_env(codex_home.path(), &[("OPENAI_API_KEY", None)]).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Search the web".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _turn: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let requests = response_mock.requests();
assert_eq!(requests.len(), 2);
let first_response = requests[0].body_json();
assert!(
requests[0].tool_by_name("web", "run").is_some(),
"web.run should be sent to the model"
);
assert!(
!has_hosted_web_search(&first_response),
"standalone web search should replace hosted web search"
);
let search_body = search_request_body(&server).await?;
assert_eq!(
search_body["commands"],
json!({
"search_query": [{"q": "standalone web search"}],
})
);
assert_eq!(
search_body["input"]
.as_array()
.context("search input should be an array")?
.last(),
Some(&json!({
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": "Search the web"}],
}))
);
assert_eq!(
requests[1].function_call_output(call_id),
json!({
"type": "function_call_output",
"call_id": call_id,
"output": [{
"type": "encrypted_content",
"encrypted_content": "ciphertext",
}],
})
);
Ok(())
}
async fn mount_search_response(server: &MockServer) {
Mock::given(method("POST"))
.and(path("/api/codex/alpha/search"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"encrypted_output": "ciphertext",
})))
.expect(1)
.mount(server)
.await;
}
fn has_hosted_web_search(body: &Value) -> bool {
body.get("tools")
.and_then(Value::as_array)
.is_some_and(|tools| {
tools
.iter()
.any(|tool| tool.get("type").and_then(Value::as_str) == Some("web_search"))
})
}
async fn search_request_body(server: &MockServer) -> Result<Value> {
server
.received_requests()
.await
.context("failed to fetch received requests")?
.into_iter()
.find(|request| request.url.path() == "/api/codex/alpha/search")
.context("expected standalone search request")?
.body_json()
.context("search request body should be JSON")
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
std::fs::write(
codex_home.join("config.toml"),
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "openai-custom"
chatgpt_base_url = "{server_uri}"
[features]
standalone_web_search = true
[model_providers.openai-custom]
name = "OpenAI"
base_url = "{server_uri}/api/codex"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
supports_websockets = false
requires_openai_auth = true
"#
),
)
}

View File

@@ -23,7 +23,6 @@ codex-model-provider = { workspace = true }
codex-model-provider-info = { workspace = true }
codex-protocol = { workspace = true }
codex-tools = { workspace = true }
codex-utils-output-truncation = { workspace = true }
http = { workspace = true }
schemars = { workspace = true }
serde_json = { workspace = true }

View File

@@ -1,9 +1,8 @@
use codex_api::SearchInput;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
use codex_tools::retain_tail_from_last_n_user_messages;
use codex_tools::truncate_assistant_output_text_to_token_budget;
const ASSISTANT_CONTEXT_TOKEN_LIMIT: usize = 1_000;
@@ -17,8 +16,8 @@ pub(crate) fn recent_input(items: &[ResponseItem]) -> Option<SearchInput> {
push_visible_message(&mut messages, item);
}
let mut messages = keep_current_and_previous_turn(messages);
cap_assistant_text(&mut messages);
retain_tail_from_last_n_user_messages(&mut messages, /*user_message_count*/ 2);
truncate_assistant_output_text_to_token_budget(&mut messages, ASSISTANT_CONTEXT_TOKEN_LIMIT);
(!messages.is_empty()).then_some(SearchInput::Items(messages))
}
@@ -49,67 +48,13 @@ fn push_visible_message(messages: &mut Vec<ResponseItem>, item: &ResponseItem) {
}
}
fn is_user_message(item: &ResponseItem) -> bool {
matches!(item, ResponseItem::Message { role, .. } if role == "user")
}
fn keep_current_and_previous_turn(mut messages: Vec<ResponseItem>) -> Vec<ResponseItem> {
let Some(current_user_idx) = messages.iter().rposition(is_user_message) else {
return Vec::new();
};
messages.truncate(current_user_idx + 1);
let previous_user_idx = messages[..current_user_idx]
.iter()
.rposition(is_user_message)
.unwrap_or(current_user_idx);
messages.drain(..previous_user_idx);
messages
}
fn cap_assistant_text(messages: &mut Vec<ResponseItem>) {
let mut remaining_budget = ASSISTANT_CONTEXT_TOKEN_LIMIT;
messages.retain_mut(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return true;
};
if role != "assistant" {
return true;
}
content.retain_mut(|content_item| {
let ContentItem::OutputText { text } = content_item else {
return true;
};
if remaining_budget == 0 {
return false;
}
let token_count = approx_token_count(text);
if token_count <= remaining_budget {
remaining_budget = remaining_budget.saturating_sub(token_count);
return true;
}
*text = truncate_text(text, TruncationPolicy::Tokens(remaining_budget));
remaining_budget = 0;
true
});
!content.is_empty()
});
}
#[cfg(test)]
mod tests {
use codex_api::SearchInput;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use pretty_assertions::assert_eq;
use super::ASSISTANT_CONTEXT_TOKEN_LIMIT;
use super::recent_input;
fn message(role: &str, text: &str) -> ResponseItem {
@@ -190,30 +135,4 @@ mod tests {
]))
);
}
#[test]
fn caps_assistant_text_in_recent_tail() {
let long_assistant = "a".repeat(4_100);
let items = vec![
message("user", "previous user"),
message("assistant", &long_assistant),
message("assistant", "after the assistant budget"),
message("user", "current user"),
];
assert_eq!(
recent_input(&items),
Some(SearchInput::Items(vec![
message("user", "previous user"),
message(
"assistant",
&truncate_text(
&long_assistant,
TruncationPolicy::Tokens(ASSISTANT_CONTEXT_TOKEN_LIMIT)
),
),
message("user", "current user"),
]))
);
}
}

View File

@@ -15,6 +15,7 @@ use codex_login::default_client::build_reqwest_client;
use codex_model_provider::SharedModelProvider;
use codex_tools::ResponsesApiNamespace;
use codex_tools::ResponsesApiNamespaceTool;
use codex_tools::ToolExposure;
use codex_tools::default_namespace_description;
use http::HeaderMap;
@@ -58,6 +59,10 @@ impl ToolExecutor<ToolCall> for WebSearchTool {
})
}
fn exposure(&self) -> ToolExposure {
ToolExposure::DirectModelOnly
}
async fn handle(&self, call: ToolCall) -> Result<Box<dyn ToolOutput>, FunctionCallError> {
let commands = parse_commands(&call)?;
let provider = self

View File

@@ -902,6 +902,13 @@ pub enum ResponseItem {
Other,
}
impl ResponseItem {
/// Returns whether this item is an ordinary user-role message.
pub fn is_user_message(&self) -> bool {
matches!(self, Self::Message { role, .. } if role == "user")
}
}
pub const BASE_INSTRUCTIONS_DEFAULT: &str = include_str!("prompts/base_instructions/default.md");
/// Base instructions for the model in a thread. Corresponds to the `instructions` field in the ResponsesAPI.

View File

@@ -8,6 +8,7 @@ mod image_detail;
mod json_schema;
mod mcp_tool;
mod request_plugin_install;
mod response_history;
mod responses_api;
mod tool_call;
mod tool_config;
@@ -45,6 +46,8 @@ pub use request_plugin_install::RequestPluginInstallResult;
pub use request_plugin_install::all_requested_connectors_picked_up;
pub use request_plugin_install::build_request_plugin_install_elicitation_request;
pub use request_plugin_install::verified_connector_install_completed;
pub use response_history::retain_tail_from_last_n_user_messages;
pub use response_history::truncate_assistant_output_text_to_token_budget;
pub use responses_api::FreeformTool;
pub use responses_api::FreeformToolFormat;
pub use responses_api::LoadableToolSpec;

View File

@@ -0,0 +1,149 @@
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
/// Retains items from the earliest of the last `user_message_count` user
/// messages through the latest user message.
pub fn retain_tail_from_last_n_user_messages(
items: &mut Vec<ResponseItem>,
user_message_count: usize,
) {
if user_message_count == 0 {
items.clear();
return;
}
let Some(latest_user_idx) = items.iter().rposition(ResponseItem::is_user_message) else {
items.clear();
return;
};
items.truncate(latest_user_idx + 1);
let earliest_retained_user_idx = items
.iter()
.enumerate()
.rev()
.filter(|(_, item)| item.is_user_message())
.take(user_message_count)
.last()
.map(|(idx, _)| idx)
.unwrap_or(latest_user_idx);
items.drain(..earliest_retained_user_idx);
}
/// Truncates assistant output text to a shared token budget across items.
pub fn truncate_assistant_output_text_to_token_budget(
items: &mut Vec<ResponseItem>,
max_tokens: usize,
) {
let mut remaining_budget = max_tokens;
items.retain_mut(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return true;
};
if role != "assistant" {
return true;
}
content.retain_mut(|content_item| {
let ContentItem::OutputText { text } = content_item else {
return true;
};
if remaining_budget == 0 {
return false;
}
let token_count = approx_token_count(text);
if token_count <= remaining_budget {
remaining_budget = remaining_budget.saturating_sub(token_count);
return true;
}
*text = truncate_text(text, TruncationPolicy::Tokens(remaining_budget));
remaining_budget = 0;
true
});
!content.is_empty()
});
}
#[cfg(test)]
mod tests {
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use pretty_assertions::assert_eq;
use super::retain_tail_from_last_n_user_messages;
use super::truncate_assistant_output_text_to_token_budget;
fn message(role: &str, text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: role.to_string(),
content: vec![if role == "assistant" {
ContentItem::OutputText {
text: text.to_string(),
}
} else {
ContentItem::InputText {
text: text.to_string(),
}
}],
phase: None,
}
}
#[test]
fn retains_tail_through_latest_user_message() {
let mut items = vec![
message("system", "system"),
message("user", "old user"),
message("assistant", "old assistant"),
message("user", "previous user"),
message("assistant", "previous assistant"),
message("user", "current user"),
message("assistant", "later assistant"),
];
retain_tail_from_last_n_user_messages(&mut items, /*user_message_count*/ 2);
assert_eq!(
items,
vec![
message("user", "previous user"),
message("assistant", "previous assistant"),
message("user", "current user"),
]
);
}
#[test]
fn truncates_assistant_output_text_across_items() {
let long_assistant = "a".repeat(16);
let mut items = vec![
message("user", "previous user"),
message("assistant", &long_assistant),
message("assistant", "after budget"),
message("user", "current user"),
];
truncate_assistant_output_text_to_token_budget(&mut items, /*max_tokens*/ 2);
assert_eq!(
items,
vec![
message("user", "previous user"),
message(
"assistant",
&truncate_text(&long_assistant, TruncationPolicy::Tokens(2)),
),
message("user", "current user"),
]
);
}
}