From 17d552fb4d53d602eab87076acff1239a0420708 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Mon, 18 May 2026 18:13:38 -0700 Subject: [PATCH] [codex] Remove external websocket session resets (#23384) ## Why Compaction now installs replacement history inside the session, but the turn and compaction callers were still reaching into `ModelClientSession` to reset websocket transport state after that install. That made a transport-level reset part of the compaction API even though websocket incremental request selection already checks whether the next request is a strict extension of the previous one and falls back to a full `response.create` when it is not. ## What changed - Removed the compaction-side calls to `reset_websocket_session` from `compact.rs` and `session/turn.rs`. - Simplified pre-sampling and mid-turn compaction helpers so they return `CodexResult<()>` instead of carrying a reset flag. - Made `ModelClientSession::reset_websocket_session` private to `client.rs`, leaving only the websocket timeout recovery path inside the client as a caller. ## Validation - `cargo test -p codex-core --test all responses_websocket_creates_on_non_prefix` - `cargo test -p codex-core --test all steered_user_input_waits_for_model_continuation_after_mid_turn_compact` - `cargo test -p codex-core --test all pre_sampling_compact_runs_on_switch_to_smaller_context_model` --- codex-rs/core/src/client.rs | 2 +- codex-rs/core/src/compact.rs | 1 - codex-rs/core/src/session/turn.rs | 77 +++++++++++-------------------- 3 files changed, 29 insertions(+), 51 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index f604a63458..cd1fcb6696 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -931,7 +931,7 @@ impl Drop for ModelClientSession { } impl ModelClientSession { - pub(crate) fn reset_websocket_session(&mut self) { + fn reset_websocket_session(&mut self) { self.websocket_session.connection = None; self.websocket_session.last_request = None; self.websocket_session.last_response_rx = None; diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 9d1c82eb65..b5802268d3 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -282,7 +282,6 @@ async fn run_compact_task_inner_impl( }; sess.replace_compacted_history(new_history, reference_context_item, compacted_item) .await; - client_session.reset_websocket_session(); sess.recompute_token_usage(&turn_context).await; sess.emit_turn_item_completed(&turn_context, compaction_item) diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 0c5e00927a..95b0f5e312 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -160,25 +160,18 @@ pub(crate) async fn run_turn( // new user message are recorded. Estimate pending incoming items (context // diffs/full reinjection + user input) and trigger compaction preemptively // when they would push the thread over the compaction threshold. - let pre_sampling_compact = - match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await { - Ok(pre_sampling_compact) => pre_sampling_compact, - Err(err) => { - if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded - && let Err(err) = sess - .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { - turn_context: turn_context.as_ref(), - }) - .await - { - warn!("failed to usage-limit active goal after usage-limit error: {err}"); - } - error!("Failed to run pre-sampling compact"); - return None; - } - }; - if pre_sampling_compact.reset_client_session { - client_session.reset_websocket_session(); + if let Err(err) = run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await { + if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded + && let Err(err) = sess + .goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached { + turn_context: turn_context.as_ref(), + }) + .await + { + warn!("failed to usage-limit active goal after usage-limit error: {err}"); + } + error!("Failed to run pre-sampling compact"); + return None; } let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); @@ -519,7 +512,7 @@ pub(crate) async fn run_turn( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached && needs_follow_up { - let reset_client_session = match run_auto_compact( + match run_auto_compact( &sess, &turn_context, &mut client_session, @@ -529,7 +522,7 @@ pub(crate) async fn run_turn( ) .await { - Ok(reset_client_session) => reset_client_session, + Ok(()) => {} Err(err) => { if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded && let Err(err) = sess @@ -545,9 +538,6 @@ pub(crate) async fn run_turn( return None; } }; - if reset_client_session { - client_session.reset_websocket_session(); - } can_drain_pending_input = !model_needs_follow_up; continue; } @@ -767,24 +757,19 @@ async fn track_turn_resolved_config_analytics( }); } -struct PreSamplingCompactResult { - reset_client_session: bool, -} - async fn run_pre_sampling_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, -) -> CodexResult { +) -> CodexResult<()> { let total_usage_tokens_before_compaction = sess.get_total_token_usage().await; - let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact( + maybe_run_previous_model_inline_compact( sess, turn_context, client_session, total_usage_tokens_before_compaction, ) .await?; - let mut reset_client_session = pre_sampling_compacted; let total_usage_tokens = sess.get_total_token_usage().await; let auto_compact_limit = turn_context .model_info @@ -792,7 +777,7 @@ async fn run_pre_sampling_compact( .unwrap_or(i64::MAX); // Compact if the total usage tokens are greater than the auto compact limit if total_usage_tokens >= auto_compact_limit { - reset_client_session |= run_auto_compact( + run_auto_compact( sess, turn_context, client_session, @@ -801,27 +786,22 @@ async fn run_pre_sampling_compact( CompactionPhase::PreTurn, ) .await?; - pre_sampling_compacted = true; } - Ok(PreSamplingCompactResult { - reset_client_session: pre_sampling_compacted && reset_client_session, - }) + Ok(()) } /// Runs pre-sampling compaction against the previous model when switching to a smaller /// context-window model. /// -/// Returns `Ok(true)` when compaction ran successfully, `Ok(false)` when compaction was skipped -/// because the model/context-window preconditions were not met, and `Err(_)` only when compaction -/// was attempted and failed. +/// Returns `Err(_)` only when compaction was attempted and failed. async fn maybe_run_previous_model_inline_compact( sess: &Arc, turn_context: &Arc, client_session: &mut ModelClientSession, total_usage_tokens: i64, -) -> CodexResult { +) -> CodexResult<()> { let Some(previous_turn_settings) = sess.previous_turn_settings().await else { - return Ok(false); + return Ok(()); }; let previous_model_turn_context = Arc::new( turn_context @@ -830,10 +810,10 @@ async fn maybe_run_previous_model_inline_compact( ); let Some(old_context_window) = previous_model_turn_context.model_context_window() else { - return Ok(false); + return Ok(()); }; let Some(new_context_window) = turn_context.model_context_window() else { - return Ok(false); + return Ok(()); }; let new_auto_compact_limit = turn_context .model_info @@ -843,7 +823,7 @@ async fn maybe_run_previous_model_inline_compact( && previous_model_turn_context.model_info.slug != turn_context.model_info.slug && old_context_window > new_context_window; if should_run { - let _ = run_auto_compact( + run_auto_compact( sess, &previous_model_turn_context, client_session, @@ -852,9 +832,8 @@ async fn maybe_run_previous_model_inline_compact( CompactionPhase::PreTurn, ) .await?; - return Ok(true); } - Ok(false) + Ok(()) } async fn run_auto_compact( @@ -864,7 +843,7 @@ async fn run_auto_compact( initial_context_injection: InitialContextInjection, reason: CompactionReason, phase: CompactionPhase, -) -> CodexResult { +) -> CodexResult<()> { if should_use_remote_compact_task(turn_context.provider.info()) { if turn_context.features.enabled(Feature::RemoteCompactionV2) { run_inline_remote_auto_compact_task_v2( @@ -876,7 +855,7 @@ async fn run_auto_compact( phase, ) .await?; - return Ok(false); + return Ok(()); } run_inline_remote_auto_compact_task( Arc::clone(sess), @@ -896,7 +875,7 @@ async fn run_auto_compact( ) .await?; } - Ok(true) + Ok(()) } pub(super) fn collect_explicit_app_ids_from_skill_items(