Compare commits

...

2 Commits

Author SHA1 Message Date
Joe Gershenson
c37ce40560 Polish pending transcript queue 2026-04-21 01:55:18 -07:00
Joe Gershenson
a514bf1683 Fix interrupt transcript ordering 2026-04-21 01:52:30 -07:00
8 changed files with 289 additions and 104 deletions

View File

@@ -17,6 +17,7 @@ use codex_hooks::UserPromptSubmitRequest;
use codex_otel::HOOK_RUN_DURATION_METRIC;
use codex_otel::HOOK_RUN_METRIC;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -34,6 +35,7 @@ use serde_json::Value;
use crate::event_mapping::parse_turn_item;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::PendingTurnInput;
use crate::tools::sandboxing::PermissionRequestPayload;
pub(crate) struct HookRuntimeOutcome {
@@ -46,6 +48,12 @@ pub(crate) enum PendingInputHookDisposition {
Blocked { additional_contexts: Vec<String> },
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PendingInputRecordOutcome {
Recorded,
Blocked,
}
pub(crate) enum PendingInputRecord {
UserMessage {
content: Vec<UserInput>,
@@ -268,6 +276,38 @@ pub(crate) async fn inspect_pending_input(
}
}
pub(crate) async fn inspect_pending_turn_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pending_input_item: PendingTurnInput,
) -> PendingInputHookDisposition {
match pending_input_item {
PendingTurnInput::UserInput(input) => {
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
sess,
turn_context,
UserMessageItem::new(&input).message(),
)
.await;
if user_prompt_submit_outcome.should_stop {
PendingInputHookDisposition::Blocked {
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}
} else {
PendingInputHookDisposition::Accepted(Box::new(PendingInputRecord::UserMessage {
content: input,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}))
}
}
PendingTurnInput::ResponseInputItem(input) => {
inspect_pending_input(sess, turn_context, input).await
}
}
}
pub(crate) async fn record_pending_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
@@ -294,6 +334,25 @@ pub(crate) async fn record_pending_input(
}
}
pub(crate) async fn record_pending_turn_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pending_input_item: PendingTurnInput,
) -> PendingInputRecordOutcome {
match inspect_pending_turn_input(sess, turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(sess, turn_context, *pending_input).await;
PendingInputRecordOutcome::Recorded
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(sess, turn_context, additional_contexts).await;
PendingInputRecordOutcome::Blocked
}
}
}
async fn run_context_injecting_hook<Fut, Outcome>(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,

View File

@@ -252,6 +252,8 @@ use crate::agents_md::AgentsMdManager;
use crate::context::UserInstructions;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::guardian::GuardianReviewSessionManager;
use crate::hook_runtime::PendingInputRecordOutcome;
use crate::hook_runtime::record_pending_turn_input;
use crate::mcp::McpManager;
use crate::memories;
use crate::network_policy_decision::execpolicy_network_rule_amendment;
@@ -269,6 +271,7 @@ use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingTurnInput;
use crate::state::SessionServices;
use crate::state::SessionState;
#[cfg(test)]
@@ -2888,7 +2891,7 @@ impl Session {
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.push_pending_input(input);
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
}
@@ -2975,12 +2978,18 @@ impl Session {
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
#[cfg(test)]
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input(input);
ts.prepend_pending_input(
input
.into_iter()
.map(PendingTurnInput::ResponseInputItem)
.collect(),
);
Ok(())
}
None => Err(()),
@@ -2991,7 +3000,7 @@ impl Session {
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
pub(crate) async fn get_pending_turn_input(&self) -> Vec<PendingTurnInput> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
@@ -3013,7 +3022,7 @@ impl Session {
mailbox_rx
.drain()
.into_iter()
.map(|mail| mail.to_response_input_item())
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
.collect::<Vec<_>>()
};
if pending_input.is_empty() {
@@ -3027,6 +3036,73 @@ impl Session {
}
}
#[cfg(test)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
self.get_pending_turn_input()
.await
.into_iter()
.map(ResponseInputItem::from)
.collect()
}
async fn fill_pending_input_from_mailbox_if_empty(
&self,
turn_state: &Arc<Mutex<crate::state::TurnState>>,
) {
let should_drain_mailbox = {
let ts = turn_state.lock().await;
!ts.has_pending_input() && ts.accepts_mailbox_delivery_for_current_turn()
};
if !should_drain_mailbox {
return;
}
let mailbox_items = {
let mut mailbox_rx = self.mailbox_rx.lock().await;
mailbox_rx
.drain()
.into_iter()
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
.collect::<Vec<_>>()
};
if mailbox_items.is_empty() {
return;
}
let mut ts = turn_state.lock().await;
for item in mailbox_items {
ts.push_pending_input(item);
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "pending transcript input must stay queued until hooks and history writes finish"
)]
pub(crate) async fn record_next_pending_turn_input_from_state(
self: &Arc<Self>,
turn_context: &Arc<crate::session::turn_context::TurnContext>,
turn_state: &Arc<Mutex<crate::state::TurnState>>,
) -> Option<PendingInputRecordOutcome> {
self.fill_pending_input_from_mailbox_if_empty(turn_state)
.await;
let mut ts = turn_state.lock().await;
let pending_input = ts.front_pending_input()?;
let outcome = record_pending_turn_input(self, turn_context, pending_input).await;
let _ = ts.pop_front_pending_input();
Some(outcome)
}
pub(crate) async fn record_next_pending_turn_input(
self: &Arc<Self>,
turn_context: &Arc<crate::session::turn_context::TurnContext>,
) -> Option<PendingInputRecordOutcome> {
let turn_state = self.turn_state_for_sub_id(&turn_context.sub_id).await?;
self.record_next_pending_turn_input_from_state(turn_context, &turn_state)
.await
}
/// Queue response items to be injected into the next active turn created for this session.
#[cfg(test)]
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {

View File

@@ -310,7 +310,10 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
.await;
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
vec![UserInput::Text {
text: "first prompt".to_string(),
text_elements: Vec::new(),
}],
crate::tasks::RegularTask::new(),
)
.await;
@@ -326,23 +329,51 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let second = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected turn aborted event")
.expect("channel open");
let EventMsg::TurnAborted(TurnAbortedEvent {
let aborted = loop {
let event = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected turn aborted event")
.expect("channel open");
if let EventMsg::TurnAborted(event) = event.msg {
break event;
}
};
let TurnAbortedEvent {
turn_id,
reason,
completed_at,
duration_ms,
}) = second.msg
else {
panic!("expected turn aborted event");
};
} = aborted;
assert_eq!(turn_id, Some(tc.sub_id.clone()));
assert_eq!(reason, TurnAbortReason::Interrupted);
assert!(completed_at.is_some());
assert!(duration_ms.is_some());
let history = sess.clone_history().await;
let expected_prompt = user_message("first prompt");
let prompt_idx = history
.raw_items()
.iter()
.position(|item| item == &expected_prompt);
let aborted_idx = history.raw_items().iter().position(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
role == "user"
&& content.iter().any(|content_item| {
let ContentItem::InputText { text } = content_item else {
return false;
};
TurnAborted::matches_text(text)
})
});
let (Some(prompt_idx), Some(aborted_idx)) = (prompt_idx, aborted_idx) else {
panic!(
"expected prompt and interrupted-turn marker in history: {:?}",
history.raw_items()
);
};
assert!(prompt_idx < aborted_idx);
}
fn test_model_client_session() -> crate::client::ModelClientSession {

View File

@@ -18,13 +18,9 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::feedback_tags;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::PendingInputRecordOutcome;
use crate::hook_runtime::emit_hook_completed_events;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::hook_runtime::run_pending_session_start_hooks;
use crate::hook_runtime::run_user_prompt_submit_hooks;
use crate::injection::ToolMentionKind;
use crate::injection::app_id_from_path;
use crate::injection::tool_kind_for_path;
@@ -75,7 +71,6 @@ use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::PlanItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::items::build_hook_prompt_message;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
@@ -300,30 +295,12 @@ pub(crate) async fn run_turn(
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
let additional_contexts = if input.is_empty() {
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
UserMessageItem::new(&input).message(),
)
.await;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
user_prompt_submit_outcome.additional_contexts
};
if !input.is_empty()
&& sess.record_next_pending_turn_input(&turn_context).await
!= Some(PendingInputRecordOutcome::Recorded)
{
return None;
}
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
@@ -334,7 +311,6 @@ pub(crate) async fn run_turn(
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
if !input.is_empty() {
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
@@ -403,7 +379,7 @@ pub(crate) async fn run_turn(
};
// Pending input is drained into history before building the next model request.
// However, we defer that drain until after sampling in two cases:
// 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first.
// 1. At the start of a turn, so work queued behind the fresh prompt is sampled later.
// 2. After auto-compact, when model/tool continuation needs to resume before any steer.
let mut can_drain_pending_input = input.is_empty();
@@ -412,35 +388,18 @@ pub(crate) async fn run_turn(
break;
}
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = if can_drain_pending_input {
sess.get_pending_input().await
} else {
Vec::new()
};
let mut blocked_pending_input = false;
let mut blocked_pending_input_contexts = Vec::new();
let mut requeued_pending_input = false;
let mut accepted_pending_input = Vec::new();
if !pending_input.is_empty() {
let mut pending_input_iter = pending_input.into_iter();
while let Some(pending_input_item) = pending_input_iter.next() {
match inspect_pending_input(&sess, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
accepted_pending_input.push(*pending_input);
let mut has_accepted_pending_input = false;
if can_drain_pending_input {
// Note that pending input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
while let Some(outcome) = sess.record_next_pending_turn_input(&turn_context).await {
match outcome {
PendingInputRecordOutcome::Recorded => {
has_accepted_pending_input = true;
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
if !remaining_pending_input.is_empty() {
let _ = sess.prepend_pending_input(remaining_pending_input).await;
requeued_pending_input = true;
}
blocked_pending_input_contexts = additional_contexts;
PendingInputRecordOutcome::Blocked => {
blocked_pending_input = true;
break;
}
@@ -448,14 +407,8 @@ pub(crate) async fn run_turn(
}
}
let has_accepted_pending_input = !accepted_pending_input.is_empty();
for pending_input in accepted_pending_input {
record_pending_input(&sess, &turn_context, pending_input).await;
}
record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await;
if blocked_pending_input && !has_accepted_pending_input {
if requeued_pending_input {
if sess.has_pending_input().await {
continue;
}
break;

View File

@@ -6,6 +6,7 @@ pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::MailboxDeliveryPhase;
pub(crate) use turn::PendingTurnInput;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn::TurnState;

View File

@@ -13,6 +13,7 @@ use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use codex_rmcp_client::ElicitationResponse;
use rmcp::model::RequestId;
use tokio::sync::oneshot;
@@ -77,6 +78,38 @@ pub(crate) struct RunningTask {
pub(crate) _timer: Option<codex_otel::Timer>,
}
/// Input queued for ordered transcript recording during an active turn.
///
/// User prompts keep the original `UserInput` so client-visible turn items can
/// preserve UI-only spans such as `text_elements`; model-only response input
/// items can stay in their serialized Responses API form.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum PendingTurnInput {
UserInput(Vec<UserInput>),
ResponseInputItem(ResponseInputItem),
}
impl From<PendingTurnInput> for ResponseInputItem {
fn from(value: PendingTurnInput) -> Self {
match value {
PendingTurnInput::UserInput(input) => input.into(),
PendingTurnInput::ResponseInputItem(input) => input,
}
}
}
impl From<Vec<UserInput>> for PendingTurnInput {
fn from(value: Vec<UserInput>) -> Self {
Self::UserInput(value)
}
}
impl From<ResponseInputItem> for PendingTurnInput {
fn from(value: ResponseInputItem) -> Self {
Self::ResponseInputItem(value)
}
}
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
@@ -101,7 +134,7 @@ pub(crate) struct TurnState {
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<ResponseInputItem>,
pending_input: Vec<PendingTurnInput>,
mailbox_delivery_phase: MailboxDeliveryPhase,
granted_permissions: Option<PermissionProfile>,
pub(crate) tool_calls: u64,
@@ -198,11 +231,12 @@ impl TurnState {
self.pending_dynamic_tools.remove(key)
}
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
self.pending_input.push(input);
pub(crate) fn push_pending_input(&mut self, input: impl Into<PendingTurnInput>) {
self.pending_input.push(input.into());
}
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
#[cfg(test)]
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<PendingTurnInput>) {
if input.is_empty() {
return;
}
@@ -211,7 +245,19 @@ impl TurnState {
self.pending_input = input;
}
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
pub(crate) fn front_pending_input(&self) -> Option<PendingTurnInput> {
self.pending_input.first().cloned()
}
pub(crate) fn pop_front_pending_input(&mut self) -> Option<PendingTurnInput> {
if self.pending_input.is_empty() {
None
} else {
Some(self.pending_input.remove(0))
}
}
pub(crate) fn take_pending_input(&mut self) -> Vec<PendingTurnInput> {
if self.pending_input.is_empty() {
Vec::with_capacity(0)
} else {

View File

@@ -20,13 +20,11 @@ use tracing::trace;
use tracing::warn;
use crate::context::ContextualUserFragment;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::hook_runtime::record_pending_turn_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::PendingTurnInput;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
@@ -38,7 +36,6 @@ use codex_otel::TURN_MEMORY_METRIC;
use codex_otel::TURN_NETWORK_PROXY_METRIC;
use codex_otel::TURN_TOKEN_USAGE_METRIC;
use codex_otel::TURN_TOOL_CALL_METRIC;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -149,6 +146,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// Returns the tracing name for a spawned task span.
fn span_name(&self) -> &'static str;
/// Whether the submitted user input should be queued for ordered transcript recording.
fn queues_initial_input(&self) -> bool {
false
}
/// Executes the task until completion or cancellation.
///
/// Implementations typically stream protocol events using `session` and
@@ -186,6 +188,8 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
fn span_name(&self) -> &'static str;
fn queues_initial_input(&self) -> bool;
fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
@@ -213,6 +217,10 @@ where
SessionTask::span_name(self)
}
fn queues_initial_input(&self) -> bool {
SessionTask::queues_initial_input(self)
}
fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
@@ -259,6 +267,7 @@ impl Session {
let task: Arc<dyn AnySessionTask> = Arc::new(task);
let task_kind = task.kind();
let span_name = task.span_name();
let queue_initial_input = task.queues_initial_input() && !input.is_empty();
let started_at = Instant::now();
turn_context
.turn_timing_state
@@ -270,7 +279,7 @@ impl Session {
let done = Arc::new(Notify::new());
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
let mailbox_items = self.get_pending_input().await;
let mailbox_items = self.get_pending_turn_input().await;
let turn_state = {
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
@@ -280,6 +289,9 @@ impl Session {
{
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
if queue_initial_input {
turn_state.push_pending_input(PendingTurnInput::UserInput(input.clone()));
}
for item in queued_response_items {
turn_state.push_pending_input(item);
}
@@ -398,8 +410,10 @@ impl Session {
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
if let Some(mut active_turn) = self.take_active_turn().await {
let turn_state = Arc::clone(&active_turn.turn_state);
for task in active_turn.drain_tasks() {
self.handle_task_abort(task, reason.clone()).await;
self.handle_task_abort(task, reason.clone(), Arc::clone(&turn_state))
.await;
}
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
@@ -419,7 +433,7 @@ impl Session {
.turn_metadata_state
.cancel_git_enrichment_task();
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut pending_input = Vec::<PendingTurnInput>::new();
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_had_memory_citation = false;
@@ -448,16 +462,7 @@ impl Session {
}
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_pending_input(self, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(self, &turn_context, *pending_input).await;
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(self, &turn_context, additional_contexts).await;
}
}
record_pending_turn_input(self, &turn_context, pending_input_item).await;
}
}
// Emit token usage metrics.
@@ -594,7 +599,12 @@ impl Session {
}
}
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
async fn handle_task_abort(
self: &Arc<Self>,
task: RunningTask,
reason: TurnAbortReason,
turn_state: Arc<tokio::sync::Mutex<crate::state::TurnState>>,
) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
@@ -624,6 +634,11 @@ impl Session {
if reason == TurnAbortReason::Interrupted {
self.cleanup_after_interrupt(&task.turn_context).await;
while self
.record_next_pending_turn_input_from_state(&task.turn_context, &turn_state)
.await
.is_some()
{}
let marker = interrupted_turn_history_marker();
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())

View File

@@ -33,6 +33,10 @@ impl SessionTask for RegularTask {
"session_task.turn"
}
fn queues_initial_input(&self) -> bool {
true
}
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,