Compare commits

...

1 Commits

Author SHA1 Message Date
rreichel3-oai
1e50865d9a Emit OTEL user prompts when recording turns
core/src/session/mod.rs: move user prompt telemetry to the shared user-message recording path and keep steer_input focused on queuing accepted input.

core/src/session/handlers.rs: remove duplicate prompt telemetry from submission handling now that recording owns it.

core/tests/suite/otel.rs: assert normal and steered inputs emit codex.user_prompt events.
2026-05-08 14:43:54 -04:00
3 changed files with 131 additions and 44 deletions

View File

@@ -244,17 +244,13 @@ pub(super) async fn user_input_or_turn_inner(
)
.await
{
Ok(_) => {
current_context.session_telemetry.user_prompt(&items);
Some(items)
}
Ok(_) => Some(items),
Err(SteerInputError::NoActiveTurn(items)) => {
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata {
current_context
.turn_metadata_state
.set_responsesapi_client_metadata(responsesapi_client_metadata);
}
current_context.session_telemetry.user_prompt(&items);
sess.refresh_mcp_servers_if_requested(
&current_context,
Some(sess.mcp_elicitation_reviewer()),

View File

@@ -2947,6 +2947,7 @@ impl Session {
input: &[UserInput],
response_item: ResponseItem,
) {
turn_context.session_telemetry.user_prompt(input);
// Persist the user message to history, but emit the turn item from `UserInput` so
// UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry
// those spans, and `record_response_item_and_emit_turn_item` would drop them.
@@ -2993,52 +2994,54 @@ impl Session {
return Err(SteerInputError::EmptyInput);
}
let mut active = self.active_turn.lock().await;
let Some(active_turn) = active.as_mut() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let active_turn_id = {
let mut active = self.active_turn.lock().await;
let Some(active_turn) = active.as_mut() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let Some((active_turn_id, _)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let Some((active_turn_id, active_task)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let active_turn_id = active_turn_id.clone();
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
{
return Err(SteerInputError::ExpectedTurnMismatch {
expected: expected_turn_id.to_string(),
actual: active_turn_id.clone(),
});
}
match active_turn.tasks.first().map(|(_, task)| task.kind) {
Some(crate::state::TaskKind::Regular) => {}
Some(crate::state::TaskKind::Review) => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Review,
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
{
return Err(SteerInputError::ExpectedTurnMismatch {
expected: expected_turn_id.to_string(),
actual: active_turn_id,
});
}
Some(crate::state::TaskKind::Compact) => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Compact,
});
match active_task.kind {
crate::state::TaskKind::Regular => {}
crate::state::TaskKind::Review => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Review,
});
}
crate::state::TaskKind::Compact => {
return Err(SteerInputError::ActiveTurnNotSteerable {
turn_kind: NonSteerableTurnKind::Compact,
});
}
}
None => return Err(SteerInputError::NoActiveTurn(input)),
}
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata
&& let Some((_, active_task)) = active_turn.tasks.first()
{
active_task
.turn_context
.turn_metadata_state
.set_responsesapi_client_metadata(responsesapi_client_metadata);
}
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata {
active_task
.turn_context
.turn_metadata_state
.set_responsesapi_client_metadata(responsesapi_client_metadata);
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.accept_mailbox_delivery_for_current_turn();
active_turn_id
};
Ok(active_turn_id)
}
/// Returns the input if there was no task running to inject into.

View File

@@ -24,10 +24,13 @@ use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use std::sync::Mutex;
use tokio::sync::oneshot;
use tracing::Level;
use tracing_test::traced_test;
@@ -131,6 +134,91 @@ async fn responses_api_emits_api_request_event() {
.map(|_| Ok(()))
.unwrap_or_else(|| Err("expected codex.conversation_starts event".to_string()))
});
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| line.contains("codex.user_prompt") && line.contains("prompt_length=5"))
.map(|_| Ok(()))
.unwrap_or_else(|| Err("expected codex.user_prompt event".to_string()))
});
}
#[tokio::test]
#[traced_test]
async fn steer_input_emits_user_prompt_event() {
let (complete_first_response_tx, complete_first_response_rx) = oneshot::channel();
let (server, _completions) = start_streaming_sse_server(vec![
vec![
StreamingSseChunk {
gate: None,
body: sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "Working"),
]),
},
StreamingSseChunk {
gate: Some(complete_first_response_rx),
body: sse(vec![ev_completed("resp-1")]),
},
],
vec![StreamingSseChunk {
gate: None,
body: sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "Done"),
ev_completed("resp-2"),
]),
}],
])
.await;
let TestCodex { codex, .. } = test_codex()
.build_with_streaming_server(&server)
.await
.unwrap();
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "initial prompt".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnStarted(_))).await;
server.wait_for_request_count(/*count*/ 1).await;
codex
.steer_input(
vec![UserInput::Text {
text: "queued telemetry prompt".into(),
text_elements: Vec::new(),
}],
/*expected_turn_id*/ None,
/*responsesapi_client_metadata*/ None,
)
.await
.unwrap();
complete_first_response_tx
.send(())
.expect("first response gate should still be open");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
server.shutdown().await;
logs_assert(|lines: &[&str]| {
lines
.iter()
.find(|line| line.contains("codex.user_prompt") && line.contains("prompt_length=23"))
.map(|_| Ok(()))
.unwrap_or_else(|| Err("expected codex.user_prompt for steered input".to_string()))
});
}
#[tokio::test]