Compare commits

...

1 Commit

Author          SHA1         Message   Date
Ahmed Ibrahim   680aaf0fad   migrate   2025-10-29 17:34:22 -07:00
3 changed files with 254 additions and 13 deletions

View File

@@ -2,7 +2,6 @@ use std::sync::Arc;
use super::Session;
use super::TurnContext;
use super::get_last_assistant_message_from_turn;
use crate::Prompt;
use crate::client_common::ResponseEvent;
use crate::error::CodexErr;
@@ -20,11 +19,19 @@ use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
use tokio_util::sync::CancellationToken;
use tracing::error;
use crate::codex_delegate::run_codex_conversation_with_history_one_shot;
use crate::features::Feature;
use crate::protocol::Event;
use crate::protocol::EventMsg as Ev;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
@@ -42,7 +49,7 @@ pub(crate) async fn run_inline_auto_compact_task(
let input = vec![UserInput::Text {
text: SUMMARIZATION_PROMPT.to_string(),
}];
run_compact_task_inner(sess, turn_context, input).await;
run_compact_task_inner_delegate(sess, turn_context, input).await;
}
pub(crate) async fn run_compact_task(
@@ -54,7 +61,7 @@ pub(crate) async fn run_compact_task(
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
run_compact_task_inner_delegate(sess.clone(), turn_context, input).await;
None
}
@@ -63,7 +70,7 @@ async fn run_compact_task_inner(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let mut history = sess.clone_history().await;
history.record_items(&[initial_input_for_turn.into()]);
@@ -148,7 +155,17 @@ async fn run_compact_task_inner(
}
let history_snapshot = sess.clone_history().await.get_history();
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let summary_text =
super::get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
finish_compact(sess, turn_context, summary_text, history_snapshot).await;
}
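/// Shared tail of both compaction paths: builds the compacted history from the
/// collected user messages and the summary text, then emits the completion event.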
async fn finish_compact(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
summary_text: String,
history_snapshot: Vec<ResponseItem>,
) {
let user_messages = collect_user_messages(&history_snapshot);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
@@ -171,6 +188,166 @@ async fn run_compact_task_inner(
sess.send_event(&turn_context, event).await;
}
async fn run_compact_task_inner_delegate(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let mut history = sess.clone_history().await;
history.record_items(&[initial_input_for_turn.into()]);
let mut truncated_count = 0usize;
let max_retries = turn_context.client.get_provider().stream_max_retries();
let mut retries = 0;
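// Persist this compaction turn's context to the rollout before delegating.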
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.client.get_model(),
effort: turn_context.client.get_reasoning_effort(),
summary: turn_context.client.get_reasoning_summary(),
});
sess.persist_rollout_items(&[rollout_item]).await;
// Delegate the model run to a sub-agent while preserving retries and trim behavior.
let mut last_agent_message: Option<String> = None;
loop {
let turn_input = history.get_history_for_prompt();
// Use the full prompt history as initial context for the delegate, and
// pass the summarization trigger as the one-shot user input.
let prefix_items = turn_input.clone();
let input_for_delegate = input.clone();
// Build initial history for the delegate from the prompt prefix.
let initial = InitialHistory::Forked(
prefix_items
.into_iter()
.map(RolloutItem::ResponseItem)
.collect(),
);
// Clone config and run a one-shot sub-agent turn labeled as Compact.
let mut sub_config = turn_context.client.config().as_ref().clone();
// Disable tools for the summarization run to match Prompt::default() semantics.
sub_config
.features
.disable(Feature::WebSearchRequest)
.disable(Feature::ViewImageTool)
.disable(Feature::StreamableShell)
.disable(Feature::UnifiedExec)
.disable(Feature::ApplyPatchFreeform);
let cancel = CancellationToken::new();
let run = run_codex_conversation_with_history_one_shot(
sub_config,
Arc::clone(&sess.services.auth_manager),
initial,
SubAgentSource::Compact,
input_for_delegate,
sess.clone(),
turn_context.clone(),
cancel.clone(),
)
.await;
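// If the delegate failed to start, retry with backoff up to the provider's
// stream retry limit before surfacing an error.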
let Ok(io) = run else {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(ErrorEvent {
message: "delegate failed to start".to_string(),
});
sess.send_event(&turn_context, event).await;
return;
}
};
// Process delegate events; forward to parent and interpret errors for retry/trim.
let mut saw_error: Option<String> = None;
while let Ok(Event { msg, .. }) = io.next_event().await {
match msg.clone() {
Ev::TaskComplete(done) => {
last_agent_message = done.last_agent_message;
saw_error = None;
break;
}
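// An abort from the delegate ends compaction without emitting an error.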
Ev::TurnAborted(_) => {
return;
}
Ev::Error(err_event) => {
saw_error = Some(err_event.message);
}
other => {
// Forward all other events for UI continuity (streaming text, token counts, etc.)
sess.send_event(&turn_context, other).await;
}
}
}
if let Some(message) = saw_error {
// Treat context window errors specially: trim oldest and retry.
if message == CodexErr::ContextWindowExceeded.to_string() {
if turn_input.len() > 1 {
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {message}"
);
history.remove_first_item();
truncated_count += 1;
retries = 0;
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(ErrorEvent { message });
sess.send_event(&turn_context, event).await;
return;
}
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(ErrorEvent { message });
sess.send_event(&turn_context, event).await;
return;
}
} else {
if truncated_count > 0 {
sess.notify_background_event(
turn_context.as_ref(),
format!(
"Trimmed {truncated_count} older conversation item(s) before compacting so the prompt fits the model context window."
),
)
.await;
}
break;
}
}
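// The delegate finished; snapshot the (possibly trimmed) history and reuse the
// shared finish path with the sub-agent's final message as the summary.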
let history_snapshot = sess.clone_history().await.get_history();
let summary_text = last_agent_message.unwrap_or_default();
finish_compact(sess, turn_context, summary_text, history_snapshot).await;
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {

View File

@@ -36,6 +36,31 @@ pub(crate) async fn run_codex_conversation_interactive(
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
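// Preserve the original behavior: fresh history and the Review sub-agent source.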
run_codex_conversation_interactive_with_history(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// Start an interactive sub-Codex conversation with a provided initial history and source.
///
/// Mirrors `run_codex_conversation_interactive` but allows the caller to seed history and
/// specify a sub-agent source label for rollout provenance.
pub(crate) async fn run_codex_conversation_interactive_with_history(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -43,8 +68,8 @@ pub(crate) async fn run_codex_conversation_interactive(
let CodexSpawnOk { codex, .. } = Codex::spawn(
config,
auth_manager,
InitialHistory::New,
SessionSource::SubAgent(SubAgentSource::Review),
initial_history,
SessionSource::SubAgent(sub_source),
)
.await?;
let codex = Arc::new(codex);
@@ -93,13 +118,39 @@ pub(crate) async fn run_codex_conversation_one_shot(
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
run_codex_conversation_with_history_one_shot(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
input,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// Convenience wrapper for one-time use with initial history and explicit sub-agent source.
pub(crate) async fn run_codex_conversation_with_history_one_shot(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
input: Vec<UserInput>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
// Use a child token so we can stop the delegate after completion without
// requiring the caller to cancel the parent token.
let child_cancel = cancel_token.child_token();
let io = run_codex_conversation_interactive(
let io = run_codex_conversation_interactive_with_history(
config,
auth_manager,
initial_history,
sub_source,
parent_session,
parent_ctx,
child_cancel.clone(),

View File

@@ -146,7 +146,20 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
.unwrap_or_default()
.to_string();
let tool_calls = json!(requests[0]["tools"].as_array());
let prompt_cache_key = requests[0]["prompt_cache_key"]
// Allow sub-agent runs to use a distinct prompt_cache_key; capture per-index keys.
let pk0 = requests[0]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let pk1 = requests[1]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let pk2 = requests[2]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
let pk3 = requests[3]["prompt_cache_key"]
.as_str()
.unwrap_or_default()
.to_string();
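// The captured keys are substituted into the expected request bodies below.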
@@ -202,7 +215,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": pk0
});
let compact_1 = json!(
{
@@ -271,7 +284,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": pk1
});
let user_turn_2_after_compact = json!(
{
@@ -336,7 +349,7 @@ SUMMARY_ONLY_CONTEXT"
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": pk2
});
let usert_turn_3_after_resume = json!(
{
@@ -421,7 +434,7 @@ SUMMARY_ONLY_CONTEXT"
"include": [
"reasoning.encrypted_content"
],
"prompt_cache_key": prompt_cache_key
"prompt_cache_key": pk3
});
let user_turn_3_after_fork = json!(
{