feat: adding thread ID to logs + filter in the client (#10150)

This commit is contained in:
jif-oai
2026-01-29 16:53:30 +01:00
committed by GitHub
parent b654b7a9ae
commit 89c5f3c4d4
8 changed files with 203 additions and 42 deletions

View File

@@ -84,6 +84,7 @@ use tracing::debug;
use tracing::error;
use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace_span;
use tracing::warn;
@@ -353,6 +354,7 @@ impl Codex {
let session_source_clone = session_configuration.session_source.clone();
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let session_init_span = info_span!("session_init");
let session = Session::new(
session_configuration,
config.clone(),
@@ -366,6 +368,7 @@ impl Codex {
skills_manager,
agent_control,
)
.instrument(session_init_span)
.await
.map_err(|e| {
error!("Failed to create session: {e:#}");
@@ -374,7 +377,10 @@ impl Codex {
let thread_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(Arc::clone(&session), config, rx_sub));
let session_loop_span = info_span!("session_loop", thread_id = %thread_id);
tokio::spawn(
submission_loop(Arc::clone(&session), config, rx_sub).instrument(session_loop_span),
);
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,

View File

@@ -19,6 +19,8 @@ use tokio::fs;
use tokio::process::Command;
use tokio::sync::watch;
use tokio::time::timeout;
use tracing::Instrument;
use tracing::info_span;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShellSnapshot {
@@ -42,17 +44,21 @@ impl ShellSnapshot {
let snapshot_shell = shell.clone();
let snapshot_session_id = session_id;
tokio::spawn(async move {
let timer = otel_manager.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot =
ShellSnapshot::try_new(&codex_home, snapshot_session_id, &snapshot_shell)
.await
.map(Arc::new);
let success = if snapshot.is_some() { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success)]));
otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
let _ = shell_snapshot_tx.send(snapshot);
});
let snapshot_span = info_span!("shell_snapshot", thread_id = %snapshot_session_id);
tokio::spawn(
async move {
let timer = otel_manager.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot =
ShellSnapshot::try_new(&codex_home, snapshot_session_id, &snapshot_shell)
.await
.map(Arc::new);
let success = if snapshot.is_some() { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success)]));
otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
let _ = shell_snapshot_tx.send(snapshot);
}
.instrument(snapshot_span),
);
}
async fn try_new(codex_home: &Path, session_id: ThreadId, shell: &Shell) -> Option<Self> {

View File

@@ -13,6 +13,8 @@ use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument;
use tracing::info_span;
use tracing::trace;
use tracing::warn;
@@ -130,25 +132,30 @@ impl Session {
let ctx = Arc::clone(&turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
tokio::spawn(async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();
sess.on_task_finished(ctx_for_finish, last_agent_message)
let thread_id = self.conversation_id;
let session_span = info_span!("session_task", thread_id = %thread_id);
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
input,
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();
sess.on_task_finished(ctx_for_finish, last_agent_message)
.await;
}
done_clone.notify_waiters();
}
done_clone.notify_waiters();
})
.instrument(session_span),
)
};
let timer = turn_context

View File

@@ -5,9 +5,11 @@ CREATE TABLE logs (
level TEXT NOT NULL,
target TEXT NOT NULL,
message TEXT,
thread_id TEXT,
module_path TEXT,
file TEXT,
line INTEGER
);
CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
CREATE INDEX idx_logs_thread_id ON logs(thread_id);

View File

