mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
state: enforce 10 MiB log caps for thread and threadless process logs (#12038)
## Summary - enforce a 10 MiB cap per `thread_id` in state log storage - enforce a 10 MiB cap per `process_uuid` for threadless (`thread_id IS NULL`) logs - scope pruning to only keys affected by the current insert batch - add a cheap per-key `SUM(...)` precheck so windowed prune queries only run for keys that are currently over the cap - add SQLite indexes used by the pruning queries - add focused runtime tests covering both pruning behaviors ## Why This keeps log growth bounded by the intended partition semantics while preserving a small, readable implementation localized to the existing insert path. ## Local Latency Snapshot (No Truncation-Pressure Run) Collected from session `019c734f-1d16-7002-9e00-c966c9fbbcae` using local-only (uncommitted) instrumentation, while not specifically benchmarking the truncation-heavy regime. ### Percentiles By Query (ms) | query | count | p50 | p90 | p95 | p99 | max | |---|---:|---:|---:|---:|---:|---:| | `insert_logs.insert_batch` | 110 | 0.332 | 0.999 | 1.811 | 2.978 | 3.493 | | `insert_logs.precheck.process` | 106 | 0.074 | 0.152 | 0.206 | 0.258 | 0.426 | | `insert_logs.precheck.thread` | 73 | 0.118 | 0.206 | 0.253 | 1.025 | 1.025 | | `insert_logs.prune.process` | 58 | 0.291 | 0.576 | 0.607 | 1.088 | 1.088 | | `insert_logs.prune.thread` | 44 | 0.318 | 0.467 | 0.728 | 0.797 | 0.797 | | `insert_logs.prune_total` | 110 | 0.488 | 0.976 | 1.237 | 1.593 | 1.684 | | `insert_logs.total` | 110 | 1.315 | 2.889 | 3.623 | 5.739 | 5.961 | | `insert_logs.tx_begin` | 110 | 0.133 | 0.235 | 0.282 | 0.412 | 0.546 | | `insert_logs.tx_commit` | 110 | 0.259 | 0.689 | 0.772 | 1.065 | 1.080 | ### `insert_logs.total` Histogram (ms) | bucket | count | |---|---:| | `<= 0.100` | 0 | | `<= 0.250` | 0 | | `<= 0.500` | 7 | | `<= 1.000` | 33 | | `<= 2.000` | 40 | | `<= 5.000` | 28 | | `<= 10.000` | 2 | | `<= 20.000` | 0 | | `<= 50.000` | 0 | | `<= 100.000` | 0 | | `> 100.000` | 0 | ## Local Latency Snapshot (Truncation-Heavy / Cap-Hit Regime) Collected 
from a run where cap-hit behavior was frequent (`135/180` insert calls), using local-only (uncommitted) instrumentation and a temporary local cap of `10_000` bytes for stress testing (not the merged `10 MiB` cap). ### Percentiles By Query (ms) | query | count | p50 | p90 | p95 | p99 | max | |---|---:|---:|---:|---:|---:|---:| | `insert_logs.insert_batch` | 180 | 0.524 | 1.645 | 2.163 | 3.424 | 3.777 | | `insert_logs.precheck.process` | 171 | 0.086 | 0.235 | 0.373 | 0.758 | 1.147 | | `insert_logs.precheck.thread` | 100 | 0.105 | 0.251 | 0.291 | 1.176 | 1.622 | | `insert_logs.prune.process` | 109 | 0.386 | 0.839 | 1.146 | 1.548 | 2.588 | | `insert_logs.prune.thread` | 56 | 0.253 | 0.550 | 1.148 | 2.484 | 2.484 | | `insert_logs.prune_total` | 180 | 0.511 | 1.221 | 1.695 | 4.548 | 5.512 | | `insert_logs.total` | 180 | 1.631 | 3.902 | 5.103 | 8.901 | 9.095 | | `insert_logs.total_cap_hit` | 135 | 1.876 | 4.501 | 5.547 | 8.902 | 9.096 | | `insert_logs.total_no_cap_hit` | 45 | 0.520 | 1.700 | 2.079 | 3.294 | 3.294 | | `insert_logs.tx_begin` | 180 | 0.109 | 0.253 | 0.287 | 1.088 | 1.406 | | `insert_logs.tx_commit` | 180 | 0.267 | 0.813 | 1.170 | 2.497 | 2.574 | ### `insert_logs.total` Histogram (ms) | bucket | count | |---|---:| | `<= 0.100` | 0 | | `<= 0.250` | 0 | | `<= 0.500` | 16 | | `<= 1.000` | 39 | | `<= 2.000` | 60 | | `<= 5.000` | 54 | | `<= 10.000` | 11 | | `<= 20.000` | 0 | | `<= 50.000` | 0 | | `<= 100.000` | 0 | | `> 100.000` | 0 | ### `insert_logs.total` Histogram When Cap Was Hit (ms) | bucket | count | |---|---:| | `<= 0.100` | 0 | | `<= 0.250` | 0 | | `<= 0.500` | 0 | | `<= 1.000` | 22 | | `<= 2.000` | 51 | | `<= 5.000` | 51 | | `<= 10.000` | 11 | | `<= 20.000` | 0 | | `<= 50.000` | 0 | | `<= 100.000` | 0 | | `> 100.000` | 0 | ### Performance Takeaways - Even in a cap-hit-heavy run (`75%` cap-hit calls), `insert_logs.total` stays sub-10ms at p99 (`8.901ms`) and max (`9.095ms`). 
- Calls that did **not** hit the cap are materially cheaper (`insert_logs.total_no_cap_hit` p95 `2.079ms`) than cap-hit calls (`insert_logs.total_cap_hit` p95 `5.547ms`). - Compared to the earlier non-truncation-pressure run, overall `insert_logs.total` rose from p95 `3.623ms` to p95 `5.103ms` (+`1.48ms`), indicating bounded overhead when pruning is active. - This truncation-heavy run used an intentionally low local cap for stress testing; with the real 10 MiB cap, cap-hit frequency should be much lower in normal sessions. ## Testing - `just fmt` (in `codex-rs`) - `cargo test -p codex-state` (in `codex-rs`)
This commit is contained in:
committed by
GitHub
parent
1f54496c48
commit
7f3dbaeb25
@@ -0,0 +1,4 @@
|
||||
-- Index backing the per-thread prune query: the prune scans a thread's rows
-- newest-first (ts DESC, ts_nanos DESC, id DESC), so the index column order
-- matches that ORDER BY exactly.
CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC);

