Compare commits

...

5 Commits

Author SHA1 Message Date
Eric Traut
7232276c36 codex: address PR review feedback (#17580) 2026-04-12 20:02:37 -07:00
Eric Traut
bd66ec7c15 codex: address PR review feedback (#17580) 2026-04-12 19:51:14 -07:00
Eric Traut
a015bfed5b codex: propagate external message rename (#17580) 2026-04-12 19:41:31 -07:00
Eric Traut
b7f72a0dc4 codex: address PR review feedback (#17580) 2026-04-12 19:41:31 -07:00
Eric Traut
b1236df88e codex: add queued external message delivery 2026-04-12 19:41:31 -07:00
15 changed files with 933 additions and 15 deletions

View File

@@ -5739,6 +5739,10 @@ impl CodexMessageProcessor {
let _ = ctx
.mark_archived(thread_id, archived_path.as_path(), Utc::now())
.await;
let thread_id_str = thread_id.to_string();
if let Err(err) = ctx.delete_thread_delivery_state(&thread_id_str).await {
warn!("failed to delete delivery state for archived thread {thread_id}: {err}");
}
}
Ok(())
}

View File

@@ -20,6 +20,9 @@ use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_thread_path_by_id_str;
use codex_state::ExternalMessageCreateParams;
use codex_state::StateRuntime;
use codex_state::ThreadTimerCreateParams;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
@@ -118,6 +121,26 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
.expect("expected rollout path for thread id to exist after materialization");
assert_paths_match_on_disk(&discovered_path, &rollout_path)?;
let state_db = StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".to_string())
.await
.expect("initialize state db");
state_db
.create_external_message(&message_params("message-1", &thread.id))
.await
.expect("create archived thread message");
state_db
.create_external_message(&message_params("message-2", "other-thread"))
.await
.expect("create other thread message");
state_db
.create_thread_timer(&timer_params("timer-1", &thread.id))
.await
.expect("create archived thread timer");
state_db
.create_thread_timer(&timer_params("timer-2", "other-thread"))
.await
.expect("create other thread timer");
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
@@ -156,6 +179,40 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
"expected archived rollout path {} to exist",
archived_rollout_path.display()
);
assert_eq!(
state_db
.list_external_messages(&thread.id)
.await
.expect("list archived thread messages"),
Vec::new()
);
assert_eq!(
state_db
.list_thread_timers(&thread.id)
.await
.expect("list archived thread timers"),
Vec::new()
);
assert_eq!(
state_db
.list_external_messages("other-thread")
.await
.expect("list other thread messages")
.into_iter()
.map(|message| message.id)
.collect::<Vec<_>>(),
vec!["message-2".to_string()]
);
assert_eq!(
state_db
.list_thread_timers("other-thread")
.await
.expect("list other thread timers")
.into_iter()
.map(|timer| timer.id)
.collect::<Vec<_>>(),
vec!["timer-2".to_string()]
);
Ok(())
}
@@ -323,3 +380,34 @@ fn assert_paths_match_on_disk(actual: &Path, expected: &Path) -> std::io::Result
assert_eq!(actual, expected);
Ok(())
}
/// Build fixture create-params for a queued external message owned by
/// `thread_id`, with fixed filler content and an `after-turn` delivery.
fn message_params(id: &str, thread_id: &str) -> ExternalMessageCreateParams {
    let id = String::from(id);
    let thread_id = String::from(thread_id);
    ExternalMessageCreateParams {
        id,
        thread_id,
        source: String::from("external"),
        content: String::from("do something"),
        instructions: None,
        meta_json: String::from("{}"),
        delivery: String::from("after-turn"),
        queued_at: 100,
    }
}
/// Build fixture create-params for a persisted thread timer owned by
/// `thread_id`: a non-repeating 10-second delay trigger with fixed timestamps.
fn timer_params(id: &str, thread_id: &str) -> ThreadTimerCreateParams {
    let trigger = r#"{"kind":"delay","seconds":10,"repeat":false}"#;
    ThreadTimerCreateParams {
        id: String::from(id),
        thread_id: String::from(thread_id),
        source: String::from("agent"),
        client_id: String::from("codex-tui"),
        trigger_json: String::from(trigger),
        content: String::from("run tests"),
        instructions: None,
        meta_json: String::from("{}"),
        delivery: String::from("after-turn"),
        created_at: 100,
        next_run_at: Some(110),
        last_run_at: None,
        pending_run: false,
    }
}

View File

