Add turn error lifecycle contributor (#24916)

Summary
- Add TurnErrorInput and TurnLifecycleContributor::on_turn_error to the
extension API.
- Emit the turn-error lifecycle from core turn error paths, including
usage limit failures.
- Add direct lifecycle coverage for the emitted error facts and stores.

Tests
- just fmt
- git diff --check
- Not run: full tests or clippy (per instructions)
This commit is contained in:
jif-oai
2026-05-28 16:13:54 +02:00
committed by GitHub
parent ec803fe6c7
commit e7d156eb08
6 changed files with 135 additions and 4 deletions

View File

@@ -106,6 +106,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::CreditsSnapshot;
@@ -2037,6 +2038,85 @@ async fn turn_start_lifecycle_exposes_turn_metadata_and_token_baseline() {
assert_eq!(vec![expected], actual);
}
#[tokio::test]
async fn turn_error_lifecycle_exposes_error_and_stores() {
struct SessionTurnErrorMarker;
struct ThreadTurnErrorMarker;
#[derive(Debug, PartialEq, Eq)]
struct RecordedTurnError {
session_level_id: String,
thread_level_id: String,
turn_level_id: String,
turn_id: String,
error: CodexErrorInfo,
saw_session_store: bool,
saw_thread_store: bool,
}
struct TurnErrorRecorder {
records: Arc<std::sync::Mutex<Vec<RecordedTurnError>>>,
}
#[async_trait::async_trait]
impl codex_extension_api::TurnLifecycleContributor for TurnErrorRecorder {
async fn on_turn_error(&self, input: codex_extension_api::TurnErrorInput<'_>) {
self.records
.lock()
.expect("turn error records lock")
.push(RecordedTurnError {
session_level_id: input.session_store.level_id().to_string(),
thread_level_id: input.thread_store.level_id().to_string(),
turn_level_id: input.turn_store.level_id().to_string(),
turn_id: input.turn_id.to_string(),
error: input.error,
saw_session_store: input
.session_store
.get::<SessionTurnErrorMarker>()
.is_some(),
saw_thread_store: input.thread_store.get::<ThreadTurnErrorMarker>().is_some(),
});
}
}
let (mut session, turn_context) = make_session_and_context().await;
let records = Arc::new(std::sync::Mutex::new(Vec::new()));
let mut builder = codex_extension_api::ExtensionRegistryBuilder::<crate::config::Config>::new();
builder.turn_lifecycle_contributor(Arc::new(TurnErrorRecorder {
records: Arc::clone(&records),
}));
session.services.extensions = Arc::new(builder.build());
session
.services
.session_extension_data
.insert(SessionTurnErrorMarker);
session
.services
.thread_extension_data
.insert(ThreadTurnErrorMarker);
let expected = RecordedTurnError {
session_level_id: session.session_id().to_string(),
thread_level_id: session.conversation_id.to_string(),
turn_level_id: turn_context.sub_id.clone(),
turn_id: turn_context.sub_id.clone(),
error: CodexErrorInfo::UsageLimitExceeded,
saw_session_store: true,
saw_thread_store: true,
};
session
.emit_turn_error_lifecycle(&turn_context, CodexErrorInfo::UsageLimitExceeded)
.await;
let actual = records
.lock()
.expect("turn error records lock")
.drain(..)
.collect::<Vec<_>>();
assert_eq!(vec![expected], actual);
}
#[tokio::test]
async fn config_change_contributor_observes_effective_config_changes() {
struct SessionConfigMarker;

View File

@@ -145,7 +145,10 @@ pub(crate) async fn run_turn(
// diffs/full reinjection + user input) and trigger compaction preemptively
// when they would push the thread over the compaction threshold.
if let Err(err) = run_pre_sampling_compact(&sess, &turn_context, &mut client_session).await {
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
@@ -295,7 +298,10 @@ pub(crate) async fn run_turn(
)
.await
{
if err.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
@@ -374,17 +380,23 @@ pub(crate) async fn run_turn(
}
}
let error = CodexErrorInfo::BadRequest;
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
let event = EventMsg::Error(ErrorEvent {
message: "Invalid image in your last message. Please remove it and try again."
.to_string(),
codex_error_info: Some(CodexErrorInfo::BadRequest),
codex_error_info: Some(error),
});
sess.send_event(&turn_context, event).await;
break;
}
Err(e) => {
info!("Turn error: {e:#}");
if e.to_codex_protocol_error() == CodexErrorInfo::UsageLimitExceeded
let error = e.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),

View File

@@ -1,4 +1,5 @@
use codex_extension_api::ExtensionData;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TurnAbortReason;
@@ -53,4 +54,22 @@ impl Session {
.await;
}
}
pub(crate) async fn emit_turn_error_lifecycle(
&self,
turn_context: &TurnContext,
error: CodexErrorInfo,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor
.on_turn_error(codex_extension_api::TurnErrorInput {
turn_id: turn_context.sub_id.as_str(),
error: error.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store: turn_context.extension_data.as_ref(),
})
.await;
}
}
}

View File

@@ -26,6 +26,7 @@ pub use tool_lifecycle::ToolFinishInput;
pub use tool_lifecycle::ToolLifecycleFuture;
pub use tool_lifecycle::ToolStartInput;
pub use turn_lifecycle::TurnAbortInput;
pub use turn_lifecycle::TurnErrorInput;
pub use turn_lifecycle::TurnStartInput;
pub use turn_lifecycle::TurnStopInput;
@@ -79,6 +80,9 @@ pub trait TurnLifecycleContributor: Send + Sync {
/// Called after the host aborts a running turn.
async fn on_turn_abort(&self, _input: TurnAbortInput<'_>) {}
/// Called when the host observes an error for a running turn.
async fn on_turn_error(&self, _input: TurnErrorInput<'_>) {}
}
/// Contributor for host-owned configuration changes.

View File

@@ -1,4 +1,5 @@
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TurnAbortReason;
@@ -41,3 +42,17 @@ pub struct TurnAbortInput<'a> {
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}
/// Input supplied when the host observes an error for a turn.
pub struct TurnErrorInput<'a> {
/// Stable host-owned turn identifier.
pub turn_id: &'a str,
/// Error surfaced by the host for this turn.
pub error: CodexErrorInfo,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}

View File

@@ -41,6 +41,7 @@ pub use contributors::ToolLifecycleContributor;
pub use contributors::ToolLifecycleFuture;
pub use contributors::ToolStartInput;
pub use contributors::TurnAbortInput;
pub use contributors::TurnErrorInput;
pub use contributors::TurnItemContributor;
pub use contributors::TurnLifecycleContributor;
pub use contributors::TurnStartInput;