This commit is contained in:
Ahmed Ibrahim
2025-10-29 20:47:45 -07:00
parent 0af13d5c6a
commit c9686130ce
5 changed files with 67 additions and 276 deletions

View File

@@ -43,15 +43,17 @@ pub(crate) async fn run_inline_auto_compact_task(
// Launch sub-agent one-shot; drain to completion and capture summary.
let config = turn_context.client.config().as_ref().clone();
let cancel = tokio_util::sync::CancellationToken::new();
if let Ok(io) = crate::codex_delegate::run_codex_conversation_one_shot_with(
config,
sess.services.auth_manager.clone(),
codex_protocol::protocol::InitialHistory::Forked(forked),
codex_protocol::protocol::SubAgentSource::Compact,
if let Ok(io) = crate::codex_delegate::run_codex_conversation_one_shot(
crate::codex_delegate::SubAgentRunParams {
config,
auth_manager: sess.services.auth_manager.clone(),
initial_history: Some(codex_protocol::protocol::InitialHistory::Forked(forked)),
sub_source: codex_protocol::protocol::SubAgentSource::Compact,
parent_session: Arc::clone(&sess),
parent_ctx: Arc::clone(&turn_context),
cancel_token: cancel,
},
input,
Arc::clone(&sess),
Arc::clone(&turn_context),
cancel,
)
.await
{
@@ -66,7 +68,7 @@ pub(crate) async fn run_inline_auto_compact_task(
}
}
if let Some(summary) = summary_text {
apply_compaction(sess, turn_context, &summary).await;
apply_compaction(&sess, &turn_context, &summary).await;
let event =
crate::protocol::EventMsg::AgentMessage(crate::protocol::AgentMessageEvent {
message: "Compact task completed".to_string(),
@@ -160,8 +162,8 @@ fn build_compacted_history_with_limit(
/// conversation with a bridge message, preserve ghost snapshots, persist the
/// Compacted rollout entry, and replace history.
pub(crate) async fn apply_compaction(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
summary_text: &str,
) {
let history_snapshot = sess.clone_history().await.get_history();

View File

@@ -25,60 +25,48 @@ use crate::config::Config;
use crate::error::CodexErr;
use codex_protocol::protocol::InitialHistory;
/// Parameters for running a sub-agent (delegate) conversation.
pub(crate) struct SubAgentRunParams {
pub config: Config,
pub auth_manager: Arc<AuthManager>,
pub initial_history: Option<InitialHistory>,
pub sub_source: SubAgentSource,
pub parent_session: Arc<Session>,
pub parent_ctx: Arc<TurnContext>,
pub cancel_token: CancellationToken,
}
/// Start an interactive sub-Codex conversation and return IO channels.
///
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
/// Approval requests are handled via `parent_session` and are not surfaced.
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
pub(crate) async fn run_codex_conversation_interactive(
config: Config,
auth_manager: Arc<AuthManager>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
run_codex_conversation_interactive_with(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// Start an interactive sub-Codex conversation with custom initial history and source.
pub(crate) async fn run_codex_conversation_interactive_with(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
params: SubAgentRunParams,
) -> Result<Codex, CodexErr> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let CodexSpawnOk { codex, .. } = Codex::spawn(
config,
auth_manager,
initial_history,
SessionSource::SubAgent(sub_source),
params.config.clone(),
Arc::clone(&params.auth_manager),
params
.initial_history
.clone()
.unwrap_or(InitialHistory::New),
SessionSource::SubAgent(params.sub_source.clone()),
)
.await?;
let codex = Arc::new(codex);
// Use a child token so parent cancel cascades but we can scope it to this task
let cancel_token_events = cancel_token.child_token();
let cancel_token_ops = cancel_token.child_token();
let cancel_token_events = params.cancel_token.child_token();
let cancel_token_ops = params.cancel_token.child_token();
// Forward events from the sub-agent to the consumer, filtering approvals and
// routing them to the parent session for decisions.
let parent_session_clone = Arc::clone(&parent_session);
let parent_ctx_clone = Arc::clone(&parent_ctx);
let parent_session_clone = Arc::clone(&params.parent_session);
let parent_ctx_clone = Arc::clone(&params.parent_ctx);
let codex_for_events = Arc::clone(&codex);
tokio::spawn(async move {
let _ = forward_events(
@@ -109,53 +97,18 @@ pub(crate) async fn run_codex_conversation_interactive_with(
///
/// Internally calls the interactive variant, then immediately submits the provided input.
pub(crate) async fn run_codex_conversation_one_shot(
config: Config,
auth_manager: Arc<AuthManager>,
mut params: SubAgentRunParams,
input: Vec<UserInput>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
run_codex_conversation_one_shot_with(
config,
auth_manager,
InitialHistory::New,
SubAgentSource::Review,
input,
parent_session,
parent_ctx,
cancel_token,
)
.await
}
/// One-shot variant with custom initial history and source.
pub(crate) async fn run_codex_conversation_one_shot_with(
config: Config,
auth_manager: Arc<AuthManager>,
initial_history: InitialHistory,
sub_source: SubAgentSource,
input: Vec<UserInput>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
) -> Result<Codex, CodexErr> {
// Use a child token so we can stop the delegate after completion without
// requiring the caller to cancel the parent token.
let child_cancel = cancel_token.child_token();
let io = run_codex_conversation_interactive_with(
config,
auth_manager,
initial_history,
sub_source,
parent_session,
parent_ctx,
child_cancel.clone(),
)
.await?;
let child_cancel = params.cancel_token.child_token();
params.cancel_token = child_cancel.clone();
let io_input = input.clone();
let io = run_codex_conversation_interactive(params).await?;
// Send the initial input to kick off the one-shot turn.
io.submit(Op::UserInput { items: input }).await?;
io.submit(Op::UserInput { items: io_input }).await?;
// Bridge events so we can observe completion and shut down automatically.
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);

View File

@@ -77,18 +77,6 @@ impl ConversationHistory {
history
}
pub(crate) fn remove_first_item(&mut self) {
if !self.items.is_empty() {
// Remove the oldest item (front of the list). Items are ordered from
// oldest → newest, so index 0 is the first entry recorded.
let removed = self.items.remove(0);
// If the removed item participates in a call/output pair, also remove
// its corresponding counterpart to keep the invariants intact without
// running a full normalization pass.
self.remove_corresponding_for(&removed);
}
}
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
self.items = items;
}
@@ -313,75 +301,6 @@ impl ConversationHistory {
}
}
/// Removes the corresponding paired item for the provided `item`, if any.
///
/// Pairs:
/// - FunctionCall <-> FunctionCallOutput
/// - CustomToolCall <-> CustomToolCallOutput
/// - LocalShellCall(call_id: Some) <-> FunctionCallOutput
fn remove_corresponding_for(&mut self, item: &ResponseItem) {
match item {
ResponseItem::FunctionCall { call_id, .. } => {
self.remove_first_matching(|i| match i {
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} => existing == call_id,
_ => false,
});
}
ResponseItem::CustomToolCall { call_id, .. } => {
self.remove_first_matching(|i| match i {
ResponseItem::CustomToolCallOutput {
call_id: existing, ..
} => existing == call_id,
_ => false,
});
}
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => {
self.remove_first_matching(|i| match i {
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} => existing == call_id,
_ => false,
});
}
ResponseItem::FunctionCallOutput { call_id, .. } => {
self.remove_first_matching(|i| match i {
ResponseItem::FunctionCall {
call_id: existing, ..
} => existing == call_id,
ResponseItem::LocalShellCall {
call_id: Some(existing),
..
} => existing == call_id,
_ => false,
});
}
ResponseItem::CustomToolCallOutput { call_id, .. } => {
self.remove_first_matching(|i| match i {
ResponseItem::CustomToolCall {
call_id: existing, ..
} => existing == call_id,
_ => false,
});
}
_ => {}
}
}
/// Remove the first item matching the predicate.
fn remove_first_matching<F>(&mut self, predicate: F)
where
F: FnMut(&ResponseItem) -> bool,
{
if let Some(pos) = self.items.iter().position(predicate) {
self.items.remove(pos);
}
}
fn process_item(item: &ResponseItem) -> ResponseItem {
match item {
ResponseItem::FunctionCallOutput { call_id, output } => {
@@ -649,98 +568,6 @@ mod tests {
assert_eq!(filtered, vec![]);
}
#[test]
fn remove_first_item_removes_matching_output_for_function_call() {
let items = vec![
ResponseItem::FunctionCall {
id: None,
name: "do_it".to_string(),
arguments: "{}".to_string(),
call_id: "call-1".to_string(),
},
ResponseItem::FunctionCallOutput {
call_id: "call-1".to_string(),
output: FunctionCallOutputPayload {
content: "ok".to_string(),
..Default::default()
},
},
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
}
#[test]
fn remove_first_item_removes_matching_call_for_output() {
let items = vec![
ResponseItem::FunctionCallOutput {
call_id: "call-2".to_string(),
output: FunctionCallOutputPayload {
content: "ok".to_string(),
..Default::default()
},
},
ResponseItem::FunctionCall {
id: None,
name: "do_it".to_string(),
arguments: "{}".to_string(),
call_id: "call-2".to_string(),
},
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
}
#[test]
fn remove_first_item_handles_local_shell_pair() {
let items = vec![
ResponseItem::LocalShellCall {
id: None,
call_id: Some("call-3".to_string()),
status: LocalShellStatus::Completed,
action: LocalShellAction::Exec(LocalShellExecAction {
command: vec!["echo".to_string(), "hi".to_string()],
timeout_ms: None,
working_directory: None,
env: None,
user: None,
}),
},
ResponseItem::FunctionCallOutput {
call_id: "call-3".to_string(),
output: FunctionCallOutputPayload {
content: "ok".to_string(),
..Default::default()
},
},
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
}
#[test]
fn remove_first_item_handles_custom_tool_pair() {
let items = vec![
ResponseItem::CustomToolCall {
id: None,
status: None,
call_id: "tool-1".to_string(),
name: "my_tool".to_string(),
input: "{}".to_string(),
},
ResponseItem::CustomToolCallOutput {
call_id: "tool-1".to_string(),
output: "ok".to_string(),
},
];
let mut h = create_history_with_items(items);
h.remove_first_item();
assert_eq!(h.contents(), vec![]);
}
#[test]
fn record_items_truncates_function_call_output_content() {
let mut history = ConversationHistory::new();

View File

@@ -5,13 +5,14 @@ use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::codex::compact;
use crate::codex_delegate::run_codex_conversation_one_shot_with;
use crate::codex_delegate::SubAgentRunParams;
use crate::codex_delegate::run_codex_conversation_one_shot;
use crate::protocol::EventMsg;
use crate::protocol::SubAgentSource;
use crate::state::TaskKind;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
@@ -51,15 +52,17 @@ impl SessionTask for CompactTask {
// Start sub-agent one-shot conversation for summarization.
let config = ctx.client.config().as_ref().clone();
let io = run_codex_conversation_one_shot_with(
config,
session.auth_manager(),
codex_protocol::protocol::InitialHistory::Forked(forked),
SubAgentSource::Compact,
let io = run_codex_conversation_one_shot(
SubAgentRunParams {
config,
auth_manager: session.auth_manager(),
initial_history: Some(codex_protocol::protocol::InitialHistory::Forked(forked)),
sub_source: SubAgentSource::Compact,
parent_session: session.clone_session(),
parent_ctx: ctx.clone(),
cancel_token: cancellation_token.clone(),
},
input,
session.clone_session(),
ctx.clone(),
cancellation_token.clone(),
)
.await;
@@ -80,7 +83,8 @@ impl SessionTask for CompactTask {
// Apply compaction into the parent session if we captured a summary.
if let Some(summary_text) = summary_text {
compact::apply_compaction(session.clone_session(), ctx.clone(), &summary_text).await;
let parent_sess = session.clone_session();
compact::apply_compaction(&parent_sess, &ctx, &summary_text).await;
// Inform users that compaction finished.
session
.clone_session()

View File

@@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex_delegate::SubAgentRunParams;
use crate::codex_delegate::run_codex_conversation_one_shot;
use crate::review_format::format_review_findings_block;
use crate::state::TaskKind;
@@ -82,12 +83,16 @@ async fn start_review_conversation(
// Set explicit review rubric for the sub-agent
sub_agent_config.base_instructions = Some(crate::REVIEW_PROMPT.to_string());
(run_codex_conversation_one_shot(
sub_agent_config,
session.auth_manager(),
SubAgentRunParams {
config: sub_agent_config,
auth_manager: session.auth_manager(),
initial_history: None,
sub_source: codex_protocol::protocol::SubAgentSource::Review,
parent_session: session.clone_session(),
parent_ctx: ctx.clone(),
cancel_token: cancellation_token,
},
input,
session.clone_session(),
ctx.clone(),
cancellation_token,
)
.await)
.ok()