Files
codex/codex-rs/app-server/src/request_processors/thread_lifecycle.rs
rhan-oai fbdbc6b2fe [codex-analytics] emit tool item events from item lifecycle (#17090)
## Why

After the tool-item schemas are in place, analytics needs to emit them
from the app-server item lifecycle rather than requiring bespoke
tracking at each callsite. The reducer should also reuse the shared
thread analytics context introduced below it in the stack so later event
families do not repeat the same reducer joins or missing-state ladder.

## What changed

- Tracks tool-item completion notifications and emits the matching tool
analytics event when a terminal item arrives.
- Derives event-specific payload details for command execution, file
changes, MCP calls, dynamic tools, collaboration tools, web search, and
image generation.
- Denormalizes thread, app-server client, runtime, and subagent
provenance metadata through the shared thread analytics context.
- Adds reducer coverage for item lifecycle emission and subagent
metadata inheritance.

## Duration semantics

`duration_ms` is computed from the app-server item lifecycle timestamps:
`completed_at_ms - started_at_ms`. That makes it the duration of the
lifecycle Codex observed locally, not necessarily the upstream
provider's full execution time.

- Web search usually has a meaningful observed lifecycle because
Responses can send `response.output_item.added` before
`response.output_item.done`; in that case `started_at_ms` comes from the
added event and `completed_at_ms` comes from the done event.
- Image generation can be much less precise. In the current observed
stream, image generation often arrives only as a completed
`response.output_item.done`; when there is no earlier added event, Codex
synthesizes the started item immediately before completion, so
`duration_ms` can be `0` even though upstream image generation took
longer.
- Standalone web search and standalone image generation work is expected
to land after this stack. Those paths may introduce more direct
lifecycle events or timing points, so the current
web-search/image-generation duration semantics should be treated as the
best available item-lifecycle approximation, not the final latency
contract for those tool families.
- `execution_duration_ms` is populated only where the completed item
already carries a native execution duration; otherwise it remains `null`
while `duration_ms` still reflects the local lifecycle interval.

## Currently placeholder / partial fields

Some fields are included in the schema for the intended steady-state
contract, but this PR does not yet populate them from real
approval/review state:

- `review_count`, `guardian_review_count`, and `user_review_count`
currently default to `0`.
- `final_approval_outcome` currently defaults to `unknown`.
- `requested_additional_permissions` and `requested_network_access`
currently default to `false`.

## Verification

- `cargo test -p codex-analytics`

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/17090).
* #18748
* #18747
* __->__ #17090
* #17089
* #20514
2026-05-06 20:27:41 +00:00

750 lines
26 KiB
Rust

