mirror of
https://github.com/openai/codex.git
synced 2026-05-18 02:02:30 +00:00
Compare commits
4 Commits
xli-codex/
...
migrate_co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
41e9266833 | ||
|
|
91efb4efc3 | ||
|
|
c9686130ce | ||
|
|
0af13d5c6a |
@@ -1798,7 +1798,12 @@ pub(crate) async fn run_task(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
auto_compact_recently_attempted = true;
|
auto_compact_recently_attempted = true;
|
||||||
compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
|
compact::run_inline_auto_compact_task(
|
||||||
|
sess.clone(),
|
||||||
|
turn_context.clone(),
|
||||||
|
cancellation_token.child_token(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,28 +2,16 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use super::Session;
|
use super::Session;
|
||||||
use super::TurnContext;
|
use super::TurnContext;
|
||||||
use super::get_last_assistant_message_from_turn;
|
|
||||||
use crate::Prompt;
|
|
||||||
use crate::client_common::ResponseEvent;
|
|
||||||
use crate::error::CodexErr;
|
|
||||||
use crate::error::Result as CodexResult;
|
|
||||||
use crate::protocol::AgentMessageEvent;
|
|
||||||
use crate::protocol::CompactedItem;
|
use crate::protocol::CompactedItem;
|
||||||
use crate::protocol::ErrorEvent;
|
|
||||||
use crate::protocol::EventMsg;
|
|
||||||
use crate::protocol::TaskStartedEvent;
|
|
||||||
use crate::protocol::TurnContextItem;
|
use crate::protocol::TurnContextItem;
|
||||||
use crate::truncate::truncate_middle;
|
use crate::truncate::truncate_middle;
|
||||||
use crate::util::backoff;
|
|
||||||
use askama::Template;
|
use askama::Template;
|
||||||
use codex_protocol::items::TurnItem;
|
use codex_protocol::items::TurnItem;
|
||||||
use codex_protocol::models::ContentItem;
|
use codex_protocol::models::ContentItem;
|
||||||
use codex_protocol::models::ResponseInputItem;
|
|
||||||
use codex_protocol::models::ResponseItem;
|
use codex_protocol::models::ResponseItem;
|
||||||
use codex_protocol::protocol::RolloutItem;
|
use codex_protocol::protocol::RolloutItem;
|
||||||
use codex_protocol::user_input::UserInput;
|
use codex_protocol::user_input::UserInput;
|
||||||
use futures::prelude::*;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
|
pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
|
||||||
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
|
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
|
||||||
@@ -38,137 +26,57 @@ struct HistoryBridgeTemplate<'a> {
|
|||||||
pub(crate) async fn run_inline_auto_compact_task(
|
pub(crate) async fn run_inline_auto_compact_task(
|
||||||
sess: Arc<Session>,
|
sess: Arc<Session>,
|
||||||
turn_context: Arc<TurnContext>,
|
turn_context: Arc<TurnContext>,
|
||||||
|
cancellation_token: CancellationToken,
|
||||||
) {
|
) {
|
||||||
|
persist_turn_context_rollout(&sess, &turn_context).await;
|
||||||
|
|
||||||
let input = vec![UserInput::Text {
|
let input = vec![UserInput::Text {
|
||||||
text: SUMMARIZATION_PROMPT.to_string(),
|
text: SUMMARIZATION_PROMPT.to_string(),
|
||||||
}];
|
}];
|
||||||
run_compact_task_inner(sess, turn_context, input).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn run_compact_task(
|
// Build forked history from parent to seed sub-agent.
|
||||||
sess: Arc<Session>,
|
let history_snapshot = sess.clone_history().await.get_history();
|
||||||
turn_context: Arc<TurnContext>,
|
let forked: Vec<RolloutItem> = history_snapshot
|
||||||
input: Vec<UserInput>,
|
.iter()
|
||||||
) -> Option<String> {
|
.cloned()
|
||||||
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
|
.map(RolloutItem::ResponseItem)
|
||||||
model_context_window: turn_context.client.get_model_context_window(),
|
.collect();
|
||||||
});
|
|
||||||
sess.send_event(&turn_context, start_event).await;
|
|
||||||
run_compact_task_inner(sess.clone(), turn_context, input).await;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_compact_task_inner(
|
// Launch sub-agent one-shot; drain to completion and capture summary.
|
||||||
sess: Arc<Session>,
|
let config = turn_context.client.config().as_ref().clone();
|
||||||
turn_context: Arc<TurnContext>,
|
if let Ok(io) = crate::codex_delegate::run_codex_conversation_one_shot(
|
||||||
input: Vec<UserInput>,
|
crate::codex_delegate::SubAgentRunParams {
|
||||||
) {
|
config,
|
||||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
auth_manager: sess.services.auth_manager.clone(),
|
||||||
|
initial_history: Some(codex_protocol::protocol::InitialHistory::Forked(forked)),
|
||||||
let mut history = sess.clone_history().await;
|
sub_source: codex_protocol::protocol::SubAgentSource::Compact,
|
||||||
history.record_items(&[initial_input_for_turn.into()]);
|
parent_session: Arc::clone(&sess),
|
||||||
|
parent_ctx: Arc::clone(&turn_context),
|
||||||
let mut truncated_count = 0usize;
|
cancel_token: cancellation_token.child_token(),
|
||||||
|
},
|
||||||
let max_retries = turn_context.client.get_provider().stream_max_retries();
|
input,
|
||||||
let mut retries = 0;
|
)
|
||||||
|
.await
|
||||||
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
|
{
|
||||||
cwd: turn_context.cwd.clone(),
|
let mut summary_text: Option<String> = None;
|
||||||
approval_policy: turn_context.approval_policy,
|
while let Ok(event) = io.next_event().await {
|
||||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
if let crate::protocol::EventMsg::TaskComplete(tc) = event.msg {
|
||||||
model: turn_context.client.get_model(),
|
summary_text = tc.last_agent_message;
|
||||||
effort: turn_context.client.get_reasoning_effort(),
|
|
||||||
summary: turn_context.client.get_reasoning_summary(),
|
|
||||||
});
|
|
||||||
sess.persist_rollout_items(&[rollout_item]).await;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let turn_input = history.get_history_for_prompt();
|
|
||||||
let prompt = Prompt {
|
|
||||||
input: turn_input.clone(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
|
|
||||||
|
|
||||||
match attempt_result {
|
|
||||||
Ok(()) => {
|
|
||||||
if truncated_count > 0 {
|
|
||||||
sess.notify_background_event(
|
|
||||||
turn_context.as_ref(),
|
|
||||||
format!(
|
|
||||||
"Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window."
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(CodexErr::Interrupted) => {
|
if matches!(event.msg, crate::protocol::EventMsg::TurnAborted(_)) {
|
||||||
return;
|
break;
|
||||||
}
|
|
||||||
Err(e @ CodexErr::ContextWindowExceeded) => {
|
|
||||||
if turn_input.len() > 1 {
|
|
||||||
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
|
|
||||||
error!(
|
|
||||||
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
|
|
||||||
);
|
|
||||||
history.remove_first_item();
|
|
||||||
truncated_count += 1;
|
|
||||||
retries = 0;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
sess.set_total_tokens_full(turn_context.as_ref()).await;
|
|
||||||
let event = EventMsg::Error(ErrorEvent {
|
|
||||||
message: e.to_string(),
|
|
||||||
});
|
|
||||||
sess.send_event(&turn_context, event).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
if retries < max_retries {
|
|
||||||
retries += 1;
|
|
||||||
let delay = backoff(retries);
|
|
||||||
sess.notify_stream_error(
|
|
||||||
turn_context.as_ref(),
|
|
||||||
format!("Reconnecting... {retries}/{max_retries}"),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
tokio::time::sleep(delay).await;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
let event = EventMsg::Error(ErrorEvent {
|
|
||||||
message: e.to_string(),
|
|
||||||
});
|
|
||||||
sess.send_event(&turn_context, event).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if let Some(summary) = summary_text {
|
||||||
|
apply_compaction(&sess, &turn_context, &summary).await;
|
||||||
|
let event =
|
||||||
|
crate::protocol::EventMsg::AgentMessage(crate::protocol::AgentMessageEvent {
|
||||||
|
message: "Compact task completed".to_string(),
|
||||||
|
});
|
||||||
|
sess.send_event(&Arc::clone(&turn_context), event).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let history_snapshot = sess.clone_history().await.get_history();
|
|
||||||
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
|
||||||
let user_messages = collect_user_messages(&history_snapshot);
|
|
||||||
let initial_context = sess.build_initial_context(turn_context.as_ref());
|
|
||||||
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
|
||||||
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
|
|
||||||
.iter()
|
|
||||||
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
new_history.extend(ghost_snapshots);
|
|
||||||
sess.replace_history(new_history).await;
|
|
||||||
|
|
||||||
let rollout_item = RolloutItem::Compacted(CompactedItem {
|
|
||||||
message: summary_text.clone(),
|
|
||||||
});
|
|
||||||
sess.persist_rollout_items(&[rollout_item]).await;
|
|
||||||
|
|
||||||
let event = EventMsg::AgentMessage(AgentMessageEvent {
|
|
||||||
message: "Compact task completed".to_string(),
|
|
||||||
});
|
|
||||||
sess.send_event(&turn_context, event).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
|
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
|
||||||
@@ -249,36 +157,45 @@ fn build_compacted_history_with_limit(
|
|||||||
history
|
history
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn drain_to_completed(
|
// streaming helpers removed; compact now uses the Codex delegate for sampling.
|
||||||
sess: &Session,
|
|
||||||
turn_context: &TurnContext,
|
/// Apply compaction to the parent session given a summary text: rebuild the
|
||||||
prompt: &Prompt,
|
/// conversation with a bridge message, preserve ghost snapshots, persist the
|
||||||
) -> CodexResult<()> {
|
/// Compacted rollout entry, and replace history.
|
||||||
let mut stream = turn_context.client.clone().stream(prompt).await?;
|
pub(crate) async fn apply_compaction(
|
||||||
loop {
|
sess: &Arc<Session>,
|
||||||
let maybe_event = stream.next().await;
|
turn_context: &Arc<TurnContext>,
|
||||||
let Some(event) = maybe_event else {
|
summary_text: &str,
|
||||||
return Err(CodexErr::Stream(
|
) {
|
||||||
"stream closed before response.completed".into(),
|
let history_snapshot = sess.clone_history().await.get_history();
|
||||||
None,
|
let user_messages = collect_user_messages(&history_snapshot);
|
||||||
));
|
let initial_context = sess.build_initial_context(turn_context.as_ref());
|
||||||
};
|
let mut new_history = build_compacted_history(initial_context, &user_messages, summary_text);
|
||||||
match event {
|
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
|
||||||
Ok(ResponseEvent::OutputItemDone(item)) => {
|
.iter()
|
||||||
sess.record_into_history(std::slice::from_ref(&item)).await;
|
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
|
||||||
}
|
.cloned()
|
||||||
Ok(ResponseEvent::RateLimits(snapshot)) => {
|
.collect();
|
||||||
sess.update_rate_limits(turn_context, snapshot).await;
|
new_history.extend(ghost_snapshots);
|
||||||
}
|
sess.replace_history(new_history).await;
|
||||||
Ok(ResponseEvent::Completed { token_usage, .. }) => {
|
|
||||||
sess.update_token_usage_info(turn_context, token_usage.as_ref())
|
let rollout_item = RolloutItem::Compacted(CompactedItem {
|
||||||
.await;
|
message: summary_text.to_string(),
|
||||||
return Ok(());
|
});
|
||||||
}
|
sess.persist_rollout_items(&[rollout_item]).await;
|
||||||
Ok(_) => continue,
|
}
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
/// Persist a TurnContext rollout entry capturing the model/session context.
|
||||||
}
|
pub(crate) async fn persist_turn_context_rollout(sess: &Session, turn_context: &TurnContext) {
|
||||||
|
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
|
||||||
|
cwd: turn_context.cwd.clone(),
|
||||||
|
approval_policy: turn_context.approval_policy,
|
||||||
|
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||||
|
model: turn_context.client.get_model(),
|
||||||
|
effort: turn_context.client.get_reasoning_effort(),
|
||||||
|
summary: turn_context.client.get_reasoning_summary(),
|
||||||
|
});
|
||||||
|
sess.persist_rollout_items(&[rollout_item]).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -25,38 +25,48 @@ use crate::config::Config;
|
|||||||
use crate::error::CodexErr;
|
use crate::error::CodexErr;
|
||||||
use codex_protocol::protocol::InitialHistory;
|
use codex_protocol::protocol::InitialHistory;
|
||||||
|
|
||||||
|
/// Parameters for running a sub-agent (delegate) conversation.
|
||||||
|
pub(crate) struct SubAgentRunParams {
|
||||||
|
pub config: Config,
|
||||||
|
pub auth_manager: Arc<AuthManager>,
|
||||||
|
pub initial_history: Option<InitialHistory>,
|
||||||
|
pub sub_source: SubAgentSource,
|
||||||
|
pub parent_session: Arc<Session>,
|
||||||
|
pub parent_ctx: Arc<TurnContext>,
|
||||||
|
pub cancel_token: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
/// Start an interactive sub-Codex conversation and return IO channels.
|
/// Start an interactive sub-Codex conversation and return IO channels.
|
||||||
///
|
///
|
||||||
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
|
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
|
||||||
/// Approval requests are handled via `parent_session` and are not surfaced.
|
/// Approval requests are handled via `parent_session` and are not surfaced.
|
||||||
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
|
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
|
||||||
pub(crate) async fn run_codex_conversation_interactive(
|
pub(crate) async fn run_codex_conversation_interactive(
|
||||||
config: Config,
|
params: SubAgentRunParams,
|
||||||
auth_manager: Arc<AuthManager>,
|
|
||||||
parent_session: Arc<Session>,
|
|
||||||
parent_ctx: Arc<TurnContext>,
|
|
||||||
cancel_token: CancellationToken,
|
|
||||||
) -> Result<Codex, CodexErr> {
|
) -> Result<Codex, CodexErr> {
|
||||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||||
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||||
|
|
||||||
let CodexSpawnOk { codex, .. } = Codex::spawn(
|
let CodexSpawnOk { codex, .. } = Codex::spawn(
|
||||||
config,
|
params.config.clone(),
|
||||||
auth_manager,
|
Arc::clone(¶ms.auth_manager),
|
||||||
InitialHistory::New,
|
params
|
||||||
SessionSource::SubAgent(SubAgentSource::Review),
|
.initial_history
|
||||||
|
.clone()
|
||||||
|
.unwrap_or(InitialHistory::New),
|
||||||
|
SessionSource::SubAgent(params.sub_source.clone()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let codex = Arc::new(codex);
|
let codex = Arc::new(codex);
|
||||||
|
|
||||||
// Use a child token so parent cancel cascades but we can scope it to this task
|
// Use a child token so parent cancel cascades but we can scope it to this task
|
||||||
let cancel_token_events = cancel_token.child_token();
|
let cancel_token_events = params.cancel_token.child_token();
|
||||||
let cancel_token_ops = cancel_token.child_token();
|
let cancel_token_ops = params.cancel_token.child_token();
|
||||||
|
|
||||||
// Forward events from the sub-agent to the consumer, filtering approvals and
|
// Forward events from the sub-agent to the consumer, filtering approvals and
|
||||||
// routing them to the parent session for decisions.
|
// routing them to the parent session for decisions.
|
||||||
let parent_session_clone = Arc::clone(&parent_session);
|
let parent_session_clone = Arc::clone(¶ms.parent_session);
|
||||||
let parent_ctx_clone = Arc::clone(&parent_ctx);
|
let parent_ctx_clone = Arc::clone(¶ms.parent_ctx);
|
||||||
let codex_for_events = Arc::clone(&codex);
|
let codex_for_events = Arc::clone(&codex);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _ = forward_events(
|
let _ = forward_events(
|
||||||
@@ -87,24 +97,14 @@ pub(crate) async fn run_codex_conversation_interactive(
|
|||||||
///
|
///
|
||||||
/// Internally calls the interactive variant, then immediately submits the provided input.
|
/// Internally calls the interactive variant, then immediately submits the provided input.
|
||||||
pub(crate) async fn run_codex_conversation_one_shot(
|
pub(crate) async fn run_codex_conversation_one_shot(
|
||||||
config: Config,
|
mut params: SubAgentRunParams,
|
||||||
auth_manager: Arc<AuthManager>,
|
|
||||||
input: Vec<UserInput>,
|
input: Vec<UserInput>,
|
||||||
parent_session: Arc<Session>,
|
|
||||||
parent_ctx: Arc<TurnContext>,
|
|
||||||
cancel_token: CancellationToken,
|
|
||||||
) -> Result<Codex, CodexErr> {
|
) -> Result<Codex, CodexErr> {
|
||||||
// Use a child token so we can stop the delegate after completion without
|
// Use a child token so we can stop the delegate after completion without
|
||||||
// requiring the caller to cancel the parent token.
|
// requiring the caller to cancel the parent token.
|
||||||
let child_cancel = cancel_token.child_token();
|
let child_cancel = params.cancel_token.child_token();
|
||||||
let io = run_codex_conversation_interactive(
|
params.cancel_token = child_cancel.clone();
|
||||||
config,
|
let io = run_codex_conversation_interactive(params).await?;
|
||||||
auth_manager,
|
|
||||||
parent_session,
|
|
||||||
parent_ctx,
|
|
||||||
child_cancel.clone(),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Send the initial input to kick off the one-shot turn.
|
// Send the initial input to kick off the one-shot turn.
|
||||||
io.submit(Op::UserInput { items: input }).await?;
|
io.submit(Op::UserInput { items: input }).await?;
|
||||||
|
|||||||
@@ -77,18 +77,6 @@ impl ConversationHistory {
|
|||||||
history
|
history
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn remove_first_item(&mut self) {
|
|
||||||
if !self.items.is_empty() {
|
|
||||||
// Remove the oldest item (front of the list). Items are ordered from
|
|
||||||
// oldest → newest, so index 0 is the first entry recorded.
|
|
||||||
let removed = self.items.remove(0);
|
|
||||||
// If the removed item participates in a call/output pair, also remove
|
|
||||||
// its corresponding counterpart to keep the invariants intact without
|
|
||||||
// running a full normalization pass.
|
|
||||||
self.remove_corresponding_for(&removed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
|
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
|
||||||
self.items = items;
|
self.items = items;
|
||||||
}
|
}
|
||||||
@@ -313,75 +301,6 @@ impl ConversationHistory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes the corresponding paired item for the provided `item`, if any.
|
|
||||||
///
|
|
||||||
/// Pairs:
|
|
||||||
/// - FunctionCall <-> FunctionCallOutput
|
|
||||||
/// - CustomToolCall <-> CustomToolCallOutput
|
|
||||||
/// - LocalShellCall(call_id: Some) <-> FunctionCallOutput
|
|
||||||
fn remove_corresponding_for(&mut self, item: &ResponseItem) {
|
|
||||||
match item {
|
|
||||||
ResponseItem::FunctionCall { call_id, .. } => {
|
|
||||||
self.remove_first_matching(|i| match i {
|
|
||||||
ResponseItem::FunctionCallOutput {
|
|
||||||
call_id: existing, ..
|
|
||||||
} => existing == call_id,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ResponseItem::CustomToolCall { call_id, .. } => {
|
|
||||||
self.remove_first_matching(|i| match i {
|
|
||||||
ResponseItem::CustomToolCallOutput {
|
|
||||||
call_id: existing, ..
|
|
||||||
} => existing == call_id,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ResponseItem::LocalShellCall {
|
|
||||||
call_id: Some(call_id),
|
|
||||||
..
|
|
||||||
} => {
|
|
||||||
self.remove_first_matching(|i| match i {
|
|
||||||
ResponseItem::FunctionCallOutput {
|
|
||||||
call_id: existing, ..
|
|
||||||
} => existing == call_id,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ResponseItem::FunctionCallOutput { call_id, .. } => {
|
|
||||||
self.remove_first_matching(|i| match i {
|
|
||||||
ResponseItem::FunctionCall {
|
|
||||||
call_id: existing, ..
|
|
||||||
} => existing == call_id,
|
|
||||||
ResponseItem::LocalShellCall {
|
|
||||||
call_id: Some(existing),
|
|
||||||
..
|
|
||||||
} => existing == call_id,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ResponseItem::CustomToolCallOutput { call_id, .. } => {
|
|
||||||
self.remove_first_matching(|i| match i {
|
|
||||||
ResponseItem::CustomToolCall {
|
|
||||||
call_id: existing, ..
|
|
||||||
} => existing == call_id,
|
|
||||||
_ => false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove the first item matching the predicate.
|
|
||||||
fn remove_first_matching<F>(&mut self, predicate: F)
|
|
||||||
where
|
|
||||||
F: FnMut(&ResponseItem) -> bool,
|
|
||||||
{
|
|
||||||
if let Some(pos) = self.items.iter().position(predicate) {
|
|
||||||
self.items.remove(pos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_item(item: &ResponseItem) -> ResponseItem {
|
fn process_item(item: &ResponseItem) -> ResponseItem {
|
||||||
match item {
|
match item {
|
||||||
ResponseItem::FunctionCallOutput { call_id, output } => {
|
ResponseItem::FunctionCallOutput { call_id, output } => {
|
||||||
@@ -649,98 +568,6 @@ mod tests {
|
|||||||
assert_eq!(filtered, vec![]);
|
assert_eq!(filtered, vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_first_item_removes_matching_output_for_function_call() {
|
|
||||||
let items = vec![
|
|
||||||
ResponseItem::FunctionCall {
|
|
||||||
id: None,
|
|
||||||
name: "do_it".to_string(),
|
|
||||||
arguments: "{}".to_string(),
|
|
||||||
call_id: "call-1".to_string(),
|
|
||||||
},
|
|
||||||
ResponseItem::FunctionCallOutput {
|
|
||||||
call_id: "call-1".to_string(),
|
|
||||||
output: FunctionCallOutputPayload {
|
|
||||||
content: "ok".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
},
|
|
||||||
];
|
|
||||||
let mut h = create_history_with_items(items);
|
|
||||||
h.remove_first_item();
|
|
||||||
assert_eq!(h.contents(), vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_first_item_removes_matching_call_for_output() {
|
|
||||||
let items = vec![
|
|
||||||
ResponseItem::FunctionCallOutput {
|
|
||||||
call_id: "call-2".to_string(),
|
|
||||||
output: FunctionCallOutputPayload {
|
|
||||||
content: "ok".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
},
|
|
||||||
ResponseItem::FunctionCall {
|
|
||||||
id: None,
|
|
||||||
name: "do_it".to_string(),
|
|
||||||
arguments: "{}".to_string(),
|
|
||||||
call_id: "call-2".to_string(),
|
|
||||||
},
|
|
||||||
];
|
|
||||||
let mut h = create_history_with_items(items);
|
|
||||||
h.remove_first_item();
|
|
||||||
assert_eq!(h.contents(), vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_first_item_handles_local_shell_pair() {
|
|
||||||
let items = vec![
|
|
||||||
ResponseItem::LocalShellCall {
|
|
||||||
id: None,
|
|
||||||
call_id: Some("call-3".to_string()),
|
|
||||||
status: LocalShellStatus::Completed,
|
|
||||||
action: LocalShellAction::Exec(LocalShellExecAction {
|
|
||||||
command: vec!["echo".to_string(), "hi".to_string()],
|
|
||||||
timeout_ms: None,
|
|
||||||
working_directory: None,
|
|
||||||
env: None,
|
|
||||||
user: None,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
ResponseItem::FunctionCallOutput {
|
|
||||||
call_id: "call-3".to_string(),
|
|
||||||
output: FunctionCallOutputPayload {
|
|
||||||
content: "ok".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
},
|
|
||||||
];
|
|
||||||
let mut h = create_history_with_items(items);
|
|
||||||
h.remove_first_item();
|
|
||||||
assert_eq!(h.contents(), vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_first_item_handles_custom_tool_pair() {
|
|
||||||
let items = vec![
|
|
||||||
ResponseItem::CustomToolCall {
|
|
||||||
id: None,
|
|
||||||
status: None,
|
|
||||||
call_id: "tool-1".to_string(),
|
|
||||||
name: "my_tool".to_string(),
|
|
||||||
input: "{}".to_string(),
|
|
||||||
},
|
|
||||||
ResponseItem::CustomToolCallOutput {
|
|
||||||
call_id: "tool-1".to_string(),
|
|
||||||
output: "ok".to_string(),
|
|
||||||
},
|
|
||||||
];
|
|
||||||
let mut h = create_history_with_items(items);
|
|
||||||
h.remove_first_item();
|
|
||||||
assert_eq!(h.contents(), vec![]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn record_items_truncates_function_call_output_content() {
|
fn record_items_truncates_function_call_output_content() {
|
||||||
let mut history = ConversationHistory::new();
|
let mut history = ConversationHistory::new();
|
||||||
|
|||||||
@@ -5,7 +5,13 @@ use tokio_util::sync::CancellationToken;
|
|||||||
|
|
||||||
use crate::codex::TurnContext;
|
use crate::codex::TurnContext;
|
||||||
use crate::codex::compact;
|
use crate::codex::compact;
|
||||||
|
use crate::codex_delegate::SubAgentRunParams;
|
||||||
|
use crate::codex_delegate::run_codex_conversation_one_shot;
|
||||||
|
use crate::protocol::EventMsg;
|
||||||
|
use crate::protocol::SubAgentSource;
|
||||||
use crate::state::TaskKind;
|
use crate::state::TaskKind;
|
||||||
|
use codex_protocol::models::ResponseItem;
|
||||||
|
use codex_protocol::protocol::RolloutItem;
|
||||||
use codex_protocol::user_input::UserInput;
|
use codex_protocol::user_input::UserInput;
|
||||||
|
|
||||||
use super::SessionTask;
|
use super::SessionTask;
|
||||||
@@ -25,8 +31,71 @@ impl SessionTask for CompactTask {
|
|||||||
session: Arc<SessionTaskContext>,
|
session: Arc<SessionTaskContext>,
|
||||||
ctx: Arc<TurnContext>,
|
ctx: Arc<TurnContext>,
|
||||||
input: Vec<UserInput>,
|
input: Vec<UserInput>,
|
||||||
_cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
) -> Option<String> {
|
) -> Option<String> {
|
||||||
compact::run_compact_task(session.clone_session(), ctx, input).await
|
// Persist a TurnContext entry in the parent rollout so manual compact
|
||||||
|
// still appears as a separate API turn in rollout, matching prior behavior.
|
||||||
|
crate::codex::compact::persist_turn_context_rollout(
|
||||||
|
session.clone_session().as_ref(),
|
||||||
|
ctx.as_ref(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Build initial forked history from parent so the sub-agent sees the
|
||||||
|
// same context without mutating the parent transcript.
|
||||||
|
let parent_history: Vec<ResponseItem> =
|
||||||
|
session.clone_session().clone_history().await.get_history();
|
||||||
|
let forked: Vec<RolloutItem> = parent_history
|
||||||
|
.into_iter()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Start sub-agent one-shot conversation for summarization.
|
||||||
|
let config = ctx.client.config().as_ref().clone();
|
||||||
|
let io = run_codex_conversation_one_shot(
|
||||||
|
SubAgentRunParams {
|
||||||
|
config,
|
||||||
|
auth_manager: session.auth_manager(),
|
||||||
|
initial_history: Some(codex_protocol::protocol::InitialHistory::Forked(forked)),
|
||||||
|
sub_source: SubAgentSource::Compact,
|
||||||
|
parent_session: session.clone_session(),
|
||||||
|
parent_ctx: ctx.clone(),
|
||||||
|
cancel_token: cancellation_token.clone(),
|
||||||
|
},
|
||||||
|
input,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Drain events and capture last_agent_message from TaskComplete.
|
||||||
|
let mut summary_text: Option<String> = None;
|
||||||
|
if let Ok(io) = io {
|
||||||
|
while let Ok(event) = io.next_event().await {
|
||||||
|
match event.msg {
|
||||||
|
EventMsg::TaskComplete(done) => {
|
||||||
|
summary_text = done.last_agent_message;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
EventMsg::TurnAborted(_) => break,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply compaction into the parent session if we captured a summary.
|
||||||
|
if let Some(summary_text) = summary_text {
|
||||||
|
let parent_sess = session.clone_session();
|
||||||
|
compact::apply_compaction(&parent_sess, &ctx, &summary_text).await;
|
||||||
|
// Inform users that compaction finished.
|
||||||
|
session
|
||||||
|
.clone_session()
|
||||||
|
.send_event(
|
||||||
|
ctx.as_ref(),
|
||||||
|
EventMsg::AgentMessage(crate::protocol::AgentMessageEvent {
|
||||||
|
message: "Compact task completed".to_string(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
|
|
||||||
use crate::codex::Session;
|
use crate::codex::Session;
|
||||||
use crate::codex::TurnContext;
|
use crate::codex::TurnContext;
|
||||||
|
use crate::codex_delegate::SubAgentRunParams;
|
||||||
use crate::codex_delegate::run_codex_conversation_one_shot;
|
use crate::codex_delegate::run_codex_conversation_one_shot;
|
||||||
use crate::review_format::format_review_findings_block;
|
use crate::review_format::format_review_findings_block;
|
||||||
use crate::state::TaskKind;
|
use crate::state::TaskKind;
|
||||||
@@ -82,12 +83,16 @@ async fn start_review_conversation(
|
|||||||
// Set explicit review rubric for the sub-agent
|
// Set explicit review rubric for the sub-agent
|
||||||
sub_agent_config.base_instructions = Some(crate::REVIEW_PROMPT.to_string());
|
sub_agent_config.base_instructions = Some(crate::REVIEW_PROMPT.to_string());
|
||||||
(run_codex_conversation_one_shot(
|
(run_codex_conversation_one_shot(
|
||||||
sub_agent_config,
|
SubAgentRunParams {
|
||||||
session.auth_manager(),
|
config: sub_agent_config,
|
||||||
|
auth_manager: session.auth_manager(),
|
||||||
|
initial_history: None,
|
||||||
|
sub_source: codex_protocol::protocol::SubAgentSource::Review,
|
||||||
|
parent_session: session.clone_session(),
|
||||||
|
parent_ctx: ctx.clone(),
|
||||||
|
cancel_token: cancellation_token,
|
||||||
|
},
|
||||||
input,
|
input,
|
||||||
session.clone_session(),
|
|
||||||
ctx.clone(),
|
|
||||||
cancellation_token,
|
|
||||||
)
|
)
|
||||||
.await)
|
.await)
|
||||||
.ok()
|
.ok()
|
||||||
|
|||||||
@@ -146,7 +146,19 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string();
|
.to_string();
|
||||||
let tool_calls = json!(requests[0]["tools"].as_array());
|
let tool_calls = json!(requests[0]["tools"].as_array());
|
||||||
let prompt_cache_key = requests[0]["prompt_cache_key"]
|
let prompt_cache_key_first = requests[0]["prompt_cache_key"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
let prompt_cache_key_compact = requests[1]["prompt_cache_key"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
let prompt_cache_key_after_compact = requests[2]["prompt_cache_key"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
let prompt_cache_key_after_resume = requests[3]["prompt_cache_key"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string();
|
.to_string();
|
||||||
@@ -202,7 +214,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
|||||||
"include": [
|
"include": [
|
||||||
"reasoning.encrypted_content"
|
"reasoning.encrypted_content"
|
||||||
],
|
],
|
||||||
"prompt_cache_key": prompt_cache_key
|
"prompt_cache_key": prompt_cache_key_first
|
||||||
});
|
});
|
||||||
let compact_1 = json!(
|
let compact_1 = json!(
|
||||||
{
|
{
|
||||||
@@ -271,7 +283,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
|||||||
"include": [
|
"include": [
|
||||||
"reasoning.encrypted_content"
|
"reasoning.encrypted_content"
|
||||||
],
|
],
|
||||||
"prompt_cache_key": prompt_cache_key
|
"prompt_cache_key": prompt_cache_key_compact
|
||||||
});
|
});
|
||||||
let user_turn_2_after_compact = json!(
|
let user_turn_2_after_compact = json!(
|
||||||
{
|
{
|
||||||
@@ -336,7 +348,7 @@ SUMMARY_ONLY_CONTEXT"
|
|||||||
"include": [
|
"include": [
|
||||||
"reasoning.encrypted_content"
|
"reasoning.encrypted_content"
|
||||||
],
|
],
|
||||||
"prompt_cache_key": prompt_cache_key
|
"prompt_cache_key": prompt_cache_key_after_compact
|
||||||
});
|
});
|
||||||
let usert_turn_3_after_resume = json!(
|
let usert_turn_3_after_resume = json!(
|
||||||
{
|
{
|
||||||
@@ -421,7 +433,7 @@ SUMMARY_ONLY_CONTEXT"
|
|||||||
"include": [
|
"include": [
|
||||||
"reasoning.encrypted_content"
|
"reasoning.encrypted_content"
|
||||||
],
|
],
|
||||||
"prompt_cache_key": prompt_cache_key
|
"prompt_cache_key": prompt_cache_key_after_resume
|
||||||
});
|
});
|
||||||
let user_turn_3_after_fork = json!(
|
let user_turn_3_after_fork = json!(
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user