-- Index backing the threadless per-process prune query. Partial index: only
-- rows with no owning thread participate in the per-process budget, so only
-- those rows are indexed.
CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC)
WHERE thread_id IS NULL;
|
||||
9
codex-rs/state/migrations/0012_logs_estimated_bytes.sql
Normal file
9
codex-rs/state/migrations/0012_logs_estimated_bytes.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
-- Persist a per-row byte-size estimate so retention caps can be enforced with
-- SUM(estimated_bytes) rather than re-measuring text columns on every insert.
ALTER TABLE logs ADD COLUMN estimated_bytes INTEGER NOT NULL DEFAULT 0;

-- Backfill existing rows. CAST(... AS BLOB) makes LENGTH() count bytes rather
-- than characters; COALESCE treats NULL optional columns as contributing 0.
-- (level and target are not COALESCEd — presumably NOT NULL columns; confirm
-- against the logs table schema.)
UPDATE logs
SET estimated_bytes =
    LENGTH(CAST(COALESCE(message, '') AS BLOB))
    + LENGTH(CAST(level AS BLOB))
    + LENGTH(CAST(target AS BLOB))
    + LENGTH(CAST(COALESCE(module_path, '') AS BLOB))
    + LENGTH(CAST(COALESCE(file, '') AS BLOB));
