extension-api: add TurnItemEmitter to tool calls (#24813)

## Why
Extension-contributed tools need to emit visible turn items through
Codex's normal event and persistence pipeline.

## What
- Add `TurnItemEmitter` to extension `ToolCall`s and route the core
implementation through `Session::emit_turn_item_*`.
- Hold weak session and turn references so retained tool calls cannot
keep host state alive.
- Provide a no-op emitter for extension test callers.

## Test Plan
- `just test -p codex-core -E
'test(passes_turn_fields_and_scoped_turn_item_emitter_to_extension_call)'`

---------

Co-authored-by: jif-oai <jif@openai.com>
This commit is contained in:
sayan-oai
2026-05-28 09:13:43 -07:00
committed by GitHub
parent a061befb46
commit 2066874415
6 changed files with 179 additions and 5 deletions

View File

@@ -1,11 +1,18 @@
use std::sync::Arc;
use std::sync::Weak;
use codex_protocol::items::TurnItem;
use codex_tools::ConversationHistory;
use codex_tools::ExtensionTurnItem;
use codex_tools::ToolCall as ExtensionToolCall;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use codex_tools::TurnItemEmissionFuture;
use codex_tools::TurnItemEmitter;
use crate::function_tool::FunctionCallError;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
@@ -52,6 +59,39 @@ impl CoreToolRuntime for ExtensionToolAdapter {
}
}
/// `TurnItemEmitter` implementation backed by the core session.
///
/// Both handles are `Weak`, so an extension that retains the `ToolCall`
/// holding this emitter does not keep the session or turn alive.
struct CoreTurnItemEmitter {
// Session whose `emit_turn_item_*` methods publish the item.
session: Weak<Session>,
// Turn context passed along with every emission.
turn: Weak<TurnContext>,
}
/// Converts an extension-owned turn item into the protocol `TurnItem`.
///
/// The match has no wildcard arm, so adding an `ExtensionTurnItem` variant
/// is a compile error here until its mapping is written.
fn extension_turn_item(item: ExtensionTurnItem) -> TurnItem {
    match item {
        ExtensionTurnItem::WebSearch(web_search) => TurnItem::WebSearch(web_search),
    }
}
/// Routes extension-emitted turn items through `Session::emit_turn_item_*`.
///
/// The weak session and turn handles are upgraded when the returned future
/// runs. If either one has already been dropped, the emission is skipped and
/// the future simply completes.
impl TurnItemEmitter for CoreTurnItemEmitter {
    /// Publishes the start of `item` via `Session::emit_turn_item_started`.
    fn emit_started<'a>(&'a self, item: ExtensionTurnItem) -> TurnItemEmissionFuture<'a> {
        Box::pin(async move {
            let session = self.session.upgrade();
            let turn = self.turn.upgrade();
            // Only emit while both the session and the turn are still alive.
            if let (Some(session), Some(turn)) = (session, turn) {
                let turn_item = extension_turn_item(item);
                session
                    .emit_turn_item_started(turn.as_ref(), &turn_item)
                    .await;
            }
        })
    }

    /// Publishes the completion of `item` via `Session::emit_turn_item_completed`.
    fn emit_completed<'a>(&'a self, item: ExtensionTurnItem) -> TurnItemEmissionFuture<'a> {
        Box::pin(async move {
            let session = self.session.upgrade();
            let turn = self.turn.upgrade();
            // Only emit while both the session and the turn are still alive.
            if let (Some(session), Some(turn)) = (session, turn) {
                let turn_item = extension_turn_item(item);
                session
                    .emit_turn_item_completed(turn.as_ref(), turn_item)
                    .await;
            }
        })
    }
}
async fn to_extension_call(invocation: &ToolInvocation) -> ExtensionToolCall {
let conversation_history =
ConversationHistory::new(invocation.session.clone_history().await.into_raw_items());
@@ -61,6 +101,10 @@ async fn to_extension_call(invocation: &ToolInvocation) -> ExtensionToolCall {
tool_name: invocation.tool_name.clone(),
truncation_policy: invocation.turn.truncation_policy,
conversation_history,
turn_item_emitter: Arc::new(CoreTurnItemEmitter {
session: Arc::downgrade(&invocation.session),
turn: Arc::downgrade(&invocation.turn),
}),
payload: invocation.payload.clone(),
}
}
@@ -69,8 +113,13 @@ async fn to_extension_call(invocation: &ToolInvocation) -> ExtensionToolCall {
mod tests {
use std::sync::Arc;
use codex_protocol::items::TurnItem;
use codex_protocol::items::WebSearchItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::WebSearchAction;
use codex_protocol::protocol::EventMsg;
use codex_tools::ExtensionTurnItem;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::sync::Mutex;
@@ -147,6 +196,16 @@ mod tests {
&self,
call: codex_tools::ToolCall,
) -> Result<Box<dyn codex_tools::ToolOutput>, codex_tools::FunctionCallError> {
let item = ExtensionTurnItem::WebSearch(WebSearchItem {
id: call.call_id.clone(),
query: "rust trait object".to_string(),
action: WebSearchAction::Search {
query: Some("rust trait object".to_string()),
queries: None,
},
});
call.turn_item_emitter.emit_started(item.clone()).await;
call.turn_item_emitter.emit_completed(item).await;
*self.captured_call.lock().await = Some(call);
Ok(Box::new(codex_tools::JsonToolOutput::new(
json!({ "ok": true }),
@@ -191,12 +250,14 @@ mod tests {
}
#[tokio::test]
async fn passes_turn_fields_to_extension_call() {
async fn passes_turn_fields_and_scoped_turn_item_emitter_to_extension_call() {
let captured_call = Arc::new(Mutex::new(None));
let handler = ExtensionToolAdapter::new(Arc::new(CapturingExtensionExecutor {
captured_call: Arc::clone(&captured_call),
}));
let (session, turn) = crate::session::tests::make_session_and_context().await;
let (session, turn, rx) = crate::session::tests::make_session_and_context_with_rx().await;
let weak_session = Arc::downgrade(&session);
let weak_turn = Arc::downgrade(&turn);
let turn_id = turn.sub_id.clone();
let truncation_policy = turn.truncation_policy;
let history_item = ResponseItem::Message {
@@ -211,8 +272,8 @@ mod tests {
.record_conversation_items(&turn, std::slice::from_ref(&history_item))
.await;
let invocation = ToolInvocation {
session: session.into(),
turn: turn.into(),
session,
turn,
cancellation_token: tokio_util::sync::CancellationToken::new(),
tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())),
call_id: "call-extension".to_string(),
@@ -228,6 +289,8 @@ mod tests {
.expect("extension call should succeed");
let captured_call = captured_call.lock().await.clone().expect("captured call");
assert!(weak_session.upgrade().is_none());
assert!(weak_turn.upgrade().is_none());
assert_eq!(captured_call.turn_id, turn_id);
assert_eq!(captured_call.call_id, "call-extension");
assert_eq!(
@@ -245,5 +308,43 @@ mod tests {
}
payload => panic!("expected function payload, got {payload:?}"),
}
let started = rx.recv().await.expect("item started event");
let EventMsg::ItemStarted(started) = started.msg else {
panic!("expected item started event");
};
let TurnItem::WebSearch(started_item) = started.item else {
panic!("expected web search item");
};
let begin = rx.recv().await.expect("legacy web search begin event");
let EventMsg::WebSearchBegin(begin) = begin.msg else {
panic!("expected legacy web search begin event");
};
let completed = rx.recv().await.expect("item completed event");
let EventMsg::ItemCompleted(completed) = completed.msg else {
panic!("expected item completed event");
};
let TurnItem::WebSearch(completed_item) = completed.item else {
panic!("expected web search item");
};
let end = rx.recv().await.expect("legacy web search end event");
let EventMsg::WebSearchEnd(end) = end.msg else {
panic!("expected legacy web search end event");
};
let expected = WebSearchItem {
id: "call-extension".to_string(),
query: "rust trait object".to_string(),
action: WebSearchAction::Search {
query: Some("rust trait object".to_string()),
queries: None,
},
};
assert_eq!(started_item, expected);
assert_eq!(completed_item, expected);
assert_eq!(begin.call_id, expected.id);
assert_eq!(end.call_id, expected.id);
assert_eq!(end.query, expected.query);
assert_eq!(end.action, expected.action);
}
}

View File

@@ -11,8 +11,10 @@ pub use capabilities::NoopResponseItemInjector;
pub use capabilities::ResponseItemInjectionFuture;
pub use capabilities::ResponseItemInjector;
pub use codex_tools::ConversationHistory;
pub use codex_tools::ExtensionTurnItem;
pub use codex_tools::FunctionCallError;
pub use codex_tools::JsonToolOutput;
pub use codex_tools::NoopTurnItemEmitter;
pub use codex_tools::ResponsesApiTool;
pub use codex_tools::ToolCall;
pub use codex_tools::ToolExecutor;
@@ -20,6 +22,8 @@ pub use codex_tools::ToolName;
pub use codex_tools::ToolOutput;
pub use codex_tools::ToolPayload;
pub use codex_tools::ToolSpec;
pub use codex_tools::TurnItemEmissionFuture;
pub use codex_tools::TurnItemEmitter;
pub use codex_tools::parse_tool_input_schema;
pub use codex_tools::parse_tool_input_schema_without_compaction;
pub use contributors::ApprovalReviewContributor;

View File

@@ -8,6 +8,7 @@ use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::FunctionCallError;
use codex_extension_api::NoopTurnItemEmitter;
use codex_extension_api::ThreadResumeInput;
use codex_extension_api::ThreadStartInput;
use codex_extension_api::ToolCall;
@@ -1121,6 +1122,7 @@ fn tool_call(tool_name: &str, call_id: &str, arguments: serde_json::Value) -> To
tool_name: codex_extension_api::ToolName::plain(tool_name),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload: ToolPayload::Function {
arguments: arguments.to_string(),
},

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use codex_extension_api::ContextContributor;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_extension_api::NoopTurnItemEmitter;
use codex_extension_api::PromptSlot;
use codex_extension_api::ToolCall;
use codex_extension_api::ToolContributor;
@@ -212,6 +213,7 @@ async fn add_ad_hoc_note_tool_creates_note_file() {
tool_name: memory_tool_name(crate::ADD_AD_HOC_NOTE_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload: payload.clone(),
})
.await
@@ -253,6 +255,7 @@ async fn add_ad_hoc_note_tool_rejects_paths_as_filenames() {
tool_name: memory_tool_name(crate::ADD_AD_HOC_NOTE_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload,
})
.await;
@@ -295,6 +298,7 @@ async fn read_tool_reads_memory_file() {
tool_name: memory_tool_name(crate::READ_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload: payload.clone(),
})
.await
@@ -340,6 +344,7 @@ async fn search_tool_accepts_multiple_queries() {
tool_name: memory_tool_name(crate::SEARCH_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload: payload.clone(),
})
.await
@@ -411,6 +416,7 @@ async fn search_tool_accepts_windowed_all_match_mode() {
tool_name: memory_tool_name(crate::SEARCH_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload: payload.clone(),
})
.await
@@ -462,6 +468,7 @@ async fn search_tool_rejects_legacy_single_query() {
tool_name: memory_tool_name(crate::SEARCH_TOOL_NAME),
truncation_policy: TruncationPolicy::Bytes(1024),
conversation_history: codex_extension_api::ConversationHistory::default(),
turn_item_emitter: Arc::new(NoopTurnItemEmitter),
payload,
})
.await;

View File

@@ -62,7 +62,11 @@ pub use responses_api::mcp_tool_to_deferred_responses_api_tool;
pub use responses_api::mcp_tool_to_responses_api_tool;
pub use responses_api::tool_definition_to_responses_api_tool;
pub use tool_call::ConversationHistory;
pub use tool_call::ExtensionTurnItem;
pub use tool_call::NoopTurnItemEmitter;
pub use tool_call::ToolCall;
pub use tool_call::TurnItemEmissionFuture;
pub use tool_call::TurnItemEmitter;
pub use tool_config::ShellCommandBackendConfig;
pub use tool_config::ToolEnvironmentMode;
pub use tool_config::ToolUserShellType;

View File

@@ -1,8 +1,11 @@
use crate::FunctionCallError;
use crate::ToolName;
use crate::ToolPayload;
use codex_protocol::items::WebSearchItem;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Raw response history snapshot available when an extension tool is invoked.
@@ -23,17 +26,70 @@ impl ConversationHistory {
}
}
/// Future returned when an extension tool emits a visible turn-item lifecycle event.
///
/// This is a pinned, boxed, `Send` future that may borrow from the emitter for
/// `'a`. It resolves to `()`, so it carries no success or failure value.
pub type TurnItemEmissionFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
/// Visible turn items that an extension fully owns and may emit as-is.
///
/// Add only item kinds that require no additional host finalization before
/// persistence or client delivery. Richer items need a host-owned publish path.
#[derive(Clone, Debug, PartialEq)]
pub enum ExtensionTurnItem {
/// A web search item, carried unchanged as a protocol `WebSearchItem`.
WebSearch(WebSearchItem),
}
/// Host-provided capability for extension tools to emit finalized visible turn items.
///
/// Implementations route lifecycle events through the host's normal item event
/// pipeline, including any persistence and client delivery owned by the host.
///
/// Both methods take the item by value and return a boxed future that borrows
/// `self`. The future resolves to `()`, so the signature gives callers no way
/// to observe whether the host actually delivered the item.
pub trait TurnItemEmitter: Send + Sync {
/// Emits the beginning of one visible turn item.
fn emit_started<'a>(&'a self, item: ExtensionTurnItem) -> TurnItemEmissionFuture<'a>;
/// Emits the completion of one visible turn item.
fn emit_completed<'a>(&'a self, item: ExtensionTurnItem) -> TurnItemEmissionFuture<'a>;
}
/// Turn-item emitter for callers that do not expose visible item emission.
///
/// Every emission is discarded: the item is dropped and the returned future
/// is already complete.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopTurnItemEmitter;

impl NoopTurnItemEmitter {
    /// Builds the already-completed future shared by both lifecycle methods.
    fn completed<'a>() -> TurnItemEmissionFuture<'a> {
        Box::pin(std::future::ready(()))
    }
}

impl TurnItemEmitter for NoopTurnItemEmitter {
    /// Ignores the item and completes immediately.
    fn emit_started<'a>(&'a self, _ignored: ExtensionTurnItem) -> TurnItemEmissionFuture<'a> {
        Self::completed()
    }

    /// Ignores the item and completes immediately.
    fn emit_completed<'a>(&'a self, _ignored: ExtensionTurnItem) -> TurnItemEmissionFuture<'a> {
        Self::completed()
    }
}
// TODO: this is temporary and will disappear in the next PR (as we make codex-extension-api generic on Invocation.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct ToolCall {
pub turn_id: String,
pub call_id: String,
pub tool_name: ToolName,
pub truncation_policy: TruncationPolicy,
pub conversation_history: ConversationHistory,
pub turn_item_emitter: Arc<dyn TurnItemEmitter>,
pub payload: ToolPayload,
}
/// Manual `Debug` for `ToolCall`.
///
/// The emitter is a trait object without a `Debug` bound, so it is rendered
/// as a fixed placeholder; every other field uses its own `Debug` output.
impl std::fmt::Debug for ToolCall {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field to `ToolCall` fails to compile
        // here until the new field is handled.
        let Self {
            turn_id,
            call_id,
            tool_name,
            truncation_policy,
            conversation_history,
            turn_item_emitter: _,
            payload,
        } = self;

        let mut out = f.debug_struct("ToolCall");
        out.field("turn_id", turn_id);
        out.field("call_id", call_id);
        out.field("tool_name", tool_name);
        out.field("truncation_policy", truncation_policy);
        out.field("conversation_history", conversation_history);
        out.field("turn_item_emitter", &"<host turn item emitter>");
        out.field("payload", payload);
        out.finish()
    }
}
impl ToolCall {
pub fn function_arguments(&self) -> Result<&str, FunctionCallError> {
match &self.payload {