mirror of
https://github.com/openai/codex.git
synced 2026-04-28 16:45:54 +00:00
codex: add persistent thread timers
This commit is contained in:
@@ -143,6 +143,7 @@ mod sqlite_state;
|
||||
mod stream_error_allows_next_turn;
|
||||
mod stream_no_completed;
|
||||
mod subagent_notifications;
|
||||
mod timers;
|
||||
mod tool_harness;
|
||||
mod tool_parallelism;
|
||||
mod tool_suggest;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_core::CodexThread;
|
||||
use codex_protocol::AgentPath;
|
||||
@@ -31,6 +32,7 @@ use serde_json::Value;
|
||||
use serde_json::from_slice;
|
||||
use serde_json::json;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
|
||||
fn ev_message_item_done(id: &str, text: &str) -> Value {
|
||||
serde_json::json!({
|
||||
@@ -159,6 +161,22 @@ async fn submit_queue_only_agent_mail(codex: &CodexThread, text: &str) {
|
||||
.unwrap_or_else(|err| panic!("submit queue-only agent mail: {err}"));
|
||||
}
|
||||
|
||||
async fn wait_for_pending_input(codex: &CodexThread) {
|
||||
if timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
if codex.has_pending_input().await {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
panic!("mailbox message should become pending input");
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_reasoning_item_started(codex: &CodexThread) {
|
||||
wait_for_event(codex, |event| {
|
||||
matches!(
|
||||
@@ -346,6 +364,7 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_reasoning_item() {
|
||||
wait_for_reasoning_item_started(&codex).await;
|
||||
|
||||
submit_queue_only_agent_mail(&codex, "queued child update").await;
|
||||
wait_for_pending_input(&codex).await;
|
||||
|
||||
let _ = gate_reasoning_done_tx.send(());
|
||||
|
||||
@@ -408,6 +427,7 @@ async fn queued_inter_agent_mail_triggers_follow_up_after_commentary_message_ite
|
||||
.await;
|
||||
|
||||
submit_queue_only_agent_mail(&codex, "queued child update").await;
|
||||
wait_for_pending_input(&codex).await;
|
||||
|
||||
let _ = gate_message_done_tx.send(());
|
||||
|
||||
|
||||
379
codex-rs/core/tests/suite/timers.rs
Normal file
379
codex-rs/core/tests/suite/timers.rs
Normal file
@@ -0,0 +1,379 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use chrono::Utc;
|
||||
use codex_core::injected_message::MessagePayload;
|
||||
use codex_core::timers::TIMER_FIRED_BACKGROUND_EVENT_PREFIX;
|
||||
use codex_core::timers::ThreadTimer;
|
||||
use codex_core::timers::ThreadTimerTrigger;
|
||||
use codex_core::timers::TimerDelivery;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use core_test_support::responses::ResponseMock;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_with_timeout;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
|
||||
const TIMER_INTEGRATION_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn create_timer_emits_fired_background_event_when_timer_starts() -> Result<()> {
|
||||
assert_after_turn_timer_starts_and_emits_fired_event().await
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn create_timer_starts_on_current_thread_runtime() -> Result<()> {
|
||||
assert_after_turn_timer_starts_and_emits_fired_event().await
|
||||
}
|
||||
|
||||
async fn assert_after_turn_timer_starts_and_emits_fired_event() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "timer ran"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let created = test
|
||||
.codex
|
||||
.create_timer(
|
||||
ThreadTimerTrigger::Delay {
|
||||
seconds: 0,
|
||||
repeat: None,
|
||||
},
|
||||
MessagePayload {
|
||||
content: "run timer".to_string(),
|
||||
instructions: None,
|
||||
meta: Default::default(),
|
||||
},
|
||||
TimerDelivery::AfterTurn,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!("{err}"))?;
|
||||
|
||||
let payload = wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| match event {
|
||||
EventMsg::BackgroundEvent(event) => event
|
||||
.message
|
||||
.strip_prefix(TIMER_FIRED_BACKGROUND_EVENT_PREFIX)
|
||||
.is_some(),
|
||||
_ => false,
|
||||
},
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
let EventMsg::BackgroundEvent(event) = payload else {
|
||||
unreachable!("event predicate only matches timer fired background events");
|
||||
};
|
||||
let payload = event
|
||||
.message
|
||||
.strip_prefix(TIMER_FIRED_BACKGROUND_EVENT_PREFIX)
|
||||
.ok_or_else(|| anyhow!("timer fired event prefix missing"))?;
|
||||
let fired: ThreadTimer = serde_json::from_str(payload)?;
|
||||
assert_eq!(fired, created);
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&test.codex,
|
||||
|event| matches!(event, EventMsg::TurnComplete(_)),
|
||||
TIMER_INTEGRATION_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
let user_messages = mock.single_request().message_input_texts("user");
|
||||
let timer_messages = user_messages
|
||||
.iter()
|
||||
.filter(|message| message.contains("<timer_message>"))
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(timer_messages.len(), 1);
|
||||
let timer_message = timer_messages[0];
|
||||
assert!(timer_message.contains(&format!("<timer_id>{}</timer_id>", created.id)));
|
||||
assert!(timer_message.contains("<content>\nTimer fired: run timer\n</content>"));
|
||||
assert!(timer_message.contains("<instructions>\nrun timer\n\nThis one-shot timer"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn create_timer_persists_source_and_client_metadata() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let created = test
|
||||
.codex
|
||||
.create_timer(
|
||||
ThreadTimerTrigger::Delay {
|
||||
seconds: 60,
|
||||
repeat: Some(true),
|
||||
},
|
||||
MessagePayload {
|
||||
content: "run timer".to_string(),
|
||||
instructions: None,
|
||||
meta: Default::default(),
|
||||
},
|
||||
TimerDelivery::AfterTurn,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!("{err}"))?;
|
||||
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let timers = db
|
||||
.list_thread_timers(&test.session_configured.session_id.to_string())
|
||||
.await?;
|
||||
|
||||
assert_eq!(timers.len(), 1);
|
||||
assert_eq!(timers[0].id, created.id);
|
||||
assert_eq!(timers[0].source, "agent");
|
||||
assert_eq!(timers[0].client_id, "codex-cli");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn create_timer_rejects_ephemeral_thread() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.ephemeral = true;
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let err = test
|
||||
.codex
|
||||
.create_timer(
|
||||
ThreadTimerTrigger::Delay {
|
||||
seconds: 60,
|
||||
repeat: Some(true),
|
||||
},
|
||||
MessagePayload {
|
||||
content: "run timer".to_string(),
|
||||
instructions: None,
|
||||
meta: Default::default(),
|
||||
},
|
||||
TimerDelivery::AfterTurn,
|
||||
)
|
||||
.await
|
||||
.expect_err("ephemeral sessions should not create durable timers");
|
||||
|
||||
assert!(err.contains("timer storage is unavailable for ephemeral sessions"));
|
||||
assert!(test.codex.state_db().is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn resume_due_timer_runs_after_history_reconstruction() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "recorded before resume"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-2", "timer after resume"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let initial = builder.build(&server).await?;
|
||||
initial.submit_turn("context before resume").await?;
|
||||
|
||||
let db = initial.codex.state_db().expect("state db enabled");
|
||||
let home = initial.home.clone();
|
||||
let rollout_path = initial
|
||||
.codex
|
||||
.rollout_path()
|
||||
.expect("rollout path should be materialized");
|
||||
let thread_id = initial.session_configured.session_id.to_string();
|
||||
|
||||
initial.codex.submit(Op::Shutdown).await?;
|
||||
wait_for_event(&initial.codex, |event| {
|
||||
matches!(event, EventMsg::ShutdownComplete)
|
||||
})
|
||||
.await;
|
||||
|
||||
let now = Utc::now().timestamp();
|
||||
db.create_thread_timer(&codex_state::ThreadTimerCreateParams {
|
||||
id: "resume-due-timer".to_string(),
|
||||
thread_id,
|
||||
source: "external".to_string(),
|
||||
client_id: "codex-test".to_string(),
|
||||
trigger_json: r#"{"kind":"delay","seconds":0,"repeat":false}"#.to_string(),
|
||||
content: "resume timer".to_string(),
|
||||
instructions: None,
|
||||
meta_json: "{}".to_string(),
|
||||
delivery: TimerDelivery::AfterTurn.as_str().to_string(),
|
||||
created_at: now - 10,
|
||||
next_run_at: Some(now - 1),
|
||||
last_run_at: None,
|
||||
pending_run: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut resume_builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let _resumed = resume_builder.resume(&server, home, rollout_path).await?;
|
||||
|
||||
let resumed_request = wait_for_request_containing(&mock, "Timer fired: resume timer").await;
|
||||
assert!(resumed_request.body_contains_text("context before resume"));
|
||||
assert!(resumed_request.body_contains_text("recorded before resume"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_request_containing(mock: &ResponseMock, text: &str) -> ResponsesRequest {
|
||||
tokio::time::timeout(TIMER_INTEGRATION_TIMEOUT, async {
|
||||
loop {
|
||||
if let Some(request) = mock
|
||||
.requests()
|
||||
.into_iter()
|
||||
.find(|request| request.body_contains_text(text))
|
||||
{
|
||||
return request;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|_| panic!("timed out waiting for request containing {text:?}"))
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
target_os = "windows",
|
||||
ignore = "timer/message integration tests currently exceed the Windows Bazel job timeout"
|
||||
)]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn list_timers_discovers_externally_inserted_timer() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Timers)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
config
|
||||
.features
|
||||
.enable(Feature::Sqlite)
|
||||
.unwrap_or_else(|err| panic!("test config should allow feature update: {err}"));
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let created_at = Utc::now().timestamp();
|
||||
|
||||
db.create_thread_timer(&codex_state::ThreadTimerCreateParams {
|
||||
id: "external-timer".to_string(),
|
||||
thread_id: test.session_configured.session_id.to_string(),
|
||||
source: "client".to_string(),
|
||||
client_id: "external-client".to_string(),
|
||||
trigger_json: r#"{"kind":"delay","seconds":60,"repeat":true}"#.to_string(),
|
||||
content: "external timer".to_string(),
|
||||
instructions: None,
|
||||
meta_json: "{}".to_string(),
|
||||
delivery: "after-turn".to_string(),
|
||||
created_at,
|
||||
next_run_at: Some(created_at + 60),
|
||||
last_run_at: None,
|
||||
pending_run: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let timers = test.codex.list_timers().await;
|
||||
|
||||
assert_eq!(timers.len(), 1);
|
||||
assert_eq!(timers[0].id, "external-timer");
|
||||
assert_eq!(
|
||||
timers[0].trigger,
|
||||
ThreadTimerTrigger::Delay {
|
||||
seconds: 60,
|
||||
repeat: Some(true),
|
||||
}
|
||||
);
|
||||
assert_eq!(timers[0].content, "external timer");
|
||||
assert_eq!(timers[0].delivery, TimerDelivery::AfterTurn);
|
||||
assert_eq!(timers[0].created_at, created_at);
|
||||
assert_eq!(timers[0].last_run_at, None);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user