mirror of
https://github.com/openai/codex.git
synced 2026-04-18 11:44:46 +00:00
Compare commits
5 Commits
etraut/mes
...
etraut/mes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7232276c36 | ||
|
|
bd66ec7c15 | ||
|
|
a015bfed5b | ||
|
|
b7f72a0dc4 | ||
|
|
b1236df88e |
@@ -5739,6 +5739,10 @@ impl CodexMessageProcessor {
|
||||
let _ = ctx
|
||||
.mark_archived(thread_id, archived_path.as_path(), Utc::now())
|
||||
.await;
|
||||
let thread_id_str = thread_id.to_string();
|
||||
if let Err(err) = ctx.delete_thread_delivery_state(&thread_id_str).await {
|
||||
warn!("failed to delete delivery state for archived thread {thread_id}: {err}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -20,6 +20,9 @@ use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_state::ExternalMessageCreateParams;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_state::ThreadTimerCreateParams;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -118,6 +121,26 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
.expect("expected rollout path for thread id to exist after materialization");
|
||||
assert_paths_match_on_disk(&discovered_path, &rollout_path)?;
|
||||
|
||||
let state_db = StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".to_string())
|
||||
.await
|
||||
.expect("initialize state db");
|
||||
state_db
|
||||
.create_external_message(&message_params("message-1", &thread.id))
|
||||
.await
|
||||
.expect("create archived thread message");
|
||||
state_db
|
||||
.create_external_message(&message_params("message-2", "other-thread"))
|
||||
.await
|
||||
.expect("create other thread message");
|
||||
state_db
|
||||
.create_thread_timer(&timer_params("timer-1", &thread.id))
|
||||
.await
|
||||
.expect("create archived thread timer");
|
||||
state_db
|
||||
.create_thread_timer(&timer_params("timer-2", "other-thread"))
|
||||
.await
|
||||
.expect("create other thread timer");
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: thread.id.clone(),
|
||||
@@ -156,6 +179,40 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
"expected archived rollout path {} to exist",
|
||||
archived_rollout_path.display()
|
||||
);
|
||||
assert_eq!(
|
||||
state_db
|
||||
.list_external_messages(&thread.id)
|
||||
.await
|
||||
.expect("list archived thread messages"),
|
||||
Vec::new()
|
||||
);
|
||||
assert_eq!(
|
||||
state_db
|
||||
.list_thread_timers(&thread.id)
|
||||
.await
|
||||
.expect("list archived thread timers"),
|
||||
Vec::new()
|
||||
);
|
||||
assert_eq!(
|
||||
state_db
|
||||
.list_external_messages("other-thread")
|
||||
.await
|
||||
.expect("list other thread messages")
|
||||
.into_iter()
|
||||
.map(|message| message.id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["message-2".to_string()]
|
||||
);
|
||||
assert_eq!(
|
||||
state_db
|
||||
.list_thread_timers("other-thread")
|
||||
.await
|
||||
.expect("list other thread timers")
|
||||
.into_iter()
|
||||
.map(|timer| timer.id)
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["timer-2".to_string()]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -323,3 +380,34 @@ fn assert_paths_match_on_disk(actual: &Path, expected: &Path) -> std::io::Result
|
||||
assert_eq!(actual, expected);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn message_params(id: &str, thread_id: &str) -> ExternalMessageCreateParams {
|
||||
ExternalMessageCreateParams {
|
||||
id: id.to_string(),
|
||||
thread_id: thread_id.to_string(),
|
||||
source: "external".to_string(),
|
||||
content: "do something".to_string(),
|
||||
instructions: None,
|
||||
meta_json: "{}".to_string(),
|
||||
delivery: "after-turn".to_string(),
|
||||
queued_at: 100,
|
||||
}
|
||||
}
|
||||
|
||||
fn timer_params(id: &str, thread_id: &str) -> ThreadTimerCreateParams {
|
||||
ThreadTimerCreateParams {
|
||||
id: id.to_string(),
|
||||
thread_id: thread_id.to_string(),
|
||||
source: "agent".to_string(),
|
||||
client_id: "codex-tui".to_string(),
|
||||
trigger_json: r#"{"kind":"delay","seconds":10,"repeat":false}"#.to_string(),
|
||||
content: "run tests".to_string(),
|
||||
instructions: None,
|
||||
meta_json: "{}".to_string(),
|
||||
delivery: "after-turn".to_string(),
|
||||
created_at: 100,
|
||||
next_run_at: Some(110),
|
||||
last_run_at: None,
|
||||
pending_run: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,11 +40,14 @@ mod app_cmd;
|
||||
mod desktop_app;
|
||||
mod marketplace_cmd;
|
||||
mod mcp_cmd;
|
||||
mod queue_cmd;
|
||||
#[cfg(not(windows))]
|
||||
mod wsl_paths;
|
||||
|
||||
use crate::marketplace_cmd::MarketplaceCli;
|
||||
use crate::mcp_cmd::McpCli;
|
||||
use crate::queue_cmd::QueueCommand;
|
||||
use crate::queue_cmd::run_queue_command;
|
||||
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
@@ -66,6 +69,8 @@ use codex_terminal_detection::TerminalName;
|
||||
version,
|
||||
// If a sub‑command is given, ignore requirements of the default args.
|
||||
subcommand_negates_reqs = true,
|
||||
// Prefer a recognized subcommand over the default interactive prompt positional.
|
||||
subcommand_precedence_over_arg = true,
|
||||
// The executable is sometimes invoked via a platform‑specific name like
|
||||
// `codex-x86_64-unknown-linux-musl`, but the help output should always use
|
||||
// the generic `codex` command name that users run.
|
||||
@@ -140,6 +145,9 @@ enum Subcommand {
|
||||
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
|
||||
Resume(ResumeCommand),
|
||||
|
||||
/// Queue a message to an existing thread.
|
||||
Queue(QueueCommand),
|
||||
|
||||
/// Fork a previous interactive session (picker by default; use --last to fork the most recent).
|
||||
Fork(ForkCommand),
|
||||
|
||||
@@ -807,6 +815,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
.await?;
|
||||
handle_app_exit(exit_info)?;
|
||||
}
|
||||
Some(Subcommand::Queue(queue_cli)) => {
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
"queue",
|
||||
)?;
|
||||
run_queue_command(queue_cli, &root_config_overrides, &interactive).await?;
|
||||
}
|
||||
Some(Subcommand::Fork(ForkCommand {
|
||||
session_id,
|
||||
last,
|
||||
|
||||
301
codex-rs/cli/src/queue_cmd.rs
Normal file
301
codex-rs/cli/src/queue_cmd.rs
Normal file
@@ -0,0 +1,301 @@
|
||||
//! Implementation for the `codex queue` command.
|
||||
//!
|
||||
//! The top-level CLI module owns command routing; this module owns the
|
||||
//! queue-specific policy for resolving target threads and writing immediate
|
||||
//! messages into the SQLite state database.
|
||||
|
||||
use clap::Parser;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::timers::TimerDelivery;
|
||||
use codex_features::Feature;
|
||||
use codex_features::Features;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_tui::Cli as TuiCli;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub(crate) struct QueueCommand {
|
||||
/// Target thread id.
|
||||
#[arg(long = "thread", value_name = "THREAD_ID")]
|
||||
thread: String,
|
||||
|
||||
/// Message text.
|
||||
#[arg(long = "message", value_name = "TEXT")]
|
||||
message: String,
|
||||
}
|
||||
|
||||
pub(crate) async fn run_queue_command(
|
||||
cmd: QueueCommand,
|
||||
root_config_overrides: &CliConfigOverrides,
|
||||
interactive: &TuiCli,
|
||||
) -> anyhow::Result<()> {
|
||||
let cli_kv_overrides = root_config_overrides
|
||||
.parse_overrides()
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
let overrides = ConfigOverrides {
|
||||
config_profile: interactive.config_profile.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
let config =
|
||||
Config::load_with_cli_overrides_and_harness_overrides(cli_kv_overrides, overrides).await?;
|
||||
validate_queue_feature_flags(&config.features)?;
|
||||
let thread_id = resolve_queue_thread_id(config.codex_home.as_path(), &cmd.thread).await?;
|
||||
let state_db =
|
||||
StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()).await?;
|
||||
let delivery = TimerDelivery::AfterTurn;
|
||||
|
||||
let message_params = codex_state::ExternalMessageCreateParams::new(
|
||||
thread_id,
|
||||
"external".to_string(),
|
||||
cmd.message,
|
||||
/*instructions*/ None,
|
||||
"{}".to_string(),
|
||||
delivery.as_str().to_string(),
|
||||
unix_timestamp_now()?,
|
||||
);
|
||||
state_db.create_external_message(&message_params).await?;
|
||||
remove_queued_message_if_thread_missing(
|
||||
config.codex_home.as_path(),
|
||||
&state_db,
|
||||
&message_params.thread_id,
|
||||
&message_params.id,
|
||||
)
|
||||
.await?;
|
||||
println!(
|
||||
"Queued message {} for thread {}.",
|
||||
message_params.id, message_params.thread_id
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_queued_message_if_thread_missing(
|
||||
codex_home: &Path,
|
||||
state_db: &StateRuntime,
|
||||
thread_id: &str,
|
||||
message_id: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
if codex_core::find_thread_path_by_id_str(codex_home, thread_id)
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
state_db
|
||||
.delete_external_message(thread_id, message_id)
|
||||
.await?;
|
||||
anyhow::bail!("thread `{thread_id}` was archived before queued work could be created");
|
||||
}
|
||||
|
||||
fn validate_queue_feature_flags(features: &Features) -> anyhow::Result<()> {
|
||||
if !features.enabled(Feature::QueuedMessages) {
|
||||
anyhow::bail!("codex queue requires the queued_messages feature");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn resolve_queue_thread_id(codex_home: &Path, target: &str) -> anyhow::Result<String> {
|
||||
if let Ok(thread_id) = ThreadId::from_string(target) {
|
||||
if codex_core::find_thread_path_by_id_str(codex_home, &thread_id.to_string())
|
||||
.await?
|
||||
.is_none()
|
||||
{
|
||||
anyhow::bail!("no thread with id `{thread_id}`");
|
||||
}
|
||||
return Ok(thread_id.to_string());
|
||||
}
|
||||
|
||||
let mut active_thread_ids = Vec::new();
|
||||
for thread_id in codex_core::find_thread_ids_by_name(codex_home, target).await? {
|
||||
if codex_core::find_thread_path_by_id_str(codex_home, &thread_id.to_string())
|
||||
.await?
|
||||
.is_some()
|
||||
{
|
||||
active_thread_ids.push(thread_id);
|
||||
}
|
||||
}
|
||||
|
||||
match active_thread_ids.as_slice() {
|
||||
[] => anyhow::bail!("no thread named `{target}`"),
|
||||
[thread_id] => Ok(thread_id.to_string()),
|
||||
_ => anyhow::bail!("more than one thread is named `{target}`; use a thread id instead"),
|
||||
}
|
||||
}
|
||||
|
||||
fn unix_timestamp_now() -> anyhow::Result<i64> {
|
||||
let duration = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map_err(|err| anyhow::anyhow!("system clock is before unix epoch: {err}"))?;
|
||||
i64::try_from(duration.as_secs()).map_err(|_| anyhow::anyhow!("current time is out of range"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::MultitoolCli;
|
||||
use crate::Subcommand;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn write_test_rollout(codex_home: &Path, thread_id: ThreadId) {
|
||||
let sessions_dir = codex_home
|
||||
.join("sessions")
|
||||
.join("2026")
|
||||
.join("04")
|
||||
.join("10");
|
||||
std::fs::create_dir_all(&sessions_dir).expect("create sessions dir");
|
||||
std::fs::write(
|
||||
sessions_dir.join(format!("rollout-2026-04-10T12-00-00-{thread_id}.jsonl")),
|
||||
"",
|
||||
)
|
||||
.expect("write rollout");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queue_command_parses_immediate_message() {
|
||||
let cli = MultitoolCli::try_parse_from([
|
||||
"codex",
|
||||
"queue",
|
||||
"--thread",
|
||||
"thread-1",
|
||||
"--message",
|
||||
"do work",
|
||||
])
|
||||
.expect("parse");
|
||||
let Some(Subcommand::Queue(cmd)) = cli.subcommand else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
assert_eq!(cmd.thread, "thread-1");
|
||||
assert_eq!(cmd.message, "do work");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queue_without_required_args_is_subcommand_error() {
|
||||
let err = MultitoolCli::try_parse_from(["codex", "queue"])
|
||||
.expect_err("queue should be parsed as a subcommand, not as an interactive prompt");
|
||||
assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queue_requires_queued_messages_feature() {
|
||||
let mut features = Features::with_defaults();
|
||||
|
||||
let err = validate_queue_feature_flags(&features)
|
||||
.expect_err("queue should require queued_messages");
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"codex queue requires the queued_messages feature"
|
||||
);
|
||||
|
||||
features.enable(Feature::QueuedMessages);
|
||||
validate_queue_feature_flags(&features)
|
||||
.expect("queued messages feature should permit immediate queue command");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queue_thread_resolves_thread_name() {
|
||||
let temp = TempDir::new().expect("tempdir");
|
||||
let thread_id = ThreadId::new();
|
||||
write_test_rollout(temp.path(), thread_id);
|
||||
codex_core::append_thread_name(temp.path(), thread_id, "named-thread")
|
||||
.await
|
||||
.expect("append thread name");
|
||||
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), "named-thread")
|
||||
.await
|
||||
.expect("resolve"),
|
||||
thread_id.to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queue_thread_id_requires_existing_thread() {
|
||||
let temp = TempDir::new().expect("tempdir");
|
||||
let thread_id = ThreadId::new();
|
||||
write_test_rollout(temp.path(), thread_id);
|
||||
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), &thread_id.to_string())
|
||||
.await
|
||||
.expect("resolve"),
|
||||
thread_id.to_string()
|
||||
);
|
||||
|
||||
let missing = ThreadId::new();
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), &missing.to_string())
|
||||
.await
|
||||
.expect_err("missing id should fail")
|
||||
.to_string(),
|
||||
format!("no thread with id `{missing}`")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queue_thread_name_rejects_missing_and_ambiguous_names() {
|
||||
let temp = TempDir::new().expect("tempdir");
|
||||
let first = ThreadId::new();
|
||||
let second = ThreadId::new();
|
||||
write_test_rollout(temp.path(), first);
|
||||
write_test_rollout(temp.path(), second);
|
||||
codex_core::append_thread_name(temp.path(), first, "same")
|
||||
.await
|
||||
.expect("append first name");
|
||||
codex_core::append_thread_name(temp.path(), second, "same")
|
||||
.await
|
||||
.expect("append second name");
|
||||
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), "missing")
|
||||
.await
|
||||
.expect_err("missing name should fail")
|
||||
.to_string(),
|
||||
"no thread named `missing`"
|
||||
);
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), "same")
|
||||
.await
|
||||
.expect_err("ambiguous name should fail")
|
||||
.to_string(),
|
||||
"more than one thread is named `same`; use a thread id instead"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn queue_thread_name_ignores_names_without_rollouts() {
|
||||
let temp = TempDir::new().expect("tempdir");
|
||||
let stale = ThreadId::new();
|
||||
let active = ThreadId::new();
|
||||
write_test_rollout(temp.path(), active);
|
||||
codex_core::append_thread_name(temp.path(), stale, "same")
|
||||
.await
|
||||
.expect("append stale name");
|
||||
codex_core::append_thread_name(temp.path(), stale, "stale")
|
||||
.await
|
||||
.expect("append stale-only name");
|
||||
codex_core::append_thread_name(temp.path(), active, "same")
|
||||
.await
|
||||
.expect("append active name");
|
||||
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), "same")
|
||||
.await
|
||||
.expect("resolve"),
|
||||
active.to_string()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
resolve_queue_thread_id(temp.path(), "stale")
|
||||
.await
|
||||
.expect_err("stale name should fail")
|
||||
.to_string(),
|
||||
"no thread named `stale`"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -434,6 +434,9 @@
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"queued_messages": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -2286,6 +2289,9 @@
|
||||
"prevent_idle_sleep": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"queued_messages": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"realtime_conversation": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -1,26 +1,29 @@
|
||||
//! SQLite-backed runtime bridge for thread timers.
|
||||
//! SQLite-backed runtime bridge for thread timers and queued thread messages.
|
||||
//!
|
||||
//! This module connects [`Session`] to the persistent state database, keeps the
|
||||
//! in-memory timer scheduler reconciled with cross-instance changes, and
|
||||
//! converts claimed timers into generated model input plus transcript-safe
|
||||
//! delivery events.
|
||||
//! converts claimed timers/messages into generated model input plus
|
||||
//! transcript-safe delivery events.
|
||||
//!
|
||||
//! Timer delivery must be single-consumer across all harness instances for a
|
||||
//! thread, even though those instances share the same SQLite state database. In
|
||||
//! other words, if two app or CLI processes are attached to the same thread, a
|
||||
//! due timer should be injected by at most one of them.
|
||||
//! Timer and queued-message delivery must be single-consumer across all harness
|
||||
//! instances for a thread, even though those instances share the same SQLite
|
||||
//! state database. In other words, if two app or CLI processes are attached to
|
||||
//! the same thread, a due timer or queued message should be injected by at most
|
||||
//! one of them.
|
||||
//!
|
||||
//! The database is the authority for that guarantee. Before this module
|
||||
//! Timers are first selected from local memory, but delivery proceeds only if
|
||||
//! the matching SQLite claim also wins: one-shot timers are deleted as part of
|
||||
//! the claim, and recurring timers are updated with the expected previous run
|
||||
//! timestamp so competing instances cannot both observe and persist the same
|
||||
//! run. If another instance wins the database race, this runtime refreshes its
|
||||
//! local timer view from SQLite and skips delivery.
|
||||
//! delivers a queued message, it calls into the state layer to atomically claim
|
||||
//! and remove the next eligible row. Timers are first selected from local
|
||||
//! memory, but delivery proceeds only if the matching SQLite claim also wins:
|
||||
//! one-shot timers are deleted as part of the claim, and recurring timers are
|
||||
//! updated with the expected previous run timestamp so competing instances
|
||||
//! cannot both observe and persist the same run. If another instance wins the
|
||||
//! database race, this runtime refreshes its local timer view from SQLite and
|
||||
//! skips delivery.
|
||||
//!
|
||||
//! The local `timer_start_in_progress` flag is still useful, but only as an
|
||||
//! in-process guard. It prevents this [`Session`] from starting multiple pending
|
||||
//! timer deliveries concurrently; cross-process exclusivity comes from
|
||||
//! timer/message deliveries concurrently; cross-process exclusivity comes from
|
||||
//! the SQLite claim operations above.
|
||||
|
||||
use super::BackgroundEventEvent;
|
||||
@@ -28,6 +31,7 @@ use super::Event;
|
||||
use super::EventMsg;
|
||||
use super::INITIAL_SUBMIT_ID;
|
||||
use super::Session;
|
||||
use crate::injected_message::InjectedMessage;
|
||||
use crate::injected_message::MessagePayload;
|
||||
use crate::pending_input::PendingInputItem;
|
||||
use crate::timers::ClaimedTimer;
|
||||
@@ -66,6 +70,17 @@ enum TimerDbSyncStatus {
|
||||
Changed,
|
||||
}
|
||||
|
||||
enum PendingMessageStart {
|
||||
Started,
|
||||
NotReady,
|
||||
None,
|
||||
}
|
||||
|
||||
enum PendingMessageClaim {
|
||||
Claimed(Box<PendingInputItem>, TimerDelivery),
|
||||
NotReady,
|
||||
}
|
||||
|
||||
fn db_timer_to_persisted_timer(row: codex_state::ThreadTimer) -> Option<PersistedTimer> {
|
||||
let trigger = match serde_json::from_str(&row.trigger_json) {
|
||||
Ok(trigger) => trigger,
|
||||
@@ -219,6 +234,10 @@ impl Session {
|
||||
{
|
||||
return;
|
||||
}
|
||||
match self.maybe_start_pending_message().await {
|
||||
PendingMessageStart::Started => return,
|
||||
PendingMessageStart::NotReady | PendingMessageStart::None => {}
|
||||
}
|
||||
self.try_start_pending_timer(RecurringTimerPolicy::IncludeAll)
|
||||
.await;
|
||||
}
|
||||
@@ -263,6 +282,111 @@ impl Session {
|
||||
true
|
||||
}
|
||||
|
||||
async fn maybe_start_pending_message(self: &Arc<Self>) -> PendingMessageStart {
|
||||
let Some(claim) = self.claim_next_message_for_delivery().await else {
|
||||
return PendingMessageStart::None;
|
||||
};
|
||||
let PendingMessageClaim::Claimed(input_item, delivery) = claim else {
|
||||
return PendingMessageStart::NotReady;
|
||||
};
|
||||
let input_item = *input_item;
|
||||
|
||||
match delivery {
|
||||
TimerDelivery::SteerCurrentTurn => {
|
||||
if !self
|
||||
.inject_message_into_active_turn(input_item.clone())
|
||||
.await
|
||||
{
|
||||
self.queue_pending_input_for_next_turn(vec![input_item])
|
||||
.await;
|
||||
self.maybe_start_turn_for_pending_work().await;
|
||||
}
|
||||
}
|
||||
TimerDelivery::AfterTurn => {
|
||||
self.queue_pending_input_for_next_turn(vec![input_item])
|
||||
.await;
|
||||
self.maybe_start_turn_for_pending_work().await;
|
||||
}
|
||||
}
|
||||
*self.timer_start_in_progress.lock().await = false;
|
||||
PendingMessageStart::Started
|
||||
}
|
||||
|
||||
async fn claim_next_message_for_delivery(self: &Arc<Self>) -> Option<PendingMessageClaim> {
|
||||
if !self.queued_messages_feature_enabled() {
|
||||
return None;
|
||||
}
|
||||
let mut timer_start_in_progress = self.timer_start_in_progress.lock().await;
|
||||
if *timer_start_in_progress {
|
||||
return None;
|
||||
}
|
||||
*timer_start_in_progress = true;
|
||||
drop(timer_start_in_progress);
|
||||
|
||||
let has_pending_turn_inputs = self.has_queued_response_items_for_next_turn().await
|
||||
|| self.has_trigger_turn_mailbox_items().await;
|
||||
let (has_active_turn, active_turn_is_regular) = {
|
||||
let active_turn = self.active_turn.lock().await;
|
||||
let has_active_turn = active_turn.is_some();
|
||||
let active_turn_is_regular = active_turn
|
||||
.as_ref()
|
||||
.and_then(|turn| turn.tasks.first())
|
||||
.is_some_and(|(_, task)| matches!(task.kind, crate::state::TaskKind::Regular));
|
||||
(has_active_turn, active_turn_is_regular)
|
||||
};
|
||||
let can_after_turn = !has_active_turn && !has_pending_turn_inputs;
|
||||
let can_steer_current_turn = active_turn_is_regular;
|
||||
let state_db = match self.timer_state_db().await {
|
||||
Ok(state_db) => state_db,
|
||||
Err(err) => {
|
||||
warn!("failed to claim queued message from sqlite: {err}");
|
||||
*self.timer_start_in_progress.lock().await = false;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
self.start_timer_db_sync_task(state_db.clone());
|
||||
|
||||
loop {
|
||||
let claim = match state_db
|
||||
.claim_next_external_message(
|
||||
&self.thread_id_string(),
|
||||
can_after_turn,
|
||||
can_steer_current_turn,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(claim) => claim,
|
||||
Err(err) => {
|
||||
warn!("failed to claim queued message from sqlite: {err}");
|
||||
*self.timer_start_in_progress.lock().await = false;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
match claim {
|
||||
Some(codex_state::ExternalMessageClaim::Claimed(row)) => {
|
||||
let (message, delivery) = match InjectedMessage::from_external_row(row) {
|
||||
Ok(parsed) => parsed,
|
||||
Err(err) => {
|
||||
warn!("{err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let input_item =
|
||||
PendingInputItem::injected(message.prompt_input_item(), message.event());
|
||||
return Some(PendingMessageClaim::Claimed(Box::new(input_item), delivery));
|
||||
}
|
||||
Some(codex_state::ExternalMessageClaim::Invalid { id, reason }) => {
|
||||
warn!("dropped invalid queued message {id}: {reason}");
|
||||
continue;
|
||||
}
|
||||
Some(codex_state::ExternalMessageClaim::NotReady) | None => {
|
||||
*self.timer_start_in_progress.lock().await = false;
|
||||
return claim.map(|_| PendingMessageClaim::NotReady);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn claim_next_timer_for_delivery(
|
||||
self: &Arc<Self>,
|
||||
recurring_timer_policy: RecurringTimerPolicy,
|
||||
@@ -310,6 +434,10 @@ impl Session {
|
||||
}
|
||||
|
||||
async fn inject_timer_into_active_turn(&self, item: PendingInputItem) -> bool {
|
||||
self.inject_message_into_active_turn(item).await
|
||||
}
|
||||
|
||||
async fn inject_message_into_active_turn(&self, item: PendingInputItem) -> bool {
|
||||
let active = self.active_turn.lock().await;
|
||||
let Some(active_turn) = active.as_ref() else {
|
||||
return false;
|
||||
@@ -634,8 +762,12 @@ impl Session {
|
||||
self.features.enabled(Feature::Timers)
|
||||
}
|
||||
|
||||
fn queued_messages_feature_enabled(&self) -> bool {
|
||||
self.features.enabled(Feature::QueuedMessages)
|
||||
}
|
||||
|
||||
fn timer_db_sync_feature_enabled(&self) -> bool {
|
||||
self.timers_feature_enabled()
|
||||
self.timers_feature_enabled() || self.queued_messages_feature_enabled()
|
||||
}
|
||||
|
||||
fn spawn_restored_timer_tasks(self: &Arc<Self>, restored_tasks: Vec<RestoredTimerTask>) {
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use crate::timers::TimerDelivery;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::protocol::InjectedMessageEvent;
|
||||
@@ -43,6 +44,22 @@ pub(crate) enum InjectedMessage {
|
||||
}
|
||||
|
||||
impl InjectedMessage {
|
||||
pub(crate) fn from_external_row(
|
||||
row: codex_state::ExternalMessage,
|
||||
) -> Result<(Self, TimerDelivery), String> {
|
||||
let delivery = serde_json::from_value::<TimerDelivery>(serde_json::Value::String(
|
||||
row.delivery.clone(),
|
||||
))
|
||||
.map_err(|err| format!("invalid message delivery `{}`: {err}", row.delivery))?;
|
||||
Ok((
|
||||
Self::External {
|
||||
source: row.source,
|
||||
content: row.content,
|
||||
},
|
||||
delivery,
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn prompt_input_item(&self) -> ResponseInputItem {
|
||||
ResponseInputItem::Message {
|
||||
role: "user".to_string(),
|
||||
|
||||
@@ -171,6 +171,7 @@ pub use rollout::append_thread_name;
|
||||
pub use rollout::find_archived_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use rollout::find_conversation_path_by_id_str;
|
||||
pub use rollout::find_thread_ids_by_name;
|
||||
pub use rollout::find_thread_meta_by_name_str;
|
||||
pub use rollout::find_thread_name_by_id;
|
||||
pub use rollout::find_thread_names_by_ids;
|
||||
|
||||
@@ -14,6 +14,7 @@ pub use codex_rollout::append_thread_name;
|
||||
pub use codex_rollout::find_archived_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use codex_rollout::find_conversation_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_ids_by_name;
|
||||
pub use codex_rollout::find_thread_meta_by_name_str;
|
||||
pub use codex_rollout::find_thread_name_by_id;
|
||||
pub use codex_rollout::find_thread_names_by_ids;
|
||||
|
||||
@@ -377,3 +377,253 @@ async fn list_timers_discovers_externally_inserted_timer() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queued_messages_feature_consumes_messages_without_timers() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "queued turn"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::QueuedMessages)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let thread_id = test.session_configured.session_id.to_string();
|
||||
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
|
||||
thread_id,
|
||||
"external".to_string(),
|
||||
"queued hello".to_string(),
|
||||
/*instructions*/ None,
|
||||
"{}".to_string(),
|
||||
TimerDelivery::AfterTurn.as_str().to_string(),
|
||||
Utc::now().timestamp(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| match event {
|
||||
EventMsg::InjectedMessage(event) => {
|
||||
event.source == "external" && event.content == "queued hello"
|
||||
}
|
||||
_ => false,
|
||||
},
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| matches!(event, EventMsg::TurnComplete(_)),
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
let requests = mock.requests();
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert!(requests.iter().any(|request| {
|
||||
request
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "timer turn"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-2", "queued turn"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::QueuedMessages)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let timer = test
|
||||
.codex
|
||||
.create_timer(
|
||||
ThreadTimerTrigger::Delay {
|
||||
seconds: 0,
|
||||
repeat: Some(true),
|
||||
},
|
||||
MessagePayload {
|
||||
content: "keep going".to_string(),
|
||||
instructions: None,
|
||||
meta: Default::default(),
|
||||
},
|
||||
TimerDelivery::AfterTurn,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!("{err}"))?;
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| match event {
|
||||
EventMsg::InjectedMessage(event) => event.source == format!("timer {}", timer.id),
|
||||
_ => false,
|
||||
},
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
let thread_id = test.session_configured.session_id.to_string();
|
||||
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
|
||||
thread_id,
|
||||
"external".to_string(),
|
||||
"queued hello".to_string(),
|
||||
/*instructions*/ None,
|
||||
"{}".to_string(),
|
||||
TimerDelivery::AfterTurn.as_str().to_string(),
|
||||
Utc::now().timestamp(),
|
||||
))
|
||||
.await?;
|
||||
assert!(
|
||||
test.codex
|
||||
.delete_timer(&timer.id)
|
||||
.await
|
||||
.map_err(|err| anyhow!("{err}"))?,
|
||||
"test should delete the idle recurring timer before it can schedule another turn"
|
||||
);
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| matches!(event, EventMsg::TurnComplete(_)),
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| match event {
|
||||
EventMsg::InjectedMessage(event) => event.source == "external",
|
||||
_ => false,
|
||||
},
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| matches!(event, EventMsg::TurnComplete(_)),
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
let requests = mock.requests();
|
||||
assert!(
|
||||
requests.len() >= 2,
|
||||
"expected timer and queued-message turns to run"
|
||||
);
|
||||
assert!(
|
||||
requests[0]
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|message| message.contains("<content>\nTimer fired: keep going\n</content>"))
|
||||
);
|
||||
assert!(
|
||||
requests[1]
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn queued_messages_feature_disabled_leaves_messages_queued() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "first turn"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let thread_id = test.session_configured.session_id.to_string();
|
||||
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
|
||||
thread_id.clone(),
|
||||
"external".to_string(),
|
||||
"queued hello".to_string(),
|
||||
/*instructions*/ None,
|
||||
"{}".to_string(),
|
||||
TimerDelivery::AfterTurn.as_str().to_string(),
|
||||
Utc::now().timestamp(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
test.submit_turn("start").await?;
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
assert_eq!(mock.requests().len(), 1);
|
||||
assert!(
|
||||
db.claim_next_external_message(
|
||||
&thread_id, /*can_after_turn*/ true, /*can_steer_current_turn*/ true,
|
||||
)
|
||||
.await?
|
||||
.is_some()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -174,6 +174,8 @@ pub enum Feature {
|
||||
Artifact,
|
||||
/// Enable per-thread persistent timer scheduling tools and APIs.
|
||||
Timers,
|
||||
/// Enable queued message creation and delivery.
|
||||
QueuedMessages,
|
||||
/// Enable Fast mode selection in the TUI and request layer.
|
||||
FastMode,
|
||||
/// Enable experimental realtime voice conversation mode in the TUI.
|
||||
@@ -865,6 +867,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::QueuedMessages,
|
||||
key: "queued_messages",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::FastMode,
|
||||
key: "fast_mode",
|
||||
|
||||
@@ -159,6 +159,16 @@ fn timers_are_under_development() {
|
||||
assert_eq!(Feature::Timers.default_enabled(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queued_messages_are_under_development() {
|
||||
assert_eq!(
|
||||
feature_for_key("queued_messages"),
|
||||
Some(Feature::QueuedMessages)
|
||||
);
|
||||
assert_eq!(Feature::QueuedMessages.stage(), Stage::UnderDevelopment);
|
||||
assert_eq!(Feature::QueuedMessages.default_enabled(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_call_mcp_elicitation_is_stable_and_enabled_by_default() {
|
||||
assert_eq!(Feature::ToolCallMcpElicitation.stage(), Stage::Stable);
|
||||
|
||||
@@ -56,6 +56,7 @@ pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::append_rollout_item_to_path;
|
||||
pub use session_index::append_thread_name;
|
||||
pub use session_index::find_thread_ids_by_name;
|
||||
pub use session_index::find_thread_meta_by_name_str;
|
||||
pub use session_index::find_thread_name_by_id;
|
||||
pub use session_index::find_thread_names_by_ids;
|
||||
|
||||
@@ -112,6 +112,48 @@ pub async fn find_thread_names_by_ids(
|
||||
Ok(names)
|
||||
}
|
||||
|
||||
/// Find all thread ids whose latest indexed thread name exactly matches `name`.
|
||||
pub async fn find_thread_ids_by_name(
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> std::io::Result<Vec<ThreadId>> {
|
||||
let name = name.trim();
|
||||
if name.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let path = session_index_path(codex_home);
|
||||
if !path.exists() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let file = tokio::fs::File::open(&path).await?;
|
||||
let reader = tokio::io::BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
let mut latest_names = HashMap::<ThreadId, String>::new();
|
||||
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let Ok(entry) = serde_json::from_str::<SessionIndexEntry>(trimmed) else {
|
||||
continue;
|
||||
};
|
||||
let thread_name = entry.thread_name.trim();
|
||||
if thread_name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
latest_names.insert(entry.id, thread_name.to_string());
|
||||
}
|
||||
|
||||
let mut ids = latest_names
|
||||
.into_iter()
|
||||
.filter_map(|(thread_id, thread_name)| (thread_name == name).then_some(thread_id))
|
||||
.collect::<Vec<_>>();
|
||||
ids.sort_by_key(ToString::to_string);
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
/// Locate a recorded thread rollout and read its session metadata by thread name.
|
||||
/// Returns the newest indexed name that still has a readable rollout header.
|
||||
pub async fn find_thread_meta_by_name_str(
|
||||
|
||||
@@ -265,6 +265,47 @@ async fn find_thread_names_by_ids_prefers_latest_entry() -> std::io::Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_ids_by_name_uses_latest_name_per_thread() -> std::io::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
let id1 = ThreadId::new();
|
||||
let id2 = ThreadId::new();
|
||||
let id3 = ThreadId::new();
|
||||
let lines = vec![
|
||||
SessionIndexEntry {
|
||||
id: id1,
|
||||
thread_name: "target".to_string(),
|
||||
updated_at: "2024-01-01T00:00:00Z".to_string(),
|
||||
},
|
||||
SessionIndexEntry {
|
||||
id: id2,
|
||||
thread_name: "target".to_string(),
|
||||
updated_at: "2024-01-01T00:00:00Z".to_string(),
|
||||
},
|
||||
SessionIndexEntry {
|
||||
id: id1,
|
||||
thread_name: "renamed".to_string(),
|
||||
updated_at: "2024-01-02T00:00:00Z".to_string(),
|
||||
},
|
||||
SessionIndexEntry {
|
||||
id: id3,
|
||||
thread_name: "target".to_string(),
|
||||
updated_at: "2024-01-03T00:00:00Z".to_string(),
|
||||
},
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_ids_by_name(temp.path(), "target").await?;
|
||||
let expected = {
|
||||
let mut ids = vec![id2, id3];
|
||||
ids.sort_by_key(ToString::to_string);
|
||||
ids
|
||||
};
|
||||
assert_eq!(found, expected);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn scan_index_finds_latest_match_among_mixed_entries() -> std::io::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
|
||||
Reference in New Issue
Block a user