use super::*;
pub(super) const THREAD_UNLOADING_DELAY: Duration = Duration::from_secs(30 * 60);
#[derive(Clone)]
pub(super) struct ListenerTaskContext {
pub(super) thread_manager: Arc<ThreadManager>,
pub(super) thread_state_manager: ThreadStateManager,
pub(super) outgoing: Arc<OutgoingMessageSender>,
pub(super) pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
pub(super) thread_watch_manager: ThreadWatchManager,
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) fallback_model_provider: String,
pub(super) codex_home: PathBuf,
}
struct UnloadingState {
delay: Duration,
has_subscribers_rx: watch::Receiver<bool>,
has_subscribers: (bool, Instant),
thread_status_rx: watch::Receiver<ThreadStatus>,
is_active: (bool, Instant),
}
impl UnloadingState {
async fn new(
listener_task_context: &ListenerTaskContext,
thread_id: ThreadId,
delay: Duration,
) -> Option<Self> {
let has_subscribers_rx = listener_task_context
.thread_state_manager
.subscribe_to_has_connections(thread_id)
.await?;
let thread_status_rx = listener_task_context
.thread_watch_manager
.subscribe(thread_id)
.await?;
let has_subscribers = (*has_subscribers_rx.borrow(), Instant::now());
let is_active = (
matches!(*thread_status_rx.borrow(), ThreadStatus::Active { .. }),
Instant::now(),
);
Some(Self {
delay,
has_subscribers_rx,
has_subscribers,
thread_status_rx,
is_active,
})
}
fn unloading_target(&self) -> Option<Instant> {
match (self.has_subscribers, self.is_active) {
((false, has_no_subscribers_since), (false, is_inactive_since)) => {
Some(std::cmp::max(has_no_subscribers_since, is_inactive_since) + self.delay)
}
_ => None,
}
}
fn sync_receiver_values(&mut self) {
let has_subscribers = *self.has_subscribers_rx.borrow();
if self.has_subscribers.0 != has_subscribers {
self.has_subscribers = (has_subscribers, Instant::now());
}
let is_active = matches!(*self.thread_status_rx.borrow(), ThreadStatus::Active { .. });
if self.is_active.0 != is_active {
self.is_active = (is_active, Instant::now());
}
}
fn should_unload_now(&mut self) -> bool {
self.sync_receiver_values();
self.unloading_target()
.is_some_and(|target| target <= Instant::now())
}
fn note_thread_activity_observed(&mut self) {
if !self.is_active.0 {
self.is_active = (false, Instant::now());
}
}
async fn wait_for_unloading_trigger(&mut self) -> bool {
loop {
self.sync_receiver_values();
let unloading_target = self.unloading_target();
if let Some(target) = unloading_target
&& target <= Instant::now()
{
return true;
}
let unloading_sleep = async {
if let Some(target) = unloading_target {
tokio::time::sleep_until(target.into()).await;
} else {
futures::future::pending::<()>().await;
}
};
tokio::select! {
_ = unloading_sleep => return true,
changed = self.has_subscribers_rx.changed() => {
if changed.is_err() {
return false;
}
self.sync_receiver_values();
},
changed = self.thread_status_rx.changed() => {
if changed.is_err() {
return false;
}
self.sync_receiver_values();
},
}
}
}
}
pub(super) enum ThreadShutdownResult {
Complete,
SubmitFailed,
TimedOut,
}
pub(super) enum EnsureConversationListenerResult {
Attached,
ConnectionClosed,
}
#[expect(
clippy::await_holding_invalid_type,
reason = "listener subscription must be serialized against pending unloads"
)]
pub(super) async fn ensure_conversation_listener(
listener_task_context: ListenerTaskContext,
conversation_id: ThreadId,
connection_id: ConnectionId,
raw_events_enabled: bool,
) -> Result<EnsureConversationListenerResult, JSONRPCErrorError> {
let conversation = match listener_task_context
.thread_manager
.get_thread(conversation_id)
.await
{
Ok(conv) => conv,
Err(_) => {
return Err(invalid_request(format!(
"thread not found: {conversation_id}"
)));
}
};
let thread_state = {
let pending_thread_unloads = listener_task_context.pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
return Err(invalid_request(format!(
"thread {conversation_id} is closing; retry after the thread is closed"
)));
}
let Some(thread_state) = listener_task_context
.thread_state_manager
.try_ensure_connection_subscribed(conversation_id, connection_id, raw_events_enabled)
.await
else {
return Ok(EnsureConversationListenerResult::ConnectionClosed);
};
thread_state
};
if let Err(error) = ensure_listener_task_running(
listener_task_context.clone(),
conversation_id,
conversation,
thread_state,
)
.await
{
let _ = listener_task_context
.thread_state_manager
.unsubscribe_connection_from_thread(conversation_id, connection_id)
.await;
return Err(error);
}
Ok(EnsureConversationListenerResult::Attached)
}
pub(super) fn log_listener_attach_result(
result: Result<EnsureConversationListenerResult, JSONRPCErrorError>,
thread_id: ThreadId,
connection_id: ConnectionId,
thread_kind: &'static str,
) {
match result {
Ok(EnsureConversationListenerResult::Attached) => {}
Ok(EnsureConversationListenerResult::ConnectionClosed) => {
tracing::debug!(
thread_id = %thread_id,
connection_id = ?connection_id,
"skipping auto-attach for closed connection"
);
}
Err(err) => {
tracing::warn!(
"failed to attach listener for {thread_kind} {thread_id}: {message}",
message = err.message
);
}
}
}
pub(super) async fn ensure_listener_task_running(
listener_task_context: ListenerTaskContext,
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
) -> Result<(), JSONRPCErrorError> {
let (cancel_tx, mut cancel_rx) = oneshot::channel();
let Some(mut unloading_state) = UnloadingState::new(
&listener_task_context,
conversation_id,
THREAD_UNLOADING_DELAY,
)
.await
else {
return Err(invalid_request(format!(
"thread {conversation_id} is closing; retry after the thread is closed"
)));
};
let (mut listener_command_rx, listener_generation) = {
let mut thread_state = thread_state.lock().await;
if thread_state.listener_matches(&conversation) {
return Ok(());
}
thread_state.set_listener(cancel_tx, &conversation)
};
let ListenerTaskContext {
outgoing,
thread_manager,
thread_state_manager,
pending_thread_unloads,
thread_watch_manager,
thread_list_state_permit,
fallback_model_provider,
codex_home,
} = listener_task_context;
let outgoing_for_task = Arc::clone(&outgoing);
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = &mut cancel_rx => {
// Listener was superseded or the thread is being torn down.
break;
}
listener_command = listener_command_rx.recv() => {
let Some(listener_command) = listener_command else {
break;
};
handle_thread_listener_command(
conversation_id,
&conversation,
codex_home.as_path(),
&thread_state_manager,
&thread_state,
&thread_watch_manager,
&outgoing_for_task,
&pending_thread_unloads,
listener_command,
)
.await;
}
event = conversation.next_event() => {
let event = match event {
Ok(event) => event,
Err(err) => {
tracing::warn!("thread.next_event() failed with: {err}");
break;
}
};
// Track the event before emitting any typed translations
// so thread-local state such as raw event opt-in stays
// synchronized with the conversation.
let raw_events_enabled = {
let mut thread_state = thread_state.lock().await;
thread_state.track_current_turn_event(&event.id, &event.msg);
thread_state.experimental_raw_events
};
let subscribed_connection_ids = thread_state_manager
.subscribed_connection_ids(conversation_id)
.await;
let thread_outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing_for_task.clone(),
subscribed_connection_ids,
conversation_id,
);
if let EventMsg::RawResponseItem(raw_response_item_event) = &event.msg
&& !raw_events_enabled
{
maybe_emit_hook_prompt_item_completed(
conversation_id,
&event.id,
&raw_response_item_event.item,
&thread_outgoing,
)
.await;
continue;
}
apply_bespoke_event_handling(
event.clone(),
conversation_id,
conversation.clone(),
thread_manager.clone(),
thread_outgoing,
thread_state.clone(),
thread_watch_manager.clone(),
thread_list_state_permit.clone(),
fallback_model_provider.clone(),
)
.await;
}
unloading_watchers_open = unloading_state.wait_for_unloading_trigger() => {
if !unloading_watchers_open {
break;
}
if !unloading_state.should_unload_now() {
continue;
}
if matches!(conversation.agent_status().await, AgentStatus::Running) {
unloading_state.note_thread_activity_observed();
continue;
}
{
let mut pending_thread_unloads = pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
continue;
}
if !unloading_state.should_unload_now() {
continue;
}
pending_thread_unloads.insert(conversation_id);
}
unload_thread_without_subscribers(
thread_manager.clone(),
outgoing_for_task.clone(),
pending_thread_unloads.clone(),
thread_state_manager.clone(),
thread_watch_manager.clone(),
conversation_id,
conversation.clone(),
)
.await;
break;
}
}
}
let mut thread_state = thread_state.lock().await;
if thread_state.listener_generation == listener_generation {
thread_state.clear_listener();
}
});
Ok(())
}
pub(super) async fn wait_for_thread_shutdown(thread: &Arc<CodexThread>) -> ThreadShutdownResult {
match tokio::time::timeout(Duration::from_secs(10), thread.shutdown_and_wait()).await {
Ok(Ok(())) => ThreadShutdownResult::Complete,
Ok(Err(_)) => ThreadShutdownResult::SubmitFailed,
Err(_) => ThreadShutdownResult::TimedOut,
}
}
pub(super) async fn unload_thread_without_subscribers(
thread_manager: Arc<ThreadManager>,
outgoing: Arc<OutgoingMessageSender>,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_id: ThreadId,
thread: Arc<CodexThread>,
) {
info!("thread {thread_id} has no subscribers and is idle; shutting down");
// Any pending app-server -> client requests for this thread can no longer be
// answered; cancel their callbacks before shutdown/unload.
outgoing
.cancel_requests_for_thread(thread_id, /*error*/ None)
.await;
thread_state_manager.remove_thread_state(thread_id).await;
tokio::spawn(async move {
match wait_for_thread_shutdown(&thread).await {
ThreadShutdownResult::Complete => {
if thread_manager.remove_thread(&thread_id).await.is_none() {
info!("thread {thread_id} was already removed before teardown finalized");
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
return;
}
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
let notification = ThreadClosedNotification {
thread_id: thread_id.to_string(),
};
outgoing
.send_server_notification(ServerNotification::ThreadClosed(notification))
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
}
ThreadShutdownResult::SubmitFailed => {
pending_thread_unloads.lock().await.remove(&thread_id);
warn!("failed to submit Shutdown to thread {thread_id}");
}
ThreadShutdownResult::TimedOut => {
pending_thread_unloads.lock().await.remove(&thread_id);
warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
}
}
});
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn handle_thread_listener_command(
conversation_id: ThreadId,
conversation: &Arc<CodexThread>,
codex_home: &Path,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
outgoing: &Arc<OutgoingMessageSender>,
pending_thread_unloads: &Arc<Mutex<HashSet<ThreadId>>>,
listener_command: ThreadListenerCommand,
) {
match listener_command {
ThreadListenerCommand::SendThreadResumeResponse(resume_request) => {
handle_pending_thread_resume_request(
conversation_id,
conversation,
codex_home,
thread_state_manager,
thread_state,
thread_watch_manager,
outgoing,
pending_thread_unloads,
*resume_request,
)
.await;
}
ThreadListenerCommand::EmitThreadGoalUpdated { goal } => {
outgoing
.send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: None,
goal,
},
))
.await;
}
ThreadListenerCommand::EmitThreadGoalCleared => {
outgoing
.send_server_notification(ServerNotification::ThreadGoalCleared(
ThreadGoalClearedNotification {
thread_id: conversation_id.to_string(),
},
))
.await;
}
ThreadListenerCommand::EmitThreadGoalSnapshot { state_db } => {
send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await;
}
ThreadListenerCommand::ResolveServerRequest {
request_id,
completion_tx,
} => {
resolve_pending_server_request(
conversation_id,
thread_state_manager,
outgoing,
request_id,
)
.await;
let _ = completion_tx.send(());
}
}
}
#[allow(clippy::too_many_arguments)]
#[expect(
clippy::await_holding_invalid_type,
reason = "running-thread resume subscription must be serialized against pending unloads"
)]
pub(super) async fn handle_pending_thread_resume_request(
conversation_id: ThreadId,
conversation: &Arc<CodexThread>,
_codex_home: &Path,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
outgoing: &Arc<OutgoingMessageSender>,
pending_thread_unloads: &Arc<Mutex<HashSet<ThreadId>>>,
pending: crate::thread_state::PendingThreadResumeRequest,
) {
let active_turn = {
let state = thread_state.lock().await;
state.active_turn_snapshot()
};
tracing::debug!(
thread_id = %conversation_id,
request_id = ?pending.request_id,
active_turn_present = active_turn.is_some(),
active_turn_id = ?active_turn.as_ref().map(|turn| turn.id.as_str()),
active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status),
"composing running thread resume response"
);
let has_live_in_progress_turn =
matches!(conversation.agent_status().await, AgentStatus::Running)
|| active_turn
.as_ref()
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
let mut thread = pending.thread_summary;
if pending.include_turns {
populate_thread_turns_from_history(
&mut thread,
&pending.history_items,
active_turn.as_ref(),
);
}
let thread_status = thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await;
set_thread_status_and_interrupt_stale_turns(
&mut thread,
thread_status,
has_live_in_progress_turn,
);
{
let pending_thread_unloads = pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
drop(pending_thread_unloads);
outgoing
.send_error(
request_id,
invalid_request(format!(
"thread {conversation_id} is closing; retry thread/resume after the thread is closed"
)),
)
.await;
return;
}
if !thread_state_manager
.try_add_connection_to_thread(conversation_id, connection_id)
.await
{
tracing::debug!(
thread_id = %conversation_id,
connection_id = ?connection_id,
"skipping running thread resume for closed connection"
);
return;
}
}
if pending.emit_thread_goal_update
&& let Err(err) = conversation.apply_goal_resume_runtime_effects().await
{
tracing::warn!("failed to apply goal resume runtime effects: {err}");
}
let ThreadConfigSnapshot {
model,
model_provider_id,
service_tier,
approval_policy,
approvals_reviewer,
permission_profile,
active_permission_profile,
cwd,
reasoning_effort,
..
} = pending.config_snapshot;
let instruction_sources = pending.instruction_sources;
let sandbox = thread_response_sandbox_policy(&permission_profile, cwd.as_path());
let active_permission_profile =
thread_response_active_permission_profile(active_permission_profile);
let session_id = conversation.session_configured().session_id.to_string();
thread.session_id = session_id;
let response = ThreadResumeResponse {
thread,
model,
model_provider: model_provider_id,
service_tier,
cwd,
instruction_sources,
approval_policy: approval_policy.into(),
approvals_reviewer: approvals_reviewer.into(),
sandbox,
permission_profile: Some(permission_profile.into()),
active_permission_profile,
reasoning_effort,
};
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
outgoing.send_response(request_id, response).await;
// Match cold resume: metadata-only resume should attach the listener without
// paying the cost of turn reconstruction for historical usage replay.
if let Some(token_usage_thread) = token_usage_thread {
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
&pending.history_items,
token_usage_thread.turns.as_slice(),
);
// Rejoining a loaded thread has the same UI contract as a cold resume, but
// uses the live conversation state instead of reconstructing a new session.
send_thread_token_usage_update_to_connection(
outgoing,
connection_id,
conversation_id,
&token_usage_thread,
conversation.as_ref(),
token_usage_turn_id,
)
.await;
}
if pending.emit_thread_goal_update {
if let Some(state_db) = pending.thread_goal_state_db {
send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await;
} else {
tracing::warn!(
thread_id = %conversation_id,
"state db unavailable when reading thread goal for running thread resume"
);
}
}
outgoing
.replay_requests_to_connection_for_thread(connection_id, conversation_id)
.await;
// App-server owns resume response and snapshot ordering, so wait until
// replay completes before letting core start goal continuation.
if pending.emit_thread_goal_update
&& let Err(err) = conversation.continue_active_goal_if_idle().await
{
tracing::warn!("failed to continue active goal after running-thread resume: {err}");
}
}
pub(super) async fn send_thread_goal_snapshot_notification(
outgoing: &Arc<OutgoingMessageSender>,
thread_id: ThreadId,
state_db: &StateDbHandle,
) {
match state_db.get_thread_goal(thread_id).await {
Ok(Some(goal)) => {
outgoing
.send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: thread_id.to_string(),
turn_id: None,
goal: api_thread_goal_from_state(goal),
},
))
.await;
}
Ok(None) => {
outgoing
.send_server_notification(ServerNotification::ThreadGoalCleared(
ThreadGoalClearedNotification {
thread_id: thread_id.to_string(),
},
))
.await;
}
Err(err) => {
tracing::warn!(
thread_id = %thread_id,
"failed to read thread goal for resume snapshot: {err}"
);
}
}
}
pub(crate) fn populate_thread_turns_from_history(
thread: &mut Thread,
items: &[RolloutItem],
active_turn: Option<&Turn>,
) {
let mut turns = build_api_turns_from_rollout_items(items);
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());
}
thread.turns = turns;
}
pub(super) async fn resolve_pending_server_request(
conversation_id: ThreadId,
thread_state_manager: &ThreadStateManager,
outgoing: &Arc<OutgoingMessageSender>,
request_id: RequestId,
) {
let thread_id = conversation_id.to_string();
let subscribed_connection_ids = thread_state_manager
.subscribed_connection_ids(conversation_id)
.await;
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing.clone(),
subscribed_connection_ids,
conversation_id,
);
outgoing
.send_server_notification(ServerNotification::ServerRequestResolved(
ServerRequestResolvedNotification {
thread_id,
request_id,
},
))
.await;
}
pub(super) fn merge_turn_history_with_active_turn(turns: &mut Vec<Turn>, active_turn: Turn) {
turns.retain(|turn| turn.id != active_turn.id);
turns.push(active_turn);
}
pub(super) fn set_thread_status_and_interrupt_stale_turns(
thread: &mut Thread,
loaded_status: ThreadStatus,
has_live_in_progress_turn: bool,
) {
let status = resolve_thread_status(loaded_status, has_live_in_progress_turn);
if !matches!(status, ThreadStatus::Active { .. }) {
for turn in &mut thread.turns {
if matches!(turn.status, TurnStatus::InProgress) {
turn.status = TurnStatus::Interrupted;
}
}
}
thread.status = status;
}