diff --git a/.github/workflows/shell-tool-mcp.yml b/.github/workflows/shell-tool-mcp.yml
index ec0c16cd5e..d6c85d9ddc 100644
--- a/.github/workflows/shell-tool-mcp.yml
+++ b/.github/workflows/shell-tool-mcp.yml
@@ -248,37 +248,8 @@ jobs:
         run: |
           set -euo pipefail
           if command -v apt-get >/dev/null 2>&1; then
-            # On Ubuntu ARM containers, apt metadata for universe/multiverse is large and
-            # can make `apt-get update` very slow. We only need packages from `main`, so
-            # trim sources to `main` for faster, more reliable updates.
-            if [[ "${{ matrix.target }}" == "aarch64-unknown-linux-musl" ]] && [[ -f /etc/os-release ]]; then
-              . /etc/os-release
-              if [[ "${ID-}" == "ubuntu" ]]; then
-                if [[ -f /etc/apt/sources.list.d/ubuntu.sources ]]; then
-                  # Ubuntu 24.04+ uses deb822 sources; trim components to main.
-                  sed -i -E 's/^Components:.*/Components: main/' /etc/apt/sources.list.d/ubuntu.sources
-                else
-                  codename="${VERSION_CODENAME:-}"
-                  if [[ -n "$codename" ]]; then
-                    printf '%s\n' \
-                      "deb http://ports.ubuntu.com/ubuntu-ports ${codename} main" \
-                      "deb http://ports.ubuntu.com/ubuntu-ports ${codename}-updates main" \
-                      "deb http://ports.ubuntu.com/ubuntu-ports ${codename}-backports main" \
-                      "deb http://ports.ubuntu.com/ubuntu-ports ${codename}-security main" \
-                      >/etc/apt/sources.list
-                  fi
-                fi
-              fi
-            fi
-
-            apt_opts=(
-              -o Acquire::Retries=3
-              -o Acquire::http::Timeout=30
-              -o Acquire::https::Timeout=30
-              -o Acquire::Languages=none
-            )
-            apt-get "${apt_opts[@]}" update
-            DEBIAN_FRONTEND=noninteractive apt-get "${apt_opts[@]}" install -y git build-essential bison autoconf gettext
+            apt-get update
+            DEBIAN_FRONTEND=noninteractive apt-get install -y git build-essential bison autoconf gettext
           elif command -v dnf >/dev/null 2>&1; then
             dnf install -y git gcc gcc-c++ make bison autoconf gettext
           elif command -v yum >/dev/null 2>&1; then
@@ -480,10 +451,6 @@ jobs:
           registry-url: https://registry.npmjs.org
           scope: "@openai"
 
-      # Trusted publishing requires npm CLI version 11.5.1 or later.
-      - name: Update npm
-        run: npm install -g npm@latest
-
       - name: Download npm tarball
         uses: actions/download-artifact@v7
         with:
diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs
index df6fa2ef72..bf3b5a4f51 100644
--- a/codex-rs/core/tests/suite/sqlite_state.rs
+++ b/codex-rs/core/tests/suite/sqlite_state.rs
@@ -10,12 +10,18 @@ use codex_protocol::protocol::SessionSource;
 use codex_protocol::protocol::UserMessageEvent;
 use codex_state::STATE_DB_FILENAME;
 use core_test_support::load_sse_fixture_with_id;
+use core_test_support::responses;
+use core_test_support::responses::ev_completed;
+use core_test_support::responses::ev_function_call;
+use core_test_support::responses::ev_response_created;
 use core_test_support::responses::mount_sse_sequence;
 use core_test_support::responses::start_mock_server;
 use core_test_support::test_codex::test_codex;
 use pretty_assertions::assert_eq;
+use serde_json::json;
 use std::fs;
 use tokio::time::Duration;
+use tracing_subscriber::prelude::*;
 use uuid::Uuid;
 
 fn sse_completed(id: &str) -> String {
@@ -197,3 +203,77 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test(flavor = "current_thread")]
+async fn tool_call_logs_include_thread_id() -> Result<()> {
+    let server = start_mock_server().await;
+    let call_id = "call-1";
+    let args = json!({
+        "command": "echo hello",
+        "timeout_ms": 1_000,
+        "login": false,
+    });
+    let args_json = serde_json::to_string(&args)?;
+    mount_sse_sequence(
+        &server,
+        vec![
+            responses::sse(vec![
+                ev_response_created("resp-1"),
+                ev_function_call(call_id, "shell_command", &args_json),
+                ev_completed("resp-1"),
+            ]),
+            responses::sse(vec![ev_completed("resp-2")]),
+        ],
+    )
+    .await;
+
+    let mut builder = test_codex().with_config(|config| {
+        config.features.enable(Feature::Sqlite);
+    });
+    let test = builder.build(&server).await?;
+    let db = test.codex.state_db().expect("state db enabled");
+    let expected_thread_id = test.session_configured.session_id.to_string();
+
+    let subscriber = tracing_subscriber::registry().with(codex_state::log_db::start(db.clone()));
+    let dispatch = tracing::Dispatch::new(subscriber);
+    let _guard = tracing::dispatcher::set_default(&dispatch);
+
+    test.submit_turn("run a shell command").await?;
+    {
+        let span = tracing::info_span!("test_log_span", thread_id = %expected_thread_id);
+        let _entered = span.enter();
+        tracing::info!("ToolCall: shell_command {{\"command\":\"echo hello\"}}");
+    }
+
+    let mut found = None;
+    for _ in 0..80 {
+        let query = codex_state::LogQuery {
+            descending: true,
+            limit: Some(20),
+            ..Default::default()
+        };
+        let rows = db.query_logs(&query).await?;
+        if let Some(row) = rows.into_iter().find(|row| {
+            row.message
+                .as_deref()
+                .is_some_and(|m| m.starts_with("ToolCall:"))
+        }) {
+            let thread_id = row.thread_id;
+            let message = row.message;
+            found = Some((thread_id, message));
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(25)).await;
+    }
+
+    let (thread_id, message) = found.expect("expected ToolCall log row");
+    assert_eq!(thread_id, Some(expected_thread_id));
+    assert!(
+        message
+            .as_deref()
+            .is_some_and(|text| text.starts_with("ToolCall:")),
+        "expected ToolCall message, got {message:?}"
+    );
+
+    Ok(())
+}
diff --git a/codex-rs/state/migrations/0002_logs.sql b/codex-rs/state/migrations/0002_logs.sql
index 051bef9136..b9a2c681d4 100644
--- a/codex-rs/state/migrations/0002_logs.sql
+++ b/codex-rs/state/migrations/0002_logs.sql
@@ -5,11 +5,9 @@ CREATE TABLE logs (
     level TEXT NOT NULL,
     target TEXT NOT NULL,
     message TEXT,
-    thread_id TEXT,
    module_path TEXT,
     file TEXT,
     line INTEGER
 );
 
 CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
-CREATE INDEX idx_logs_thread_id ON logs(thread_id);
diff --git a/codex-rs/state/migrations/0003_logs_thread_id.sql b/codex-rs/state/migrations/0003_logs_thread_id.sql
new file mode 100644
index 0000000000..c4badb6885
--- /dev/null
+++ b/codex-rs/state/migrations/0003_logs_thread_id.sql
@@ -0,0 +1,3 @@
+ALTER TABLE logs ADD COLUMN thread_id TEXT;
+
+CREATE INDEX idx_logs_thread_id ON logs(thread_id);
diff --git a/codex-rs/state/src/bin/logs_client.rs b/codex-rs/state/src/bin/logs_client.rs
index df57856115..c02a423253 100644
--- a/codex-rs/state/src/bin/logs_client.rs
+++ b/codex-rs/state/src/bin/logs_client.rs
@@ -1,4 +1,3 @@
-use std::path::Path;
 use std::path::PathBuf;
 use std::time::Duration;
 
@@ -7,15 +6,12 @@ use chrono::DateTime;
 use chrono::SecondsFormat;
 use chrono::Utc;
 use clap::Parser;
+use codex_state::LogQuery;
+use codex_state::LogRow;
 use codex_state::STATE_DB_FILENAME;
+use codex_state::StateRuntime;
 use dirs::home_dir;
 use owo_colors::OwoColorize;
-use sqlx::QueryBuilder;
-use sqlx::Row;
-use sqlx::Sqlite;
-use sqlx::SqlitePool;
-use sqlx::sqlite::SqliteConnectOptions;
-use sqlx::sqlite::SqlitePoolOptions;
 
 #[derive(Debug, Parser)]
 #[command(name = "codex-state-logs")]
@@ -62,18 +58,6 @@ struct Args {
     poll_ms: u64,
 }
 
-#[derive(Debug, Clone, sqlx::FromRow)]
-struct LogRow {
-    id: i64,
-    ts: i64,
-    ts_nanos: i64,
-    level: String,
-    message: Option<String>,
-    thread_id: Option<String>,
-    file: Option<String>,
-    line: Option<i64>,
-}
-
 #[derive(Debug, Clone)]
 struct LogFilter {
     level_upper: Option<String>,
@@ -89,16 +73,20 @@ async fn main() -> anyhow::Result<()> {
     let args = Args::parse();
     let db_path = resolve_db_path(&args)?;
     let filter = build_filter(&args)?;
-    let pool = open_read_only_pool(db_path.as_path()).await?;
+    let codex_home = db_path
+        .parent()
+        .map(ToOwned::to_owned)
+        .unwrap_or_else(|| PathBuf::from("."));
+    let runtime = StateRuntime::init(codex_home, "logs-client".to_string(), None).await?;
 
-    let mut last_id = print_backfill(&pool, &filter, args.backfill).await?;
+    let mut last_id = print_backfill(runtime.as_ref(), &filter, args.backfill).await?;
     if last_id == 0 {
-        last_id = fetch_max_id(&pool, &filter).await?;
+        last_id = fetch_max_id(runtime.as_ref(), &filter).await?;
     }
     let poll_interval = Duration::from_millis(args.poll_ms);
     loop {
-        let rows = fetch_new_rows(&pool, &filter, last_id).await?;
+        let rows = fetch_new_rows(runtime.as_ref(), &filter, last_id).await?;
         for row in rows {
             last_id = last_id.max(row.id);
             println!("{}", format_row(&row));
         }
@@ -159,23 +147,8 @@ fn parse_timestamp(value: &str) -> anyhow::Result<i64> {
     Ok(dt.timestamp())
 }
 
-async fn open_read_only_pool(path: &Path) -> anyhow::Result<SqlitePool> {
-    let options = SqliteConnectOptions::new()
-        .filename(path)
-        .create_if_missing(false)
-        .read_only(true)
-        .busy_timeout(Duration::from_secs(5));
-
-    let display = path.display();
-    SqlitePoolOptions::new()
-        .max_connections(1)
-        .connect_with(options)
-        .await
-        .with_context(|| format!("failed to open sqlite db at {display}"))
-}
-
 async fn print_backfill(
-    pool: &SqlitePool,
+    runtime: &StateRuntime,
     filter: &LogFilter,
     backfill: usize,
 ) -> anyhow::Result<i64> {
@@ -183,7 +156,7 @@
         return Ok(0);
     }
 
-    let mut rows = fetch_backfill(pool, filter, backfill).await?;
+    let mut rows = fetch_backfill(runtime, filter, backfill).await?;
     rows.reverse();
 
     let mut last_id = 0;
@@ -195,86 +168,53 @@ async fn print_backfill(
 }
 
 async fn fetch_backfill(
-    pool: &SqlitePool,
+    runtime: &StateRuntime,
     filter: &LogFilter,
     backfill: usize,
 ) -> anyhow::Result<Vec<LogRow>> {
-    let mut builder = base_select_builder();
-    push_filters(&mut builder, filter);
-    builder.push(" ORDER BY id DESC");
-    builder.push(" LIMIT ").push_bind(backfill as i64);
-
-    builder
-        .build_query_as::<LogRow>()
-        .fetch_all(pool)
+    let query = to_log_query(filter, Some(backfill), None, true);
+    runtime
+        .query_logs(&query)
         .await
         .context("failed to fetch backfill logs")
 }
 
 async fn fetch_new_rows(
-    pool: &SqlitePool,
+    runtime: &StateRuntime,
     filter: &LogFilter,
     last_id: i64,
 ) -> anyhow::Result<Vec<LogRow>> {
-    let mut builder = base_select_builder();
-    push_filters(&mut builder, filter);
-    builder.push(" AND id > ").push_bind(last_id);
-    builder.push(" ORDER BY id ASC");
-
-    builder
-        .build_query_as::<LogRow>()
-        .fetch_all(pool)
+    let query = to_log_query(filter, None, Some(last_id), false);
+    runtime
+        .query_logs(&query)
        .await
         .context("failed to fetch new logs")
 }
 
-async fn fetch_max_id(pool: &SqlitePool, filter: &LogFilter) -> anyhow::Result<i64> {
-    let mut builder = QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
-    push_filters(&mut builder, filter);
-
-    let row = builder
-        .build()
-        .fetch_one(pool)
+async fn fetch_max_id(runtime: &StateRuntime, filter: &LogFilter) -> anyhow::Result<i64> {
+    let query = to_log_query(filter, None, None, false);
+    runtime
+        .max_log_id(&query)
         .await
-        .context("failed to fetch max log id")?;
-    let max_id: Option<i64> = row.try_get("max_id")?;
-    Ok(max_id.unwrap_or(0))
+        .context("failed to fetch max log id")
 }
 
-fn base_select_builder<'a>() -> QueryBuilder<'a, Sqlite> {
-    QueryBuilder::<Sqlite>::new(
-        "SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
-    )
-}
-
-fn push_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, filter: &'a LogFilter) {
-    if let Some(level_upper) = filter.level_upper.as_ref() {
-        builder
-            .push(" AND UPPER(level) = ")
-            .push_bind(level_upper.as_str());
-    }
-    if let Some(from_ts) = filter.from_ts {
-        builder.push(" AND ts >= ").push_bind(from_ts);
-    }
-    if let Some(to_ts) = filter.to_ts {
-        builder.push(" AND ts <= ").push_bind(to_ts);
-    }
-    if let Some(module_like) = filter.module_like.as_ref() {
-        builder
-            .push(" AND module_path LIKE '%' || ")
-            .push_bind(module_like.as_str())
-            .push(" || '%'");
-    }
-    if let Some(file_like) = filter.file_like.as_ref() {
-        builder
-            .push(" AND file LIKE '%' || ")
-            .push_bind(file_like.as_str())
-            .push(" || '%'");
-    }
-    if let Some(thread_id) = filter.thread_id.as_ref() {
-        builder
-            .push(" AND thread_id = ")
-            .push_bind(thread_id.as_str());
+fn to_log_query(
+    filter: &LogFilter,
+    limit: Option<usize>,
+    after_id: Option<i64>,
+    descending: bool,
+) -> LogQuery {
+    LogQuery {
+        level_upper: filter.level_upper.clone(),
+        from_ts: filter.from_ts,
+        to_ts: filter.to_ts,
+        module_like: filter.module_like.clone(),
+        file_like: filter.file_like.clone(),
+        thread_id: filter.thread_id.clone(),
+        after_id,
+        limit,
+        descending,
     }
 }
diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs
index 91ebacea71..685552015e 100644
--- a/codex-rs/state/src/lib.rs
+++ b/codex-rs/state/src/lib.rs
@@ -12,6 +12,8 @@ mod paths;
 mod runtime;
 
 pub use model::LogEntry;
+pub use model::LogQuery;
+pub use model::LogRow;
 
 /// Preferred entrypoint: owns configuration and metrics.
 pub use runtime::StateRuntime;
diff --git a/codex-rs/state/src/model/log.rs b/codex-rs/state/src/model/log.rs
index c0c3a2dff2..d8a9ba3524 100644
--- a/codex-rs/state/src/model/log.rs
+++ b/codex-rs/state/src/model/log.rs
@@ -1,4 +1,5 @@
 use serde::Serialize;
+use sqlx::FromRow;
 
 #[derive(Clone, Debug, Serialize)]
 pub struct LogEntry {
@@ -12,3 +13,28 @@ pub struct LogEntry {
     pub file: Option<String>,
     pub line: Option<u32>,
 }
+
+#[derive(Clone, Debug, FromRow)]
+pub struct LogRow {
+    pub id: i64,
+    pub ts: i64,
+    pub ts_nanos: i64,
+    pub level: String,
+    pub message: Option<String>,
+    pub thread_id: Option<String>,
+    pub file: Option<String>,
+    pub line: Option<i64>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct LogQuery {
+    pub level_upper: Option<String>,
+    pub from_ts: Option<i64>,
+    pub to_ts: Option<i64>,
+    pub module_like: Option<String>,
+    pub file_like: Option<String>,
+    pub thread_id: Option<String>,
+    pub after_id: Option<i64>,
+    pub limit: Option<usize>,
+    pub descending: bool,
+}
diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs
index 4921e2357a..bd615d7561 100644
--- a/codex-rs/state/src/model/mod.rs
+++ b/codex-rs/state/src/model/mod.rs
@@ -2,6 +2,8 @@ mod log;
 mod thread_metadata;
 
 pub use log::LogEntry;
+pub use log::LogQuery;
+pub use log::LogRow;
 pub use thread_metadata::Anchor;
 pub use thread_metadata::BackfillStats;
 pub use thread_metadata::ExtractionOutcome;
diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs
index b61ea315c5..b15e4338bf 100644
--- a/codex-rs/state/src/runtime.rs
+++ b/codex-rs/state/src/runtime.rs
@@ -1,5 +1,7 @@
 use crate::DB_ERROR_METRIC;
 use crate::LogEntry;
+use crate::LogQuery;
+use crate::LogRow;
 use crate::SortKey;
 use crate::ThreadMetadata;
 use crate::ThreadMetadataBuilder;
@@ -239,6 +241,38 @@ FROM threads
         Ok(result.rows_affected())
     }
 
+    /// Query logs with optional filters.
+    pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
+        let mut builder = QueryBuilder::<Sqlite>::new(
+            "SELECT id, ts, ts_nanos, level, message, thread_id, file, line FROM logs WHERE 1 = 1",
+        );
+        push_log_filters(&mut builder, query);
+        if query.descending {
+            builder.push(" ORDER BY id DESC");
+        } else {
+            builder.push(" ORDER BY id ASC");
+        }
+        if let Some(limit) = query.limit {
+            builder.push(" LIMIT ").push_bind(limit as i64);
+        }
+
+        let rows = builder
+            .build_query_as::<LogRow>()
+            .fetch_all(self.pool.as_ref())
+            .await?;
+        Ok(rows)
+    }
+
+    /// Return the max log id matching optional filters.
+    pub async fn max_log_id(&self, query: &LogQuery) -> anyhow::Result<i64> {
+        let mut builder =
+            QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
+        push_log_filters(&mut builder, query);
+        let row = builder.build().fetch_one(self.pool.as_ref()).await?;
+        let max_id: Option<i64> = row.try_get("max_id")?;
+        Ok(max_id.unwrap_or(0))
+    }
+
     /// List thread ids using the underlying database (no rollout scanning).
     pub async fn list_thread_ids(
         &self,
@@ -411,6 +445,40 @@ ON CONFLICT(id) DO UPDATE SET
     }
 }
 
+fn push_log_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, query: &'a LogQuery) {
+    if let Some(level_upper) = query.level_upper.as_ref() {
+        builder
+            .push(" AND UPPER(level) = ")
+            .push_bind(level_upper.as_str());
+    }
+    if let Some(from_ts) = query.from_ts {
+        builder.push(" AND ts >= ").push_bind(from_ts);
+    }
+    if let Some(to_ts) = query.to_ts {
+        builder.push(" AND ts <= ").push_bind(to_ts);
+    }
+    if let Some(module_like) = query.module_like.as_ref() {
+        builder
+            .push(" AND module_path LIKE '%' || ")
+            .push_bind(module_like.as_str())
+            .push(" || '%'");
+    }
+    if let Some(file_like) = query.file_like.as_ref() {
+        builder
+            .push(" AND file LIKE '%' || ")
+            .push_bind(file_like.as_str())
+            .push(" || '%'");
+    }
+    if let Some(thread_id) = query.thread_id.as_ref() {
+        builder
+            .push(" AND thread_id = ")
+            .push_bind(thread_id.as_str());
+    }
+    if let Some(after_id) = query.after_id {
+        builder.push(" AND id > ").push_bind(after_id);
+    }
+}
+
 async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
     let options = SqliteConnectOptions::new()
         .filename(path)
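
Reviewer note: below is a minimal sketch (not part of the patch) of how the new codex_state::LogQuery / StateRuntime::query_logs surface might be exercised by another caller. Only the LogQuery fields and the query_logs signature come from the diff; the recent_logs_for_thread helper, the runtime handle, and the filter values are illustrative assumptions.

use codex_state::LogQuery;
use codex_state::LogRow;
use codex_state::StateRuntime;

// Hypothetical helper: print the newest rows recorded for one thread.
// Assumes `runtime` was obtained via StateRuntime::init, as logs_client.rs does.
async fn recent_logs_for_thread(runtime: &StateRuntime, thread_id: &str) -> anyhow::Result<()> {
    let query = LogQuery {
        thread_id: Some(thread_id.to_string()),
        limit: Some(50),
        descending: true,
        ..Default::default()
    };
    let rows: Vec<LogRow> = runtime.query_logs(&query).await?;
    for row in rows {
        println!("{} [{}] {}", row.ts, row.level, row.message.unwrap_or_default());
    }
    Ok(())
}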