Compare commits

...

3 Commits

Author   SHA1        Message                                           Date
jif-oai  5d3e1f3745  nit fix                                           2026-03-19 14:04:04 +00:00
jif-oai  ba5c832133  feat: cascade logs for multi-agents in /feedback  2026-03-19 13:53:16 +00:00
jif-oai  aeb382d350  feat: cascade logs for multi-agents in /feedback  2026-03-19 13:29:08 +00:00
8 changed files with 581 additions and 44 deletions

codex-rs/Cargo.lock (generated)
View File

@@ -2072,9 +2072,14 @@ name = "codex-feedback"
version = "0.0.0"
dependencies = [
"anyhow",
"chrono",
"codex-protocol",
"codex-state",
"codex-utils-home-dir",
"pretty_assertions",
"sentry",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
]

View File

@@ -7029,32 +7029,13 @@ impl CodexMessageProcessor {
{
tracing::info!(target: "feedback_tags", chatgpt_user_id);
}
let snapshot = self.feedback.snapshot(conversation_id);
let snapshot = self
.feedback
.snapshot_with_sqlite_home(conversation_id, Some(self.config.sqlite_home.clone()));
let thread_id = snapshot.thread_id.clone();
let sqlite_feedback_logs = if include_logs {
if let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
let state_db_ctx = get_state_db(&self.config).await;
match (state_db_ctx.as_ref(), conversation_id) {
(Some(state_db_ctx), Some(conversation_id)) => {
let thread_id_text = conversation_id.to_string();
match state_db_ctx.query_feedback_logs(&thread_id_text).await {
Ok(logs) if logs.is_empty() => None,
Ok(logs) => Some(logs),
Err(err) => {
warn!(
"failed to query feedback logs from sqlite for thread_id={thread_id_text}: {err}"
);
None
}
}
}
_ => None,
}
} else {
None
};
if include_logs && let Some(log_db) = self.log_db.as_ref() {
log_db.flush().await;
}
let validated_rollout_path = if include_logs {
match conversation_id {
@@ -7078,7 +7059,7 @@ impl CodexMessageProcessor {
include_logs,
&attachment_paths,
Some(session_source),
sqlite_feedback_logs,
/*logs_override*/ None,
)
})
.await;
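
With this change the message processor no longer queries SQLite for feedback logs itself: it only flushes the log database and passes /*logs_override*/ None, leaving log discovery to the snapshot. A minimal standalone sketch of the resulting precedence, assuming only std (the real code applies Option::or inline in FeedbackSnapshot::send; the helper name here is hypothetical):

// An explicit override wins; discovered logs are the fallback.
fn effective_logs(explicit: Option<Vec<u8>>, discovered: Option<Vec<u8>>) -> Option<Vec<u8>> {
    // Option::or keeps the first Some, so a caller-supplied override shadows
    // whatever SQLite discovery produced.
    explicit.or(discovered)
}

fn main() {
    assert_eq!(effective_logs(Some(vec![1]), Some(vec![2])), Some(vec![1]));
    assert_eq!(effective_logs(None, Some(vec![2])), Some(vec![2]));
    assert_eq!(effective_logs(None, None), None);
}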

View File

@@ -7,9 +7,14 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
codex-protocol = { workspace = true }
codex-state = { workspace = true }
codex-utils-home-dir = { workspace = true }
sentry = { version = "0.46" }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
[dev-dependencies]
chrono = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View File

@@ -1,9 +1,11 @@
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::collections::btree_map::Entry;
use std::fs;
use std::io::Write;
use std::io::{self};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
@@ -32,6 +34,12 @@ const UPLOAD_TIMEOUT_SECS: u64 = 10;
const FEEDBACK_TAGS_TARGET: &str = "feedback_tags";
const MAX_FEEDBACK_TAGS: usize = 64;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FeedbackUploadArtifacts {
pub attachment_paths: Vec<PathBuf>,
pub logs_override: Option<Vec<u8>>,
}
#[derive(Clone)]
pub struct CodexFeedback {
inner: Arc<FeedbackInner>,
@@ -94,6 +102,14 @@ impl CodexFeedback {
}
pub fn snapshot(&self, session_id: Option<ThreadId>) -> FeedbackSnapshot {
self.snapshot_with_sqlite_home(session_id, /*sqlite_home*/ None)
}
pub fn snapshot_with_sqlite_home(
&self,
session_id: Option<ThreadId>,
sqlite_home: Option<PathBuf>,
) -> FeedbackSnapshot {
let bytes = {
let guard = self.inner.ring.lock().expect("mutex poisoned");
guard.snapshot_bytes()
@@ -106,6 +122,7 @@ impl CodexFeedback {
bytes,
tags,
feedback_diagnostics: FeedbackDiagnostics::collect_from_env(),
sqlite_home,
thread_id: session_id
.map(|id| id.to_string())
.unwrap_or("no-active-thread-".to_string() + &ThreadId::new().to_string()),
@@ -209,6 +226,7 @@ pub struct FeedbackSnapshot {
bytes: Vec<u8>,
tags: BTreeMap<String, String>,
feedback_diagnostics: FeedbackDiagnostics,
sqlite_home: Option<PathBuf>,
pub thread_id: String,
}
@@ -331,8 +349,19 @@ impl FeedbackSnapshot {
}
envelope.add_item(EnvelopeItem::Event(event));
let discovered_upload_artifacts = if include_logs {
discover_feedback_upload_artifacts(&self.thread_id, self.sqlite_home.as_deref())
} else {
FeedbackUploadArtifacts::default()
};
let merged_attachment_paths = merge_attachment_paths(
extra_attachment_paths,
&discovered_upload_artifacts.attachment_paths,
);
let merged_logs_override = logs_override.or(discovered_upload_artifacts.logs_override);
for attachment in
self.feedback_attachments(include_logs, extra_attachment_paths, logs_override)
self.feedback_attachments(include_logs, &merged_attachment_paths, merged_logs_override)
{
envelope.add_item(EnvelopeItem::Attachment(attachment));
}
@@ -398,6 +427,144 @@ impl FeedbackSnapshot {
}
}
fn merge_attachment_paths(
extra_attachment_paths: &[PathBuf],
discovered_attachment_paths: &[PathBuf],
) -> Vec<PathBuf> {
let mut merged = Vec::with_capacity(
extra_attachment_paths
.len()
.saturating_add(discovered_attachment_paths.len()),
);
let mut seen_paths = HashSet::<PathBuf>::new();
for path in extra_attachment_paths
.iter()
.chain(discovered_attachment_paths.iter())
{
if seen_paths.insert(path.clone()) {
merged.push(path.clone());
}
}
merged
}
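
merge_attachment_paths keeps caller-supplied attachments ahead of discovered ones and drops later duplicates. A standalone sketch of the same order-preserving merge, assuming only std (names are hypothetical):

use std::collections::HashSet;
use std::path::PathBuf;

// Explicit paths first, discovered paths appended, duplicates dropped on
// first sight so the earliest occurrence keeps its position.
fn merge_paths(explicit: &[PathBuf], discovered: &[PathBuf]) -> Vec<PathBuf> {
    let mut seen = HashSet::new();
    explicit
        .iter()
        .chain(discovered)
        .filter(|path| seen.insert((*path).clone()))
        .cloned()
        .collect()
}

fn main() {
    let explicit = vec![PathBuf::from("a.jsonl"), PathBuf::from("b.jsonl")];
    let discovered = vec![PathBuf::from("b.jsonl"), PathBuf::from("c.jsonl")];
    assert_eq!(
        merge_paths(&explicit, &discovered),
        [
            PathBuf::from("a.jsonl"),
            PathBuf::from("b.jsonl"),
            PathBuf::from("c.jsonl"),
        ]
    );
}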
fn discover_feedback_upload_artifacts(
thread_id: &str,
sqlite_home: Option<&Path>,
) -> FeedbackUploadArtifacts {
let Ok(root_thread_id) = ThreadId::from_string(thread_id) else {
return FeedbackUploadArtifacts::default();
};
let sqlite_home = match sqlite_home {
Some(sqlite_home) => sqlite_home.to_path_buf(),
None => match codex_utils_home_dir::find_codex_home() {
Ok(codex_home) => codex_home,
Err(_) => return FeedbackUploadArtifacts::default(),
},
};
if !codex_state::state_db_path(sqlite_home.as_path()).exists() {
return FeedbackUploadArtifacts::default();
}
std::thread::spawn(move || {
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(err) => {
tracing::warn!(
"failed to build tokio runtime for feedback artifact discovery: {err}"
);
return FeedbackUploadArtifacts::default();
}
};
runtime.block_on(collect_feedback_upload_artifacts(
sqlite_home,
root_thread_id,
))
})
.join()
.unwrap_or_else(|_| FeedbackUploadArtifacts::default())
}
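
discover_feedback_upload_artifacts is a synchronous function that needs to run async SQLite calls, and it can be reached from inside an existing Tokio runtime, where blocking on that runtime would panic. Spawning a plain OS thread with its own single-threaded runtime sidesteps both problems, at the cost of one short-lived thread per call. A standalone sketch of the pattern, assuming the tokio crate with the rt and time features (the sleep stands in for the real database work):

fn fetch_value_blocking() -> u32 {
    std::thread::spawn(|| {
        // A throwaway current-thread runtime, independent of any runtime
        // the caller may already be running inside.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("runtime should build");
        runtime.block_on(async {
            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
            42
        })
    })
    .join()
    // Mirror the diff's unwrap_or_else: fall back to a default if the
    // worker thread panicked.
    .unwrap_or(0)
}

fn main() {
    assert_eq!(fetch_value_blocking(), 42);
}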
async fn collect_feedback_upload_artifacts(
codex_home: PathBuf,
root_thread_id: ThreadId,
) -> FeedbackUploadArtifacts {
let mut artifacts = FeedbackUploadArtifacts::default();
let state_db = match open_feedback_state_db(codex_home.as_path()).await {
Some(state_db) => state_db,
None => return artifacts,
};
if let Some(root_rollout_path) =
find_rollout_path_by_id(state_db.as_ref(), root_thread_id).await
{
artifacts.attachment_paths.push(root_rollout_path);
}
let descendant_ids = match state_db.list_thread_spawn_descendants(root_thread_id).await {
Ok(descendant_ids) => descendant_ids,
Err(err) => {
tracing::warn!("failed to list descendant threads for feedback upload: {err}");
Vec::new()
}
};
let mut seen_paths = artifacts
.attachment_paths
.iter()
.cloned()
.collect::<HashSet<_>>();
for descendant_id in &descendant_ids {
if let Some(path) = find_rollout_path_by_id(state_db.as_ref(), *descendant_id).await
&& seen_paths.insert(path.clone())
{
artifacts.attachment_paths.push(path);
}
}
let mut log_thread_ids = Vec::with_capacity(descendant_ids.len().saturating_add(1));
log_thread_ids.push(root_thread_id.to_string());
log_thread_ids.extend(
descendant_ids
.into_iter()
.map(|thread_id| thread_id.to_string()),
);
artifacts.logs_override = match state_db
.query_feedback_logs_for_threads(&log_thread_ids)
.await
{
Ok(logs) if logs.is_empty() => None,
Ok(logs) => Some(logs),
Err(err) => {
tracing::warn!("failed to query feedback logs from sqlite for upload: {err}");
None
}
};
artifacts
}
async fn open_feedback_state_db(codex_home: &Path) -> Option<Arc<codex_state::StateRuntime>> {
codex_state::StateRuntime::init(codex_home.to_path_buf(), String::new())
.await
.ok()
}
async fn find_rollout_path_by_id(
state_db: &codex_state::StateRuntime,
thread_id: ThreadId,
) -> Option<PathBuf> {
state_db
.find_rollout_path_by_id(thread_id, /*archived_only*/ None)
.await
.unwrap_or_else(|err| {
tracing::warn!("failed to resolve rollout path during feedback upload: {err}");
None
})
}
fn display_classification(classification: &str) -> String {
match classification {
"bug" => "Bug".to_string(),
@@ -483,8 +650,14 @@ mod tests {
use std::fs;
use super::*;
use chrono::TimeZone;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_state::LogEntry;
use codex_state::ThreadMetadataBuilder;
use feedback_diagnostics::FeedbackDiagnostic;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -557,6 +730,7 @@ mod tests {
);
let attachments_without_diagnostics = CodexFeedback::new()
.snapshot(None)
.with_feedback_diagnostics(FeedbackDiagnostics::new(vec![]))
.feedback_attachments(true, &[], Some(vec![1]));
assert_eq!(
@@ -569,4 +743,229 @@ mod tests {
assert_eq!(attachments_without_diagnostics[0].buffer, vec![1]);
fs::remove_file(extra_path).expect("extra attachment should be removed");
}
#[tokio::test]
async fn collect_feedback_upload_artifacts_includes_child_rollouts_and_logs() {
let temp_dir = tempdir().expect("temp dir should exist");
let codex_home = temp_dir.path().to_path_buf();
let state_db = codex_state::StateRuntime::init(codex_home.clone(), String::new())
.await
.expect("state db should initialize");
state_db
.mark_backfill_complete(None)
.await
.expect("backfill should be marked complete");
let root_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000111")
.expect("root thread id should parse");
let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000222")
.expect("child thread id should parse");
let created_at = chrono::Utc
.timestamp_opt(1_742_050_000, 0)
.single()
.expect("valid ts");
let root_rollout_path = codex_home.join("rollout-root.jsonl");
let child_rollout_path = codex_home.join("rollout-child.jsonl");
let root_metadata = ThreadMetadataBuilder::new(
root_thread_id,
root_rollout_path.clone(),
created_at,
SessionSource::Cli,
)
.build("");
let child_metadata = ThreadMetadataBuilder::new(
child_thread_id,
child_rollout_path.clone(),
created_at,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_nickname: None,
agent_role: None,
}),
)
.build("");
state_db
.upsert_thread(&root_metadata)
.await
.expect("root thread should persist");
state_db
.upsert_thread(&child_metadata)
.await
.expect("child thread should persist");
state_db
.upsert_thread_spawn_edge(
root_thread_id,
child_thread_id,
codex_state::DirectionalThreadSpawnEdgeStatus::Open,
)
.await
.expect("spawn edge should persist");
state_db
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("root-log".to_string()),
feedback_log_body: None,
thread_id: Some(root_thread_id.to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("child-log".to_string()),
feedback_log_body: None,
thread_id: Some(child_thread_id.to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
])
.await
.expect("feedback logs should persist");
let artifacts = collect_feedback_upload_artifacts(codex_home, root_thread_id).await;
assert_eq!(
artifacts.attachment_paths,
vec![root_rollout_path, child_rollout_path]
);
let logs = String::from_utf8(
artifacts
.logs_override
.expect("feedback logs should be collected"),
)
.expect("logs should be utf8");
assert!(logs.contains("root-log"));
assert!(logs.contains("child-log"));
}
#[test]
fn discover_feedback_upload_artifacts_uses_explicit_sqlite_home() {
let temp_dir = tempdir().expect("temp dir should exist");
let codex_home = temp_dir.path().join("codex-home");
let sqlite_home = temp_dir.path().join("sqlite-home");
fs::create_dir_all(&codex_home).expect("codex home should exist");
fs::create_dir_all(&sqlite_home).expect("sqlite home should exist");
let root_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000333")
.expect("root thread id should parse");
let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000444")
.expect("child thread id should parse");
let created_at = chrono::Utc
.timestamp_opt(1_742_050_100, 0)
.single()
.expect("valid ts");
let root_rollout_path = codex_home.join("rollout-root.jsonl");
let child_rollout_path = codex_home.join("rollout-child.jsonl");
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime should build");
runtime.block_on(async {
let state_db = codex_state::StateRuntime::init(sqlite_home.clone(), String::new())
.await
.expect("state db should initialize");
state_db
.mark_backfill_complete(None)
.await
.expect("backfill should be marked complete");
let root_metadata = ThreadMetadataBuilder::new(
root_thread_id,
root_rollout_path.clone(),
created_at,
SessionSource::Cli,
)
.build("");
let child_metadata = ThreadMetadataBuilder::new(
child_thread_id,
child_rollout_path.clone(),
created_at,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_nickname: None,
agent_role: None,
}),
)
.build("");
state_db
.upsert_thread(&root_metadata)
.await
.expect("root thread should persist");
state_db
.upsert_thread(&child_metadata)
.await
.expect("child thread should persist");
state_db
.upsert_thread_spawn_edge(
root_thread_id,
child_thread_id,
codex_state::DirectionalThreadSpawnEdgeStatus::Open,
)
.await
.expect("spawn edge should persist");
state_db
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("root-log".to_string()),
feedback_log_body: None,
thread_id: Some(root_thread_id.to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("child-log".to_string()),
feedback_log_body: None,
thread_id: Some(child_thread_id.to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
])
.await
.expect("feedback logs should persist");
});
fs::write(&root_rollout_path, "root rollout").expect("root rollout should be written");
fs::write(&child_rollout_path, "child rollout").expect("child rollout should be written");
let artifacts =
discover_feedback_upload_artifacts(&root_thread_id.to_string(), Some(&sqlite_home));
assert_eq!(
artifacts.attachment_paths,
vec![root_rollout_path, child_rollout_path]
);
let logs = String::from_utf8(
artifacts
.logs_override
.expect("feedback logs should be collected"),
)
.expect("logs should be utf8");
assert!(logs.contains("root-log"));
assert!(logs.contains("child-log"));
}
}

View File

@@ -315,26 +315,71 @@ WHERE id IN (
/// Query per-thread feedback logs, capped to the per-thread SQLite retention budget.
pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result<Vec<u8>> {
self.query_feedback_logs_for_threads(&[thread_id.to_string()])
.await
}
/// Query feedback logs for multiple threads, capped to the combined SQLite retention budget.
pub async fn query_feedback_logs_for_threads(
&self,
thread_ids: &[String],
) -> anyhow::Result<Vec<u8>> {
if thread_ids.is_empty() {
return Ok(Vec::new());
}
let max_bytes = usize::try_from(LOG_PARTITION_SIZE_LIMIT_BYTES).unwrap_or(usize::MAX);
// Bound the fetched rows in SQL first so over-retained partitions do not have to load
// every row into memory, then apply the exact whole-line byte cap after formatting.
let rows = sqlx::query_as::<_, FeedbackLogRow>(
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
WITH latest_process AS (
SELECT process_uuid
WITH latest_process_candidates AS (
SELECT
thread_id,
process_uuid,
ROW_NUMBER() OVER (
PARTITION BY thread_id
ORDER BY ts DESC, ts_nanos DESC, id DESC
) AS row_number
FROM logs
WHERE thread_id = ? AND process_uuid IS NOT NULL
ORDER BY ts DESC, ts_nanos DESC, id DESC
LIMIT 1
WHERE process_uuid IS NOT NULL
AND thread_id IN (
"#,
);
{
let mut separated = builder.separated(", ");
for thread_id in thread_ids {
separated.push_bind(thread_id);
}
}
builder.push(
r#"
)
),
latest_processes AS (
SELECT DISTINCT process_uuid
FROM latest_process_candidates
WHERE row_number = 1
),
feedback_logs AS (
SELECT ts, ts_nanos, level, feedback_log_body, estimated_bytes, id
FROM logs
WHERE feedback_log_body IS NOT NULL AND (
thread_id = ?
thread_id IN (
"#,
);
{
let mut separated = builder.separated(", ");
for thread_id in thread_ids {
separated.push_bind(thread_id);
}
}
builder.push(
r#"
)
OR (
thread_id IS NULL
AND process_uuid IN (SELECT process_uuid FROM latest_process)
AND process_uuid IN (SELECT process_uuid FROM latest_processes)
)
)
),
@@ -352,15 +397,19 @@ bounded_feedback_logs AS (
)
SELECT ts, ts_nanos, level, feedback_log_body
FROM bounded_feedback_logs
WHERE cumulative_estimated_bytes <= ?
WHERE cumulative_estimated_bytes <=
"#,
);
builder.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
builder.push(
r#"
ORDER BY ts DESC, ts_nanos DESC, id DESC
"#,
)
.bind(thread_id)
.bind(thread_id)
.bind(LOG_PARTITION_SIZE_LIMIT_BYTES)
.fetch_all(self.logs_pool.as_ref())
.await?;
);
let rows = builder
.build_query_as::<FeedbackLogRow>()
.fetch_all(self.logs_pool.as_ref())
.await?;
let mut lines = Vec::new();
let mut total_bytes = 0usize;
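
The move from a fixed query with ? placeholders to sqlx's QueryBuilder is what makes a variable number of thread ids possible: SQLite has no array bind, so each id needs its own placeholder inside the IN (...) list. A condensed standalone sketch of the pattern, assuming sqlx with the sqlite feature (table and columns simplified from the query above):

use sqlx::{QueryBuilder, Sqlite};

// Each push_bind adds one placeholder; separated(", ") writes the commas
// between them, yielding IN (?, ?, ...) with every id bound safely.
fn feedback_logs_query(thread_ids: &[String]) -> QueryBuilder<'_, Sqlite> {
    let mut builder =
        QueryBuilder::<Sqlite>::new("SELECT feedback_log_body FROM logs WHERE thread_id IN (");
    {
        // Scope the Separated borrow before pushing more SQL, as in the diff.
        let mut separated = builder.separated(", ");
        for thread_id in thread_ids {
            separated.push_bind(thread_id);
        }
    }
    builder.push(") ORDER BY ts DESC, ts_nanos DESC, id DESC");
    builder
}

// Usage: feedback_logs_query(&ids).build_query_as::<Row>().fetch_all(pool).await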
@@ -1281,6 +1330,89 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn query_feedback_logs_for_threads_merges_threads_and_dedupes_threadless_process_rows() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("initialize runtime");
runtime
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("thread-1".to_string()),
feedback_log_body: None,
thread_id: Some("thread-1".to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("shared-threadless".to_string()),
feedback_log_body: None,
thread_id: None,
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 3,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("thread-2".to_string()),
feedback_log_body: None,
thread_id: Some("thread-2".to_string()),
process_uuid: Some("proc-shared".to_string()),
file: None,
line: None,
module_path: None,
},
LogEntry {
ts: 4,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("other-threadless".to_string()),
feedback_log_body: None,
thread_id: None,
process_uuid: Some("proc-other".to_string()),
file: None,
line: None,
module_path: None,
},
])
.await
.expect("insert test logs");
let bytes = runtime
.query_feedback_logs_for_threads(&["thread-1".to_string(), "thread-2".to_string()])
.await
.expect("query feedback logs");
assert_eq!(
String::from_utf8(bytes).expect("valid utf-8"),
[
format_feedback_log_line(1, 0, "INFO", "thread-1"),
format_feedback_log_line(2, 0, "INFO", "shared-threadless"),
format_feedback_log_line(3, 0, "INFO", "thread-2"),
]
.concat()
);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
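
The comment at the top of query_feedback_logs_for_threads describes a two-stage cap: SQL prefilters newest-first rows by cumulative estimated bytes, and the Rust side then applies the exact budget to formatted whole lines. A hypothetical standalone sketch of that second stage, assuming rows arrive newest-first and are reversed back to chronological order, as the test above asserts:

// Keep newest lines while they fit the budget, then reverse so the uploaded
// attachment reads oldest-first.
fn cap_formatted_lines(newest_first: Vec<String>, max_bytes: usize) -> Vec<u8> {
    let mut kept: Vec<Vec<u8>> = Vec::new();
    let mut total = 0usize;
    for line in newest_first {
        let bytes = line.into_bytes();
        if total + bytes.len() > max_bytes {
            break;
        }
        total += bytes.len();
        kept.push(bytes);
    }
    kept.reverse();
    kept.concat()
}

fn main() {
    let lines = vec!["3 newest\n".to_string(), "2\n".to_string(), "1 oldest\n".to_string()];
    // An 11-byte budget fits "3 newest\n" (9 bytes) and "2\n" (2 bytes) only.
    let capped = cap_formatted_lines(lines, 11);
    assert_eq!(String::from_utf8(capped).unwrap(), "2\n3 newest\n");
}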
#[tokio::test]
async fn query_feedback_logs_includes_threadless_rows_from_same_process() {
let codex_home = unique_temp_dir();

View File

@@ -142,6 +142,17 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
.await
}
/// List all spawned descendants of `root_thread_id`.
///
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
pub async fn list_thread_spawn_descendants(
&self,
root_thread_id: ThreadId,
) -> anyhow::Result<Vec<ThreadId>> {
self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
.await
}
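
list_thread_spawn_descendants promises breadth-first order: every depth-1 child before any depth-2 grandchild, with ids sorted inside a level. A hypothetical standalone sketch of that traversal over an in-memory edge set, assuming spawn edges form a tree (u64 stands in for ThreadId; the real method queries the spawn-edge table instead):

// Walk spawn edges level by level, sorting each level so the output is
// "by depth, then by thread id" as the doc comment states.
fn spawn_descendants(root: u64, children_of: impl Fn(u64) -> Vec<u64>) -> Vec<u64> {
    let mut out = Vec::new();
    let mut frontier = vec![root];
    while !frontier.is_empty() {
        let mut next: Vec<u64> = frontier.iter().flat_map(|&p| children_of(p)).collect();
        next.sort_unstable();
        next.dedup();
        out.extend(next.iter().copied());
        frontier = next;
    }
    out
}

fn main() {
    // Root 1 spawns 3 and 2; thread 2 spawns 4.
    let children = |id: u64| match id {
        1 => vec![3, 2],
        2 => vec![4],
        _ => Vec::new(),
    };
    assert_eq!(spawn_descendants(1, children), vec![2, 3, 4]);
}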
async fn list_thread_spawn_children_matching(
&self,
parent_thread_id: ThreadId,

View File

@@ -1508,7 +1508,9 @@ impl ChatWidget {
{
tracing::info!(target: "feedback_tags", chatgpt_user_id);
}
let snapshot = self.feedback.snapshot(self.thread_id);
let snapshot = self
.feedback
.snapshot_with_sqlite_home(self.thread_id, Some(self.config.sqlite_home.clone()));
self.show_feedback_note(category, include_logs, snapshot);
}

View File

@@ -1895,7 +1895,9 @@ impl ChatWidget {
category: crate::app_event::FeedbackCategory,
include_logs: bool,
) {
let snapshot = self.feedback.snapshot(self.thread_id);
let snapshot = self
.feedback
.snapshot_with_sqlite_home(self.thread_id, Some(self.config.sqlite_home.clone()));
self.show_feedback_note(category, include_logs, snapshot);
}