[codex-core] Clean up server-side compaction handling [ci changed_files]

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Cooper Gamble
2026-03-11 02:45:10 +00:00
parent 4956a47b5b
commit 8be76fe7b8
3 changed files with 41 additions and 139 deletions

View File

@@ -41,7 +41,6 @@ use crate::realtime_conversation::handle_start as handle_realtime_conversation_s
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
use crate::rollout::session_index;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::ServerSideCompaction;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
@@ -81,6 +80,7 @@ use codex_protocol::config_types::Settings;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::PlanItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
@@ -3543,7 +3543,7 @@ impl Session {
pub(crate) async fn record_context_updates_and_set_reference_context_item(
&self,
turn_context: &TurnContext,
) -> Vec<ResponseItem> {
) {
let reference_context_item = {
let state = self.state.lock().await;
state.reference_context_item()
@@ -3570,7 +3570,6 @@ impl Session {
// context items. This keeps later runtime diffing aligned with the current turn state.
let mut state = self.state.lock().await;
state.set_reference_context_item(Some(turn_context_item));
context_items
}
pub(crate) async fn update_token_usage_info(
@@ -5537,12 +5536,8 @@ pub(crate) async fn run_turn(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
&input,
response_item.clone(),
)
.await;
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
// model/realtime injections.
@@ -5691,22 +5686,9 @@ pub(crate) async fn run_turn(
.await
{
Ok(sampling_request_output) => {
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
let result = if sampling_request_output.observed_server_side_compaction {
"applied"
} else {
"skipped"
};
sess.services.session_telemetry.counter(
"codex.inline_compaction",
1,
&[("result", result)],
);
}
let SamplingRequestResult {
needs_follow_up,
last_agent_message: sampling_request_last_agent_message,
observed_server_side_compaction: _,
} = sampling_request_output;
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
@@ -5725,19 +5707,18 @@ pub(crate) async fn run_turn(
);
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
if inline_server_side_compaction_threshold(&sess, &turn_context).is_some() {
} else if run_auto_compact(
if token_limit_reached
&& needs_follow_up
&& inline_server_side_compaction_threshold(&sess, &turn_context).is_none()
&& run_auto_compact(
&sess,
&turn_context,
InitialContextInjection::BeforeLastUserMessage,
)
.await
.is_err()
{
return None;
}
continue;
{
return None;
}
if !needs_follow_up {
@@ -5898,9 +5879,8 @@ fn inline_server_side_compaction_threshold(
if !should_use_remote_compact_task(&turn_context.provider) {
return None;
}
// OpenAI inline auto-compaction uses Responses `context_management`, which has no
// compaction-prompt field. Auto-compaction therefore ignores `compact_prompt`, while manual
// `/compact` still uses the point-in-time compact endpoint.
// Inline Responses compaction has no prompt override; manual `/compact` still uses the
// point-in-time compact endpoint.
turn_context.model_info.auto_compact_token_limit()
}
@@ -6392,7 +6372,6 @@ pub(crate) async fn built_tools(
struct SamplingRequestResult {
needs_follow_up: bool,
last_agent_message: Option<String>,
observed_server_side_compaction: bool,
}
/// Ephemeral per-response state for streaming a single proposed plan.
@@ -6982,7 +6961,6 @@ async fn try_run_sampling_request(
FuturesOrdered::new();
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut observed_server_side_compaction = false;
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
@@ -7026,7 +7004,6 @@ async fn try_run_sampling_request(
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
let saw_server_side_compaction = matches!(item, ResponseItem::Compaction { .. });
let previously_active_item = active_item.take();
if let Some(previous) = previously_active_item.as_ref()
&& matches!(previous, TurnItem::AgentMessage(_))
@@ -7055,6 +7032,28 @@ async fn try_run_sampling_request(
continue;
}
if matches!(item, ResponseItem::Compaction { .. }) {
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
Some(TurnItem::ContextCompaction(item)) => item,
_ => ContextCompactionItem::new(),
});
debug!(
turn_id = %turn_context.sub_id,
"emitting streamed server-side raw compaction item for immediate local checkpoint apply"
);
sess.send_event(
&turn_context,
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
)
.await;
sess.apply_server_side_compaction(turn_context.as_ref(), item)
.await;
sess.emit_turn_item_started(&turn_context, &turn_item).await;
sess.emit_turn_item_completed(&turn_context, turn_item)
.await;
continue;
}
let mut ctx = HandleOutputCtx {
sess: sess.clone(),
turn_context: turn_context.clone(),
@@ -7065,16 +7064,6 @@ async fn try_run_sampling_request(
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
observed_server_side_compaction |= saw_server_side_compaction;
if let Some(ServerSideCompaction { item, turn_item }) =
output_result.server_side_compaction
{
sess.apply_server_side_compaction(turn_context.as_ref(), item)
.await;
sess.emit_turn_item_started(&turn_context, &turn_item).await;
sess.emit_turn_item_completed(&turn_context, turn_item)
.await;
}
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
@@ -7084,6 +7073,9 @@ async fn try_run_sampling_request(
needs_follow_up |= output_result.needs_follow_up;
}
ResponseEvent::OutputItemAdded(item) => {
if matches!(item, ResponseItem::Compaction { .. }) {
continue;
}
if let Some(turn_item) = handle_non_tool_response_item(
sess.as_ref(),
turn_context.as_ref(),
@@ -7093,10 +7085,6 @@ async fn try_run_sampling_request(
.await
{
let mut turn_item = turn_item;
if matches!(turn_item, TurnItem::ContextCompaction(_)) {
active_item = Some(turn_item);
continue;
}
let mut seeded_parsed: Option<ParsedAssistantTextDelta> = None;
let mut seeded_item_id: Option<String> = None;
if matches!(turn_item, TurnItem::AgentMessage(_))
@@ -7186,7 +7174,6 @@ async fn try_run_sampling_request(
break Ok(SamplingRequestResult {
needs_follow_up,
last_agent_message,
observed_server_side_compaction,
});
}
ResponseEvent::OutputTextDelta(delta) => {

View File

@@ -3848,62 +3848,6 @@ async fn task_finish_emits_turn_item_lifecycle_for_leftover_pending_user_input()
));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn inline_compaction_output_item_returns_immediate_compaction_apply_signal() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
while rx.try_recv().is_ok() {}
let router = Arc::new(ToolRouter::from_config(
&tc.tools_config,
None,
None,
tc.dynamic_tools.as_slice(),
));
let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let mut ctx = crate::stream_events_utils::HandleOutputCtx {
sess: Arc::clone(&sess),
turn_context: Arc::clone(&tc),
tool_runtime: crate::tools::parallel::ToolCallRuntime::new(
router,
Arc::clone(&sess),
Arc::clone(&tc),
tracker,
),
cancellation_token: tokio_util::sync::CancellationToken::new(),
};
let item = ResponseItem::Compaction {
encrypted_content: "INLINE_SERVER_SUMMARY".to_string(),
};
let output = crate::stream_events_utils::handle_output_item_done(
&mut ctx,
item.clone(),
Some(TurnItem::ContextCompaction(
codex_protocol::items::ContextCompactionItem::new(),
)),
)
.await
.expect("handle output item");
assert!(output.server_side_compaction.is_some());
assert!(output.tool_future.is_none());
assert!(!output.needs_follow_up);
assert!(output.last_agent_message.is_none());
let first = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected raw response item event")
.expect("channel open");
assert!(matches!(
first.msg,
EventMsg::RawResponseItem(raw) if raw.item == item
));
assert!(
rx.try_recv().is_err(),
"expected the caller to decide when to commit the compaction lifecycle"
);
}
#[tokio::test]
async fn steer_input_requires_active_turn() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;

View File

@@ -5,7 +5,6 @@ use std::sync::Arc;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_protocol::config_types::ModeKind;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_utils_stream_parser::strip_citations;
use tokio_util::sync::CancellationToken;
@@ -25,8 +24,6 @@ use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RawResponseItemEvent;
use codex_utils_stream_parser::strip_proposed_plan_blocks;
use futures::Future;
use tracing::debug;
@@ -147,15 +144,9 @@ pub(crate) type InFlightFuture<'f> =
pub(crate) struct OutputItemResult {
pub last_agent_message: Option<String>,
pub needs_follow_up: bool,
pub server_side_compaction: Option<ServerSideCompaction>,
pub tool_future: Option<InFlightFuture<'static>>,
}
pub(crate) struct ServerSideCompaction {
pub item: ResponseItem,
pub turn_item: TurnItem,
}
pub(crate) struct HandleOutputCtx {
pub sess: Arc<Session>,
pub turn_context: Arc<TurnContext>,
@@ -171,27 +162,10 @@ pub(crate) async fn handle_output_item_done(
) -> Result<OutputItemResult> {
let mut output = OutputItemResult::default();
let plan_mode = ctx.turn_context.collaboration_mode.mode == ModeKind::Plan;
if matches!(item, ResponseItem::Compaction { .. }) {
let turn_item = TurnItem::ContextCompaction(match previously_active_item {
Some(TurnItem::ContextCompaction(item)) => item,
_ => ContextCompactionItem::new(),
});
// Preserve the raw wire event immediately; the caller rewrites local history inline and
// then continues appending later same-turn output after the compaction item.
debug!(
turn_id = %ctx.turn_context.sub_id,
"emitting streamed server-side raw compaction item for immediate local checkpoint apply"
);
ctx.sess
.send_event(
&ctx.turn_context,
EventMsg::RawResponseItem(RawResponseItemEvent { item: item.clone() }),
)
.await;
output.server_side_compaction = Some(ServerSideCompaction { item, turn_item });
return Ok(output);
}
debug_assert!(
!matches!(item, ResponseItem::Compaction { .. }),
"compaction items should be handled before handle_output_item_done"
);
match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await {
// The model emitted a tool call; log it, persist the item immediately, and queue the tool execution.
@@ -365,9 +339,6 @@ pub(crate) async fn handle_non_tool_response_item(
}
Some(turn_item)
}
ResponseItem::Compaction { .. } => {
Some(TurnItem::ContextCompaction(ContextCompactionItem::new()))
}
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
debug!("unexpected tool output from stream");
None