@@ -49,6 +49,10 @@ struct Args {
#[arg(long)]
file: Option<String>,
/// Match a specific thread id.
#[arg(long)]
thread_id: Option<String>,
/// Number of matching rows to show before tailing.
#[arg(long, default_value_t = 200)]
backfill: usize,
@@ -65,6 +69,7 @@ struct LogRow {
ts_nanos: i64,
level: String,
message: Option<String>,
thread_id: Option<String>,
file: Option<String>,
line: Option<i64>,
}
@@ -76,6 +81,7 @@ struct LogFilter {
to_ts: Option<i64>,
module_like: Option<String>,
file_like: Option<String>,
thread_id: Option<String>,
}
#[tokio::main]
@@ -139,6 +145,7 @@ fn build_filter(args: &Args) -> anyhow::Result<LogFilter> {
to_ts,
module_like: args.module.clone(),
file_like: args.file.clone(),
thread_id: args.thread_id.clone(),
})
}
@@ -236,7 +243,7 @@ async fn fetch_max_id(pool: &SqlitePool, filter: &LogFilter) -> anyhow::Result<i
fn base_select_builder<'a>() -> QueryBuilder<'a, Sqlite> {
QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, message, file, line FROM logs WHERE 1 = 1",
"SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
)
}
@@ -264,6 +271,11 @@ fn push_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, filter: &'a LogFilte
.push_bind(file_like.as_str())
.push(" || '%'");
}
if let Some(thread_id) = filter.thread_id.as_ref() {
builder
.push(" AND thread_id = ")
.push_bind(thread_id.as_str());
}
}
fn format_row(row: &LogRow) -> String {
@@ -277,9 +289,13 @@ fn format_row(row: &LogRow) -> String {
let message = row.message.as_deref().unwrap_or("");
let level_colored = color_level(level);
let timestamp_colored = timestamp.dimmed().to_string();
let thread_id = row.thread_id.as_deref().unwrap_or("-");
let thread_id_colored = thread_id.yellow().to_string();
let location_colored = location.dimmed().to_string();
let message_colored = message.bold().to_string();
format!("{timestamp_colored} {level_colored} {location_colored} - {message_colored}")
format!(
"{timestamp_colored} {level_colored} [{thread_id_colored}] {location_colored} - {message_colored}"
)
}
fn color_level(level: &str) -> String {

View File

@@ -26,6 +26,9 @@ use tokio::sync::mpsc;
use tracing::Event;
use tracing::field::Field;
use tracing::field::Visit;
use tracing::span::Attributes;
use tracing::span::Id;
use tracing::span::Record;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
@@ -51,10 +54,55 @@ impl<S> Layer<S> for LogDbLayer
where
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
/// Captures a `thread_id` field (if any) from a newly created span's
/// attributes and stores it in the span's extensions as a
/// [`SpanLogContext`], so later events can look it up.
fn on_new_span(
    &self,
    attrs: &Attributes<'_>,
    id: &Id,
    ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    // Visit the attributes first; the lookup below only decides where the
    // result is stored.
    let mut fields = SpanFieldVisitor::default();
    attrs.record(&mut fields);

    let Some(span) = ctx.span(id) else {
        return;
    };
    let log_context = SpanLogContext {
        thread_id: fields.thread_id,
    };
    span.extensions_mut().insert(log_context);
}
/// Updates the span's stored [`SpanLogContext`] when a `thread_id` value is
/// recorded on an existing span.
///
/// Records that carry no `thread_id` field leave the stored context
/// untouched.
fn on_record(
    &self,
    id: &Id,
    values: &Record<'_>,
    ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    let mut fields = SpanFieldVisitor::default();
    values.record(&mut fields);

    // Nothing to do unless this record actually carried a thread id.
    let Some(recorded_thread_id) = fields.thread_id else {
        return;
    };
    let Some(span) = ctx.span(id) else {
        return;
    };

    let mut extensions = span.extensions_mut();
    if let Some(existing) = extensions.get_mut::<SpanLogContext>() {
        // Overwrite in place when a context is already attached.
        existing.thread_id = Some(recorded_thread_id);
    } else {
        extensions.insert(SpanLogContext {
            thread_id: Some(recorded_thread_id),
        });
    }
}
fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
let metadata = event.metadata();
let mut visitor = MessageVisitor::default();
event.record(&mut visitor);
let thread_id = visitor
.thread_id
.clone()
.or_else(|| event_thread_id(event, &ctx));
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -65,6 +113,7 @@ where
level: metadata.level().as_str().to_string(),
target: metadata.target().to_string(),
message: visitor.message,
thread_id,
module_path: metadata.module_path().map(ToString::to_string),
file: metadata.file().map(ToString::to_string),
line: metadata.line().map(|line| line as i64),
@@ -74,6 +123,75 @@ where
}
}
/// Per-span logging context that `LogDbLayer` stores in a span's extensions.
///
/// It is written by `on_new_span` / `on_record` and read back by
/// `event_thread_id` when an event does not carry its own `thread_id` field.
#[derive(Clone, Debug, Default)]
struct SpanLogContext {
/// Value of the span's `thread_id` field, or `None` if the span has none.
thread_id: Option<String>,
}
/// Field visitor that extracts only the `thread_id` field from a span's
/// attributes or recorded values; every other field is ignored.
#[derive(Default)]
struct SpanFieldVisitor {
    /// First `thread_id` value seen, rendered as a string.
    thread_id: Option<String>,
}

