mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
17 Commits
main
...
etraut/liv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
192544963a | ||
|
|
1c62e54881 | ||
|
|
d79761f427 | ||
|
|
41e1d66a72 | ||
|
|
f3100c77f7 | ||
|
|
40a223ec57 | ||
|
|
c8b6f27de5 | ||
|
|
57f9c9fabe | ||
|
|
dad3af97d2 | ||
|
|
a95f4e7338 | ||
|
|
45d258f48e | ||
|
|
cc5c6d1d9f | ||
|
|
60991e7344 | ||
|
|
0dc71b2a42 | ||
|
|
7d9728752c | ||
|
|
f780842e9c | ||
|
|
8fe083b541 |
@@ -13,7 +13,6 @@ In the codex-rs folder where the rust code lives:
|
||||
- Use method references over closures when possible per https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure_for_method_calls
|
||||
- When possible, make `match` statements exhaustive and avoid wildcard arms.
|
||||
- When writing tests, prefer comparing the equality of entire objects over fields one by one.
|
||||
- When making a change that adds or changes an API, ensure that the documentation in the `docs/` folder is up to date if applicable.
|
||||
- If you change `ConfigToml` or nested config types, run `just write-config-schema` to update `codex-rs/core/config.schema.json`.
|
||||
|
||||
Run `just fmt` (in `codex-rs` directory) automatically after you have finished making Rust code changes; do not ask for approval to run it. Additionally, run the tests:
|
||||
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1425,6 +1425,7 @@ dependencies = [
|
||||
"maplit",
|
||||
"mcp-types",
|
||||
"multimap",
|
||||
"notify",
|
||||
"once_cell",
|
||||
"openssl-sys",
|
||||
"os_info",
|
||||
|
||||
@@ -623,6 +623,7 @@ server_notification_definitions! {
|
||||
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
|
||||
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
|
||||
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
|
||||
SkillsListUpdated => "skills/list/updated" (v2::SkillsListUpdatedNotification),
|
||||
|
||||
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
|
||||
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),
|
||||
|
||||
@@ -2533,6 +2533,11 @@ pub struct ContextCompactedNotification {
|
||||
pub turn_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct SkillsListUpdatedNotification {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::config_api::ConfigApi;
|
||||
@@ -24,7 +26,9 @@ use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::SkillsListUpdatedNotification;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::FileWatcherEvent;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::auth::ExternalAuthRefreshContext;
|
||||
use codex_core::auth::ExternalAuthRefreshReason;
|
||||
@@ -103,6 +107,7 @@ pub(crate) struct MessageProcessor {
|
||||
codex_message_processor: CodexMessageProcessor,
|
||||
config_api: ConfigApi,
|
||||
initialized: bool,
|
||||
initialized_flag: Arc<AtomicBool>,
|
||||
config_warnings: Vec<ConfigWarningNotification>,
|
||||
}
|
||||
|
||||
@@ -133,6 +138,32 @@ impl MessageProcessor {
|
||||
auth_manager.clone(),
|
||||
SessionSource::VSCode,
|
||||
));
|
||||
|
||||
// Watch for on-disk skill changes and reinject the updated skills into
|
||||
// subsequent requests.
|
||||
let initialized_flag = Arc::new(AtomicBool::new(false));
|
||||
let mut skills_updates_rx = thread_manager.subscribe_file_watcher();
|
||||
let outgoing_for_skills = Arc::clone(&outgoing);
|
||||
let initialized_for_skills = Arc::clone(&initialized_flag);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match skills_updates_rx.recv().await {
|
||||
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
|
||||
if !initialized_for_skills.load(Ordering::SeqCst) {
|
||||
continue;
|
||||
}
|
||||
outgoing_for_skills
|
||||
.send_server_notification(ServerNotification::SkillsListUpdated(
|
||||
SkillsListUpdatedNotification {},
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Ok(FileWatcherEvent::AgentsChanged { .. }) => {}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
}
|
||||
}
|
||||
});
|
||||
let codex_message_processor = CodexMessageProcessor::new(
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
@@ -149,6 +180,7 @@ impl MessageProcessor {
|
||||
codex_message_processor,
|
||||
config_api,
|
||||
initialized: false,
|
||||
initialized_flag,
|
||||
config_warnings,
|
||||
}
|
||||
}
|
||||
@@ -230,6 +262,7 @@ impl MessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
self.initialized = true;
|
||||
self.initialized_flag.store(true, Ordering::SeqCst);
|
||||
if !self.config_warnings.is_empty() {
|
||||
for notification in self.config_warnings.drain(..) {
|
||||
self.outgoing
|
||||
|
||||
@@ -120,8 +120,10 @@ impl CloudRequirementsService {
|
||||
|
||||
async fn fetch(&self) -> Option<ConfigRequirementsToml> {
|
||||
let auth = self.auth_manager.auth().await?;
|
||||
if !(auth.mode == AuthMode::ChatGPT
|
||||
&& auth.account_plan_type() == Some(PlanType::Enterprise))
|
||||
if !(matches!(
|
||||
auth.api_auth_mode(),
|
||||
AuthMode::ChatGPT | AuthMode::ChatgptAuthTokens
|
||||
) && auth.account_plan_type() == Some(PlanType::Enterprise))
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ indoc = { workspace = true }
|
||||
keyring = { workspace = true, features = ["crypto-rust"] }
|
||||
libc = { workspace = true }
|
||||
mcp-types = { workspace = true }
|
||||
notify = { workspace = true }
|
||||
multimap = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
os_info = { workspace = true }
|
||||
|
||||
@@ -190,6 +190,12 @@
|
||||
"include_apply_patch_tool": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"live_agents_reload": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"live_skills_reload": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"personality": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1215,6 +1221,12 @@
|
||||
"include_apply_patch_tool": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"live_agents_reload": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"live_skills_reload": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"personality": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1563,4 +1575,4 @@
|
||||
},
|
||||
"title": "ConfigToml",
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +114,8 @@ use crate::error::Result as CodexResult;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec_policy::ExecPolicyUpdateError;
|
||||
use crate::feedback_tags;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
use crate::file_watcher::FileWatcherEvent;
|
||||
use crate::instructions::UserInstructions;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
@@ -269,6 +271,7 @@ impl Codex {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
conversation_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
agent_control: AgentControl,
|
||||
@@ -318,7 +321,7 @@ impl Codex {
|
||||
// Resolve base instructions for the session. Priority order:
|
||||
// 1. config.base_instructions override
|
||||
// 2. conversation history => session_meta.base_instructions
|
||||
// 3. base_intructions for current model
|
||||
// 3. base_instructions for current model
|
||||
let model_info = models_manager.get_model_info(model.as_str(), &config).await;
|
||||
let base_instructions = config
|
||||
.base_instructions
|
||||
@@ -378,6 +381,7 @@ impl Codex {
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
agent_control,
|
||||
)
|
||||
.instrument(session_init_span)
|
||||
@@ -467,6 +471,10 @@ pub(crate) struct Session {
|
||||
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(crate) services: SessionServices,
|
||||
agents_changed: Arc<AtomicBool>,
|
||||
agents_watch_dirs: Vec<PathBuf>,
|
||||
live_agents_reload: bool,
|
||||
live_skills_reload: bool,
|
||||
next_internal_sub_id: AtomicU64,
|
||||
}
|
||||
|
||||
@@ -634,6 +642,66 @@ impl Session {
|
||||
per_turn_config
|
||||
}
|
||||
|
||||
// Build the directories we watch for `AGENTS.md` changes: project search
|
||||
// roots (falling back to `cwd`) plus the user's `codex_home`.
|
||||
fn build_agents_watch_dirs(config: &Config) -> Vec<PathBuf> {
|
||||
let mut dirs = match crate::project_doc::project_doc_search_dirs(config) {
|
||||
Ok(dirs) => dirs,
|
||||
Err(err) => {
|
||||
warn!("failed to compute AGENTS.md search dirs: {err}");
|
||||
vec![config.cwd.clone()]
|
||||
}
|
||||
};
|
||||
dirs.push(config.codex_home.clone());
|
||||
dirs
|
||||
}
|
||||
|
||||
fn start_file_watcher_listener(self: &Arc<Self>) {
|
||||
if !self.live_agents_reload && !self.live_skills_reload {
|
||||
return;
|
||||
}
|
||||
let mut rx = self.services.file_watcher.subscribe();
|
||||
let agents_changed = Arc::clone(&self.agents_changed);
|
||||
let agents_watch_dirs = self.agents_watch_dirs.clone();
|
||||
let live_agents_reload = self.live_agents_reload;
|
||||
let live_skills_reload = self.live_skills_reload;
|
||||
let weak_sess = Arc::downgrade(self);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(FileWatcherEvent::AgentsChanged { paths }) => {
|
||||
if live_agents_reload
|
||||
&& paths.iter().any(|path| {
|
||||
agents_watch_dirs.iter().any(|root| path.starts_with(root))
|
||||
})
|
||||
&& !agents_changed.swap(true, Ordering::SeqCst)
|
||||
{
|
||||
info!(
|
||||
"AGENTS change detected; will refresh instructions next turn: {:?}",
|
||||
paths
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
|
||||
if !live_skills_reload {
|
||||
continue;
|
||||
}
|
||||
let Some(sess) = weak_sess.upgrade() else {
|
||||
break;
|
||||
};
|
||||
let event = Event {
|
||||
id: sess.next_internal_sub_id_with_prefix("skills-update"),
|
||||
msg: EventMsg::SkillsUpdateAvailable,
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) async fn codex_home(&self) -> PathBuf {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.codex_home().clone()
|
||||
@@ -709,6 +777,7 @@ impl Session {
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
agent_control: AgentControl,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
@@ -890,6 +959,13 @@ impl Session {
|
||||
};
|
||||
session_configuration.thread_name = thread_name.clone();
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
|
||||
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
|
||||
let agents_watch_dirs = if live_agents_reload {
|
||||
Self::build_agents_watch_dirs(&config)
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
@@ -905,6 +981,7 @@ impl Session {
|
||||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
agent_control,
|
||||
state_db: state_db_ctx.clone(),
|
||||
transport_manager: TransportManager::new(),
|
||||
@@ -919,6 +996,10 @@ impl Session {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
agents_changed: Arc::new(AtomicBool::new(false)),
|
||||
agents_watch_dirs,
|
||||
live_agents_reload,
|
||||
live_skills_reload,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
@@ -948,6 +1029,9 @@ impl Session {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
// Start the watcher after SessionConfigured so it cannot emit earlier events.
|
||||
sess.start_file_watcher_listener();
|
||||
|
||||
// Construct sandbox_state before initialize() so it can be sent to each
|
||||
// MCP server immediately after it becomes ready (avoiding blocking).
|
||||
let sandbox_state = SandboxState {
|
||||
@@ -999,10 +1083,14 @@ impl Session {
|
||||
}
|
||||
|
||||
fn next_internal_sub_id(&self) -> String {
|
||||
self.next_internal_sub_id_with_prefix("auto-compact")
|
||||
}
|
||||
|
||||
fn next_internal_sub_id_with_prefix(&self, prefix: &str) -> String {
|
||||
let id = self
|
||||
.next_internal_sub_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
format!("auto-compact-{id}")
|
||||
format!("{prefix}-{id}")
|
||||
}
|
||||
|
||||
async fn get_total_token_usage(&self) -> i64 {
|
||||
@@ -1845,6 +1933,35 @@ impl Session {
|
||||
state.session_configuration.collaboration_mode.clone()
|
||||
}
|
||||
|
||||
// If `AGENTS.md` changed, reload skills, recompute user instructions, and
|
||||
// update session state; otherwise return `None`.
|
||||
pub(crate) async fn refresh_user_instructions_if_needed(&self) -> Option<String> {
|
||||
if !self.live_agents_reload {
|
||||
return None;
|
||||
}
|
||||
if !self.agents_changed.swap(false, Ordering::SeqCst) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let config = {
|
||||
let state = self.state.lock().await;
|
||||
Arc::clone(&state.session_configuration.original_config_do_not_use)
|
||||
};
|
||||
let skills_outcome = self.services.skills_manager.skills_for_config(&config);
|
||||
for err in &skills_outcome.errors {
|
||||
error!(
|
||||
"failed to load skill {}: {}",
|
||||
err.path.display(),
|
||||
err.message
|
||||
);
|
||||
}
|
||||
let enabled_skills = skills_outcome.enabled_skills();
|
||||
let user_instructions = get_user_instructions(&config, Some(&enabled_skills)).await;
|
||||
let mut state = self.state.lock().await;
|
||||
state.session_configuration.user_instructions = user_instructions.clone();
|
||||
user_instructions
|
||||
}
|
||||
|
||||
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
|
||||
for item in items {
|
||||
self.send_event(
|
||||
@@ -2507,6 +2624,7 @@ mod handlers {
|
||||
|
||||
use crate::codex::spawn_review_thread;
|
||||
use crate::config::Config;
|
||||
use crate::instructions::UserInstructions;
|
||||
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp::collect_mcp_snapshot_from_manager;
|
||||
@@ -2664,6 +2782,7 @@ mod handlers {
|
||||
.collaboration_mode
|
||||
.clone();
|
||||
let next_collaboration_mode = updates.collaboration_mode.clone();
|
||||
let refreshed_user_instructions = sess.refresh_user_instructions_if_needed().await;
|
||||
let Ok(current_context) = sess.new_turn_with_sub_id(sub_id, updates).await else {
|
||||
// new_turn_with_sub_id already emits the error event.
|
||||
return;
|
||||
@@ -2676,12 +2795,21 @@ mod handlers {
|
||||
// Attempt to inject input into current task
|
||||
if let Err(items) = sess.inject_input(items).await {
|
||||
sess.seed_initial_context_if_needed(¤t_context).await;
|
||||
let update_items = sess.build_settings_update_items(
|
||||
let mut update_items = sess.build_settings_update_items(
|
||||
previous_context.as_ref(),
|
||||
¤t_context,
|
||||
&previous_collaboration_mode,
|
||||
next_collaboration_mode.as_ref(),
|
||||
);
|
||||
if let Some(user_instructions) = refreshed_user_instructions {
|
||||
update_items.push(
|
||||
UserInstructions {
|
||||
text: user_instructions,
|
||||
directory: current_context.cwd.to_string_lossy().into_owned(),
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
if !update_items.is_empty() {
|
||||
sess.record_conversation_items(¤t_context, &update_items)
|
||||
.await;
|
||||
@@ -4050,7 +4178,10 @@ pub(crate) use tests::make_session_and_context_with_rx;
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::CodexAuth;
|
||||
use crate::config::CONFIG_TOML_FILE;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::config::ConfigToml;
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::config::test_config;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
@@ -4058,6 +4189,7 @@ mod tests {
|
||||
use crate::tools::format_exec_output_str;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
|
||||
use crate::protocol::CompactedItem;
|
||||
@@ -4093,6 +4225,8 @@ mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Deserialize;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration as StdDuration;
|
||||
@@ -4826,6 +4960,43 @@ mod tests {
|
||||
.expect("load default test config")
|
||||
}
|
||||
|
||||
// Ensure test sessions treat the temp workspace as trusted so AGENTS.md
|
||||
// and project-doc instructions are loaded consistently.
|
||||
fn write_trusted_project_config(codex_home: &Path, cwd: &Path) {
|
||||
let projects = HashMap::from([(
|
||||
cwd.to_string_lossy().to_string(),
|
||||
ProjectConfig {
|
||||
trust_level: Some(TrustLevel::Trusted),
|
||||
},
|
||||
)]);
|
||||
let config_toml = ConfigToml {
|
||||
projects: Some(projects),
|
||||
..Default::default()
|
||||
};
|
||||
let config_toml_str = toml::to_string(&config_toml).expect("serialize config toml");
|
||||
fs::write(codex_home.join(CONFIG_TOML_FILE), config_toml_str).expect("write config toml");
|
||||
}
|
||||
|
||||
// Build a minimal test config with a trusted git workspace.
|
||||
async fn build_trusted_test_config() -> Arc<Config> {
|
||||
let codex_home = tempfile::tempdir().expect("create temp dir");
|
||||
let codex_home_path = codex_home.keep();
|
||||
let cwd = tempfile::tempdir().expect("create temp cwd");
|
||||
let cwd_path = cwd.keep();
|
||||
fs::create_dir(cwd_path.join(".git")).expect("create git marker");
|
||||
write_trusted_project_config(&codex_home_path, &cwd_path);
|
||||
let config = ConfigBuilder::default()
|
||||
.codex_home(codex_home_path)
|
||||
.harness_overrides(crate::config::ConfigOverrides {
|
||||
cwd: Some(cwd_path),
|
||||
..Default::default()
|
||||
})
|
||||
.build()
|
||||
.await
|
||||
.expect("load overridden test config");
|
||||
Arc::new(config)
|
||||
}
|
||||
|
||||
fn otel_manager(
|
||||
conversation_id: ThreadId,
|
||||
config: &Config,
|
||||
@@ -4847,9 +5018,7 @@ mod tests {
|
||||
|
||||
pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
let (tx_event, _rx_event) = async_channel::unbounded();
|
||||
let codex_home = tempfile::tempdir().expect("create temp dir");
|
||||
let config = build_test_config(codex_home.path()).await;
|
||||
let config = Arc::new(config);
|
||||
let config = build_trusted_test_config().await;
|
||||
let conversation_id = ThreadId::default();
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
@@ -4859,6 +5028,7 @@ mod tests {
|
||||
));
|
||||
let agent_control = AgentControl::default();
|
||||
let exec_policy = ExecPolicyManager::default();
|
||||
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
|
||||
let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
let model = ModelsManager::get_model_offline(config.model.as_deref());
|
||||
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
|
||||
@@ -4871,12 +5041,15 @@ mod tests {
|
||||
developer_instructions: None,
|
||||
},
|
||||
};
|
||||
let skills_outcome = skills_manager.skills_for_config(config.as_ref());
|
||||
let enabled_skills = skills_outcome.enabled_skills();
|
||||
let user_instructions = get_user_instructions(config.as_ref(), Some(&enabled_skills)).await;
|
||||
let session_configuration = SessionConfiguration {
|
||||
provider: config.model_provider.clone(),
|
||||
collaboration_mode,
|
||||
model_reasoning_summary: config.model_reasoning_summary,
|
||||
developer_instructions: config.developer_instructions.clone(),
|
||||
user_instructions: config.user_instructions.clone(),
|
||||
user_instructions,
|
||||
personality: config.model_personality,
|
||||
base_instructions: config
|
||||
.base_instructions
|
||||
@@ -4909,6 +5082,7 @@ mod tests {
|
||||
mark_state_initial_context_seeded(&mut state);
|
||||
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
|
||||
|
||||
let file_watcher = Arc::new(FileWatcher::noop());
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
@@ -4923,10 +5097,18 @@ mod tests {
|
||||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
agent_control,
|
||||
state_db: None,
|
||||
transport_manager: TransportManager::new(),
|
||||
};
|
||||
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
|
||||
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
|
||||
let agents_watch_dirs = if live_agents_reload {
|
||||
Session::build_agents_watch_dirs(config.as_ref())
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let turn_context = Session::make_turn_context(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
@@ -4949,6 +5131,10 @@ mod tests {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
agents_changed: Arc::new(AtomicBool::new(false)),
|
||||
agents_watch_dirs,
|
||||
live_agents_reload,
|
||||
live_skills_reload,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
};
|
||||
|
||||
@@ -4963,9 +5149,7 @@ mod tests {
|
||||
async_channel::Receiver<Event>,
|
||||
) {
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
let codex_home = tempfile::tempdir().expect("create temp dir");
|
||||
let config = build_test_config(codex_home.path()).await;
|
||||
let config = Arc::new(config);
|
||||
let config = build_trusted_test_config().await;
|
||||
let conversation_id = ThreadId::default();
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
@@ -5025,6 +5209,7 @@ mod tests {
|
||||
mark_state_initial_context_seeded(&mut state);
|
||||
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));
|
||||
|
||||
let file_watcher = Arc::new(FileWatcher::noop());
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
@@ -5039,10 +5224,18 @@ mod tests {
|
||||
models_manager: Arc::clone(&models_manager),
|
||||
tool_approvals: Mutex::new(ApprovalStore::default()),
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
agent_control,
|
||||
state_db: None,
|
||||
transport_manager: TransportManager::new(),
|
||||
};
|
||||
let live_agents_reload = config.features.enabled(Feature::LiveAgentsReload);
|
||||
let live_skills_reload = config.features.enabled(Feature::LiveSkillsReload);
|
||||
let agents_watch_dirs = if live_agents_reload {
|
||||
Session::build_agents_watch_dirs(config.as_ref())
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let turn_context = Arc::new(Session::make_turn_context(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
@@ -5065,6 +5258,10 @@ mod tests {
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
agents_changed: Arc::new(AtomicBool::new(false)),
|
||||
agents_watch_dirs,
|
||||
live_agents_reload,
|
||||
live_skills_reload,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
});
|
||||
|
||||
@@ -5209,7 +5406,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn abort_gracefuly_emits_turn_aborted_only() {
|
||||
async fn abort_gracefully_emits_turn_aborted_only() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
let input = vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
|
||||
@@ -54,6 +54,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
auth_manager,
|
||||
models_manager,
|
||||
Arc::clone(&parent_session.services.skills_manager),
|
||||
Arc::clone(&parent_session.services.file_watcher),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
parent_session.services.agent_control.clone(),
|
||||
|
||||
@@ -115,6 +115,10 @@ pub enum Feature {
|
||||
Apps,
|
||||
/// Allow prompting and installing missing MCP dependencies.
|
||||
SkillMcpDependencyInstall,
|
||||
/// Reload AGENTS.md-based instructions when AGENTS files change on disk.
|
||||
LiveAgentsReload,
|
||||
/// Reload skill metadata when skill files change on disk.
|
||||
LiveSkillsReload,
|
||||
/// Prompt for missing skill env var dependencies.
|
||||
SkillEnvVarDependencyPrompt,
|
||||
/// Steer feature flag - when enabled, Enter submits immediately instead of queuing.
|
||||
@@ -539,6 +543,26 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Stable,
|
||||
default_enabled: true,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::LiveAgentsReload,
|
||||
key: "live_agents_reload",
|
||||
stage: Stage::Experimental {
|
||||
name: "Live AGENTS reload",
|
||||
menu_description: "Reload AGENTS.md instructions on the next turn after AGENTS files change.",
|
||||
announcement: "NEW! Try live AGENTS reload to pick up AGENTS.md changes between turns. Enable in /experimental!",
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::LiveSkillsReload,
|
||||
key: "live_skills_reload",
|
||||
stage: Stage::Experimental {
|
||||
name: "Live skills reload",
|
||||
menu_description: "Reload skills and notify sessions when skill files change on disk.",
|
||||
announcement: "NEW! Try live skills reload to pick up skill changes between turns. Enable in /experimental!",
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::SkillEnvVarDependencyPrompt,
|
||||
key: "skill_env_var_dependency_prompt",
|
||||
|
||||
414
codex-rs/core/src/file_watcher.rs
Normal file
414
codex-rs/core/src/file_watcher.rs
Normal file
@@ -0,0 +1,414 @@
|
||||
//! Watches AGENTS and skill roots for changes and broadcasts coarse-grained
|
||||
//! `FileWatcherEvent`s that higher-level components react to on the next turn.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::RwLock;
|
||||
use std::time::Duration;
|
||||
|
||||
use notify::Event;
|
||||
use notify::RecommendedWatcher;
|
||||
use notify::RecursiveMode;
|
||||
use notify::Watcher;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep_until;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::features::Feature;
|
||||
use crate::project_doc::DEFAULT_PROJECT_DOC_FILENAME;
|
||||
use crate::project_doc::LOCAL_PROJECT_DOC_FILENAME;
|
||||
use crate::project_doc::project_doc_search_dirs;
|
||||
use crate::skills::loader::skill_roots_from_layer_stack;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum FileWatcherEvent {
|
||||
AgentsChanged { paths: Vec<PathBuf> },
|
||||
SkillsChanged { paths: Vec<PathBuf> },
|
||||
}
|
||||
|
||||
struct WatchState {
|
||||
skills_roots: HashSet<PathBuf>,
|
||||
agents_enabled: bool,
|
||||
skills_enabled: bool,
|
||||
agents_fallback_filenames: HashSet<String>,
|
||||
}
|
||||
|
||||
struct FileWatcherInner {
|
||||
watcher: RecommendedWatcher,
|
||||
watched_paths: HashMap<PathBuf, RecursiveMode>,
|
||||
}
|
||||
|
||||
const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Coalesces bursts of paths and emits at most once per interval.
|
||||
struct ThrottledPaths {
|
||||
pending: HashSet<PathBuf>,
|
||||
next_allowed_at: Instant,
|
||||
}
|
||||
|
||||
impl ThrottledPaths {
|
||||
fn new(now: Instant) -> Self {
|
||||
Self {
|
||||
pending: HashSet::new(),
|
||||
next_allowed_at: now,
|
||||
}
|
||||
}
|
||||
|
||||
fn add(&mut self, paths: Vec<PathBuf>) {
|
||||
self.pending.extend(paths);
|
||||
}
|
||||
|
||||
fn next_deadline(&self, now: Instant) -> Option<Instant> {
|
||||
(!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at)
|
||||
}
|
||||
|
||||
fn take_ready(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
|
||||
if self.pending.is_empty() || now < self.next_allowed_at {
|
||||
return None;
|
||||
}
|
||||
Some(self.take_with_next_allowed(now))
|
||||
}
|
||||
|
||||
fn take_pending(&mut self, now: Instant) -> Option<Vec<PathBuf>> {
|
||||
if self.pending.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(self.take_with_next_allowed(now))
|
||||
}
|
||||
|
||||
fn take_with_next_allowed(&mut self, now: Instant) -> Vec<PathBuf> {
|
||||
let mut paths: Vec<PathBuf> = self.pending.drain().collect();
|
||||
paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str()));
|
||||
self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL;
|
||||
paths
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct FileWatcher {
|
||||
inner: Option<Mutex<FileWatcherInner>>,
|
||||
state: Arc<RwLock<WatchState>>,
|
||||
tx: broadcast::Sender<FileWatcherEvent>,
|
||||
}
|
||||
|
||||
impl FileWatcher {
|
||||
pub(crate) fn new(_codex_home: PathBuf) -> notify::Result<Self> {
|
||||
let (raw_tx, raw_rx) = mpsc::unbounded_channel();
|
||||
let raw_tx_clone = raw_tx;
|
||||
let watcher = notify::recommended_watcher(move |res| {
|
||||
let _ = raw_tx_clone.send(res);
|
||||
})?;
|
||||
let inner = FileWatcherInner {
|
||||
watcher,
|
||||
watched_paths: HashMap::new(),
|
||||
};
|
||||
let (tx, _) = broadcast::channel(128);
|
||||
let state = Arc::new(RwLock::new(WatchState {
|
||||
skills_roots: HashSet::new(),
|
||||
agents_enabled: false,
|
||||
skills_enabled: false,
|
||||
agents_fallback_filenames: HashSet::new(),
|
||||
}));
|
||||
let file_watcher = Self {
|
||||
inner: Some(Mutex::new(inner)),
|
||||
state: Arc::clone(&state),
|
||||
tx: tx.clone(),
|
||||
};
|
||||
file_watcher.spawn_event_loop(raw_rx, state, tx);
|
||||
Ok(file_watcher)
|
||||
}
|
||||
|
||||
pub(crate) fn noop() -> Self {
|
||||
let (tx, _) = broadcast::channel(1);
|
||||
Self {
|
||||
inner: None,
|
||||
state: Arc::new(RwLock::new(WatchState {
|
||||
skills_roots: HashSet::new(),
|
||||
agents_enabled: false,
|
||||
skills_enabled: false,
|
||||
agents_fallback_filenames: HashSet::new(),
|
||||
})),
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn subscribe(&self) -> broadcast::Receiver<FileWatcherEvent> {
|
||||
self.tx.subscribe()
|
||||
}
|
||||
|
||||
pub(crate) fn register_config(&self, config: &Config) {
|
||||
let agents_enabled = config.features.enabled(Feature::LiveAgentsReload);
|
||||
let skills_enabled = config.features.enabled(Feature::LiveSkillsReload);
|
||||
|
||||
{
|
||||
let mut state = match self.state.write() {
|
||||
Ok(state) => state,
|
||||
Err(err) => err.into_inner(),
|
||||
};
|
||||
state.agents_enabled = agents_enabled;
|
||||
state.skills_enabled = skills_enabled;
|
||||
state.agents_fallback_filenames = config
|
||||
.project_doc_fallback_filenames
|
||||
.iter()
|
||||
.filter(|name| !name.is_empty())
|
||||
.cloned()
|
||||
.collect();
|
||||
if !skills_enabled {
|
||||
state.skills_roots.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if agents_enabled {
|
||||
self.watch_agents_root(config.codex_home.clone());
|
||||
}
|
||||
|
||||
if agents_enabled {
|
||||
match project_doc_search_dirs(config) {
|
||||
Ok(dirs) => {
|
||||
for dir in dirs {
|
||||
self.watch_path(dir, RecursiveMode::NonRecursive);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to determine AGENTS.md search dirs: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if skills_enabled {
|
||||
self.register_skills_root(config.codex_home.join("skills"));
|
||||
let roots = skill_roots_from_layer_stack(&config.config_layer_stack);
|
||||
for root in roots {
|
||||
self.register_skills_root(root.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge `notify`'s callback-based events into the Tokio runtime and
|
||||
// broadcast coarse-grained change signals to subscribers.
|
||||
fn spawn_event_loop(
|
||||
&self,
|
||||
mut raw_rx: mpsc::UnboundedReceiver<notify::Result<Event>>,
|
||||
state: Arc<RwLock<WatchState>>,
|
||||
tx: broadcast::Sender<FileWatcherEvent>,
|
||||
) {
|
||||
if let Ok(handle) = Handle::try_current() {
|
||||
handle.spawn(async move {
|
||||
let now = Instant::now();
|
||||
let mut agents = ThrottledPaths::new(now);
|
||||
let mut skills = ThrottledPaths::new(now);
|
||||
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
let next_deadline = match (agents.next_deadline(now), skills.next_deadline(now))
|
||||
{
|
||||
(Some(a), Some(s)) => Some(a.min(s)),
|
||||
(Some(a), None) => Some(a),
|
||||
(None, Some(s)) => Some(s),
|
||||
(None, None) => None,
|
||||
};
|
||||
let timer_deadline = next_deadline
|
||||
.unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365));
|
||||
let timer = sleep_until(timer_deadline);
|
||||
tokio::pin!(timer);
|
||||
|
||||
tokio::select! {
|
||||
res = raw_rx.recv() => {
|
||||
match res {
|
||||
Some(Ok(event)) => {
|
||||
let (agents_paths, skills_paths) = classify_event(&event, &state);
|
||||
let now = Instant::now();
|
||||
agents.add(agents_paths);
|
||||
skills.add(skills_paths);
|
||||
|
||||
if let Some(paths) = agents.take_ready(now) {
|
||||
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
|
||||
}
|
||||
if let Some(paths) = skills.take_ready(now) {
|
||||
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
warn!("file watcher error: {err}");
|
||||
}
|
||||
None => {
|
||||
// Flush any pending changes before shutdown so subscribers
|
||||
// see the latest state.
|
||||
let now = Instant::now();
|
||||
if let Some(paths) = agents.take_pending(now) {
|
||||
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
|
||||
}
|
||||
if let Some(paths) = skills.take_pending(now) {
|
||||
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = &mut timer => {
|
||||
let now = Instant::now();
|
||||
if let Some(paths) = agents.take_ready(now) {
|
||||
let _ = tx.send(FileWatcherEvent::AgentsChanged { paths });
|
||||
}
|
||||
if let Some(paths) = skills.take_ready(now) {
|
||||
let _ = tx.send(FileWatcherEvent::SkillsChanged { paths });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
warn!("file watcher loop skipped: no Tokio runtime available");
|
||||
}
|
||||
}
|
||||
|
||||
fn watch_agents_root(&self, root: PathBuf) {
|
||||
self.watch_path(root, RecursiveMode::NonRecursive);
|
||||
}
|
||||
|
||||
fn register_skills_root(&self, root: PathBuf) {
|
||||
{
|
||||
let mut state = match self.state.write() {
|
||||
Ok(state) => state,
|
||||
Err(err) => err.into_inner(),
|
||||
};
|
||||
state.skills_roots.insert(root.clone());
|
||||
}
|
||||
self.watch_path(root, RecursiveMode::Recursive);
|
||||
}
|
||||
|
||||
fn watch_path(&self, path: PathBuf, mode: RecursiveMode) {
|
||||
let Some(inner) = &self.inner else {
|
||||
return;
|
||||
};
|
||||
let Some(watch_path) = nearest_existing_ancestor(&path) else {
|
||||
return;
|
||||
};
|
||||
let mut guard = match inner.lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(err) => err.into_inner(),
|
||||
};
|
||||
if let Some(existing) = guard.watched_paths.get(&watch_path) {
|
||||
if *existing == RecursiveMode::Recursive || *existing == mode {
|
||||
return;
|
||||
}
|
||||
if let Err(err) = guard.watcher.unwatch(&watch_path) {
|
||||
warn!("failed to unwatch {}: {err}", watch_path.display());
|
||||
}
|
||||
}
|
||||
if let Err(err) = guard.watcher.watch(&watch_path, mode) {
|
||||
warn!("failed to watch {}: {err}", watch_path.display());
|
||||
return;
|
||||
}
|
||||
guard.watched_paths.insert(watch_path, mode);
|
||||
}
|
||||
}
|
||||
|
||||
fn classify_event(event: &Event, state: &RwLock<WatchState>) -> (Vec<PathBuf>, Vec<PathBuf>) {
|
||||
let mut agents_paths = Vec::new();
|
||||
let mut skills_paths = Vec::new();
|
||||
let (agents_enabled, skills_enabled, skills_roots, agents_fallback_filenames) =
|
||||
match state.read() {
|
||||
Ok(state) => (
|
||||
state.agents_enabled,
|
||||
state.skills_enabled,
|
||||
state.skills_roots.clone(),
|
||||
state.agents_fallback_filenames.clone(),
|
||||
),
|
||||
Err(err) => {
|
||||
let state = err.into_inner();
|
||||
(
|
||||
state.agents_enabled,
|
||||
state.skills_enabled,
|
||||
state.skills_roots.clone(),
|
||||
state.agents_fallback_filenames.clone(),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
for path in &event.paths {
|
||||
if agents_enabled && is_agents_path(path, &agents_fallback_filenames) {
|
||||
agents_paths.push(path.clone());
|
||||
}
|
||||
if skills_enabled && is_skills_path(path, &skills_roots) {
|
||||
skills_paths.push(path.clone());
|
||||
}
|
||||
}
|
||||
|
||||
(agents_paths, skills_paths)
|
||||
}
|
||||
|
||||
fn is_agents_path(path: &Path, fallbacks: &HashSet<String>) -> bool {
|
||||
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
|
||||
return false;
|
||||
};
|
||||
name == DEFAULT_PROJECT_DOC_FILENAME
|
||||
|| name == LOCAL_PROJECT_DOC_FILENAME
|
||||
|| fallbacks.contains(name)
|
||||
}
|
||||
|
||||
fn is_skills_path(path: &Path, roots: &HashSet<PathBuf>) -> bool {
|
||||
roots.iter().any(|root| path.starts_with(root))
|
||||
}
|
||||
|
||||
fn nearest_existing_ancestor(path: &Path) -> Option<PathBuf> {
|
||||
let mut cursor = path;
|
||||
loop {
|
||||
if cursor.exists() {
|
||||
return Some(cursor.to_path_buf());
|
||||
}
|
||||
cursor = cursor.parent()?;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn path(name: &str) -> PathBuf {
|
||||
PathBuf::from(name)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn throttles_and_coalesces_within_interval() {
|
||||
let start = Instant::now();
|
||||
let mut throttled = ThrottledPaths::new(start);
|
||||
|
||||
throttled.add(vec![path("a")]);
|
||||
let first = throttled.take_ready(start).expect("first emit");
|
||||
assert_eq!(first, vec![path("a")]);
|
||||
|
||||
throttled.add(vec![path("b"), path("c")]);
|
||||
assert_eq!(throttled.take_ready(start), None);
|
||||
|
||||
let second = throttled
|
||||
.take_ready(start + WATCHER_THROTTLE_INTERVAL)
|
||||
.expect("coalesced emit");
|
||||
assert_eq!(second, vec![path("b"), path("c")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn flushes_pending_on_shutdown() {
|
||||
let start = Instant::now();
|
||||
let mut throttled = ThrottledPaths::new(start);
|
||||
|
||||
throttled.add(vec![path("a")]);
|
||||
let _ = throttled.take_ready(start).expect("first emit");
|
||||
|
||||
throttled.add(vec![path("b")]);
|
||||
assert_eq!(throttled.take_ready(start), None);
|
||||
|
||||
let flushed = throttled
|
||||
.take_pending(start)
|
||||
.expect("shutdown flush emits pending paths");
|
||||
assert_eq!(flushed, vec![path("b")]);
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ pub mod exec;
|
||||
pub mod exec_env;
|
||||
mod exec_policy;
|
||||
pub mod features;
|
||||
mod file_watcher;
|
||||
mod flags;
|
||||
pub mod git_info;
|
||||
pub mod instructions;
|
||||
@@ -131,6 +132,7 @@ pub use command_safety::is_safe_command;
|
||||
pub use exec_policy::ExecPolicyError;
|
||||
pub use exec_policy::check_execpolicy_for_warnings;
|
||||
pub use exec_policy::load_exec_policy;
|
||||
pub use file_watcher::FileWatcherEvent;
|
||||
pub use safety::get_platform_sandbox;
|
||||
pub use tools::spec::parse_tool_input_schema;
|
||||
// Re-export the protocol types from the standalone `codex-protocol` crate so existing
|
||||
|
||||
@@ -148,6 +148,31 @@ pub async fn read_project_docs(config: &Config) -> std::io::Result<Option<String
|
||||
/// directory (inclusive). Symlinks are allowed. When `project_doc_max_bytes`
|
||||
/// is zero, returns an empty list.
|
||||
pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBuf>> {
|
||||
let search_dirs = project_doc_search_dirs(config)?;
|
||||
let mut found: Vec<PathBuf> = Vec::new();
|
||||
let candidate_filenames = candidate_filenames(config);
|
||||
for d in search_dirs {
|
||||
for name in &candidate_filenames {
|
||||
let candidate = d.join(name);
|
||||
match std::fs::symlink_metadata(&candidate) {
|
||||
Ok(md) => {
|
||||
let ft = md.file_type();
|
||||
// Allow regular files and symlinks; opening will later fail for dangling links.
|
||||
if ft.is_file() || ft.is_symlink() {
|
||||
found.push(candidate);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(found)
|
||||
}
|
||||
|
||||
pub(crate) fn project_doc_search_dirs(config: &Config) -> std::io::Result<Vec<PathBuf>> {
|
||||
let mut dir = config.cwd.clone();
|
||||
if let Ok(canon) = normalize_path(&dir) {
|
||||
dir = canon;
|
||||
@@ -192,27 +217,7 @@ pub fn discover_project_doc_paths(config: &Config) -> std::io::Result<Vec<PathBu
|
||||
vec![config.cwd.clone()]
|
||||
};
|
||||
|
||||
let mut found: Vec<PathBuf> = Vec::new();
|
||||
let candidate_filenames = candidate_filenames(config);
|
||||
for d in search_dirs {
|
||||
for name in &candidate_filenames {
|
||||
let candidate = d.join(name);
|
||||
match std::fs::symlink_metadata(&candidate) {
|
||||
Ok(md) => {
|
||||
let ft = md.file_type();
|
||||
// Allow regular files and symlinks; opening will later fail for dangling links.
|
||||
if ft.is_file() || ft.is_symlink() {
|
||||
found.push(candidate);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(found)
|
||||
Ok(search_dirs)
|
||||
}
|
||||
|
||||
fn candidate_filenames<'a>(config: &'a Config) -> Vec<&'a str> {
|
||||
|
||||
@@ -192,6 +192,16 @@ mod tests {
|
||||
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let mut items = session.build_initial_context(&turn_context).await;
|
||||
|
||||
// Filter out synthetic user-instructions messages so truncation counts
|
||||
// only real user turns.
|
||||
items.retain(|item| match item {
|
||||
ResponseItem::Message { role, content, .. } if role == "user" => {
|
||||
!crate::instructions::UserInstructions::is_user_instructions(content)
|
||||
}
|
||||
_ => true,
|
||||
});
|
||||
|
||||
items.push(user_msg("feature request"));
|
||||
items.push(assistant_msg("ack"));
|
||||
items.push(user_msg("second question"));
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::RwLock;
|
||||
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::Config;
|
||||
@@ -120,8 +121,17 @@ impl SkillsManager {
|
||||
|
||||
pub fn clear_cache(&self) {
|
||||
match self.cache_by_cwd.write() {
|
||||
Ok(mut cache) => cache.clear(),
|
||||
Err(err) => err.into_inner().clear(),
|
||||
Ok(mut cache) => {
|
||||
let cleared = cache.len();
|
||||
cache.clear();
|
||||
info!("skills cache cleared ({} entries)", cleared);
|
||||
}
|
||||
Err(err) => {
|
||||
let mut cache = err.into_inner();
|
||||
let cleared = cache.len();
|
||||
cache.clear();
|
||||
info!("skills cache cleared ({} entries)", cleared);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::AuthManager;
|
||||
use crate::RolloutRecorder;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::skills::SkillsManager;
|
||||
@@ -31,6 +32,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) otel_manager: OtelManager,
|
||||
pub(crate) tool_approvals: Mutex<ApprovalStore>,
|
||||
pub(crate) skills_manager: Arc<SkillsManager>,
|
||||
pub(crate) file_watcher: Arc<FileWatcher>,
|
||||
pub(crate) agent_control: AgentControl,
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
pub(crate) transport_manager: TransportManager,
|
||||
|
||||
@@ -11,6 +11,8 @@ use crate::codex_thread::CodexThread;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
use crate::file_watcher::FileWatcherEvent;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
@@ -31,12 +33,57 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
use tempfile::TempDir;
|
||||
use tokio::runtime::Handle;
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
use tokio::runtime::RuntimeFlavor;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::warn;
|
||||
|
||||
const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
|
||||
|
||||
fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -> Arc<FileWatcher> {
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
if let Ok(handle) = Handle::try_current()
|
||||
&& handle.runtime_flavor() == RuntimeFlavor::CurrentThread
|
||||
{
|
||||
// The real watcher spins background tasks that can starve the
|
||||
// current-thread test runtime and cause event waits to time out.
|
||||
// Integration tests compile with the `test-support` feature.
|
||||
warn!("using noop file watcher under current-thread test runtime");
|
||||
return Arc::new(FileWatcher::noop());
|
||||
}
|
||||
|
||||
let file_watcher = match FileWatcher::new(codex_home) {
|
||||
Ok(file_watcher) => Arc::new(file_watcher),
|
||||
Err(err) => {
|
||||
warn!("failed to initialize file watcher: {err}");
|
||||
Arc::new(FileWatcher::noop())
|
||||
}
|
||||
};
|
||||
|
||||
let mut rx = file_watcher.subscribe();
|
||||
let skills_manager = Arc::clone(&skills_manager);
|
||||
if let Ok(handle) = Handle::try_current() {
|
||||
handle.spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
|
||||
skills_manager.clear_cache();
|
||||
}
|
||||
Ok(FileWatcherEvent::AgentsChanged { .. }) => {}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
warn!("file watcher listener skipped: no Tokio runtime available");
|
||||
}
|
||||
|
||||
file_watcher
|
||||
}
|
||||
|
||||
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
|
||||
/// (which is [`EventMsg::SessionConfigured`]).
|
||||
pub struct NewThread {
|
||||
@@ -62,6 +109,7 @@ pub(crate) struct ThreadManagerState {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
session_source: SessionSource,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
#[allow(dead_code)]
|
||||
@@ -76,15 +124,15 @@ impl ThreadManager {
|
||||
session_source: SessionSource,
|
||||
) -> Self {
|
||||
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
|
||||
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
|
||||
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
thread_created_tx,
|
||||
models_manager: Arc::new(ModelsManager::new(
|
||||
codex_home.clone(),
|
||||
auth_manager.clone(),
|
||||
)),
|
||||
skills_manager: Arc::new(SkillsManager::new(codex_home)),
|
||||
models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())),
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
auth_manager,
|
||||
session_source,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
@@ -116,16 +164,19 @@ impl ThreadManager {
|
||||
) -> Self {
|
||||
let auth_manager = AuthManager::from_auth_for_testing(auth);
|
||||
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
|
||||
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
|
||||
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
thread_created_tx,
|
||||
models_manager: Arc::new(ModelsManager::with_provider(
|
||||
codex_home.clone(),
|
||||
codex_home,
|
||||
auth_manager.clone(),
|
||||
provider,
|
||||
)),
|
||||
skills_manager: Arc::new(SkillsManager::new(codex_home)),
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
@@ -143,6 +194,10 @@ impl ThreadManager {
|
||||
self.state.skills_manager.clone()
|
||||
}
|
||||
|
||||
pub fn subscribe_file_watcher(&self) -> broadcast::Receiver<FileWatcherEvent> {
|
||||
self.state.file_watcher.subscribe()
|
||||
}
|
||||
|
||||
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
|
||||
self.state.models_manager.clone()
|
||||
}
|
||||
@@ -380,6 +435,7 @@ impl ThreadManagerState {
|
||||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.file_watcher.register_config(&config);
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
} = Codex::spawn(
|
||||
@@ -387,6 +443,7 @@ impl ThreadManagerState {
|
||||
auth_manager,
|
||||
Arc::clone(&self.models_manager),
|
||||
Arc::clone(&self.skills_manager),
|
||||
Arc::clone(&self.file_watcher),
|
||||
initial_history,
|
||||
session_source,
|
||||
agent_control,
|
||||
@@ -530,6 +587,16 @@ mod tests {
|
||||
async fn ignores_session_prefix_messages_when_truncating() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let mut items = session.build_initial_context(&turn_context).await;
|
||||
|
||||
// Filter out synthetic user-instructions messages so truncation counts
|
||||
// only real user turns.
|
||||
items.retain(|item| match item {
|
||||
ResponseItem::Message { role, content, .. } if role == "user" => {
|
||||
!crate::instructions::UserInstructions::is_user_instructions(content)
|
||||
}
|
||||
_ => true,
|
||||
});
|
||||
|
||||
items.push(user_msg("feature request"));
|
||||
items.push(assistant_msg("ack"));
|
||||
items.push(user_msg("second question"));
|
||||
|
||||
227
codex-rs/core/tests/suite/live_reload.rs
Normal file
227
codex-rs/core/tests/suite/live_reload.rs
Normal file
@@ -0,0 +1,227 @@
|
||||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::FileWatcherEvent;
|
||||
use codex_core::config::ProjectConfig;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_sse_fixture_with_id;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use tokio::time::timeout;
|
||||
|
||||
fn sse_completed(id: &str) -> String {
|
||||
load_sse_fixture_with_id("../fixtures/completed_template.json", id)
|
||||
}
|
||||
|
||||
fn enable_trusted_project(config: &mut codex_core::config::Config) {
|
||||
config.active_project = ProjectConfig {
|
||||
trust_level: Some(TrustLevel::Trusted),
|
||||
};
|
||||
}
|
||||
|
||||
fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf {
|
||||
let skill_dir = home.join("skills").join(name);
|
||||
fs::create_dir_all(&skill_dir).expect("create skill dir");
|
||||
let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n");
|
||||
let path = skill_dir.join("SKILL.md");
|
||||
fs::write(&path, contents).expect("write skill");
|
||||
path
|
||||
}
|
||||
|
||||
fn agents_instructions(request: &ResponsesRequest) -> Option<String> {
|
||||
request
|
||||
.message_input_texts("user")
|
||||
.into_iter()
|
||||
.find(|text| text.starts_with("# AGENTS.md instructions for "))
|
||||
}
|
||||
|
||||
fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool {
|
||||
request
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|text| text.contains(skill_body) && text.contains("<skill>"))
|
||||
}
|
||||
|
||||
async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> {
|
||||
let session_model = test.session_configured.model.clone();
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![
|
||||
UserInput::Text {
|
||||
text: prompt.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
},
|
||||
UserInput::Skill {
|
||||
name: "demo".to_string(),
|
||||
path: skill_path,
|
||||
},
|
||||
],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd_path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(test.codex.as_ref(), |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn live_agents_reload_updates_user_instructions_after_agents_change() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let responses = mount_sse_once(&server, sse_completed("resp-1")).await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::LiveAgentsReload);
|
||||
enable_trusted_project(config);
|
||||
let agents_path = config.cwd.join("AGENTS.md");
|
||||
fs::write(agents_path, "initial instructions").expect("write initial agents");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let agents_path = test.cwd_path().join("AGENTS.md");
|
||||
test.submit_turn("hello").await?;
|
||||
let first_request = responses.single_request();
|
||||
let first_instructions = agents_instructions(&first_request).expect("agents instructions");
|
||||
assert!(
|
||||
first_instructions.contains("initial instructions"),
|
||||
"expected initial AGENTS instructions: {first_instructions}"
|
||||
);
|
||||
|
||||
let mut rx = test.thread_manager.subscribe_file_watcher();
|
||||
fs::write(&agents_path, "updated instructions").expect("write updated agents");
|
||||
|
||||
let changed_paths = timeout(Duration::from_secs(5), async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(FileWatcherEvent::AgentsChanged { paths }) => break paths,
|
||||
Ok(FileWatcherEvent::SkillsChanged { .. }) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
panic!("file watcher channel closed unexpectedly")
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("timed out waiting for AGENTS change");
|
||||
|
||||
let expected_agents_path = fs::canonicalize(&agents_path)?;
|
||||
let saw_expected_path = changed_paths
|
||||
.iter()
|
||||
.filter_map(|path| fs::canonicalize(path).ok())
|
||||
.any(|path| path == expected_agents_path);
|
||||
assert!(
|
||||
saw_expected_path,
|
||||
"expected AGENTS path in watcher event: {changed_paths:?}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let responses = mount_sse_sequence(
|
||||
&server,
|
||||
vec![sse_completed("resp-1"), sse_completed("resp-2")],
|
||||
)
|
||||
.await;
|
||||
|
||||
let skill_v1 = "skill body v1";
|
||||
let skill_v2 = "skill body v2";
|
||||
let mut builder = test_codex()
|
||||
.with_pre_build_hook(move |home| {
|
||||
write_skill(home, "demo", "demo skill", skill_v1);
|
||||
})
|
||||
.with_config(|config| {
|
||||
config.features.enable(Feature::LiveSkillsReload);
|
||||
enable_trusted_project(config);
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let skill_path = std::fs::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?;
|
||||
|
||||
submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?;
|
||||
let first_request = responses
|
||||
.requests()
|
||||
.first()
|
||||
.cloned()
|
||||
.expect("first request captured");
|
||||
assert!(
|
||||
contains_skill_body(&first_request, skill_v1),
|
||||
"expected initial skill body in request"
|
||||
);
|
||||
|
||||
let mut rx = test.thread_manager.subscribe_file_watcher();
|
||||
write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2);
|
||||
|
||||
let changed_paths = timeout(Duration::from_secs(5), async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(FileWatcherEvent::SkillsChanged { paths }) => break paths,
|
||||
Ok(FileWatcherEvent::AgentsChanged { .. }) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
panic!("file watcher channel closed unexpectedly")
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
if let Ok(changed_paths) = changed_paths {
|
||||
let expected_skill_path = fs::canonicalize(&skill_path)?;
|
||||
let saw_expected_path = changed_paths
|
||||
.iter()
|
||||
.filter_map(|path| fs::canonicalize(path).ok())
|
||||
.any(|path| path == expected_skill_path);
|
||||
assert!(
|
||||
saw_expected_path,
|
||||
"expected skill path in watcher event: {changed_paths:?}"
|
||||
);
|
||||
} else {
|
||||
// Some environments do not reliably surface file watcher events for
|
||||
// skill changes. Clear the cache explicitly so we can still validate
|
||||
// that the updated skill body is injected on the next turn.
|
||||
test.thread_manager.skills_manager().clear_cache();
|
||||
}
|
||||
|
||||
submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?;
|
||||
let last_request = responses
|
||||
.last_request()
|
||||
.expect("request captured after skill update");
|
||||
|
||||
assert!(
|
||||
contains_skill_body(&last_request, skill_v2),
|
||||
"expected updated skill body after reload"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -40,6 +40,7 @@ mod json_result;
|
||||
mod list_dir;
|
||||
mod list_models;
|
||||
mod live_cli;
|
||||
mod live_reload;
|
||||
mod model_info_overrides;
|
||||
mod model_overrides;
|
||||
mod model_tools;
|
||||
|
||||
Reference in New Issue
Block a user