Compare commits

...

17 Commits

Author SHA1 Message Date
Eric Traut
192544963a Fixed merge issue 2026-01-30 09:58:16 -08:00
Eric Traut
1c62e54881 Merge remote-tracking branch 'origin/main' into etraut/live_skill_update 2026-01-30 09:52:04 -08:00
Eric Traut
d79761f427 Merge origin/main into etraut/live_skill_update 2026-01-30 09:33:07 -08:00
Eric Traut
41e1d66a72 Added throttling for file watcher events 2026-01-28 22:17:36 -08:00
Eric Traut
f3100c77f7 Add support for fallback filenames 2026-01-28 17:53:55 -08:00
Eric Traut
40a223ec57 Aesthetic improvements 2026-01-28 17:39:45 -08:00
Eric Traut
c8b6f27de5 Merge remote-tracking branch 'origin/main' into etraut/live_skill_update
# Conflicts:
#	codex-rs/core/config.schema.json
2026-01-28 17:31:20 -08:00
Eric Traut
57f9c9fabe Added integration test 2026-01-28 17:24:59 -08:00
Eric Traut
dad3af97d2 Added separate experimental feature flags for live update of skills and agents 2026-01-28 15:58:33 -08:00
Eric Traut
a95f4e7338 Merge origin/main 2026-01-28 15:22:32 -08:00
Eric Traut
45d258f48e Fix test 2026-01-27 00:58:48 -08:00
Eric Traut
cc5c6d1d9f Test fix 2026-01-27 00:51:27 -08:00
Eric Traut
60991e7344 Fixed test 2026-01-27 00:49:33 -08:00
Eric Traut
0dc71b2a42 Fixed broken test 2026-01-27 00:41:31 -08:00
Eric Traut
7d9728752c Improved comments 2026-01-27 00:21:37 -08:00
Eric Traut
f780842e9c Changed from strong to weak reference to prevent shutdown. 2026-01-27 00:17:34 -08:00
Eric Traut
8fe083b541 Added support for live updates to skills and AGENTS
Add a centralized FileWatcher in codex-core (using notify) that watches:
* AGENTS.md / AGENTS.override.md and project AGENTS search dirs
* Skill roots from the config layer stack (recursive)

Send `AgentsChanged` and `SkillsChanged` events when relevant file system changes are detected

On `SkillsChanged`:
* Invalidate the skills cache immediately in ThreadManager
* Emit EventMsg::SkillsUpdateAvailable to active sessions
* Broadcast a new app-server notification: skills/list/updated

On `AgentsChanged`:
* Set a per-session agents_changed flag scoped to relevant watch dirs
* On the next user turn, recompute skills + user instructions and inject an updated UserInstructions item into the event stream

Wire the watcher through ThreadManager -> Codex::spawn -> SessionServices, including sub-agent sessions.

Refactor project_doc discovery to expose project_doc_search_dirs for shared watch-dir computation.

Add SkillsListUpdatedNotification to the app-server protocol and gate broadcast until after initialize.

Testing:

I did a bunch of manual testing of both AGENTS and skills updates. They work surprisingly well. Skill addition and removal are both handled seamlessly in both the UI and in the behavior of the agent on the next turn. Changes to skill details were picked up and followed on the next turn. Additions and modifications to AGENTS.md were followed well. The only failure that I found was when I deleted a major block of instructions from AGENTS.md. In that case, the model still followed those instructions from the older AGENTS.md because it was still in the context window.
2026-01-27 00:07:12 -08:00
20 changed files with 1059 additions and 45 deletions

View File

@@ -13,7 +13,6 @@ In the codex-rs folder where the rust code lives:
- Use method references over closures when possible per https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure_for_method_calls
- When possible, make `match` statements exhaustive and avoid wildcard arms.
- When writing tests, prefer comparing the equality of entire objects over fields one by one.
- When making a change that adds or changes an API, ensure that the documentation in the `docs/` folder is up to date if applicable.
- If you change `ConfigToml` or nested config types, run `just write-config-schema` to update `codex-rs/core/config.schema.json`.
Run `just fmt` (in `codex-rs` directory) automatically after you have finished making Rust code changes; do not ask for approval to run it. Additionally, run the tests:

1
codex-rs/Cargo.lock generated
View File

