mirror of
https://github.com/openai/codex.git
synced 2026-05-24 13:04:29 +00:00
[codex] Remove external websocket session resets (#23384)
## Why Compaction now installs replacement history inside the session, but the turn and compaction callers were still reaching into `ModelClientSession` to reset websocket transport state after that install. That made a transport-level reset part of the compaction API even though websocket incremental request selection already checks whether the next request is a strict extension of the previous one and falls back to a full `response.create` when it is not. ## What changed - Removed the compaction-side calls to `reset_websocket_session` from `compact.rs` and `session/turn.rs`. - Simplified pre-sampling and mid-turn compaction helpers so they return `CodexResult<()>` instead of carrying a reset flag. - Made `ModelClientSession::reset_websocket_session` private to `client.rs`, leaving only the websocket timeout recovery path inside the client as a caller. ## Validation - `cargo test -p codex-core --test all responses_websocket_creates_on_non_prefix` - `cargo test -p codex-core --test all steered_user_input_waits_for_model_continuation_after_mid_turn_compact` - `cargo test -p codex-core --test all pre_sampling_compact_runs_on_switch_to_smaller_context_model`
This commit is contained in:
@@ -931,7 +931,7 @@ impl Drop for ModelClientSession {
|
||||
}
|
||||
|
||||
impl ModelClientSession {
|
||||
pub(crate) fn reset_websocket_session(&mut self) {
|
||||
fn reset_websocket_session(&mut self) {
|
||||
self.websocket_session.connection = None;
|
||||
self.websocket_session.last_request = None;
|
||||
self.websocket_session.last_response_rx = None;
|
||||
|
||||
@@ -282,7 +282,6 @@ async fn run_compact_task_inner_impl(
|
||||
};
|
||||
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
|
||||
.await;
|
||||
client_session.reset_websocket_session();
|
||||
sess.recompute_token_usage(&turn_context).await;
|
||||
|
||||
sess.emit_turn_item_completed(&turn_context, compaction_item)
|
||||
|
||||
@@ -160,25 +160,18 @@ pub(crate) async fn run_turn(
|
||||
// new user message are recorded. Estimate pending incoming items (context
|
||||
// diffs/full reinjection + user input) and trigger compaction preemptively
|
||||
// when they would push the thread over the compaction threshold.
|
||||
let pre_sampling_compact =
|
||||
match run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
|
||||
Ok(pre_sampling_compact) => pre_sampling_compact,
|
||||
Err(err) => {
|
||||
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
|
||||
&& let Err(err) = sess
|
||||
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
|
||||
turn_context: turn_context.as_ref(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to usage-limit active goal after usage-limit error: {err}");
|
||||
}
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if pre_sampling_compact.reset_client_session {
|
||||
client_session.reset_websocket_session();
|
||||
if let Err(err) = run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
|
||||
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
|
||||
&& let Err(err) = sess
|
||||
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
|
||||
turn_context: turn_context.as_ref(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to usage-limit active goal after usage-limit error: {err}");
|
||||
}
|
||||
error!("Failed to run pre-sampling compact");
|
||||
return None;
|
||||
}
|
||||
|
||||
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
|
||||
@@ -519,7 +512,7 @@ pub(crate) async fn run_turn(
|
||||
|
||||
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
|
||||
if token_limit_reached && needs_follow_up {
|
||||
let reset_client_session = match run_auto_compact(
|
||||
match run_auto_compact(
|
||||
&sess,
|
||||
&turn_context,
|
||||
&mut client_session,
|
||||
@@ -529,7 +522,7 @@ pub(crate) async fn run_turn(
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(reset_client_session) => reset_client_session,
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
|
||||
&& let Err(err) = sess
|
||||
@@ -545,9 +538,6 @@ pub(crate) async fn run_turn(
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if reset_client_session {
|
||||
client_session.reset_websocket_session();
|
||||
}
|
||||
can_drain_pending_input = !model_needs_follow_up;
|
||||
continue;
|
||||
}
|
||||
@@ -767,24 +757,19 @@ async fn track_turn_resolved_config_analytics(
|
||||
});
|
||||
}
|
||||
|
||||
struct PreSamplingCompactResult {
|
||||
reset_client_session: bool,
|
||||
}
|
||||
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
client_session: &mut ModelClientSession,
|
||||
) -> CodexResult<PreSamplingCompactResult> {
|
||||
) -> CodexResult<()> {
|
||||
let total_usage_tokens_before_compaction = sess.get_total_token_usage().await;
|
||||
let mut pre_sampling_compacted = maybe_run_previous_model_inline_compact(
|
||||
maybe_run_previous_model_inline_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
client_session,
|
||||
total_usage_tokens_before_compaction,
|
||||
)
|
||||
.await?;
|
||||
let mut reset_client_session = pre_sampling_compacted;
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let auto_compact_limit = turn_context
|
||||
.model_info
|
||||
@@ -792,7 +777,7 @@ async fn run_pre_sampling_compact(
|
||||
.unwrap_or(i64::MAX);
|
||||
// Compact if the total usage tokens are greater than the auto compact limit
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
reset_client_session |= run_auto_compact(
|
||||
run_auto_compact(
|
||||
sess,
|
||||
turn_context,
|
||||
client_session,
|
||||
@@ -801,27 +786,22 @@ async fn run_pre_sampling_compact(
|
||||
CompactionPhase::PreTurn,
|
||||
)
|
||||
.await?;
|
||||
pre_sampling_compacted = true;
|
||||
}
|
||||
Ok(PreSamplingCompactResult {
|
||||
reset_client_session: pre_sampling_compacted && reset_client_session,
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Runs pre-sampling compaction against the previous model when switching to a smaller
|
||||
/// context-window model.
|
||||
///
|
||||
/// Returns `Ok(true)` when compaction ran successfully, `Ok(false)` when compaction was skipped
|
||||
/// because the model/context-window preconditions were not met, and `Err(_)` only when compaction
|
||||
/// was attempted and failed.
|
||||
/// Returns `Err(_)` only when compaction was attempted and failed.
|
||||
async fn maybe_run_previous_model_inline_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
client_session: &mut ModelClientSession,
|
||||
total_usage_tokens: i64,
|
||||
) -> CodexResult<bool> {
|
||||
) -> CodexResult<()> {
|
||||
let Some(previous_turn_settings) = sess.previous_turn_settings().await else {
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
};
|
||||
let previous_model_turn_context = Arc::new(
|
||||
turn_context
|
||||
@@ -830,10 +810,10 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
);
|
||||
|
||||
let Some(old_context_window) = previous_model_turn_context.model_context_window() else {
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
};
|
||||
let Some(new_context_window) = turn_context.model_context_window() else {
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
};
|
||||
let new_auto_compact_limit = turn_context
|
||||
.model_info
|
||||
@@ -843,7 +823,7 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
&& previous_model_turn_context.model_info.slug != turn_context.model_info.slug
|
||||
&& old_context_window > new_context_window;
|
||||
if should_run {
|
||||
let _ = run_auto_compact(
|
||||
run_auto_compact(
|
||||
sess,
|
||||
&previous_model_turn_context,
|
||||
client_session,
|
||||
@@ -852,9 +832,8 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
CompactionPhase::PreTurn,
|
||||
)
|
||||
.await?;
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(false)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_auto_compact(
|
||||
@@ -864,7 +843,7 @@ async fn run_auto_compact(
|
||||
initial_context_injection: InitialContextInjection,
|
||||
reason: CompactionReason,
|
||||
phase: CompactionPhase,
|
||||
) -> CodexResult<bool> {
|
||||
) -> CodexResult<()> {
|
||||
if should_use_remote_compact_task(turn_context.provider.info()) {
|
||||
if turn_context.features.enabled(Feature::RemoteCompactionV2) {
|
||||
run_inline_remote_auto_compact_task_v2(
|
||||
@@ -876,7 +855,7 @@ async fn run_auto_compact(
|
||||
phase,
|
||||
)
|
||||
.await?;
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
run_inline_remote_auto_compact_task(
|
||||
Arc::clone(sess),
|
||||
@@ -896,7 +875,7 @@ async fn run_auto_compact(
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn collect_explicit_app_ids_from_skill_items(
|
||||
|
||||
Reference in New Issue
Block a user