From d8c11f15a8aabb76c956ec139ec00e01b70f6395 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 27 Jan 2026 14:02:38 -0800 Subject: [PATCH] compaction --- .../src/protocol/common.rs | 1 + .../src/protocol/thread_history.rs | 2 + .../app-server-protocol/src/protocol/v2.rs | 26 +++++++++++ .../app-server/src/bespoke_event_handling.rs | 14 +++++- codex-rs/core/src/compact.rs | 46 +++++++++++++++++-- codex-rs/core/src/compact_remote.rs | 16 +++++-- codex-rs/core/src/rollout/policy.rs | 3 +- codex-rs/core/tests/suite/compact.rs | 2 +- codex-rs/core/tests/suite/compact_remote.rs | 2 +- .../src/event_processor_with_human_output.rs | 7 ++- codex-rs/mcp-server/src/codex_tool_runner.rs | 3 +- codex-rs/protocol/src/items.rs | 8 ++++ codex-rs/protocol/src/protocol.rs | 13 ++++-- codex-rs/tui/src/chatwidget.rs | 7 ++- 14 files changed, 132 insertions(+), 18 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index e32b060902..b196fde42f 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -598,6 +598,7 @@ server_notification_definitions! { ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification), ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification), ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification), + ContextCompactionStarted => "thread/compaction/started" (v2::ContextCompactionStartedNotification), ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification), DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification), ConfigWarning => "configWarning" (v2::ConfigWarningNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index e6c679b418..31a728494e 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -56,6 +56,8 @@ impl ThreadHistoryBuilder { self.handle_agent_reasoning_raw_content(payload) } EventMsg::TokenCount(_) => {} + EventMsg::ContextCompactionStarted(_) => {} + EventMsg::ContextCompactionEnded(_) => {} EventMsg::EnteredReviewMode(_) => {} EventMsg::ExitedReviewMode(_) => {} EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index e4b3b90589..0905b5a2ea 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1969,6 +1969,9 @@ pub enum ThreadItem { #[serde(rename_all = "camelCase")] #[ts(rename_all = "camelCase")] ExitedReviewMode { id: String, review: String }, + #[serde(rename_all = "camelCase")] + #[ts(rename_all = "camelCase")] + ContextCompaction { id: String }, } impl From for ThreadItem { @@ -1997,6 +2000,9 @@ impl From for ThreadItem { id: search.id, query: search.query, }, + CoreTurnItem::ContextCompaction(compaction) => { + ThreadItem::ContextCompaction { id: compaction.id } + } } } } @@ -2367,6 +2373,14 @@ pub struct ContextCompactedNotification { pub turn_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ContextCompactionStartedNotification { + pub thread_id: String, + pub turn_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -2617,6 +2631,7 @@ mod tests { use super::*; use codex_protocol::items::AgentMessageContent; use codex_protocol::items::AgentMessageItem; + use codex_protocol::items::ContextCompactionItem; use codex_protocol::items::ReasoningItem; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; @@ -2741,6 +2756,17 @@ mod tests { query: "docs".to_string(), } ); + + let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem { + id: "compact-1".to_string(), + }); + + assert_eq!( + ThreadItem::from(compaction_item), + ThreadItem::ContextCompaction { + id: "compact-1".to_string(), + } + ); } #[test] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 20e7db762b..f28279879a 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -24,6 +24,7 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::ContextCompactedNotification; +use codex_app_server_protocol::ContextCompactionStartedNotification; use codex_app_server_protocol::DeprecationNoticeNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::ErrorNotification; @@ -601,7 +602,18 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::AgentMessageDelta(notification)) .await; } - EventMsg::ContextCompacted(..) => { + EventMsg::ContextCompactionStarted(..) => { + let notification = ContextCompactionStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + }; + outgoing + .send_server_notification(ServerNotification::ContextCompactionStarted( + notification, + )) + .await; + } + EventMsg::ContextCompactionEnded(..) => { let notification = ContextCompactedNotification { thread_id: conversation_id.to_string(), turn_id: event_turn_id.clone(), diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index c365d2cfda..afc11ddee5 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -10,7 +10,8 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::features::Feature; use crate::protocol::CompactedItem; -use crate::protocol::ContextCompactedEvent; +use crate::protocol::ContextCompactionEndedEvent; +use crate::protocol::ContextCompactionStartedEvent; use crate::protocol::EventMsg; use crate::protocol::TurnContextItem; use crate::protocol::TurnStartedEvent; @@ -20,6 +21,7 @@ use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; use crate::util::backoff; +use codex_protocol::items::ContextCompactionItem; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -28,6 +30,7 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; use futures::prelude::*; use tracing::error; +use uuid::Uuid; pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md"); pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md"); @@ -71,6 +74,9 @@ async fn run_compact_task_inner( turn_context: Arc, input: Vec, ) { + let compaction_item = compaction_turn_item(); + emit_compaction_started(&sess, &turn_context, &compaction_item).await; + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut history = sess.clone_history().await; @@ -131,6 +137,7 @@ async fn run_compact_task_inner( break; } Err(CodexErr::Interrupted) => { + emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await; return; } Err(e @ CodexErr::ContextWindowExceeded) => { @@ -147,6 +154,7 @@ async fn run_compact_task_inner( sess.set_total_tokens_full(turn_context.as_ref()).await; let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; + emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await; return; } Err(e) => { @@ -164,6 +172,7 @@ async fn run_compact_task_inner( } else { let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; + emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await; return; } } @@ -193,8 +202,7 @@ async fn run_compact_task_inner( }); sess.persist_rollout_items(&[rollout_item]).await; - let event = EventMsg::ContextCompacted(ContextCompactedEvent {}); - sess.send_event(&turn_context, event).await; + emit_compaction_ended(&sess, &turn_context, compaction_item).await; let warning = EventMsg::Warning(WarningEvent { message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(), @@ -202,6 +210,38 @@ async fn run_compact_task_inner( sess.send_event(&turn_context, warning).await; } +fn compaction_turn_item() -> TurnItem { + TurnItem::ContextCompaction(ContextCompactionItem { + id: Uuid::new_v4().to_string(), + }) +} + +pub(crate) async fn emit_compaction_started( + sess: &Session, + turn_context: &TurnContext, + item: &TurnItem, +) { + sess.send_event( + turn_context, + EventMsg::ContextCompactionStarted(ContextCompactionStartedEvent {}), + ) + .await; + sess.emit_turn_item_started(turn_context, item).await; +} + +pub(crate) async fn emit_compaction_ended( + sess: &Session, + turn_context: &TurnContext, + item: TurnItem, +) { + sess.emit_turn_item_completed(turn_context, item).await; + sess.send_event( + turn_context, + EventMsg::ContextCompactionEnded(ContextCompactionEndedEvent {}), + ) + .await; +} + pub fn content_items_to_text(content: &[ContentItem]) -> Option { let mut pieces = Vec::new(); for item in content { diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index aaa7fc68a7..2326d17f65 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -3,13 +3,17 @@ use std::sync::Arc; use crate::Prompt; use crate::codex::Session; use crate::codex::TurnContext; +use crate::compact::emit_compaction_ended; +use crate::compact::emit_compaction_started; use crate::error::Result as CodexResult; use crate::protocol::CompactedItem; -use crate::protocol::ContextCompactedEvent; use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use crate::protocol::TurnStartedEvent; +use codex_protocol::items::ContextCompactionItem; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; +use uuid::Uuid; pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, @@ -28,12 +32,19 @@ pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Ar } async fn run_remote_compact_task_inner(sess: &Arc, turn_context: &Arc) { + let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem { + id: Uuid::new_v4().to_string(), + }); + emit_compaction_started(sess, turn_context, &compaction_item).await; + if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { let event = EventMsg::Error( err.to_error_event(Some("Error running remote compact task".to_string())), ); sess.send_event(turn_context, event).await; } + + emit_compaction_ended(sess, turn_context, compaction_item).await; } async fn run_remote_compact_task_inner_impl( @@ -77,8 +88,5 @@ async fn run_remote_compact_task_inner_impl( sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; - let event = EventMsg::ContextCompacted(ContextCompactedEvent {}); - sess.send_event(turn_context, event).await; - Ok(()) } diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 2d263c0539..3fa8e5e2cf 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -42,7 +42,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::AgentReasoning(_) | EventMsg::AgentReasoningRawContent(_) | EventMsg::TokenCount(_) - | EventMsg::ContextCompacted(_) + | EventMsg::ContextCompactionStarted(_) + | EventMsg::ContextCompactionEnded(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) | EventMsg::ThreadRolledBack(_) diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index f4706962a0..db9fc9b99f 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -1277,7 +1277,7 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() { .unwrap(); wait_for_event(&resumed.codex, |event| { - matches!(event, EventMsg::ContextCompacted(_)) + matches!(event, EventMsg::ContextCompactionEnded(_)) }) .await; wait_for_event(&resumed.codex, |event| { diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 2fc5ba53c2..d0c4d41c8e 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -202,7 +202,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { }) .await?; let message = wait_for_event_match(&codex, |ev| match ev { - EventMsg::ContextCompacted(_) => Some(true), + EventMsg::ContextCompactionEnded(_) => Some(true), _ => None, }) .await; diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index fb7aa54ab4..a9b4016a3d 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -590,8 +590,11 @@ impl EventProcessor for EventProcessorWithHumanOutput { ts_msg!(self, "task aborted: review ended"); } }, - EventMsg::ContextCompacted(_) => { - ts_msg!(self, "context compacted"); + EventMsg::ContextCompactionStarted(_) => { + ts_msg!(self, "context compaction started"); + } + EventMsg::ContextCompactionEnded(_) => { + ts_msg!(self, "context compaction ended"); } EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent { call_id, diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index dcaf8a89e6..91629ed6e3 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -361,7 +361,8 @@ async fn run_codex_tool_session_inner( | EventMsg::ExitedReviewMode(_) | EventMsg::RequestUserInput(_) | EventMsg::DynamicToolCallRequest(_) - | EventMsg::ContextCompacted(_) + | EventMsg::ContextCompactionStarted(_) + | EventMsg::ContextCompactionEnded(_) | EventMsg::ThreadRolledBack(_) | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentSpawnEnd(_) diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index 12fa6a0f51..6175132f90 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -21,6 +21,7 @@ pub enum TurnItem { AgentMessage(AgentMessageItem), Reasoning(ReasoningItem), WebSearch(WebSearchItem), + ContextCompaction(ContextCompactionItem), } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] @@ -57,6 +58,11 @@ pub struct WebSearchItem { pub action: WebSearchAction, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ContextCompactionItem { + pub id: String, +} + impl UserMessageItem { pub fn new(content: &[UserInput]) -> Self { Self { @@ -195,6 +201,7 @@ impl TurnItem { TurnItem::AgentMessage(item) => item.id.clone(), TurnItem::Reasoning(item) => item.id.clone(), TurnItem::WebSearch(item) => item.id.clone(), + TurnItem::ContextCompaction(item) => item.id.clone(), } } @@ -204,6 +211,7 @@ impl TurnItem { TurnItem::AgentMessage(item) => item.as_legacy_events(), TurnItem::WebSearch(item) => vec![item.as_legacy_event()], TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), + TurnItem::ContextCompaction(_) => Vec::new(), } } } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index c2d282e995..243870c5f0 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -688,8 +688,12 @@ pub enum EventMsg { /// indicates the turn continued but the user should still be notified. Warning(WarningEvent), - /// Conversation history was compacted (either automatically or manually). - ContextCompacted(ContextCompactedEvent), + /// Conversation history compaction has started. + ContextCompactionStarted(ContextCompactionStartedEvent), + + /// Conversation history compaction has ended (either automatically or manually). + #[serde(alias = "context_compacted")] + ContextCompactionEnded(ContextCompactionEndedEvent), /// Conversation history was rolled back by dropping the last N user turns. ThreadRolledBack(ThreadRolledBackEvent), @@ -1083,7 +1087,10 @@ pub struct WarningEvent { } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] -pub struct ContextCompactedEvent; +pub struct ContextCompactionStartedEvent; + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +pub struct ContextCompactionEndedEvent; #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct TurnCompleteEvent { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index bd1014b2f4..ce80e78109 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -3077,7 +3077,12 @@ impl ChatWidget { self.on_entered_review_mode(review_request, from_replay) } EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review), - EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()), + EventMsg::ContextCompactionStarted(_) => { + self.on_agent_message("Context compaction started".to_owned()) + } + EventMsg::ContextCompactionEnded(_) => { + self.on_agent_message("Context compaction ended".to_owned()) + } EventMsg::CollabAgentSpawnBegin(_) => {} EventMsg::CollabAgentSpawnEnd(ev) => self.on_collab_event(collab::spawn_end(ev)), EventMsg::CollabAgentInteractionBegin(_) => {}