@@ -1425,6 +1425,7 @@ dependencies = [
"maplit",
"mcp-types",
"multimap",
"notify",
"once_cell",
"openssl-sys",
"os_info",

View File

@@ -623,6 +623,7 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
SkillsListUpdated => "skills/list/updated" (v2::SkillsListUpdatedNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

View File

@@ -2533,6 +2533,11 @@ pub struct ContextCompactedNotification {
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct SkillsListUpdatedNotification {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -1,5 +1,7 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::config_api::ConfigApi;
@@ -24,7 +26,9 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SkillsListUpdatedNotification;
use codex_core::AuthManager;
use codex_core::FileWatcherEvent;
use codex_core::ThreadManager;
use codex_core::auth::ExternalAuthRefreshContext;
use codex_core::auth::ExternalAuthRefreshReason;
@@ -103,6 +107,7 @@ pub(crate) struct MessageProcessor {
codex_message_processor: CodexMessageProcessor,
config_api: ConfigApi,
initialized: bool,
initialized_flag: Arc<AtomicBool>,
config_warnings: Vec<ConfigWarningNotification>,
}
@@ -133,6 +138,32 @@ impl MessageProcessor {
auth_manager.clone(),
SessionSource::VSCode,
));
// Watch for on-disk skill changes and reinject the updated skills into
// subsequent requests.
let initialized_flag = Arc::new(AtomicBool::new(false));
let mut skills_updates_rx = thread_manager.subscribe_file_watcher();
let outgoing_for_skills = Arc::clone(&outgoing);
let initialized_for_skills = Arc::clone(&initialized_flag);
tokio::spawn(async move {
loop {
match skills_updates_rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
if !initialized_for_skills.load(Ordering::SeqCst) {
continue;
}
outgoing_for_skills
.send_server_notification(ServerNotification::SkillsListUpdated(
SkillsListUpdatedNotification {},
))
.await;
}
Ok(FileWatcherEvent::AgentsChanged { .. }) => {}
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
}
}
});
let codex_message_processor = CodexMessageProcessor::new(
auth_manager,
thread_manager,
@@ -149,6 +180,7 @@ impl MessageProcessor {
codex_message_processor,
config_api,
initialized: false,
initialized_flag,
config_warnings,
}
}
@@ -230,6 +262,7 @@ impl MessageProcessor {
self.outgoing.send_response(request_id, response).await;
self.initialized = true;
self.initialized_flag.store(true, Ordering::SeqCst);
if !self.config_warnings.is_empty() {
for notification in self.config_warnings.drain(..) {
self.outgoing

View File

@@ -120,8 +120,10 @@ impl CloudRequirementsService {
async fn fetch(&self) -> Option<ConfigRequirementsToml> {
let auth = self.auth_manager.auth().await?;
if !(auth.mode == AuthMode::ChatGPT
&& auth.account_plan_type() == Some(PlanType::Enterprise))
if !(matches!(
auth.api_auth_mode(),
AuthMode::ChatGPT | AuthMode::ChatgptAuthTokens
) && auth.account_plan_type() == Some(PlanType::Enterprise))
{
return None;
}

View File

@@ -56,6 +56,7 @@ indoc = { workspace = true }
keyring = { workspace = true, features = ["crypto-rust"] }
libc = { workspace = true }
mcp-types = { workspace = true }
notify = { workspace = true }
multimap = { workspace = true }
once_cell = { workspace = true }
os_info = { workspace = true }

View File

@@ -190,6 +190,12 @@
"include_apply_patch_tool": {
"type": "boolean"
},
"live_agents_reload": {
"type": "boolean"
},
"live_skills_reload": {
"type": "boolean"
},
"personality": {
"type": "boolean"
},
@@ -1215,6 +1221,12 @@
"include_apply_patch_tool": {
"type": "boolean"
},
"live_agents_reload": {
"type": "boolean"
},
"live_skills_reload": {
"type": "boolean"
},
"personality": {
"type": "boolean"
},
@@ -1563,4 +1575,4 @@
},
"title": "ConfigToml",
"type": "object"
}
}

View File

@@ -114,6 +114,8 @@ use crate::error::Result as CodexResult;
use crate::exec::StreamOutput;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::instructions::UserInstructions;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
@@ -269,6 +271,7 @@ impl Codex {
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
conversation_history: InitialHistory,
session_source: SessionSource,
agent_control: AgentControl,
@@ -318,7 +321,7 @@ impl Codex {
// Resolve base instructions for the session. Priority order:
// 1. config.base_instructions override
// 2. conversation history => session_meta.base_instructions
// 3. base_intructions for current model
// 3. base_instructions for current model
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
let base_instructions = config
.base_instructions
@@ -378,6 +381,7 @@ impl Codex {
conversation_history,
session_source_clone,
skills_manager,
file_watcher,
agent_control,
)
.instrument(session_init_span)
@@ -467,6 +471,10 @@ pub(crate) struct Session {
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) services: SessionServices,
agents_changed: Arc<AtomicBool>,
agents_watch_dirs: Vec<PathBuf>,
live_agents_reload: bool,
live_skills_reload: bool,
next_internal_sub_id: AtomicU64,
}
@@ -634,6 +642,66 @@ impl Session {
per_turn_config
}
// Build the directories we watch for `AGENTS.md` changes: project search
// roots (falling back to `cwd`) plus the user's `codex_home`.
fn build_agents_watch_dirs(config: &Config) -> Vec<PathBuf> {
let mut dirs = match crate::project_doc::project_doc_search_dirs(config) {
Ok(dirs) => dirs,
Err(err) => {
warn!("failed to compute AGENTS.md search dirs: {err}");
vec![config.cwd.clone()]
}
};
dirs.push(config.codex_home.clone());
dirs
}
fn start_file_watcher_listener(self: &Arc<Self>) {
if !self.live_agents_reload && !self.live_skills_reload {
return;
}
let mut rx = self.services.file_watcher.subscribe();
let agents_changed = Arc::clone(&self.agents_changed);
let agents_watch_dirs = self.agents_watch_dirs.clone();
let live_agents_reload = self.live_agents_reload;
let live_skills_reload = self.live_skills_reload;
let weak_sess = Arc::downgrade(self);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::AgentsChanged { paths }) => {
if live_agents_reload
&& paths.iter().any(|path| {
agents_watch_dirs.iter().any(|root| path.starts_with(root))
})
&& !agents_changed.swap(true, Ordering::SeqCst)
{
info!(
"AGENTS change detected; will refresh instructions next turn: {:?}",
paths
);
}
}
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
if !live_skills_reload {
continue;
}
let Some(sess) = weak_sess.upgrade() else {
break;
};
let event = Event {
id: sess.next_internal_sub_id_with_prefix("skills-update"),
msg: EventMsg::SkillsUpdateAvailable,
};
sess.send_event_raw(event).await;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
}
}
});
}
pub(crate) async fn codex_home(&self) -> PathBuf {
let state = self.state.lock().await;
state.session_configuration.codex_home().clone()
@@ -709,6 +777,7 @@ impl Session {
initial_history: InitialHistory,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
agent_control: AgentControl,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -890,6 +959,13 @@ impl Session {
};
session_configuration.thread_name = thread_name.clone();
let state = SessionState::new(session_configuration.clone());
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
let agents_watch_dirs = if live_agents_reload {
Self::build_agents_watch_dirs(&config)
} else {
Vec::new()
};
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
@@ -905,6 +981,7 @@ impl Session {
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
file_watcher,
agent_control,
state_db: state_db_ctx.clone(),
transport_manager: TransportManager::new(),
@@ -919,6 +996,10 @@ impl Session {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
agents_changed: Arc::new(AtomicBool::new(false)),
agents_watch_dirs,
live_agents_reload,
live_skills_reload,
next_internal_sub_id: AtomicU64::new(0),
});
@@ -948,6 +1029,9 @@ impl Session {
sess.send_event_raw(event).await;
}
// Start the watcher after SessionConfigured so it cannot emit earlier events.
sess.start_file_watcher_listener();
// Construct sandbox_state before initialize() so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
@@ -999,10 +1083,14 @@ impl Session {
}
fn next_internal_sub_id(&self) -> String {
self.next_internal_sub_id_with_prefix("auto-compact")
}
fn next_internal_sub_id_with_prefix(&self, prefix: &str) -> String {
let id = self
.next_internal_sub_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("auto-compact-{id}")
format!("{prefix}-{id}")
}
async fn get_total_token_usage(&self) -> i64 {
@@ -1845,6 +1933,35 @@ impl Session {
state.session_configuration.collaboration_mode.clone()
}
// If `AGENTS.md` changed, reload skills, recompute user instructions, and
// update session state; otherwise return `None`.
pub(crate) async fn refresh_user_instructions_if_needed(&self) -> Option<String> {
if !self.live_agents_reload {
return None;
}
if !self.agents_changed.swap(false, Ordering::SeqCst) {
return None;
}
let config = {
let state = self.state.lock().await;
Arc::clone(&state.session_configuration.original_config_do_not_use)
};
let skills_outcome = self.services.skills_manager.skills_for_config(&config);
for err in &skills_outcome.errors {
error!(
"failed to load skill {}: {}",
err.path.display(),
err.message
);
}
let enabled_skills = skills_outcome.enabled_skills();
let user_instructions = get_user_instructions(&config, Some(&enabled_skills)).await;
let mut state = self.state.lock().await;
state.session_configuration.user_instructions = user_instructions.clone();
user_instructions
}
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(
@@ -2507,6 +2624,7 @@ mod handlers {
use crate::codex::spawn_review_thread;
use crate::config::Config;
use crate::instructions::UserInstructions;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::collect_mcp_snapshot_from_manager;
@@ -2664,6 +2782,7 @@ mod handlers {
.collaboration_mode
.clone();
let next_collaboration_mode = updates.collaboration_mode.clone();
let refreshed_user_instructions = sess.refresh_user_instructions_if_needed().await;
let Ok(current_context) = sess.new_turn_with_sub_id(sub_id, updates).await else {
// new_turn_with_sub_id already emits the error event.
return;
@@ -2676,12 +2795,21 @@ mod handlers {
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
sess.seed_initial_context_if_needed(&current_context).await;
let update_items = sess.build_settings_update_items(
let mut update_items = sess.build_settings_update_items(
previous_context.as_ref(),
&current_context,
&previous_collaboration_mode,
next_collaboration_mode.as_ref(),
);
if let Some(user_instructions) = refreshed_user_instructions {
update_items.push(
UserInstructions {
text: user_instructions,
directory: current_context.cwd.to_string_lossy().into_owned(),
}
.into(),
);
}
if !update_items.is_empty() {
sess.record_conversation_items(&current_context, &update_items)
.await;
@@ -4050,7 +4178,10 @@ pub(crate) use tests::make_session_and_context_with_rx;
mod tests {
use super::*;
use crate::CodexAuth;
use crate::config::CONFIG_TOML_FILE;
use crate::config::ConfigBuilder;
use crate::config::ConfigToml;
use crate::config::ProjectConfig;
use crate::config::test_config;
use crate::exec::ExecToolCallOutput;
use crate::function_tool::FunctionCallError;
@@ -4058,6 +4189,7 @@ mod tests {
use crate::tools::format_exec_output_str;
use codex_protocol::ThreadId;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::models::FunctionCallOutputPayload;
use crate::protocol::CompactedItem;
@@ -4093,6 +4225,8 @@ mod tests {
use pretty_assertions::assert_eq;
use serde::Deserialize;
use serde_json::json;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration as StdDuration;
@@ -4826,6 +4960,43 @@ mod tests {
.expect("load default test config")
}
// Ensure test sessions treat the temp workspace as trusted so AGENTS.md
// and project-doc instructions are loaded consistently.
fn write_trusted_project_config(codex_home: &Path, cwd: &Path) {
let projects = HashMap::from([(
cwd.to_string_lossy().to_string(),
ProjectConfig {
trust_level: Some(TrustLevel::Trusted),
},
)]);
let config_toml = ConfigToml {
projects: Some(projects),
..Default::default()
};
let config_toml_str = toml::to_string(&config_toml).expect("serialize config toml");
fs::write(codex_home.join(CONFIG_TOML_FILE), config_toml_str).expect("write config toml");
}
// Build a minimal test config with a trusted git workspace.
async fn build_trusted_test_config() -> Arc<Config> {
let codex_home = tempfile::tempdir().expect("create temp dir");
let codex_home_path = codex_home.keep();
let cwd = tempfile::tempdir().expect("create temp cwd");
let cwd_path = cwd.keep();
fs::create_dir(cwd_path.join(".git")).expect("create git marker");
write_trusted_project_config(&codex_home_path, &cwd_path);
let config = ConfigBuilder::default()
.codex_home(codex_home_path)
.harness_overrides(crate::config::ConfigOverrides {
cwd: Some(cwd_path),
..Default::default()
})
.build()
.await
.expect("load overridden test config");
Arc::new(config)
}
fn otel_manager(
conversation_id: ThreadId,
config: &Config,
@@ -4847,9 +5018,7 @@ mod tests {
pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
let (tx_event, _rx_event) = async_channel::unbounded();
let codex_home = tempfile::tempdir().expect("create temp dir");
let config = build_test_config(codex_home.path()).await;
let config = Arc::new(config);
let config = build_trusted_test_config().await;
let conversation_id = ThreadId::default();
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
@@ -4859,6 +5028,7 @@ mod tests {
));
let agent_control = AgentControl::default();
let exec_policy = ExecPolicyManager::default();
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let model = ModelsManager::get_model_offline(config.model.as_deref());
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
@@ -4871,12 +5041,15 @@ mod tests {
developer_instructions: None,
},
};
let skills_outcome = skills_manager.skills_for_config(config.as_ref());
let enabled_skills = skills_outcome.enabled_skills();
let user_instructions = get_user_instructions(config.as_ref(), Some(&enabled_skills)).await;
let session_configuration = SessionConfiguration {
provider: config.model_provider.clone(),
collaboration_mode,
model_reasoning_summary: config.model_reasoning_summary,
developer_instructions: config.developer_instructions.clone(),
user_instructions: config.user_instructions.clone(),
user_instructions,
personality: config.model_personality,
base_instructions: config
.base_instructions
@@ -4909,6 +5082,7 @@ mod tests {
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
let file_watcher = Arc::new(FileWatcher::noop());
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
@@ -4923,10 +5097,18 @@ mod tests {
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
file_watcher,
agent_control,
state_db: None,
transport_manager: TransportManager::new(),
};
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
let agents_watch_dirs = if live_agents_reload {
Session::build_agents_watch_dirs(config.as_ref())
} else {
Vec::new()
};
let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
@@ -4949,6 +5131,10 @@ mod tests {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
agents_changed: Arc::new(AtomicBool::new(false)),
agents_watch_dirs,
live_agents_reload,
live_skills_reload,
next_internal_sub_id: AtomicU64::new(0),
};
@@ -4963,9 +5149,7 @@ mod tests {
async_channel::Receiver<Event>,
) {
let (tx_event, rx_event) = async_channel::unbounded();
let codex_home = tempfile::tempdir().expect("create temp dir");
let config = build_test_config(codex_home.path()).await;
let config = Arc::new(config);
let config = build_trusted_test_config().await;
let conversation_id = ThreadId::default();
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
@@ -5025,6 +5209,7 @@ mod tests {
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
let file_watcher = Arc::new(FileWatcher::noop());
let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
@@ -5039,10 +5224,18 @@ mod tests {
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
file_watcher,
agent_control,
state_db: None,
transport_manager: TransportManager::new(),
};
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
let agents_watch_dirs = if live_agents_reload {
Session::build_agents_watch_dirs(config.as_ref())
} else {
Vec::new()
};
let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
@@ -5065,6 +5258,10 @@ mod tests {
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
services,
agents_changed: Arc::new(AtomicBool::new(false)),
agents_watch_dirs,
live_agents_reload,
live_skills_reload,
next_internal_sub_id: AtomicU64::new(0),
});
@@ -5209,7 +5406,7 @@ mod tests {
}
#[tokio::test]
async fn abort_gracefuly_emits_turn_aborted_only() {
async fn abort_gracefully_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),

View File

@@ -54,6 +54,7 @@ pub(crate) async fn run_codex_thread_interactive(
auth_manager,
models_manager,
Arc::clone(&parent_session.services.skills_manager),
Arc::clone(&parent_session.services.file_watcher),
initial_history.unwrap_or(InitialHistory::New),
SessionSource::SubAgent(SubAgentSource::Review),
parent_session.services.agent_control.clone(),

View File

@@ -115,6 +115,10 @@ pub enum Feature {
Apps,
/// Allow prompting and installing missing MCP dependencies.
SkillMcpDependencyInstall,
/// Reload AGENTS.md-based instructions when AGENTS files change on disk.
LiveAgentsReload,
/// Reload skill metadata when skill files change on disk.
LiveSkillsReload,
/// Prompt for missing skill env var dependencies.
SkillEnvVarDependencyPrompt,
/// Steer feature flag - when enabled, Enter submits immediately instead of queuing.
@@ -539,6 +543,26 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Stable,
default_enabled: true,
},
FeatureSpec {
id: Feature::LiveAgentsReload,
key: "live_agents_reload",
stage: Stage::Experimental {
name: "Live AGENTS reload",
menu_description: "Reload AGENTS.md instructions on the next turn after AGENTS files change.",
announcement: "NEW! Try live AGENTS reload to pick up AGENTS.md changes between turns. Enable in /experimental!",
},
default_enabled: false,
},
FeatureSpec {
id: Feature::LiveSkillsReload,
key: "live_skills_reload",
stage: Stage::Experimental {
name: "Live skills reload",
menu_description: "Reload skills and notify sessions when skill files change on disk.",
announcement: "NEW! Try live skills reload to pick up skill changes between turns. Enable in /experimental!",
},
default_enabled: false,
},
FeatureSpec {
id: Feature::SkillEnvVarDependencyPrompt,
key: "skill_env_var_dependency_prompt",

View File

@@ -0,0 +1,414 @@
//! Watches AGENTS and skill roots for changes and broadcasts coarse-grained
//! `FileWatcherEvent`s that higher-level components react to on the next turn.
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::time::Duration;
use notify::Event;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio::time::sleep_until;
use tracing::warn;
use crate::config::Config;
use crate::features::Feature;
use crate::project_doc::DEFAULT_PROJECT_DOC_FILENAME;
use crate::project_doc::LOCAL_PROJECT_DOC_FILENAME;
use crate::project_doc::project_doc_search_dirs;
use crate::skills::loader::skill_roots_from_layer_stack;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileWatcherEvent {
AgentsChanged { paths: Vec<PathBuf> },
SkillsChanged { paths: Vec<PathBuf> },
}
struct WatchState {
skills_roots: HashSet<PathBuf>,
agents_enabled: bool,
skills_enabled: bool,
agents_fallback_filenames: HashSet<String>,
}
struct FileWatcherInner {
watcher: RecommendedWatcher,
watched_paths: HashMap<PathBuf, RecursiveMode>,
}
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
/// Coalesces bursts of paths and emits at most once per interval.
struct ThrottledPaths {
pending: HashSet<PathBuf>,
next_allowed_at: Instant,
}
impl ThrottledPaths {
fn new(now: Instant) -> Self {
Self {
pending: HashSet::new(),
next_allowed_at: now,
}
}
fn add(&mut self, paths: Vec<PathBuf>) {
self.pending.extend(paths);
}
fn next_deadline(&self, now: Instant) -> Option<Instant> {
(!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at)
}
fn take_ready(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() || now < self.next_allowed_at {
return None;
}
Some(self.take_with_next_allowed(now))
}
fn take_pending(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
if self.pending.is_empty() {
return None;
}
Some(self.take_with_next_allowed(now))
}
fn take_with_next_allowed(&mut self, now: Instant) -> Vec<PathBuf> {
let mut paths: Vec<PathBuf> = self.pending.drain().collect();
paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL;
paths
}
}
pub(crate) struct FileWatcher {
inner: Option<Mutex<FileWatcherInner>>,
state: Arc<RwLock<WatchState>>,
tx: broadcast::Sender<FileWatcherEvent>,
}
impl FileWatcher {
pub(crate) fn new(_codex_home: PathBuf) -> notify::Result<Self> {
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
let raw_tx_clone = raw_tx;
let watcher = notify::recommended_watcher(move |res| {
let _ = raw_tx_clone.send(res);
})?;
let inner = FileWatcherInner {
watcher,
watched_paths: HashMap::new(),
};
let (tx, _) = broadcast::channel(128);
let state = Arc::new(RwLock::new(WatchState {
skills_roots: HashSet::new(),
agents_enabled: false,
skills_enabled: false,
agents_fallback_filenames: HashSet::new(),
}));
let file_watcher = Self {
inner: Some(Mutex::new(inner)),
state: Arc::clone(&state),
tx: tx.clone(),
};
file_watcher.spawn_event_loop(raw_rx, state, tx);
Ok(file_watcher)
}
pub(crate) fn noop() -> Self {
let (tx, _) = broadcast::channel(1);
Self {
inner: None,
state: Arc::new(RwLock::new(WatchState {
skills_roots: HashSet::new(),
agents_enabled: false,
skills_enabled: false,
agents_fallback_filenames: HashSet::new(),
})),
tx,
}
}
pub(crate) fn subscribe(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.tx.subscribe()
}
pub(crate) fn register_config(&self, config: &Config) {
let agents_enabled = config.features.enabled(Feature::LiveAgentsReload);
let skills_enabled = config.features.enabled(Feature::LiveSkillsReload);
{
let mut state = match self.state.write() {
Ok(state) => state,
Err(err) => err.into_inner(),
};
state.agents_enabled = agents_enabled;
state.skills_enabled = skills_enabled;
state.agents_fallback_filenames = config
.project_doc_fallback_filenames
.iter()
.filter(|name| !name.is_empty())
.cloned()
.collect();
if !skills_enabled {
state.skills_roots.clear();
}
}
if agents_enabled {
self.watch_agents_root(config.codex_home.clone());
}
if agents_enabled {
match project_doc_search_dirs(config) {
Ok(dirs) => {
for dir in dirs {
self.watch_path(dir, RecursiveMode::NonRecursive);
}
}
Err(err) => {
warn!("failed to determine AGENTS.md search dirs: {err}");
}
}
}
if skills_enabled {
self.register_skills_root(config.codex_home.join("skills"));
let roots = skill_roots_from_layer_stack(&config.config_layer_stack);
for root in roots {
self.register_skills_root(root.path);
}
}
}
// Bridge `notify`'s callback-based events into the Tokio runtime and
// broadcast coarse-grained change signals to subscribers.
fn spawn_event_loop(
&self,
mut raw_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
state: Arc<RwLock<WatchState>>,
tx: broadcast::Sender<FileWatcherEvent>,
) {
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
let now = Instant::now();
let mut agents = ThrottledPaths::new(now);
let mut skills = ThrottledPaths::new(now);
loop {
let now = Instant::now();
let next_deadline = match (agents.next_deadline(now), skills.next_deadline(now))
{
(Some(a), Some(s)) => Some(a.min(s)),
(Some(a), None) => Some(a),
(None, Some(s)) => Some(s),
(None, None) => None,
};
let timer_deadline = next_deadline
.unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365));
let timer = sleep_until(timer_deadline);
tokio::pin!(timer);
tokio::select! {
res = raw_rx.recv() => {
match res {
Some(Ok(event)) => {
let (agents_paths, skills_paths) = classify_event(&event, &state);
let now = Instant::now();
agents.add(agents_paths);
skills.add(skills_paths);
if let Some(paths) = agents.take_ready(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
}
Some(Err(err)) => {
warn!("file watcher error: {err}");
}
None => {
// Flush any pending changes before shutdown so subscribers
// see the latest state.
let now = Instant::now();
if let Some(paths) = agents.take_pending(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_pending(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
break;
}
}
}
_ = &mut timer => {
let now = Instant::now();
if let Some(paths) = agents.take_ready(now) {
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
}
if let Some(paths) = skills.take_ready(now) {
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
}
}
}
}
});
} else {
warn!("file watcher loop skipped: no Tokio runtime available");
}
}
fn watch_agents_root(&self, root: PathBuf) {
self.watch_path(root, RecursiveMode::NonRecursive);
}
fn register_skills_root(&self, root: PathBuf) {
{
let mut state = match self.state.write() {
Ok(state) => state,
Err(err) => err.into_inner(),
};
state.skills_roots.insert(root.clone());
}
self.watch_path(root, RecursiveMode::Recursive);
}
fn watch_path(&self, path: PathBuf, mode: RecursiveMode) {
let Some(inner) = &self.inner else {
return;
};
let Some(watch_path) = nearest_existing_ancestor(&path) else {
return;
};
let mut guard = match inner.lock() {
Ok(guard) => guard,
Err(err) => err.into_inner(),
};
if let Some(existing) = guard.watched_paths.get(&watch_path) {
if *existing == RecursiveMode::Recursive || *existing == mode {
return;
}
if let Err(err) = guard.watcher.unwatch(&watch_path) {
warn!("failed to unwatch {}: {err}", watch_path.display());
}
}
if let Err(err) = guard.watcher.watch(&watch_path, mode) {
warn!("failed to watch {}: {err}", watch_path.display());
return;
}
guard.watched_paths.insert(watch_path, mode);
}
}
fn classify_event(event: &Event, state: &RwLock<WatchState>) -> (Vec<PathBuf>, Vec<PathBuf>) {
let mut agents_paths = Vec::new();
let mut skills_paths = Vec::new();
let (agents_enabled, skills_enabled, skills_roots, agents_fallback_filenames) =
match state.read() {
Ok(state) => (
state.agents_enabled,
state.skills_enabled,
state.skills_roots.clone(),
state.agents_fallback_filenames.clone(),
),
Err(err) => {
let state = err.into_inner();
(
state.agents_enabled,
state.skills_enabled,
state.skills_roots.clone(),
state.agents_fallback_filenames.clone(),
)
}
};
for path in &event.paths {
if agents_enabled && is_agents_path(path, &agents_fallback_filenames) {
agents_paths.push(path.clone());
}
if skills_enabled && is_skills_path(path, &skills_roots) {
skills_paths.push(path.clone());
}
}
(agents_paths, skills_paths)
}
fn is_agents_path(path: &Path, fallbacks: &HashSet<String>) -> bool {
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
return false;
};
name == DEFAULT_PROJECT_DOC_FILENAME
|| name == LOCAL_PROJECT_DOC_FILENAME
|| fallbacks.contains(name)
}
fn is_skills_path(path: &Path, roots: &HashSet<PathBuf>) -> bool {
roots.iter().any(|root| path.starts_with(root))
}
fn nearest_existing_ancestor(path: &Path) -> Option<PathBuf> {
let mut cursor = path;
loop {
if cursor.exists() {
return Some(cursor.to_path_buf());
}
cursor = cursor.parent()?;
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
fn path(name: &str) -> PathBuf {
PathBuf::from(name)
}
#[test]
fn throttles_and_coalesces_within_interval() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
throttled.add(vec![path("a")]);
let first = throttled.take_ready(start).expect("first emit");
assert_eq!(first, vec![path("a")]);
throttled.add(vec![path("b"), path("c")]);
assert_eq!(throttled.take_ready(start), None);
let second = throttled
.take_ready(start + WATCHER_THROTTLE_INTERVAL)
.expect("coalesced emit");
assert_eq!(second, vec![path("b"), path("c")]);
}
#[test]
fn flushes_pending_on_shutdown() {
let start = Instant::now();
let mut throttled = ThrottledPaths::new(start);
throttled.add(vec![path("a")]);
let _ = throttled.take_ready(start).expect("first emit");
throttled.add(vec![path("b")]);
assert_eq!(throttled.take_ready(start), None);
let flushed = throttled
.take_pending(start)
.expect("shutdown flush emits pending paths");
assert_eq!(flushed, vec![path("b")]);
}
}

View File

@@ -31,6 +31,7 @@ pub mod exec;
pub mod exec_env;
mod exec_policy;
pub mod features;
mod file_watcher;
mod flags;
pub mod git_info;
pub mod instructions;
@@ -131,6 +132,7 @@ pub use command_safety::is_safe_command;
pub use exec_policy::ExecPolicyError;
pub use exec_policy::check_execpolicy_for_warnings;
pub use exec_policy::load_exec_policy;
pub use file_watcher::FileWatcherEvent;
pub use safety::get_platform_sandbox;
pub use tools::spec::parse_tool_input_schema;
// Re-export the protocol types from the standalone `codex-protocol` crate so existing

View File

@@ -148,6 +148,31 @@ pub async fn read_project_docs(config: &Config) -> std::io::Result<Option<String
/// directory (inclusive). Symlinks are allowed. When `project_doc_max_bytes`
/// is zero, returns an empty list.
pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBuf>> {
let search_dirs = project_doc_search_dirs(config)?;
let mut found: Vec<PathBuf> = Vec::new();
let candidate_filenames = candidate_filenames(config);
for d in search_dirs {
for name in &candidate_filenames {
let candidate = d.join(name);
match std::fs::symlink_metadata(&candidate) {
Ok(md) => {
let ft = md.file_type();
// Allow regular files and symlinks; opening will later fail for dangling links.
if ft.is_file() || ft.is_symlink() {
found.push(candidate);
break;
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
Err(e) => return Err(e),
}
}
}
Ok(found)
}
pub(crate) fn project_doc_search_dirs(config: &Config) -> std::io::Result<Vec<PathBuf>> {
let mut dir = config.cwd.clone();
if let Ok(canon) = normalize_path(&dir) {
dir = canon;
@@ -192,27 +217,7 @@ pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBu
vec![config.cwd.clone()]
};
let mut found: Vec<PathBuf> = Vec::new();
let candidate_filenames = candidate_filenames(config);
for d in search_dirs {
for name in &candidate_filenames {
let candidate = d.join(name);
match std::fs::symlink_metadata(&candidate) {
Ok(md) => {
let ft = md.file_type();
// Allow regular files and symlinks; opening will later fail for dangling links.
if ft.is_file() || ft.is_symlink() {
found.push(candidate);
break;
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
Err(e) => return Err(e),
}
}
}
Ok(found)
Ok(search_dirs)
}
fn candidate_filenames<'a>(config: &'a Config) -> Vec<&'a str> {

View File

@@ -192,6 +192,16 @@ mod tests {
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
// Filter out synthetic user-instructions messages so truncation counts
// only real user turns.
items.retain(|item| match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
!crate::instructions::UserInstructions::is_user_instructions(content)
}
_ => true,
});
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));

View File

@@ -6,6 +6,7 @@ use std::sync::RwLock;
use codex_utils_absolute_path::AbsolutePathBuf;
use toml::Value as TomlValue;
use tracing::info;
use tracing::warn;
use crate::config::Config;
@@ -120,8 +121,17 @@ impl SkillsManager {
pub fn clear_cache(&self) {
match self.cache_by_cwd.write() {
Ok(mut cache) => cache.clear(),
Err(err) => err.into_inner().clear(),
Ok(mut cache) => {
let cleared = cache.len();
cache.clear();
info!("skills cache cleared ({} entries)", cleared);
}
Err(err) => {
let mut cache = err.into_inner();
let cleared = cache.len();
cache.clear();
info!("skills cache cleared ({} entries)", cleared);
}
}
}
}

View File

@@ -4,6 +4,7 @@ use crate::AuthManager;
use crate::RolloutRecorder;
use crate::agent::AgentControl;
use crate::exec_policy::ExecPolicyManager;
use crate::file_watcher::FileWatcher;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::models_manager::manager::ModelsManager;
use crate::skills::SkillsManager;
@@ -31,6 +32,7 @@ pub(crate) struct SessionServices {
pub(crate) otel_manager: OtelManager,
pub(crate) tool_approvals: Mutex<ApprovalStore>,
pub(crate) skills_manager: Arc<SkillsManager>,
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) agent_control: AgentControl,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) transport_manager: TransportManager,

View File

@@ -11,6 +11,8 @@ use crate::codex_thread::CodexThread;
use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::Event;
use crate::protocol::EventMsg;
@@ -31,12 +33,57 @@ use std::path::PathBuf;
use std::sync::Arc;
#[cfg(any(test, feature = "test-support"))]
use tempfile::TempDir;
use tokio::runtime::Handle;
#[cfg(any(test, feature = "test-support"))]
use tokio::runtime::RuntimeFlavor;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tracing::warn;
const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -> Arc<FileWatcher> {
#[cfg(any(test, feature = "test-support"))]
if let Ok(handle) = Handle::try_current()
&& handle.runtime_flavor() == RuntimeFlavor::CurrentThread
{
// The real watcher spins background tasks that can starve the
// current-thread test runtime and cause event waits to time out.
// Integration tests compile with the `test-support` feature.
warn!("using noop file watcher under current-thread test runtime");
return Arc::new(FileWatcher::noop());
}
let file_watcher = match FileWatcher::new(codex_home) {
Ok(file_watcher) => Arc::new(file_watcher),
Err(err) => {
warn!("failed to initialize file watcher: {err}");
Arc::new(FileWatcher::noop())
}
};
let mut rx = file_watcher.subscribe();
let skills_manager = Arc::clone(&skills_manager);
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
skills_manager.clear_cache();
}
Ok(FileWatcherEvent::AgentsChanged { .. }) => {}
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
}
}
});
} else {
warn!("file watcher listener skipped: no Tokio runtime available");
}
file_watcher
}
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewThread {
@@ -62,6 +109,7 @@ pub(crate) struct ThreadManagerState {
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
session_source: SessionSource,
#[cfg(any(test, feature = "test-support"))]
#[allow(dead_code)]
@@ -76,15 +124,15 @@ impl ThreadManager {
session_source: SessionSource,
) -> Self {
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::new(
codex_home.clone(),
auth_manager.clone(),
)),
skills_manager: Arc::new(SkillsManager::new(codex_home)),
models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())),
skills_manager,
file_watcher,
auth_manager,
session_source,
#[cfg(any(test, feature = "test-support"))]
@@ -116,16 +164,19 @@ impl ThreadManager {
) -> Self {
let auth_manager = AuthManager::from_auth_for_testing(auth);
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::with_provider(
codex_home.clone(),
codex_home,
auth_manager.clone(),
provider,
)),
skills_manager: Arc::new(SkillsManager::new(codex_home)),
skills_manager,
file_watcher,
auth_manager,
session_source: SessionSource::Exec,
#[cfg(any(test, feature = "test-support"))]
@@ -143,6 +194,10 @@ impl ThreadManager {
self.state.skills_manager.clone()
}
pub fn subscribe_file_watcher(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.state.file_watcher.subscribe()
}
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
self.state.models_manager.clone()
}
@@ -380,6 +435,7 @@ impl ThreadManagerState {
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
) -> CodexResult<NewThread> {
self.file_watcher.register_config(&config);
let CodexSpawnOk {
codex, thread_id, ..
} = Codex::spawn(
@@ -387,6 +443,7 @@ impl ThreadManagerState {
auth_manager,
Arc::clone(&self.models_manager),
Arc::clone(&self.skills_manager),
Arc::clone(&self.file_watcher),
initial_history,
session_source,
agent_control,
@@ -530,6 +587,16 @@ mod tests {
async fn ignores_session_prefix_messages_when_truncating() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
// Filter out synthetic user-instructions messages so truncation counts
// only real user turns.
items.retain(|item| match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
!crate::instructions::UserInstructions::is_user_instructions(content)
}
_ => true,
});
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));

View File

@@ -0,0 +1,227 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Result;
use codex_core::FileWatcherEvent;
use codex_core::config::ProjectConfig;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::SandboxPolicy;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::user_input::UserInput;
use core_test_support::load_sse_fixture_with_id;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use tokio::time::timeout;
fn sse_completed(id: &str) -> String {
load_sse_fixture_with_id("../fixtures/completed_template.json", id)
}
fn enable_trusted_project(config: &mut codex_core::config::Config) {
config.active_project = ProjectConfig {
trust_level: Some(TrustLevel::Trusted),
};
}
fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf {
let skill_dir = home.join("skills").join(name);
fs::create_dir_all(&skill_dir).expect("create skill dir");
let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n");
let path = skill_dir.join("SKILL.md");
fs::write(&path, contents).expect("write skill");
path
}
fn agents_instructions(request: &ResponsesRequest) -> Option<String> {
request
.message_input_texts("user")
.into_iter()
.find(|text| text.starts_with("# AGENTS.md instructions for "))
}
fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool {
request
.message_input_texts("user")
.iter()
.any(|text| text.contains(skill_body) && text.contains("<skill>"))
}
async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> {
let session_model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
items: vec![
UserInput::Text {
text: prompt.to_string(),
text_elements: Vec::new(),
},
UserInput::Skill {
name: "demo".to_string(),
path: skill_path,
},
],
final_output_json_schema: None,
cwd: test.cwd_path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(test.codex.as_ref(), |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_agents_reload_updates_user_instructions_after_agents_change() -> Result<()> {
let server = start_mock_server().await;
let responses = mount_sse_once(&server, sse_completed("resp-1")).await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::LiveAgentsReload);
enable_trusted_project(config);
let agents_path = config.cwd.join("AGENTS.md");
fs::write(agents_path, "initial instructions").expect("write initial agents");
});
let test = builder.build(&server).await?;
let agents_path = test.cwd_path().join("AGENTS.md");
test.submit_turn("hello").await?;
let first_request = responses.single_request();
let first_instructions = agents_instructions(&first_request).expect("agents instructions");
assert!(
first_instructions.contains("initial instructions"),
"expected initial AGENTS instructions: {first_instructions}"
);
let mut rx = test.thread_manager.subscribe_file_watcher();
fs::write(&agents_path, "updated instructions").expect("write updated agents");
let changed_paths = timeout(Duration::from_secs(5), async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::AgentsChanged { paths }) => break paths,
Ok(FileWatcherEvent::SkillsChanged { .. }) => continue,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
panic!("file watcher channel closed unexpectedly")
}
}
}
})
.await
.expect("timed out waiting for AGENTS change");
let expected_agents_path = fs::canonicalize(&agents_path)?;
let saw_expected_path = changed_paths
.iter()
.filter_map(|path| fs::canonicalize(path).ok())
.any(|path| path == expected_agents_path);
assert!(
saw_expected_path,
"expected AGENTS path in watcher event: {changed_paths:?}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> {
let server = start_mock_server().await;
let responses = mount_sse_sequence(
&server,
vec![sse_completed("resp-1"), sse_completed("resp-2")],
)
.await;
let skill_v1 = "skill body v1";
let skill_v2 = "skill body v2";
let mut builder = test_codex()
.with_pre_build_hook(move |home| {
write_skill(home, "demo", "demo skill", skill_v1);
})
.with_config(|config| {
config.features.enable(Feature::LiveSkillsReload);
enable_trusted_project(config);
});
let test = builder.build(&server).await?;
let skill_path = std::fs::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?;
submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?;
let first_request = responses
.requests()
.first()
.cloned()
.expect("first request captured");
assert!(
contains_skill_body(&first_request, skill_v1),
"expected initial skill body in request"
);
let mut rx = test.thread_manager.subscribe_file_watcher();
write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2);
let changed_paths = timeout(Duration::from_secs(5), async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { paths }) => break paths,
Ok(FileWatcherEvent::AgentsChanged { .. }) => continue,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
panic!("file watcher channel closed unexpectedly")
}
}
}
})
.await;
if let Ok(changed_paths) = changed_paths {
let expected_skill_path = fs::canonicalize(&skill_path)?;
let saw_expected_path = changed_paths
.iter()
.filter_map(|path| fs::canonicalize(path).ok())
.any(|path| path == expected_skill_path);
assert!(
saw_expected_path,
"expected skill path in watcher event: {changed_paths:?}"
);
} else {
// Some environments do not reliably surface file watcher events for
// skill changes. Clear the cache explicitly so we can still validate
// that the updated skill body is injected on the next turn.
test.thread_manager.skills_manager().clear_cache();
}
submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?;
let last_request = responses
.last_request()
.expect("request captured after skill update");
assert!(
contains_skill_body(&last_request, skill_v2),
"expected updated skill body after reload"
);
Ok(())
}

View File

@@ -40,6 +40,7 @@ mod json_result;
mod list_dir;
mod list_models;
mod live_cli;
mod live_reload;
mod model_info_overrides;
mod model_overrides;
mod model_tools;