compaction

This commit is contained in:
Ahmed Ibrahim
2026-01-27 14:02:38 -08:00
parent 3ae966edd8
commit d8c11f15a8
14 changed files with 132 additions and 18 deletions

View File

@@ -598,6 +598,7 @@ server_notification_definitions! {
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),
ContextCompactionStarted => "thread/compaction/started" (v2::ContextCompactionStartedNotification),
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),

View File

@@ -56,6 +56,8 @@ impl ThreadHistoryBuilder {
self.handle_agent_reasoning_raw_content(payload)
}
EventMsg::TokenCount(_) => {}
EventMsg::ContextCompactionStarted(_) => {}
EventMsg::ContextCompactionEnded(_) => {}
EventMsg::EnteredReviewMode(_) => {}
EventMsg::ExitedReviewMode(_) => {}
EventMsg::ThreadRolledBack(payload) => self.handle_thread_rollback(payload),

View File

@@ -1969,6 +1969,9 @@ pub enum ThreadItem {
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ExitedReviewMode { id: String, review: String },
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ContextCompaction { id: String },
}
impl From<CoreTurnItem> for ThreadItem {
@@ -1997,6 +2000,9 @@ impl From<CoreTurnItem> for ThreadItem {
id: search.id,
query: search.query,
},
CoreTurnItem::ContextCompaction(compaction) => {
ThreadItem::ContextCompaction { id: compaction.id }
}
}
}
}
@@ -2367,6 +2373,14 @@ pub struct ContextCompactedNotification {
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ContextCompactionStartedNotification {
pub thread_id: String,
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -2617,6 +2631,7 @@ mod tests {
use super::*;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::ReasoningItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
@@ -2741,6 +2756,17 @@ mod tests {
query: "docs".to_string(),
}
);
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem {
id: "compact-1".to_string(),
});
assert_eq!(
ThreadItem::from(compaction_item),
ThreadItem::ContextCompaction {
id: "compact-1".to_string(),
}
);
}
#[test]

View File

@@ -24,6 +24,7 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ContextCompactedNotification;
use codex_app_server_protocol::ContextCompactionStartedNotification;
use codex_app_server_protocol::DeprecationNoticeNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::ErrorNotification;
@@ -601,7 +602,18 @@ pub(crate) async fn apply_bespoke_event_handling(
.send_server_notification(ServerNotification::AgentMessageDelta(notification))
.await;
}
EventMsg::ContextCompacted(..) => {
EventMsg::ContextCompactionStarted(..) => {
let notification = ContextCompactionStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
};
outgoing
.send_server_notification(ServerNotification::ContextCompactionStarted(
notification,
))
.await;
}
EventMsg::ContextCompactionEnded(..) => {
let notification = ContextCompactedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),

View File

