[codex] Defer steering until after sampling the model post-compaction (#17163)

## Summary
- keep pending steered input buffered until the active user prompt has
received a model response
- keep steering pending across auto-compact when there is real
model/tool continuation to resume
- allow queued steering to follow compaction immediately when the prior
model response was already final
- keep pending-input follow-up owned by `run_turn` instead of folding it
into `SamplingRequestResult`
- add regression coverage for mid-turn compaction, final-response
compaction, and compaction triggered before the next request after tool
output

## Root Cause
Steered input was drained at the top of every iteration of the
`run_turn` loop. After an auto-compaction, the loop continued and
immediately appended any pending steer after the compact summary,
making a queued prompt look like the newest task instead of letting the
model first resume the interrupted model/tool work.
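
For intuition, a minimal sketch of that pre-fix ordering. Everything
below is a stand-in written for this description, not the actual
codex-core API; only `get_pending_input` and `needs_follow_up` echo
names this patch talks about:

```rust
struct Session;
struct SamplingRequestResult {
    needs_follow_up: bool,
}

impl Session {
    // Stand-in for the real pending-input drain discussed below.
    async fn get_pending_input(&self) -> Vec<String> {
        Vec::new()
    }
}

// Hypothetical single sampling step; reports whether model/tool work remains.
async fn run_sampling_request(_sess: &Session, _input: Vec<String>) -> SamplingRequestResult {
    SamplingRequestResult {
        needs_follow_up: false,
    }
}

async fn run_turn(sess: Session) {
    loop {
        // BUG: steering was drained unconditionally at the top of every
        // iteration, so on the first iteration after an auto-compact the
        // queued steer landed right behind the compact summary and read
        // as the newest task instead of letting interrupted work resume.
        let steered = sess.get_pending_input().await;
        let result = run_sampling_request(&sess, steered).await;
        if !result.needs_follow_up {
            break;
        }
    }
}
```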

## Implementation Notes
This patch keeps the follow-up signals separate:

- `SamplingRequestResult.needs_follow_up` means model/tool continuation
is needed
- `sess.has_pending_input().await` means queued user steering exists
- `run_turn` computes the combined loop condition from those two signals

In `run_turn`:

```rust
let has_pending_input = sess.has_pending_input().await;
let needs_follow_up = model_needs_follow_up || has_pending_input;
```

After an auto-compact, we choose whether the next request may drain
steering:

```rust
can_drain_pending_input = !model_needs_follow_up;
```

That means:

- model/tool continuation + pending steer: compact -> resume once
without draining the steer
- completed model answer + pending steer: compact -> drain and send the
steer immediately
- fresh user prompt: do not drain steering before the model has answered
the prompt once

The drain itself is still just `sess.get_pending_input().await`; when
`can_drain_pending_input` is false, core uses an empty local `Vec` and
leaves the steer pending in session state.
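
Concretely, the gate reduces to a conditional around that drain. A
minimal sketch using the names above (`can_drain_pending_input`,
`sess.get_pending_input()`); the surrounding shape is illustrative
rather than the literal core code:

```rust
let pending_input = if can_drain_pending_input {
    // The prior model response was final, so queued steering may be
    // surfaced on the very next request.
    sess.get_pending_input().await
} else {
    // Model/tool continuation is still owed: use an empty local vec and
    // leave the steer buffered in session state for a later iteration.
    Vec::new()
};
```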

## Validation
- PASS `cargo test -p codex-core --test all steered_user_input -- --nocapture`
- PASS `just fmt`
- PASS `git diff --check`
- NOT PASSING HERE: `just fix -p codex-core` currently stops before
linting this change because of an unrelated mainline test-build error:
`core/src/tools/spec_tests.rs` initializes `ToolsConfigParams` without
`image_generation_tool_auth_allowed`; this PR does not touch that file.
Commit 8f705b0702 (parent 84a24fe333), authored by jgershen-oai on
2026-04-09 02:08:41 -07:00 and committed by GitHub.
2 changed files with 344 additions and 4 deletions


@@ -3,14 +3,17 @@ use std::sync::Arc;
use codex_core::CodexThread;
use codex_protocol::AgentPath;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
@@ -20,6 +23,7 @@ use core_test_support::responses::ev_response_created;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::StreamingSseServer;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
@@ -101,6 +105,29 @@ async fn submit_user_input(codex: &CodexThread, text: &str) {
.unwrap_or_else(|err| panic!("submit user input: {err}"));
}

async fn submit_danger_full_access_user_turn(test: &TestCodex, text: &str) {
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: test.config.cwd.to_path_buf(),
approval_policy: AskForApproval::Never,
approvals_reviewer: None,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: test.session_configured.model.clone(),
effort: None,
summary: None,
service_tier: None,
collaboration_mode: None,
personality: None,
})
.await
.unwrap_or_else(|err| panic!("submit user turn: {err}"));
}

async fn steer_user_input(codex: &CodexThread, text: &str) {
codex
.steer_input(
@@ -439,3 +466,303 @@ async fn user_input_does_not_preempt_after_reasoning_item() {
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_for_model_continuation_after_mid_turn_compact() {
let first_chunks = vec![
chunk(ev_response_created("resp-1")),
chunk(ev_function_call("call-1", "test_tool", "{}")),
chunk(ev_completed_with_tokens(
"resp-1", /*total_tokens*/ 500,
)),
];
let compact_chunks = vec![
chunk(ev_response_created("resp-compact")),
chunk(ev_message_item_done("msg-compact", "AUTO_COMPACT_SUMMARY")),
chunk(ev_completed_with_tokens(
"resp-compact",
/*total_tokens*/ 50,
)),
];
let post_compact_continuation_chunks = vec![
chunk(ev_response_created("resp-post-compact")),
chunk(ev_message_item_added("msg-post-compact", "")),
chunk(ev_output_text_delta("resumed old task")),
chunk(ev_message_item_done("msg-post-compact", "resumed old task")),
chunk(ev_completed_with_tokens(
"resp-post-compact",
/*total_tokens*/ 60,
)),
];
let steered_follow_up_chunks = vec![
chunk(ev_response_created("resp-steered")),
chunk(ev_message_item_done(
"msg-steered",
"processed steered prompt",
)),
chunk(ev_completed_with_tokens(
"resp-steered",
/*total_tokens*/ 70,
)),
];
let (server, _completions) = start_streaming_sse_server(vec![
first_chunks,
compact_chunks,
post_compact_continuation_chunks,
steered_follow_up_chunks,
])
.await;
let codex = test_codex()
.with_model("gpt-5.1")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"))
.codex;
submit_user_input(&codex, "first prompt").await;
submit_user_input(&codex, "second prompt").await;
wait_for_agent_message(&codex, "resumed old task").await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(requests.len(), 4);
let post_compact_body: Value =
from_slice(&requests[2]).unwrap_or_else(|err| panic!("parse post-compact request: {err}"));
let steered_body: Value =
from_slice(&requests[3]).unwrap_or_else(|err| panic!("parse steered request: {err}"));
let post_compact_user_texts = message_input_texts(&post_compact_body, "user");
assert!(
!post_compact_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should stay pending until the model resumes after compaction"
);
let steered_user_texts = message_input_texts(&steered_body, "user");
assert!(
steered_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should be recorded on the request after the post-compact continuation"
);
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_follows_compact_when_only_the_steer_needs_follow_up() {
let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel();
let first_chunks = vec![
chunk(ev_response_created("resp-1")),
chunk(ev_message_item_added("msg-1", "")),
chunk(ev_output_text_delta("first answer")),
chunk(ev_message_item_done("msg-1", "first answer")),
gated_chunk(
gate_first_completed_rx,
vec![ev_completed_with_tokens(
"resp-1", /*total_tokens*/ 500,
)],
),
];
let compact_chunks = vec![
chunk(ev_response_created("resp-compact")),
chunk(ev_message_item_done("msg-compact", "AUTO_COMPACT_SUMMARY")),
chunk(ev_completed_with_tokens(
"resp-compact",
/*total_tokens*/ 50,
)),
];
let steered_follow_up_chunks = vec![
chunk(ev_response_created("resp-steered")),
chunk(ev_message_item_done(
"msg-steered",
"processed steered prompt",
)),
chunk(ev_completed_with_tokens(
"resp-steered",
/*total_tokens*/ 70,
)),
];
let (server, _completions) =
start_streaming_sse_server(vec![first_chunks, compact_chunks, steered_follow_up_chunks])
.await;
let codex = test_codex()
.with_model("gpt-5.1")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"))
.codex;
submit_user_input(&codex, "first prompt").await;
wait_for_agent_message(&codex, "first answer").await;
steer_user_input(&codex, "second prompt").await;
let _ = gate_first_completed_tx.send(());
wait_for_agent_message(&codex, "processed steered prompt").await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(requests.len(), 3);
let compact_body: Value =
from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse compact request: {err}"));
let steered_body: Value =
from_slice(&requests[2]).unwrap_or_else(|err| panic!("parse steered request: {err}"));
let compact_user_texts = message_input_texts(&compact_body, "user");
assert!(
!compact_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should not be included in the compaction request"
);
let steered_user_texts = message_input_texts(&steered_body, "user");
assert!(
steered_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should follow compaction without an empty resume request when the model was already done"
);
server.shutdown().await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_when_tool_output_triggers_compact_before_next_request() {
let (gate_first_completed_tx, gate_first_completed_rx) = oneshot::channel();
let first_chunks = vec![
chunk(ev_response_created("resp-1")),
chunk(ev_function_call(
"call-1",
"shell_command",
r#"{"command":"printf '%04000d' 0","login":false,"timeout_ms":2000}"#,
)),
gated_chunk(
gate_first_completed_rx,
vec![ev_completed_with_tokens(
"resp-1", /*total_tokens*/ 100,
)],
),
];
let compact_chunks = vec![
chunk(ev_response_created("resp-compact")),
chunk(ev_message_item_done("msg-compact", "TOOL_OUTPUT_SUMMARY")),
chunk(ev_completed_with_tokens(
"resp-compact",
/*total_tokens*/ 50,
)),
];
let post_compact_continuation_chunks = vec![
chunk(ev_response_created("resp-post-compact")),
chunk(ev_message_item_done(
"msg-post-compact",
"resumed after compacting tool output",
)),
chunk(ev_completed_with_tokens(
"resp-post-compact",
/*total_tokens*/ 60,
)),
];
let steered_follow_up_chunks = vec![
chunk(ev_response_created("resp-steered")),
chunk(ev_message_item_done(
"msg-steered",
"processed steered prompt",
)),
chunk(ev_completed_with_tokens(
"resp-steered",
/*total_tokens*/ 70,
)),
];
let (server, _completions) = start_streaming_sse_server(vec![
first_chunks,
compact_chunks,
post_compact_continuation_chunks,
steered_follow_up_chunks,
])
.await;
let test = test_codex()
.with_model("gpt-5.1")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"));
let codex = test.codex.clone();
submit_danger_full_access_user_turn(&test, "first prompt").await;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnStarted(_))).await;
steer_user_input(&codex, "second prompt").await;
let _ = gate_first_completed_tx.send(());
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(requests.len(), 4);
let compact_body: Value =
from_slice(&requests[1]).unwrap_or_else(|err| panic!("parse compact request: {err}"));
let post_compact_body: Value =
from_slice(&requests[2]).unwrap_or_else(|err| panic!("parse post-compact request: {err}"));
let steered_body: Value =
from_slice(&requests[3]).unwrap_or_else(|err| panic!("parse steered request: {err}"));
let compact_user_texts = message_input_texts(&compact_body, "user");
assert!(
!compact_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should not be included in the compaction request"
);
let post_compact_user_texts = message_input_texts(&post_compact_body, "user");
assert!(
!post_compact_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should stay pending until after the compacted continuation"
);
let steered_user_texts = message_input_texts(&steered_body, "user");
assert!(
steered_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should be recorded on the request after the post-compact continuation"
);
server.shutdown().await;
}