mirror of
https://github.com/openai/codex.git
synced 2026-05-02 10:26:45 +00:00
Compare commits
3 Commits
dependabot ... jif/cascad
| Author | SHA1 | Date |
|---|---|---|
| | 5d3e1f3745 | |
| | ba5c832133 | |
| | aeb382d350 | |
codex-rs/Cargo.lock (generated): 5 changes
@@ -2072,9 +2072,14 @@ name = "codex-feedback"
 version = "0.0.0"
 dependencies = [
  "anyhow",
+ "chrono",
  "codex-protocol",
+ "codex-state",
+ "codex-utils-home-dir",
+ "pretty_assertions",
  "sentry",
+ "tempfile",
  "tokio",
  "tracing",
  "tracing-subscriber",
 ]
@@ -7029,32 +7029,13 @@ impl CodexMessageProcessor {
         {
             tracing::info!(target: "feedback_tags", chatgpt_user_id);
         }
-        let snapshot = self.feedback.snapshot(conversation_id);
+        let snapshot = self
+            .feedback
+            .snapshot_with_sqlite_home(conversation_id, Some(self.config.sqlite_home.clone()));
         let thread_id = snapshot.thread_id.clone();
-        let sqlite_feedback_logs = if include_logs {
-            if let Some(log_db) = self.log_db.as_ref() {
-                log_db.flush().await;
-            }
-            let state_db_ctx = get_state_db(&self.config).await;
-            match (state_db_ctx.as_ref(), conversation_id) {
-                (Some(state_db_ctx), Some(conversation_id)) => {
-                    let thread_id_text = conversation_id.to_string();
-                    match state_db_ctx.query_feedback_logs(&thread_id_text).await {
-                        Ok(logs) if logs.is_empty() => None,
-                        Ok(logs) => Some(logs),
-                        Err(err) => {
-                            warn!(
-                                "failed to query feedback logs from sqlite for thread_id={thread_id_text}: {err}"
-                            );
-                            None
-                        }
-                    }
-                }
-                _ => None,
-            }
-        } else {
-            None
-        };
+        if include_logs && let Some(log_db) = self.log_db.as_ref() {
+            log_db.flush().await;
+        }

         let validated_rollout_path = if include_logs {
             match conversation_id {
@@ -7078,7 +7059,7 @@ impl CodexMessageProcessor {
                     include_logs,
                     &attachment_paths,
                     Some(session_source),
-                    sqlite_feedback_logs,
+                    /*logs_override*/ None,
                 )
             })
             .await;
@@ -7,9 +7,14 @@ license.workspace = true
 [dependencies]
 anyhow = { workspace = true }
 codex-protocol = { workspace = true }
+codex-state = { workspace = true }
+codex-utils-home-dir = { workspace = true }
 sentry = { version = "0.46" }
 tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }

 [dev-dependencies]
+chrono = { workspace = true }
+pretty_assertions = { workspace = true }
+tempfile = { workspace = true }
@@ -1,9 +1,11 @@
 use std::collections::BTreeMap;
+use std::collections::HashSet;
 use std::collections::VecDeque;
 use std::collections::btree_map::Entry;
 use std::fs;
 use std::io::Write;
 use std::io::{self};
+use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::Mutex;
@@ -32,6 +34,12 @@ const UPLOAD_TIMEOUT_SECS: u64 = 10;
 const FEEDBACK_TAGS_TARGET: &str = "feedback_tags";
 const MAX_FEEDBACK_TAGS: usize = 64;

+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct FeedbackUploadArtifacts {
+    pub attachment_paths: Vec<PathBuf>,
+    pub logs_override: Option<Vec<u8>>,
+}
+
 #[derive(Clone)]
 pub struct CodexFeedback {
     inner: Arc<FeedbackInner>,
@@ -94,6 +102,14 @@ impl CodexFeedback {
     }

     pub fn snapshot(&self, session_id: Option<ThreadId>) -> FeedbackSnapshot {
+        self.snapshot_with_sqlite_home(session_id, /*sqlite_home*/ None)
+    }
+
+    pub fn snapshot_with_sqlite_home(
+        &self,
+        session_id: Option<ThreadId>,
+        sqlite_home: Option<PathBuf>,
+    ) -> FeedbackSnapshot {
         let bytes = {
             let guard = self.inner.ring.lock().expect("mutex poisoned");
             guard.snapshot_bytes()
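
Note (not part of the diff): a minimal usage sketch of the accessor pair above, assuming it compiles inside the codex-feedback crate. `snapshot` keeps the old behaviour, while `snapshot_with_sqlite_home` lets the caller pin the SQLite home used later for upload-artifact discovery. The path literal is a made-up example.

```rust
use std::path::PathBuf;

// Sketch only: CodexFeedback, ThreadId and FeedbackSnapshot are the crate types from this diff.
fn capture_feedback(feedback: &CodexFeedback, thread: Option<ThreadId>) -> FeedbackSnapshot {
    // `feedback.snapshot(thread)` keeps the old behaviour: the snapshot falls back to the
    // default codex home if it later needs the SQLite state database.
    // Passing an explicit home pins artifact discovery to the caller's database instead.
    let sqlite_home = PathBuf::from("/tmp/example-codex-home"); // hypothetical path
    feedback.snapshot_with_sqlite_home(thread, Some(sqlite_home))
}
```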
@@ -106,6 +122,7 @@ impl CodexFeedback {
             bytes,
             tags,
             feedback_diagnostics: FeedbackDiagnostics::collect_from_env(),
+            sqlite_home,
             thread_id: session_id
                 .map(|id| id.to_string())
                 .unwrap_or("no-active-thread-".to_string() + &ThreadId::new().to_string()),
@@ -209,6 +226,7 @@ pub struct FeedbackSnapshot {
     bytes: Vec<u8>,
     tags: BTreeMap<String, String>,
     feedback_diagnostics: FeedbackDiagnostics,
+    sqlite_home: Option<PathBuf>,
     pub thread_id: String,
 }
@@ -331,8 +349,19 @@ impl FeedbackSnapshot {
         }
         envelope.add_item(EnvelopeItem::Event(event));

+        let discovered_upload_artifacts = if include_logs {
+            discover_feedback_upload_artifacts(&self.thread_id, self.sqlite_home.as_deref())
+        } else {
+            FeedbackUploadArtifacts::default()
+        };
+        let merged_attachment_paths = merge_attachment_paths(
+            extra_attachment_paths,
+            &discovered_upload_artifacts.attachment_paths,
+        );
+        let merged_logs_override = logs_override.or(discovered_upload_artifacts.logs_override);
+
         for attachment in
-            self.feedback_attachments(include_logs, extra_attachment_paths, logs_override)
+            self.feedback_attachments(include_logs, &merged_attachment_paths, merged_logs_override)
         {
             envelope.add_item(EnvelopeItem::Attachment(attachment));
         }
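
Note (not part of the diff): a standalone sketch of the two merge rules used above. A caller-provided `logs_override` wins via `Option::or`, and attachment paths are deduplicated while keeping first-seen order, mirroring `merge_attachment_paths` further down. The file names are made up.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

fn main() {
    // Caller-supplied logs take precedence over discovered ones, mirroring
    // `logs_override.or(discovered_upload_artifacts.logs_override)`.
    let caller: Option<Vec<u8>> = Some(b"caller".to_vec());
    let discovered: Option<Vec<u8>> = Some(b"discovered".to_vec());
    assert_eq!(caller.clone().or(discovered), caller);

    // Attachment paths keep first-seen order and drop duplicates.
    let extra = vec![PathBuf::from("a.jsonl"), PathBuf::from("b.jsonl")];
    let discovered_paths = vec![PathBuf::from("b.jsonl"), PathBuf::from("c.jsonl")];
    let mut seen = HashSet::new();
    let merged: Vec<PathBuf> = extra
        .into_iter()
        .chain(discovered_paths)
        .filter(|path| seen.insert(path.clone()))
        .collect();
    assert_eq!(
        merged,
        vec![
            PathBuf::from("a.jsonl"),
            PathBuf::from("b.jsonl"),
            PathBuf::from("c.jsonl"),
        ]
    );
}
```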
@@ -398,6 +427,144 @@ impl FeedbackSnapshot {
     }
 }

+fn merge_attachment_paths(
+    extra_attachment_paths: &[PathBuf],
+    discovered_attachment_paths: &[PathBuf],
+) -> Vec<PathBuf> {
+    let mut merged = Vec::with_capacity(
+        extra_attachment_paths
+            .len()
+            .saturating_add(discovered_attachment_paths.len()),
+    );
+    let mut seen_paths = HashSet::<PathBuf>::new();
+    for path in extra_attachment_paths
+        .iter()
+        .chain(discovered_attachment_paths.iter())
+    {
+        if seen_paths.insert(path.clone()) {
+            merged.push(path.clone());
+        }
+    }
+    merged
+}
+
+fn discover_feedback_upload_artifacts(
+    thread_id: &str,
+    sqlite_home: Option<&Path>,
+) -> FeedbackUploadArtifacts {
+    let Ok(root_thread_id) = ThreadId::from_string(thread_id) else {
+        return FeedbackUploadArtifacts::default();
+    };
+    let sqlite_home = match sqlite_home {
+        Some(sqlite_home) => sqlite_home.to_path_buf(),
+        None => match codex_utils_home_dir::find_codex_home() {
+            Ok(codex_home) => codex_home,
+            Err(_) => return FeedbackUploadArtifacts::default(),
+        },
+    };
+    if !codex_state::state_db_path(sqlite_home.as_path()).exists() {
+        return FeedbackUploadArtifacts::default();
+    }
+
+    std::thread::spawn(move || {
+        let runtime = match tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+        {
+            Ok(runtime) => runtime,
+            Err(err) => {
+                tracing::warn!(
+                    "failed to build tokio runtime for feedback artifact discovery: {err}"
+                );
+                return FeedbackUploadArtifacts::default();
+            }
+        };
+        runtime.block_on(collect_feedback_upload_artifacts(
+            sqlite_home,
+            root_thread_id,
+        ))
+    })
+    .join()
+    .unwrap_or_else(|_| FeedbackUploadArtifacts::default())
+}
+
+async fn collect_feedback_upload_artifacts(
+    codex_home: PathBuf,
+    root_thread_id: ThreadId,
+) -> FeedbackUploadArtifacts {
+    let mut artifacts = FeedbackUploadArtifacts::default();
+    let state_db = match open_feedback_state_db(codex_home.as_path()).await {
+        Some(state_db) => state_db,
+        None => return artifacts,
+    };
+
+    if let Some(root_rollout_path) =
+        find_rollout_path_by_id(state_db.as_ref(), root_thread_id).await
+    {
+        artifacts.attachment_paths.push(root_rollout_path);
+    }
+
+    let descendant_ids = match state_db.list_thread_spawn_descendants(root_thread_id).await {
+        Ok(descendant_ids) => descendant_ids,
+        Err(err) => {
+            tracing::warn!("failed to list descendant threads for feedback upload: {err}");
+            Vec::new()
+        }
+    };
+    let mut seen_paths = artifacts
+        .attachment_paths
+        .iter()
+        .cloned()
+        .collect::<HashSet<_>>();
+    for descendant_id in &descendant_ids {
+        if let Some(path) = find_rollout_path_by_id(state_db.as_ref(), *descendant_id).await
+            && seen_paths.insert(path.clone())
+        {
+            artifacts.attachment_paths.push(path);
+        }
+    }
+
+    let mut log_thread_ids = Vec::with_capacity(descendant_ids.len().saturating_add(1));
+    log_thread_ids.push(root_thread_id.to_string());
+    log_thread_ids.extend(
+        descendant_ids
+            .into_iter()
+            .map(|thread_id| thread_id.to_string()),
+    );
+    artifacts.logs_override = match state_db
+        .query_feedback_logs_for_threads(&log_thread_ids)
+        .await
+    {
+        Ok(logs) if logs.is_empty() => None,
+        Ok(logs) => Some(logs),
+        Err(err) => {
+            tracing::warn!("failed to query feedback logs from sqlite for upload: {err}");
+            None
+        }
+    };
+
+    artifacts
+}
+
+async fn open_feedback_state_db(codex_home: &Path) -> Option<Arc<codex_state::StateRuntime>> {
+    codex_state::StateRuntime::init(codex_home.to_path_buf(), String::new())
+        .await
+        .ok()
+}
+
+async fn find_rollout_path_by_id(
+    state_db: &codex_state::StateRuntime,
+    thread_id: ThreadId,
+) -> Option<PathBuf> {
+    state_db
+        .find_rollout_path_by_id(thread_id, /*archived_only*/ None)
+        .await
+        .unwrap_or_else(|err| {
+            tracing::warn!("failed to resolve rollout path during feedback upload: {err}");
+            None
+        })
+}
+
 fn display_classification(classification: &str) -> String {
     match classification {
         "bug" => "Bug".to_string(),
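
Note (not part of the diff): a standalone sketch of the bridging pattern `discover_feedback_upload_artifacts` uses above. A synchronous caller spawns a plain OS thread, builds a current-thread Tokio runtime there, and blocks on the async work, degrading to a default value on any failure. Assumes a `tokio` dependency with its runtime and driver features enabled (for example the `full` feature).

```rust
// Runs an async future to completion from synchronous code by parking it on its own
// OS thread with a dedicated single-threaded runtime.
fn run_async_from_sync<T, F>(fut: F) -> Option<T>
where
    T: Send + 'static,
    F: std::future::Future<Output = T> + Send + 'static,
{
    std::thread::spawn(move || -> Option<T> {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .ok()?;
        Some(runtime.block_on(fut))
    })
    .join()
    // A panicked worker thread degrades to "nothing produced" instead of propagating.
    .unwrap_or(None)
}

fn main() {
    let value = run_async_from_sync(async { 21 * 2 });
    assert_eq!(value, Some(42));
}
```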
@@ -483,8 +650,14 @@ mod tests {
     use std::fs;

     use super::*;
+    use chrono::TimeZone;
     use codex_protocol::protocol::SessionSource;
+    use codex_protocol::protocol::SubAgentSource;
+    use codex_state::LogEntry;
+    use codex_state::ThreadMetadataBuilder;
     use feedback_diagnostics::FeedbackDiagnostic;
+    use pretty_assertions::assert_eq;
+    use tempfile::tempdir;
     use tracing_subscriber::layer::SubscriberExt;
     use tracing_subscriber::util::SubscriberInitExt;

@@ -557,6 +730,7 @@ mod tests {
         );
         let attachments_without_diagnostics = CodexFeedback::new()
             .snapshot(None)
             .with_feedback_diagnostics(FeedbackDiagnostics::new(vec![]))
             .feedback_attachments(true, &[], Some(vec![1]));

         assert_eq!(
@@ -569,4 +743,229 @@ mod tests {
         assert_eq!(attachments_without_diagnostics[0].buffer, vec![1]);
         fs::remove_file(extra_path).expect("extra attachment should be removed");
     }
+
+    #[tokio::test]
+    async fn collect_feedback_upload_artifacts_includes_child_rollouts_and_logs() {
+        let temp_dir = tempdir().expect("temp dir should exist");
+        let codex_home = temp_dir.path().to_path_buf();
+        let state_db = codex_state::StateRuntime::init(codex_home.clone(), String::new())
+            .await
+            .expect("state db should initialize");
+        state_db
+            .mark_backfill_complete(None)
+            .await
+            .expect("backfill should be marked complete");
+
+        let root_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000111")
+            .expect("root thread id should parse");
+        let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000222")
+            .expect("child thread id should parse");
+        let created_at = chrono::Utc
+            .timestamp_opt(1_742_050_000, 0)
+            .single()
+            .expect("valid ts");
+        let root_rollout_path = codex_home.join("rollout-root.jsonl");
+        let child_rollout_path = codex_home.join("rollout-child.jsonl");
+
+        let root_metadata = ThreadMetadataBuilder::new(
+            root_thread_id,
+            root_rollout_path.clone(),
+            created_at,
+            SessionSource::Cli,
+        )
+        .build("");
+        let child_metadata = ThreadMetadataBuilder::new(
+            child_thread_id,
+            child_rollout_path.clone(),
+            created_at,
+            SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
+                parent_thread_id: root_thread_id,
+                depth: 1,
+                agent_nickname: None,
+                agent_role: None,
+            }),
+        )
+        .build("");
+        state_db
+            .upsert_thread(&root_metadata)
+            .await
+            .expect("root thread should persist");
+        state_db
+            .upsert_thread(&child_metadata)
+            .await
+            .expect("child thread should persist");
+        state_db
+            .upsert_thread_spawn_edge(
+                root_thread_id,
+                child_thread_id,
+                codex_state::DirectionalThreadSpawnEdgeStatus::Open,
+            )
+            .await
+            .expect("spawn edge should persist");
+        state_db
+            .insert_logs(&[
+                LogEntry {
+                    ts: 1,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("root-log".to_string()),
+                    feedback_log_body: None,
+                    thread_id: Some(root_thread_id.to_string()),
+                    process_uuid: Some("proc-shared".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+                LogEntry {
+                    ts: 2,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("child-log".to_string()),
+                    feedback_log_body: None,
+                    thread_id: Some(child_thread_id.to_string()),
+                    process_uuid: Some("proc-shared".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+            ])
+            .await
+            .expect("feedback logs should persist");
+
+        let artifacts = collect_feedback_upload_artifacts(codex_home, root_thread_id).await;
+
+        assert_eq!(
+            artifacts.attachment_paths,
+            vec![root_rollout_path, child_rollout_path]
+        );
+        let logs = String::from_utf8(
+            artifacts
+                .logs_override
+                .expect("feedback logs should be collected"),
+        )
+        .expect("logs should be utf8");
+        assert!(logs.contains("root-log"));
+        assert!(logs.contains("child-log"));
+    }
+
+    #[test]
+    fn discover_feedback_upload_artifacts_uses_explicit_sqlite_home() {
+        let temp_dir = tempdir().expect("temp dir should exist");
+        let codex_home = temp_dir.path().join("codex-home");
+        let sqlite_home = temp_dir.path().join("sqlite-home");
+        fs::create_dir_all(&codex_home).expect("codex home should exist");
+        fs::create_dir_all(&sqlite_home).expect("sqlite home should exist");
+
+        let root_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000333")
+            .expect("root thread id should parse");
+        let child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000444")
+            .expect("child thread id should parse");
+        let created_at = chrono::Utc
+            .timestamp_opt(1_742_050_100, 0)
+            .single()
+            .expect("valid ts");
+        let root_rollout_path = codex_home.join("rollout-root.jsonl");
+        let child_rollout_path = codex_home.join("rollout-child.jsonl");
+
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .expect("runtime should build");
+        runtime.block_on(async {
+            let state_db = codex_state::StateRuntime::init(sqlite_home.clone(), String::new())
+                .await
+                .expect("state db should initialize");
+            state_db
+                .mark_backfill_complete(None)
+                .await
+                .expect("backfill should be marked complete");
+
+            let root_metadata = ThreadMetadataBuilder::new(
+                root_thread_id,
+                root_rollout_path.clone(),
+                created_at,
+                SessionSource::Cli,
+            )
+            .build("");
+            let child_metadata = ThreadMetadataBuilder::new(
+                child_thread_id,
+                child_rollout_path.clone(),
+                created_at,
+                SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
+                    parent_thread_id: root_thread_id,
+                    depth: 1,
+                    agent_nickname: None,
+                    agent_role: None,
+                }),
+            )
+            .build("");
+            state_db
+                .upsert_thread(&root_metadata)
+                .await
+                .expect("root thread should persist");
+            state_db
+                .upsert_thread(&child_metadata)
+                .await
+                .expect("child thread should persist");
+            state_db
+                .upsert_thread_spawn_edge(
+                    root_thread_id,
+                    child_thread_id,
+                    codex_state::DirectionalThreadSpawnEdgeStatus::Open,
+                )
+                .await
+                .expect("spawn edge should persist");
+            state_db
+                .insert_logs(&[
+                    LogEntry {
+                        ts: 1,
+                        ts_nanos: 0,
+                        level: "INFO".to_string(),
+                        target: "cli".to_string(),
+                        message: Some("root-log".to_string()),
+                        feedback_log_body: None,
+                        thread_id: Some(root_thread_id.to_string()),
+                        process_uuid: Some("proc-shared".to_string()),
+                        file: None,
+                        line: None,
+                        module_path: None,
+                    },
+                    LogEntry {
+                        ts: 2,
+                        ts_nanos: 0,
+                        level: "INFO".to_string(),
+                        target: "cli".to_string(),
+                        message: Some("child-log".to_string()),
+                        feedback_log_body: None,
+                        thread_id: Some(child_thread_id.to_string()),
+                        process_uuid: Some("proc-shared".to_string()),
+                        file: None,
+                        line: None,
+                        module_path: None,
+                    },
+                ])
+                .await
+                .expect("feedback logs should persist");
+        });
+        fs::write(&root_rollout_path, "root rollout").expect("root rollout should be written");
+        fs::write(&child_rollout_path, "child rollout").expect("child rollout should be written");
+
+        let artifacts =
+            discover_feedback_upload_artifacts(&root_thread_id.to_string(), Some(&sqlite_home));
+
+        assert_eq!(
+            artifacts.attachment_paths,
+            vec![root_rollout_path, child_rollout_path]
+        );
+        let logs = String::from_utf8(
+            artifacts
+                .logs_override
+                .expect("feedback logs should be collected"),
+        )
+        .expect("logs should be utf8");
+        assert!(logs.contains("root-log"));
+        assert!(logs.contains("child-log"));
+    }
 }
@@ -315,26 +315,71 @@ WHERE id IN (

     /// Query per-thread feedback logs, capped to the per-thread SQLite retention budget.
     pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result<Vec<u8>> {
+        self.query_feedback_logs_for_threads(&[thread_id.to_string()])
+            .await
+    }
+
+    /// Query feedback logs for multiple threads, capped to the combined SQLite retention budget.
+    pub async fn query_feedback_logs_for_threads(
+        &self,
+        thread_ids: &[String],
+    ) -> anyhow::Result<Vec<u8>> {
+        if thread_ids.is_empty() {
+            return Ok(Vec::new());
+        }
+
         let max_bytes = usize::try_from(LOG_PARTITION_SIZE_LIMIT_BYTES).unwrap_or(usize::MAX);
         // Bound the fetched rows in SQL first so over-retained partitions do not have to load
         // every row into memory, then apply the exact whole-line byte cap after formatting.
-        let rows = sqlx::query_as::<_, FeedbackLogRow>(
+        let mut builder = QueryBuilder::<Sqlite>::new(
             r#"
-WITH latest_process AS (
-    SELECT process_uuid
+WITH latest_process_candidates AS (
+    SELECT
+        thread_id,
+        process_uuid,
+        ROW_NUMBER() OVER (
+            PARTITION BY thread_id
+            ORDER BY ts DESC, ts_nanos DESC, id DESC
+        ) AS row_number
     FROM logs
-    WHERE thread_id = ? AND process_uuid IS NOT NULL
-    ORDER BY ts DESC, ts_nanos DESC, id DESC
-    LIMIT 1
+    WHERE process_uuid IS NOT NULL
+        AND thread_id IN (
+"#,
+        );
+        {
+            let mut separated = builder.separated(", ");
+            for thread_id in thread_ids {
+                separated.push_bind(thread_id);
+            }
+        }
+        builder.push(
+            r#"
+        )
 ),
+latest_processes AS (
+    SELECT DISTINCT process_uuid
+    FROM latest_process_candidates
+    WHERE row_number = 1
+),
 feedback_logs AS (
     SELECT ts, ts_nanos, level, feedback_log_body, estimated_bytes, id
     FROM logs
     WHERE feedback_log_body IS NOT NULL AND (
-        thread_id = ?
+        thread_id IN (
+"#,
+        );
+        {
+            let mut separated = builder.separated(", ");
+            for thread_id in thread_ids {
+                separated.push_bind(thread_id);
+            }
+        }
+        builder.push(
+            r#"
+        )
         OR (
             thread_id IS NULL
-            AND process_uuid IN (SELECT process_uuid FROM latest_process)
+            AND process_uuid IN (SELECT process_uuid FROM latest_processes)
         )
     )
 ),
@@ -352,15 +397,19 @@ bounded_feedback_logs AS (
     )
 SELECT ts, ts_nanos, level, feedback_log_body
 FROM bounded_feedback_logs
-WHERE cumulative_estimated_bytes <= ?
+WHERE cumulative_estimated_bytes <=
+"#,
+        );
+        builder.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        builder.push(
+            r#"
 ORDER BY ts DESC, ts_nanos DESC, id DESC
 "#,
-        )
-        .bind(thread_id)
-        .bind(thread_id)
-        .bind(LOG_PARTITION_SIZE_LIMIT_BYTES)
-        .fetch_all(self.logs_pool.as_ref())
-        .await?;
+        );
+        let rows = builder
+            .build_query_as::<FeedbackLogRow>()
+            .fetch_all(self.logs_pool.as_ref())
+            .await?;

         let mut lines = Vec::new();
         let mut total_bytes = 0usize;
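
Note (not part of the diff): a condensed, standalone sketch of the `sqlx::QueryBuilder` pattern the rewrite above relies on. Static SQL is pushed as text, and the variable-length `IN (...)` list is filled through `separated(", ")` and `push_bind` so every thread id stays a bound parameter rather than being spliced into the SQL string. The table and column names here are illustrative only, and the sketch assumes the `sqlx` crate with its SQLite driver enabled.

```rust
use sqlx::{QueryBuilder, Sqlite};

// Builds: SELECT ... FROM logs WHERE thread_id IN (?, ?, ...) ORDER BY ts DESC
// with one placeholder per element of `thread_ids`.
fn feedback_logs_query(thread_ids: &[String]) -> QueryBuilder<'_, Sqlite> {
    let mut builder = QueryBuilder::<Sqlite>::new(
        "SELECT ts, level, feedback_log_body FROM logs WHERE thread_id IN (",
    );
    {
        // `Separated` inserts ", " between consecutive `push_bind` calls.
        let mut separated = builder.separated(", ");
        for thread_id in thread_ids {
            separated.push_bind(thread_id);
        }
    }
    builder.push(") ORDER BY ts DESC");
    // The caller would then run something like
    // `builder.build_query_as::<Row>().fetch_all(&pool).await`.
    builder
}
```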
@@ -1281,6 +1330,89 @@ mod tests {
         let _ = tokio::fs::remove_dir_all(codex_home).await;
     }

+    #[tokio::test]
+    async fn query_feedback_logs_for_threads_merges_threads_and_dedupes_threadless_process_rows() {
+        let codex_home = unique_temp_dir();
+        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
+            .await
+            .expect("initialize runtime");
+
+        runtime
+            .insert_logs(&[
+                LogEntry {
+                    ts: 1,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("thread-1".to_string()),
+                    feedback_log_body: None,
+                    thread_id: Some("thread-1".to_string()),
+                    process_uuid: Some("proc-shared".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+                LogEntry {
+                    ts: 2,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("shared-threadless".to_string()),
+                    feedback_log_body: None,
+                    thread_id: None,
+                    process_uuid: Some("proc-shared".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+                LogEntry {
+                    ts: 3,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("thread-2".to_string()),
+                    feedback_log_body: None,
+                    thread_id: Some("thread-2".to_string()),
+                    process_uuid: Some("proc-shared".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+                LogEntry {
+                    ts: 4,
+                    ts_nanos: 0,
+                    level: "INFO".to_string(),
+                    target: "cli".to_string(),
+                    message: Some("other-threadless".to_string()),
+                    feedback_log_body: None,
+                    thread_id: None,
+                    process_uuid: Some("proc-other".to_string()),
+                    file: None,
+                    line: None,
+                    module_path: None,
+                },
+            ])
+            .await
+            .expect("insert test logs");
+
+        let bytes = runtime
+            .query_feedback_logs_for_threads(&["thread-1".to_string(), "thread-2".to_string()])
+            .await
+            .expect("query feedback logs");
+
+        assert_eq!(
+            String::from_utf8(bytes).expect("valid utf-8"),
+            [
+                format_feedback_log_line(1, 0, "INFO", "thread-1"),
+                format_feedback_log_line(2, 0, "INFO", "shared-threadless"),
+                format_feedback_log_line(3, 0, "INFO", "thread-2"),
+            ]
+            .concat()
+        );
+
+        let _ = tokio::fs::remove_dir_all(codex_home).await;
+    }
+
     #[tokio::test]
     async fn query_feedback_logs_includes_threadless_rows_from_same_process() {
         let codex_home = unique_temp_dir();
@@ -142,6 +142,17 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
             .await
     }

+    /// List all spawned descendants of `root_thread_id`.
+    ///
+    /// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
+    pub async fn list_thread_spawn_descendants(
+        &self,
+        root_thread_id: ThreadId,
+    ) -> anyhow::Result<Vec<ThreadId>> {
+        self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
+            .await
+    }
+
     async fn list_thread_spawn_children_matching(
         &self,
         parent_thread_id: ThreadId,
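
Note (not part of the diff): the doc comment above promises breadth-first ordering by depth and then by thread id. Here is a toy, self-contained sketch of that traversal over a made-up parent-to-children map; the real implementation walks spawn edges stored in SQLite and uses `ThreadId` rather than integers.

```rust
use std::collections::BTreeMap;

// Level-by-level traversal: emit each depth level sorted by id before descending further.
// Assumes the spawn graph is acyclic, as a parent/child spawn tree is.
fn spawn_descendants(root: u32, children: &BTreeMap<u32, Vec<u32>>) -> Vec<u32> {
    let mut ordered = Vec::new();
    let mut frontier = vec![root];
    while !frontier.is_empty() {
        let mut next: Vec<u32> = frontier
            .iter()
            .flat_map(|parent| children.get(parent).cloned().unwrap_or_default())
            .collect();
        next.sort_unstable();
        next.dedup();
        ordered.extend(next.iter().copied());
        frontier = next;
    }
    ordered
}

fn main() {
    let children = BTreeMap::from([(1, vec![3, 2]), (2, vec![4])]);
    // Depth 1 first (ids 2 and 3 in order), then depth 2 (id 4).
    assert_eq!(spawn_descendants(1, &children), vec![2, 3, 4]);
}
```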
@@ -1508,7 +1508,9 @@ impl ChatWidget {
         {
             tracing::info!(target: "feedback_tags", chatgpt_user_id);
         }
-        let snapshot = self.feedback.snapshot(self.thread_id);
+        let snapshot = self
+            .feedback
+            .snapshot_with_sqlite_home(self.thread_id, Some(self.config.sqlite_home.clone()));
         self.show_feedback_note(category, include_logs, snapshot);
     }

@@ -1895,7 +1895,9 @@ impl ChatWidget {
         category: crate::app_event::FeedbackCategory,
         include_logs: bool,
     ) {
-        let snapshot = self.feedback.snapshot(self.thread_id);
+        let snapshot = self
+            .feedback
+            .snapshot_with_sqlite_home(self.thread_id, Some(self.config.sqlite_home.clone()));
         self.show_feedback_note(category, include_logs, snapshot);
     }