@@ -40,11 +40,14 @@ mod app_cmd;
mod desktop_app;
mod marketplace_cmd;
mod mcp_cmd;
mod queue_cmd;
#[cfg(not(windows))]
mod wsl_paths;
use crate::marketplace_cmd::MarketplaceCli;
use crate::mcp_cmd::McpCli;
use crate::queue_cmd::QueueCommand;
use crate::queue_cmd::run_queue_command;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
@@ -66,6 +69,8 @@ use codex_terminal_detection::TerminalName;
version,
// If a subcommand is given, ignore requirements of the default args.
subcommand_negates_reqs = true,
// Prefer a recognized subcommand over the default interactive prompt positional.
subcommand_precedence_over_arg = true,
// The executable is sometimes invoked via a platform-specific name like
// `codex-x86_64-unknown-linux-musl`, but the help output should always use
// the generic `codex` command name that users run.
@@ -140,6 +145,9 @@ enum Subcommand {
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
Resume(ResumeCommand),
/// Queue a message to an existing thread.
Queue(QueueCommand),
/// Fork a previous interactive session (picker by default; use --last to fork the most recent).
Fork(ForkCommand),
@@ -807,6 +815,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
.await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Queue(queue_cli)) => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
root_remote_auth_token_env.as_deref(),
"queue",
)?;
run_queue_command(queue_cli, &root_config_overrides, &interactive).await?;
}
Some(Subcommand::Fork(ForkCommand {
session_id,
last,

View File

@@ -0,0 +1,301 @@
//! Implementation for the `codex queue` command.
//!
//! The top-level CLI module owns command routing; this module owns the
//! queue-specific policy for resolving target threads and writing immediate
//! messages into the SQLite state database.
use clap::Parser;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::timers::TimerDelivery;
use codex_features::Feature;
use codex_features::Features;
use codex_protocol::ThreadId;
use codex_state::StateRuntime;
use codex_tui::Cli as TuiCli;
use codex_utils_cli::CliConfigOverrides;
use std::path::Path;
// CLI arguments for `codex queue`: queue one message onto an existing thread.
// NOTE(review): `--thread` is also resolved as an exact thread name by
// `resolve_queue_thread_id`, not only as an id, although the help text and
// value name say THREAD_ID. The `///` doc comments below are emitted verbatim
// as clap help text, so they are runtime strings and left unchanged.
#[derive(Debug, Parser)]
pub(crate) struct QueueCommand {
/// Target thread id.
#[arg(long = "thread", value_name = "THREAD_ID")]
thread: String,
/// Message text.
#[arg(long = "message", value_name = "TEXT")]
message: String,
}
/// Entry point for `codex queue`: load config, resolve the target thread,
/// and persist one immediate external message into the SQLite state database.
///
/// Fails when the `queued_messages` feature is disabled, the target thread
/// cannot be resolved, or the thread's rollout disappears while the message
/// is being written (the message is then deleted again).
pub(crate) async fn run_queue_command(
cmd: QueueCommand,
root_config_overrides: &CliConfigOverrides,
interactive: &TuiCli,
) -> anyhow::Result<()> {
// Apply root `-c key=value` overrides plus the interactive profile so the
// queue command sees the same effective config as the TUI would.
let cli_kv_overrides = root_config_overrides
.parse_overrides()
.map_err(anyhow::Error::msg)?;
let overrides = ConfigOverrides {
config_profile: interactive.config_profile.clone(),
..Default::default()
};
let config =
Config::load_with_cli_overrides_and_harness_overrides(cli_kv_overrides, overrides).await?;
validate_queue_feature_flags(&config.features)?;
// Accepts either a thread id or a unique thread name; errors if neither.
let thread_id = resolve_queue_thread_id(config.codex_home.as_path(), &cmd.thread).await?;
let state_db =
StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()).await?;
let delivery = TimerDelivery::AfterTurn;
let message_params = codex_state::ExternalMessageCreateParams::new(
thread_id,
"external".to_string(),
cmd.message,
/*instructions*/ None,
"{}".to_string(),
delivery.as_str().to_string(),
unix_timestamp_now()?,
);
state_db.create_external_message(&message_params).await?;
// The thread could be archived between resolution and the insert above;
// re-check and roll the insert back rather than leaving an orphaned row.
remove_queued_message_if_thread_missing(
config.codex_home.as_path(),
&state_db,
&message_params.thread_id,
&message_params.id,
)
.await?;
println!(
"Queued message {} for thread {}.",
message_params.id, message_params.thread_id
);
Ok(())
}
/// Compensation step for the insert/archive race: if the target thread's
/// rollout no longer exists on disk, delete the just-queued message again and
/// report the failure to the caller.
async fn remove_queued_message_if_thread_missing(
    codex_home: &Path,
    state_db: &StateRuntime,
    thread_id: &str,
    message_id: &str,
) -> anyhow::Result<()> {
    let rollout_path = codex_core::find_thread_path_by_id_str(codex_home, thread_id).await?;
    match rollout_path {
        // Thread is still live; the queued message stays.
        Some(_) => Ok(()),
        // Thread vanished; undo the insert so no orphaned row remains.
        None => {
            state_db
                .delete_external_message(thread_id, message_id)
                .await?;
            anyhow::bail!("thread `{thread_id}` was archived before queued work could be created")
        }
    }
}
/// Gate `codex queue` behind the `queued_messages` feature flag.
fn validate_queue_feature_flags(features: &Features) -> anyhow::Result<()> {
    match features.enabled(Feature::QueuedMessages) {
        true => Ok(()),
        false => anyhow::bail!("codex queue requires the queued_messages feature"),
    }
}
/// Resolve the user-supplied `--thread` target to a thread id string.
///
/// An id-shaped target must still have a rollout on disk. Otherwise the
/// target is treated as an exact thread name; only names whose rollout still
/// exists count, and the name must match exactly one live thread.
async fn resolve_queue_thread_id(codex_home: &Path, target: &str) -> anyhow::Result<String> {
    // Fast path: the target parses as a thread id.
    if let Ok(parsed) = ThreadId::from_string(target) {
        let id_str = parsed.to_string();
        return match codex_core::find_thread_path_by_id_str(codex_home, &id_str).await? {
            Some(_) => Ok(id_str),
            None => Err(anyhow::anyhow!("no thread with id `{parsed}`")),
        };
    }
    // Name path: collect candidate ids, dropping those without a rollout so
    // stale index entries cannot be selected.
    let mut candidates = Vec::new();
    for candidate in codex_core::find_thread_ids_by_name(codex_home, target).await? {
        let candidate_str = candidate.to_string();
        let rollout = codex_core::find_thread_path_by_id_str(codex_home, &candidate_str).await?;
        if rollout.is_some() {
            candidates.push(candidate);
        }
    }
    if candidates.is_empty() {
        anyhow::bail!("no thread named `{target}`");
    }
    if candidates.len() > 1 {
        anyhow::bail!("more than one thread is named `{target}`; use a thread id instead");
    }
    Ok(candidates[0].to_string())
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// # Errors
/// Fails when the system clock reports a time before the epoch, or when the
/// second count does not fit in `i64`.
fn unix_timestamp_now() -> anyhow::Result<i64> {
    use std::time::{SystemTime, UNIX_EPOCH};
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|err| anyhow::anyhow!("system clock is before unix epoch: {err}"))?;
    let seconds = since_epoch.as_secs();
    i64::try_from(seconds).map_err(|_| anyhow::anyhow!("current time is out of range"))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::MultitoolCli;
use crate::Subcommand;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
// Write an empty rollout file at the dated path layout so the thread id
// "exists" on disk for resolution purposes.
fn write_test_rollout(codex_home: &Path, thread_id: ThreadId) {
let sessions_dir = codex_home
.join("sessions")
.join("2026")
.join("04")
.join("10");
std::fs::create_dir_all(&sessions_dir).expect("create sessions dir");
std::fs::write(
sessions_dir.join(format!("rollout-2026-04-10T12-00-00-{thread_id}.jsonl")),
"",
)
.expect("write rollout");
}
// `codex queue --thread … --message …` parses into the Queue subcommand.
#[test]
fn queue_command_parses_immediate_message() {
let cli = MultitoolCli::try_parse_from([
"codex",
"queue",
"--thread",
"thread-1",
"--message",
"do work",
])
.expect("parse");
let Some(Subcommand::Queue(cmd)) = cli.subcommand else {
unreachable!()
};
assert_eq!(cmd.thread, "thread-1");
assert_eq!(cmd.message, "do work");
}
// Bare `codex queue` must be a missing-argument error, not be swallowed as
// the default interactive prompt positional.
#[test]
fn queue_without_required_args_is_subcommand_error() {
let err = MultitoolCli::try_parse_from(["codex", "queue"])
.expect_err("queue should be parsed as a subcommand, not as an interactive prompt");
assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
}
// Feature gate: rejected with defaults, accepted once QueuedMessages is on.
#[test]
fn queue_requires_queued_messages_feature() {
let mut features = Features::with_defaults();
let err = validate_queue_feature_flags(&features)
.expect_err("queue should require queued_messages");
assert_eq!(
err.to_string(),
"codex queue requires the queued_messages feature"
);
features.enable(Feature::QueuedMessages);
validate_queue_feature_flags(&features)
.expect("queued messages feature should permit immediate queue command");
}
// A unique indexed name with a live rollout resolves to its thread id.
#[tokio::test]
async fn queue_thread_resolves_thread_name() {
let temp = TempDir::new().expect("tempdir");
let thread_id = ThreadId::new();
write_test_rollout(temp.path(), thread_id);
codex_core::append_thread_name(temp.path(), thread_id, "named-thread")
.await
.expect("append thread name");
assert_eq!(
resolve_queue_thread_id(temp.path(), "named-thread")
.await
.expect("resolve"),
thread_id.to_string()
);
}
// Id-shaped targets resolve only when a rollout exists; unknown ids fail.
#[tokio::test]
async fn queue_thread_id_requires_existing_thread() {
let temp = TempDir::new().expect("tempdir");
let thread_id = ThreadId::new();
write_test_rollout(temp.path(), thread_id);
assert_eq!(
resolve_queue_thread_id(temp.path(), &thread_id.to_string())
.await
.expect("resolve"),
thread_id.to_string()
);
let missing = ThreadId::new();
assert_eq!(
resolve_queue_thread_id(temp.path(), &missing.to_string())
.await
.expect_err("missing id should fail")
.to_string(),
format!("no thread with id `{missing}`")
);
}
// Unknown names and names shared by multiple live threads are both errors.
#[tokio::test]
async fn queue_thread_name_rejects_missing_and_ambiguous_names() {
let temp = TempDir::new().expect("tempdir");
let first = ThreadId::new();
let second = ThreadId::new();
write_test_rollout(temp.path(), first);
write_test_rollout(temp.path(), second);
codex_core::append_thread_name(temp.path(), first, "same")
.await
.expect("append first name");
codex_core::append_thread_name(temp.path(), second, "same")
.await
.expect("append second name");
assert_eq!(
resolve_queue_thread_id(temp.path(), "missing")
.await
.expect_err("missing name should fail")
.to_string(),
"no thread named `missing`"
);
assert_eq!(
resolve_queue_thread_id(temp.path(), "same")
.await
.expect_err("ambiguous name should fail")
.to_string(),
"more than one thread is named `same`; use a thread id instead"
);
}
// Indexed names whose thread has no rollout on disk are ignored, so a name
// shared with a stale thread still resolves uniquely, and a stale-only name
// fails outright.
#[tokio::test]
async fn queue_thread_name_ignores_names_without_rollouts() {
let temp = TempDir::new().expect("tempdir");
let stale = ThreadId::new();
let active = ThreadId::new();
write_test_rollout(temp.path(), active);
codex_core::append_thread_name(temp.path(), stale, "same")
.await
.expect("append stale name");
codex_core::append_thread_name(temp.path(), stale, "stale")
.await
.expect("append stale-only name");
codex_core::append_thread_name(temp.path(), active, "same")
.await
.expect("append active name");
assert_eq!(
resolve_queue_thread_id(temp.path(), "same")
.await
.expect("resolve"),
active.to_string()
);
assert_eq!(
resolve_queue_thread_id(temp.path(), "stale")
.await
.expect_err("stale name should fail")
.to_string(),
"no thread named `stale`"
);
}
}

View File

@@ -434,6 +434,9 @@
"prevent_idle_sleep": {
"type": "boolean"
},
"queued_messages": {
"type": "boolean"
},
"realtime_conversation": {
"type": "boolean"
},
@@ -2286,6 +2289,9 @@
"prevent_idle_sleep": {
"type": "boolean"
},
"queued_messages": {
"type": "boolean"
},
"realtime_conversation": {
"type": "boolean"
},

View File

@@ -1,26 +1,29 @@
//! SQLite-backed runtime bridge for thread timers.
//! SQLite-backed runtime bridge for thread timers and queued thread messages.
//!
//! This module connects [`Session`] to the persistent state database, keeps the
//! in-memory timer scheduler reconciled with cross-instance changes, and
//! converts claimed timers into generated model input plus transcript-safe
//! delivery events.
//! converts claimed timers/messages into generated model input plus
//! transcript-safe delivery events.
//!
//! Timer delivery must be single-consumer across all harness instances for a
//! thread, even though those instances share the same SQLite state database. In
//! other words, if two app or CLI processes are attached to the same thread, a
//! due timer should be injected by at most one of them.
//! Timer and queued-message delivery must be single-consumer across all harness
//! instances for a thread, even though those instances share the same SQLite
//! state database. In other words, if two app or CLI processes are attached to
//! the same thread, a due timer or queued message should be injected by at most
//! one of them.
//!
//! The database is the authority for that guarantee. Before this module
//! Timers are first selected from local memory, but delivery proceeds only if
//! the matching SQLite claim also wins: one-shot timers are deleted as part of
//! the claim, and recurring timers are updated with the expected previous run
//! timestamp so competing instances cannot both observe and persist the same
//! run. If another instance wins the database race, this runtime refreshes its
//! local timer view from SQLite and skips delivery.
//! delivers a queued message, it calls into the state layer to atomically claim
//! and remove the next eligible row. Timers are first selected from local
//! memory, but delivery proceeds only if the matching SQLite claim also wins:
//! one-shot timers are deleted as part of the claim, and recurring timers are
//! updated with the expected previous run timestamp so competing instances
//! cannot both observe and persist the same run. If another instance wins the
//! database race, this runtime refreshes its local timer view from SQLite and
//! skips delivery.
//!
//! The local `timer_start_in_progress` flag is still useful, but only as an
//! in-process guard. It prevents this [`Session`] from starting multiple pending
//! timer deliveries concurrently; cross-process exclusivity comes from
//! timer/message deliveries concurrently; cross-process exclusivity comes from
//! the SQLite claim operations above.
use super::BackgroundEventEvent;
@@ -28,6 +31,7 @@ use super::Event;
use super::EventMsg;
use super::INITIAL_SUBMIT_ID;
use super::Session;
use crate::injected_message::InjectedMessage;
use crate::injected_message::MessagePayload;
use crate::pending_input::PendingInputItem;
use crate::timers::ClaimedTimer;
@@ -66,6 +70,17 @@ enum TimerDbSyncStatus {
Changed,
}
/// Outcome of attempting to start delivery of a queued external message.
enum PendingMessageStart {
// A message was claimed and routed into a turn (injected or queued).
Started,
// A message exists but is not eligible for the session's current turn state.
NotReady,
// No queued message to deliver (or the feature/claim guard declined).
None,
}
/// Result of a winning SQLite claim attempt for the next queued message.
enum PendingMessageClaim {
// The claimed row converted to a pending input item plus its delivery mode.
// The item is boxed — presumably to keep this enum small; confirm.
Claimed(Box<PendingInputItem>, TimerDelivery),
// The next message cannot be delivered under the current turn constraints.
NotReady,
}
fn db_timer_to_persisted_timer(row: codex_state::ThreadTimer) -> Option<PersistedTimer> {
let trigger = match serde_json::from_str(&row.trigger_json) {
Ok(trigger) => trigger,
@@ -219,6 +234,10 @@ impl Session {
{
return;
}
match self.maybe_start_pending_message().await {
PendingMessageStart::Started => return,
PendingMessageStart::NotReady | PendingMessageStart::None => {}
}
self.try_start_pending_timer(RecurringTimerPolicy::IncludeAll)
.await;
}
@@ -263,6 +282,111 @@ impl Session {
true
}
/// Try to deliver one queued external message, returning how far delivery got.
///
/// On the `None`/`NotReady` paths the in-process `timer_start_in_progress`
/// guard has already been released by `claim_next_message_for_delivery`;
/// on the `Started` path it is released at the end of this function.
async fn maybe_start_pending_message(self: &Arc<Self>) -> PendingMessageStart {
let Some(claim) = self.claim_next_message_for_delivery().await else {
return PendingMessageStart::None;
};
let PendingMessageClaim::Claimed(input_item, delivery) = claim else {
return PendingMessageStart::NotReady;
};
let input_item = *input_item;
match delivery {
TimerDelivery::SteerCurrentTurn => {
// Prefer injecting into the running turn; if that fails (e.g. the
// turn ended in the meantime), fall back to queueing for the next
// turn and kicking off pending work.
if !self
.inject_message_into_active_turn(input_item.clone())
.await
{
self.queue_pending_input_for_next_turn(vec![input_item])
.await;
self.maybe_start_turn_for_pending_work().await;
}
}
TimerDelivery::AfterTurn => {
self.queue_pending_input_for_next_turn(vec![input_item])
.await;
self.maybe_start_turn_for_pending_work().await;
}
}
// Release the in-process delivery guard now that routing is done.
*self.timer_start_in_progress.lock().await = false;
PendingMessageStart::Started
}
/// Atomically claim the next queued external message from SQLite, if any.
///
/// Returns `None` when the feature is off, another delivery is already in
/// progress in this process, the database is unavailable, or there is no
/// queued row. Returns `Some(NotReady)` when a row exists but is not eligible
/// under the current turn constraints. On every return path except a
/// successful claim, the `timer_start_in_progress` guard is reset here;
/// after a successful claim the caller is responsible for resetting it.
async fn claim_next_message_for_delivery(self: &Arc<Self>) -> Option<PendingMessageClaim> {
if !self.queued_messages_feature_enabled() {
return None;
}
// In-process guard only; cross-process exclusivity comes from the SQLite
// claim below.
let mut timer_start_in_progress = self.timer_start_in_progress.lock().await;
if *timer_start_in_progress {
return None;
}
*timer_start_in_progress = true;
drop(timer_start_in_progress);
// Snapshot the turn state to decide which delivery modes are allowed.
let has_pending_turn_inputs = self.has_queued_response_items_for_next_turn().await
|| self.has_trigger_turn_mailbox_items().await;
let (has_active_turn, active_turn_is_regular) = {
let active_turn = self.active_turn.lock().await;
let has_active_turn = active_turn.is_some();
let active_turn_is_regular = active_turn
.as_ref()
.and_then(|turn| turn.tasks.first())
.is_some_and(|(_, task)| matches!(task.kind, crate::state::TaskKind::Regular));
(has_active_turn, active_turn_is_regular)
};
// After-turn delivery needs an idle session; steering needs a regular turn.
let can_after_turn = !has_active_turn && !has_pending_turn_inputs;
let can_steer_current_turn = active_turn_is_regular;
let state_db = match self.timer_state_db().await {
Ok(state_db) => state_db,
Err(err) => {
warn!("failed to claim queued message from sqlite: {err}");
*self.timer_start_in_progress.lock().await = false;
return None;
}
};
self.start_timer_db_sync_task(state_db.clone());
// Loop so invalid rows can be dropped and the next candidate tried.
loop {
let claim = match state_db
.claim_next_external_message(
&self.thread_id_string(),
can_after_turn,
can_steer_current_turn,
)
.await
{
Ok(claim) => claim,
Err(err) => {
warn!("failed to claim queued message from sqlite: {err}");
*self.timer_start_in_progress.lock().await = false;
return None;
}
};
match claim {
Some(codex_state::ExternalMessageClaim::Claimed(row)) => {
let (message, delivery) = match InjectedMessage::from_external_row(row) {
Ok(parsed) => parsed,
Err(err) => {
// Row won the claim but can't be parsed; skip to the next.
warn!("{err}");
continue;
}
};
let input_item =
PendingInputItem::injected(message.prompt_input_item(), message.event());
return Some(PendingMessageClaim::Claimed(Box::new(input_item), delivery));
}
Some(codex_state::ExternalMessageClaim::Invalid { id, reason }) => {
warn!("dropped invalid queued message {id}: {reason}");
continue;
}
Some(codex_state::ExternalMessageClaim::NotReady) | None => {
*self.timer_start_in_progress.lock().await = false;
// Preserve Some/None: Some(NotReady) maps to Some(NotReady),
// an empty queue maps to None.
return claim.map(|_| PendingMessageClaim::NotReady);
}
}
}
}
async fn claim_next_timer_for_delivery(
self: &Arc<Self>,
recurring_timer_policy: RecurringTimerPolicy,
@@ -310,6 +434,10 @@ impl Session {
}
/// Timer-specific wrapper retained for existing call sites; injection is
/// shared with queued messages via `inject_message_into_active_turn`.
async fn inject_timer_into_active_turn(&self, item: PendingInputItem) -> bool {
self.inject_message_into_active_turn(item).await
}
async fn inject_message_into_active_turn(&self, item: PendingInputItem) -> bool {
let active = self.active_turn.lock().await;
let Some(active_turn) = active.as_ref() else {
return false;
@@ -634,8 +762,12 @@ impl Session {
self.features.enabled(Feature::Timers)
}
/// Whether the queued external-message feature flag is on for this session.
fn queued_messages_feature_enabled(&self) -> bool {
self.features.enabled(Feature::QueuedMessages)
}
fn timer_db_sync_feature_enabled(&self) -> bool {
self.timers_feature_enabled()
self.timers_feature_enabled() || self.queued_messages_feature_enabled()
}
fn spawn_restored_timer_tasks(self: &Arc<Self>, restored_tasks: Vec<RestoredTimerTask>) {

View File

@@ -8,6 +8,7 @@
#![allow(dead_code)]
use crate::timers::TimerDelivery;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::protocol::InjectedMessageEvent;
@@ -43,6 +44,22 @@ pub(crate) enum InjectedMessage {
}
impl InjectedMessage {
/// Convert a persisted external-message row into an injectable message plus
/// its requested delivery mode.
///
/// Returns a human-readable error string when the stored `delivery` value is
/// not a recognized `TimerDelivery` variant.
pub(crate) fn from_external_row(
    row: codex_state::ExternalMessage,
) -> Result<(Self, TimerDelivery), String> {
    // `delivery` is stored as a bare string; deserialize it through a JSON
    // string value so serde's variant matching applies.
    let raw_delivery = serde_json::Value::String(row.delivery.clone());
    let delivery: TimerDelivery = serde_json::from_value(raw_delivery)
        .map_err(|err| format!("invalid message delivery `{}`: {err}", row.delivery))?;
    let message = Self::External {
        source: row.source,
        content: row.content,
    };
    Ok((message, delivery))
}
pub(crate) fn prompt_input_item(&self) -> ResponseInputItem {
ResponseInputItem::Message {
role: "user".to_string(),

View File

@@ -171,6 +171,7 @@ pub use rollout::append_thread_name;
pub use rollout::find_archived_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use rollout::find_conversation_path_by_id_str;
pub use rollout::find_thread_ids_by_name;
pub use rollout::find_thread_meta_by_name_str;
pub use rollout::find_thread_name_by_id;
pub use rollout::find_thread_names_by_ids;

View File

@@ -14,6 +14,7 @@ pub use codex_rollout::append_thread_name;
pub use codex_rollout::find_archived_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use codex_rollout::find_conversation_path_by_id_str;
pub use codex_rollout::find_thread_ids_by_name;
pub use codex_rollout::find_thread_meta_by_name_str;
pub use codex_rollout::find_thread_name_by_id;
pub use codex_rollout::find_thread_names_by_ids;

View File

@@ -377,3 +377,253 @@ async fn list_timers_discovers_externally_inserted_timer() -> Result<()> {
Ok(())
}
// A message inserted directly into the state DB is picked up and delivered as
// an injected user message even when the Timers feature is off — only
// QueuedMessages + Sqlite are enabled here.
#[cfg_attr(
target_os = "windows",
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_messages_feature_consumes_messages_without_timers() -> Result<()> {
let server = start_mock_server().await;
let mock = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "queued turn"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::QueuedMessages)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
config
.features
.enable(Feature::Sqlite)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let thread_id = test.session_configured.session_id.to_string();
// Queue the message out-of-band, as `codex queue` would.
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
thread_id,
"external".to_string(),
"queued hello".to_string(),
/*instructions*/ None,
"{}".to_string(),
TimerDelivery::AfterTurn.as_str().to_string(),
Utc::now().timestamp(),
))
.await?;
// The session should surface the injected message, then complete a turn.
wait_for_event_with_timeout(
&test.codex,
|event| match event {
EventMsg::InjectedMessage(event) => {
event.source == "external" && event.content == "queued hello"
}
_ => false,
},
TIMER_INTEGRATION_TIMEOUT,
)
.await;
wait_for_event_with_timeout(
&test.codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
TIMER_INTEGRATION_TIMEOUT,
)
.await;
// Exactly one model request, carrying the queued content.
let requests = mock.requests();
assert_eq!(requests.len(), 1);
assert!(requests.iter().any(|request| {
request
.message_input_texts("user")
.iter()
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
}));
Ok(())
}
// With Timers + QueuedMessages both on: a recurring timer fires first, then a
// queued message (inserted while the timer turn is pending) is delivered in a
// later turn — timer content in request 0, queued content in request 1.
#[cfg_attr(
target_os = "windows",
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_message_runs_after_idle_recurring_timer() -> Result<()> {
let server = start_mock_server().await;
let mock = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "timer turn"),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "queued turn"),
ev_completed("resp-2"),
]),
],
)
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Timers)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
config
.features
.enable(Feature::QueuedMessages)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
config
.features
.enable(Feature::Sqlite)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
// Zero-delay recurring timer so the first injection happens immediately.
let timer = test
.codex
.create_timer(
ThreadTimerTrigger::Delay {
seconds: 0,
repeat: Some(true),
},
MessagePayload {
content: "keep going".to_string(),
instructions: None,
meta: Default::default(),
},
TimerDelivery::AfterTurn,
)
.await
.map_err(|err| anyhow!("{err}"))?;
wait_for_event_with_timeout(
&test.codex,
|event| match event {
EventMsg::InjectedMessage(event) => event.source == format!("timer {}", timer.id),
_ => false,
},
TIMER_INTEGRATION_TIMEOUT,
)
.await;
let thread_id = test.session_configured.session_id.to_string();
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
thread_id,
"external".to_string(),
"queued hello".to_string(),
/*instructions*/ None,
"{}".to_string(),
TimerDelivery::AfterTurn.as_str().to_string(),
Utc::now().timestamp(),
))
.await?;
// Delete the recurring timer so only the queued message can drive turn 2.
assert!(
test.codex
.delete_timer(&timer.id)
.await
.map_err(|err| anyhow!("{err}"))?,
"test should delete the idle recurring timer before it can schedule another turn"
);
wait_for_event_with_timeout(
&test.codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
TIMER_INTEGRATION_TIMEOUT,
)
.await;
wait_for_event_with_timeout(
&test.codex,
|event| match event {
EventMsg::InjectedMessage(event) => event.source == "external",
_ => false,
},
TIMER_INTEGRATION_TIMEOUT,
)
.await;
wait_for_event_with_timeout(
&test.codex,
|event| matches!(event, EventMsg::TurnComplete(_)),
TIMER_INTEGRATION_TIMEOUT,
)
.await;
// Verify ordering: timer content first, queued content second.
let requests = mock.requests();
assert!(
requests.len() >= 2,
"expected timer and queued-message turns to run"
);
assert!(
requests[0]
.message_input_texts("user")
.iter()
.any(|message| message.contains("<content>\nTimer fired: keep going\n</content>"))
);
assert!(
requests[1]
.message_input_texts("user")
.iter()
.any(|message| message.contains("<content>\nqueued hello\n</content>"))
);
Ok(())
}
// With only Sqlite enabled (QueuedMessages off), a queued row is never
// consumed: one explicit turn runs, and the message is still claimable
// afterwards.
#[cfg_attr(
target_os = "windows",
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn queued_messages_feature_disabled_leaves_messages_queued() -> Result<()> {
let server = start_mock_server().await;
let mock = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "first turn"),
ev_completed("resp-1"),
]),
)
.await;
let mut builder = test_codex().with_config(|config| {
config
.features
.enable(Feature::Sqlite)
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
});
let test = builder.build(&server).await?;
let db = test.codex.state_db().expect("state db enabled");
let thread_id = test.session_configured.session_id.to_string();
db.create_external_message(&codex_state::ExternalMessageCreateParams::new(
thread_id.clone(),
"external".to_string(),
"queued hello".to_string(),
/*instructions*/ None,
"{}".to_string(),
TimerDelivery::AfterTurn.as_str().to_string(),
Utc::now().timestamp(),
))
.await?;
test.submit_turn("start").await?;
// Brief grace period to let any (unexpected) delivery happen.
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(mock.requests().len(), 1);
// The row must still be present and claimable by a direct DB call.
assert!(
db.claim_next_external_message(
&thread_id, /*can_after_turn*/ true, /*can_steer_current_turn*/ true,
)
.await?
.is_some()
);
Ok(())
}

