mirror of
https://github.com/openai/codex.git
synced 2026-05-06 12:26:38 +00:00
Compare commits
1 Commits
pr20398
...
jif/new-tr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4e5829d71 |
20
codex-rs/Cargo.lock
generated
20
codex-rs/Cargo.lock
generated
@@ -1464,6 +1464,7 @@ dependencies = [
|
||||
"codex-state",
|
||||
"codex-thread-store",
|
||||
"codex-tools",
|
||||
"codex-trace",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
@@ -1695,6 +1696,7 @@ dependencies = [
|
||||
"codex-state",
|
||||
"codex-stdio-to-uds",
|
||||
"codex-terminal-detection",
|
||||
"codex-trace",
|
||||
"codex-tui",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
@@ -1943,6 +1945,7 @@ dependencies = [
|
||||
"codex-test-binary-support",
|
||||
"codex-thread-store",
|
||||
"codex-tools",
|
||||
"codex-trace",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cache",
|
||||
"codex-utils-cargo-bin",
|
||||
@@ -2103,6 +2106,7 @@ dependencies = [
|
||||
"codex-model-provider-info",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"codex-trace",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
@@ -2911,6 +2915,21 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-trace"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"tracing",
|
||||
"tracing-appender",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-tui"
|
||||
version = "0.0.0"
|
||||
@@ -2946,6 +2965,7 @@ dependencies = [
|
||||
"codex-shell-command",
|
||||
"codex-state",
|
||||
"codex-terminal-detection",
|
||||
"codex-trace",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-approval-presets",
|
||||
"codex-utils-cargo-bin",
|
||||
|
||||
@@ -57,6 +57,7 @@ members = [
|
||||
"response-debug-context",
|
||||
"sandboxing",
|
||||
"stdio-to-uds",
|
||||
"trace",
|
||||
"otel",
|
||||
"tui",
|
||||
"tools",
|
||||
@@ -169,6 +170,7 @@ codex-skills = { path = "skills" }
|
||||
codex-state = { path = "state" }
|
||||
codex-stdio-to-uds = { path = "stdio-to-uds" }
|
||||
codex-terminal-detection = { path = "terminal-detection" }
|
||||
codex-trace = { path = "trace" }
|
||||
codex-test-binary-support = { path = "test-binary-support" }
|
||||
codex-thread-store = { path = "thread-store" }
|
||||
codex-tools = { path = "tools" }
|
||||
|
||||
@@ -57,6 +57,7 @@ codex-sandboxing = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
codex-thread-store = { workspace = true }
|
||||
codex-tools = { workspace = true }
|
||||
codex-trace = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-json-to-toml = { workspace = true }
|
||||
codex-utils-rustls-provider = { workspace = true }
|
||||
|
||||
@@ -522,6 +522,7 @@ pub async fn run_main_with_transport(
|
||||
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
|
||||
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||||
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(stderr_fmt)
|
||||
.with(feedback_layer)
|
||||
@@ -529,6 +530,7 @@ pub async fn run_main_with_transport(
|
||||
.with(log_db_layer)
|
||||
.with(otel_logger_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.with(codex_trace_layer)
|
||||
.try_init();
|
||||
for warning in &config_warnings {
|
||||
match &warning.details {
|
||||
|
||||
@@ -44,6 +44,7 @@ codex-sandboxing = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
codex-stdio-to-uds = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-trace = { workspace = true }
|
||||
codex-tui = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-path = { workspace = true }
|
||||
|
||||
@@ -24,6 +24,8 @@ use codex_execpolicy::ExecPolicyCheckCommand;
|
||||
use codex_responses_api_proxy::Args as ResponsesApiProxyArgs;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_state::state_db_path;
|
||||
use codex_trace::REDUCED_STATE_FILE_NAME;
|
||||
use codex_trace::reduce_bundle_to_path;
|
||||
use codex_tui::AppExitInfo;
|
||||
use codex_tui::Cli as TuiCli;
|
||||
use codex_tui::ExitReason;
|
||||
@@ -191,6 +193,9 @@ enum DebugSubcommand {
|
||||
/// Render the model-visible prompt input list as JSON.
|
||||
PromptInput(DebugPromptInputCommand),
|
||||
|
||||
/// Reduce a local trace bundle into viewer state JSON.
|
||||
TraceReduce(DebugTraceReduceCommand),
|
||||
|
||||
/// Internal: reset local memory state for a fresh start.
|
||||
#[clap(hide = true)]
|
||||
ClearMemories,
|
||||
@@ -225,6 +230,17 @@ struct DebugPromptInputCommand {
|
||||
images: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct DebugTraceReduceCommand {
|
||||
/// Trace bundle directory containing manifest.json and events.jsonl.
|
||||
#[arg(value_name = "TRACE_BUNDLE")]
|
||||
trace_bundle: PathBuf,
|
||||
|
||||
/// Output state.json path. Defaults to TRACE_BUNDLE/state.json.
|
||||
#[arg(long = "output", short = 'o', value_name = "FILE")]
|
||||
output: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct ResumeCommand {
|
||||
/// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses.
|
||||
@@ -992,6 +1008,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
DebugSubcommand::TraceReduce(cmd) => {
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
"debug trace-reduce",
|
||||
)?;
|
||||
run_debug_trace_reduce_command(cmd).await?;
|
||||
}
|
||||
DebugSubcommand::ClearMemories => {
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
@@ -1259,6 +1283,15 @@ async fn run_debug_prompt_input_command(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_debug_trace_reduce_command(cmd: DebugTraceReduceCommand) -> anyhow::Result<()> {
|
||||
let output = cmd
|
||||
.output
|
||||
.unwrap_or_else(|| cmd.trace_bundle.join(REDUCED_STATE_FILE_NAME));
|
||||
reduce_bundle_to_path(&cmd.trace_bundle, &output)?;
|
||||
println!("{}", output.display());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_debug_clear_memories_command(
|
||||
root_config_overrides: &CliConfigOverrides,
|
||||
interactive: &TuiCli,
|
||||
|
||||
@@ -60,6 +60,7 @@ codex-state = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-thread-store = { workspace = true }
|
||||
codex-tools = { workspace = true }
|
||||
codex-trace = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cache = { workspace = true }
|
||||
codex-utils-image = { workspace = true }
|
||||
|
||||
@@ -85,6 +85,7 @@ use http::HeaderMap as ApiHeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode as HttpStatusCode;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::json;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -137,6 +138,13 @@ const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
|
||||
pub(crate) const WEBSOCKET_CONNECT_TIMEOUT: Duration =
|
||||
Duration::from_millis(DEFAULT_WEBSOCKET_CONNECT_TIMEOUT_MS);
|
||||
|
||||
/// Identifiers attached to the inference trace events emitted by this module.
#[derive(Clone, Debug)]
pub(crate) struct CodexTraceContext {
    /// Recorded as the `thread.id` field of emitted events.
    pub(crate) thread_id: String,
    /// Recorded as the `turn.id` field of emitted events.
    pub(crate) turn_id: String,
    /// Recorded as the `inference.id` field of emitted events.
    pub(crate) inference_call_id: String,
}
|
||||
|
||||
/// Session-scoped state shared by all [`ModelClient`] clones.
|
||||
///
|
||||
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
|
||||
@@ -1140,6 +1148,7 @@ impl ModelClientSession {
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
trace_context: Option<CodexTraceContext>,
|
||||
) -> Result<ResponseStream> {
|
||||
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
|
||||
warn!(path, "Streaming from fixture");
|
||||
@@ -1148,7 +1157,11 @@ impl ModelClientSession {
|
||||
self.client.state.provider.stream_idle_timeout(),
|
||||
)
|
||||
.map_err(map_api_error)?;
|
||||
let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone());
|
||||
let (stream, _last_request_rx) = map_response_stream(
|
||||
stream,
|
||||
session_telemetry.clone(),
|
||||
/*trace_context*/ None,
|
||||
);
|
||||
return Ok(stream);
|
||||
}
|
||||
|
||||
@@ -1157,6 +1170,7 @@ impl ModelClientSession {
|
||||
.as_ref()
|
||||
.map(AuthManager::unauthorized_recovery);
|
||||
let mut pending_retry = PendingUnauthorizedRetry::default();
|
||||
let mut trace_started = false;
|
||||
loop {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
@@ -1182,6 +1196,18 @@ impl ModelClientSession {
|
||||
summary,
|
||||
service_tier,
|
||||
)?;
|
||||
if let Some(trace_context) = &trace_context
|
||||
&& !trace_started
|
||||
{
|
||||
let request_payload = codex_trace::write_payload("inference_request", &request);
|
||||
emit_inference_started(
|
||||
trace_context,
|
||||
model_info,
|
||||
&self.client.state.provider.name,
|
||||
request_payload.as_ref(),
|
||||
);
|
||||
trace_started = true;
|
||||
}
|
||||
let client = ApiResponsesClient::new(
|
||||
transport,
|
||||
client_setup.api_provider,
|
||||
@@ -1192,7 +1218,8 @@ impl ModelClientSession {
|
||||
|
||||
match stream_result {
|
||||
Ok(stream) => {
|
||||
let (stream, _) = map_response_stream(stream, session_telemetry.clone());
|
||||
let (stream, _) =
|
||||
map_response_stream(stream, session_telemetry.clone(), trace_context);
|
||||
return Ok(stream);
|
||||
}
|
||||
Err(ApiError::Transport(
|
||||
@@ -1208,7 +1235,13 @@ impl ModelClientSession {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
Err(err) => {
|
||||
let mapped = map_api_error(err);
|
||||
if let Some(trace_context) = &trace_context {
|
||||
emit_inference_failed(trace_context, &mapped.to_string());
|
||||
}
|
||||
return Err(mapped);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1237,6 +1270,7 @@ impl ModelClientSession {
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
trace_context: Option<CodexTraceContext>,
|
||||
warmup: bool,
|
||||
request_trace: Option<W3cTraceContext>,
|
||||
) -> Result<WebsocketStreamOutcome> {
|
||||
@@ -1312,18 +1346,38 @@ impl ModelClientSession {
|
||||
}
|
||||
|
||||
let ws_request = self.prepare_websocket_request(ws_payload, &request);
|
||||
let request_payload = trace_context
|
||||
.as_ref()
|
||||
.and_then(|_| codex_trace::write_payload("inference_request", &ws_request));
|
||||
if let Some(trace_context) = &trace_context {
|
||||
emit_inference_started(
|
||||
trace_context,
|
||||
model_info,
|
||||
&self.client.state.provider.name,
|
||||
request_payload.as_ref(),
|
||||
);
|
||||
}
|
||||
self.websocket_session.last_request = Some(request);
|
||||
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
|
||||
map_api_error(ApiError::Stream(
|
||||
"websocket connection is unavailable".to_string(),
|
||||
))
|
||||
})?;
|
||||
let stream_result = stream_result
|
||||
let stream_result = match stream_result
|
||||
.stream_request(ws_request, self.websocket_session.connection_reused())
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
{
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
let mapped = map_api_error(err);
|
||||
if let Some(trace_context) = &trace_context {
|
||||
emit_inference_failed(trace_context, &mapped.to_string());
|
||||
}
|
||||
return Err(mapped);
|
||||
}
|
||||
};
|
||||
let (stream, last_request_rx) =
|
||||
map_response_stream(stream_result, session_telemetry.clone());
|
||||
map_response_stream(stream_result, session_telemetry.clone(), trace_context);
|
||||
self.websocket_session.last_response_rx = Some(last_request_rx);
|
||||
return Ok(WebsocketStreamOutcome::Stream(stream));
|
||||
}
|
||||
@@ -1391,6 +1445,7 @@ impl ModelClientSession {
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*trace_context*/ None,
|
||||
/*warmup*/ true,
|
||||
current_span_w3c_trace_context(),
|
||||
)
|
||||
@@ -1431,6 +1486,43 @@ impl ModelClientSession {
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<ResponseStream> {
|
||||
self.stream_with_trace(
|
||||
prompt,
|
||||
model_info,
|
||||
session_telemetry,
|
||||
effort,
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
/*trace_context*/ None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[tracing::instrument(
|
||||
target = "codex_otel.trace_safe",
|
||||
name = "codex.inference",
|
||||
skip_all,
|
||||
fields(
|
||||
thread.id = %trace_context.as_ref().map(|ctx| ctx.thread_id.as_str()).unwrap_or(""),
|
||||
turn.id = %trace_context.as_ref().map(|ctx| ctx.turn_id.as_str()).unwrap_or(""),
|
||||
inference.id = %trace_context.as_ref().map(|ctx| ctx.inference_call_id.as_str()).unwrap_or(""),
|
||||
model = %model_info.slug,
|
||||
provider.name = %self.client.state.provider.name,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn stream_with_trace(
|
||||
&mut self,
|
||||
prompt: &Prompt,
|
||||
model_info: &ModelInfo,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
trace_context: Option<CodexTraceContext>,
|
||||
) -> Result<ResponseStream> {
|
||||
let wire_api = self.client.state.provider.wire_api;
|
||||
match wire_api {
|
||||
@@ -1446,6 +1538,7 @@ impl ModelClientSession {
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
trace_context.clone(),
|
||||
/*warmup*/ false,
|
||||
request_trace,
|
||||
)
|
||||
@@ -1466,6 +1559,7 @@ impl ModelClientSession {
|
||||
summary,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
trace_context,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -1558,9 +1652,62 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_inference_started(
|
||||
trace_context: &CodexTraceContext,
|
||||
model_info: &ModelInfo,
|
||||
provider_name: &str,
|
||||
request_payload: Option<&codex_trace::RawPayloadRef>,
|
||||
) {
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.inference.started",
|
||||
thread.id = %trace_context.thread_id,
|
||||
turn.id = %trace_context.turn_id,
|
||||
inference.id = %trace_context.inference_call_id,
|
||||
model = %model_info.slug,
|
||||
provider.name = %provider_name,
|
||||
raw_payload.request.id = %request_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
|
||||
raw_payload.request.path = %request_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
|
||||
raw_payload.request.kind = %request_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_inference_completed(
|
||||
trace_context: &CodexTraceContext,
|
||||
response_id: &str,
|
||||
response_payload: Option<&codex_trace::RawPayloadRef>,
|
||||
) {
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.inference.completed",
|
||||
thread.id = %trace_context.thread_id,
|
||||
turn.id = %trace_context.turn_id,
|
||||
inference.id = %trace_context.inference_call_id,
|
||||
response.id = %response_id,
|
||||
raw_payload.response.id = %response_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
|
||||
raw_payload.response.path = %response_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
|
||||
raw_payload.response.kind = %response_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_inference_failed(trace_context: &CodexTraceContext, error: &str) {
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.inference.failed",
|
||||
thread.id = %trace_context.thread_id,
|
||||
turn.id = %trace_context.turn_id,
|
||||
inference.id = %trace_context.inference_call_id,
|
||||
error = %error,
|
||||
);
|
||||
}
|
||||
|
||||
fn map_response_stream<S>(
|
||||
api_stream: S,
|
||||
session_telemetry: SessionTelemetry,
|
||||
trace_context: Option<CodexTraceContext>,
|
||||
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
|
||||
where
|
||||
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
|
||||
@@ -1592,6 +1739,17 @@ where
|
||||
response_id,
|
||||
token_usage,
|
||||
}) => {
|
||||
if let Some(trace_context) = &trace_context {
|
||||
let payload = codex_trace::write_payload(
|
||||
"inference_response_summary",
|
||||
&json!({
|
||||
"response_id": response_id,
|
||||
"token_usage": token_usage.clone(),
|
||||
"output_items": items_added.clone(),
|
||||
}),
|
||||
);
|
||||
emit_inference_completed(trace_context, &response_id, payload.as_ref());
|
||||
}
|
||||
if let Some(usage) = &token_usage {
|
||||
session_telemetry.sse_event_completed(
|
||||
usage.input_tokens,
|
||||
@@ -1627,6 +1785,9 @@ where
|
||||
let mapped = map_api_error(err);
|
||||
if !logged_error {
|
||||
session_telemetry.see_event_completed_failed(&mapped);
|
||||
if let Some(trace_context) = &trace_context {
|
||||
emit_inference_failed(trace_context, &mapped.to_string());
|
||||
}
|
||||
logged_error = true;
|
||||
}
|
||||
if tx_event.send(Err(mapped)).await.is_err() {
|
||||
|
||||
@@ -180,6 +180,7 @@ use tracing::trace_span;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::client::CodexTraceContext;
|
||||
use crate::client::ModelClient;
|
||||
use crate::client::ModelClientSession;
|
||||
use crate::client_common::Prompt;
|
||||
@@ -1327,6 +1328,200 @@ pub(crate) struct AppServerClientMetadata {
|
||||
pub(crate) client_version: Option<String>,
|
||||
}
|
||||
|
||||
fn emit_trace_safe_event_for_event_msg(thread_id: ThreadId, turn_id: &str, msg: &EventMsg) {
|
||||
match msg {
|
||||
EventMsg::TurnComplete(_) => {
|
||||
emit_turn_ended_trace_event(thread_id, turn_id, "completed");
|
||||
}
|
||||
EventMsg::TurnAborted(_) => {
|
||||
emit_turn_ended_trace_event(thread_id, turn_id, "aborted");
|
||||
}
|
||||
EventMsg::CollabAgentSpawnBegin(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.spawn.started",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
EventMsg::CollabAgentSpawnEnd(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.spawn.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
event.new_thread_id,
|
||||
event.new_agent_nickname.as_deref(),
|
||||
event.new_agent_role.as_deref(),
|
||||
);
|
||||
}
|
||||
EventMsg::CollabAgentInteractionBegin(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.message.started",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(event.receiver_thread_id),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
EventMsg::CollabAgentInteractionEnd(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.message.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(event.receiver_thread_id),
|
||||
event.receiver_agent_nickname.as_deref(),
|
||||
event.receiver_agent_role.as_deref(),
|
||||
);
|
||||
}
|
||||
EventMsg::CollabWaitingBegin(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
for receiver_thread_id in &event.receiver_thread_ids {
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.wait.started",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(*receiver_thread_id),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
EventMsg::CollabWaitingEnd(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
if event.agent_statuses.is_empty() {
|
||||
if event.statuses.is_empty() {
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.wait.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
} else {
|
||||
for receiver_thread_id in event.statuses.keys() {
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.wait.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(*receiver_thread_id),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for agent_status in &event.agent_statuses {
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.wait.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(agent_status.thread_id),
|
||||
agent_status.agent_nickname.as_deref(),
|
||||
agent_status.agent_role.as_deref(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::CollabCloseBegin(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.close.started",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(event.receiver_thread_id),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
EventMsg::CollabCloseEnd(event) => {
|
||||
let tool_call_id = trace_tool_call_id(&event.call_id);
|
||||
emit_collab_trace_event(
|
||||
"codex.collab.close.ended",
|
||||
thread_id,
|
||||
turn_id,
|
||||
&tool_call_id,
|
||||
event.sender_thread_id,
|
||||
Some(event.receiver_thread_id),
|
||||
event.receiver_agent_nickname.as_deref(),
|
||||
event.receiver_agent_role.as_deref(),
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_turn_ended_trace_event(thread_id: ThreadId, turn_id: &str, status: &str) {
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.turn.ended",
|
||||
thread.id = %thread_id,
|
||||
turn.id = %turn_id,
|
||||
status = %status,
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_collab_trace_event(
|
||||
event_name: &str,
|
||||
thread_id: ThreadId,
|
||||
turn_id: &str,
|
||||
tool_call_id: &str,
|
||||
sender_thread_id: ThreadId,
|
||||
target_thread_id: Option<ThreadId>,
|
||||
target_agent_nickname: Option<&str>,
|
||||
target_agent_role: Option<&str>,
|
||||
) {
|
||||
let target_thread_id = target_thread_id
|
||||
.map(|thread_id| thread_id.to_string())
|
||||
.unwrap_or_default();
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %event_name,
|
||||
thread.id = %thread_id,
|
||||
turn.id = %turn_id,
|
||||
tool.call_id = %tool_call_id,
|
||||
sender.thread.id = %sender_thread_id,
|
||||
target.thread.id = %target_thread_id,
|
||||
target.agent.nickname = %target_agent_nickname.unwrap_or(""),
|
||||
target.agent.role = %target_agent_role.unwrap_or(""),
|
||||
);
|
||||
}
|
||||
|
||||
/// Returns `call_id` in its `tool:`-prefixed form.
///
/// An id that already starts with `tool:` is returned unchanged, so the prefix is never doubled.
fn trace_tool_call_id(call_id: &str) -> String {
    if call_id.starts_with("tool:") {
        call_id.to_string()
    } else {
        format!("tool:{call_id}")
    }
}
|
||||
|
||||
impl Session {
|
||||
pub(crate) async fn app_server_client_metadata(&self) -> AppServerClientMetadata {
|
||||
let state = self.state.lock().await;
|
||||
@@ -2190,6 +2385,32 @@ impl Session {
|
||||
let mut guard = network_policy_decider_session.write().await;
|
||||
*guard = Arc::downgrade(&sess);
|
||||
}
|
||||
let thread_id = conversation_id.to_string();
|
||||
let (agent_path, parent_thread_id) = match &session_configuration.session_source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
agent_path,
|
||||
..
|
||||
}) => (
|
||||
agent_path
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| "/root".to_string()),
|
||||
Some(parent_thread_id.to_string()),
|
||||
),
|
||||
_ => ("/root".to_string(), None),
|
||||
};
|
||||
let session_source = session_configuration.session_source.to_string();
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.thread.started",
|
||||
thread.id = %thread_id,
|
||||
agent.path = %agent_path,
|
||||
model = session_configuration.collaboration_mode.model(),
|
||||
thread.source = %session_source,
|
||||
parent_thread.id = %parent_thread_id.as_deref().unwrap_or(""),
|
||||
);
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = initial_history.get_event_msgs();
|
||||
@@ -2764,6 +2985,13 @@ impl Session {
|
||||
if let Some(final_schema) = final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
}
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.turn.started",
|
||||
thread.id = %self.conversation_id,
|
||||
turn.id = %turn_context.sub_id,
|
||||
);
|
||||
let turn_context = Arc::new(turn_context);
|
||||
turn_context.turn_metadata_state.spawn_git_enrichment_task();
|
||||
turn_context
|
||||
@@ -2892,6 +3120,11 @@ impl Session {
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
|
||||
let legacy_source = msg.clone();
|
||||
emit_trace_safe_event_for_event_msg(
|
||||
self.conversation_id,
|
||||
&turn_context.sub_id,
|
||||
&legacy_source,
|
||||
);
|
||||
let event = Event {
|
||||
id: turn_context.sub_id.clone(),
|
||||
msg,
|
||||
@@ -7876,8 +8109,13 @@ async fn try_run_sampling_request(
|
||||
auth_mode = sess.services.auth_manager.auth_mode(),
|
||||
features = sess.features.enabled_features(),
|
||||
);
|
||||
let trace_context = CodexTraceContext {
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
inference_call_id: codex_trace::next_id("inference"),
|
||||
};
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
.stream_with_trace(
|
||||
prompt,
|
||||
&turn_context.model_info,
|
||||
&turn_context.session_telemetry,
|
||||
@@ -7885,6 +8123,7 @@ async fn try_run_sampling_request(
|
||||
turn_context.reasoning_summary,
|
||||
turn_context.config.service_tier,
|
||||
turn_metadata_header,
|
||||
Some(trace_context),
|
||||
)
|
||||
.instrument(trace_span!("stream_request"))
|
||||
.or_cancel(&cancellation_token)
|
||||
|
||||
@@ -28,6 +28,7 @@ use codex_tools::ToolSpec;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use futures::future::BoxFuture;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use tracing::warn;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||
@@ -206,9 +207,39 @@ impl ToolRegistry {
|
||||
// }
|
||||
// }
|
||||
|
||||
#[tracing::instrument(
|
||||
target = "codex_otel.trace_safe",
|
||||
name = "codex.tool_call",
|
||||
skip_all,
|
||||
fields(
|
||||
thread.id = %invocation.session.conversation_id,
|
||||
turn.id = %invocation.turn.sub_id,
|
||||
tool.call_id = %invocation.call_id,
|
||||
tool.name = %invocation.tool_name.display(),
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn dispatch_any(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
) -> Result<AnyToolResult, FunctionCallError> {
|
||||
let call_id_owned = invocation.call_id.clone();
|
||||
let trace_tool_call_id = format!("tool:{call_id_owned}");
|
||||
let invocation_payload =
|
||||
codex_trace::write_payload("tool_invocation", &trace_tool_invocation(&invocation));
|
||||
emit_tool_started(
|
||||
&invocation,
|
||||
&trace_tool_call_id,
|
||||
invocation_payload.as_ref(),
|
||||
);
|
||||
|
||||
let result = self.dispatch_any_inner(invocation).await;
|
||||
emit_tool_ended(&trace_tool_call_id, &result);
|
||||
result
|
||||
}
|
||||
|
||||
async fn dispatch_any_inner(
|
||||
&self,
|
||||
invocation: ToolInvocation,
|
||||
) -> Result<AnyToolResult, FunctionCallError> {
|
||||
let tool_name = invocation.tool_name.clone();
|
||||
let display_name = tool_name.display();
|
||||
@@ -297,10 +328,11 @@ impl ToolRegistry {
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
let message = format!(
|
||||
"Command blocked by PreToolUse hook: {reason}. Command: {}",
|
||||
pre_tool_use_payload.command
|
||||
)));
|
||||
);
|
||||
return Err(FunctionCallError::RespondToModel(message));
|
||||
}
|
||||
|
||||
let is_mutating = handler.is_mutating(&invocation).await;
|
||||
@@ -374,7 +406,7 @@ impl ToolRegistry {
|
||||
// Deprecated: this is the legacy AfterToolUse hook. Prefer the new PostToolUse
|
||||
let hook_abort_error = dispatch_after_tool_use_hook(AfterToolUseHookDispatch {
|
||||
invocation: &invocation,
|
||||
output_preview,
|
||||
output_preview: output_preview.clone(),
|
||||
success,
|
||||
executed: true,
|
||||
duration,
|
||||
@@ -429,6 +461,114 @@ impl ToolRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_tool_started(
|
||||
invocation: &ToolInvocation,
|
||||
tool_call_id: &str,
|
||||
invocation_payload: Option<&codex_trace::RawPayloadRef>,
|
||||
) {
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.tool.started",
|
||||
thread.id = %invocation.session.conversation_id,
|
||||
turn.id = %invocation.turn.sub_id,
|
||||
tool.call_id = %tool_call_id,
|
||||
tool.name = %invocation.tool_name.display(),
|
||||
model_visible_call.id = %invocation.call_id,
|
||||
raw_payload.invocation.id = %invocation_payload.map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
|
||||
raw_payload.invocation.path = %invocation_payload.map(|payload| payload.path.as_str()).unwrap_or(""),
|
||||
raw_payload.invocation.kind = %invocation_payload.map(|payload| payload.kind.as_str()).unwrap_or(""),
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_tool_ended(tool_call_id: &str, result: &Result<AnyToolResult, FunctionCallError>) {
|
||||
let (status, output_preview) = match result {
|
||||
Ok(result) => {
|
||||
let success = result.result.success_for_logging();
|
||||
let status = if success { "completed" } else { "failed" };
|
||||
(status, result.result.log_preview())
|
||||
}
|
||||
Err(err) => ("failed", err.to_string()),
|
||||
};
|
||||
let result_payload = codex_trace::write_payload_lazy("tool_result", || {
|
||||
trace_tool_result(result, status, &output_preview)
|
||||
});
|
||||
tracing::event!(
|
||||
target: codex_otel::OTEL_TRACE_SAFE_TARGET,
|
||||
tracing::Level::INFO,
|
||||
event.name = %"codex.tool.ended",
|
||||
tool.call_id = %tool_call_id,
|
||||
status = %status,
|
||||
output_preview = %output_preview,
|
||||
raw_payload.result.id = %result_payload.as_ref().map(|payload| payload.raw_payload_id.as_str()).unwrap_or(""),
|
||||
raw_payload.result.path = %result_payload.as_ref().map(|payload| payload.path.as_str()).unwrap_or(""),
|
||||
raw_payload.result.kind = %result_payload.as_ref().map(|payload| payload.kind.as_str()).unwrap_or(""),
|
||||
);
|
||||
}
|
||||
|
||||
fn trace_tool_invocation(invocation: &ToolInvocation) -> Value {
|
||||
let payload = match &invocation.payload {
|
||||
ToolPayload::Function { arguments } => json!({
|
||||
"type": "function",
|
||||
"arguments": arguments,
|
||||
}),
|
||||
ToolPayload::ToolSearch { arguments } => json!({
|
||||
"type": "tool_search",
|
||||
"arguments": arguments,
|
||||
}),
|
||||
ToolPayload::Custom { input } => json!({
|
||||
"type": "custom",
|
||||
"input": input,
|
||||
}),
|
||||
ToolPayload::LocalShell { params } => json!({
|
||||
"type": "local_shell",
|
||||
"command": ¶ms.command,
|
||||
"workdir": ¶ms.workdir,
|
||||
"timeout_ms": params.timeout_ms,
|
||||
}),
|
||||
ToolPayload::Mcp {
|
||||
server,
|
||||
tool,
|
||||
raw_arguments,
|
||||
} => json!({
|
||||
"type": "mcp",
|
||||
"server": server,
|
||||
"tool": tool,
|
||||
"raw_arguments": raw_arguments,
|
||||
}),
|
||||
};
|
||||
json!({
|
||||
"call_id": &invocation.call_id,
|
||||
"tool_name": invocation.tool_name.display(),
|
||||
"turn_id": &invocation.turn.sub_id,
|
||||
"payload": payload,
|
||||
})
|
||||
}
|
||||
|
||||
fn trace_tool_result(
|
||||
result: &Result<AnyToolResult, FunctionCallError>,
|
||||
status: &str,
|
||||
output_preview: &str,
|
||||
) -> Value {
|
||||
match result {
|
||||
Ok(result) => json!({
|
||||
"status": status,
|
||||
"success": result.result.success_for_logging(),
|
||||
"output_preview": output_preview,
|
||||
"model_visible_response_item": result.result.to_response_item(&result.call_id, &result.payload),
|
||||
"code_mode_result": result.result.code_mode_result(&result.payload),
|
||||
}),
|
||||
Err(err) => json!({
|
||||
"status": status,
|
||||
"success": false,
|
||||
"output_preview": output_preview,
|
||||
"error": {
|
||||
"message": err.to_string(),
|
||||
},
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ToolRegistryBuilder {
|
||||
handlers: HashMap<ToolName, Arc<dyn AnyToolHandler>>,
|
||||
specs: Vec<ConfiguredToolSpec>,
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::*;
|
||||
use crate::tools::context::FunctionToolOutput;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
struct TestHandler;
|
||||
|
||||
@@ -49,3 +51,34 @@ fn handler_looks_up_namespaced_aliases_explicitly() {
|
||||
.is_some_and(|handler| Arc::ptr_eq(handler, &namespaced_handler))
|
||||
);
|
||||
}
|
||||
|
||||
/// A successful function-tool result is traced with its status, preview,
/// model-visible response item, and code-mode result.
#[test]
fn trace_tool_result_includes_full_caller_payloads() {
    let tool_output = FunctionToolOutput::from_text("tool output".to_string(), Some(true));
    let outcome = Ok(AnyToolResult {
        call_id: "call-1".to_string(),
        payload: ToolPayload::Function {
            arguments: "{}".to_string(),
        },
        result: Box::new(tool_output),
    });
    let expected = json!({
        "status": "completed",
        "success": true,
        "output_preview": "tool output",
        "model_visible_response_item": {
            "type": "function_call_output",
            "call_id": "call-1",
            "output": "tool output",
        },
        "code_mode_result": "tool output",
    });

    let actual = trace_tool_result(&outcome, "completed", "tool output");

    assert_eq!(actual, expected);
}
|
||||
|
||||
@@ -34,6 +34,7 @@ codex-login = { workspace = true }
|
||||
codex-model-provider-info = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-trace = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cli = { workspace = true }
|
||||
codex-utils-oss = { workspace = true }
|
||||
|
||||
@@ -449,11 +449,13 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
||||
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||||
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
|
||||
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.with(otel_logger_layer)
|
||||
.with(codex_trace_layer)
|
||||
.try_init();
|
||||
|
||||
let exec_span = exec_root_span();
|
||||
|
||||
@@ -32,6 +32,8 @@ pub use crate::trace_context::span_w3c_trace_context;
|
||||
pub use crate::trace_context::traceparent_context_from_env;
|
||||
pub use codex_utils_string::sanitize_metric_tag_value;
|
||||
|
||||
/// `tracing` target used by trace-safe events such as `codex.tool.started` and
/// `codex.tool.ended`.
///
/// NOTE(review): `codex_trace::LOCAL_TRACE_TARGET` repeats this string literal
/// and the local trace layer filters on it — presumably the two must stay in
/// sync; confirm before changing either.
pub const OTEL_TRACE_SAFE_TARGET: &str = "codex_otel.trace_safe";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Display)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ToolDecisionSource {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
pub(crate) const OTEL_TARGET_PREFIX: &str = "codex_otel";
|
||||
pub(crate) const OTEL_LOG_ONLY_TARGET: &str = "codex_otel.log_only";
|
||||
pub(crate) const OTEL_TRACE_SAFE_TARGET: &str = "codex_otel.trace_safe";
|
||||
pub(crate) const OTEL_TRACE_SAFE_TARGET: &str = crate::OTEL_TRACE_SAFE_TARGET;
|
||||
|
||||
pub(crate) fn is_log_export_target(target: &str) -> bool {
|
||||
target.starts_with(OTEL_TARGET_PREFIX) && !is_trace_safe_target(target)
|
||||
|
||||
26
codex-rs/trace/Cargo.toml
Normal file
26
codex-rs/trace/Cargo.toml
Normal file
@@ -0,0 +1,26 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-trace"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
name = "codex_trace"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-appender = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["fmt", "json", "registry"] }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
18
codex-rs/trace/src/lib.rs
Normal file
18
codex-rs/trace/src/lib.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
//! Lightweight local trace capture for Codex debugging.
|
||||
//!
|
||||
//! Runtime code emits ordinary `tracing` events. This crate only provides a
|
||||
//! local debug layer, raw payload references, and a reducer that writes the
|
||||
//! `state.json` shape consumed by the standalone trace viewer.
|
||||
|
||||
mod local;
|
||||
mod reduce;
|
||||
|
||||
pub use local::CODEX_TRACE_ROOT_ENV;
|
||||
pub use local::LOCAL_TRACE_TARGET;
|
||||
pub use local::RawPayloadRef;
|
||||
pub use local::local_layer_from_env;
|
||||
pub use local::next_id;
|
||||
pub use local::write_payload;
|
||||
pub use local::write_payload_lazy;
|
||||
pub use reduce::REDUCED_STATE_FILE_NAME;
|
||||
pub use reduce::reduce_bundle_to_path;
|
||||
204
codex-rs/trace/src/local.rs
Normal file
204
codex-rs/trace/src/local.rs
Normal file
@@ -0,0 +1,204 @@
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::PoisonError;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use tracing::Level;
|
||||
use tracing::Subscriber;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::filter::Targets;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
/// Name of the environment variable read by `local_layer_from_env`; when it is
/// unset no local trace layer is created.
pub const CODEX_TRACE_ROOT_ENV: &str = "CODEX_TRACE_ROOT";
|
||||
/// `tracing` target the local layer's filter enables (at `INFO`).
pub const LOCAL_TRACE_TARGET: &str = "codex_otel.trace_safe";
|
||||
|
||||
// Process-wide payload writer; set and re-pointed by `local_layer_for_root`.
// While unset, `write_payload` and `write_payload_lazy` return `None`.
static PAYLOAD_WRITER: OnceLock<Mutex<PayloadWriter>> = OnceLock::new();
|
||||
// Counter behind `next_id`; the first id handed out uses ordinal 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
||||
/// Reference to a raw payload file written by `write_payload` or
/// `write_payload_lazy`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
|
||||
pub struct RawPayloadRef {
|
||||
/// Identifier of the form `raw_payload:<ordinal>`.
pub raw_payload_id: String,
|
||||
/// Caller-supplied payload kind, copied verbatim.
pub kind: String,
|
||||
/// Relative path of the form `payloads/<ordinal>.json`.
pub path: String,
|
||||
}
|
||||
|
||||
/// State behind the process-wide payload writer.
#[derive(Debug)]
|
||||
struct PayloadWriter {
|
||||
// Directory that receives the `<ordinal>.json` payload files.
payloads_dir: PathBuf,
|
||||
// Ordinal assigned to the next payload; incremented on every write attempt.
next_payload_ordinal: u64,
|
||||
}
|
||||
|
||||
pub fn local_layer_from_env<S>() -> Result<Option<impl Layer<S> + Send + Sync + 'static>>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
let Some(root) = std::env::var_os(CODEX_TRACE_ROOT_ENV) else {
|
||||
return Ok(None);
|
||||
};
|
||||
local_layer_for_root(root).map(Some)
|
||||
}
|
||||
|
||||
fn local_layer_for_root<S>(
|
||||
root: impl Into<PathBuf>,
|
||||
) -> Result<impl Layer<S> + Send + Sync + 'static>
|
||||
where
|
||||
S: Subscriber + for<'span> LookupSpan<'span>,
|
||||
{
|
||||
let root = root.into();
|
||||
std::fs::create_dir_all(&root)
|
||||
.with_context(|| format!("create trace root {}", root.display()))?;
|
||||
let started_at_unix_ms = unix_time_ms();
|
||||
let process_id = std::process::id();
|
||||
let trace_id = format!("trace-{started_at_unix_ms}-{process_id}");
|
||||
let bundle_dir = root.join(&trace_id);
|
||||
let payloads_dir = bundle_dir.join("payloads");
|
||||
std::fs::create_dir_all(&payloads_dir)
|
||||
.with_context(|| format!("create trace payload dir {}", payloads_dir.display()))?;
|
||||
write_json_file(
|
||||
&bundle_dir.join("manifest.json"),
|
||||
&json!({
|
||||
"schema_version": 1,
|
||||
"trace_id": trace_id,
|
||||
"started_at_unix_ms": started_at_unix_ms,
|
||||
"event_log": {
|
||||
"format": "tracing_subscriber_fmt_json",
|
||||
"path": "events.jsonl"
|
||||
},
|
||||
}),
|
||||
)?;
|
||||
|
||||
let writer = PAYLOAD_WRITER.get_or_init(|| {
|
||||
Mutex::new(PayloadWriter {
|
||||
payloads_dir: PathBuf::new(),
|
||||
next_payload_ordinal: 1,
|
||||
})
|
||||
});
|
||||
*writer.lock().unwrap_or_else(PoisonError::into_inner) = PayloadWriter {
|
||||
payloads_dir,
|
||||
next_payload_ordinal: 1,
|
||||
};
|
||||
|
||||
let event_log = tracing_appender::rolling::never(&bundle_dir, "events.jsonl");
|
||||
let layer = tracing_subscriber::fmt::layer()
|
||||
.json()
|
||||
.flatten_event(true)
|
||||
.with_current_span(false)
|
||||
.with_span_list(false)
|
||||
.with_writer(event_log)
|
||||
.with_filter(Targets::new().with_target(LOCAL_TRACE_TARGET, Level::INFO));
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
pub fn next_id(prefix: &str) -> String {
|
||||
let ordinal = NEXT_ID.fetch_add(1, Ordering::Relaxed);
|
||||
format!("{prefix}:{ordinal}")
|
||||
}
|
||||
|
||||
pub fn write_payload(kind: &str, value: &impl Serialize) -> Option<RawPayloadRef> {
|
||||
let writer = PAYLOAD_WRITER.get()?;
|
||||
let mut writer = writer.lock().unwrap_or_else(PoisonError::into_inner);
|
||||
write_payload_locked(&mut writer, kind, value)
|
||||
}
|
||||
|
||||
pub fn write_payload_lazy<T>(kind: &str, build_value: impl FnOnce() -> T) -> Option<RawPayloadRef>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let writer = PAYLOAD_WRITER.get()?;
|
||||
let value = build_value();
|
||||
let mut writer = writer.lock().unwrap_or_else(PoisonError::into_inner);
|
||||
write_payload_locked(&mut writer, kind, &value)
|
||||
}
|
||||
|
||||
fn write_payload_locked(
|
||||
writer: &mut PayloadWriter,
|
||||
kind: &str,
|
||||
value: &impl Serialize,
|
||||
) -> Option<RawPayloadRef> {
|
||||
let ordinal = writer.next_payload_ordinal;
|
||||
writer.next_payload_ordinal += 1;
|
||||
let raw_payload_id = format!("raw_payload:{ordinal}");
|
||||
let path = format!("payloads/{ordinal}.json");
|
||||
let absolute_path = writer.payloads_dir.join(format!("{ordinal}.json"));
|
||||
if write_json_file(&absolute_path, value).is_err() {
|
||||
return None;
|
||||
}
|
||||
Some(RawPayloadRef {
|
||||
raw_payload_id,
|
||||
kind: kind.to_string(),
|
||||
path,
|
||||
})
|
||||
}
|
||||
|
||||
fn write_json_file(path: &Path, value: &impl Serialize) -> Result<()> {
|
||||
let file = File::create(path).with_context(|| format!("create {}", path.display()))?;
|
||||
serde_json::to_writer_pretty(file, value)
|
||||
.with_context(|| format!("write JSON {}", path.display()))
|
||||
}
|
||||
|
||||
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value that does not fit in
/// `i64` saturates to `i64::MAX`.
pub(crate) fn unix_time_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    match i64::try_from(since_epoch.as_millis()) {
        Ok(millis) => millis,
        Err(_) => i64::MAX,
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use tracing_subscriber::prelude::*;

    /// The layer can be bound to a variable first and attached to a registry
    /// afterwards.
    #[test]
    fn local_layer_can_be_composed_after_assignment() -> anyhow::Result<()> {
        let trace_root = TempDir::new()?;
        let trace_layer = super::local_layer_for_root(trace_root.path())?;
        let _registry = tracing_subscriber::registry().with(trace_layer);
        Ok(())
    }

    /// An event on the local trace target lands in `events.jsonl` as one
    /// flattened JSON object with a string timestamp.
    #[test]
    fn local_layer_writes_standard_json_events() -> anyhow::Result<()> {
        let trace_root = TempDir::new()?;
        let trace_layer = super::local_layer_for_root(trace_root.path())?;
        let registry = tracing_subscriber::registry().with(trace_layer);
        tracing::subscriber::with_default(registry, || {
            tracing::event!(
                target: super::LOCAL_TRACE_TARGET,
                tracing::Level::INFO,
                event.name = %"codex.turn.started",
                thread.id = %"thread-1",
                turn.id = %"turn-1",
            );
        });

        // The bundle directory is the only entry created under the root.
        let first_entry = std::fs::read_dir(trace_root.path())?
            .next()
            .expect("trace directory should be created")?;
        let bundle_dir = first_entry.path();
        let raw_log = std::fs::read_to_string(bundle_dir.join("events.jsonl"))?;
        let event: serde_json::Value = serde_json::from_str(raw_log.trim())?;

        assert_eq!(event["target"], super::LOCAL_TRACE_TARGET);
        assert_eq!(event["event.name"], "codex.turn.started");
        assert_eq!(event["thread.id"], "thread-1");
        assert_eq!(event["turn.id"], "turn-1");
        assert!(matches!(
            event.get("timestamp"),
            Some(serde_json::Value::String(_))
        ));
        Ok(())
    }
}
|
||||
1326
codex-rs/trace/src/reduce.rs
Normal file
1326
codex-rs/trace/src/reduce.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -48,6 +48,7 @@ codex-rollout = { workspace = true }
|
||||
codex-shell-command = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-trace = { workspace = true }
|
||||
codex-utils-approval-presets = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cli = { workspace = true }
|
||||
|
||||
@@ -978,6 +978,7 @@ pub async fn run_main(
|
||||
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
||||
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||||
let codex_trace_layer = codex_trace::local_layer_from_env().ok().flatten();
|
||||
|
||||
let log_db = get_state_db(&config).await.map(log_db::start);
|
||||
let log_db_layer = log_db
|
||||
@@ -991,6 +992,7 @@ pub async fn run_main(
|
||||
.with(log_db_layer)
|
||||
.with(otel_logger_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.with(codex_trace_layer)
|
||||
.try_init();
|
||||
|
||||
run_ratatui_app(
|
||||
|
||||
Reference in New Issue
Block a user