Compare commits

...

1 Commits

Author SHA1 Message Date
Prabhat Agarwal
d6c52671ec Add session automation scheduling 2026-01-09 23:14:48 -08:00
7 changed files with 334 additions and 5 deletions

View File

@@ -91,6 +91,7 @@ tree-sitter-highlight = { workspace = true }
unicode-segmentation = { workspace = true }
unicode-width = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
codex-windows-sandbox = { workspace = true }
tokio-util = { workspace = true, features = ["time"] }
@@ -121,4 +122,3 @@ pretty_assertions = { workspace = true }
rand = { workspace = true }
serial_test = { workspace = true }
vt100 = { workspace = true }
uuid = { workspace = true }

View File

@@ -702,6 +702,9 @@ impl App {
AppEvent::CommitTick => {
self.chat_widget.on_commit_tick();
}
AppEvent::AutomationTick => {
self.chat_widget.on_automation_tick();
}
AppEvent::CodexEvent(event) => {
if self.suppress_shutdown_complete
&& matches!(event.msg, EventMsg::ShutdownComplete)

View File

@@ -70,6 +70,7 @@ pub(crate) enum AppEvent {
StartCommitAnimation,
StopCommitAnimation,
CommitTick,
AutomationTick,
/// Update the current reasoning effort in the running app and widget.
UpdateReasoningEffort(Option<ReasoningEffort>),

View File

@@ -1342,7 +1342,7 @@ impl ChatComposer {
&& let Some((_n, cmd)) = built_in_slash_commands()
.into_iter()
.find(|(command_name, _)| *command_name == name)
&& cmd == SlashCommand::Review
&& matches!(cmd, SlashCommand::Review | SlashCommand::Automation)
{
return (InputResult::CommandWithArgs(cmd, rest.to_string()), true);
}

View File

@@ -5,6 +5,8 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use chrono::Local;
use chrono::TimeZone;
use codex_app_server_protocol::AuthMode;
use codex_backend_client::Client as BackendClient;
use codex_core::config::Config;
@@ -83,6 +85,7 @@ use ratatui::widgets::Wrap;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tracing::debug;
use uuid::Uuid;
use crate::app_event::AppEvent;
#[cfg(target_os = "windows")]
@@ -134,7 +137,6 @@ use self::session_header::SessionHeader;
use crate::streaming::controller::StreamController;
use std::path::Path;
use chrono::Local;
use codex_common::approval_presets::ApprovalPreset;
use codex_common::approval_presets::builtin_approval_presets;
use codex_core::AuthManager;
@@ -194,6 +196,7 @@ fn is_standard_tool_call(parsed_cmd: &[ParsedCommand]) -> bool {
const RATE_LIMIT_WARNING_THRESHOLDS: [f64; 3] = [75.0, 90.0, 95.0];
const NUDGE_MODEL_SLUG: &str = "gpt-5.1-codex-mini";
const RATE_LIMIT_SWITCH_PROMPT_THRESHOLD: f64 = 90.0;
const AUTOMATION_TICK_SECS: u64 = 30;
#[derive(Default)]
struct RateLimitWarningState {
@@ -282,6 +285,71 @@ pub(crate) fn get_limits_duration(windows_minutes: i64) -> String {
}
}
fn now_ms() -> u64 {
let Ok(duration) = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) else {
return 0;
};
duration.as_millis().min(u128::from(u64::MAX)) as u64
}
fn format_timestamp(ms: u64) -> String {
Local
.timestamp_millis_opt(ms as i64)
.single()
.map(|ts| ts.format("%Y-%m-%d %H:%M:%S").to_string())
.unwrap_or_else(|| "-".to_string())
}
fn format_interval(interval: Duration) -> String {
let secs = interval.as_secs();
if secs % 86_400 == 0 {
format!("{}d", secs / 86_400)
} else if secs % 3_600 == 0 {
format!("{}h", secs / 3_600)
} else if secs % 60 == 0 {
format!("{}m", secs / 60)
} else {
format!("{}s", secs)
}
}
fn parse_interval_spec(spec: &str) -> Option<Duration> {
let trimmed = spec.trim();
if trimmed.is_empty() {
return None;
}
let (value_str, unit) = trimmed.split_at(trimmed.len().saturating_sub(1));
let (value, unit) = if value_str.is_empty() || value_str.chars().any(|c| !c.is_ascii_digit()) {
(trimmed, "")
} else {
(value_str, unit)
};
let value: u64 = value.parse().ok()?;
if value == 0 {
return None;
}
let secs = match unit {
"s" => value,
"m" | "" => value.checked_mul(60)?,
"h" => value.checked_mul(3_600)?,
"d" => value.checked_mul(86_400)?,
_ => return None,
};
Some(Duration::from_secs(secs))
}
fn summarize_prompt(prompt: &str) -> String {
const MAX: usize = 80;
let trimmed = prompt.trim().replace('\n', " ");
if trimmed.chars().count() <= MAX {
trimmed
} else {
let mut out = trimmed.chars().take(MAX - 3).collect::<String>();
out.push_str("...");
out
}
}
/// Common initialization parameters shared by all `ChatWidget` constructors.
pub(crate) struct ChatWidgetInit {
pub(crate) config: Config,
@@ -313,6 +381,14 @@ pub(crate) enum ExternalEditorState {
Active,
}
struct Automation {
id: Uuid,
prompt: String,
interval: Duration,
next_run_at_ms: u64,
last_run_at_ms: Option<u64>,
}
pub(crate) struct ChatWidget {
app_event_tx: AppEventSender,
codex_op_tx: UnboundedSender<Op>,
@@ -330,6 +406,7 @@ pub(crate) struct ChatWidget {
rate_limit_warnings: RateLimitWarningState,
rate_limit_switch_prompt: RateLimitSwitchPromptState,
rate_limit_poller: Option<JoinHandle<()>>,
automation_ticker: Option<JoinHandle<()>>,
// Stream lifecycle controller
stream_controller: Option<StreamController>,
running_commands: HashMap<String, RunningCommand>,
@@ -357,6 +434,7 @@ pub(crate) struct ChatWidget {
suppress_session_configured_redraw: bool,
// User messages queued while a turn is in progress
queued_user_messages: VecDeque<UserMessage>,
automations: Vec<Automation>,
// Pending notification to show when unfocused on next Draw
pending_notification: Option<Notification>,
// Simple review mode flag; used to adjust layout and banners.
@@ -374,9 +452,16 @@ pub(crate) struct ChatWidget {
external_editor_state: ExternalEditorState,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum UserMessageSource {
User,
Automation(Uuid),
}
struct UserMessage {
text: String,
image_paths: Vec<PathBuf>,
source: UserMessageSource,
}
impl From<String> for UserMessage {
@@ -384,6 +469,7 @@ impl From<String> for UserMessage {
Self {
text,
image_paths: Vec::new(),
source: UserMessageSource::User,
}
}
}
@@ -393,15 +479,34 @@ impl From<&str> for UserMessage {
Self {
text: text.to_string(),
image_paths: Vec::new(),
source: UserMessageSource::User,
}
}
}
impl UserMessage {
fn automation(prompt: String, id: Uuid) -> Self {
Self {
text: prompt,
image_paths: Vec::new(),
source: UserMessageSource::Automation(id),
}
}
fn is_automation(&self) -> bool {
matches!(self.source, UserMessageSource::Automation(_))
}
}
fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Option<UserMessage> {
if text.is_empty() && image_paths.is_empty() {
None
} else {
Some(UserMessage { text, image_paths })
Some(UserMessage {
text,
image_paths,
source: UserMessageSource::User,
})
}
}
@@ -1455,6 +1560,7 @@ impl ChatWidget {
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
rate_limit_poller: None,
automation_ticker: None,
stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
@@ -1469,6 +1575,7 @@ impl ChatWidget {
retry_status_header: None,
thread_id: None,
queued_user_messages: VecDeque::new(),
automations: Vec::new(),
show_welcome_banner: is_first_run,
suppress_session_configured_redraw: false,
pending_notification: None,
@@ -1541,6 +1648,7 @@ impl ChatWidget {
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
rate_limit_poller: None,
automation_ticker: None,
stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
@@ -1555,6 +1663,7 @@ impl ChatWidget {
retry_status_header: None,
thread_id: None,
queued_user_messages: VecDeque::new(),
automations: Vec::new(),
show_welcome_banner: false,
suppress_session_configured_redraw: true,
pending_notification: None,
@@ -1636,6 +1745,7 @@ impl ChatWidget {
let user_message = UserMessage {
text,
image_paths: self.bottom_pane.take_recent_submission_images(),
source: UserMessageSource::User,
};
self.queue_user_message(user_message);
}
@@ -1829,6 +1939,9 @@ impl ChatWidget {
SlashCommand::Status => {
self.add_status_output();
}
SlashCommand::Automation => {
self.show_automation_help();
}
SlashCommand::Ps => {
self.add_ps_output();
}
@@ -1909,6 +2022,9 @@ impl ChatWidget {
},
});
}
SlashCommand::Automation => {
self.handle_automation_command(trimmed);
}
_ => self.dispatch_command(cmd),
}
}
@@ -1986,11 +2102,35 @@ impl ChatWidget {
}
}
fn queue_automation_message(&mut self, id: Uuid, prompt: String) {
let message = UserMessage::automation(prompt, id);
if self.bottom_pane.is_task_running() {
if self.queued_user_messages.iter().any(UserMessage::is_automation) {
return;
}
self.queued_user_messages.push_back(message);
self.refresh_queued_user_messages();
} else {
self.submit_user_message(message);
}
}
fn submit_user_message(&mut self, user_message: UserMessage) {
let UserMessage { text, image_paths } = user_message;
let is_automation = user_message.is_automation();
let UserMessage {
text,
image_paths,
..
} = user_message;
if text.is_empty() && image_paths.is_empty() {
return;
}
if is_automation {
self.add_info_message(
format!("Automation fired at {}: {}", format_timestamp(now_ms()), text),
None,
);
}
let mut items: Vec<UserInput> = Vec::new();
@@ -2349,12 +2489,191 @@ impl ChatWidget {
self.add_to_history(history_cell::new_unified_exec_processes_output(processes));
}
fn show_automation_help(&mut self) {
self.add_info_message(
"Automation commands: /automation list | /automation add <interval> <prompt> | /automation remove <id> | /automation clear (interval example: 5m, 1h, 30s)".to_string(),
None,
);
}
fn show_automation_list(&mut self) {
if self.automations.is_empty() {
self.add_info_message("No automations scheduled for this session.".to_string(), None);
return;
}
let mut lines = Vec::with_capacity(self.automations.len() + 1);
lines.push(Line::from("Automations (session-only):"));
for automation in &self.automations {
let next_run = format_timestamp(automation.next_run_at_ms);
let last_run = automation
.last_run_at_ms
.map(format_timestamp)
.unwrap_or_else(|| "-".to_string());
lines.push(Line::from(format!(
"{} | every {} | next {} | last {} | {}",
automation.id,
format_interval(automation.interval),
next_run,
last_run,
summarize_prompt(&automation.prompt),
)));
}
self.add_plain_history_lines(lines);
}
fn add_automation(&mut self, interval: Duration, prompt: String) {
let now = now_ms();
let interval_ms = interval.as_millis().min(u128::from(u64::MAX)) as u64;
let next_run_at_ms = now.saturating_add(interval_ms);
let id = Uuid::new_v4();
self.automations.push(Automation {
id,
prompt,
interval,
next_run_at_ms,
last_run_at_ms: None,
});
self.refresh_automation_ticker();
self.add_info_message(
format!(
"Automation {id} scheduled every {}. Next run at {}.",
format_interval(interval),
format_timestamp(next_run_at_ms),
),
None,
);
}
fn remove_automation(&mut self, id: Uuid) {
let before = self.automations.len();
self.automations.retain(|automation| automation.id != id);
if self.automations.len() == before {
self.add_error_message(format!("No automation found for id {id}."));
return;
}
self.refresh_automation_ticker();
self.add_info_message(format!("Removed automation {id}."), None);
}
fn clear_automations(&mut self) {
if self.automations.is_empty() {
self.add_info_message("No automations to clear.".to_string(), None);
return;
}
self.automations.clear();
self.refresh_automation_ticker();
self.add_info_message("Cleared all automations for this session.".to_string(), None);
}
fn handle_automation_command(&mut self, args: &str) {
let trimmed = args.trim();
if trimmed.is_empty() {
self.show_automation_help();
return;
}
let action = trimmed.split_whitespace().next().unwrap_or("");
let rest = trimmed[action.len()..].trim();
match action {
"list" => self.show_automation_list(),
"add" => {
let interval_spec = rest.split_whitespace().next().unwrap_or("");
let prompt = rest[interval_spec.len()..].trim();
let Some(interval) = parse_interval_spec(interval_spec) else {
self.add_error_message(
"Usage: /automation add <interval> <prompt> (example: /automation add 5m check status)".to_string(),
);
return;
};
if prompt.is_empty() {
self.add_error_message(
"Usage: /automation add <interval> <prompt> (example: /automation add 5m check status)".to_string(),
);
return;
}
self.add_automation(interval, prompt.to_string());
}
"remove" | "rm" | "delete" => {
let id_str = rest.trim();
let Ok(id) = Uuid::parse_str(id_str) else {
self.add_error_message(
"Usage: /automation remove <id> (use /automation list to find ids)".to_string(),
);
return;
};
self.remove_automation(id);
}
"clear" => self.clear_automations(),
_ => self.show_automation_help(),
}
}
fn stop_rate_limit_poller(&mut self) {
if let Some(handle) = self.rate_limit_poller.take() {
handle.abort();
}
}
fn stop_automation_ticker(&mut self) {
if let Some(handle) = self.automation_ticker.take() {
handle.abort();
}
}
fn ensure_automation_ticker(&mut self) {
if self.automation_ticker.is_some() {
return;
}
let app_event_tx = self.app_event_tx.clone();
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(AUTOMATION_TICK_SECS));
loop {
interval.tick().await;
app_event_tx.send(AppEvent::AutomationTick);
}
});
self.automation_ticker = Some(handle);
}
fn refresh_automation_ticker(&mut self) {
if self.automations.is_empty() {
self.stop_automation_ticker();
} else {
self.ensure_automation_ticker();
}
}
pub(crate) fn on_automation_tick(&mut self) {
if self.automations.is_empty() {
self.stop_automation_ticker();
return;
}
if self.queued_user_messages.iter().any(UserMessage::is_automation) {
return;
}
let now = now_ms();
let mut next_index: Option<usize> = None;
let mut earliest_due = u64::MAX;
for (idx, automation) in self.automations.iter().enumerate() {
if automation.next_run_at_ms <= now && automation.next_run_at_ms < earliest_due {
earliest_due = automation.next_run_at_ms;
next_index = Some(idx);
}
}
let Some(index) = next_index else {
return;
};
let automation = &mut self.automations[index];
let interval_ms = automation
.interval
.as_millis()
.min(u128::from(u64::MAX)) as u64;
automation.last_run_at_ms = Some(now);
automation.next_run_at_ms = now.saturating_add(interval_ms);
let id = automation.id;
let prompt = automation.prompt.clone();
self.queue_automation_message(id, prompt);
}
fn prefetch_rate_limits(&mut self) {
self.stop_rate_limit_poller();
@@ -3909,6 +4228,7 @@ impl ChatWidget {
impl Drop for ChatWidget {
fn drop(&mut self) {
self.stop_rate_limit_poller();
self.stop_automation_ticker();
}
}

View File

@@ -401,6 +401,7 @@ async fn make_chatwidget_manual(
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
rate_limit_poller: None,
automation_ticker: None,
stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
@@ -417,6 +418,7 @@ async fn make_chatwidget_manual(
frame_requester: FrameRequester::test_dummy(),
show_welcome_banner: true,
queued_user_messages: VecDeque::new(),
automations: Vec::new(),
suppress_session_configured_redraw: false,
pending_notification: None,
is_review_mode: false,

View File

@@ -27,6 +27,7 @@ pub enum SlashCommand {
Diff,
Mention,
Status,
Automation,
Mcp,
Logout,
Quit,
@@ -53,6 +54,7 @@ impl SlashCommand {
SlashCommand::Mention => "mention a file",
SlashCommand::Skills => "use skills to improve how Codex performs specific tasks",
SlashCommand::Status => "show current session configuration and token usage",
SlashCommand::Automation => "manage scheduled prompts for this session",
SlashCommand::Ps => "list background terminals",
SlashCommand::Model => "choose what model and reasoning effort to use",
SlashCommand::Approvals => "choose what Codex can do without approval",
@@ -89,6 +91,7 @@ impl SlashCommand {
| SlashCommand::Mention
| SlashCommand::Skills
| SlashCommand::Status
| SlashCommand::Automation
| SlashCommand::Ps
| SlashCommand::Mcp
| SlashCommand::Feedback