View File

@@ -174,6 +174,8 @@ pub enum Feature {
Artifact,
/// Enable per-thread persistent timer scheduling tools and APIs.
Timers,
/// Enable queued message creation and delivery.
QueuedMessages,
/// Enable Fast mode selection in the TUI and request layer.
FastMode,
/// Enable experimental realtime voice conversation mode in the TUI.
@@ -865,6 +867,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::QueuedMessages,
key: "queued_messages",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::FastMode,
key: "fast_mode",

View File

@@ -159,6 +159,16 @@ fn timers_are_under_development() {
assert_eq!(Feature::Timers.default_enabled(), false);
}
#[test]
fn queued_messages_are_under_development() {
assert_eq!(
feature_for_key("queued_messages"),
Some(Feature::QueuedMessages)
);
assert_eq!(Feature::QueuedMessages.stage(), Stage::UnderDevelopment);
assert_eq!(Feature::QueuedMessages.default_enabled(), false);
}
#[test]
fn tool_call_mcp_elicitation_is_stable_and_enabled_by_default() {
assert_eq!(Feature::ToolCallMcpElicitation.stage(), Stage::Stable);

View File

@@ -56,6 +56,7 @@ pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;
pub use recorder::append_rollout_item_to_path;
pub use session_index::append_thread_name;
pub use session_index::find_thread_ids_by_name;
pub use session_index::find_thread_meta_by_name_str;
pub use session_index::find_thread_name_by_id;
pub use session_index::find_thread_names_by_ids;

View File

@@ -112,6 +112,48 @@ pub async fn find_thread_names_by_ids(
Ok(names)
}
/// Find all thread ids whose latest indexed thread name exactly matches `name`.
///
/// The session index is append-only, so a thread's effective name is the last
/// non-empty entry recorded for it; earlier entries (including earlier
/// matches) are superseded. A blank `name` or a missing index file yields an
/// empty list, and unparsable or empty-name index lines are skipped rather
/// than treated as errors. Ids are returned sorted by their string form for
/// deterministic output.
pub async fn find_thread_ids_by_name(
    codex_home: &Path,
    name: &str,
) -> std::io::Result<Vec<ThreadId>> {
    let name = name.trim();
    if name.is_empty() {
        return Ok(Vec::new());
    }
    let path = session_index_path(codex_home);
    if !path.exists() {
        return Ok(Vec::new());
    }
    let file = tokio::fs::File::open(&path).await?;
    let reader = tokio::io::BufReader::new(file);
    let mut lines = reader.lines();
    // Track only the most recent name per thread; later index lines win.
    let mut latest_names = HashMap::<ThreadId, String>::new();
    while let Some(line) = lines.next_line().await? {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let Ok(entry) = serde_json::from_str::<SessionIndexEntry>(trimmed) else {
            continue;
        };
        let thread_name = entry.thread_name.trim();
        if thread_name.is_empty() {
            continue;
        }
        latest_names.insert(entry.id, thread_name.to_string());
    }
    let mut ids = latest_names
        .into_iter()
        .filter_map(|(thread_id, thread_name)| (thread_name == name).then_some(thread_id))
        .collect::<Vec<_>>();
    // `sort_by_cached_key` allocates each id's String key once, instead of
    // re-stringifying on every comparison as `sort_by_key` would.
    ids.sort_by_cached_key(ToString::to_string);
    Ok(ids)
}
/// Locate a recorded thread rollout and read its session metadata by thread name.
/// Returns the newest indexed name that still has a readable rollout header.
pub async fn find_thread_meta_by_name_str(

View File

@@ -265,6 +265,47 @@ async fn find_thread_names_by_ids_prefers_latest_entry() -> std::io::Result<()>
Ok(())
}
// A thread renamed away from "target" (id1) must not match; threads whose
// latest name is "target" (id2, id3) must, in string-sorted id order.
#[tokio::test]
async fn find_thread_ids_by_name_uses_latest_name_per_thread() -> std::io::Result<()> {
let temp = TempDir::new()?;
let path = session_index_path(temp.path());
let id1 = ThreadId::new();
let id2 = ThreadId::new();
let id3 = ThreadId::new();
let lines = vec![
SessionIndexEntry {
id: id1,
thread_name: "target".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
},
SessionIndexEntry {
id: id2,
thread_name: "target".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
},
// Later entry renames id1, superseding its earlier "target" match.
SessionIndexEntry {
id: id1,
thread_name: "renamed".to_string(),
updated_at: "2024-01-02T00:00:00Z".to_string(),
},
SessionIndexEntry {
id: id3,
thread_name: "target".to_string(),
updated_at: "2024-01-03T00:00:00Z".to_string(),
},
];
write_index(&path, &lines)?;
let found = find_thread_ids_by_name(temp.path(), "target").await?;
// Expected order mirrors the function's string-form sort.
let expected = {
let mut ids = vec![id2, id3];
ids.sort_by_key(ToString::to_string);
ids
};
assert_eq!(found, expected);
Ok(())
}
#[test]
fn scan_index_finds_latest_match_among_mixed_entries() -> std::io::Result<()> {
let temp = TempDir::new()?;