Compare commits

...

3 Commits

Author SHA1 Message Date
Kevin Liu
d767dcc0a0 slack 2026-02-09 16:45:58 -08:00
Kevin Liu
35f653cc29 up 2026-02-09 16:44:40 -08:00
Kevin Liu
49cb5796a6 Immortality 2026-02-09 16:44:06 -08:00
15 changed files with 947 additions and 1 deletions

View File

@@ -998,6 +998,9 @@ fn merge_interactive_cli_flags(interactive: &mut TuiCli, subcommand_cli: TuiCli)
if subcommand_cli.full_auto {
interactive.full_auto = true;
}
if subcommand_cli.immortal {
interactive.immortal = true;
}
if subcommand_cli.dangerously_bypass_approvals_and_sandbox {
interactive.dangerously_bypass_approvals_and_sandbox = true;
}

View File

@@ -99,6 +99,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-tungstenite = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
tokio-tungstenite = { workspace = true }
toml = { workspace = true }

View File

@@ -201,6 +201,7 @@ use crate::skills::injection::ToolMentionKind;
use crate::skills::injection::app_id_from_path;
use crate::skills::injection::tool_kind_for_path;
use crate::skills::resolve_skill_dependencies_for_turn;
use crate::slack::SlackThreadManager;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
@@ -422,6 +423,9 @@ impl Codex {
tokio::spawn(
submission_loop(Arc::clone(&session), config, rx_sub).instrument(session_loop_span),
);
if let Some(slack) = session.services.slack.clone() {
slack.start(Arc::clone(&session), tx_sub.clone());
}
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
@@ -1047,6 +1051,19 @@ impl Session {
default_shell.shell_snapshot = rx;
tx
};
let slack_manager = match SlackThreadManager::from_env(
&config,
conversation_id,
session_configuration.cwd.clone(),
)
.await
{
Ok(manager) => manager,
Err(err) => {
warn!("failed to initialize Slack integration: {err:#}");
None
}
};
let thread_name =
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id).await
{
@@ -1072,6 +1089,7 @@ impl Session {
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
slack: slack_manager,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
otel_manager,
@@ -3541,6 +3559,9 @@ mod handlers {
.unified_exec_manager
.terminate_all_processes()
.await;
if let Some(slack) = &sess.services.slack {
slack.shutdown();
}
info!("Shutting down Codex instance");
let history = sess.clone_history().await;
let turn_count = history
@@ -6174,6 +6195,7 @@ mod tests {
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
slack: None,
exec_policy,
auth_manager: auth_manager.clone(),
otel_manager: otel_manager.clone(),
@@ -6307,6 +6329,7 @@ mod tests {
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
slack: None,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
otel_manager: otel_manager.clone(),

View File

@@ -99,6 +99,7 @@ pub mod seatbelt;
pub mod shell;
pub mod shell_snapshot;
pub mod skills;
mod slack;
pub mod spawn;
pub mod state_db;
pub mod terminal;

722
codex-rs/core/src/slack.rs Normal file
View File

@@ -0,0 +1,722 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::user_input::UserInput;
use futures::StreamExt;
use http::header::AUTHORIZATION;
use http::header::COOKIE;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::header::ORIGIN;
use http::header::REFERER;
use once_cell::sync::Lazy;
use regex::Regex;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::Message;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::warn;
use crate::codex::Session;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::protocol::Submission;
const DEFAULT_API_BASE: &str = "https://slack.com/api/";
const USER_AGENT: &str = "wee_slack_mcp/0.1";
const SLACK_THREADS_DIR: &str = "slack_threads";
const SLACK_NOTIFY_DEFAULT_PREFIX: &str = "[Codex]";
const RTM_RECONNECT_DELAY: Duration = Duration::from_secs(2);
const DEDUPE_CAPACITY: usize = 5_000;
static USER_MENTION_RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"<@([A-Za-z0-9]+)(?:\\|([^>]+))?>").unwrap());
static USER_MENTION_FIX_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"@\\|([^\\s>]+)").unwrap());
#[derive(Clone, Debug)]
struct SlackConfig {
token: Option<String>,
cookie: Option<String>,
api_base: String,
base_url: Option<String>,
}
impl SlackConfig {
fn from_env() -> Result<Option<Self>> {
let token_env = std::env::var("SLACK_TOKEN").ok();
let cookie_env = std::env::var("SLACK_COOKIE").ok();
let workspace_env = std::env::var("SLACK_WORKSPACE").ok();
let mut token = token_env.clone();
let mut cookie = cookie_env.clone();
if let Some(token_val) = token_env.as_deref()
&& token_val.contains(':')
{
let mut parts = token_val.splitn(2, ':');
let token_part = parts.next().unwrap_or("").trim();
let cookie_part = parts.next().unwrap_or("").trim();
if token_part.is_empty() {
anyhow::bail!("Invalid SLACK_TOKEN format (empty token part)");
}
token = Some(token_part.to_string());
if !cookie_part.is_empty() {
cookie = Some(cookie_part.to_string());
}
}
if token.is_none() && cookie.is_none() {
return Ok(None);
}
if token.is_none() {
if let Some(workspace) = workspace_env.as_deref() {
let api_base = format!("https://{workspace}.slack.com/api/");
let base_url = Some(format!("https://{workspace}.slack.com"));
return Ok(Some(Self {
token,
cookie,
api_base,
base_url,
}));
}
anyhow::bail!("SLACK_COOKIE requires SLACK_WORKSPACE (e.g. your-workspace)");
}
Ok(Some(Self {
token,
cookie,
api_base: DEFAULT_API_BASE.to_string(),
base_url: None,
}))
}
}
#[derive(Clone)]
struct SlackClient {
config: SlackConfig,
client: reqwest::Client,
}
impl SlackClient {
fn new(config: SlackConfig) -> Result<Self> {
let client = build_reqwest_client();
Ok(Self { config, client })
}
async fn api_call(&self, method: &str, params: HashMap<&str, String>) -> Result<Value> {
let url = format!("{}{}", self.config.api_base, method);
let mut data = params;
if self.config.api_base != DEFAULT_API_BASE {
if let Some(token) = &self.config.token {
data.entry("token").or_insert_with(|| token.clone());
}
}
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::USER_AGENT,
HeaderValue::from_static(USER_AGENT),
);
headers.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/x-www-form-urlencoded"),
);
headers.insert(
reqwest::header::ACCEPT_ENCODING,
HeaderValue::from_static("gzip"),
);
if let Some(token) = &self.config.token {
let value = format!("Bearer {token}");
headers.insert(AUTHORIZATION, HeaderValue::from_str(&value)?);
}
if let Some(base_url) = &self.config.base_url {
headers.insert(ORIGIN, HeaderValue::from_str(base_url)?);
headers.insert(REFERER, HeaderValue::from_str(&format!("{base_url}/"))?);
headers.insert(
HeaderName::from_static("x-slack-user-agent"),
HeaderValue::from_static("SlackWeb/1.0"),
);
if let Ok(version_ts) = std::env::var("SLACK_VERSION_TS") {
headers.insert(
HeaderName::from_static("x-slack-version-ts"),
HeaderValue::from_str(&version_ts)?,
);
}
}
if let Some(cookie) = &self.config.cookie {
let mut cookie_val = cookie.clone();
if !cookie_val.starts_with("d=") {
cookie_val = format!("d={cookie_val}");
}
headers.insert(COOKIE, HeaderValue::from_str(&cookie_val)?);
}
let response = self
.client
.post(url)
.headers(headers)
.form(&data)
.timeout(Duration::from_secs(30))
.send()
.await
.context("Slack API request failed")?;
let payload: Value = response
.json()
.await
.context("Slack API returned non-JSON response")?;
if payload.get("ok").and_then(Value::as_bool).unwrap_or(false) {
Ok(payload)
} else {
let error = payload
.get("error")
.and_then(Value::as_str)
.unwrap_or("unknown_error");
anyhow::bail!("Slack API error for {method}: {error}");
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct SlackThreadRecord {
thread_id: String,
channel_name: String,
channel_id: String,
thread_ts: String,
}
struct SlackThreadStore {
root: PathBuf,
}
impl SlackThreadStore {
fn new(codex_home: &Path) -> Self {
Self {
root: codex_home.join(SLACK_THREADS_DIR),
}
}
fn thread_path(&self, thread_id: &ThreadId) -> PathBuf {
self.root.join(format!("{}.json", thread_id))
}
async fn load(&self, thread_id: &ThreadId) -> Result<Option<SlackThreadRecord>> {
let path = self.thread_path(thread_id);
let bytes = match tokio::fs::read(&path).await {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
let record = serde_json::from_slice::<SlackThreadRecord>(&bytes)
.context("failed to parse Slack thread record")?;
Ok(Some(record))
}
async fn save(&self, record: &SlackThreadRecord) -> Result<()> {
tokio::fs::create_dir_all(&self.root).await?;
let path = self.root.join(format!("{}.json", record.thread_id));
let bytes = serde_json::to_vec_pretty(record)?;
tokio::fs::write(path, bytes).await?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct SlackNotifyResult {
pub channel: String,
pub ts: String,
pub thread_ts: String,
}
#[derive(Default)]
struct DedupeState {
seen: HashSet<String>,
order: VecDeque<String>,
}
impl DedupeState {
fn insert(&mut self, message_id: String) -> bool {
if self.seen.contains(&message_id) {
return false;
}
self.seen.insert(message_id.clone());
self.order.push_back(message_id);
while self.order.len() > DEDUPE_CAPACITY {
if let Some(oldest) = self.order.pop_front() {
self.seen.remove(&oldest);
}
}
true
}
}
#[derive(Default)]
struct SlackThreadState {
record: Option<SlackThreadRecord>,
}
pub struct SlackThreadManager {
notify_channel: String,
notify_prefix: String,
thread_id: ThreadId,
cwd: PathBuf,
store: SlackThreadStore,
client: SlackClient,
state: Mutex<SlackThreadState>,
dedupe: Mutex<DedupeState>,
self_user_id: Mutex<Option<String>>,
cancel_token: CancellationToken,
started: AtomicBool,
}
impl SlackThreadManager {
pub async fn from_env(
config: &Config,
thread_id: ThreadId,
cwd: PathBuf,
) -> Result<Option<Arc<Self>>> {
let Some(channel) = std::env::var("SLACKMCP_NOTIFY_CHANNEL").ok() else {
return Ok(None);
};
let notify_prefix = SLACK_NOTIFY_DEFAULT_PREFIX.to_string();
let Some(slack_config) = SlackConfig::from_env()? else {
warn!("Slack integration disabled: missing SLACK_TOKEN or SLACK_COOKIE");
return Ok(None);
};
let client = SlackClient::new(slack_config)?;
let store = SlackThreadStore::new(&config.codex_home);
let record = store.load(&thread_id).await?;
let state = SlackThreadState { record };
Ok(Some(Arc::new(Self {
notify_channel: channel,
notify_prefix,
thread_id,
cwd,
store,
client,
state: Mutex::new(state),
dedupe: Mutex::new(DedupeState::default()),
self_user_id: Mutex::new(None),
cancel_token: CancellationToken::new(),
started: AtomicBool::new(false),
})))
}
pub fn start(
self: &Arc<Self>,
session: Arc<Session>,
submission_tx: async_channel::Sender<Submission>,
) {
if self
.started
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
let manager = Arc::clone(self);
tokio::spawn(async move {
if let Err(err) = manager.ensure_thread().await {
warn!("Failed to initialize Slack thread: {err:#}");
}
manager.run_rtm_listener(session, submission_tx).await;
});
}
pub fn shutdown(&self) {
self.cancel_token.cancel();
}
pub async fn notify_user(&self, message: &str) -> Result<SlackNotifyResult> {
if message.trim().is_empty() {
anyhow::bail!("message must not be empty");
}
let record = self.ensure_thread().await?;
let text = format!("{} {}", self.notify_prefix, message.trim());
let payload = self
.client
.api_call(
"chat.postMessage",
HashMap::from([
("channel", record.channel_id.clone()),
("text", text),
("thread_ts", record.thread_ts.clone()),
]),
)
.await?;
let channel = payload
.get("channel")
.and_then(Value::as_str)
.unwrap_or(&record.channel_id)
.to_string();
let ts = payload
.get("ts")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
Ok(SlackNotifyResult {
channel,
ts,
thread_ts: record.thread_ts,
})
}
async fn ensure_thread(&self) -> Result<SlackThreadRecord> {
{
let state = self.state.lock().await;
if let Some(record) = &state.record
&& record.channel_name == self.notify_channel
{
return Ok(record.clone());
}
}
let channel_id = self.find_channel_id(&self.notify_channel).await?;
let thread_title = format!(
"{} New Codex thread: {} (cwd: {})",
self.notify_prefix,
self.thread_id,
self.cwd.display()
);
let payload = self
.client
.api_call(
"chat.postMessage",
HashMap::from([("channel", channel_id.clone()), ("text", thread_title)]),
)
.await?;
let thread_ts = payload
.get("ts")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if thread_ts.is_empty() {
anyhow::bail!("Slack API did not return thread timestamp");
}
let record = SlackThreadRecord {
thread_id: self.thread_id.to_string(),
channel_name: self.notify_channel.clone(),
channel_id,
thread_ts,
};
self.store.save(&record).await?;
let mut state = self.state.lock().await;
state.record = Some(record.clone());
Ok(record)
}
async fn find_channel_id(&self, channel_name: &str) -> Result<String> {
let normalized = channel_name.trim().trim_start_matches('#');
let mut cursor: Option<String> = None;
loop {
let mut params = HashMap::from([
("types", "public_channel,private_channel".to_string()),
("exclude_archived", "1".to_string()),
("limit", "1000".to_string()),
]);
if let Some(cursor_val) = cursor.as_deref()
&& !cursor_val.is_empty()
{
params.insert("cursor", cursor_val.to_string());
}
let payload = self.client.api_call("conversations.list", params).await?;
let channels = payload
.get("channels")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
for ch in channels {
let name = ch.get("name").and_then(Value::as_str).unwrap_or("");
if name == normalized {
if let Some(id) = ch.get("id").and_then(Value::as_str) {
return Ok(id.to_string());
}
}
}
let next = payload
.get("response_metadata")
.and_then(|meta| meta.get("next_cursor"))
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
if next.is_empty() {
break;
}
cursor = Some(next);
}
anyhow::bail!("Could not find channel #{}", normalized)
}
async fn current_user_id(&self) -> Result<Option<String>> {
let mut guard = self.self_user_id.lock().await;
if guard.is_some() {
return Ok(guard.clone());
}
if self.client.config.token.is_none() {
return Ok(None);
}
let payload = self.client.api_call("auth.test", HashMap::new()).await?;
let user_id = payload
.get("user_id")
.and_then(Value::as_str)
.map(str::to_string);
*guard = user_id.clone();
Ok(user_id)
}
async fn run_rtm_listener(
self: Arc<Self>,
session: Arc<Session>,
submission_tx: async_channel::Sender<Submission>,
) {
if self.client.config.token.is_none() {
debug!("Slack RTM disabled: no token available");
return;
}
loop {
if self.cancel_token.is_cancelled() {
break;
}
let rtm_url = match self.rtm_connect().await {
Ok(url) => url,
Err(err) => {
warn!("Slack RTM connect failed: {err:#}");
sleep(RTM_RECONNECT_DELAY).await;
continue;
}
};
if let Err(err) = self
.listen_on_socket(&rtm_url, &session, &submission_tx)
.await
{
warn!("Slack RTM socket ended: {err:#}");
sleep(RTM_RECONNECT_DELAY).await;
}
}
}
async fn rtm_connect(&self) -> Result<String> {
let payload = self
.client
.api_call(
"rtm.connect",
HashMap::from([("batch_presence_aware", "1".to_string())]),
)
.await?;
payload
.get("url")
.and_then(Value::as_str)
.map(str::to_string)
.context("Slack RTM missing url")
}
async fn listen_on_socket(
&self,
url: &str,
session: &Arc<Session>,
submission_tx: &async_channel::Sender<Submission>,
) -> Result<()> {
let mut request = http::Request::builder().uri(url);
request = request.header(reqwest::header::USER_AGENT, USER_AGENT);
if let Some(base_url) = &self.client.config.base_url {
request = request.header(ORIGIN, base_url);
request = request.header(REFERER, format!("{base_url}/"));
}
if let Some(cookie) = &self.client.config.cookie {
let cookie_val = if cookie.starts_with("d=") {
cookie.clone()
} else {
format!("d={cookie}")
};
request = request.header(COOKIE, cookie_val);
}
let request = request.body(())?;
let (mut socket, _) = tokio_tungstenite::connect_async(request).await?;
loop {
tokio::select! {
_ = self.cancel_token.cancelled() => {
return Ok(());
}
msg = socket.next() => {
let Some(msg) = msg else { return Ok(()); };
let msg = msg?;
match msg {
Message::Text(text) => {
if let Ok(event) = serde_json::from_str::<SlackEvent>(&text) {
if let Some(message) = self.normalize_message_event(event).await? {
self.handle_incoming_message(message, session, submission_tx).await;
}
}
}
Message::Close(_) => return Ok(()),
_ => {}
}
}
}
}
}
async fn handle_incoming_message(
&self,
message: SlackMessageEvent,
session: &Arc<Session>,
submission_tx: &async_channel::Sender<Submission>,
) {
let mut dedupe = self.dedupe.lock().await;
if !dedupe.insert(message.message_id.clone()) {
return;
}
drop(dedupe);
let text = format!(
"Slack thread reply from {}: {}",
message.author, message.text
);
let input = vec![UserInput::Text {
text,
text_elements: Vec::new(),
}];
if session.inject_input(input.clone()).await.is_ok() {
return;
}
let sub = Submission {
id: format!("slack-{}", message.message_id),
op: Op::UserInput {
items: input,
final_output_json_schema: None,
},
};
if let Err(err) = submission_tx.send(sub).await {
warn!("Failed to enqueue Slack input: {err}");
}
}
async fn normalize_message_event(
&self,
event: SlackEvent,
) -> Result<Option<SlackMessageEvent>> {
if event.kind != "message" {
return Ok(None);
}
let mut event = event;
if let Some(subtype) = event.subtype.as_deref() {
if subtype == "message_replied" {
if let Some(message) = event.message.take() {
event.text = message.text;
event.user = message.user;
event.username = message.username;
event.bot_id = message.bot_id;
event.ts = message.ts;
event.thread_ts = message.thread_ts;
event.client_msg_id = message.client_msg_id;
}
} else {
return Ok(None);
}
}
let Some(channel) = event.channel.as_deref() else {
return Ok(None);
};
let record = {
let state = self.state.lock().await;
state.record.clone()
};
let Some(record) = record else {
return Ok(None);
};
if channel != record.channel_id {
return Ok(None);
}
let thread_ts = event.thread_ts.as_deref().unwrap_or("");
if thread_ts != record.thread_ts {
return Ok(None);
}
let user_id = event.user.clone();
if let Some(self_user) = self.current_user_id().await? {
if user_id.as_deref() == Some(self_user.as_str()) {
return Ok(None);
}
}
if event.bot_id.is_some() {
return Ok(None);
}
let text = simplify_mentions(event.text.unwrap_or_default());
let author = event
.username
.or(event.user)
.or(event.bot_id)
.unwrap_or_else(|| "unknown".to_string());
let message_id = event
.client_msg_id
.or(event.ts.clone())
.unwrap_or_else(|| format!("{}:{}", record.channel_id, thread_ts));
Ok(Some(SlackMessageEvent {
author,
text,
message_id,
}))
}
}
#[derive(Debug, Deserialize)]
struct SlackEvent {
#[serde(rename = "type")]
kind: String,
subtype: Option<String>,
channel: Option<String>,
user: Option<String>,
username: Option<String>,
bot_id: Option<String>,
text: Option<String>,
ts: Option<String>,
thread_ts: Option<String>,
client_msg_id: Option<String>,
message: Option<SlackMessage>,
}
#[derive(Debug, Deserialize)]
struct SlackMessage {
user: Option<String>,
username: Option<String>,
bot_id: Option<String>,
text: Option<String>,
ts: Option<String>,
thread_ts: Option<String>,
client_msg_id: Option<String>,
}
#[derive(Debug)]
struct SlackMessageEvent {
author: String,
text: String,
message_id: String,
}
fn simplify_mentions(text: String) -> String {
let simplified = USER_MENTION_RE.replace_all(&text, |caps: &regex::Captures| {
let label = caps.get(2).map(|m| m.as_str());
if let Some(label) = label {
format!("@{label}")
} else {
format!("@{}", &caps[1])
}
});
USER_MENTION_FIX_RE
.replace_all(&simplified, "@$1")
.to_string()
}

View File

@@ -12,6 +12,7 @@ use crate::mcp_connection_manager::McpConnectionManager;
use crate::models_manager::manager::ModelsManager;
use crate::skills::SkillsManager;
use crate::state_db::StateDbHandle;
use crate::slack::SlackThreadManager;
use crate::tools::sandboxing::ApprovalStore;
use crate::unified_exec::UnifiedExecProcessManager;
use codex_otel::OtelManager;
@@ -30,6 +31,7 @@ pub(crate) struct SessionServices {
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
pub(crate) show_raw_agent_reasoning: bool,
pub(crate) slack: Option<Arc<SlackThreadManager>>,
pub(crate) exec_policy: ExecPolicyManager,
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) models_manager: Arc<ModelsManager>,

View File

@@ -10,6 +10,7 @@ mod read_file;
mod request_user_input;
mod search_tool_bm25;
mod shell;
mod slack_notify;
mod test_sync;
mod unified_exec;
mod view_image;
@@ -33,6 +34,7 @@ pub(crate) use search_tool_bm25::DEFAULT_LIMIT as SEARCH_TOOL_BM25_DEFAULT_LIMIT
pub use search_tool_bm25::SearchToolBm25Handler;
pub use shell::ShellCommandHandler;
pub use shell::ShellHandler;
pub use slack_notify::SlackNotifyHandler;
pub use test_sync::TestSyncHandler;
pub use unified_exec::UnifiedExecHandler;
pub use view_image::ViewImageHandler;

View File

@@ -0,0 +1,75 @@
use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
pub struct SlackNotifyHandler;
#[derive(Debug, Deserialize)]
struct NotifyArgs {
message: String,
}
#[derive(Debug, Serialize)]
struct NotifyResult {
ok: bool,
channel: String,
ts: String,
thread_ts: String,
}
#[async_trait]
impl ToolHandler for SlackNotifyHandler {
fn kind(&self) -> ToolKind {
ToolKind::Function
}
fn matches_kind(&self, payload: &ToolPayload) -> bool {
matches!(payload, ToolPayload::Function { .. })
}
async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
let ToolInvocation {
session, payload, ..
} = invocation;
let arguments = match payload {
ToolPayload::Function { arguments } => arguments,
_ => {
return Err(FunctionCallError::RespondToModel(
"notify_user handler received unsupported payload".to_string(),
));
}
};
let args: NotifyArgs = parse_arguments(&arguments)?;
let slack = session
.services
.slack
.as_ref()
.ok_or_else(|| FunctionCallError::RespondToModel("Slack integration is not configured. Set SLACKMCP_NOTIFY_CHANNEL and SLACK_TOKEN/SLACK_COOKIE.".to_string()))?;
let result = slack.notify_user(&args.message).await.map_err(|err| {
FunctionCallError::RespondToModel(format!("Slack notify failed: {err:#}"))
})?;
let payload = NotifyResult {
ok: true,
channel: result.channel,
ts: result.ts,
thread_ts: result.thread_ts,
};
let content = serde_json::to_string(&payload).map_err(|err| {
FunctionCallError::Fatal(format!("failed to serialize notify_user result: {err}"))
})?;
Ok(ToolOutput::Function {
content,
success: Some(true),
content_items: None,
})
}
}

View File

@@ -664,6 +664,28 @@ fn create_request_user_input_tool() -> ToolSpec {
})
}
fn create_notify_user_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
"message".to_string(),
JsonSchema::String {
description: Some("Message to post into the current Codex Slack thread.".to_string()),
},
);
ToolSpec::Function(ResponsesApiTool {
name: "notify_user".to_string(),
description: "Post a message to the Slack thread associated with this Codex session."
.to_string(),
strict: false,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["message".to_string()]),
additional_properties: Some(false.into()),
},
})
}
fn create_close_agent_tool() -> ToolSpec {
let mut properties = BTreeMap::new();
properties.insert(
@@ -1298,6 +1320,7 @@ pub(crate) fn build_specs(
use crate::tools::handlers::SearchToolBm25Handler;
use crate::tools::handlers::ShellCommandHandler;
use crate::tools::handlers::ShellHandler;
use crate::tools::handlers::SlackNotifyHandler;
use crate::tools::handlers::TestSyncHandler;
use crate::tools::handlers::UnifiedExecHandler;
use crate::tools::handlers::ViewImageHandler;
@@ -1316,6 +1339,7 @@ pub(crate) fn build_specs(
let shell_command_handler = Arc::new(ShellCommandHandler);
let request_user_input_handler = Arc::new(RequestUserInputHandler);
let search_tool_handler = Arc::new(SearchToolBm25Handler);
let slack_notify_handler = Arc::new(SlackNotifyHandler);
match &config.shell_type {
ConfigShellToolType::Default => {
@@ -1442,6 +1466,8 @@ pub(crate) fn build_specs(
builder.push_spec_with_parallel_support(create_view_image_tool(), true);
builder.register_handler("view_image", view_image_handler);
}
builder.push_spec(create_notify_user_tool());
builder.register_handler("notify_user", slack_notify_handler);
if config.collab_tools {
let collab_handler = Arc::new(CollabHandler);
@@ -1675,6 +1701,7 @@ mod tests {
external_web_access: Some(true),
},
create_view_image_tool(),
create_notify_user_tool(),
] {
expected.insert(tool_name(&spec).to_string(), spec);
}
@@ -1842,6 +1869,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1864,6 +1892,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1888,6 +1917,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1912,6 +1942,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1933,6 +1964,7 @@ mod tests {
"request_user_input",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1955,6 +1987,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1976,6 +2009,7 @@ mod tests {
"request_user_input",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -1998,6 +2032,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -2044,6 +2079,7 @@ mod tests {
"apply_patch",
"web_search",
"view_image",
"notify_user",
],
);
}
@@ -2067,6 +2103,7 @@ mod tests {
"request_user_input",
"web_search",
"view_image",
"notify_user",
],
);
}

View File

@@ -610,6 +610,7 @@ impl App {
model: Some(self.chat_widget.current_model().to_string()),
status_line_invalid_items_warned: self.status_line_invalid_items_warned.clone(),
otel_manager: self.otel_manager.clone(),
immortality_enable_after_first_turn: false,
}
}
@@ -930,6 +931,7 @@ impl App {
active_profile: Option<String>,
initial_prompt: Option<String>,
initial_images: Vec<PathBuf>,
immortal: bool,
session_selection: SessionSelection,
feedback: codex_feedback::CodexFeedback,
is_first_run: bool,
@@ -1030,6 +1032,7 @@ impl App {
model: Some(model.clone()),
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
otel_manager: otel_manager.clone(),
immortality_enable_after_first_turn: immortal,
};
ChatWidget::new(init, thread_manager.clone())
}
@@ -1060,6 +1063,7 @@ impl App {
model: config.model.clone(),
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
otel_manager: otel_manager.clone(),
immortality_enable_after_first_turn: immortal,
};
ChatWidget::new_from_existing(init, resumed.thread, resumed.session_configured)
}
@@ -1091,6 +1095,7 @@ impl App {
model: config.model.clone(),
status_line_invalid_items_warned: status_line_invalid_items_warned.clone(),
otel_manager: otel_manager.clone(),
immortality_enable_after_first_turn: immortal,
};
ChatWidget::new_from_existing(init, forked.thread, forked.session_configured)
}
@@ -1332,6 +1337,7 @@ impl App {
model: Some(model),
status_line_invalid_items_warned: self.status_line_invalid_items_warned.clone(),
otel_manager: self.otel_manager.clone(),
immortality_enable_after_first_turn: false,
};
self.chat_widget = ChatWidget::new(init, self.server.clone());
self.reset_thread_event_state();

View File

@@ -147,6 +147,8 @@ use tokio::task::JoinHandle;
use tracing::debug;
use tracing::warn;
const IMMORTALITY_FOLLOWUP_PROMPT: &str = "Please keep going. Work as autonomously as possible. Ask the user for clarification, but don't block on their response. Worst case, if you have nothing useful to do, poll every 30s for a response from the user for 5 minutes and then end your current turn.";
const DEFAULT_MODEL_DISPLAY_NAME: &str = "loading";
const PLAN_IMPLEMENTATION_TITLE: &str = "Implement this plan?";
const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan";
@@ -417,6 +419,7 @@ pub(crate) struct ChatWidgetInit {
// Shared latch so we only warn once about invalid status-line item IDs.
pub(crate) status_line_invalid_items_warned: Arc<AtomicBool>,
pub(crate) otel_manager: OtelManager,
pub(crate) immortality_enable_after_first_turn: bool,
}
#[derive(Default)]
@@ -579,6 +582,10 @@ pub(crate) struct ChatWidget {
is_review_mode: bool,
// Snapshot of token usage to restore after review mode exits.
pre_review_token_info: Option<Option<TokenUsageInfo>>,
/// Auto-continue by submitting a follow-up after each completed turn.
immortality_mode: bool,
/// Enable immortality after the next completed turn (used by --immortal).
immortality_enable_after_first_turn: bool,
// Whether the next streamed assistant content should be preceded by a final message separator.
//
// This is set whenever we insert a visible history cell that conceptually belongs to a turn.
@@ -1364,8 +1371,20 @@ impl ChatWidget {
if !from_replay {
self.saw_plan_item_this_turn = false;
}
if self.immortality_enable_after_first_turn && !self.is_review_mode {
self.immortality_enable_after_first_turn = false;
if !self.immortality_mode {
self.immortality_mode = true;
self.add_info_message(
"Immortality mode enabled (from --immortal).".to_string(),
Some("Press Ctrl+C to interrupt.".to_string()),
);
}
}
// If there is a queued user message, send exactly one now to begin the next turn.
self.maybe_send_next_queued_input();
self.maybe_send_immortality_prompt();
// Emit a notification when the turn completes (suppressed if focused).
self.notify(Notification::AgentTurnComplete {
response: last_agent_message.unwrap_or_default(),
@@ -1692,6 +1711,15 @@ impl ChatWidget {
self.clear_unified_exec_processes();
}
if reason == TurnAbortReason::Interrupted && self.immortality_mode {
self.immortality_mode = false;
self.immortality_enable_after_first_turn = false;
self.add_info_message(
"Immortality mode disabled after interrupt.".to_string(),
None,
);
}
if reason != TurnAbortReason::ReviewEnded {
self.add_to_history(history_cell::new_error_event(
"Conversation interrupted - tell the model what to do differently. Something went wrong? Hit `/feedback` to report the issue.".to_owned(),
@@ -2551,6 +2579,7 @@ impl ChatWidget {
model,
status_line_invalid_items_warned,
otel_manager,
immortality_enable_after_first_turn,
} = common;
let model = model.filter(|m| !m.trim().is_empty());
let mut config = config;
@@ -2813,6 +2842,8 @@ impl ChatWidget {
quit_shortcut_key: None,
is_review_mode: false,
pre_review_token_info: None,
immortality_mode: false,
immortality_enable_after_first_turn,
needs_final_message_separator: false,
had_work_activity: false,
last_separator_elapsed_secs: None,
@@ -2869,6 +2900,8 @@ impl ChatWidget {
model,
status_line_invalid_items_warned,
otel_manager,
immortality_enable_after_first_turn,
..
} = common;
let model = model.filter(|m| !m.trim().is_empty());
let mut rng = rand::rng();
@@ -2962,6 +2995,8 @@ impl ChatWidget {
quit_shortcut_key: None,
is_review_mode: false,
pre_review_token_info: None,
immortality_mode: false,
immortality_enable_after_first_turn,
needs_final_message_separator: false,
had_work_activity: false,
saw_plan_update_this_turn: false,
@@ -3397,6 +3432,23 @@ impl ChatWidget {
SlashCommand::Apps => {
self.add_connectors_output();
}
SlashCommand::Immortality => {
self.immortality_mode = !self.immortality_mode;
let status = if self.immortality_mode {
"enabled"
} else {
"disabled"
};
self.add_info_message(
format!(
"Immortality mode {status}. I will keep the agent running after each turn until interrupted.",
),
Some("Press Ctrl+C to interrupt.".to_string()),
);
if self.immortality_mode {
self.maybe_send_immortality_prompt();
}
}
SlashCommand::Rollout => {
if let Some(path) = self.rollout_path() {
self.add_info_message(
@@ -4195,6 +4247,16 @@ impl ChatWidget {
self.refresh_queued_user_messages();
}
fn maybe_send_immortality_prompt(&mut self) {
if !self.immortality_mode || self.is_review_mode {
return;
}
if self.bottom_pane.is_task_running() || !self.queued_user_messages.is_empty() {
return;
}
self.submit_user_message(UserMessage::from(IMMORTALITY_FOLLOWUP_PROMPT));
}
/// Rebuild and update the queued user messages from the current queue.
fn refresh_queued_user_messages(&mut self) {
let messages: Vec<String> = self

View File

@@ -974,6 +974,7 @@ async fn helpers_are_available_and_do_not_panic() {
model: Some(resolved_model),
status_line_invalid_items_warned: Arc::new(AtomicBool::new(false)),
otel_manager,
immortality_enable_after_first_turn: false,
};
let mut w = ChatWidget::new(init, thread_manager);
// Basic construction sanity.
@@ -1093,6 +1094,8 @@ async fn make_chatwidget_manual(
quit_shortcut_key: None,
is_review_mode: false,
pre_review_token_info: None,
immortality_mode: false,
immortality_enable_after_first_turn: false,
needs_final_message_separator: false,
had_work_activity: false,
saw_plan_update_this_turn: false,

View File

@@ -110,6 +110,10 @@ pub struct Cli {
#[arg(long = "no-alt-screen", default_value_t = false)]
pub no_alt_screen: bool,
/// Enable immortality mode after the first user turn.
#[arg(long = "immortal", default_value_t = false)]
pub immortal: bool,
#[clap(skip)]
pub config_overrides: CliConfigOverrides,
}

View File

@@ -688,6 +688,7 @@ async fn run_ratatui_app(
prompt,
images,
no_alt_screen,
immortal,
..
} = cli;
@@ -703,6 +704,7 @@ async fn run_ratatui_app(
active_profile,
prompt,
images,
immortal,
session_selection,
feedback,
should_show_trust_screen, // Proxy to: is it a first run in this directory?

View File

@@ -27,6 +27,7 @@ pub enum SlashCommand {
Init,
Compact,
Plan,
Immortality,
Collab,
Agent,
// Undo,
@@ -56,6 +57,7 @@ impl SlashCommand {
SlashCommand::New => "start a new chat during a conversation",
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
SlashCommand::Immortality => "auto-continue after each turn",
SlashCommand::Review => "review my current changes and find issues",
SlashCommand::Rename => "rename the current thread",
SlashCommand::Resume => "resume a saved chat",
@@ -131,7 +133,8 @@ impl SlashCommand {
| SlashCommand::Apps
| SlashCommand::Feedback
| SlashCommand::Quit
| SlashCommand::Exit => true,
| SlashCommand::Exit
| SlashCommand::Immortality => true,
SlashCommand::Rollout => true,
SlashCommand::TestApproval => true,
SlashCommand::Collab => true,