diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 5ced1803fa..2cbc80c513 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2636,6 +2636,7 @@ dependencies = [ "libc", "pretty_assertions", "reqwest", + "semver", "serde", "serde_json", "tar", diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 50cefaf4f0..4ad481f994 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -636,9 +636,7 @@ impl TurnRequestProcessor { collaboration_mode: params.collaboration_mode, personality: params.personality, }; - if thread_settings_request.has_any_overrides() { - self.wait_for_pending_thread_settings(thread_id).await?; - } + self.wait_for_pending_thread_settings(thread_id).await?; let before_snapshot = thread.config_snapshot().await; let before_thread_settings = thread_settings_from_snapshot(&before_snapshot); diff --git a/codex-rs/core-plugins/Cargo.toml b/codex-rs/core-plugins/Cargo.toml index 352d6e5714..83df070dd3 100644 --- a/codex-rs/core-plugins/Cargo.toml +++ b/codex-rs/core-plugins/Cargo.toml @@ -32,6 +32,7 @@ chrono = { workspace = true } dirs = { workspace = true } flate2 = { workspace = true } reqwest = { workspace = true } +semver = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tar = { workspace = true } diff --git a/codex-rs/core-plugins/src/store.rs b/codex-rs/core-plugins/src/store.rs index fe662a142e..19c3b0116f 100644 --- a/codex-rs/core-plugins/src/store.rs +++ b/codex-rs/core-plugins/src/store.rs @@ -4,8 +4,10 @@ use codex_plugin::PluginId; use codex_plugin::validate_plugin_segment; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_plugins::find_plugin_manifest_path; +use semver::Version; use serde::Deserialize; use serde_json::Value as JsonValue; +use std::cmp::Ordering; use std::fs; use std::io; use std::path::Path; @@ -75,7 +77,7 @@ impl PluginStore { }) .filter(|version| validate_plugin_version_segment(version).is_ok()) .collect::>(); - discovered_versions.sort_unstable(); + discovered_versions.sort_unstable_by(|left, right| compare_plugin_versions(left, right)); if discovered_versions.is_empty() { None } else if discovered_versions @@ -286,6 +288,15 @@ fn replace_plugin_root_atomically( let staged_version_root = staged_root.join(plugin_version); copy_dir_recursive(source, &staged_version_root)?; + let target_version_root = target_root.join(plugin_version); + if target_root.exists() && !target_version_root.exists() { + fs::rename(&staged_version_root, &target_version_root).map_err(|err| { + PluginStoreError::io("failed to activate updated plugin cache version", err) + })?; + remove_old_plugin_versions(target_root, plugin_version)?; + return Ok(()); + } + if target_root.exists() { let backup_dir = tempfile::Builder::new() .prefix("plugin-backup-") @@ -322,6 +333,52 @@ fn replace_plugin_root_atomically( Ok(()) } +fn remove_old_plugin_versions( + target_root: &Path, + plugin_version: &str, +) -> Result<(), PluginStoreError> { + let Ok(entries) = fs::read_dir(target_root) else { + return Ok(()); + }; + + for entry in entries.filter_map(Result::ok) { + let Ok(file_type) = entry.file_type() else { + continue; + }; + if !file_type.is_dir() { + continue; + } + let Ok(version) = entry.file_name().into_string() else { + continue; + }; + if version == plugin_version || validate_plugin_version_segment(&version).is_err() { + continue; + } + + if fs::remove_dir_all(entry.path()).is_err() + && old_plugin_version_would_stay_active(&version, plugin_version) + { + return Err(PluginStoreError::Invalid(format!( + "failed to activate updated plugin cache version `{plugin_version}` while `{version}` remains active" + ))); + } + } + + Ok(()) +} + +fn old_plugin_version_would_stay_active(old_version: &str, new_version: &str) -> bool { + old_version == DEFAULT_PLUGIN_VERSION + || compare_plugin_versions(old_version, new_version).is_gt() +} + +fn compare_plugin_versions(left: &str, right: &str) -> Ordering { + match (Version::parse(left), Version::parse(right)) { + (Ok(left), Ok(right)) => left.cmp(&right), + _ => left.cmp(right), + } +} + fn copy_dir_recursive(source: &Path, target: &Path) -> Result<(), PluginStoreError> { fs::create_dir_all(target) .map_err(|err| PluginStoreError::io("failed to create plugin target directory", err))?; diff --git a/codex-rs/core-plugins/src/store_tests.rs b/codex-rs/core-plugins/src/store_tests.rs index 0ba6b0d2c6..200055fe6c 100644 --- a/codex-rs/core-plugins/src/store_tests.rs +++ b/codex-rs/core-plugins/src/store_tests.rs @@ -247,7 +247,7 @@ fn active_plugin_version_prefers_default_local_version_when_multiple_versions_ex } #[test] -fn active_plugin_version_returns_last_sorted_version_when_default_is_missing() { +fn active_plugin_version_returns_latest_version_when_default_is_missing() { let tmp = tempdir().unwrap(); write_plugin( &tmp.path().join("plugins/cache/debug"), @@ -268,6 +268,76 @@ fn active_plugin_version_returns_last_sorted_version_when_default_is_missing() { ); } +#[test] +fn active_plugin_version_compares_semver_versions_semantically() { + let tmp = tempdir().unwrap(); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/9.0.0", + "sample-plugin", + ); + write_plugin( + &tmp.path().join("plugins/cache/debug"), + "sample-plugin/10.0.0", + "sample-plugin", + ); + let store = PluginStore::new(tmp.path().to_path_buf()); + let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap(); + + assert_eq!( + store.active_plugin_version(&plugin_id), + Some("10.0.0".to_string()) + ); +} + +#[test] +fn install_with_new_version_keeps_existing_plugin_root_and_prunes_old_versions() { + let tmp = tempdir().unwrap(); + let store = PluginStore::new(tmp.path().to_path_buf()); + let plugin_id = PluginId::new("sample-plugin".to_string(), "debug".to_string()).unwrap(); + + write_plugin_with_version(tmp.path(), "v1", "sample-plugin", Some("1.0.0")); + store + .install( + AbsolutePathBuf::try_from(tmp.path().join("v1")).unwrap(), + plugin_id.clone(), + ) + .unwrap(); + + write_plugin_with_version(tmp.path(), "v2", "sample-plugin", Some("2.0.0")); + store + .install( + AbsolutePathBuf::try_from(tmp.path().join("v2")).unwrap(), + plugin_id.clone(), + ) + .unwrap(); + + assert_eq!( + store.active_plugin_version(&plugin_id), + Some("2.0.0".to_string()) + ); + assert!( + tmp.path() + .join("plugins/cache/debug/sample-plugin/2.0.0") + .is_dir() + ); + assert!( + !tmp.path() + .join("plugins/cache/debug/sample-plugin/1.0.0") + .exists() + ); +} + +#[test] +fn old_plugin_version_would_stay_active_for_local_or_later_versions() { + assert!(old_plugin_version_would_stay_active( + DEFAULT_PLUGIN_VERSION, + "1.0.0" + )); + assert!(old_plugin_version_would_stay_active("10.0.0", "9.0.0")); + assert!(!old_plugin_version_would_stay_active("1.0.0", "2.0.0")); +} + #[test] fn plugin_root_rejects_path_separators_in_key_segments() { let err = PluginId::parse("../../etc@debug").unwrap_err(); diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 0063771480..24788d4d91 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -134,10 +134,6 @@ use tracing::warn; /// - If the model sends only an assistant message, we record it in the /// conversation history and consider the turn complete. /// -#[expect( - clippy::await_holding_invalid_type, - reason = "turn execution must keep active-turn state transitions atomic" -)] pub(crate) async fn run_turn( sess: Arc, turn_context: Arc, @@ -172,131 +168,11 @@ pub(crate) async fn run_turn( return None; } - let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref()); - sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref()) .await; - let loaded_plugins = sess - .services - .plugins_manager - .plugins_for_config(&turn_context.config.plugins_config_input()) - .await; - // Structured plugin:// mentions are resolved from the current session's - // enabled plugins, then converted into turn-scoped guidance below. - let mentioned_plugins = - collect_explicit_plugin_mentions(&input, loaded_plugins.capability_summaries()); - let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { - // Plugin mentions need raw MCP/app inventory even when app tools - // are normally hidden so we can describe the plugin's currently - // usable capabilities for this turn. - match sess - .services - .mcp_connection_manager - .read() - .await - .list_all_tools() - .or_cancel(&cancellation_token) - .await - { - Ok(mcp_tools) => mcp_tools, - Err(_) if turn_context.apps_enabled() => return None, - Err(_) => Vec::new(), - } - } else { - Vec::new() - }; - let available_connectors = if turn_context.apps_enabled() { - let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( - loaded_plugins - .effective_apps() - .into_iter() - .map(|connector_id| connector_id.0), - connectors::accessible_connectors_from_mcp_tools(&mcp_tools), - ); - connectors::with_app_enabled_state(connectors, &turn_context.config) - } else { - Vec::new() - }; - let connector_slug_counts = build_connector_slug_counts(&available_connectors); - let skill_name_counts_lower = skills_outcome - .as_ref() - .map_or_else(HashMap::new, |outcome| { - build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1 - }); - let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| { - collect_explicit_skill_mentions( - &input, - &outcome.skills, - &outcome.disabled_paths, - &connector_slug_counts, - ) - }); - maybe_prompt_and_install_mcp_dependencies( - sess.as_ref(), - turn_context.as_ref(), - &cancellation_token, - &mentioned_skills, - Some(sess.mcp_elicitation_reviewer()), - ) - .await; - - let session_telemetry = turn_context.session_telemetry.clone(); - let thread_id = sess.conversation_id.to_string(); - let tracking = build_track_events_context( - turn_context.model_info.slug.clone(), - thread_id, - turn_context.sub_id.clone(), - ); - let SkillInjections { - items: skill_injections, - warnings: skill_warnings, - } = build_skill_injections( - &mentioned_skills, - skills_outcome, - Some(&session_telemetry), - &sess.services.analytics_events_client, - tracking.clone(), - ) - .await; - - for message in skill_warnings { - sess.send_event(&turn_context, EventMsg::Warning(WarningEvent { message })) - .await; - } - - let skill_items: Vec = skill_injections - .iter() - .map(|skill| ContextualUserFragment::into(crate::context::SkillInstructions::from(skill))) - .collect(); - - let plugin_items = - build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors); - let mentioned_plugin_metadata = mentioned_plugins - .iter() - .filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata) - .collect::>(); - - let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input); - explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items( - &skill_items, - &available_connectors, - &skill_name_counts_lower, - )); - let connector_names_by_id = available_connectors - .iter() - .map(|connector| (connector.id.as_str(), connector.name.as_str())) - .collect::>(); - let mentioned_app_invocations = explicitly_enabled_connectors - .iter() - .map(|connector_id| AppInvocation { - connector_id: Some(connector_id.clone()), - app_name: connector_names_by_id - .get(connector_id.as_str()) - .map(|name| (*name).to_string()), - invocation_type: Some(InvocationType::Explicit), - }) - .collect::>(); + let (injection_items, explicitly_enabled_connectors) = + build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await?; if run_pending_session_start_hooks(&sess, &turn_context).await { return None; @@ -325,14 +201,6 @@ pub(crate) async fn run_turn( .await; user_prompt_submit_outcome.additional_contexts }; - sess.services - .analytics_events_client - .track_app_mentioned(tracking.clone(), mentioned_app_invocations); - for plugin in mentioned_plugin_metadata { - sess.services - .analytics_events_client - .track_plugin_used(tracking.clone(), plugin); - } sess.merge_connector_selection(explicitly_enabled_connectors.clone()) .await; record_additional_contexts(&sess, &turn_context, additional_contexts).await; @@ -346,12 +214,8 @@ pub(crate) async fn run_turn( })) .await; } - if !skill_items.is_empty() { - sess.record_conversation_items(&turn_context, &skill_items) - .await; - } - if !plugin_items.is_empty() { - sess.record_conversation_items(&turn_context, &plugin_items) + for response_item in injection_items { + sess.record_conversation_items(&turn_context, std::slice::from_ref(&response_item)) .await; } @@ -698,6 +562,142 @@ pub(crate) async fn run_turn( last_agent_message } +#[expect( + clippy::await_holding_invalid_type, + reason = "MCP tool listing borrows the read guard across cancellation-aware await" +)] +async fn build_skills_and_plugins( + sess: &Arc, + turn_context: &TurnContext, + input: &[UserInput], + cancellation_token: &CancellationToken, +) -> Option<(Vec, HashSet)> { + let tracking = build_track_events_context( + turn_context.model_info.slug.clone(), + sess.conversation_id.to_string(), + turn_context.sub_id.clone(), + ); + let loaded_plugins = sess + .services + .plugins_manager + .plugins_for_config(&turn_context.config.plugins_config_input()) + .await; + // Structured plugin:// mentions are resolved from the current session's + // enabled plugins, then converted into turn-scoped guidance below. + let mentioned_plugins = + collect_explicit_plugin_mentions(input, loaded_plugins.capability_summaries()); + let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { + // Plugin mentions need raw MCP/app inventory even when app tools + // are normally hidden so we can describe the plugin's currently + // usable capabilities for this turn. + match sess + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .or_cancel(cancellation_token) + .await + { + Ok(mcp_tools) => mcp_tools, + Err(_) if turn_context.apps_enabled() => return None, + Err(_) => Vec::new(), + } + } else { + Vec::new() + }; + let available_connectors = if turn_context.apps_enabled() { + let connectors = codex_connectors::merge::merge_plugin_connectors_with_accessible( + loaded_plugins + .effective_apps() + .into_iter() + .map(|connector_id| connector_id.0), + connectors::accessible_connectors_from_mcp_tools(&mcp_tools), + ); + connectors::with_app_enabled_state(connectors, &turn_context.config) + } else { + Vec::new() + }; + let skills_outcome = turn_context.turn_skills.outcome.as_ref(); + let connector_slug_counts = build_connector_slug_counts(&available_connectors); + let skill_name_counts_lower = + build_skill_name_counts(&skills_outcome.skills, &skills_outcome.disabled_paths).1; + let mentioned_skills = collect_explicit_skill_mentions( + input, + &skills_outcome.skills, + &skills_outcome.disabled_paths, + &connector_slug_counts, + ); + maybe_prompt_and_install_mcp_dependencies( + sess, + turn_context, + cancellation_token, + &mentioned_skills, + Some(sess.mcp_elicitation_reviewer()), + ) + .await; + + let SkillInjections { + items: skill_injections, + warnings: skill_warnings, + } = build_skill_injections( + &mentioned_skills, + Some(skills_outcome), + Some(&turn_context.session_telemetry), + &sess.services.analytics_events_client, + tracking.clone(), + ) + .await; + + for message in skill_warnings { + sess.send_event(turn_context, EventMsg::Warning(WarningEvent { message })) + .await; + } + + let skill_items: Vec = skill_injections + .iter() + .map(|skill| ContextualUserFragment::into(crate::context::SkillInstructions::from(skill))) + .collect(); + let skill_connector_ids = collect_explicit_app_ids_from_skill_items( + &skill_items, + &available_connectors, + &skill_name_counts_lower, + ); + let plugin_items = + build_plugin_injections(&mentioned_plugins, &mcp_tools, &available_connectors); + let mut explicitly_enabled_connectors = collect_explicit_app_ids(input); + explicitly_enabled_connectors.extend(skill_connector_ids); + let connector_names_by_id = available_connectors + .iter() + .map(|connector| (connector.id.as_str(), connector.name.as_str())) + .collect::>(); + let mentioned_app_invocations = explicitly_enabled_connectors + .iter() + .map(|connector_id| AppInvocation { + connector_id: Some(connector_id.clone()), + app_name: connector_names_by_id + .get(connector_id.as_str()) + .map(|name| (*name).to_string()), + invocation_type: Some(InvocationType::Explicit), + }) + .collect::>(); + sess.services + .analytics_events_client + .track_app_mentioned(tracking.clone(), mentioned_app_invocations); + for plugin in mentioned_plugins + .iter() + .filter_map(crate::plugins::PluginCapabilitySummary::telemetry_metadata) + { + sess.services + .analytics_events_client + .track_plugin_used(tracking.clone(), plugin); + } + + let mut injection_items = skill_items; + injection_items.extend(plugin_items); + Some((injection_items, explicitly_enabled_connectors)) +} + async fn track_turn_resolved_config_analytics( sess: &Session, turn_context: &TurnContext, diff --git a/codex-rs/tui/src/chatwidget/protocol.rs b/codex-rs/tui/src/chatwidget/protocol.rs index 6faaf4faaf..7ccb2bc342 100644 --- a/codex-rs/tui/src/chatwidget/protocol.rs +++ b/codex-rs/tui/src/chatwidget/protocol.rs @@ -220,6 +220,7 @@ impl ChatWidget { | ServerNotification::AccountRateLimitsUpdated(_) | ServerNotification::ThreadStarted(_) | ServerNotification::ThreadStatusChanged(_) + | ServerNotification::ThreadSettingsUpdated(_) | ServerNotification::ThreadArchived(_) | ServerNotification::ThreadUnarchived(_) | ServerNotification::RawResponseItemCompleted(_)