Fix turn extension data task plumbing (#22646)

## Summary
- carry the per-turn extension data through RunningTask so abort
handling can rebuild SessionTaskContext
- update stale test ExtensionData::new() callsites to pass the turn id

## Testing
- Not run after PR branch creation; CI will cover.
This commit is contained in:
jif-oai
2026-05-14 16:00:06 +02:00
committed by GitHub
parent 9ea38136b0
commit 12bfb57139
5 changed files with 17 additions and 9 deletions

View File

@@ -7009,7 +7009,9 @@ async fn handle_output_item_done_records_image_save_history_message() {
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&session),
turn_context: Arc::clone(&turn_context),
turn_store: Arc::new(codex_extension_api::ExtensionData::new()),
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
turn_context.sub_id.clone(),
)),
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
cancellation_token: CancellationToken::new(),
};
@@ -7062,7 +7064,9 @@ async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&session),
turn_context: Arc::clone(&turn_context),
turn_store: Arc::new(codex_extension_api::ExtensionData::new()),
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
turn_context.sub_id.clone(),
)),
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
cancellation_token: CancellationToken::new(),
};
@@ -8950,7 +8954,7 @@ async fn tool_calls_reopen_mailbox_delivery_for_current_turn() {
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&sess),
turn_context: Arc::clone(&tc),
turn_store: Arc::new(codex_extension_api::ExtensionData::new()),
turn_store: Arc::new(codex_extension_api::ExtensionData::new(tc.sub_id.clone())),
tool_runtime: test_tool_runtime(Arc::clone(&sess), Arc::clone(&tc)),
cancellation_token: CancellationToken::new(),
};

View File

@@ -43,7 +43,7 @@ async fn plan_mode_uses_contributed_turn_item_for_last_agent_message() {
let mut builder = codex_extension_api::ExtensionRegistryBuilder::new();
builder.turn_item_contributor(Arc::new(RewriteAgentMessageContributor));
session.services.extensions = Arc::new(builder.build());
let turn_store = ExtensionData::new();
let turn_store = ExtensionData::new(turn_context.sub_id.clone());
let mut state = PlanModeStreamState::new(&turn_context.sub_id);
let mut last_agent_message = None;
let item = assistant_output_text("original assistant text");

View File

@@ -9,6 +9,7 @@ use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use codex_extension_api::ExtensionData;
use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::request_permissions::RequestPermissionProfile;
@@ -75,6 +76,7 @@ pub(crate) struct RunningTask {
pub(crate) cancellation_token: CancellationToken,
pub(crate) handle: AbortOnDropHandle<()>,
pub(crate) turn_context: Arc<TurnContext>,
pub(crate) turn_extension_data: Arc<ExtensionData>,
// Timer recorded when the task drops to capture the full turn duration.
pub(crate) _timer: Option<codex_otel::Timer>,
}

View File

@@ -214,7 +214,7 @@ async fn handle_non_tool_response_item_runs_turn_item_contributors_only_when_req
let mut builder = codex_extension_api::ExtensionRegistryBuilder::new();
builder.turn_item_contributor(Arc::new(TestTurnItemContributor));
session.services.extensions = Arc::new(builder.build());
let turn_store = ExtensionData::new();
let turn_store = ExtensionData::new(turn_context.sub_id.clone());
let item = assistant_output_text(
"hello<oai-mem-citation>ignored by memory parser</oai-mem-citation> world",
);
@@ -288,8 +288,8 @@ async fn handle_output_item_done_returns_contributed_last_agent_message() {
let item = assistant_output_text("original assistant text");
let mut ctx = HandleOutputCtx {
sess: session,
turn_context,
turn_store: Arc::new(ExtensionData::new()),
turn_context: Arc::clone(&turn_context),
turn_store: Arc::new(ExtensionData::new(turn_context.sub_id.clone())),
tool_runtime,
cancellation_token: CancellationToken::new(),
};
@@ -310,7 +310,7 @@ async fn finalized_turn_item_defers_mailbox_for_contributed_visible_text() {
let mut builder = codex_extension_api::ExtensionRegistryBuilder::new();
builder.turn_item_contributor(Arc::new(RewriteAgentMessageContributor));
session.services.extensions = Arc::new(builder.build());
let turn_store = ExtensionData::new();
let turn_store = ExtensionData::new(turn_context.sub_id.clone());
let item = assistant_output_text("<oai-mem-citation>hidden only</oai-mem-citation>");
let finalized = finalize_non_tool_response_item(
@@ -336,7 +336,7 @@ async fn finalized_turn_item_keeps_mailbox_open_for_commentary_text() {
let mut builder = codex_extension_api::ExtensionRegistryBuilder::new();
builder.turn_item_contributor(Arc::new(RewriteAgentMessageContributor));
session.services.extensions = Arc::new(builder.build());
let turn_store = ExtensionData::new();
let turn_store = ExtensionData::new(turn_context.sub_id.clone());
let item = assistant_output_text_with_phase("still working", Some(MessagePhase::Commentary));
let finalized = finalize_non_tool_response_item(

View File

@@ -367,6 +367,7 @@ impl Session {
}
self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref());
let turn_extension_data = Arc::clone(&turn_context.extension_data);
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
debug_assert!(turn.tasks.is_empty());
@@ -439,6 +440,7 @@ impl Session {
task,
cancellation_token,
turn_context: Arc::clone(&turn_context),
turn_extension_data,
_timer: timer,
};
turn.add_task(running_task);