Add app-server compaction item notifications tests (#10123)

- add v2 tests covering local + remote auto-compaction item
started/completed notifications
This commit is contained in:
Ahmed Ibrahim
2026-01-28 17:00:38 -08:00
committed by GitHub
parent ce3d764ae1
commit 52609c6f42
4 changed files with 357 additions and 0 deletions

View File

@@ -0,0 +1,72 @@
use codex_core::features::FEATURES;
use codex_core::features::Feature;
use std::collections::BTreeMap;
use std::path::Path;
pub fn write_mock_responses_config_toml(
codex_home: &Path,
server_uri: &str,
feature_flags: &BTreeMap<Feature, bool>,
auto_compact_limit: i64,
requires_openai_auth: Option<bool>,
model_provider_id: &str,
compact_prompt: &str,
) -> std::io::Result<()> {
// Phase 1: build the features block for config.toml.
let mut features = BTreeMap::from([(Feature::RemoteModels, false)]);
for (feature, enabled) in feature_flags {
features.insert(*feature, *enabled);
}
let feature_entries = features
.into_iter()
.map(|(feature, enabled)| {
let key = FEATURES
.iter()
.find(|spec| spec.id == feature)
.map(|spec| spec.key)
.unwrap_or_else(|| panic!("missing feature key for {feature:?}"));
format!("{key} = {enabled}")
})
.collect::<Vec<_>>()
.join("\n");
// Phase 2: build provider-specific config bits.
let requires_line = match requires_openai_auth {
Some(true) => "requires_openai_auth = true\n".to_string(),
Some(false) | None => String::new(),
};
let provider_block = if model_provider_id == "openai" {
String::new()
} else {
format!(
r#"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
{requires_line}
"#
)
};
// Phase 3: write the final config file.
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
compact_prompt = "{compact_prompt}"
model_auto_compact_token_limit = {auto_compact_limit}
model_provider = "{model_provider_id}"
[features]
{feature_entries}
{provider_block}
"#
),
)
}

View File

@@ -1,4 +1,5 @@
mod auth_fixtures;
mod config;
mod mcp_process;
mod mock_model_server;
mod models_cache;
@@ -10,6 +11,7 @@ pub use auth_fixtures::ChatGptIdTokenClaims;
pub use auth_fixtures::encode_id_token;
pub use auth_fixtures::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
pub use config::write_mock_responses_config_toml;
pub use core_test_support::format_with_current_shell;
pub use core_test_support::format_with_current_shell_display;
pub use core_test_support::format_with_current_shell_display_non_login;

View File

@@ -0,0 +1,282 @@
//! End-to-end compaction flow tests.
//!
//! Phases:
//! 1) Arrange: mock responses/compact endpoints + config.
//! 2) Act: start a thread and submit multiple turns to trigger auto-compaction.
//! 3) Assert: verify item/started + item/completed notifications for context compaction.
#![expect(clippy::expect_used)]
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::features::Feature;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const AUTO_COMPACT_LIMIT: i64 = 1_000;
const COMPACT_PROMPT: &str = "Summarize the conversation.";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let sse1 = responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 70_000),
]);
let sse2 = responses::sse(vec![
responses::ev_assistant_message("m2", "SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", 330_000),
]);
let sse3 = responses::sse(vec![
responses::ev_assistant_message("m3", "LOCAL_SUMMARY"),
responses::ev_completed_with_tokens("r3", 200),
]);
let sse4 = responses::sse(vec![
responses::ev_assistant_message("m4", "FINAL_REPLY"),
responses::ev_completed_with_tokens("r4", 120),
]);
responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
AUTO_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
for message in ["first", "second", "third"] {
send_turn_and_wait(&mut mcp, &thread_id, message).await?;
}
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
unreachable!("completed item should be context compaction");
};
assert_eq!(started.thread_id, thread_id);
assert_eq!(completed.thread_id, thread_id);
assert_eq!(started_id, completed_id);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let sse1 = responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 70_000),
]);
let sse2 = responses::sse(vec![
responses::ev_assistant_message("m2", "SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", 330_000),
]);
let sse3 = responses::sse(vec![
responses::ev_assistant_message("m3", "FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", 120),
]);
let responses_log = responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "REMOTE_COMPACT_SUMMARY".to_string(),
}],
end_turn: None,
},
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
];
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({ "output": compacted_history }),
)
.await;
let codex_home = TempDir::new()?;
let mut features = BTreeMap::default();
features.insert(Feature::RemoteCompaction, true);
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&features,
AUTO_COMPACT_LIMIT,
Some(true),
"openai",
COMPACT_PROMPT,
)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("access-chatgpt").plan_type("pro"),
AuthCredentialsStoreMode::File,
)?;
let server_base_url = format!("{}/v1", server.uri());
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
("OPENAI_BASE_URL", Some(server_base_url.as_str())),
("OPENAI_API_KEY", None),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
for message in ["first", "second", "third"] {
send_turn_and_wait(&mut mcp, &thread_id, message).await?;
}
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
unreachable!("completed item should be context compaction");
};
assert_eq!(started.thread_id, thread_id);
assert_eq!(completed.thread_id, thread_id);
assert_eq!(started_id, completed_id);
let compact_requests = compact_mock.requests();
assert_eq!(compact_requests.len(), 1);
assert_eq!(compact_requests[0].path(), "/v1/responses/compact");
let response_requests = responses_log.requests();
assert_eq!(response_requests.len(), 3);
Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let thread_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
Ok(thread.id)
}
async fn send_turn_and_wait(mcp: &mut McpProcess, thread_id: &str, text: &str) -> Result<String> {
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.to_string(),
input: vec![V2UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
wait_for_turn_completed(mcp, &turn.id).await?;
Ok(turn.id)
}
async fn wait_for_turn_completed(mcp: &mut McpProcess, turn_id: &str) -> Result<()> {
loop {
let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification =
serde_json::from_value(notification.params.clone().expect("turn/completed params"))?;
if completed.turn.id == turn_id {
return Ok(());
}
}
}
async fn wait_for_context_compaction_started(
mcp: &mut McpProcess,
) -> Result<ItemStartedNotification> {
loop {
let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("item/started"),
)
.await??;
let started: ItemStartedNotification =
serde_json::from_value(notification.params.clone().expect("item/started params"))?;
if let ThreadItem::ContextCompaction { .. } = started.item {
return Ok(started);
}
}
}
async fn wait_for_context_compaction_completed(
mcp: &mut McpProcess,
) -> Result<ItemCompletedNotification> {
loop {
let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("item/completed"),
)
.await??;
let completed: ItemCompletedNotification =
serde_json::from_value(notification.params.clone().expect("item/completed params"))?;
if let ThreadItem::ContextCompaction { .. } = completed.item {
return Ok(completed);
}
}
}

View File

@@ -2,6 +2,7 @@ mod account;
mod analytics;
mod app_list;
mod collaboration_mode_list;
mod compaction;
mod config_rpc;
mod dynamic_tools;
mod initialize;