@@ -10,7 +10,8 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::ContextCompactionEndedEvent;
use crate::protocol::ContextCompactionStartedEvent;
use crate::protocol::EventMsg;
use crate::protocol::TurnContextItem;
use crate::protocol::TurnStartedEvent;
@@ -20,6 +21,7 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
@@ -28,6 +30,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
use tracing::error;
use uuid::Uuid;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
@@ -71,6 +74,9 @@ async fn run_compact_task_inner(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let compaction_item = compaction_turn_item();
emit_compaction_started(&sess, &turn_context, &compaction_item).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
@@ -131,6 +137,7 @@ async fn run_compact_task_inner(
break;
}
Err(CodexErr::Interrupted) => {
emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await;
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
@@ -147,6 +154,7 @@ async fn run_compact_task_inner(
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await;
return;
}
Err(e) => {
@@ -164,6 +172,7 @@ async fn run_compact_task_inner(
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
emit_compaction_ended(&sess, &turn_context, compaction_item.clone()).await;
return;
}
}
@@ -193,8 +202,7 @@ async fn run_compact_task_inner(
});
sess.persist_rollout_items(&[rollout_item]).await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(&turn_context, event).await;
emit_compaction_ended(&sess, &turn_context, compaction_item).await;
let warning = EventMsg::Warning(WarningEvent {
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
@@ -202,6 +210,38 @@ async fn run_compact_task_inner(
sess.send_event(&turn_context, warning).await;
}
fn compaction_turn_item() -> TurnItem {
TurnItem::ContextCompaction(ContextCompactionItem {
id: Uuid::new_v4().to_string(),
})
}
pub(crate) async fn emit_compaction_started(
sess: &Session,
turn_context: &TurnContext,
item: &TurnItem,
) {
sess.send_event(
turn_context,
EventMsg::ContextCompactionStarted(ContextCompactionStartedEvent {}),
)
.await;
sess.emit_turn_item_started(turn_context, item).await;
}
pub(crate) async fn emit_compaction_ended(
sess: &Session,
turn_context: &TurnContext,
item: TurnItem,
) {
sess.emit_turn_item_completed(turn_context, item).await;
sess.send_event(
turn_context,
EventMsg::ContextCompactionEnded(ContextCompactionEndedEvent {}),
)
.await;
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {

View File

@@ -3,13 +3,17 @@ use std::sync::Arc;
use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::compact::emit_compaction_ended;
use crate::compact::emit_compaction_started;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TurnStartedEvent;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use uuid::Uuid;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
@@ -28,12 +32,19 @@ pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Ar
}
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem {
id: Uuid::new_v4().to_string(),
});
emit_compaction_started(sess, turn_context, &compaction_item).await;
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
);
sess.send_event(turn_context, event).await;
}
emit_compaction_ended(sess, turn_context, compaction_item).await;
}
async fn run_remote_compact_task_inner_impl(
@@ -77,8 +88,5 @@ async fn run_remote_compact_task_inner_impl(
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(turn_context, event).await;
Ok(())
}

View File

@@ -42,7 +42,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ContextCompactionStarted(_)
| EventMsg::ContextCompactionEnded(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::ThreadRolledBack(_)

View File

@@ -1277,7 +1277,7 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
.unwrap();
wait_for_event(&resumed.codex, |event| {
matches!(event, EventMsg::ContextCompacted(_))
matches!(event, EventMsg::ContextCompactionEnded(_))
})
.await;
wait_for_event(&resumed.codex, |event| {

View File

@@ -202,7 +202,7 @@ async fn remote_compact_runs_automatically() -> Result<()> {
})
.await?;
let message = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ContextCompacted(_) => Some(true),
EventMsg::ContextCompactionEnded(_) => Some(true),
_ => None,
})
.await;

View File

@@ -590,8 +590,11 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_msg!(self, "task aborted: review ended");
}
},
EventMsg::ContextCompacted(_) => {
ts_msg!(self, "context compacted");
EventMsg::ContextCompactionStarted(_) => {
ts_msg!(self, "context compaction started");
}
EventMsg::ContextCompactionEnded(_) => {
ts_msg!(self, "context compaction ended");
}
EventMsg::CollabAgentSpawnBegin(CollabAgentSpawnBeginEvent {
call_id,

View File

@@ -361,7 +361,8 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExitedReviewMode(_)
| EventMsg::RequestUserInput(_)
| EventMsg::DynamicToolCallRequest(_)
| EventMsg::ContextCompacted(_)
| EventMsg::ContextCompactionStarted(_)
| EventMsg::ContextCompactionEnded(_)
| EventMsg::ThreadRolledBack(_)
| EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)

View File

@@ -21,6 +21,7 @@ pub enum TurnItem {
AgentMessage(AgentMessageItem),
Reasoning(ReasoningItem),
WebSearch(WebSearchItem),
ContextCompaction(ContextCompactionItem),
}
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)]
@@ -57,6 +58,11 @@ pub struct WebSearchItem {
pub action: WebSearchAction,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)]
pub struct ContextCompactionItem {
pub id: String,
}
impl UserMessageItem {
pub fn new(content: &[UserInput]) -> Self {
Self {
@@ -195,6 +201,7 @@ impl TurnItem {
TurnItem::AgentMessage(item) => item.id.clone(),
TurnItem::Reasoning(item) => item.id.clone(),
TurnItem::WebSearch(item) => item.id.clone(),
TurnItem::ContextCompaction(item) => item.id.clone(),
}
}
@@ -204,6 +211,7 @@ impl TurnItem {
TurnItem::AgentMessage(item) => item.as_legacy_events(),
TurnItem::WebSearch(item) => vec![item.as_legacy_event()],
TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning),
TurnItem::ContextCompaction(_) => Vec::new(),
}
}
}

View File

@@ -688,8 +688,12 @@ pub enum EventMsg {
/// indicates the turn continued but the user should still be notified.
Warning(WarningEvent),
/// Conversation history was compacted (either automatically or manually).
ContextCompacted(ContextCompactedEvent),
/// Conversation history compaction has started.
ContextCompactionStarted(ContextCompactionStartedEvent),
/// Conversation history compaction has ended (either automatically or manually).
#[serde(alias = "context_compacted")]
ContextCompactionEnded(ContextCompactionEndedEvent),
/// Conversation history was rolled back by dropping the last N user turns.
ThreadRolledBack(ThreadRolledBackEvent),
@@ -1083,7 +1087,10 @@ pub struct WarningEvent {
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ContextCompactedEvent;
pub struct ContextCompactionStartedEvent;
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ContextCompactionEndedEvent;
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct TurnCompleteEvent {

View File

@@ -3077,7 +3077,12 @@ impl ChatWidget {
self.on_entered_review_mode(review_request, from_replay)
}
EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review),
EventMsg::ContextCompacted(_) => self.on_agent_message("Context compacted".to_owned()),
EventMsg::ContextCompactionStarted(_) => {
self.on_agent_message("Context compaction started".to_owned())
}
EventMsg::ContextCompactionEnded(_) => {
self.on_agent_message("Context compaction ended".to_owned())
}
EventMsg::CollabAgentSpawnBegin(_) => {}
EventMsg::CollabAgentSpawnEnd(ev) => self.on_collab_event(collab::spawn_end(ev)),
EventMsg::CollabAgentInteractionBegin(_) => {}