Compare commits

...

2 Commits

Author SHA1 Message Date
Joe Gershenson
84b9e8e0ff Defer steered input until after compact continuation 2026-04-08 12:40:34 -07:00
Joe Gershenson
49e4d19551 Add Warp OSC 9 notification support 2026-04-05 21:19:33 -07:00
4 changed files with 136 additions and 2 deletions

View File

@@ -6019,6 +6019,7 @@ pub(crate) async fn run_turn(
// one instance across retries within this turn.
let mut client_session =
prewarmed_client_session.unwrap_or_else(|| sess.services.model_client.new_session());
let mut can_record_pending_input = input.is_empty();
loop {
if run_pending_session_start_hooks(&sess, &turn_context).await {
@@ -6028,7 +6029,11 @@ pub(crate) async fn run_turn(
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = sess.get_pending_input().await;
let pending_input = if can_record_pending_input {
sess.get_pending_input().await
} else {
Vec::new()
};
let mut blocked_pending_input = false;
let mut blocked_pending_input_contexts = Vec::new();
@@ -6105,6 +6110,7 @@ pub(crate) async fn run_turn(
needs_follow_up,
last_agent_message: sampling_request_last_agent_message,
} = sampling_request_output;
can_record_pending_input = true;
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
@@ -6134,6 +6140,7 @@ pub(crate) async fn run_turn(
return None;
}
client_session.reset_websocket_session();
can_record_pending_input = false;
continue;
}

View File

@@ -11,6 +11,7 @@ use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_message_item_added;
use core_test_support::responses::ev_output_text_delta;
@@ -426,3 +427,98 @@ async fn user_input_does_not_preempt_after_reasoning_item() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn steered_user_input_waits_for_model_continuation_after_mid_turn_compact() {
let first_chunks = vec![
chunk(ev_response_created("resp-1")),
chunk(ev_function_call("call-1", "test_tool", "{}")),
chunk(ev_completed_with_tokens(
"resp-1", /*total_tokens*/ 500,
)),
];
let compact_chunks = vec![
chunk(ev_response_created("resp-compact")),
chunk(ev_message_item_done("msg-compact", "AUTO_COMPACT_SUMMARY")),
chunk(ev_completed_with_tokens(
"resp-compact",
/*total_tokens*/ 50,
)),
];
let post_compact_continuation_chunks = vec![
chunk(ev_response_created("resp-post-compact")),
chunk(ev_message_item_added("msg-post-compact", "")),
chunk(ev_output_text_delta("resumed old task")),
chunk(ev_message_item_done("msg-post-compact", "resumed old task")),
chunk(ev_completed_with_tokens(
"resp-post-compact",
/*total_tokens*/ 60,
)),
];
let steered_follow_up_chunks = vec![
chunk(ev_response_created("resp-steered")),
chunk(ev_message_item_done(
"msg-steered",
"processed steered prompt",
)),
chunk(ev_completed_with_tokens(
"resp-steered",
/*total_tokens*/ 70,
)),
];
let (server, _completions) = start_streaming_sse_server(vec![
first_chunks,
compact_chunks,
post_compact_continuation_chunks,
steered_follow_up_chunks,
])
.await;
let codex = test_codex()
.with_model("gpt-5.1")
.with_config(|config| {
config.model_provider.name = "OpenAI (test)".to_string();
config.model_provider.supports_websockets = false;
config.model_auto_compact_token_limit = Some(200);
})
.build_with_streaming_server(&server)
.await
.unwrap_or_else(|err| panic!("build streaming Codex test session: {err}"))
.codex;
submit_user_input(&codex, "first prompt").await;
submit_user_input(&codex, "second prompt").await;
wait_for_agent_message(&codex, "resumed old task").await;
wait_for_turn_complete(&codex).await;
let requests = server.requests().await;
assert_eq!(requests.len(), 4);
let post_compact_body: Value =
from_slice(&requests[2]).unwrap_or_else(|err| panic!("parse post-compact request: {err}"));
let steered_body: Value =
from_slice(&requests[3]).unwrap_or_else(|err| panic!("parse steered request: {err}"));
let post_compact_user_texts = message_input_texts(&post_compact_body, "user");
assert!(
!post_compact_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should stay pending until the model resumes after compaction"
);
let steered_user_texts = message_input_texts(&steered_body, "user");
assert!(
steered_user_texts
.iter()
.any(|text| text == "second prompt"),
"steered input should be recorded on the request after the post-compact continuation"
);
server.shutdown().await;
}

View File

@@ -8,6 +8,8 @@ use bel::BelBackend;
use codex_config::types::NotificationMethod;
use osc9::Osc9Backend;
const WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR: &str = "WARP_CLI_AGENT_PROTOCOL_VERSION";
#[derive(Debug)]
pub enum DesktopNotificationBackend {
Osc9(Osc9Backend),
@@ -48,10 +50,19 @@ pub fn detect_backend(method: NotificationMethod) -> DesktopNotificationBackend
DesktopNotificationBackend::for_method(method)
}
/// Reports whether Warp's CLI agent protocol is active for this process,
/// detected by the presence of its protocol-version environment variable.
pub(crate) fn warp_cli_agent_protocol_detected() -> bool {
    let protocol_version = env::var_os(WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR);
    protocol_version.is_some()
}
fn supports_osc9() -> bool {
if env::var_os("WT_SESSION").is_some() {
return false;
}
// Warp declares OSC 9 support via its CLI agent protocol env var. This also
// covers supported Warp sessions across SSH where TERM_PROGRAM is absent.
if warp_cli_agent_protocol_detected() {
return true;
}
// Prefer TERM_PROGRAM when present, but keep fallbacks for shells/launchers
// that don't set it (e.g., tmux/ssh) to avoid regressing OSC 9 support.
if matches!(
@@ -73,6 +84,7 @@ fn supports_osc9() -> bool {
#[cfg(test)]
mod tests {
use super::WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR;
use super::detect_backend;
use codex_config::types::NotificationMethod;
use serial_test::serial;
@@ -133,6 +145,7 @@ mod tests {
fn auto_prefers_bel_without_hints() {
let _term = EnvVarGuard::remove("TERM");
let _term_program = EnvVarGuard::remove("TERM_PROGRAM");
let _warp_protocol = EnvVarGuard::remove(WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR);
let _iterm = EnvVarGuard::remove("ITERM_SESSION_ID");
let _wt = EnvVarGuard::remove("WT_SESSION");
assert!(matches!(
@@ -141,11 +154,26 @@ mod tests {
));
}
/// With no other terminal hints set, the Warp CLI agent protocol env var
/// alone should select the OSC 9 backend under `NotificationMethod::Auto`.
#[test]
#[serial]
fn auto_uses_osc9_for_warp_cli_agent_protocol() {
    // Guards restore the environment when dropped; keep them bound for the
    // whole test body.
    let _guard_term = EnvVarGuard::remove("TERM");
    let _guard_term_program = EnvVarGuard::remove("TERM_PROGRAM");
    let _guard_warp_protocol = EnvVarGuard::set(WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR, "1");
    let _guard_iterm = EnvVarGuard::remove("ITERM_SESSION_ID");
    let _guard_wt = EnvVarGuard::remove("WT_SESSION");
    let backend = detect_backend(NotificationMethod::Auto);
    assert!(matches!(backend, super::DesktopNotificationBackend::Osc9(_)));
}
#[test]
#[serial]
fn auto_uses_osc9_for_iterm() {
let _term = EnvVarGuard::remove("TERM");
let _term_program = EnvVarGuard::remove("TERM_PROGRAM");
let _warp_protocol = EnvVarGuard::remove(WARP_CLI_AGENT_PROTOCOL_VERSION_ENV_VAR);
let _iterm = EnvVarGuard::set("ITERM_SESSION_ID", "abc");
let _wt = EnvVarGuard::remove("WT_SESSION");
assert!(matches!(

View File

@@ -42,6 +42,7 @@ use crate::custom_terminal;
use crate::custom_terminal::Terminal as CustomTerminal;
use crate::notifications::DesktopNotificationBackend;
use crate::notifications::detect_backend;
use crate::notifications::warp_cli_agent_protocol_detected;
use crate::tui::event_stream::EventBroker;
use crate::tui::event_stream::TuiEventStream;
#[cfg(unix)]
@@ -364,7 +365,9 @@ impl Tui {
/// Emit a desktop notification now if the terminal is unfocused.
/// Returns true if a notification was posted.
pub fn notify(&mut self, message: impl AsRef<str>) -> bool {
if self.terminal_focused.load(Ordering::Relaxed) {
// Warp handles notification visibility itself but does not relay focus
// events to the PTY, so Codex should not suppress its notifications here.
if self.terminal_focused.load(Ordering::Relaxed) && !warp_cli_agent_protocol_detected() {
return false;
}