mirror of
https://github.com/openai/codex.git
synced 2026-05-02 02:17:22 +00:00
Compare commits
2 Commits
abhinav/pl
...
codex/orde
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c37ce40560 | ||
|
|
a514bf1683 |
@@ -17,6 +17,7 @@ use codex_hooks::UserPromptSubmitRequest;
|
||||
use codex_otel::HOOK_RUN_DURATION_METRIC;
|
||||
use codex_otel::HOOK_RUN_METRIC;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::items::UserMessageItem;
|
||||
use codex_protocol::models::DeveloperInstructions;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -34,6 +35,7 @@ use serde_json::Value;
|
||||
use crate::event_mapping::parse_turn_item;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::PendingTurnInput;
|
||||
use crate::tools::sandboxing::PermissionRequestPayload;
|
||||
|
||||
pub(crate) struct HookRuntimeOutcome {
|
||||
@@ -46,6 +48,12 @@ pub(crate) enum PendingInputHookDisposition {
|
||||
Blocked { additional_contexts: Vec<String> },
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub(crate) enum PendingInputRecordOutcome {
|
||||
Recorded,
|
||||
Blocked,
|
||||
}
|
||||
|
||||
pub(crate) enum PendingInputRecord {
|
||||
UserMessage {
|
||||
content: Vec<UserInput>,
|
||||
@@ -268,6 +276,38 @@ pub(crate) async fn inspect_pending_input(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn inspect_pending_turn_input(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
pending_input_item: PendingTurnInput,
|
||||
) -> PendingInputHookDisposition {
|
||||
match pending_input_item {
|
||||
PendingTurnInput::UserInput(input) => {
|
||||
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
|
||||
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
|
||||
sess,
|
||||
turn_context,
|
||||
UserMessageItem::new(&input).message(),
|
||||
)
|
||||
.await;
|
||||
if user_prompt_submit_outcome.should_stop {
|
||||
PendingInputHookDisposition::Blocked {
|
||||
additional_contexts: user_prompt_submit_outcome.additional_contexts,
|
||||
}
|
||||
} else {
|
||||
PendingInputHookDisposition::Accepted(Box::new(PendingInputRecord::UserMessage {
|
||||
content: input,
|
||||
response_item,
|
||||
additional_contexts: user_prompt_submit_outcome.additional_contexts,
|
||||
}))
|
||||
}
|
||||
}
|
||||
PendingTurnInput::ResponseInputItem(input) => {
|
||||
inspect_pending_input(sess, turn_context, input).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn record_pending_input(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -294,6 +334,25 @@ pub(crate) async fn record_pending_input(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn record_pending_turn_input(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
pending_input_item: PendingTurnInput,
|
||||
) -> PendingInputRecordOutcome {
|
||||
match inspect_pending_turn_input(sess, turn_context, pending_input_item).await {
|
||||
PendingInputHookDisposition::Accepted(pending_input) => {
|
||||
record_pending_input(sess, turn_context, *pending_input).await;
|
||||
PendingInputRecordOutcome::Recorded
|
||||
}
|
||||
PendingInputHookDisposition::Blocked {
|
||||
additional_contexts,
|
||||
} => {
|
||||
record_additional_contexts(sess, turn_context, additional_contexts).await;
|
||||
PendingInputRecordOutcome::Blocked
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_context_injecting_hook<Fut, Outcome>(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
|
||||
@@ -252,6 +252,8 @@ use crate::agents_md::AgentsMdManager;
|
||||
use crate::context::UserInstructions;
|
||||
use crate::exec_policy::ExecPolicyUpdateError;
|
||||
use crate::guardian::GuardianReviewSessionManager;
|
||||
use crate::hook_runtime::PendingInputRecordOutcome;
|
||||
use crate::hook_runtime::record_pending_turn_input;
|
||||
use crate::mcp::McpManager;
|
||||
use crate::memories;
|
||||
use crate::network_policy_decision::execpolicy_network_rule_amendment;
|
||||
@@ -269,6 +271,7 @@ use crate::skills_watcher::SkillsWatcher;
|
||||
use crate::skills_watcher::SkillsWatcherEvent;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::MailboxDeliveryPhase;
|
||||
use crate::state::PendingTurnInput;
|
||||
use crate::state::SessionServices;
|
||||
use crate::state::SessionState;
|
||||
#[cfg(test)]
|
||||
@@ -2888,7 +2891,7 @@ impl Session {
|
||||
}
|
||||
|
||||
let mut turn_state = active_turn.turn_state.lock().await;
|
||||
turn_state.push_pending_input(input.into());
|
||||
turn_state.push_pending_input(input);
|
||||
turn_state.accept_mailbox_delivery_for_current_turn();
|
||||
Ok(active_turn_id.clone())
|
||||
}
|
||||
@@ -2975,12 +2978,18 @@ impl Session {
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
)]
|
||||
#[cfg(test)]
|
||||
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.prepend_pending_input(input);
|
||||
ts.prepend_pending_input(
|
||||
input
|
||||
.into_iter()
|
||||
.map(PendingTurnInput::ResponseInputItem)
|
||||
.collect(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
None => Err(()),
|
||||
@@ -2991,7 +3000,7 @@ impl Session {
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "active turn checks and turn state updates must remain atomic"
|
||||
)]
|
||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||
pub(crate) async fn get_pending_turn_input(&self) -> Vec<PendingTurnInput> {
|
||||
let (pending_input, accepts_mailbox_delivery) = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
@@ -3013,7 +3022,7 @@ impl Session {
|
||||
mailbox_rx
|
||||
.drain()
|
||||
.into_iter()
|
||||
.map(|mail| mail.to_response_input_item())
|
||||
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
if pending_input.is_empty() {
|
||||
@@ -3027,6 +3036,73 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||
self.get_pending_turn_input()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(ResponseInputItem::from)
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn fill_pending_input_from_mailbox_if_empty(
|
||||
&self,
|
||||
turn_state: &Arc<Mutex<crate::state::TurnState>>,
|
||||
) {
|
||||
let should_drain_mailbox = {
|
||||
let ts = turn_state.lock().await;
|
||||
!ts.has_pending_input() && ts.accepts_mailbox_delivery_for_current_turn()
|
||||
};
|
||||
if !should_drain_mailbox {
|
||||
return;
|
||||
}
|
||||
|
||||
let mailbox_items = {
|
||||
let mut mailbox_rx = self.mailbox_rx.lock().await;
|
||||
mailbox_rx
|
||||
.drain()
|
||||
.into_iter()
|
||||
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
if mailbox_items.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut ts = turn_state.lock().await;
|
||||
for item in mailbox_items {
|
||||
ts.push_pending_input(item);
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "pending transcript input must stay queued until hooks and history writes finish"
|
||||
)]
|
||||
pub(crate) async fn record_next_pending_turn_input_from_state(
|
||||
self: &Arc<Self>,
|
||||
turn_context: &Arc<crate::session::turn_context::TurnContext>,
|
||||
turn_state: &Arc<Mutex<crate::state::TurnState>>,
|
||||
) -> Option<PendingInputRecordOutcome> {
|
||||
self.fill_pending_input_from_mailbox_if_empty(turn_state)
|
||||
.await;
|
||||
|
||||
let mut ts = turn_state.lock().await;
|
||||
let pending_input = ts.front_pending_input()?;
|
||||
let outcome = record_pending_turn_input(self, turn_context, pending_input).await;
|
||||
let _ = ts.pop_front_pending_input();
|
||||
Some(outcome)
|
||||
}
|
||||
|
||||
pub(crate) async fn record_next_pending_turn_input(
|
||||
self: &Arc<Self>,
|
||||
turn_context: &Arc<crate::session::turn_context::TurnContext>,
|
||||
) -> Option<PendingInputRecordOutcome> {
|
||||
let turn_state = self.turn_state_for_sub_id(&turn_context.sub_id).await?;
|
||||
self.record_next_pending_turn_input_from_state(turn_context, &turn_state)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Queue response items to be injected into the next active turn created for this session.
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
|
||||
|
||||
@@ -310,7 +310,10 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
|
||||
.await;
|
||||
sess.spawn_task(
|
||||
Arc::clone(&tc),
|
||||
Vec::new(),
|
||||
vec![UserInput::Text {
|
||||
text: "first prompt".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
crate::tasks::RegularTask::new(),
|
||||
)
|
||||
.await;
|
||||
@@ -326,23 +329,51 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
|
||||
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
|
||||
let second = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
||||
.await
|
||||
.expect("expected turn aborted event")
|
||||
.expect("channel open");
|
||||
let EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
let aborted = loop {
|
||||
let event = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
|
||||
.await
|
||||
.expect("expected turn aborted event")
|
||||
.expect("channel open");
|
||||
if let EventMsg::TurnAborted(event) = event.msg {
|
||||
break event;
|
||||
}
|
||||
};
|
||||
let TurnAbortedEvent {
|
||||
turn_id,
|
||||
reason,
|
||||
completed_at,
|
||||
duration_ms,
|
||||
}) = second.msg
|
||||
else {
|
||||
panic!("expected turn aborted event");
|
||||
};
|
||||
} = aborted;
|
||||
assert_eq!(turn_id, Some(tc.sub_id.clone()));
|
||||
assert_eq!(reason, TurnAbortReason::Interrupted);
|
||||
assert!(completed_at.is_some());
|
||||
assert!(duration_ms.is_some());
|
||||
|
||||
let history = sess.clone_history().await;
|
||||
let expected_prompt = user_message("first prompt");
|
||||
let prompt_idx = history
|
||||
.raw_items()
|
||||
.iter()
|
||||
.position(|item| item == &expected_prompt);
|
||||
let aborted_idx = history.raw_items().iter().position(|item| {
|
||||
let ResponseItem::Message { role, content, .. } = item else {
|
||||
return false;
|
||||
};
|
||||
role == "user"
|
||||
&& content.iter().any(|content_item| {
|
||||
let ContentItem::InputText { text } = content_item else {
|
||||
return false;
|
||||
};
|
||||
TurnAborted::matches_text(text)
|
||||
})
|
||||
});
|
||||
let (Some(prompt_idx), Some(aborted_idx)) = (prompt_idx, aborted_idx) else {
|
||||
panic!(
|
||||
"expected prompt and interrupted-turn marker in history: {:?}",
|
||||
history.raw_items()
|
||||
);
|
||||
};
|
||||
assert!(prompt_idx < aborted_idx);
|
||||
}
|
||||
|
||||
fn test_model_client_session() -> crate::client::ModelClientSession {
|
||||
|
||||
@@ -18,13 +18,9 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
|
||||
use crate::connectors;
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::feedback_tags;
|
||||
use crate::hook_runtime::PendingInputHookDisposition;
|
||||
use crate::hook_runtime::PendingInputRecordOutcome;
|
||||
use crate::hook_runtime::emit_hook_completed_events;
|
||||
use crate::hook_runtime::inspect_pending_input;
|
||||
use crate::hook_runtime::record_additional_contexts;
|
||||
use crate::hook_runtime::record_pending_input;
|
||||
use crate::hook_runtime::run_pending_session_start_hooks;
|
||||
use crate::hook_runtime::run_user_prompt_submit_hooks;
|
||||
use crate::injection::ToolMentionKind;
|
||||
use crate::injection::app_id_from_path;
|
||||
use crate::injection::tool_kind_for_path;
|
||||
@@ -75,7 +71,6 @@ use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::PlanItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::items::UserMessageItem;
|
||||
use codex_protocol::items::build_hook_prompt_message;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ContentItem;
|
||||
@@ -300,30 +295,12 @@ pub(crate) async fn run_turn(
|
||||
if run_pending_session_start_hooks(&sess, &turn_context).await {
|
||||
return None;
|
||||
}
|
||||
let additional_contexts = if input.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
|
||||
let response_item: ResponseItem = initial_input_for_turn.clone().into();
|
||||
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
|
||||
&sess,
|
||||
&turn_context,
|
||||
UserMessageItem::new(&input).message(),
|
||||
)
|
||||
.await;
|
||||
if user_prompt_submit_outcome.should_stop {
|
||||
record_additional_contexts(
|
||||
&sess,
|
||||
&turn_context,
|
||||
user_prompt_submit_outcome.additional_contexts,
|
||||
)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
|
||||
.await;
|
||||
user_prompt_submit_outcome.additional_contexts
|
||||
};
|
||||
if !input.is_empty()
|
||||
&& sess.record_next_pending_turn_input(&turn_context).await
|
||||
!= Some(PendingInputRecordOutcome::Recorded)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
sess.services
|
||||
.analytics_events_client
|
||||
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
|
||||
@@ -334,7 +311,6 @@ pub(crate) async fn run_turn(
|
||||
}
|
||||
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
|
||||
.await;
|
||||
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
|
||||
if !input.is_empty() {
|
||||
// Track the previous-turn baseline from the regular user-turn path only so
|
||||
// standalone tasks (compact/shell/review/undo) cannot suppress future
|
||||
@@ -403,7 +379,7 @@ pub(crate) async fn run_turn(
|
||||
};
|
||||
// Pending input is drained into history before building the next model request.
|
||||
// However, we defer that drain until after sampling in two cases:
|
||||
// 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first.
|
||||
// 1. At the start of a turn, so work queued behind the fresh prompt is sampled later.
|
||||
// 2. After auto-compact, when model/tool continuation needs to resume before any steer.
|
||||
let mut can_drain_pending_input = input.is_empty();
|
||||
|
||||
@@ -412,35 +388,18 @@ pub(crate) async fn run_turn(
|
||||
break;
|
||||
}
|
||||
|
||||
// Note that pending_input would be something like a message the user
|
||||
// submitted through the UI while the model was running. Though the UI
|
||||
// may support this, the model might not.
|
||||
let pending_input = if can_drain_pending_input {
|
||||
sess.get_pending_input().await
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let mut blocked_pending_input = false;
|
||||
let mut blocked_pending_input_contexts = Vec::new();
|
||||
let mut requeued_pending_input = false;
|
||||
let mut accepted_pending_input = Vec::new();
|
||||
if !pending_input.is_empty() {
|
||||
let mut pending_input_iter = pending_input.into_iter();
|
||||
while let Some(pending_input_item) = pending_input_iter.next() {
|
||||
match inspect_pending_input(&sess, &turn_context, pending_input_item).await {
|
||||
PendingInputHookDisposition::Accepted(pending_input) => {
|
||||
accepted_pending_input.push(*pending_input);
|
||||
let mut has_accepted_pending_input = false;
|
||||
if can_drain_pending_input {
|
||||
// Note that pending input would be something like a message the user
|
||||
// submitted through the UI while the model was running. Though the UI
|
||||
// may support this, the model might not.
|
||||
while let Some(outcome) = sess.record_next_pending_turn_input(&turn_context).await {
|
||||
match outcome {
|
||||
PendingInputRecordOutcome::Recorded => {
|
||||
has_accepted_pending_input = true;
|
||||
}
|
||||
PendingInputHookDisposition::Blocked {
|
||||
additional_contexts,
|
||||
} => {
|
||||
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
|
||||
if !remaining_pending_input.is_empty() {
|
||||
let _ = sess.prepend_pending_input(remaining_pending_input).await;
|
||||
requeued_pending_input = true;
|
||||
}
|
||||
blocked_pending_input_contexts = additional_contexts;
|
||||
PendingInputRecordOutcome::Blocked => {
|
||||
blocked_pending_input = true;
|
||||
break;
|
||||
}
|
||||
@@ -448,14 +407,8 @@ pub(crate) async fn run_turn(
|
||||
}
|
||||
}
|
||||
|
||||
let has_accepted_pending_input = !accepted_pending_input.is_empty();
|
||||
for pending_input in accepted_pending_input {
|
||||
record_pending_input(&sess, &turn_context, pending_input).await;
|
||||
}
|
||||
record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await;
|
||||
|
||||
if blocked_pending_input && !has_accepted_pending_input {
|
||||
if requeued_pending_input {
|
||||
if sess.has_pending_input().await {
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -6,6 +6,7 @@ pub(crate) use service::SessionServices;
|
||||
pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::ActiveTurn;
|
||||
pub(crate) use turn::MailboxDeliveryPhase;
|
||||
pub(crate) use turn::PendingTurnInput;
|
||||
pub(crate) use turn::RunningTask;
|
||||
pub(crate) use turn::TaskKind;
|
||||
pub(crate) use turn::TurnState;
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::dynamic_tools::DynamicToolResponse;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use rmcp::model::RequestId;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -77,6 +78,38 @@ pub(crate) struct RunningTask {
|
||||
pub(crate) _timer: Option<codex_otel::Timer>,
|
||||
}
|
||||
|
||||
/// Input queued for ordered transcript recording during an active turn.
|
||||
///
|
||||
/// User prompts keep the original `UserInput` so client-visible turn items can
|
||||
/// preserve UI-only spans such as `text_elements`; model-only response input
|
||||
/// items can stay in their serialized Responses API form.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub(crate) enum PendingTurnInput {
|
||||
UserInput(Vec<UserInput>),
|
||||
ResponseInputItem(ResponseInputItem),
|
||||
}
|
||||
|
||||
impl From<PendingTurnInput> for ResponseInputItem {
|
||||
fn from(value: PendingTurnInput) -> Self {
|
||||
match value {
|
||||
PendingTurnInput::UserInput(input) => input.into(),
|
||||
PendingTurnInput::ResponseInputItem(input) => input,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<UserInput>> for PendingTurnInput {
|
||||
fn from(value: Vec<UserInput>) -> Self {
|
||||
Self::UserInput(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ResponseInputItem> for PendingTurnInput {
|
||||
fn from(value: ResponseInputItem) -> Self {
|
||||
Self::ResponseInputItem(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveTurn {
|
||||
pub(crate) fn add_task(&mut self, task: RunningTask) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
@@ -101,7 +134,7 @@ pub(crate) struct TurnState {
|
||||
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
|
||||
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
|
||||
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
|
||||
pending_input: Vec<ResponseInputItem>,
|
||||
pending_input: Vec<PendingTurnInput>,
|
||||
mailbox_delivery_phase: MailboxDeliveryPhase,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
pub(crate) tool_calls: u64,
|
||||
@@ -198,11 +231,12 @@ impl TurnState {
|
||||
self.pending_dynamic_tools.remove(key)
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||
self.pending_input.push(input);
|
||||
pub(crate) fn push_pending_input(&mut self, input: impl Into<PendingTurnInput>) {
|
||||
self.pending_input.push(input.into());
|
||||
}
|
||||
|
||||
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
|
||||
#[cfg(test)]
|
||||
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<PendingTurnInput>) {
|
||||
if input.is_empty() {
|
||||
return;
|
||||
}
|
||||
@@ -211,7 +245,19 @@ impl TurnState {
|
||||
self.pending_input = input;
|
||||
}
|
||||
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||
pub(crate) fn front_pending_input(&self) -> Option<PendingTurnInput> {
|
||||
self.pending_input.first().cloned()
|
||||
}
|
||||
|
||||
pub(crate) fn pop_front_pending_input(&mut self) -> Option<PendingTurnInput> {
|
||||
if self.pending_input.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.pending_input.remove(0))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<PendingTurnInput> {
|
||||
if self.pending_input.is_empty() {
|
||||
Vec::with_capacity(0)
|
||||
} else {
|
||||
|
||||
@@ -20,13 +20,11 @@ use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::hook_runtime::PendingInputHookDisposition;
|
||||
use crate::hook_runtime::inspect_pending_input;
|
||||
use crate::hook_runtime::record_additional_contexts;
|
||||
use crate::hook_runtime::record_pending_input;
|
||||
use crate::hook_runtime::record_pending_turn_input;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::PendingTurnInput;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use codex_analytics::TurnTokenUsageFact;
|
||||
@@ -38,7 +36,6 @@ use codex_otel::TURN_MEMORY_METRIC;
|
||||
use codex_otel::TURN_NETWORK_PROXY_METRIC;
|
||||
use codex_otel::TURN_TOKEN_USAGE_METRIC;
|
||||
use codex_otel::TURN_TOOL_CALL_METRIC;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -149,6 +146,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
|
||||
/// Returns the tracing name for a spawned task span.
|
||||
fn span_name(&self) -> &'static str;
|
||||
|
||||
/// Whether the submitted user input should be queued for ordered transcript recording.
|
||||
fn queues_initial_input(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Executes the task until completion or cancellation.
|
||||
///
|
||||
/// Implementations typically stream protocol events using `session` and
|
||||
@@ -186,6 +188,8 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
|
||||
|
||||
fn span_name(&self) -> &'static str;
|
||||
|
||||
fn queues_initial_input(&self) -> bool;
|
||||
|
||||
fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
@@ -213,6 +217,10 @@ where
|
||||
SessionTask::span_name(self)
|
||||
}
|
||||
|
||||
fn queues_initial_input(&self) -> bool {
|
||||
SessionTask::queues_initial_input(self)
|
||||
}
|
||||
|
||||
fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
@@ -259,6 +267,7 @@ impl Session {
|
||||
let task: Arc<dyn AnySessionTask> = Arc::new(task);
|
||||
let task_kind = task.kind();
|
||||
let span_name = task.span_name();
|
||||
let queue_initial_input = task.queues_initial_input() && !input.is_empty();
|
||||
let started_at = Instant::now();
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
@@ -270,7 +279,7 @@ impl Session {
|
||||
let done = Arc::new(Notify::new());
|
||||
|
||||
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
|
||||
let mailbox_items = self.get_pending_input().await;
|
||||
let mailbox_items = self.get_pending_turn_input().await;
|
||||
let turn_state = {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
let turn = active.get_or_insert_with(ActiveTurn::default);
|
||||
@@ -280,6 +289,9 @@ impl Session {
|
||||
{
|
||||
let mut turn_state = turn_state.lock().await;
|
||||
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
|
||||
if queue_initial_input {
|
||||
turn_state.push_pending_input(PendingTurnInput::UserInput(input.clone()));
|
||||
}
|
||||
for item in queued_response_items {
|
||||
turn_state.push_pending_input(item);
|
||||
}
|
||||
@@ -398,8 +410,10 @@ impl Session {
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
if let Some(mut active_turn) = self.take_active_turn().await {
|
||||
let turn_state = Arc::clone(&active_turn.turn_state);
|
||||
for task in active_turn.drain_tasks() {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
self.handle_task_abort(task, reason.clone(), Arc::clone(&turn_state))
|
||||
.await;
|
||||
}
|
||||
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
|
||||
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
|
||||
@@ -419,7 +433,7 @@ impl Session {
|
||||
.turn_metadata_state
|
||||
.cancel_git_enrichment_task();
|
||||
|
||||
let mut pending_input = Vec::<ResponseInputItem>::new();
|
||||
let mut pending_input = Vec::<PendingTurnInput>::new();
|
||||
let mut should_clear_active_turn = false;
|
||||
let mut token_usage_at_turn_start = None;
|
||||
let mut turn_had_memory_citation = false;
|
||||
@@ -448,16 +462,7 @@ impl Session {
|
||||
}
|
||||
if !pending_input.is_empty() {
|
||||
for pending_input_item in pending_input {
|
||||
match inspect_pending_input(self, &turn_context, pending_input_item).await {
|
||||
PendingInputHookDisposition::Accepted(pending_input) => {
|
||||
record_pending_input(self, &turn_context, *pending_input).await;
|
||||
}
|
||||
PendingInputHookDisposition::Blocked {
|
||||
additional_contexts,
|
||||
} => {
|
||||
record_additional_contexts(self, &turn_context, additional_contexts).await;
|
||||
}
|
||||
}
|
||||
record_pending_turn_input(self, &turn_context, pending_input_item).await;
|
||||
}
|
||||
}
|
||||
// Emit token usage metrics.
|
||||
@@ -594,7 +599,12 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
|
||||
async fn handle_task_abort(
|
||||
self: &Arc<Self>,
|
||||
task: RunningTask,
|
||||
reason: TurnAbortReason,
|
||||
turn_state: Arc<tokio::sync::Mutex<crate::state::TurnState>>,
|
||||
) {
|
||||
let sub_id = task.turn_context.sub_id.clone();
|
||||
if task.cancellation_token.is_cancelled() {
|
||||
return;
|
||||
@@ -624,6 +634,11 @@ impl Session {
|
||||
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
self.cleanup_after_interrupt(&task.turn_context).await;
|
||||
while self
|
||||
.record_next_pending_turn_input_from_state(&task.turn_context, &turn_state)
|
||||
.await
|
||||
.is_some()
|
||||
{}
|
||||
|
||||
let marker = interrupted_turn_history_marker();
|
||||
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
|
||||
|
||||
@@ -33,6 +33,10 @@ impl SessionTask for RegularTask {
|
||||
"session_task.turn"
|
||||
}
|
||||
|
||||
fn queues_initial_input(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self: Arc<Self>,
|
||||
session: Arc<SessionTaskContext>,
|
||||
|
||||
Reference in New Issue
Block a user