impl SpanFieldVisitor {
    /// Stores the rendered value if `field` is `thread_id` and no thread id
    /// has been captured yet.
    ///
    /// `render` is only invoked when the value is actually kept, so fields
    /// that are not `thread_id` are never formatted and never allocate.
    fn record_field(&mut self, field: &Field, render: impl FnOnce() -> String) {
        if self.thread_id.is_none() && field.name() == "thread_id" {
            self.thread_id = Some(render());
        }
    }
}

impl Visit for SpanFieldVisitor {
    fn record_i64(&mut self, field: &Field, value: i64) {
        self.record_field(field, || value.to_string());
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        self.record_field(field, || value.to_string());
    }

    fn record_bool(&mut self, field: &Field, value: bool) {
        self.record_field(field, || value.to_string());
    }

    fn record_f64(&mut self, field: &Field, value: f64) {
        self.record_field(field, || value.to_string());
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        self.record_field(field, || value.to_string());
    }

    fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
        self.record_field(field, || value.to_string());
    }

    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        // `%value` span fields arrive here; the Debug rendering is what gets stored.
        self.record_field(field, || format!("{value:?}"));
    }
}
/// Resolves the thread id for `event` from its enclosing spans.
///
/// Returns the `thread_id` stored in the [`SpanLogContext`] of the innermost
/// span in the event's scope that has one, or `None` when the event has no
/// scope or no span in it carries a thread id.
fn event_thread_id<S>(
    event: &Event<'_>,
    ctx: &tracing_subscriber::layer::Context<'_, S>,
) -> Option<String>
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    // The scope iterates from the leaf span towards the root, so the first
    // match is the innermost span with a thread id. This gives the same
    // result as scanning `from_root()` and keeping the last match, without
    // reversing the scope or visiting spans past the first hit.
    ctx.event_scope(event)?.find_map(|span| {
        let extensions = span.extensions();
        extensions
            .get::<SpanLogContext>()
            .and_then(|log_context| log_context.thread_id.clone())
    })
}
async fn run_inserter(
state_db: std::sync::Arc<StateRuntime>,
mut receiver: mpsc::Receiver<LogEntry>,
@@ -114,42 +232,46 @@ async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntr
/// Field visitor used for events: collects the fields that end up in a log row.
#[derive(Default)]
struct MessageVisitor {
/// First value recorded for the event's `message` field, if any.
message: Option<String>,
/// First value recorded for a `thread_id` field on the event itself, if any.
thread_id: Option<String>,
}
impl MessageVisitor {
fn record_message(&mut self, field: &Field, value: String) {
fn record_field(&mut self, field: &Field, value: String) {
if field.name() == "message" && self.message.is_none() {
self.message = Some(value);
self.message = Some(value.clone());
}
if field.name() == "thread_id" && self.thread_id.is_none() {
self.thread_id = Some(value);
}
}
}
impl Visit for MessageVisitor {
fn record_i64(&mut self, field: &Field, value: i64) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_u64(&mut self, field: &Field, value: u64) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_bool(&mut self, field: &Field, value: bool) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_f64(&mut self, field: &Field, value: f64) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_str(&mut self, field: &Field, value: &str) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
self.record_message(field, value.to_string());
self.record_field(field, value.to_string());
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.record_message(field, format!("{value:?}"));
self.record_field(field, format!("{value:?}"));
}
}

View File

@@ -7,6 +7,7 @@ pub struct LogEntry {
pub level: String,
pub target: String,
pub message: Option<String>,
pub thread_id: Option<String>,
pub module_path: Option<String>,
pub file: Option<String>,
pub line: Option<i64>,

View File

@@ -214,7 +214,7 @@ FROM threads
}
let mut builder = QueryBuilder::<Sqlite>::new(
"INSERT INTO logs (ts, ts_nanos, level, target, message, module_path, file, line) ",
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, module_path, file, line) ",
);
builder.push_values(entries, |mut row, entry| {
row.push_bind(entry.ts)
@@ -222,6 +222,7 @@ FROM threads
.push_bind(&entry.level)
.push_bind(&entry.target)
.push_bind(&entry.message)
.push_bind(&entry.thread_id)
.push_bind(&entry.module_path)
.push_bind(&entry.file)
.push_bind(entry.line);