Fix dropped pending shell output

This commit is contained in:
jif-oai
2026-02-03 14:15:47 +00:00
parent 8622225ae8
commit fdd699ef07
2 changed files with 63 additions and 5 deletions

View File

@@ -4536,6 +4536,7 @@ mod tests {
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AuthMode;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use std::path::Path;
use std::time::Duration;
@@ -5710,6 +5711,50 @@ mod tests {
assert!(rx.try_recv().is_err());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn task_finish_persists_leftover_pending_input() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
sess.inject_response_items(vec![ResponseInputItem::Message {
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "late pending input".to_string(),
}],
}])
.await
.expect("inject pending input into active turn");
sess.on_task_finished(Arc::clone(&tc), None).await;
let history = sess.clone_history().await;
let expected = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "late pending input".to_string(),
}],
end_turn: None,
phase: None,
};
assert!(
history.raw_items().iter().any(|item| item == &expected),
"expected pending input to be persisted into history on turn completion"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

View File

@@ -31,6 +31,7 @@ use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
@@ -190,15 +191,27 @@ impl Session {
last_agent_message: Option<String>,
) {
let mut active = self.active_turn.lock().await;
let should_close_processes = if let Some(at) = active.as_mut()
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut should_close_processes = false;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
let mut ts = at.turn_state.lock().await;
pending_input = ts.take_pending_input();
should_close_processes = true;
}
if should_close_processes {
*active = None;
true
} else {
false
};
}
drop(active);
if !pending_input.is_empty() {
let pending_response_items = pending_input
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<_>>();
self.record_conversation_items(turn_context.as_ref(), &pending_response_items)
.await;
}
if should_close_processes {
self.close_unified_exec_processes().await;
}