Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
e4e5829d71 test: new tracing 2026-04-17 12:01:28 +01:00
21 changed files with 2226 additions and 11 deletions

20
codex-rs/Cargo.lock generated
View File

@@ -1464,6 +1464,7 @@ dependencies = [
"codex-state",
"codex-thread-store",
"codex-tools",
"codex-trace",
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-cli",
@@ -1695,6 +1696,7 @@ dependencies = [
"codex-state",
"codex-stdio-to-uds",
"codex-terminal-detection",
"codex-trace",
"codex-tui",
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
@@ -1943,6 +1945,7 @@ dependencies = [
"codex-test-binary-support",
"codex-thread-store",
"codex-tools",
"codex-trace",
"codex-utils-absolute-path",
"codex-utils-cache",
"codex-utils-cargo-bin",
@@ -2103,6 +2106,7 @@ dependencies = [
"codex-model-provider-info",
"codex-otel",
"codex-protocol",
"codex-trace",
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-cli",
@@ -2911,6 +2915,21 @@ dependencies = [
"tracing",
]
[[package]]
name = "codex-trace"
version = "0.0.0"
dependencies = [
"anyhow",
"chrono",
"pretty_assertions",
"serde",
"serde_json",
"tempfile",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "codex-tui"
version = "0.0.0"
@@ -2946,6 +2965,7 @@ dependencies = [
"codex-shell-command",
"codex-state",
"codex-terminal-detection",
"codex-trace",
"codex-utils-absolute-path",
"codex-utils-approval-presets",
"codex-utils-cargo-bin",

View File

@@ -57,6 +57,7 @@ members = [
"response-debug-context",
"sandboxing",
"stdio-to-uds",
"trace",
"otel",
"tui",
"tools",
@@ -169,6 +170,7 @@ codex-skills = { path = "skills" }
codex-state = { path = "state" }
codex-stdio-to-uds = { path = "stdio-to-uds" }
codex-terminal-detection = { path = "terminal-detection" }
codex-trace = { path = "trace" }
codex-test-binary-support = { path = "test-binary-support" }
codex-thread-store = { path = "thread-store" }
codex-tools = { path = "tools" }

View File

@@ -57,6 +57,7 @@ codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-thread-store = { workspace = true }
codex-tools = { workspace = true }
codex-trace = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
codex-utils-rustls-provider = { workspace = true }

View File

@@ -522,6 +522,7 @@ pub async fn run_main_with_transport(
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
@@ -529,6 +530,7 @@ pub async fn run_main_with_transport(
.with(log_db_layer)
.with(otel_logger_layer)
.with(otel_tracing_layer)
.with(codex_trace_layer)
.try_init();
for warning in &config_warnings {
match &warning.details {

View File

@@ -44,6 +44,7 @@ codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-stdio-to-uds = { workspace = true }
codex-terminal-detection = { workspace = true }
codex-trace = { workspace = true }
codex-tui = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-path = { workspace = true }

View File

@@ -24,6 +24,8 @@ use codex_execpolicy::ExecPolicyCheckCommand;
use codex_responses_api_proxy::Args as ResponsesApiProxyArgs;
use codex_state::StateRuntime;
use codex_state::state_db_path;
use codex_trace::REDUCED_STATE_FILE_NAME;
use codex_trace::reduce_bundle_to_path;
use codex_tui::AppExitInfo;
use codex_tui::Cli as TuiCli;
use codex_tui::ExitReason;
@@ -191,6 +193,9 @@ enum DebugSubcommand {
/// Render the model-visible prompt input list as JSON.
PromptInput(DebugPromptInputCommand),
/// Reduce a local trace bundle into viewer state JSON.
TraceReduce(DebugTraceReduceCommand),
/// Internal: reset local memory state for a fresh start.
#[clap(hide = true)]
ClearMemories,
@@ -225,6 +230,17 @@ struct DebugPromptInputCommand {
images: Vec<PathBuf>,
}
#[derive(Debug, Parser)]
struct DebugTraceReduceCommand {
/// Trace bundle directory containing manifest.json and events.jsonl.
#[arg(value_name = "TRACE_BUNDLE")]
trace_bundle: PathBuf,
/// Output state.json path. Defaults to TRACE_BUNDLE/state.json.
#[arg(long = "output", short = 'o', value_name = "FILE")]
output: Option<PathBuf>,
}
#[derive(Debug, Parser)]
struct ResumeCommand {
/// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses.
@@ -992,6 +1008,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
)
.await?;
}
DebugSubcommand::TraceReduce(cmd) => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
root_remote_auth_token_env.as_deref(),
"debug trace-reduce",
)?;
run_debug_trace_reduce_command(cmd).await?;
}
DebugSubcommand::ClearMemories => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
@@ -1259,6 +1283,15 @@ async fn run_debug_prompt_input_command(
Ok(())
}
async fn run_debug_trace_reduce_command(cmd: DebugTraceReduceCommand) -> anyhow::Result<()> {
let output = cmd
.output
.unwrap_or_else(|| cmd.trace_bundle.join(REDUCED_STATE_FILE_NAME));
reduce_bundle_to_path(&cmd.trace_bundle, &output)?;
println!("{}", output.display());
Ok(())
}
async fn run_debug_clear_memories_command(
root_config_overrides: &CliConfigOverrides,
interactive: &TuiCli,

View File

@@ -60,6 +60,7 @@ codex-state = { workspace = true }
codex-terminal-detection = { workspace = true }
codex-thread-store = { workspace = true }
codex-tools = { workspace = true }
codex-trace = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cache = { workspace = true }
codex-utils-image = { workspace = true }

View File

@@ -85,6 +85,7 @@ use http::HeaderMap as ApiHeaderMap;
use http::HeaderValue;
use http::StatusCode as HttpStatusCode;
use reqwest::StatusCode;
use serde_json::json;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
@@ -137,6 +138,13 @@ const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
Duration::from_millis(DEFAULT_WEBSOCKET_CONNECT_TIMEOUT_MS);
#[derive(Clone, Debug)]
pub(crate) struct CodexTraceContext {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
pub(crate) inference_call_id: String,
}
/// Session-scoped state shared by all [`ModelClient`] clones.
///
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
@@ -1140,6 +1148,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
trace_context: Option<CodexTraceContext>,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
warn!(path, "Streaming from fixture");
@@ -1148,7 +1157,11 @@ impl ModelClientSession {
self.client.state.provider.stream_idle_timeout(),
)
.map_err(map_api_error)?;
let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone());
let (stream, _last_request_rx) = map_response_stream(
stream,
session_telemetry.clone(),
/*trace_context*/ None,
);
return Ok(stream);
}
@@ -1157,6 +1170,7 @@ impl ModelClientSession {
.as_ref()
.map(AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
let mut trace_started = false;
loop {
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
@@ -1182,6 +1196,18 @@ impl ModelClientSession {
summary,
service_tier,
)?;
if let Some(trace_context) = &trace_context
&& !trace_started
{
let request_payload = codex_trace::write_payload("inference_request", &request);
emit_inference_started(
trace_context,
model_info,
&self.client.state.provider.name,
request_payload.as_ref(),
);
trace_started = true;
}
let client = ApiResponsesClient::new(
transport,
client_setup.api_provider,
@@ -1192,7 +1218,8 @@ impl ModelClientSession {
match stream_result {
Ok(stream) => {
let (stream, _) = map_response_stream(stream, session_telemetry.clone());
let (stream, _) =
map_response_stream(stream, session_telemetry.clone(), trace_context);
return Ok(stream);
}
Err(ApiError::Transport(
@@ -1208,7 +1235,13 @@ impl ModelClientSession {
);
continue;
}
Err(err) => return Err(map_api_error(err)),
Err(err) => {
let mapped = map_api_error(err);
if let Some(trace_context) = &trace_context {
emit_inference_failed(trace_context, &mapped.to_string());
}
return Err(mapped);
}
}
}
}
@@ -1237,6 +1270,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
trace_context: Option<CodexTraceContext>,
warmup: bool,
request_trace: Option<W3cTraceContext>,
) -> Result<WebsocketStreamOutcome> {
@@ -1312,18 +1346,38 @@ impl ModelClientSession {
}
let ws_request = self.prepare_websocket_request(ws_payload, &request);
let request_payload = trace_context
.as_ref()
.and_then(|_| codex_trace::write_payload("inference_request", &ws_request));
if let Some(trace_context) = &trace_context {
emit_inference_started(
trace_context,
model_info,
&self.client.state.provider.name,
request_payload.as_ref(),
);
}
self.websocket_session.last_request = Some(request);
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = stream_result
let stream_result = match stream_result
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
{
Ok(stream) => stream,
Err(err) => {
let mapped = map_api_error(err);
if let Some(trace_context) = &trace_context {
emit_inference_failed(trace_context, &mapped.to_string());
}
return Err(mapped);
}
};
let (stream, last_request_rx) =
map_response_stream(stream_result, session_telemetry.clone());
map_response_stream(stream_result, session_telemetry.clone(), trace_context);
self.websocket_session.last_response_rx = Some(last_request_rx);
return Ok(WebsocketStreamOutcome::Stream(stream));
}
@@ -1391,6 +1445,7 @@ impl ModelClientSession {
summary,
service_tier,
turn_metadata_header,
/*trace_context*/ None,
/*warmup*/ true,
current_span_w3c_trace_context(),
)
@@ -1431,6 +1486,43 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
self.stream_with_trace(
prompt,
model_info,
session_telemetry,
effort,
summary,
service_tier,
turn_metadata_header,
/*trace_context*/ None,
)
.await
}
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
target = "codex_otel.trace_safe",
name = "codex.inference",
skip_all,
fields(
thread.id = %trace_context.as_ref().map(|ctx| ctx.thread_id.as_str()).unwrap_or(""),
turn.id = %trace_context.as_ref().map(|ctx| ctx.turn_id.as_str()).unwrap_or(""),
inference.id = %trace_context.as_ref().map(|ctx| ctx.inference_call_id.as_str()).unwrap_or(""),
model = %model_info.slug,
provider.name = %self.client.state.provider.name,
)
)]
pub(crate) async fn stream_with_trace(
&mut self,
prompt: &Prompt,
model_info: &ModelInfo,
session_telemetry: &SessionTelemetry,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
trace_context: Option<CodexTraceContext>,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.wire_api;
match wire_api {
@@ -1446,6 +1538,7 @@ impl ModelClientSession {
summary,
service_tier,
turn_metadata_header,
trace_context.clone(),
/*warmup*/ false,
request_trace,
)
@@ -1466,6 +1559,7 @@ impl ModelClientSession {
summary,
service_tier,
turn_metadata_header,
trace_context,
)
.await
}
@@ -1558,9 +1652,62 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
}
}
fn emit_inference_started(
trace_context: &CodexTraceContext,
model_info: &ModelInfo,
provider_name: &str,
request_payload: Option<&codex_trace::RawPayloadRef>,
) {
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.inference.started",
thread.id = %trace_context.thread_id,
turn.id = %trace_context.turn_id,
inference.id = %trace_context.inference_call_id,
model = %model_info.slug,
provider.name = %provider_name,
raw_payload.request.id = %request_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
raw_payload.request.path = %request_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
raw_payload.request.kind = %request_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
);
}
fn emit_inference_completed(
trace_context: &CodexTraceContext,
response_id: &str,
response_payload: Option<&codex_trace::RawPayloadRef>,
) {
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.inference.completed",
thread.id = %trace_context.thread_id,
turn.id = %trace_context.turn_id,
inference.id = %trace_context.inference_call_id,
response.id = %response_id,
raw_payload.response.id = %response_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
raw_payload.response.path = %response_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
raw_payload.response.kind = %response_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
);
}
fn emit_inference_failed(trace_context: &CodexTraceContext, error: &str) {
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.inference.failed",
thread.id = %trace_context.thread_id,
turn.id = %trace_context.turn_id,
inference.id = %trace_context.inference_call_id,
error = %error,
);
}
fn map_response_stream<S>(
api_stream: S,
session_telemetry: SessionTelemetry,
trace_context: Option<CodexTraceContext>,
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
@@ -1592,6 +1739,17 @@ where
response_id,
token_usage,
}) => {
if let Some(trace_context) = &trace_context {
let payload = codex_trace::write_payload(
"inference_response_summary",
&json!({
"response_id": response_id,
"token_usage": token_usage.clone(),
"output_items": items_added.clone(),
}),
);
emit_inference_completed(trace_context, &response_id, payload.as_ref());
}
if let Some(usage) = &token_usage {
session_telemetry.sse_event_completed(
usage.input_tokens,
@@ -1627,6 +1785,9 @@ where
let mapped = map_api_error(err);
if !logged_error {
session_telemetry.see_event_completed_failed(&mapped);
if let Some(trace_context) = &trace_context {
emit_inference_failed(trace_context, &mapped.to_string());
}
logged_error = true;
}
if tx_event.send(Err(mapped)).await.is_err() {

View File

@@ -180,6 +180,7 @@ use tracing::trace_span;
use tracing::warn;
use uuid::Uuid;
use crate::client::CodexTraceContext;
use crate::client::ModelClient;
use crate::client::ModelClientSession;
use crate::client_common::Prompt;
@@ -1327,6 +1328,200 @@ pub(crate) struct AppServerClientMetadata {
pub(crate) client_version: Option<String>,
}
fn emit_trace_safe_event_for_event_msg(thread_id: ThreadId, turn_id: &str, msg: &EventMsg) {
match msg {
EventMsg::TurnComplete(_) => {
emit_turn_ended_trace_event(thread_id, turn_id, "completed");
}
EventMsg::TurnAborted(_) => {
emit_turn_ended_trace_event(thread_id, turn_id, "aborted");
}
EventMsg::CollabAgentSpawnBegin(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.spawn.started",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
None,
None,
None,
);
}
EventMsg::CollabAgentSpawnEnd(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.spawn.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
event.new_thread_id,
event.new_agent_nickname.as_deref(),
event.new_agent_role.as_deref(),
);
}
EventMsg::CollabAgentInteractionBegin(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.message.started",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(event.receiver_thread_id),
None,
None,
);
}
EventMsg::CollabAgentInteractionEnd(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.message.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(event.receiver_thread_id),
event.receiver_agent_nickname.as_deref(),
event.receiver_agent_role.as_deref(),
);
}
EventMsg::CollabWaitingBegin(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
for receiver_thread_id in &event.receiver_thread_ids {
emit_collab_trace_event(
"codex.collab.wait.started",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(*receiver_thread_id),
None,
None,
);
}
}
EventMsg::CollabWaitingEnd(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
if event.agent_statuses.is_empty() {
if event.statuses.is_empty() {
emit_collab_trace_event(
"codex.collab.wait.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
None,
None,
None,
);
} else {
for receiver_thread_id in event.statuses.keys() {
emit_collab_trace_event(
"codex.collab.wait.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(*receiver_thread_id),
None,
None,
);
}
}
} else {
for agent_status in &event.agent_statuses {
emit_collab_trace_event(
"codex.collab.wait.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(agent_status.thread_id),
agent_status.agent_nickname.as_deref(),
agent_status.agent_role.as_deref(),
);
}
}
}
EventMsg::CollabCloseBegin(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.close.started",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(event.receiver_thread_id),
None,
None,
);
}
EventMsg::CollabCloseEnd(event) => {
let tool_call_id = trace_tool_call_id(&event.call_id);
emit_collab_trace_event(
"codex.collab.close.ended",
thread_id,
turn_id,
&tool_call_id,
event.sender_thread_id,
Some(event.receiver_thread_id),
event.receiver_agent_nickname.as_deref(),
event.receiver_agent_role.as_deref(),
);
}
_ => {}
}
}
fn emit_turn_ended_trace_event(thread_id: ThreadId, turn_id: &str, status: &str) {
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.turn.ended",
thread.id = %thread_id,
turn.id = %turn_id,
status = %status,
);
}
fn emit_collab_trace_event(
event_name: &str,
thread_id: ThreadId,
turn_id: &str,
tool_call_id: &str,
sender_thread_id: ThreadId,
target_thread_id: Option<ThreadId>,
target_agent_nickname: Option<&str>,
target_agent_role: Option<&str>,
) {
let target_thread_id = target_thread_id
.map(|thread_id| thread_id.to_string())
.unwrap_or_default();
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %event_name,
thread.id = %thread_id,
turn.id = %turn_id,
tool.call_id = %tool_call_id,
sender.thread.id = %sender_thread_id,
target.thread.id = %target_thread_id,
target.agent.nickname = %target_agent_nickname.unwrap_or(""),
target.agent.role = %target_agent_role.unwrap_or(""),
);
}
fn trace_tool_call_id(call_id: &str) -> String {
if call_id.starts_with("tool:") {
call_id.to_string()
} else {
format!("tool:{call_id}")
}
}
impl Session {
pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata {
let state = self.state.lock().await;
@@ -2190,6 +2385,32 @@ impl Session {
let mut guard = network_policy_decider_session.write().await;
*guard = Arc::downgrade(&sess);
}
let thread_id = conversation_id.to_string();
let (agent_path, parent_thread_id) = match &session_configuration.session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
agent_path,
..
}) => (
agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "/root".to_string()),
Some(parent_thread_id.to_string()),
),
_ => ("/root".to_string(), None),
};
let session_source = session_configuration.session_source.to_string();
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.thread.started",
thread.id = %thread_id,
agent.path = %agent_path,
model = session_configuration.collaboration_mode.model(),
thread.source = %session_source,
parent_thread.id = %parent_thread_id.as_deref().unwrap_or(""),
);
// Dispatch the SessionConfiguredEvent first and then report any errors.
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
let initial_messages = initial_history.get_event_msgs();
@@ -2764,6 +2985,13 @@ impl Session {
if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
}
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.turn.started",
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
);
let turn_context = Arc::new(turn_context);
turn_context.turn_metadata_state.spawn_git_enrichment_task();
turn_context
@@ -2892,6 +3120,11 @@ impl Session {
/// Persist the event to rollout and send it to clients.
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
let legacy_source = msg.clone();
emit_trace_safe_event_for_event_msg(
self.conversation_id,
&turn_context.sub_id,
&legacy_source,
);
let event = Event {
id: turn_context.sub_id.clone(),
msg,
@@ -7876,8 +8109,13 @@ async fn try_run_sampling_request(
auth_mode = sess.services.auth_manager.auth_mode(),
features = sess.features.enabled_features(),
);
let trace_context = CodexTraceContext {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
inference_call_id: codex_trace::next_id("inference"),
};
let mut stream = client_session
.stream(
.stream_with_trace(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
@@ -7885,6 +8123,7 @@ async fn try_run_sampling_request(
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
Some(trace_context),
)
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)

View File

@@ -28,6 +28,7 @@ use codex_tools::ToolSpec;
use codex_utils_readiness::Readiness;
use futures::future::BoxFuture;
use serde_json::Value;
use serde_json::json;
use tracing::warn;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
@@ -206,9 +207,39 @@ impl ToolRegistry {
// }
// }
#[tracing::instrument(
target = "codex_otel.trace_safe",
name = "codex.tool_call",
skip_all,
fields(
thread.id = %invocation.session.conversation_id,
turn.id = %invocation.turn.sub_id,
tool.call_id = %invocation.call_id,
tool.name = %invocation.tool_name.display(),
)
)]
pub(crate) async fn dispatch_any(
&self,
invocation: ToolInvocation,
) -> Result<AnyToolResult, FunctionCallError> {
let call_id_owned = invocation.call_id.clone();
let trace_tool_call_id = format!("tool:{call_id_owned}");
let invocation_payload =
codex_trace::write_payload("tool_invocation", &trace_tool_invocation(&invocation));
emit_tool_started(
&invocation,
&trace_tool_call_id,
invocation_payload.as_ref(),
);
let result = self.dispatch_any_inner(invocation).await;
emit_tool_ended(&trace_tool_call_id, &result);
result
}
async fn dispatch_any_inner(
&self,
invocation: ToolInvocation,
) -> Result<AnyToolResult, FunctionCallError> {
let tool_name = invocation.tool_name.clone();
let display_name = tool_name.display();
@@ -297,10 +328,11 @@ impl ToolRegistry {
)
.await
{
return Err(FunctionCallError::RespondToModel(format!(
let message = format!(
"Command blocked by PreToolUse hook: {reason}. Command: {}",
pre_tool_use_payload.command
)));
);
return Err(FunctionCallError::RespondToModel(message));
}
let is_mutating = handler.is_mutating(&invocation).await;
@@ -374,7 +406,7 @@ impl ToolRegistry {
// Deprecated: this is the legacy AfterToolUse hook. Prefer the new PostToolUse
let hook_abort_error = dispatch_after_tool_use_hook(AfterToolUseHookDispatch {
invocation: &invocation,
output_preview,
output_preview: output_preview.clone(),
success,
executed: true,
duration,
@@ -429,6 +461,114 @@ impl ToolRegistry {
}
}
fn emit_tool_started(
invocation: &ToolInvocation,
tool_call_id: &str,
invocation_payload: Option<&codex_trace::RawPayloadRef>,
) {
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.tool.started",
thread.id = %invocation.session.conversation_id,
turn.id = %invocation.turn.sub_id,
tool.call_id = %tool_call_id,
tool.name = %invocation.tool_name.display(),
model_visible_call.id = %invocation.call_id,
raw_payload.invocation.id = %invocation_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
raw_payload.invocation.path = %invocation_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
raw_payload.invocation.kind = %invocation_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
);
}
fn emit_tool_ended(tool_call_id: &str, result: &Result<AnyToolResult, FunctionCallError>) {
let (status, output_preview) = match result {
Ok(result) => {
let success = result.result.success_for_logging();
let status = if success { "completed" } else { "failed" };
(status, result.result.log_preview())
}
Err(err) => ("failed", err.to_string()),
};
let result_payload = codex_trace::write_payload_lazy("tool_result", || {
trace_tool_result(result, status, &output_preview)
});
tracing::event!(
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
tracing::Level::INFO,
event.name = %"codex.tool.ended",
tool.call_id = %tool_call_id,
status = %status,
output_preview = %output_preview,
raw_payload.result.id = %result_payload.as_ref().map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
raw_payload.result.path = %result_payload.as_ref().map(|payload| payload.path.as_str()).unwrap_or(""),
raw_payload.result.kind = %result_payload.as_ref().map(|payload| payload.kind.as_str()).unwrap_or(""),
);
}
fn trace_tool_invocation(invocation: &ToolInvocation) -> Value {
let payload = match &invocation.payload {
ToolPayload::Function { arguments } => json!({
"type": "function",
"arguments": arguments,
}),
ToolPayload::ToolSearch { arguments } => json!({
"type": "tool_search",
"arguments": arguments,
}),
ToolPayload::Custom { input } => json!({
"type": "custom",
"input": input,
}),
ToolPayload::LocalShell { params } => json!({
"type": "local_shell",
"command": &params.command,
"workdir": &params.workdir,
"timeout_ms": params.timeout_ms,
}),
ToolPayload::Mcp {
server,
tool,
raw_arguments,
} => json!({
"type": "mcp",
"server": server,
"tool": tool,
"raw_arguments": raw_arguments,
}),
};
json!({
"call_id": &invocation.call_id,
"tool_name": invocation.tool_name.display(),
"turn_id": &invocation.turn.sub_id,
"payload": payload,
})
}
fn trace_tool_result(
result: &Result<AnyToolResult, FunctionCallError>,
status: &str,
output_preview: &str,
) -> Value {
match result {
Ok(result) => json!({
"status": status,
"success": result.result.success_for_logging(),
"output_preview": output_preview,
"model_visible_response_item": result.result.to_response_item(&result.call_id, &result.payload),
"code_mode_result": result.result.code_mode_result(&result.payload),
}),
Err(err) => json!({
"status": status,
"success": false,
"output_preview": output_preview,
"error": {
"message": err.to_string(),
},
}),
}
}
pub struct ToolRegistryBuilder {
handlers: HashMap<ToolName, Arc<dyn AnyToolHandler>>,
specs: Vec<ConfiguredToolSpec>,

View File

@@ -1,5 +1,7 @@
use super::*;
use crate::tools::context::FunctionToolOutput;
use pretty_assertions::assert_eq;
use serde_json::json;
struct TestHandler;
@@ -49,3 +51,34 @@ fn handler_looks_up_namespaced_aliases_explicitly() {
.is_some_and(|handler| Arc::ptr_eq(handler, &namespaced_handler))
);
}
#[test]
fn trace_tool_result_includes_full_caller_payloads() {
let result = Ok(AnyToolResult {
call_id: "call-1".to_string(),
payload: ToolPayload::Function {
arguments: "{}".to_string(),
},
result: Box::new(FunctionToolOutput::from_text(
"tool output".to_string(),
Some(true),
)),
});
let payload = trace_tool_result(&result, "completed", "tool output");
assert_eq!(
payload,
json!({
"status": "completed",
"success": true,
"output_preview": "tool output",
"model_visible_response_item": {
"type": "function_call_output",
"call_id": "call-1",
"output": "tool output",
},
"code_mode_result": "tool output",
})
);
}

View File

@@ -34,6 +34,7 @@ codex-login = { workspace = true }
codex-model-provider-info = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-trace = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cli = { workspace = true }
codex-utils-oss = { workspace = true }

View File

@@ -449,11 +449,13 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
let _ = tracing_subscriber::registry()
.with(fmt_layer)
.with(otel_tracing_layer)
.with(otel_logger_layer)
.with(codex_trace_layer)
.try_init();
let exec_span = exec_root_span();

View File

@@ -32,6 +32,8 @@ pub use crate::trace_context::span_w3c_trace_context;
pub use crate::trace_context::traceparent_context_from_env;
pub use codex_utils_string::sanitize_metric_tag_value;
pub const OTEL_TRACE_SAFE_TARGET: &str = "codex_otel.trace_safe";
#[derive(Debug, Clone, Serialize, Display)]
#[serde(rename_all = "snake_case")]
pub enum ToolDecisionSource {

View File

@@ -1,6 +1,6 @@
pub(crate) const OTEL_TARGET_PREFIX: &str = "codex_otel";
pub(crate) const OTEL_LOG_ONLY_TARGET: &str = "codex_otel.log_only";
pub(crate) const OTEL_TRACE_SAFE_TARGET: &str = "codex_otel.trace_safe";
pub(crate) const OTEL_TRACE_SAFE_TARGET: &str = crate::OTEL_TRACE_SAFE_TARGET;
pub(crate) fn is_log_export_target(target: &str) -> bool {
target.starts_with(OTEL_TARGET_PREFIX) && !is_trace_safe_target(target)

26
codex-rs/trace/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-trace"
version.workspace = true
[lib]
doctest = false
name = "codex_trace"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
anyhow = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tracing = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "registry"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

18
codex-rs/trace/src/lib.rs Normal file
View File

@@ -0,0 +1,18 @@
//! Lightweight local trace capture for Codex debugging.
//!
//! Runtime code emits ordinary `tracing` events. This crate only provides a
//! localhost debug layer, raw payload references, and a reducer that writes the
//! `state.json` shape consumed by the standalone trace viewer.
mod local;
mod reduce;
pub use local::CODEX_TRACE_ROOT_ENV;
pub use local::LOCAL_TRACE_TARGET;
pub use local::RawPayloadRef;
pub use local::local_layer_from_env;
pub use local::next_id;
pub use local::write_payload;
pub use local::write_payload_lazy;
pub use reduce::REDUCED_STATE_FILE_NAME;
pub use reduce::reduce_bundle_to_path;

204
codex-rs/trace/src/local.rs Normal file
View File

@@ -0,0 +1,204 @@
use std::fs::File;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::PoisonError;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use anyhow::Result;
use serde::Serialize;
use serde_json::json;
use tracing::Level;
use tracing::Subscriber;
use tracing_subscriber::Layer;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::registry::LookupSpan;
pub const CODEX_TRACE_ROOT_ENV: &str = "CODEX_TRACE_ROOT";
pub const LOCAL_TRACE_TARGET: &str = "codex_otel.trace_safe";
static PAYLOAD_WRITER: OnceLock<Mutex<PayloadWriter>> = OnceLock::new();
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct RawPayloadRef {
pub raw_payload_id: String,
pub kind: String,
pub path: String,
}
#[derive(Debug)]
struct PayloadWriter {
payloads_dir: PathBuf,
next_payload_ordinal: u64,
}
pub fn local_layer_from_env<S>() -> Result<Option<impl Layer<S> + Send + Sync + 'static>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let Some(root) = std::env::var_os(CODEX_TRACE_ROOT_ENV) else {
return Ok(None);
};
local_layer_for_root(root).map(Some)
}
fn local_layer_for_root<S>(
root: impl Into<PathBuf>,
) -> Result<impl Layer<S> + Send + Sync + 'static>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let root = root.into();
std::fs::create_dir_all(&root)
.with_context(|| format!("create trace root {}", root.display()))?;
let started_at_unix_ms = unix_time_ms();
let process_id = std::process::id();
let trace_id = format!("trace-{started_at_unix_ms}-{process_id}");
let bundle_dir = root.join(&trace_id);
let payloads_dir = bundle_dir.join("payloads");
std::fs::create_dir_all(&payloads_dir)
.with_context(|| format!("create trace payload dir {}", payloads_dir.display()))?;
write_json_file(
&bundle_dir.join("manifest.json"),
&json!({
"schema_version": 1,
"trace_id": trace_id,
"started_at_unix_ms": started_at_unix_ms,
"event_log": {
"format": "tracing_subscriber_fmt_json",
"path": "events.jsonl"
},
}),
)?;
let writer = PAYLOAD_WRITER.get_or_init(|| {
Mutex::new(PayloadWriter {
payloads_dir: PathBuf::new(),
next_payload_ordinal: 1,
})
});
*writer.lock().unwrap_or_else(PoisonError::into_inner) = PayloadWriter {
payloads_dir,
next_payload_ordinal: 1,
};
let event_log = tracing_appender::rolling::never(&bundle_dir, "events.jsonl");
let layer = tracing_subscriber::fmt::layer()
.json()
.flatten_event(true)
.with_current_span(false)
.with_span_list(false)
.with_writer(event_log)
.with_filter(Targets::new().with_target(LOCAL_TRACE_TARGET, Level::INFO));
Ok(layer)
}
pub fn next_id(prefix: &str) -> String {
let ordinal = NEXT_ID.fetch_add(1, Ordering::Relaxed);
format!("{prefix}:{ordinal}")
}
pub fn write_payload(kind: &str, value: &impl Serialize) -> Option<RawPayloadRef> {
let writer = PAYLOAD_WRITER.get()?;
let mut writer = writer.lock().unwrap_or_else(PoisonError::into_inner);
write_payload_locked(&mut writer, kind, value)
}
pub fn write_payload_lazy<T>(kind: &str, build_value: impl FnOnce() -> T) -> Option<RawPayloadRef>
where
T: Serialize,
{
let writer = PAYLOAD_WRITER.get()?;
let value = build_value();
let mut writer = writer.lock().unwrap_or_else(PoisonError::into_inner);
write_payload_locked(&mut writer, kind, &value)
}
fn write_payload_locked(
writer: &mut PayloadWriter,
kind: &str,
value: &impl Serialize,
) -> Option<RawPayloadRef> {
let ordinal = writer.next_payload_ordinal;
writer.next_payload_ordinal += 1;
let raw_payload_id = format!("raw_payload:{ordinal}");
let path = format!("payloads/{ordinal}.json");
let absolute_path = writer.payloads_dir.join(format!("{ordinal}.json"));
if write_json_file(&absolute_path, value).is_err() {
return None;
}
Some(RawPayloadRef {
raw_payload_id,
kind: kind.to_string(),
path,
})
}
fn write_json_file(path: &Path, value: &impl Serialize) -> Result<()> {
let file = File::create(path).with_context(|| format!("create {}", path.display()))?;
serde_json::to_writer_pretty(file, value)
.with_context(|| format!("write JSON {}", path.display()))
}
pub(crate) fn unix_time_ms() -> i64 {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_millis(0));
i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)
}
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use tracing_subscriber::prelude::*;
#[test]
fn local_layer_can_be_composed_after_assignment() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let layer = super::local_layer_for_root(temp.path())?;
let _subscriber = tracing_subscriber::registry().with(layer);
Ok(())
}
#[test]
fn local_layer_writes_standard_json_events() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let layer = super::local_layer_for_root(temp.path())?;
let subscriber = tracing_subscriber::registry().with(layer);
tracing::subscriber::with_default(subscriber, || {
tracing::event!(
target: super::LOCAL_TRACE_TARGET,
tracing::Level::INFO,
event.name = %"codex.turn.started",
thread.id = %"thread-1",
turn.id = %"turn-1",
);
});
let trace_dir = std::fs::read_dir(temp.path())?
.next()
.transpose()?
.expect("trace directory should be created")
.path();
let event_log = std::fs::read_to_string(trace_dir.join("events.jsonl"))?;
let event: serde_json::Value = serde_json::from_str(event_log.trim())?;
assert_eq!(event["target"], super::LOCAL_TRACE_TARGET);
assert_eq!(event["event.name"], "codex.turn.started");
assert_eq!(event["thread.id"], "thread-1");
assert_eq!(event["turn.id"], "turn-1");
assert!(
event
.get("timestamp")
.and_then(serde_json::Value::as_str)
.is_some()
);
Ok(())
}
}

1326
codex-rs/trace/src/reduce.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -48,6 +48,7 @@ codex-rollout = { workspace = true }
codex-shell-command = { workspace = true }
codex-state = { workspace = true }
codex-terminal-detection = { workspace = true }
codex-trace = { workspace = true }
codex-utils-approval-presets = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cli = { workspace = true }

View File

@@ -978,6 +978,7 @@ pub async fn run_main(
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
let log_db = get_state_db(&config).await.map(log_db::start);
let log_db_layer = log_db
@@ -991,6 +992,7 @@ pub async fn run_main(
.with(log_db_layer)
.with(otel_logger_layer)
.with(otel_tracing_layer)
.with(codex_trace_layer)
.try_init();
run_ratatui_app(