Add v2 thread/compact

This commit is contained in:
Ahmed Ibrahim
2026-02-02 13:47:54 -08:00
parent ae4eeff440
commit 9c01c75697
6 changed files with 191 additions and 0 deletions

View File

@@ -152,6 +152,10 @@ client_request_definitions! {
params: v2::ThreadReadParams,
response: v2::ThreadReadResponse,
},
ThreadCompact => "thread/compact" {
params: v2::ThreadCompactParams,
response: v2::TurnStartResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,

View File

@@ -1449,6 +1449,13 @@ pub struct ThreadReadResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -81,6 +81,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/compact` — trigger a context compaction run for a thread; returns a `TurnStartResponse`. No `turn/started` notification is emitted; clients should rely on the core event stream (`task_started`) and item notifications.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/name/set` — set or update a threads user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.

View File

@@ -98,6 +98,7 @@ use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -456,6 +457,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadRead { request_id, params } => {
self.thread_read(request_id, params).await;
}
ClientRequest::ThreadCompact { request_id, params } => {
self.thread_compact(request_id, params).await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(request_id, params).await;
}
@@ -4153,6 +4157,40 @@ impl CodexMessageProcessor {
}
}
async fn thread_compact(&self, request_id: RequestId, params: ThreadCompactParams) {
let ThreadCompactParams { thread_id } = params;
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let turn_id = thread.submit(Op::Compact).await;
match turn_id {
Ok(turn_id) => {
let turn = Turn {
id: turn_id,
items: Vec::new(),
error: None,
status: TurnStatus::InProgress,
};
let response = TurnStartResponse { turn };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to compact thread: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
fn build_review_turn(turn_id: String, display_text: &str) -> Turn {
let items = if display_text.is_empty() {
Vec::new()

View File

@@ -48,6 +48,7 @@ use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
@@ -427,6 +428,15 @@ impl McpProcess {
self.send_request("thread/read", params).await
}
/// Send a `thread/compact` JSON-RPC request.
pub async fn send_thread_compact_request(
&mut self,
params: ThreadCompactParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/compact", params).await
}
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,

View File

@@ -15,15 +15,19 @@ use app_test_support::write_chatgpt_auth;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::features::Feature;
@@ -38,6 +42,7 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const AUTO_COMPACT_LIMIT: i64 = 1_000;
const MANUAL_COMPACT_LIMIT: i64 = 1_000_000;
const COMPACT_PROMPT: &str = "Summarize the conversation.";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -195,6 +200,74 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compaction_returns_turn_start_response_and_emits_items() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let sse1 = responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 200),
]);
let sse2 = responses::sse(vec![
responses::ev_assistant_message("m2", "LOCAL_SUMMARY"),
responses::ev_completed_with_tokens("r2", 200),
]);
responses::mount_sse_sequence(&server, vec![sse1, sse2]).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
MANUAL_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
send_turn_and_wait(&mut mcp, &thread_id, "first").await?;
// Clear turn/started, item/*, etc from the prior turn so we can assert about
// the compaction flow without unrelated buffered notifications.
mcp.clear_message_buffer();
let compact_request_id = mcp
.send_thread_compact_request(ThreadCompactParams {
thread_id: thread_id.clone(),
})
.await?;
let compact_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(compact_request_id)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(compact_resp)?;
assert_eq!(turn.status, TurnStatus::InProgress);
let started =
wait_for_context_compaction_started_without_turn_started(&mut mcp, &turn.id).await?;
let completed =
wait_for_context_compaction_completed_without_turn_started(&mut mcp, &turn.id).await?;
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
unreachable!("completed item should be context compaction");
};
assert_eq!(started.thread_id, thread_id);
assert_eq!(completed.thread_id, thread_id);
assert_eq!(started_id, completed_id);
Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let thread_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -280,3 +353,61 @@ async fn wait_for_context_compaction_completed(
}
}
}
async fn wait_for_context_compaction_started_without_turn_started(
mcp: &mut McpProcess,
forbidden_turn_id: &str,
) -> Result<ItemStartedNotification> {
loop {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
match message {
JSONRPCMessage::Notification(notification) if notification.method == "item/started" => {
let started: ItemStartedNotification = serde_json::from_value(
notification.params.clone().expect("item/started params"),
)?;
if let ThreadItem::ContextCompaction { .. } = started.item {
return Ok(started);
}
}
JSONRPCMessage::Notification(notification) if notification.method == "turn/started" => {
let started: TurnStartedNotification = serde_json::from_value(
notification.params.clone().expect("turn/started params"),
)?;
if started.turn.id == forbidden_turn_id {
anyhow::bail!("unexpected v2 turn/started notification for manual compaction");
}
}
_ => {}
}
}
}
async fn wait_for_context_compaction_completed_without_turn_started(
mcp: &mut McpProcess,
forbidden_turn_id: &str,
) -> Result<ItemCompletedNotification> {
loop {
let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??;
match message {
JSONRPCMessage::Notification(notification)
if notification.method == "item/completed" =>
{
let completed: ItemCompletedNotification = serde_json::from_value(
notification.params.clone().expect("item/completed params"),
)?;
if let ThreadItem::ContextCompaction { .. } = completed.item {
return Ok(completed);
}
}
JSONRPCMessage::Notification(notification) if notification.method == "turn/started" => {
let started: TurnStartedNotification = serde_json::from_value(
notification.params.clone().expect("turn/started params"),
)?;
if started.turn.id == forbidden_turn_id {
anyhow::bail!("unexpected v2 turn/started notification for manual compaction");
}
}
_ => {}
}
}
}