mirror of
https://github.com/openai/codex.git
synced 2026-06-04 04:12:03 +00:00
Compare commits
2 Commits
starr/agen
...
starr/agen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f4d1c2ce6 | ||
|
|
eeb7b76548 |
@@ -1,43 +0,0 @@
|
||||
use crate::store::PluginStoreError;
|
||||
use codex_plugin::PluginId;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use std::path::PathBuf;
|
||||
|
||||
const PLUGINS_DATA_DIR: &str = "plugins/data";
|
||||
|
||||
/// Resolves executor-readable writable roots for mutable plugin data.
|
||||
pub trait PluginDataStore: Send + Sync + 'static {
|
||||
/// Returns the writable local root exposed to hooks for one plugin.
|
||||
fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf;
|
||||
}
|
||||
|
||||
/// Stores mutable plugin data under the local Codex Home layout.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LocalPluginDataStore {
|
||||
root: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl LocalPluginDataStore {
|
||||
/// Creates the local mutable plugin data store for one Codex Home.
|
||||
pub fn from_codex_home(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
|
||||
let root = AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_DATA_DIR))
|
||||
.map_err(|source| PluginStoreError::Io {
|
||||
context: "failed to resolve plugin data root",
|
||||
source,
|
||||
})?;
|
||||
Ok(Self { root })
|
||||
}
|
||||
}
|
||||
|
||||
impl PluginDataStore for LocalPluginDataStore {
|
||||
fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf {
|
||||
self.root.join(format!(
|
||||
"{}-{}",
|
||||
plugin_id.plugin_name, plugin_id.marketplace_name
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "data_store_tests.rs"]
|
||||
mod tests;
|
||||
@@ -1,16 +0,0 @@
|
||||
use super::*;
|
||||
use codex_plugin::PluginId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
fn local_plugin_data_root_derives_path_from_key() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let store = LocalPluginDataStore::from_codex_home(tmp.path().to_path_buf()).unwrap();
|
||||
let plugin_id = PluginId::new("sample".to_string(), "debug".to_string()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
store.plugin_data_root(&plugin_id).as_path(),
|
||||
tmp.path().join("plugins/data/sample-debug")
|
||||
);
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod data_store;
|
||||
mod discoverable;
|
||||
pub mod installed_marketplaces;
|
||||
pub mod loader;
|
||||
@@ -25,8 +24,6 @@ pub const OPENAI_BUNDLED_MARKETPLACE_NAME: &str = "openai-bundled";
|
||||
pub type LoadedPlugin = codex_plugin::LoadedPlugin<codex_config::McpServerConfig>;
|
||||
pub type PluginLoadOutcome = codex_plugin::PluginLoadOutcome<codex_config::McpServerConfig>;
|
||||
|
||||
pub use data_store::LocalPluginDataStore;
|
||||
pub use data_store::PluginDataStore;
|
||||
pub use discoverable::ToolSuggestDiscoverablePlugin;
|
||||
pub use discoverable::ToolSuggestPluginDiscoveryInput;
|
||||
pub use manager::ConfiguredMarketplace;
|
||||
@@ -43,7 +40,6 @@ pub use manager::PluginRemoteSyncError;
|
||||
pub use manager::PluginUninstallError;
|
||||
pub use manager::PluginsConfigInput;
|
||||
pub use manager::PluginsManager;
|
||||
pub use manager::PluginsManagerStorageDeps;
|
||||
pub use manager::RemotePluginSyncResult;
|
||||
pub use marketplace_upgrade::ConfiguredMarketplaceUpgradeError as PluginMarketplaceUpgradeError;
|
||||
pub use marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome as PluginMarketplaceUpgradeOutcome;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::OPENAI_CURATED_MARKETPLACE_NAME;
|
||||
use crate::data_store::PluginDataStore;
|
||||
use crate::manifest::PluginManifestHooks;
|
||||
use crate::manifest::PluginManifestPaths;
|
||||
use crate::manifest::load_plugin_manifest;
|
||||
@@ -114,7 +113,6 @@ pub async fn load_plugins_from_layer_stack(
|
||||
config_layer_stack: &ConfigLayerStack,
|
||||
extra_plugins: HashMap<String, PluginConfig>,
|
||||
store: &PluginStore,
|
||||
data_store: &dyn PluginDataStore,
|
||||
restriction_product: Option<Product>,
|
||||
prefer_remote_curated_conflicts: bool,
|
||||
) -> PluginLoadOutcome<McpServerConfig> {
|
||||
@@ -135,7 +133,6 @@ pub async fn load_plugins_from_layer_stack(
|
||||
configured_name.clone(),
|
||||
&plugin,
|
||||
store,
|
||||
data_store,
|
||||
restriction_product,
|
||||
&skill_config_rules,
|
||||
)
|
||||
@@ -560,7 +557,6 @@ async fn load_plugin(
|
||||
config_name: String,
|
||||
plugin: &PluginConfig,
|
||||
store: &PluginStore,
|
||||
data_store: &dyn PluginDataStore,
|
||||
restriction_product: Option<Product>,
|
||||
skill_config_rules: &SkillConfigRules,
|
||||
) -> LoadedPlugin<McpServerConfig> {
|
||||
@@ -663,7 +659,7 @@ async fn load_plugin(
|
||||
let (hook_sources, hook_load_warnings) = load_plugin_hooks(
|
||||
&plugin_root,
|
||||
&loaded_plugin_id,
|
||||
&data_store.plugin_data_root(&loaded_plugin_id),
|
||||
&store.plugin_data_root(&loaded_plugin_id),
|
||||
manifest_paths,
|
||||
);
|
||||
loaded_plugin.hook_sources = hook_sources;
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use super::PluginLoadOutcome;
|
||||
use super::startup_remote_sync::start_startup_remote_plugin_sync_once;
|
||||
use crate::OPENAI_CURATED_MARKETPLACE_NAME;
|
||||
use crate::data_store::LocalPluginDataStore;
|
||||
use crate::data_store::PluginDataStore;
|
||||
use crate::installed_marketplaces::installed_marketplace_roots_from_layer_stack;
|
||||
use crate::loader::configured_curated_plugin_ids_from_codex_home;
|
||||
use crate::loader::curated_plugin_cache_version;
|
||||
@@ -400,7 +398,6 @@ impl From<RemotePluginFetchError> for PluginRemoteSyncError {
|
||||
pub struct PluginsManager {
|
||||
codex_home: PathBuf,
|
||||
store: PluginStore,
|
||||
data_store: Arc<dyn PluginDataStore>,
|
||||
featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
|
||||
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
|
||||
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
|
||||
@@ -412,26 +409,6 @@ pub struct PluginsManager {
|
||||
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
|
||||
}
|
||||
|
||||
/// Dependencies owned by one plugins manager.
|
||||
#[derive(Clone)]
|
||||
pub struct PluginsManagerStorageDeps {
|
||||
data_store: Arc<dyn PluginDataStore>,
|
||||
}
|
||||
|
||||
impl PluginsManagerStorageDeps {
|
||||
/// Creates dependencies backed by the local Codex Home layout.
|
||||
pub fn from_codex_home(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
|
||||
Ok(Self::new(Arc::new(LocalPluginDataStore::from_codex_home(
|
||||
codex_home,
|
||||
)?)))
|
||||
}
|
||||
|
||||
/// Creates dependencies with an alternative mutable plugin data backend.
|
||||
pub fn new(data_store: Arc<dyn PluginDataStore>) -> Self {
|
||||
Self { data_store }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CachedPluginLoadOutcome {
|
||||
config_version: String,
|
||||
@@ -443,35 +420,9 @@ impl PluginsManager {
|
||||
Self::new_with_restriction_product(codex_home, Some(Product::Codex))
|
||||
}
|
||||
|
||||
/// Creates a plugins manager with injected dependencies.
|
||||
pub fn new_with_storage_deps(
|
||||
codex_home: PathBuf,
|
||||
storage_deps: PluginsManagerStorageDeps,
|
||||
) -> Self {
|
||||
Self::new_with_restriction_product_and_storage_deps(
|
||||
codex_home,
|
||||
Some(Product::Codex),
|
||||
storage_deps,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn new_with_restriction_product(
|
||||
codex_home: PathBuf,
|
||||
restriction_product: Option<Product>,
|
||||
) -> Self {
|
||||
let storage_deps = PluginsManagerStorageDeps::from_codex_home(codex_home.clone())
|
||||
.unwrap_or_else(|err| panic!("plugin data root should be absolute: {err}"));
|
||||
Self::new_with_restriction_product_and_storage_deps(
|
||||
codex_home,
|
||||
restriction_product,
|
||||
storage_deps,
|
||||
)
|
||||
}
|
||||
|
||||
fn new_with_restriction_product_and_storage_deps(
|
||||
codex_home: PathBuf,
|
||||
restriction_product: Option<Product>,
|
||||
storage_deps: PluginsManagerStorageDeps,
|
||||
) -> Self {
|
||||
// Product restrictions are enforced at marketplace admission time for a given CODEX_HOME:
|
||||
// listing, install, and curated refresh all consult this restriction context before new
|
||||
@@ -483,7 +434,6 @@ impl PluginsManager {
|
||||
Self {
|
||||
codex_home: codex_home.clone(),
|
||||
store: PluginStore::new(codex_home),
|
||||
data_store: storage_deps.data_store,
|
||||
featured_plugin_ids_cache: RwLock::new(None),
|
||||
configured_marketplace_upgrade_state: RwLock::new(
|
||||
ConfiguredMarketplaceUpgradeState::default(),
|
||||
@@ -541,7 +491,6 @@ impl PluginsManager {
|
||||
&config.config_layer_stack,
|
||||
self.remote_installed_plugin_configs(),
|
||||
&self.store,
|
||||
self.data_store.as_ref(),
|
||||
self.restriction_product,
|
||||
config.remote_plugin_enabled,
|
||||
)
|
||||
@@ -588,7 +537,6 @@ impl PluginsManager {
|
||||
config_layer_stack,
|
||||
self.remote_installed_plugin_configs(),
|
||||
&self.store,
|
||||
self.data_store.as_ref(),
|
||||
self.restriction_product,
|
||||
config.remote_plugin_enabled,
|
||||
)
|
||||
@@ -1508,7 +1456,7 @@ impl PluginsManager {
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let plugin_data_root = self.data_store.plugin_data_root(&plugin_id);
|
||||
let plugin_data_root = self.store.plugin_data_root(&plugin_id);
|
||||
let (hook_sources, _hook_load_warnings) =
|
||||
load_plugin_hooks(&source_path, &plugin_id, &plugin_data_root, &manifest.paths);
|
||||
let hooks = plugin_hook_declarations(&hook_sources)
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use super::*;
|
||||
use crate::LoadedPlugin;
|
||||
use crate::PluginLoadOutcome;
|
||||
use crate::data_store::LocalPluginDataStore;
|
||||
use crate::data_store::PluginDataStore;
|
||||
use crate::installed_marketplaces::marketplace_install_root;
|
||||
use crate::loader::load_plugins_from_layer_stack;
|
||||
use crate::loader::refresh_non_curated_plugin_cache;
|
||||
@@ -31,12 +29,10 @@ use codex_config::types::McpServerTransportConfig;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::protocol::HookEventName;
|
||||
use codex_protocol::protocol::Product;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_absolute_path::test_support::PathBufExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
use toml::Value;
|
||||
use wiremock::Mock;
|
||||
@@ -49,17 +45,6 @@ use wiremock::matchers::query_param;
|
||||
|
||||
const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FakePluginDataStore {
|
||||
root: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl PluginDataStore for FakePluginDataStore {
|
||||
fn plugin_data_root(&self, _plugin_id: &PluginId) -> AbsolutePathBuf {
|
||||
self.root.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn write_plugin_with_version(
|
||||
root: &Path,
|
||||
dir_name: &str,
|
||||
@@ -3866,7 +3851,6 @@ async fn load_plugins_ignores_project_config_files() {
|
||||
&stack,
|
||||
std::collections::HashMap::new(),
|
||||
&PluginStore::new(codex_home.path().to_path_buf()),
|
||||
&LocalPluginDataStore::from_codex_home(codex_home.path().to_path_buf()).unwrap(),
|
||||
Some(Product::Codex),
|
||||
/*prefer_remote_curated_conflicts*/ false,
|
||||
)
|
||||
@@ -3874,62 +3858,3 @@ async fn load_plugins_ignores_project_config_files() {
|
||||
|
||||
assert_eq!(outcome, PluginLoadOutcome::default());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn plugins_for_layer_stack_uses_injected_plugin_data_store() {
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let plugin_root = codex_home
|
||||
.path()
|
||||
.join("plugins/cache")
|
||||
.join("test/sample/local");
|
||||
let config_path = AbsolutePathBuf::try_from(codex_home.path().join(CONFIG_TOML_FILE)).unwrap();
|
||||
let data_root =
|
||||
AbsolutePathBuf::try_from(codex_home.path().join("injected-plugin-data")).unwrap();
|
||||
|
||||
write_file(
|
||||
&plugin_root.join(".codex-plugin/plugin.json"),
|
||||
r#"{"name":"sample"}"#,
|
||||
);
|
||||
write_file(
|
||||
&plugin_root.join("hooks/hooks.json"),
|
||||
r#"{"hooks":{"SessionStart":[{"hooks":[{"type":"command","command":"echo startup"}]}]}}"#,
|
||||
);
|
||||
let stack = ConfigLayerStack::new(
|
||||
vec![ConfigLayerEntry::new(
|
||||
ConfigLayerSource::User {
|
||||
file: config_path,
|
||||
profile: None,
|
||||
},
|
||||
toml::from_str(&plugin_config_toml(
|
||||
/*enabled*/ true, /*plugins_feature_enabled*/ true,
|
||||
))
|
||||
.expect("user config should parse"),
|
||||
)],
|
||||
ConfigRequirements::default(),
|
||||
ConfigRequirementsToml::default(),
|
||||
)
|
||||
.expect("config layer stack should build");
|
||||
let config = PluginsConfigInput::new(
|
||||
stack.clone(),
|
||||
/*plugins_enabled*/ true,
|
||||
/*remote_plugin_enabled*/ false,
|
||||
"https://chatgpt.com/backend-api/".to_string(),
|
||||
);
|
||||
let outcome = PluginsManager::new_with_storage_deps(
|
||||
codex_home.path().to_path_buf(),
|
||||
PluginsManagerStorageDeps::new(Arc::new(FakePluginDataStore {
|
||||
root: data_root.clone(),
|
||||
})),
|
||||
)
|
||||
.plugins_for_layer_stack(&stack, &config)
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
outcome
|
||||
.effective_plugin_hook_sources()
|
||||
.into_iter()
|
||||
.map(|source| source.plugin_data_root)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![data_root]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use std::path::PathBuf;
|
||||
|
||||
pub const DEFAULT_PLUGIN_VERSION: &str = "local";
|
||||
pub const PLUGINS_CACHE_DIR: &str = "plugins/cache";
|
||||
pub const PLUGINS_DATA_DIR: &str = "plugins/data";
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PluginInstallResult {
|
||||
@@ -26,6 +27,7 @@ pub struct PluginInstallResult {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PluginStore {
|
||||
root: AbsolutePathBuf,
|
||||
data_root: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl PluginStore {
|
||||
@@ -37,7 +39,11 @@ impl PluginStore {
|
||||
pub fn try_new(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
|
||||
let root = AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_CACHE_DIR))
|
||||
.map_err(|err| PluginStoreError::io("failed to resolve plugin cache root", err))?;
|
||||
Ok(Self { root })
|
||||
let data_root =
|
||||
AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_DATA_DIR))
|
||||
.map_err(|err| PluginStoreError::io("failed to resolve plugin data root", err))?;
|
||||
|
||||
Ok(Self { root, data_root })
|
||||
}
|
||||
|
||||
pub fn root(&self) -> &AbsolutePathBuf {
|
||||
@@ -54,6 +60,13 @@ impl PluginStore {
|
||||
self.plugin_base_root(plugin_id).join(plugin_version)
|
||||
}
|
||||
|
||||
pub fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf {
|
||||
self.data_root.join(format!(
|
||||
"{}-{}",
|
||||
plugin_id.plugin_name, plugin_id.marketplace_name
|
||||
))
|
||||
}
|
||||
|
||||
pub fn active_plugin_version(&self, plugin_id: &PluginId) -> Option<String> {
|
||||
let mut discovered_versions = fs::read_dir(self.plugin_base_root(plugin_id).as_path())
|
||||
.ok()?
|
||||
|
||||
@@ -109,6 +109,18 @@ fn plugin_root_derives_path_from_key_and_version() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plugin_data_root_derives_path_from_key() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let store = PluginStore::new(tmp.path().to_path_buf());
|
||||
let plugin_id = PluginId::new("sample".to_string(), "debug".to_string()).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
store.plugin_data_root(&plugin_id).as_path(),
|
||||
tmp.path().join("plugins/data/sample-debug")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn install_with_version_uses_requested_cache_version() {
|
||||
let tmp = tempdir().unwrap();
|
||||
|
||||
80
codex-rs/core/src/artifact_store.rs
Normal file
80
codex-rs/core/src/artifact_store.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use codex_protocol::error::Result;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
|
||||
|
||||
/// Stores generated artifacts and materializes executor-readable local paths.
|
||||
pub trait ArtifactStore: Send + Sync + 'static {
|
||||
/// Returns the executor-readable path exposed to generated image instructions.
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf;
|
||||
|
||||
/// Persists a generated image payload and returns its executor-readable path.
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_>;
|
||||
}
|
||||
|
||||
/// Future returned by artifact writes.
|
||||
pub type ArtifactWriteFuture<'a> =
|
||||
Pin<Box<dyn Future<Output = Result<AbsolutePathBuf>> + Send + 'a>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalArtifactStore {
|
||||
codex_home: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl LocalArtifactStore {
|
||||
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
|
||||
Self {
|
||||
codex_home: codex_home.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ArtifactStore for LocalArtifactStore {
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
|
||||
self.codex_home
|
||||
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
|
||||
.join(sanitize_artifact_path_component(session_id))
|
||||
.join(format!("{}.png", sanitize_artifact_path_component(call_id)))
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
let path = self.generated_image_path(session_id, call_id);
|
||||
Box::pin(async move {
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
Ok(path)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn sanitize_artifact_path_component(value: &str) -> String {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
}
|
||||
@@ -105,6 +105,8 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
environment_selections: parent_ctx.environments.clone(),
|
||||
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
|
||||
thread_store: Arc::clone(&parent_session.services.thread_store),
|
||||
artifact_store: Arc::clone(&parent_session.services.artifact_store),
|
||||
shell_snapshot_store: Arc::clone(&parent_session.services.shell_snapshot_store),
|
||||
attestation_provider: parent_session.services.attestation_provider.clone(),
|
||||
inherited_multi_agent_version: Some(MultiAgentVersion::Disabled),
|
||||
}))
|
||||
|
||||
@@ -24,6 +24,7 @@ pub use codex_thread::CodexThreadSettingsOverrides;
|
||||
pub use codex_thread::ThreadConfigSnapshot;
|
||||
pub use session::turn_context::TurnContext;
|
||||
mod agent;
|
||||
pub mod artifact_store;
|
||||
mod attestation;
|
||||
mod codex_delegate;
|
||||
mod command_canonicalization;
|
||||
@@ -132,7 +133,7 @@ mod rollout;
|
||||
pub(crate) mod safety;
|
||||
mod session_rollout_init_error;
|
||||
pub mod shell;
|
||||
pub(crate) mod shell_snapshot;
|
||||
pub mod shell_snapshot;
|
||||
pub mod spawn;
|
||||
pub(crate) mod state_db_bridge;
|
||||
pub use state_db_bridge::StateDbHandle;
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::agent::AgentControl;
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::build_available_skills;
|
||||
use crate::compact;
|
||||
@@ -419,6 +420,8 @@ pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) environment_selections: ResolvedTurnEnvironments,
|
||||
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
|
||||
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
pub(crate) inherited_multi_agent_version: Option<MultiAgentVersion>,
|
||||
}
|
||||
@@ -499,6 +502,8 @@ impl Codex {
|
||||
environment_selections,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider,
|
||||
inherited_multi_agent_version,
|
||||
} = args;
|
||||
@@ -645,6 +650,8 @@ impl Codex {
|
||||
environment_manager,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
parent_rollout_thread_trace,
|
||||
attestation_provider,
|
||||
multi_agent_version,
|
||||
@@ -1349,7 +1356,6 @@ impl Session {
|
||||
&self,
|
||||
previous_cwd: &AbsolutePathBuf,
|
||||
next_cwd: &AbsolutePathBuf,
|
||||
codex_home: &AbsolutePathBuf,
|
||||
session_source: &SessionSource,
|
||||
) {
|
||||
if previous_cwd == next_cwd {
|
||||
@@ -1368,7 +1374,7 @@ impl Session {
|
||||
}
|
||||
|
||||
ShellSnapshot::refresh_snapshot(
|
||||
codex_home.clone(),
|
||||
Arc::clone(&self.services.shell_snapshot_store),
|
||||
self.conversation_id,
|
||||
next_cwd.clone(),
|
||||
self.services.user_shell.as_ref().clone(),
|
||||
@@ -1389,7 +1395,6 @@ impl Session {
|
||||
previous_cwd,
|
||||
permission_profile_changed,
|
||||
next_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
) = {
|
||||
let mut state = self.state.lock().await;
|
||||
@@ -1411,7 +1416,6 @@ impl Session {
|
||||
let permission_profile_changed =
|
||||
previous_permission_profile != updated_permission_profile;
|
||||
let next_cwd = updated.cwd.clone();
|
||||
let codex_home = updated.codex_home.clone();
|
||||
let session_source = updated.session_source.clone();
|
||||
state.session_configuration = updated;
|
||||
(
|
||||
@@ -1420,18 +1424,12 @@ impl Session {
|
||||
previous_cwd,
|
||||
permission_profile_changed,
|
||||
next_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
)
|
||||
};
|
||||
|
||||
self.emit_config_changed_contributors(previous_config.as_ref(), new_config.as_ref());
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&next_cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &session_source);
|
||||
if permission_profile_changed {
|
||||
self.refresh_managed_network_proxy_for_current_permission_profile()
|
||||
.await;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::input_queue::InputQueue;
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::config::ConstraintError;
|
||||
use crate::goals::GoalRuntimeState;
|
||||
use crate::skills::SkillError;
|
||||
@@ -504,6 +505,8 @@ impl Session {
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
parent_rollout_thread_trace: ThreadTraceContext,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
multi_agent_version: Option<MultiAgentVersion>,
|
||||
@@ -859,7 +862,7 @@ impl Session {
|
||||
tx
|
||||
} else {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
Arc::clone(&shell_snapshot_store),
|
||||
thread_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
@@ -1006,6 +1009,7 @@ impl Session {
|
||||
rollout_thread_trace,
|
||||
user_shell: Arc::new(default_shell),
|
||||
shell_snapshot_tx,
|
||||
shell_snapshot_store,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
@@ -1030,6 +1034,7 @@ impl Session {
|
||||
state_db: state_db_ctx.clone(),
|
||||
live_thread: live_thread_init.as_ref().cloned(),
|
||||
thread_store: Arc::clone(&thread_store),
|
||||
artifact_store,
|
||||
attestation_provider: attestation_provider.clone(),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::turn_context::TurnEnvironment;
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::ArtifactWriteFuture;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::config::ConfigOverrides;
|
||||
use crate::config::test_config;
|
||||
@@ -141,6 +143,7 @@ use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_rmcp_client::ElicitationAction;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::PathBufExt;
|
||||
use core_test_support::PathExt;
|
||||
use core_test_support::context_snapshot;
|
||||
@@ -190,6 +193,32 @@ struct InstructionsTestCase {
|
||||
expects_apply_patch_description: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FakeArtifactStore {
|
||||
writes: Arc<std::sync::Mutex<Vec<Vec<u8>>>>,
|
||||
}
|
||||
|
||||
impl ArtifactStore for FakeArtifactStore {
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
|
||||
let path = format!("/fake/{session_id}/{call_id}.png");
|
||||
test_path_buf(&path).abs()
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
let writes = Arc::clone(&self.writes);
|
||||
let path = self.generated_image_path(session_id, call_id);
|
||||
Box::pin(async move {
|
||||
writes.lock().expect("lock writes").push(bytes);
|
||||
Ok(path)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -4603,6 +4632,12 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -4735,6 +4770,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
shell_snapshot_tx: watch::channel(None).0,
|
||||
shell_snapshot_store: Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: auth_manager.clone(),
|
||||
@@ -4763,6 +4801,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
attestation_provider: None,
|
||||
model_client: ModelClient::new(
|
||||
Some(auth_manager.clone()),
|
||||
@@ -4953,6 +4994,12 @@ async fn make_session_with_config_and_rx(
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -5065,6 +5112,12 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
|
||||
.expect("state db should initialize"),
|
||||
),
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -6825,6 +6878,9 @@ where
|
||||
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
shell_snapshot_tx: watch::channel(None).0,
|
||||
shell_snapshot_store: Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
@@ -6853,6 +6909,9 @@ where
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
state_db,
|
||||
)),
|
||||
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
attestation_provider: None,
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
@@ -7844,6 +7903,69 @@ async fn handle_output_item_done_records_image_save_history_message() {
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_output_item_done_uses_injected_artifact_store() {
|
||||
let (mut session, turn_context) = make_session_and_context().await;
|
||||
let writes = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
session.services.artifact_store = Arc::new(FakeArtifactStore {
|
||||
writes: Arc::clone(&writes),
|
||||
});
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context);
|
||||
let call_id = "ig_injected_store";
|
||||
let expected_saved_path = session
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session.conversation_id.to_string(), call_id);
|
||||
let item = ResponseItem::ImageGenerationCall {
|
||||
id: call_id.to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("a tiny blue square".to_string()),
|
||||
result: "Zm9v".to_string(),
|
||||
};
|
||||
|
||||
let mut ctx = HandleOutputCtx {
|
||||
sess: Arc::clone(&session),
|
||||
turn_context: Arc::clone(&turn_context),
|
||||
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
|
||||
turn_context.sub_id.clone(),
|
||||
)),
|
||||
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
};
|
||||
handle_output_item_done(&mut ctx, item.clone(), /*previously_active_item*/ None)
|
||||
.await
|
||||
.expect("image generation item should succeed");
|
||||
|
||||
assert_eq!(
|
||||
writes.lock().expect("lock writes").as_slice(),
|
||||
&[b"foo".to_vec()]
|
||||
);
|
||||
let image_output_path = session
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session.conversation_id.to_string(), "<image_id>");
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.expect("generated image path should have a parent");
|
||||
let image_message: ResponseItem = crate::context::ContextualUserFragment::into(
|
||||
crate::context::ImageGenerationInstructions::new(
|
||||
image_output_dir.display(),
|
||||
image_output_path.display(),
|
||||
),
|
||||
);
|
||||
let history = session.clone_history().await;
|
||||
assert_eq!(history.raw_items(), &[image_message, item]);
|
||||
assert_eq!(
|
||||
expected_saved_path,
|
||||
test_path_buf(&format!(
|
||||
"/fake/{}/ig_injected_store.png",
|
||||
session.conversation_id
|
||||
))
|
||||
.abs()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
@@ -702,6 +702,12 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(&config),
|
||||
/*state_db*/ None,
|
||||
));
|
||||
let artifact_store = Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
));
|
||||
let shell_snapshot_store = Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
);
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
@@ -733,6 +739,8 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
},
|
||||
analytics_events_client: None,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider: None,
|
||||
inherited_multi_agent_version: None,
|
||||
})
|
||||
|
||||
@@ -609,7 +609,6 @@ impl Session {
|
||||
let next_permission_profile = next.permission_profile();
|
||||
let permission_profile_changed =
|
||||
previous_permission_profile != next_permission_profile;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
let previous_config = notify_config_contributors.then(|| {
|
||||
Self::build_effective_session_config(&state.session_configuration)
|
||||
@@ -622,7 +621,6 @@ impl Session {
|
||||
turn_environments,
|
||||
permission_profile_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
previous_config,
|
||||
new_config,
|
||||
@@ -637,7 +635,6 @@ impl Session {
|
||||
turn_environments,
|
||||
permission_profile_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
previous_config,
|
||||
new_config,
|
||||
@@ -661,7 +658,6 @@ impl Session {
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&session_configuration.cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::future::Future;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -35,9 +37,144 @@ const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3
|
||||
const SNAPSHOT_DIR: &str = "shell_snapshots";
|
||||
const EXCLUDED_EXPORT_VARS: &[&str] = &["PWD", "OLDPWD"];
|
||||
|
||||
/// Persists executor-local shell snapshot files and owns retention cleanup.
|
||||
/// Implementations must return local readable paths because shell execution sources snapshots.
|
||||
pub trait ShellSnapshotStore: Send + Sync + 'static {
|
||||
/// Allocates the final and temporary paths for one snapshot generation.
|
||||
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths;
|
||||
|
||||
/// Removes stale inactive snapshots according to the store's retention policy.
|
||||
fn cleanup_stale_snapshots(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> ShellSnapshotStoreFuture<'_>;
|
||||
}
|
||||
|
||||
/// Future returned by shell snapshot store cleanup work.
|
||||
pub type ShellSnapshotStoreFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Codex Home backed executor-local shell snapshot store.
|
||||
pub struct LocalShellSnapshotStore {
|
||||
codex_home: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl LocalShellSnapshotStore {
|
||||
/// Constructs a local shell snapshot store rooted at one Codex Home.
|
||||
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
|
||||
Self {
|
||||
codex_home: codex_home.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn snapshot_dir(&self) -> AbsolutePathBuf {
|
||||
self.codex_home.join(SNAPSHOT_DIR)
|
||||
}
|
||||
|
||||
async fn cleanup_stale_snapshots_impl(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Result<()> {
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
let mut entries = match fs::read_dir(&snapshot_dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let active_session_id = active_session_id.to_string();
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if !entry.file_type().await?.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
if session_id == active_session_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(&self.codex_home, session_id, state_db.as_deref())
|
||||
.await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
|
||||
Ok(modified) => modified,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to check rollout age for snapshot {}: {err:?}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if now
|
||||
.duration_since(modified)
|
||||
.ok()
|
||||
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
|
||||
{
|
||||
remove_snapshot_file(&path).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ShellSnapshotStore for LocalShellSnapshotStore {
|
||||
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths {
|
||||
let extension = match shell_type {
|
||||
ShellType::PowerShell => "ps1",
|
||||
_ => "sh",
|
||||
};
|
||||
let nonce = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or(0);
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
ShellSnapshotPaths {
|
||||
path: snapshot_dir.join(format!("{session_id}.{nonce}.{extension}")),
|
||||
temp_path: snapshot_dir.join(format!("{session_id}.tmp-{nonce}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_stale_snapshots(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> ShellSnapshotStoreFuture<'_> {
|
||||
Box::pin(self.cleanup_stale_snapshots_impl(active_session_id, state_db))
|
||||
}
|
||||
}
|
||||
|
||||
/// Final and temporary local paths for one shell snapshot generation.
|
||||
pub struct ShellSnapshotPaths {
|
||||
/// Final path sourced by later shell execution.
|
||||
pub path: AbsolutePathBuf,
|
||||
/// Temporary path populated before validation and rename.
|
||||
pub temp_path: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl ShellSnapshot {
|
||||
pub fn start_snapshotting(
|
||||
codex_home: AbsolutePathBuf,
|
||||
pub(crate) fn start_snapshotting(
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
shell: &mut Shell,
|
||||
@@ -48,7 +185,7 @@ impl ShellSnapshot {
|
||||
shell.shell_snapshot = shell_snapshot_rx;
|
||||
|
||||
Self::spawn_snapshot_task(
|
||||
codex_home,
|
||||
shell_snapshot_store,
|
||||
session_id,
|
||||
session_cwd,
|
||||
shell.clone(),
|
||||
@@ -60,8 +197,8 @@ impl ShellSnapshot {
|
||||
shell_snapshot_tx
|
||||
}
|
||||
|
||||
pub fn refresh_snapshot(
|
||||
codex_home: AbsolutePathBuf,
|
||||
pub(crate) fn refresh_snapshot(
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
shell: Shell,
|
||||
@@ -70,7 +207,7 @@ impl ShellSnapshot {
|
||||
state_db: Option<StateDbHandle>,
|
||||
) {
|
||||
Self::spawn_snapshot_task(
|
||||
codex_home,
|
||||
shell_snapshot_store,
|
||||
session_id,
|
||||
session_cwd,
|
||||
shell,
|
||||
@@ -81,7 +218,7 @@ impl ShellSnapshot {
|
||||
}
|
||||
|
||||
fn spawn_snapshot_task(
|
||||
codex_home: AbsolutePathBuf,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
snapshot_shell: Shell,
|
||||
@@ -92,13 +229,24 @@ impl ShellSnapshot {
|
||||
let snapshot_span = info_span!("shell_snapshot", thread_id = %session_id);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let cleanup_shell_snapshot_store = Arc::clone(&shell_snapshot_store);
|
||||
let cleanup_session_id = session_id;
|
||||
let cleanup_state_db = state_db.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = cleanup_shell_snapshot_store
|
||||
.cleanup_stale_snapshots(cleanup_session_id, cleanup_state_db)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
|
||||
}
|
||||
});
|
||||
|
||||
let timer = session_telemetry.start_timer("codex.shell_snapshot.duration_ms", &[]);
|
||||
let snapshot = ShellSnapshot::try_new(
|
||||
&codex_home,
|
||||
shell_snapshot_store.as_ref(),
|
||||
session_id,
|
||||
&session_cwd,
|
||||
&snapshot_shell,
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
.map(Arc::new);
|
||||
@@ -117,38 +265,13 @@ impl ShellSnapshot {
|
||||
}
|
||||
|
||||
async fn try_new(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
shell_snapshot_store: &dyn ShellSnapshotStore,
|
||||
session_id: ThreadId,
|
||||
session_cwd: &AbsolutePathBuf,
|
||||
shell: &Shell,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> std::result::Result<Self, &'static str> {
|
||||
// File to store the snapshot
|
||||
let extension = match shell.shell_type {
|
||||
ShellType::PowerShell => "ps1",
|
||||
_ => "sh",
|
||||
};
|
||||
let nonce = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or(0);
|
||||
let path = codex_home
|
||||
.join(SNAPSHOT_DIR)
|
||||
.join(format!("{session_id}.{nonce}.{extension}"));
|
||||
let temp_path = codex_home
|
||||
.join(SNAPSHOT_DIR)
|
||||
.join(format!("{session_id}.tmp-{nonce}"));
|
||||
|
||||
// Clean the (unlikely) leaked snapshot files.
|
||||
let codex_home = codex_home.clone();
|
||||
let cleanup_session_id = session_id;
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
cleanup_stale_snapshots(&codex_home, cleanup_session_id, state_db).await
|
||||
{
|
||||
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
|
||||
}
|
||||
});
|
||||
let ShellSnapshotPaths { path, temp_path } =
|
||||
shell_snapshot_store.snapshot_paths(session_id, shell.shell_type.clone());
|
||||
|
||||
// Make the new snapshot.
|
||||
if let Err(err) =
|
||||
@@ -494,72 +617,6 @@ $envVars | ForEach-Object {
|
||||
"##
|
||||
}
|
||||
|
||||
/// Removes shell snapshots that either lack a matching session rollout file or
|
||||
/// whose rollouts have not been updated within the retention window.
|
||||
/// The active session id is exempt from cleanup.
|
||||
pub async fn cleanup_stale_snapshots(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Result<()> {
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
|
||||
let mut entries = match fs::read_dir(&snapshot_dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let active_session_id = active_session_id.to_string();
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if !entry.file_type().await?.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
if session_id == active_session_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(codex_home, session_id, state_db.as_deref()).await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
|
||||
Ok(modified) => modified,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to check rollout age for snapshot {}: {err:?}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if now
|
||||
.duration_since(modified)
|
||||
.ok()
|
||||
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
|
||||
{
|
||||
remove_snapshot_file(&path).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_snapshot_file(path: &Path) {
|
||||
if let Err(err) = fs::remove_file(path).await {
|
||||
tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path);
|
||||
|
||||
@@ -196,13 +196,13 @@ async fn try_new_creates_and_deletes_snapshot_file() -> Result<()> {
|
||||
shell_path: PathBuf::from("/bin/bash"),
|
||||
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
|
||||
};
|
||||
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
|
||||
|
||||
let snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
&shell_snapshot_store,
|
||||
ThreadId::new(),
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("snapshot should be created");
|
||||
@@ -227,25 +227,16 @@ async fn try_new_uses_distinct_generation_paths() -> Result<()> {
|
||||
shell_path: PathBuf::from("/bin/bash"),
|
||||
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
|
||||
};
|
||||
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
|
||||
|
||||
let initial_snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
session_id,
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("initial snapshot should be created");
|
||||
let refreshed_snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
session_id,
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("refreshed snapshot should be created");
|
||||
let initial_snapshot =
|
||||
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
|
||||
.await
|
||||
.expect("initial snapshot should be created");
|
||||
let refreshed_snapshot =
|
||||
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
|
||||
.await
|
||||
.expect("refreshed snapshot should be created");
|
||||
let initial_path = initial_snapshot.path.clone();
|
||||
let refreshed_path = refreshed_snapshot.path.clone();
|
||||
|
||||
@@ -439,7 +430,9 @@ async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()>
|
||||
fs::write(&orphan_snapshot, "orphan").await?;
|
||||
fs::write(&invalid_snapshot, "invalid").await?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(live_snapshot.exists(), true);
|
||||
assert_eq!(orphan_snapshot.exists(), false);
|
||||
@@ -462,7 +455,9 @@ async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> {
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(stale_snapshot.exists(), false);
|
||||
Ok(())
|
||||
@@ -483,7 +478,9 @@ async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> {
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, active_session, /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(active_session, /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(active_snapshot.exists(), true);
|
||||
Ok(())
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
|
||||
use crate::SkillsManager;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::client::ModelClient;
|
||||
use crate::config::NetworkProxyAuditMetadata;
|
||||
@@ -51,6 +52,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) rollout_thread_trace: ThreadTraceContext,
|
||||
pub(crate) user_shell: Arc<crate::shell::Shell>,
|
||||
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
|
||||
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
pub(crate) show_raw_agent_reasoning: bool,
|
||||
pub(crate) exec_policy: Arc<ExecPolicyManager>,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
@@ -74,6 +76,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
pub(crate) live_thread: Option<LiveThread>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
|
||||
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
/// Session-scoped model client shared across turns.
|
||||
pub(crate) model_client: ModelClient,
|
||||
|
||||
@@ -36,34 +36,17 @@ use tracing::debug;
|
||||
use tracing::instrument;
|
||||
use tracing::warn;
|
||||
|
||||
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
#[cfg(test)]
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn image_generation_artifact_path(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
) -> AbsolutePathBuf {
|
||||
let sanitize = |value: &str| {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
};
|
||||
|
||||
codex_home
|
||||
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
|
||||
.join(sanitize(session_id))
|
||||
.join(format!("{}.png", sanitize(call_id)))
|
||||
LocalArtifactStore::from_codex_home(codex_home).generated_image_path(session_id, call_id)
|
||||
}
|
||||
|
||||
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
|
||||
@@ -108,7 +91,7 @@ pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option
|
||||
}
|
||||
|
||||
async fn save_image_generation_result(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
artifact_store: &dyn ArtifactStore,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
result: &str,
|
||||
@@ -118,12 +101,9 @@ async fn save_image_generation_result(
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(format!("invalid image generation payload: {err}"))
|
||||
})?;
|
||||
let path = image_generation_artifact_path(codex_home, session_id, call_id);
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
Ok(path)
|
||||
artifact_store
|
||||
.write_generated_image(session_id, call_id, bytes)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn persist_image_generation_item(
|
||||
@@ -134,7 +114,7 @@ pub(crate) async fn persist_image_generation_item(
|
||||
image_item.saved_path = None;
|
||||
let session_id = sess.conversation_id.to_string();
|
||||
match save_image_generation_result(
|
||||
&turn_context.config.codex_home,
|
||||
sess.services.artifact_store.as_ref(),
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
&image_item.result,
|
||||
@@ -146,11 +126,10 @@ pub(crate) async fn persist_image_generation_item(
|
||||
Some(path)
|
||||
}
|
||||
Err(err) => {
|
||||
let output_path = image_generation_artifact_path(
|
||||
&turn_context.config.codex_home,
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
);
|
||||
let output_path = sess
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session_id, &image_item.id);
|
||||
let output_dir = output_path
|
||||
.parent()
|
||||
.unwrap_or_else(|| turn_context.config.codex_home.clone());
|
||||
@@ -173,8 +152,10 @@ async fn record_image_generation_instructions(
|
||||
return;
|
||||
}
|
||||
let session_id = sess.conversation_id.to_string();
|
||||
let image_output_path =
|
||||
image_generation_artifact_path(&turn_context.config.codex_home, &session_id, "<image_id>");
|
||||
let image_output_path = sess
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session_id, "<image_id>");
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.unwrap_or_else(|| turn_context.config.codex_home.clone());
|
||||
|
||||
@@ -8,6 +8,7 @@ use super::image_generation_artifact_path;
|
||||
use super::last_assistant_message_from_item;
|
||||
use super::response_item_may_include_external_context;
|
||||
use super::save_image_generation_result;
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
use crate::session::tests::make_session_and_context;
|
||||
use crate::tools::ToolRouter;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
@@ -424,10 +425,14 @@ async fn save_image_generation_result_saves_base64_to_png_in_codex_home() {
|
||||
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "ig_save_base64");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path =
|
||||
save_image_generation_result(&codex_home, "session-1", "ig_save_base64", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_save_base64",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -440,9 +445,14 @@ async fn save_image_generation_result_rejects_data_url_payload() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
|
||||
let err = save_image_generation_result(&codex_home, "session-1", "ig_456", result)
|
||||
.await
|
||||
.expect_err("data url payload should error");
|
||||
let err = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_456",
|
||||
result,
|
||||
)
|
||||
.await
|
||||
.expect_err("data url payload should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
}
|
||||
|
||||
@@ -459,9 +469,14 @@ async fn save_image_generation_result_overwrites_existing_file() {
|
||||
.expect("create image output dir");
|
||||
std::fs::write(&existing_path, b"existing").expect("seed existing image");
|
||||
|
||||
let saved_path = save_image_generation_result(&codex_home, "session-1", "ig_overwrite", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_overwrite",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, existing_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -475,9 +490,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
|
||||
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "../ig/..");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path = save_image_generation_result(&codex_home, "session-1", "../ig/..", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"../ig/..",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -488,9 +508,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
|
||||
async fn save_image_generation_result_rejects_non_standard_base64() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
let err = save_image_generation_result(&codex_home, "session-1", "ig_urlsafe", "_-8")
|
||||
.await
|
||||
.expect_err("non-standard base64 should error");
|
||||
let err = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_urlsafe",
|
||||
"_-8",
|
||||
)
|
||||
.await
|
||||
.expect_err("non-standard base64 should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
}
|
||||
|
||||
@@ -499,7 +524,7 @@ async fn save_image_generation_result_rejects_non_base64_data_urls() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
let err = save_image_generation_result(
|
||||
&codex_home,
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_svg",
|
||||
"data:image/svg+xml,<svg/>",
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::SkillsManager;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::codex_thread::CodexThread;
|
||||
use crate::config::Config;
|
||||
@@ -13,7 +15,9 @@ use crate::session::CodexSpawnArgs;
|
||||
use crate::session::CodexSpawnOk;
|
||||
use crate::session::INITIAL_SUBMIT_ID;
|
||||
use crate::session::resolve_multi_agent_version;
|
||||
use crate::shell_snapshot::LocalShellSnapshotStore;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::shell_snapshot::ShellSnapshotStore;
|
||||
use crate::tasks::InterruptedTurnHistoryMarker;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
@@ -208,6 +212,8 @@ pub(crate) struct ThreadManagerState {
|
||||
mcp_manager: Arc<McpManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
session_source: SessionSource,
|
||||
installation_id: String,
|
||||
@@ -262,6 +268,98 @@ impl ThreadManager {
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
Self::new_with_artifact_store(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Constructs a thread manager with an explicit generated artifact backend.
|
||||
pub fn new_with_artifact_store(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
Self::new_with_storage_backends(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
Arc::new(LocalShellSnapshotStore::from_codex_home(&config.codex_home)),
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Constructs a thread manager with an explicit shell snapshot backend.
|
||||
pub fn new_with_shell_snapshot_store(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
Self::new_with_storage_backends(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
|
||||
shell_snapshot_store,
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_with_storage_backends(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
let codex_home = config.codex_home.clone();
|
||||
let restriction_product = session_source.restriction_product();
|
||||
@@ -287,6 +385,8 @@ impl ThreadManager {
|
||||
mcp_manager,
|
||||
extensions,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider,
|
||||
auth_manager,
|
||||
session_source,
|
||||
@@ -361,8 +461,10 @@ impl ThreadManager {
|
||||
restriction_product,
|
||||
));
|
||||
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
|
||||
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> =
|
||||
Arc::new(LocalShellSnapshotStore::from_codex_home(&skills_codex_home));
|
||||
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
|
||||
skills_codex_home,
|
||||
skills_codex_home.clone(),
|
||||
/*bundled_skills_enabled*/ true,
|
||||
restriction_product,
|
||||
));
|
||||
@@ -376,6 +478,8 @@ impl ThreadManager {
|
||||
},
|
||||
state_db.clone(),
|
||||
));
|
||||
let artifact_store: Arc<dyn ArtifactStore> =
|
||||
Arc::new(LocalArtifactStore::from_codex_home(&skills_codex_home));
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
@@ -388,6 +492,8 @@ impl ThreadManager {
|
||||
mcp_manager,
|
||||
extensions: empty_extension_registry(),
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider: None,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
@@ -1334,6 +1440,8 @@ impl ThreadManagerState {
|
||||
environment_selections,
|
||||
analytics_events_client: self.analytics_events_client.clone(),
|
||||
thread_store: Arc::clone(&self.thread_store),
|
||||
artifact_store: Arc::clone(&self.artifact_store),
|
||||
shell_snapshot_store: Arc::clone(&self.shell_snapshot_store),
|
||||
attestation_provider: self.attestation_provider.clone(),
|
||||
inherited_multi_agent_version: multi_agent_version,
|
||||
}))
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::ArtifactWriteFuture;
|
||||
use crate::config::test_config;
|
||||
use crate::init_state_db;
|
||||
use crate::installation_id::INSTALLATION_ID_FILENAME;
|
||||
@@ -33,6 +35,23 @@ use wiremock::MockServer;
|
||||
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
|
||||
struct FakeArtifactStore;
|
||||
|
||||
impl ArtifactStore for FakeArtifactStore {
|
||||
fn generated_image_path(&self, _session_id: &str, _call_id: &str) -> AbsolutePathBuf {
|
||||
unreachable!("generated image path should not be requested")
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
_session_id: &str,
|
||||
_call_id: &str,
|
||||
_bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
unreachable!("generated image write should not be requested")
|
||||
}
|
||||
}
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -64,6 +83,31 @@ fn developer_interrupted_marker() -> ResponseItem {
|
||||
.expect("developer interrupted marker should be enabled")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_thread_uses_injected_artifact_store() {
|
||||
let config = test_config().await;
|
||||
let artifact_store: Arc<dyn ArtifactStore> = Arc::new(FakeArtifactStore);
|
||||
let manager = ThreadManager::new_with_artifact_store(
|
||||
&config,
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
empty_extension_registry(),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
Arc::clone(&artifact_store),
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
/*attestation_provider*/ None,
|
||||
);
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
|
||||
assert!(Arc::ptr_eq(
|
||||
&thread.thread.codex.session.services.artifact_store,
|
||||
&artifact_store
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncates_before_requested_user_message() {
|
||||
let items = [
|
||||
|
||||
Reference in New Issue
Block a user