Add thread/compact v2 (#10445)

- add `thread/compact` as a trigger-only v2 RPC that submits
`Op::Compact` and returns `{}` immediately.
- add v2 compaction e2e coverage for success and invalid/unknown thread
ids, and update protocol schemas/docs.
This commit is contained in:
Ahmed Ibrahim
2026-02-03 18:15:55 -08:00
committed by GitHub
parent fcaed4cb88
commit 38a47700b5
14 changed files with 313 additions and 1 deletions

View File

@@ -15,9 +15,12 @@ use app_test_support::write_chatgpt_auth;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -39,6 +42,7 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const AUTO_COMPACT_LIMIT: i64 = 1_000;
const COMPACT_PROMPT: &str = "Summarize the conversation.";
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> {
@@ -196,6 +200,134 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let sse = responses::sse(vec![
responses::ev_assistant_message("m1", "MANUAL_COMPACT_SUMMARY"),
responses::ev_completed_with_tokens("r1", 200),
]);
responses::mount_sse_sequence(&server, vec![sse]).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
AUTO_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
let compact_id = mcp
.send_thread_compact_start_request(ThreadCompactStartParams {
thread_id: thread_id.clone(),
})
.await?;
let compact_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(compact_id)),
)
.await??;
let _compact: ThreadCompactStartResponse =
to_response::<ThreadCompactStartResponse>(compact_resp)?;
let started = wait_for_context_compaction_started(&mut mcp).await?;
let completed = wait_for_context_compaction_completed(&mut mcp).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
unreachable!("completed item should be context compaction");
};
assert_eq!(started.thread_id, thread_id);
assert_eq!(completed.thread_id, thread_id);
assert_eq!(started_id, completed_id);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_compact_start_rejects_invalid_thread_id() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
AUTO_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_thread_compact_start_request(ThreadCompactStartParams {
thread_id: "not-a-thread-id".to_string(),
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(error.error.message.contains("invalid thread id"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn thread_compact_start_rejects_unknown_thread_id() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
AUTO_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_thread_compact_start_request(ThreadCompactStartParams {
thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(error.error.message.contains("thread not found"));
Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let thread_id = mcp
.send_thread_start_request(ThreadStartParams {