|
||||
@@ -27,11 +27,13 @@ use sqlx::ConnectOptions;
|
||||
use sqlx::QueryBuilder;
|
||||
use sqlx::Row;
|
||||
use sqlx::Sqlite;
|
||||
use sqlx::SqliteConnection;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::sqlite::SqliteJournalMode;
|
||||
use sqlx::sqlite::SqlitePoolOptions;
|
||||
use sqlx::sqlite::SqliteSynchronous;
|
||||
use std::collections::BTreeSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -42,6 +44,12 @@ use uuid::Uuid;
|
||||
mod memories;
|
||||
// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`.
|
||||
|
||||
// "Partition" is the retention bucket we cap at 10 MiB:
// - one bucket per non-null thread_id
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
// - one bucket for threadless rows with process_uuid IS NULL
// Budgets are measured against the persisted `estimated_bytes` column and
// enforced by `prune_logs_after_insert`.
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StateRuntime {
|
||||
codex_home: PathBuf,
|
||||
@@ -359,10 +367,16 @@ FROM threads
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ",
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ",
|
||||
);
|
||||
builder.push_values(entries, |mut row, entry| {
|
||||
let estimated_bytes = entry.message.as_ref().map_or(0, String::len) as i64
|
||||
+ entry.level.len() as i64
|
||||
+ entry.target.len() as i64
|
||||
+ entry.module_path.as_ref().map_or(0, String::len) as i64
|
||||
+ entry.file.as_ref().map_or(0, String::len) as i64;
|
||||
row.push_bind(entry.ts)
|
||||
.push_bind(entry.ts_nanos)
|
||||
.push_bind(&entry.level)
|
||||
@@ -372,9 +386,228 @@ FROM threads
|
||||
.push_bind(&entry.process_uuid)
|
||||
.push_bind(&entry.module_path)
|
||||
.push_bind(&entry.file)
|
||||
.push_bind(entry.line);
|
||||
.push_bind(entry.line)
|
||||
.push_bind(estimated_bytes);
|
||||
});
|
||||
builder.build().execute(self.pool.as_ref()).await?;
|
||||
builder.build().execute(&mut *tx).await?;
|
||||
self.prune_logs_after_insert(entries, &mut tx).await?;
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enforce per-partition log size caps after a successful batch insert.
|
||||
///
|
||||
/// We maintain two independent budgets:
|
||||
/// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`.
|
||||
/// - Threadless process logs: rows with `thread_id IS NULL` ("threadless"),
|
||||
/// capped per `process_uuid` (including `process_uuid IS NULL` as its own
|
||||
/// threadless partition).
|
||||
///
|
||||
/// "Threadless" means the log row is not associated with any conversation
|
||||
/// thread, so retention is keyed by process identity instead.
|
||||
///
|
||||
/// This runs inside the same transaction as the insert so callers never
|
||||
/// observe "inserted but not yet pruned" rows.
|
||||
/// Enforce per-partition log size caps after a successful batch insert.
///
/// We maintain two independent budgets:
/// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`.
/// - Threadless process logs: rows with `thread_id IS NULL` ("threadless"),
///   capped per `process_uuid` (including `process_uuid IS NULL` as its own
///   threadless partition).
///
/// "Threadless" means the log row is not associated with any conversation
/// thread, so retention is keyed by process identity instead.
///
/// Pruning is scoped to the partitions touched by `entries`; for each such
/// partition a cheap `SUM(estimated_bytes)` precheck runs first, and the
/// heavier window-function DELETE runs only for partitions over the cap.
///
/// This runs inside the same transaction as the insert so callers never
/// observe "inserted but not yet pruned" rows.
async fn prune_logs_after_insert(
    &self,
    entries: &[LogEntry],
    tx: &mut SqliteConnection,
) -> anyhow::Result<()> {
    // BTreeSet deduplicates the keys and gives a deterministic bind order.
    let thread_ids: BTreeSet<&str> = entries
        .iter()
        .filter_map(|entry| entry.thread_id.as_deref())
        .collect();
    if !thread_ids.is_empty() {
        // Cheap precheck: only run the heavier window-function prune for
        // threads that are currently above the cap.
        let mut over_limit_threads_query =
            QueryBuilder::<Sqlite>::new("SELECT thread_id FROM logs WHERE thread_id IN (");
        {
            // Scoped so the `separated` borrow of the builder ends before we
            // push more SQL onto it.
            let mut separated = over_limit_threads_query.separated(", ");
            for thread_id in &thread_ids {
                separated.push_bind(*thread_id);
            }
        }
        over_limit_threads_query.push(") GROUP BY thread_id HAVING SUM(");
        over_limit_threads_query.push("estimated_bytes");
        over_limit_threads_query.push(") > ");
        over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
        let over_limit_thread_ids: Vec<String> = over_limit_threads_query
            .build()
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.try_get("thread_id"))
            .collect::<Result<_, _>>()?;
        if !over_limit_thread_ids.is_empty() {
            // Enforce a strict per-thread cap by deleting every row whose
            // newest-first cumulative bytes exceed the partition budget.
            // Note: "strict" means a single row larger than the cap is
            // deleted too (its own cumulative total already exceeds it).
            let mut prune_threads = QueryBuilder::<Sqlite>::new(
                r#"
DELETE FROM logs
WHERE id IN (
    SELECT id
    FROM (
        SELECT
            id,
            SUM(
"#,
            );
            prune_threads.push("estimated_bytes");
            prune_threads.push(
                r#"
            ) OVER (
                PARTITION BY thread_id
                ORDER BY ts DESC, ts_nanos DESC, id DESC
            ) AS cumulative_bytes
        FROM logs
        WHERE thread_id IN (
"#,
            );
            {
                let mut separated = prune_threads.separated(", ");
                for thread_id in &over_limit_thread_ids {
                    separated.push_bind(thread_id);
                }
            }
            prune_threads.push(
                r#"
        )
    )
    WHERE cumulative_bytes >
"#,
            );
            prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
            prune_threads.push("\n)");
            prune_threads.build().execute(&mut *tx).await?;
        }
    }

    // Threadless keys touched by this batch: non-null process UUIDs, plus a
    // flag for the "no process UUID at all" bucket.
    let threadless_process_uuids: BTreeSet<&str> = entries
        .iter()
        .filter(|entry| entry.thread_id.is_none())
        .filter_map(|entry| entry.process_uuid.as_deref())
        .collect();
    let has_threadless_null_process_uuid = entries
        .iter()
        .any(|entry| entry.thread_id.is_none() && entry.process_uuid.is_none());
    if !threadless_process_uuids.is_empty() {
        // Threadless logs are budgeted separately per process UUID.
        let mut over_limit_processes_query = QueryBuilder::<Sqlite>::new(
            "SELECT process_uuid FROM logs WHERE thread_id IS NULL AND process_uuid IN (",
        );
        {
            let mut separated = over_limit_processes_query.separated(", ");
            for process_uuid in &threadless_process_uuids {
                separated.push_bind(*process_uuid);
            }
        }
        over_limit_processes_query.push(") GROUP BY process_uuid HAVING SUM(");
        over_limit_processes_query.push("estimated_bytes");
        over_limit_processes_query.push(") > ");
        over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
        let over_limit_process_uuids: Vec<String> = over_limit_processes_query
            .build()
            .fetch_all(&mut *tx)
            .await?
            .into_iter()
            .map(|row| row.try_get("process_uuid"))
            .collect::<Result<_, _>>()?;
        if !over_limit_process_uuids.is_empty() {
            // Same strict cap policy as thread pruning, but only for
            // threadless rows in the affected process UUIDs.
            let mut prune_threadless_process_logs = QueryBuilder::<Sqlite>::new(
                r#"
DELETE FROM logs
WHERE id IN (
    SELECT id
    FROM (
        SELECT
            id,
            SUM(
"#,
            );
            prune_threadless_process_logs.push("estimated_bytes");
            prune_threadless_process_logs.push(
                r#"
            ) OVER (
                PARTITION BY process_uuid
                ORDER BY ts DESC, ts_nanos DESC, id DESC
            ) AS cumulative_bytes
        FROM logs
        WHERE thread_id IS NULL
            AND process_uuid IN (
"#,
            );
            {
                let mut separated = prune_threadless_process_logs.separated(", ");
                for process_uuid in &over_limit_process_uuids {
                    separated.push_bind(process_uuid);
                }
            }
            prune_threadless_process_logs.push(
                r#"
        )
    )
    WHERE cumulative_bytes >
"#,
            );
            prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
            prune_threadless_process_logs.push("\n)");
            prune_threadless_process_logs
                .build()
                .execute(&mut *tx)
                .await?;
        }
    }
    if has_threadless_null_process_uuid {
        // Rows without a process UUID still need a cap; treat NULL as its
        // own threadless partition.
        let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM(");
        null_process_usage_query.push("estimated_bytes");
        null_process_usage_query.push(
            ") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
        );
        // SUM over zero rows yields NULL, hence Option<i64>.
        let total_null_process_bytes: Option<i64> = null_process_usage_query
            .build()
            .fetch_one(&mut *tx)
            .await?
            .try_get("total_bytes")?;

        if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES {
            // All rows here have process_uuid IS NULL, so PARTITION BY
            // process_uuid places them in a single window partition.
            let mut prune_threadless_null_process_logs = QueryBuilder::<Sqlite>::new(
                r#"
DELETE FROM logs
WHERE id IN (
    SELECT id
    FROM (
        SELECT
            id,
            SUM(
"#,
            );
            prune_threadless_null_process_logs.push("estimated_bytes");
            prune_threadless_null_process_logs.push(
                r#"
            ) OVER (
                PARTITION BY process_uuid
                ORDER BY ts DESC, ts_nanos DESC, id DESC
            ) AS cumulative_bytes
        FROM logs
        WHERE thread_id IS NULL
            AND process_uuid IS NULL
    )
    WHERE cumulative_bytes >
"#,
            );
            prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
            prune_threadless_null_process_logs.push("\n)");
            prune_threadless_null_process_logs
                .build()
                .execute(&mut *tx)
                .await?;
        }
    }
    Ok(())
}
|
||||
|
||||
@@ -2553,6 +2786,300 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_old_rows_when_thread_exceeds_size_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let six_mebibytes = "a".repeat(6 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[
|
||||
LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes.clone()),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
LogEntry {
|
||||
ts: 2,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes.clone()),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(2),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
])
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
thread_ids: vec!["thread-1".to_string()],
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query thread logs");
|
||||
|
||||
assert_eq!(rows.len(), 1);
|
||||
assert_eq!(rows[0].ts, 2);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_single_thread_row_when_it_exceeds_size_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let eleven_mebibytes = "d".repeat(11 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(eleven_mebibytes),
|
||||
thread_id: Some("thread-oversized".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
}])
|
||||
.await
|
||||
.expect("insert test log");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
thread_ids: vec!["thread-oversized".to_string()],
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query thread logs");
|
||||
|
||||
assert!(rows.is_empty());
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_threadless_rows_per_process_uuid_only() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let six_mebibytes = "b".repeat(6 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[
|
||||
LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes.clone()),
|
||||
thread_id: None,
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
LogEntry {
|
||||
ts: 2,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes.clone()),
|
||||
thread_id: None,
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(2),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
LogEntry {
|
||||
ts: 3,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(3),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
])
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
thread_ids: vec!["thread-1".to_string()],
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query thread and threadless logs");
|
||||
|
||||
let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
|
||||
timestamps.sort_unstable();
|
||||
assert_eq!(timestamps, vec![2, 3]);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_single_threadless_process_row_when_it_exceeds_size_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let eleven_mebibytes = "e".repeat(11 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(eleven_mebibytes),
|
||||
thread_id: None,
|
||||
process_uuid: Some("proc-oversized".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
}])
|
||||
.await
|
||||
.expect("insert test log");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query threadless logs");
|
||||
|
||||
assert!(rows.is_empty());
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_threadless_rows_with_null_process_uuid() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let six_mebibytes = "c".repeat(6 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[
|
||||
LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes.clone()),
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
LogEntry {
|
||||
ts: 2,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(six_mebibytes),
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(2),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
LogEntry {
|
||||
ts: 3,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some("small".to_string()),
|
||||
thread_id: None,
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(3),
|
||||
module_path: Some("mod".to_string()),
|
||||
},
|
||||
])
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query threadless logs");
|
||||
|
||||
let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
|
||||
timestamps.sort_unstable();
|
||||
assert_eq!(timestamps, vec![2, 3]);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_single_threadless_null_process_row_when_it_exceeds_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let eleven_mebibytes = "f".repeat(11 * 1024 * 1024);
|
||||
runtime
|
||||
.insert_logs(&[LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(eleven_mebibytes),
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(1),
|
||||
module_path: Some("mod".to_string()),
|
||||
}])
|
||||
.await
|
||||
.expect("insert test log");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query threadless logs");
|
||||
|
||||
assert!(rows.is_empty());
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
fn test_thread_metadata(
|
||||
codex_home: &Path,
|
||||
thread_id: ThreadId,
|
||||
|
||||
Reference in New Issue
Block a user