mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
- add v2 tests covering local + remote auto-compaction item started/completed notifications
283 lines
9.9 KiB
Rust
283 lines
9.9 KiB
Rust
//! End-to-end compaction flow tests.
//!
//! Phases:
//! 1) Arrange: mock the responses/compact endpoints and write the config.
//! 2) Act: start a thread and submit multiple turns to trigger auto-compaction.
//! 3) Assert: verify item/started + item/completed notifications for context compaction.
|
|
|
|
#![expect(clippy::expect_used)]
|
|
|
|
use anyhow::Result;
|
|
use app_test_support::ChatGptAuthFixture;
|
|
use app_test_support::McpProcess;
|
|
use app_test_support::to_response;
|
|
use app_test_support::write_chatgpt_auth;
|
|
use app_test_support::write_mock_responses_config_toml;
|
|
use codex_app_server_protocol::ItemCompletedNotification;
|
|
use codex_app_server_protocol::ItemStartedNotification;
|
|
use codex_app_server_protocol::JSONRPCNotification;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::ThreadItem;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::TurnCompletedNotification;
|
|
use codex_app_server_protocol::TurnStartParams;
|
|
use codex_app_server_protocol::TurnStartResponse;
|
|
use codex_app_server_protocol::UserInput as V2UserInput;
|
|
use codex_core::auth::AuthCredentialsStoreMode;
|
|
use codex_core::features::Feature;
|
|
use codex_protocol::models::ContentItem;
|
|
use codex_protocol::models::ResponseItem;
|
|
use core_test_support::responses;
|
|
use core_test_support::skip_if_no_network;
|
|
use pretty_assertions::assert_eq;
|
|
use std::collections::BTreeMap;
|
|
use tempfile::TempDir;
|
|
use tokio::time::timeout;
|
|
|
|
/// Upper bound (10 seconds) applied to each individual read from the app-server process
/// in this file; every `timeout(...)` call below uses it.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
|
/// Limit handed to `write_mock_responses_config_toml` by both tests.
// NOTE(review): presumably the auto-compaction token threshold; the mocked turns report
// 70_000 and 330_000 tokens, far above this value — confirm against the config writer.
const AUTO_COMPACT_LIMIT: i64 = 1_000;
|
|
/// Compaction prompt text handed to `write_mock_responses_config_toml` by both tests.
const COMPACT_PROMPT: &str = "Summarize the conversation.";
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let sse1 = responses::sse(vec![
|
|
responses::ev_assistant_message("m1", "FIRST_REPLY"),
|
|
responses::ev_completed_with_tokens("r1", 70_000),
|
|
]);
|
|
let sse2 = responses::sse(vec![
|
|
responses::ev_assistant_message("m2", "SECOND_REPLY"),
|
|
responses::ev_completed_with_tokens("r2", 330_000),
|
|
]);
|
|
let sse3 = responses::sse(vec![
|
|
responses::ev_assistant_message("m3", "LOCAL_SUMMARY"),
|
|
responses::ev_completed_with_tokens("r3", 200),
|
|
]);
|
|
let sse4 = responses::sse(vec![
|
|
responses::ev_assistant_message("m4", "FINAL_REPLY"),
|
|
responses::ev_completed_with_tokens("r4", 120),
|
|
]);
|
|
responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
write_mock_responses_config_toml(
|
|
codex_home.path(),
|
|
&server.uri(),
|
|
&BTreeMap::default(),
|
|
AUTO_COMPACT_LIMIT,
|
|
None,
|
|
"mock_provider",
|
|
COMPACT_PROMPT,
|
|
)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let thread_id = start_thread(&mut mcp).await?;
|
|
for message in ["first", "second", "third"] {
|
|
send_turn_and_wait(&mut mcp, &thread_id, message).await?;
|
|
}
|
|
|
|
let started = wait_for_context_compaction_started(&mut mcp).await?;
|
|
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
|
|
|
|
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
|
|
unreachable!("started item should be context compaction");
|
|
};
|
|
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
|
|
unreachable!("completed item should be context compaction");
|
|
};
|
|
|
|
assert_eq!(started.thread_id, thread_id);
|
|
assert_eq!(completed.thread_id, thread_id);
|
|
assert_eq!(started_id, completed_id);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = responses::start_mock_server().await;
|
|
let sse1 = responses::sse(vec![
|
|
responses::ev_assistant_message("m1", "FIRST_REPLY"),
|
|
responses::ev_completed_with_tokens("r1", 70_000),
|
|
]);
|
|
let sse2 = responses::sse(vec![
|
|
responses::ev_assistant_message("m2", "SECOND_REPLY"),
|
|
responses::ev_completed_with_tokens("r2", 330_000),
|
|
]);
|
|
let sse3 = responses::sse(vec![
|
|
responses::ev_assistant_message("m3", "FINAL_REPLY"),
|
|
responses::ev_completed_with_tokens("r3", 120),
|
|
]);
|
|
let responses_log = responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await;
|
|
|
|
let compacted_history = vec![
|
|
ResponseItem::Message {
|
|
id: None,
|
|
role: "assistant".to_string(),
|
|
content: vec![ContentItem::OutputText {
|
|
text: "REMOTE_COMPACT_SUMMARY".to_string(),
|
|
}],
|
|
end_turn: None,
|
|
},
|
|
ResponseItem::Compaction {
|
|
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
|
|
},
|
|
];
|
|
let compact_mock = responses::mount_compact_json_once(
|
|
&server,
|
|
serde_json::json!({ "output": compacted_history }),
|
|
)
|
|
.await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
let mut features = BTreeMap::default();
|
|
features.insert(Feature::RemoteCompaction, true);
|
|
write_mock_responses_config_toml(
|
|
codex_home.path(),
|
|
&server.uri(),
|
|
&features,
|
|
AUTO_COMPACT_LIMIT,
|
|
Some(true),
|
|
"openai",
|
|
COMPACT_PROMPT,
|
|
)?;
|
|
write_chatgpt_auth(
|
|
codex_home.path(),
|
|
ChatGptAuthFixture::new("access-chatgpt").plan_type("pro"),
|
|
AuthCredentialsStoreMode::File,
|
|
)?;
|
|
|
|
let server_base_url = format!("{}/v1", server.uri());
|
|
let mut mcp = McpProcess::new_with_env(
|
|
codex_home.path(),
|
|
&[
|
|
("OPENAI_BASE_URL", Some(server_base_url.as_str())),
|
|
("OPENAI_API_KEY", None),
|
|
],
|
|
)
|
|
.await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let thread_id = start_thread(&mut mcp).await?;
|
|
for message in ["first", "second", "third"] {
|
|
send_turn_and_wait(&mut mcp, &thread_id, message).await?;
|
|
}
|
|
|
|
let started = wait_for_context_compaction_started(&mut mcp).await?;
|
|
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
|
|
|
|
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
|
|
unreachable!("started item should be context compaction");
|
|
};
|
|
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
|
|
unreachable!("completed item should be context compaction");
|
|
};
|
|
|
|
assert_eq!(started.thread_id, thread_id);
|
|
assert_eq!(completed.thread_id, thread_id);
|
|
assert_eq!(started_id, completed_id);
|
|
|
|
let compact_requests = compact_mock.requests();
|
|
assert_eq!(compact_requests.len(), 1);
|
|
assert_eq!(compact_requests[0].path(), "/v1/responses/compact");
|
|
|
|
let response_requests = responses_log.requests();
|
|
assert_eq!(response_requests.len(), 3);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
|
|
let thread_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let thread_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(thread_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
|
Ok(thread.id)
|
|
}
|
|
|
|
async fn send_turn_and_wait(mcp: &mut McpProcess, thread_id: &str, text: &str) -> Result<String> {
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread_id.to_string(),
|
|
input: vec![V2UserInput::Text {
|
|
text: text.to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
wait_for_turn_completed(mcp, &turn.id).await?;
|
|
Ok(turn.id)
|
|
}
|
|
|
|
async fn wait_for_turn_completed(mcp: &mut McpProcess, turn_id: &str) -> Result<()> {
|
|
loop {
|
|
let notification: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
let completed: TurnCompletedNotification =
|
|
serde_json::from_value(notification.params.clone().expect("turn/completed params"))?;
|
|
if completed.turn.id == turn_id {
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn wait_for_context_compaction_started(
|
|
mcp: &mut McpProcess,
|
|
) -> Result<ItemStartedNotification> {
|
|
loop {
|
|
let notification: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("item/started"),
|
|
)
|
|
.await??;
|
|
let started: ItemStartedNotification =
|
|
serde_json::from_value(notification.params.clone().expect("item/started params"))?;
|
|
if let ThreadItem::ContextCompaction { .. } = started.item {
|
|
return Ok(started);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn wait_for_context_compaction_completed(
|
|
mcp: &mut McpProcess,
|
|
) -> Result<ItemCompletedNotification> {
|
|
loop {
|
|
let notification: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("item/completed"),
|
|
)
|
|
.await??;
|
|
let completed: ItemCompletedNotification =
|
|
serde_json::from_value(notification.params.clone().expect("item/completed params"))?;
|
|
if let ThreadItem::ContextCompaction { .. } = completed.item {
|
|
return Ok(completed);
|
|
}
|
|
}
|
|
}
|