[codex] add otel tracing (#7844)

This commit is contained in:
Anton Panasenko
2025-12-12 17:07:17 -08:00
committed by GitHub
parent 596fcd040f
commit ad7b9d63c3
39 changed files with 958 additions and 315 deletions

View File

@@ -61,9 +61,13 @@ use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::debug;
use tracing::error;
use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::warn;
use crate::ModelProviderInfo;
@@ -141,7 +145,7 @@ use crate::user_notification::UserNotification;
use crate::util::backoff;
use codex_async_utils::OrCancelExt;
use codex_execpolicy::Policy as ExecPolicy;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_otel::otel_manager::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
@@ -479,7 +483,7 @@ impl Session {
#[allow(clippy::too_many_arguments)]
fn make_turn_context(
auth_manager: Option<Arc<AuthManager>>,
otel_event_manager: &OtelEventManager,
otel_manager: &OtelManager,
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
per_turn_config: Config,
@@ -487,7 +491,7 @@ impl Session {
conversation_id: ConversationId,
sub_id: String,
) -> TurnContext {
let otel_event_manager = otel_event_manager.clone().with_model(
let otel_manager = otel_manager.clone().with_model(
session_configuration.model.as_str(),
model_family.get_model_slug(),
);
@@ -497,7 +501,7 @@ impl Session {
per_turn_config.clone(),
auth_manager,
model_family.clone(),
otel_event_manager,
otel_manager,
provider,
session_configuration.model_reasoning_effort,
session_configuration.model_reasoning_summary,
@@ -616,7 +620,7 @@ impl Session {
maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events);
// TODO(aibrahim): why do we pass the model here when it can change later?
let otel_event_manager = OtelEventManager::new(
let otel_manager = OtelManager::new(
conversation_id,
session_configuration.model.as_str(),
session_configuration.model.as_str(),
@@ -625,9 +629,10 @@ impl Session {
auth_manager.auth().map(|a| a.mode),
config.otel.log_user_prompt,
terminal::user_agent(),
session_configuration.session_source.clone(),
);
otel_event_manager.conversation_starts(
otel_manager.conversation_starts(
config.model_provider.name.as_str(),
config.model_reasoning_effort,
config.model_reasoning_summary,
@@ -658,7 +663,7 @@ impl Session {
user_shell: Arc::new(default_shell),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
auth_manager: Arc::clone(&auth_manager),
otel_event_manager,
otel_manager,
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: skills.clone(),
@@ -787,15 +792,15 @@ impl Session {
"resuming session with different model: previous={prev}, current={curr}"
);
self.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
message: format!(
"This session was recorded with model `{prev}` but is resuming with `{curr}`. \
&turn_context,
EventMsg::Warning(WarningEvent {
message: format!(
"This session was recorded with model `{prev}` but is resuming with `{curr}`. \
Consider switching back to `{prev}` as it may affect Codex performance."
),
}),
)
.await;
),
}),
)
.await;
}
}
@@ -868,7 +873,7 @@ impl Session {
.await;
let mut turn_context: TurnContext = Self::make_turn_context(
Some(Arc::clone(&self.services.auth_manager)),
&self.services.otel_event_manager,
&self.services.otel_manager,
session_configuration.provider.clone(),
&session_configuration,
per_turn_config,
@@ -1694,7 +1699,7 @@ mod handlers {
let current_context = sess.new_turn_with_sub_id(sub_id, updates).await;
current_context
.client
.get_otel_event_manager()
.get_otel_manager()
.user_prompt(&items);
// Attempt to inject input into current task
@@ -2003,20 +2008,17 @@ async fn spawn_review_thread(
per_turn_config.model_reasoning_summary = ReasoningSummaryConfig::Detailed;
per_turn_config.features = review_features.clone();
let otel_event_manager = parent_turn_context
.client
.get_otel_event_manager()
.with_model(
config.review_model.as_str(),
review_model_family.slug.as_str(),
);
let otel_manager = parent_turn_context.client.get_otel_manager().with_model(
config.review_model.as_str(),
review_model_family.slug.as_str(),
);
let per_turn_config = Arc::new(per_turn_config);
let client = ModelClient::new(
per_turn_config.clone(),
auth_manager,
model_family.clone(),
otel_event_manager,
otel_manager,
provider,
per_turn_config.model_reasoning_effort,
per_turn_config.model_reasoning_summary,
@@ -2238,6 +2240,14 @@ pub(crate) async fn run_task(
last_agent_message
}
#[instrument(
skip_all,
fields(
turn_id = %turn_context.sub_id,
model = %turn_context.client.get_model(),
cwd = %turn_context.cwd.display()
)
)]
async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -2370,6 +2380,13 @@ async fn drain_in_flight(
}
#[allow(clippy::too_many_arguments)]
#[instrument(
skip_all,
fields(
turn_id = %turn_context.sub_id,
model = %turn_context.client.get_model()
)
)]
async fn try_run_turn(
router: Arc<ToolRouter>,
sess: Arc<Session>,
@@ -2392,6 +2409,7 @@ async fn try_run_turn(
.client
.clone()
.stream(prompt)
.instrument(info_span!("stream_request"))
.or_cancel(&cancellation_token)
.await??;
@@ -2406,8 +2424,22 @@ async fn try_run_turn(
let mut needs_follow_up = false;
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
let receiving_span = info_span!("receiving_stream");
let outcome: CodexResult<TurnRunResult> = loop {
let event = match stream.next().or_cancel(&cancellation_token).await {
let handle_responses = info_span!(
parent: &receiving_span,
"handle_responses",
otel.name = field::Empty,
tool_name = field::Empty,
from = field::Empty,
);
let event = match stream
.next()
.instrument(info_span!(parent: &handle_responses, "receiving"))
.or_cancel(&cancellation_token)
.await
{
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => break Err(CodexErr::TurnAborted),
};
@@ -2422,6 +2454,10 @@ async fn try_run_turn(
}
};
sess.services
.otel_manager
.record_responses(&handle_responses, &event);
match event {
ResponseEvent::Created => {}
ResponseEvent::OutputItemDone(item) => {
@@ -2433,8 +2469,9 @@ async fn try_run_turn(
cancellation_token: cancellation_token.child_token(),
};
let output_result =
handle_output_item_done(&mut ctx, item, previously_active_item).await?;
let output_result = handle_output_item_done(&mut ctx, item, previously_active_item)
.instrument(handle_responses)
.await?;
if let Some(tool_future) = output_result.tool_future {
in_flight.push_back(tool_future);
}
@@ -2913,12 +2950,13 @@ mod tests {
})
}
fn otel_event_manager(
fn otel_manager(
conversation_id: ConversationId,
config: &Config,
model_family: &ModelFamily,
) -> OtelEventManager {
OtelEventManager::new(
session_source: SessionSource,
) -> OtelManager {
OtelManager::new(
conversation_id,
ModelsManager::get_model_offline(config.model.as_deref()).as_str(),
model_family.slug.as_str(),
@@ -2927,6 +2965,7 @@ mod tests {
Some(AuthMode::ChatGPT),
false,
"test".to_string(),
session_source,
)
}
@@ -2966,8 +3005,12 @@ mod tests {
session_configuration.model.as_str(),
&per_turn_config,
);
let otel_event_manager =
otel_event_manager(conversation_id, config.as_ref(), &model_family);
let otel_manager = otel_manager(
conversation_id,
config.as_ref(),
&model_family,
session_configuration.session_source.clone(),
);
let state = SessionState::new(session_configuration.clone());
@@ -2980,7 +3023,7 @@ mod tests {
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
auth_manager: auth_manager.clone(),
otel_event_manager: otel_event_manager.clone(),
otel_manager: otel_manager.clone(),
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: None,
@@ -2988,7 +3031,7 @@ mod tests {
let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
&otel_manager,
session_configuration.provider.clone(),
&session_configuration,
per_turn_config,
@@ -3052,8 +3095,12 @@ mod tests {
session_configuration.model.as_str(),
&per_turn_config,
);
let otel_event_manager =
otel_event_manager(conversation_id, config.as_ref(), &model_family);
let otel_manager = otel_manager(
conversation_id,
config.as_ref(),
&model_family,
session_configuration.session_source.clone(),
);
let state = SessionState::new(session_configuration.clone());
@@ -3066,7 +3113,7 @@ mod tests {
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
auth_manager: Arc::clone(&auth_manager),
otel_event_manager: otel_event_manager.clone(),
otel_manager: otel_manager.clone(),
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills: None,
@@ -3074,7 +3121,7 @@ mod tests {
let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
&otel_manager,
session_configuration.provider.clone(),
&session_configuration,
per_turn_config,