Compare commits

..

2 Commits

Author SHA1 Message Date
starr-openai
8f4d1c2ce6 refactor shell snapshot store seam 2026-06-03 00:19:03 -07:00
starr-openai
eeb7b76548 refactor generated image artifact store seam 2026-06-03 00:18:11 -07:00
23 changed files with 653 additions and 395 deletions

View File

@@ -1,43 +0,0 @@
use crate::store::PluginStoreError;
use codex_plugin::PluginId;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::path::PathBuf;
const PLUGINS_DATA_DIR: &str = "plugins/data";
/// Resolves executor-readable writable roots for mutable plugin data.
pub trait PluginDataStore: Send + Sync + 'static {
/// Returns the writable local root exposed to hooks for one plugin.
fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf;
}
/// Stores mutable plugin data under the local Codex Home layout.
#[derive(Debug, Clone)]
pub struct LocalPluginDataStore {
root: AbsolutePathBuf,
}
impl LocalPluginDataStore {
/// Creates the local mutable plugin data store for one Codex Home.
pub fn from_codex_home(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
let root = AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_DATA_DIR))
.map_err(|source| PluginStoreError::Io {
context: "failed to resolve plugin data root",
source,
})?;
Ok(Self { root })
}
}
impl PluginDataStore for LocalPluginDataStore {
fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf {
self.root.join(format!(
"{}-{}",
plugin_id.plugin_name, plugin_id.marketplace_name
))
}
}
#[cfg(test)]
#[path = "data_store_tests.rs"]
mod tests;

View File

@@ -1,16 +0,0 @@
use super::*;
use codex_plugin::PluginId;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
#[test]
fn local_plugin_data_root_derives_path_from_key() {
let tmp = tempdir().unwrap();
let store = LocalPluginDataStore::from_codex_home(tmp.path().to_path_buf()).unwrap();
let plugin_id = PluginId::new("sample".to_string(), "debug".to_string()).unwrap();
assert_eq!(
store.plugin_data_root(&plugin_id).as_path(),
tmp.path().join("plugins/data/sample-debug")
);
}

View File

@@ -1,4 +1,3 @@
pub mod data_store;
mod discoverable;
pub mod installed_marketplaces;
pub mod loader;
@@ -25,8 +24,6 @@ pub const OPENAI_BUNDLED_MARKETPLACE_NAME: &str = "openai-bundled";
pub type LoadedPlugin = codex_plugin::LoadedPlugin<codex_config::McpServerConfig>;
pub type PluginLoadOutcome = codex_plugin::PluginLoadOutcome<codex_config::McpServerConfig>;
pub use data_store::LocalPluginDataStore;
pub use data_store::PluginDataStore;
pub use discoverable::ToolSuggestDiscoverablePlugin;
pub use discoverable::ToolSuggestPluginDiscoveryInput;
pub use manager::ConfiguredMarketplace;
@@ -43,7 +40,6 @@ pub use manager::PluginRemoteSyncError;
pub use manager::PluginUninstallError;
pub use manager::PluginsConfigInput;
pub use manager::PluginsManager;
pub use manager::PluginsManagerStorageDeps;
pub use manager::RemotePluginSyncResult;
pub use marketplace_upgrade::ConfiguredMarketplaceUpgradeError as PluginMarketplaceUpgradeError;
pub use marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome as PluginMarketplaceUpgradeOutcome;

View File

@@ -1,5 +1,4 @@
use crate::OPENAI_CURATED_MARKETPLACE_NAME;
use crate::data_store::PluginDataStore;
use crate::manifest::PluginManifestHooks;
use crate::manifest::PluginManifestPaths;
use crate::manifest::load_plugin_manifest;
@@ -114,7 +113,6 @@ pub async fn load_plugins_from_layer_stack(
config_layer_stack: &ConfigLayerStack,
extra_plugins: HashMap<String, PluginConfig>,
store: &PluginStore,
data_store: &dyn PluginDataStore,
restriction_product: Option<Product>,
prefer_remote_curated_conflicts: bool,
) -> PluginLoadOutcome<McpServerConfig> {
@@ -135,7 +133,6 @@ pub async fn load_plugins_from_layer_stack(
configured_name.clone(),
&plugin,
store,
data_store,
restriction_product,
&skill_config_rules,
)
@@ -560,7 +557,6 @@ async fn load_plugin(
config_name: String,
plugin: &PluginConfig,
store: &PluginStore,
data_store: &dyn PluginDataStore,
restriction_product: Option<Product>,
skill_config_rules: &SkillConfigRules,
) -> LoadedPlugin<McpServerConfig> {
@@ -663,7 +659,7 @@ async fn load_plugin(
let (hook_sources, hook_load_warnings) = load_plugin_hooks(
&plugin_root,
&loaded_plugin_id,
&data_store.plugin_data_root(&loaded_plugin_id),
&store.plugin_data_root(&loaded_plugin_id),
manifest_paths,
);
loaded_plugin.hook_sources = hook_sources;

View File

@@ -1,8 +1,6 @@
use super::PluginLoadOutcome;
use super::startup_remote_sync::start_startup_remote_plugin_sync_once;
use crate::OPENAI_CURATED_MARKETPLACE_NAME;
use crate::data_store::LocalPluginDataStore;
use crate::data_store::PluginDataStore;
use crate::installed_marketplaces::installed_marketplace_roots_from_layer_stack;
use crate::loader::configured_curated_plugin_ids_from_codex_home;
use crate::loader::curated_plugin_cache_version;
@@ -400,7 +398,6 @@ impl From<RemotePluginFetchError> for PluginRemoteSyncError {
pub struct PluginsManager {
codex_home: PathBuf,
store: PluginStore,
data_store: Arc<dyn PluginDataStore>,
featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
@@ -412,26 +409,6 @@ pub struct PluginsManager {
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}
/// Dependencies owned by one plugins manager.
#[derive(Clone)]
pub struct PluginsManagerStorageDeps {
data_store: Arc<dyn PluginDataStore>,
}
impl PluginsManagerStorageDeps {
/// Creates dependencies backed by the local Codex Home layout.
pub fn from_codex_home(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
Ok(Self::new(Arc::new(LocalPluginDataStore::from_codex_home(
codex_home,
)?)))
}
/// Creates dependencies with an alternative mutable plugin data backend.
pub fn new(data_store: Arc<dyn PluginDataStore>) -> Self {
Self { data_store }
}
}
#[derive(Clone)]
struct CachedPluginLoadOutcome {
config_version: String,
@@ -443,35 +420,9 @@ impl PluginsManager {
Self::new_with_restriction_product(codex_home, Some(Product::Codex))
}
/// Creates a plugins manager with injected dependencies.
pub fn new_with_storage_deps(
codex_home: PathBuf,
storage_deps: PluginsManagerStorageDeps,
) -> Self {
Self::new_with_restriction_product_and_storage_deps(
codex_home,
Some(Product::Codex),
storage_deps,
)
}
pub fn new_with_restriction_product(
codex_home: PathBuf,
restriction_product: Option<Product>,
) -> Self {
let storage_deps = PluginsManagerStorageDeps::from_codex_home(codex_home.clone())
.unwrap_or_else(|err| panic!("plugin data root should be absolute: {err}"));
Self::new_with_restriction_product_and_storage_deps(
codex_home,
restriction_product,
storage_deps,
)
}
fn new_with_restriction_product_and_storage_deps(
codex_home: PathBuf,
restriction_product: Option<Product>,
storage_deps: PluginsManagerStorageDeps,
) -> Self {
// Product restrictions are enforced at marketplace admission time for a given CODEX_HOME:
// listing, install, and curated refresh all consult this restriction context before new
@@ -483,7 +434,6 @@ impl PluginsManager {
Self {
codex_home: codex_home.clone(),
store: PluginStore::new(codex_home),
data_store: storage_deps.data_store,
featured_plugin_ids_cache: RwLock::new(None),
configured_marketplace_upgrade_state: RwLock::new(
ConfiguredMarketplaceUpgradeState::default(),
@@ -541,7 +491,6 @@ impl PluginsManager {
&config.config_layer_stack,
self.remote_installed_plugin_configs(),
&self.store,
self.data_store.as_ref(),
self.restriction_product,
config.remote_plugin_enabled,
)
@@ -588,7 +537,6 @@ impl PluginsManager {
config_layer_stack,
self.remote_installed_plugin_configs(),
&self.store,
self.data_store.as_ref(),
self.restriction_product,
config.remote_plugin_enabled,
)
@@ -1508,7 +1456,7 @@ impl PluginsManager {
),
)
.await;
let plugin_data_root = self.data_store.plugin_data_root(&plugin_id);
let plugin_data_root = self.store.plugin_data_root(&plugin_id);
let (hook_sources, _hook_load_warnings) =
load_plugin_hooks(&source_path, &plugin_id, &plugin_data_root, &manifest.paths);
let hooks = plugin_hook_declarations(&hook_sources)

View File

@@ -1,8 +1,6 @@
use super::*;
use crate::LoadedPlugin;
use crate::PluginLoadOutcome;
use crate::data_store::LocalPluginDataStore;
use crate::data_store::PluginDataStore;
use crate::installed_marketplaces::marketplace_install_root;
use crate::loader::load_plugins_from_layer_stack;
use crate::loader::refresh_non_curated_plugin_cache;
@@ -31,12 +29,10 @@ use codex_config::types::McpServerTransportConfig;
use codex_login::CodexAuth;
use codex_protocol::protocol::HookEventName;
use codex_protocol::protocol::Product;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_absolute_path::test_support::PathBufExt;
use pretty_assertions::assert_eq;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use toml::Value;
use wiremock::Mock;
@@ -49,17 +45,6 @@ use wiremock::matchers::query_param;
const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024;
#[derive(Debug)]
struct FakePluginDataStore {
root: AbsolutePathBuf,
}
impl PluginDataStore for FakePluginDataStore {
fn plugin_data_root(&self, _plugin_id: &PluginId) -> AbsolutePathBuf {
self.root.clone()
}
}
fn write_plugin_with_version(
root: &Path,
dir_name: &str,
@@ -3866,7 +3851,6 @@ async fn load_plugins_ignores_project_config_files() {
&stack,
std::collections::HashMap::new(),
&PluginStore::new(codex_home.path().to_path_buf()),
&LocalPluginDataStore::from_codex_home(codex_home.path().to_path_buf()).unwrap(),
Some(Product::Codex),
/*prefer_remote_curated_conflicts*/ false,
)
@@ -3874,62 +3858,3 @@ async fn load_plugins_ignores_project_config_files() {
assert_eq!(outcome, PluginLoadOutcome::default());
}
#[tokio::test]
async fn plugins_for_layer_stack_uses_injected_plugin_data_store() {
let codex_home = TempDir::new().unwrap();
let plugin_root = codex_home
.path()
.join("plugins/cache")
.join("test/sample/local");
let config_path = AbsolutePathBuf::try_from(codex_home.path().join(CONFIG_TOML_FILE)).unwrap();
let data_root =
AbsolutePathBuf::try_from(codex_home.path().join("injected-plugin-data")).unwrap();
write_file(
&plugin_root.join(".codex-plugin/plugin.json"),
r#"{"name":"sample"}"#,
);
write_file(
&plugin_root.join("hooks/hooks.json"),
r#"{"hooks":{"SessionStart":[{"hooks":[{"type":"command","command":"echo startup"}]}]}}"#,
);
let stack = ConfigLayerStack::new(
vec![ConfigLayerEntry::new(
ConfigLayerSource::User {
file: config_path,
profile: None,
},
toml::from_str(&plugin_config_toml(
/*enabled*/ true, /*plugins_feature_enabled*/ true,
))
.expect("user config should parse"),
)],
ConfigRequirements::default(),
ConfigRequirementsToml::default(),
)
.expect("config layer stack should build");
let config = PluginsConfigInput::new(
stack.clone(),
/*plugins_enabled*/ true,
/*remote_plugin_enabled*/ false,
"https://chatgpt.com/backend-api/".to_string(),
);
let outcome = PluginsManager::new_with_storage_deps(
codex_home.path().to_path_buf(),
PluginsManagerStorageDeps::new(Arc::new(FakePluginDataStore {
root: data_root.clone(),
})),
)
.plugins_for_layer_stack(&stack, &config)
.await;
assert_eq!(
outcome
.effective_plugin_hook_sources()
.into_iter()
.map(|source| source.plugin_data_root)
.collect::<Vec<_>>(),
vec![data_root]
);
}

View File

@@ -15,6 +15,7 @@ use std::path::PathBuf;
pub const DEFAULT_PLUGIN_VERSION: &str = "local";
pub const PLUGINS_CACHE_DIR: &str = "plugins/cache";
pub const PLUGINS_DATA_DIR: &str = "plugins/data";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginInstallResult {
@@ -26,6 +27,7 @@ pub struct PluginInstallResult {
#[derive(Debug, Clone)]
pub struct PluginStore {
root: AbsolutePathBuf,
data_root: AbsolutePathBuf,
}
impl PluginStore {
@@ -37,7 +39,11 @@ impl PluginStore {
pub fn try_new(codex_home: PathBuf) -> Result<Self, PluginStoreError> {
let root = AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_CACHE_DIR))
.map_err(|err| PluginStoreError::io("failed to resolve plugin cache root", err))?;
Ok(Self { root })
let data_root =
AbsolutePathBuf::from_absolute_path_checked(codex_home.join(PLUGINS_DATA_DIR))
.map_err(|err| PluginStoreError::io("failed to resolve plugin data root", err))?;
Ok(Self { root, data_root })
}
pub fn root(&self) -> &AbsolutePathBuf {
@@ -54,6 +60,13 @@ impl PluginStore {
self.plugin_base_root(plugin_id).join(plugin_version)
}
pub fn plugin_data_root(&self, plugin_id: &PluginId) -> AbsolutePathBuf {
self.data_root.join(format!(
"{}-{}",
plugin_id.plugin_name, plugin_id.marketplace_name
))
}
pub fn active_plugin_version(&self, plugin_id: &PluginId) -> Option<String> {
let mut discovered_versions = fs::read_dir(self.plugin_base_root(plugin_id).as_path())
.ok()?

View File

@@ -109,6 +109,18 @@ fn plugin_root_derives_path_from_key_and_version() {
);
}
#[test]
fn plugin_data_root_derives_path_from_key() {
let tmp = tempdir().unwrap();
let store = PluginStore::new(tmp.path().to_path_buf());
let plugin_id = PluginId::new("sample".to_string(), "debug".to_string()).unwrap();
assert_eq!(
store.plugin_data_root(&plugin_id).as_path(),
tmp.path().join("plugins/data/sample-debug")
);
}
#[test]
fn install_with_version_uses_requested_cache_version() {
let tmp = tempdir().unwrap();

View File

@@ -0,0 +1,80 @@
use std::future::Future;
use std::pin::Pin;
use codex_protocol::error::Result;
use codex_utils_absolute_path::AbsolutePathBuf;
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
/// Stores generated artifacts and materializes executor-readable local paths.
pub trait ArtifactStore: Send + Sync + 'static {
/// Returns the executor-readable path exposed to generated image instructions.
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf;
/// Persists a generated image payload and returns its executor-readable path.
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_>;
}
/// Future returned by artifact writes.
pub type ArtifactWriteFuture<'a> =
Pin<Box<dyn Future<Output = Result<AbsolutePathBuf>> + Send + 'a>>;
#[derive(Clone)]
pub struct LocalArtifactStore {
codex_home: AbsolutePathBuf,
}
impl LocalArtifactStore {
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
Self {
codex_home: codex_home.clone(),
}
}
}
impl ArtifactStore for LocalArtifactStore {
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
self.codex_home
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
.join(sanitize_artifact_path_component(session_id))
.join(format!("{}.png", sanitize_artifact_path_component(call_id)))
}
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
let path = self.generated_image_path(session_id, call_id);
Box::pin(async move {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, bytes).await?;
Ok(path)
})
}
}
fn sanitize_artifact_path_component(value: &str) -> String {
let mut sanitized: String = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'_'
}
})
.collect();
if sanitized.is_empty() {
sanitized = "generated_image".to_string();
}
sanitized
}

View File

@@ -105,6 +105,8 @@ pub(crate) async fn run_codex_thread_interactive(
environment_selections: parent_ctx.environments.clone(),
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
thread_store: Arc::clone(&parent_session.services.thread_store),
artifact_store: Arc::clone(&parent_session.services.artifact_store),
shell_snapshot_store: Arc::clone(&parent_session.services.shell_snapshot_store),
attestation_provider: parent_session.services.attestation_provider.clone(),
inherited_multi_agent_version: Some(MultiAgentVersion::Disabled),
}))

View File

@@ -24,6 +24,7 @@ pub use codex_thread::CodexThreadSettingsOverrides;
pub use codex_thread::ThreadConfigSnapshot;
pub use session::turn_context::TurnContext;
mod agent;
pub mod artifact_store;
mod attestation;
mod codex_delegate;
mod command_canonicalization;
@@ -132,7 +133,7 @@ mod rollout;
pub(crate) mod safety;
mod session_rollout_init_error;
pub mod shell;
pub(crate) mod shell_snapshot;
pub mod shell_snapshot;
pub mod spawn;
pub(crate) mod state_db_bridge;
pub use state_db_bridge::StateDbHandle;

View File

@@ -13,6 +13,7 @@ use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::artifact_store::ArtifactStore;
use crate::attestation::AttestationProvider;
use crate::build_available_skills;
use crate::compact;
@@ -419,6 +420,8 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) environment_selections: ResolvedTurnEnvironments,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
pub(crate) inherited_multi_agent_version: Option<MultiAgentVersion>,
}
@@ -499,6 +502,8 @@ impl Codex {
environment_selections,
analytics_events_client,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider,
inherited_multi_agent_version,
} = args;
@@ -645,6 +650,8 @@ impl Codex {
environment_manager,
analytics_events_client,
thread_store,
artifact_store,
shell_snapshot_store,
parent_rollout_thread_trace,
attestation_provider,
multi_agent_version,
@@ -1349,7 +1356,6 @@ impl Session {
&self,
previous_cwd: &AbsolutePathBuf,
next_cwd: &AbsolutePathBuf,
codex_home: &AbsolutePathBuf,
session_source: &SessionSource,
) {
if previous_cwd == next_cwd {
@@ -1368,7 +1374,7 @@ impl Session {
}
ShellSnapshot::refresh_snapshot(
codex_home.clone(),
Arc::clone(&self.services.shell_snapshot_store),
self.conversation_id,
next_cwd.clone(),
self.services.user_shell.as_ref().clone(),
@@ -1389,7 +1395,6 @@ impl Session {
previous_cwd,
permission_profile_changed,
next_cwd,
codex_home,
session_source,
) = {
let mut state = self.state.lock().await;
@@ -1411,7 +1416,6 @@ impl Session {
let permission_profile_changed =
previous_permission_profile != updated_permission_profile;
let next_cwd = updated.cwd.clone();
let codex_home = updated.codex_home.clone();
let session_source = updated.session_source.clone();
state.session_configuration = updated;
(
@@ -1420,18 +1424,12 @@ impl Session {
previous_cwd,
permission_profile_changed,
next_cwd,
codex_home,
session_source,
)
};
self.emit_config_changed_contributors(previous_config.as_ref(), new_config.as_ref());
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&next_cwd,
&codex_home,
&session_source,
);
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &session_source);
if permission_profile_changed {
self.refresh_managed_network_proxy_for_current_permission_profile()
.await;

View File

@@ -1,5 +1,6 @@
use super::input_queue::InputQueue;
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::config::ConstraintError;
use crate::goals::GoalRuntimeState;
use crate::skills::SkillError;
@@ -504,6 +505,8 @@ impl Session {
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
parent_rollout_thread_trace: ThreadTraceContext,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
multi_agent_version: Option<MultiAgentVersion>,
@@ -859,7 +862,7 @@ impl Session {
tx
} else {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
Arc::clone(&shell_snapshot_store),
thread_id,
session_configuration.cwd.clone(),
&mut default_shell,
@@ -1006,6 +1009,7 @@ impl Session {
rollout_thread_trace,
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
shell_snapshot_store,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
@@ -1030,6 +1034,7 @@ impl Session {
state_db: state_db_ctx.clone(),
live_thread: live_thread_init.as_ref().cloned(),
thread_store: Arc::clone(&thread_store),
artifact_store,
attestation_provider: attestation_provider.clone(),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),

View File

@@ -1,5 +1,7 @@
use super::turn_context::TurnEnvironment;
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::ArtifactWriteFuture;
use crate::config::ConfigBuilder;
use crate::config::ConfigOverrides;
use crate::config::test_config;
@@ -141,6 +143,7 @@ use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathBufExt;
use core_test_support::PathExt;
use core_test_support::context_snapshot;
@@ -190,6 +193,32 @@ struct InstructionsTestCase {
expects_apply_patch_description: bool,
}
#[derive(Clone)]
struct FakeArtifactStore {
writes: Arc<std::sync::Mutex<Vec<Vec<u8>>>>,
}
impl ArtifactStore for FakeArtifactStore {
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
let path = format!("/fake/{session_id}/{call_id}.png");
test_path_buf(&path).abs()
}
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
let writes = Arc::clone(&self.writes);
let path = self.generated_image_path(session_id, call_id);
Box::pin(async move {
writes.lock().expect("lock writes").push(bytes);
Ok(path)
})
}
}
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -4603,6 +4632,12 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -4735,6 +4770,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
shell_snapshot_store: Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: auth_manager.clone(),
@@ -4763,6 +4801,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
attestation_provider: None,
model_client: ModelClient::new(
Some(auth_manager.clone()),
@@ -4953,6 +4994,12 @@ async fn make_session_with_config_and_rx(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -5065,6 +5112,12 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
.expect("state db should initialize"),
),
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -6825,6 +6878,9 @@ where
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
shell_snapshot_store: Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
@@ -6853,6 +6909,9 @@ where
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
state_db,
)),
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
attestation_provider: None,
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
@@ -7844,6 +7903,69 @@ async fn handle_output_item_done_records_image_save_history_message() {
let _ = std::fs::remove_file(&expected_saved_path);
}
#[tokio::test]
async fn handle_output_item_done_uses_injected_artifact_store() {
let (mut session, turn_context) = make_session_and_context().await;
let writes = Arc::new(std::sync::Mutex::new(Vec::new()));
session.services.artifact_store = Arc::new(FakeArtifactStore {
writes: Arc::clone(&writes),
});
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let call_id = "ig_injected_store";
let expected_saved_path = session
.services
.artifact_store
.generated_image_path(&session.conversation_id.to_string(), call_id);
let item = ResponseItem::ImageGenerationCall {
id: call_id.to_string(),
status: "completed".to_string(),
revised_prompt: Some("a tiny blue square".to_string()),
result: "Zm9v".to_string(),
};
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&session),
turn_context: Arc::clone(&turn_context),
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
turn_context.sub_id.clone(),
)),
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
cancellation_token: CancellationToken::new(),
};
handle_output_item_done(&mut ctx, item.clone(), /*previously_active_item*/ None)
.await
.expect("image generation item should succeed");
assert_eq!(
writes.lock().expect("lock writes").as_slice(),
&[b"foo".to_vec()]
);
let image_output_path = session
.services
.artifact_store
.generated_image_path(&session.conversation_id.to_string(), "<image_id>");
let image_output_dir = image_output_path
.parent()
.expect("generated image path should have a parent");
let image_message: ResponseItem = crate::context::ContextualUserFragment::into(
crate::context::ImageGenerationInstructions::new(
image_output_dir.display(),
image_output_path.display(),
),
);
let history = session.clone_history().await;
assert_eq!(history.raw_items(), &[image_message, item]);
assert_eq!(
expected_saved_path,
test_path_buf(&format!(
"/fake/{}/ig_injected_store.png",
session.conversation_id
))
.abs()
);
}
#[tokio::test]
async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -702,6 +702,12 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
codex_thread_store::LocalThreadStoreConfig::from_config(&config),
/*state_db*/ None,
));
let artifact_store = Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
));
let shell_snapshot_store = Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
);
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
config,
@@ -733,6 +739,8 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
},
analytics_events_client: None,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider: None,
inherited_multi_agent_version: None,
})

View File

@@ -609,7 +609,6 @@ impl Session {
let next_permission_profile = next.permission_profile();
let permission_profile_changed =
previous_permission_profile != next_permission_profile;
let codex_home = next.codex_home.clone();
let session_source = next.session_source.clone();
let previous_config = notify_config_contributors.then(|| {
Self::build_effective_session_config(&state.session_configuration)
@@ -622,7 +621,6 @@ impl Session {
turn_environments,
permission_profile_changed,
previous_cwd,
codex_home,
session_source,
previous_config,
new_config,
@@ -637,7 +635,6 @@ impl Session {
turn_environments,
permission_profile_changed,
previous_cwd,
codex_home,
session_source,
previous_config,
new_config,
@@ -661,7 +658,6 @@ impl Session {
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&session_configuration.cwd,
&codex_home,
&session_source,
);

View File

@@ -1,5 +1,7 @@
use std::future::Future;
use std::io::ErrorKind;
use std::path::Path;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
@@ -35,9 +37,144 @@ const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3
const SNAPSHOT_DIR: &str = "shell_snapshots";
const EXCLUDED_EXPORT_VARS: &[&str] = &["PWD", "OLDPWD"];
/// Persists executor-local shell snapshot files and owns retention cleanup.
/// Implementations must return local readable paths because shell execution sources snapshots.
pub trait ShellSnapshotStore: Send + Sync + 'static {
/// Allocates the final and temporary paths for one snapshot generation.
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths;
/// Removes stale inactive snapshots according to the store's retention policy.
fn cleanup_stale_snapshots(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> ShellSnapshotStoreFuture<'_>;
}
/// Future returned by shell snapshot store cleanup work.
pub type ShellSnapshotStoreFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
#[derive(Clone)]
/// Codex Home backed executor-local shell snapshot store.
pub struct LocalShellSnapshotStore {
codex_home: AbsolutePathBuf,
}
impl LocalShellSnapshotStore {
/// Constructs a local shell snapshot store rooted at one Codex Home.
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
Self {
codex_home: codex_home.clone(),
}
}
fn snapshot_dir(&self) -> AbsolutePathBuf {
self.codex_home.join(SNAPSHOT_DIR)
}
async fn cleanup_stale_snapshots_impl(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> Result<()> {
let snapshot_dir = self.snapshot_dir();
let mut entries = match fs::read_dir(&snapshot_dir).await {
Ok(entries) => entries,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err.into()),
};
let now = SystemTime::now();
let active_session_id = active_session_id.to_string();
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_file() {
continue;
}
let path = entry.path();
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
remove_snapshot_file(&path).await;
continue;
};
if session_id == active_session_id {
continue;
}
let rollout_path =
find_thread_path_by_id_str(&self.codex_home, session_id, state_db.as_deref())
.await?;
let Some(rollout_path) = rollout_path else {
remove_snapshot_file(&path).await;
continue;
};
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
Ok(modified) => modified,
Err(err) => {
tracing::warn!(
"Failed to check rollout age for snapshot {}: {err:?}",
path.display()
);
continue;
}
};
if now
.duration_since(modified)
.ok()
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
{
remove_snapshot_file(&path).await;
}
}
Ok(())
}
}
impl ShellSnapshotStore for LocalShellSnapshotStore {
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths {
let extension = match shell_type {
ShellType::PowerShell => "ps1",
_ => "sh",
};
let nonce = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
let snapshot_dir = self.snapshot_dir();
ShellSnapshotPaths {
path: snapshot_dir.join(format!("{session_id}.{nonce}.{extension}")),
temp_path: snapshot_dir.join(format!("{session_id}.tmp-{nonce}")),
}
}
fn cleanup_stale_snapshots(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> ShellSnapshotStoreFuture<'_> {
Box::pin(self.cleanup_stale_snapshots_impl(active_session_id, state_db))
}
}
/// Final and temporary local paths for one shell snapshot generation.
pub struct ShellSnapshotPaths {
/// Final path sourced by later shell execution.
pub path: AbsolutePathBuf,
/// Temporary path populated before validation and rename.
pub temp_path: AbsolutePathBuf,
}
impl ShellSnapshot {
pub fn start_snapshotting(
codex_home: AbsolutePathBuf,
pub(crate) fn start_snapshotting(
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
shell: &mut Shell,
@@ -48,7 +185,7 @@ impl ShellSnapshot {
shell.shell_snapshot = shell_snapshot_rx;
Self::spawn_snapshot_task(
codex_home,
shell_snapshot_store,
session_id,
session_cwd,
shell.clone(),
@@ -60,8 +197,8 @@ impl ShellSnapshot {
shell_snapshot_tx
}
pub fn refresh_snapshot(
codex_home: AbsolutePathBuf,
pub(crate) fn refresh_snapshot(
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
shell: Shell,
@@ -70,7 +207,7 @@ impl ShellSnapshot {
state_db: Option<StateDbHandle>,
) {
Self::spawn_snapshot_task(
codex_home,
shell_snapshot_store,
session_id,
session_cwd,
shell,
@@ -81,7 +218,7 @@ impl ShellSnapshot {
}
fn spawn_snapshot_task(
codex_home: AbsolutePathBuf,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
snapshot_shell: Shell,
@@ -92,13 +229,24 @@ impl ShellSnapshot {
let snapshot_span = info_span!("shell_snapshot", thread_id = %session_id);
tokio::spawn(
async move {
let cleanup_shell_snapshot_store = Arc::clone(&shell_snapshot_store);
let cleanup_session_id = session_id;
let cleanup_state_db = state_db.clone();
tokio::spawn(async move {
if let Err(err) = cleanup_shell_snapshot_store
.cleanup_stale_snapshots(cleanup_session_id, cleanup_state_db)
.await
{
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
}
});
let timer = session_telemetry.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot = ShellSnapshot::try_new(
&codex_home,
shell_snapshot_store.as_ref(),
session_id,
&session_cwd,
&snapshot_shell,
state_db,
)
.await
.map(Arc::new);
@@ -117,38 +265,13 @@ impl ShellSnapshot {
}
async fn try_new(
codex_home: &AbsolutePathBuf,
shell_snapshot_store: &dyn ShellSnapshotStore,
session_id: ThreadId,
session_cwd: &AbsolutePathBuf,
shell: &Shell,
state_db: Option<StateDbHandle>,
) -> std::result::Result<Self, &'static str> {
// File to store the snapshot
let extension = match shell.shell_type {
ShellType::PowerShell => "ps1",
_ => "sh",
};
let nonce = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
let path = codex_home
.join(SNAPSHOT_DIR)
.join(format!("{session_id}.{nonce}.{extension}"));
let temp_path = codex_home
.join(SNAPSHOT_DIR)
.join(format!("{session_id}.tmp-{nonce}"));
// Clean the (unlikely) leaked snapshot files.
let codex_home = codex_home.clone();
let cleanup_session_id = session_id;
tokio::spawn(async move {
if let Err(err) =
cleanup_stale_snapshots(&codex_home, cleanup_session_id, state_db).await
{
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
}
});
let ShellSnapshotPaths { path, temp_path } =
shell_snapshot_store.snapshot_paths(session_id, shell.shell_type.clone());
// Make the new snapshot.
if let Err(err) =
@@ -494,72 +617,6 @@ $envVars | ForEach-Object {
"##
}
/// Removes shell snapshots that either lack a matching session rollout file or
/// whose rollouts have not been updated within the retention window.
/// The active session id is exempt from cleanup.
pub async fn cleanup_stale_snapshots(
codex_home: &AbsolutePathBuf,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> Result<()> {
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
let mut entries = match fs::read_dir(&snapshot_dir).await {
Ok(entries) => entries,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err.into()),
};
let now = SystemTime::now();
let active_session_id = active_session_id.to_string();
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_file() {
continue;
}
let path = entry.path();
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
remove_snapshot_file(&path).await;
continue;
};
if session_id == active_session_id {
continue;
}
let rollout_path =
find_thread_path_by_id_str(codex_home, session_id, state_db.as_deref()).await?;
let Some(rollout_path) = rollout_path else {
remove_snapshot_file(&path).await;
continue;
};
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
Ok(modified) => modified,
Err(err) => {
tracing::warn!(
"Failed to check rollout age for snapshot {}: {err:?}",
path.display()
);
continue;
}
};
if now
.duration_since(modified)
.ok()
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
{
remove_snapshot_file(&path).await;
}
}
Ok(())
}
async fn remove_snapshot_file(path: &Path) {
if let Err(err) = fs::remove_file(path).await {
tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path);

View File

@@ -196,13 +196,13 @@ async fn try_new_creates_and_deletes_snapshot_file() -> Result<()> {
shell_path: PathBuf::from("/bin/bash"),
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
};
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
let snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
&shell_snapshot_store,
ThreadId::new(),
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("snapshot should be created");
@@ -227,25 +227,16 @@ async fn try_new_uses_distinct_generation_paths() -> Result<()> {
shell_path: PathBuf::from("/bin/bash"),
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
};
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
let initial_snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
session_id,
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("initial snapshot should be created");
let refreshed_snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
session_id,
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("refreshed snapshot should be created");
let initial_snapshot =
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
.await
.expect("initial snapshot should be created");
let refreshed_snapshot =
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
.await
.expect("refreshed snapshot should be created");
let initial_path = initial_snapshot.path.clone();
let refreshed_path = refreshed_snapshot.path.clone();
@@ -439,7 +430,9 @@ async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()>
fs::write(&orphan_snapshot, "orphan").await?;
fs::write(&invalid_snapshot, "invalid").await?;
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
.await?;
assert_eq!(live_snapshot.exists(), true);
assert_eq!(orphan_snapshot.exists(), false);
@@ -462,7 +455,9 @@ async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> {
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
.await?;
assert_eq!(stale_snapshot.exists(), false);
Ok(())
@@ -483,7 +478,9 @@ async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> {
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
cleanup_stale_snapshots(&codex_home, active_session, /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(active_session, /*state_db*/ None)
.await?;
assert_eq!(active_snapshot.exists(), true);
Ok(())

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::SkillsManager;
use crate::agent::AgentControl;
use crate::artifact_store::ArtifactStore;
use crate::attestation::AttestationProvider;
use crate::client::ModelClient;
use crate::config::NetworkProxyAuditMetadata;
@@ -51,6 +52,7 @@ pub(crate) struct SessionServices {
pub(crate) rollout_thread_trace: ThreadTraceContext,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
pub(crate) show_raw_agent_reasoning: bool,
pub(crate) exec_policy: Arc<ExecPolicyManager>,
pub(crate) auth_manager: Arc<AuthManager>,
@@ -74,6 +76,7 @@ pub(crate) struct SessionServices {
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) live_thread: Option<LiveThread>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,

View File

@@ -36,34 +36,17 @@ use tracing::debug;
use tracing::instrument;
use tracing::warn;
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
use crate::artifact_store::ArtifactStore;
#[cfg(test)]
use crate::artifact_store::LocalArtifactStore;
#[cfg(test)]
pub(crate) fn image_generation_artifact_path(
codex_home: &AbsolutePathBuf,
session_id: &str,
call_id: &str,
) -> AbsolutePathBuf {
let sanitize = |value: &str| {
let mut sanitized: String = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'_'
}
})
.collect();
if sanitized.is_empty() {
sanitized = "generated_image".to_string();
}
sanitized
};
codex_home
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
.join(sanitize(session_id))
.join(format!("{}.png", sanitize(call_id)))
LocalArtifactStore::from_codex_home(codex_home).generated_image_path(session_id, call_id)
}
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
@@ -108,7 +91,7 @@ pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option
}
async fn save_image_generation_result(
codex_home: &AbsolutePathBuf,
artifact_store: &dyn ArtifactStore,
session_id: &str,
call_id: &str,
result: &str,
@@ -118,12 +101,9 @@ async fn save_image_generation_result(
.map_err(|err| {
CodexErr::InvalidRequest(format!("invalid image generation payload: {err}"))
})?;
let path = image_generation_artifact_path(codex_home, session_id, call_id);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, bytes).await?;
Ok(path)
artifact_store
.write_generated_image(session_id, call_id, bytes)
.await
}
pub(crate) async fn persist_image_generation_item(
@@ -134,7 +114,7 @@ pub(crate) async fn persist_image_generation_item(
image_item.saved_path = None;
let session_id = sess.conversation_id.to_string();
match save_image_generation_result(
&turn_context.config.codex_home,
sess.services.artifact_store.as_ref(),
&session_id,
&image_item.id,
&image_item.result,
@@ -146,11 +126,10 @@ pub(crate) async fn persist_image_generation_item(
Some(path)
}
Err(err) => {
let output_path = image_generation_artifact_path(
&turn_context.config.codex_home,
&session_id,
&image_item.id,
);
let output_path = sess
.services
.artifact_store
.generated_image_path(&session_id, &image_item.id);
let output_dir = output_path
.parent()
.unwrap_or_else(|| turn_context.config.codex_home.clone());
@@ -173,8 +152,10 @@ async fn record_image_generation_instructions(
return;
}
let session_id = sess.conversation_id.to_string();
let image_output_path =
image_generation_artifact_path(&turn_context.config.codex_home, &session_id, "<image_id>");
let image_output_path = sess
.services
.artifact_store
.generated_image_path(&session_id, "<image_id>");
let image_output_dir = image_output_path
.parent()
.unwrap_or_else(|| turn_context.config.codex_home.clone());

View File

@@ -8,6 +8,7 @@ use super::image_generation_artifact_path;
use super::last_assistant_message_from_item;
use super::response_item_may_include_external_context;
use super::save_image_generation_result;
use crate::artifact_store::LocalArtifactStore;
use crate::session::tests::make_session_and_context;
use crate::tools::ToolRouter;
use crate::tools::parallel::ToolCallRuntime;
@@ -424,10 +425,14 @@ async fn save_image_generation_result_saves_base64_to_png_in_codex_home() {
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "ig_save_base64");
let _ = std::fs::remove_file(&expected_path);
let saved_path =
save_image_generation_result(&codex_home, "session-1", "ig_save_base64", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_save_base64",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, expected_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -440,9 +445,14 @@ async fn save_image_generation_result_rejects_data_url_payload() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(&codex_home, "session-1", "ig_456", result)
.await
.expect_err("data url payload should error");
let err = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_456",
result,
)
.await
.expect_err("data url payload should error");
assert!(matches!(err, CodexErr::InvalidRequest(_)));
}
@@ -459,9 +469,14 @@ async fn save_image_generation_result_overwrites_existing_file() {
.expect("create image output dir");
std::fs::write(&existing_path, b"existing").expect("seed existing image");
let saved_path = save_image_generation_result(&codex_home, "session-1", "ig_overwrite", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_overwrite",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, existing_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -475,9 +490,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "../ig/..");
let _ = std::fs::remove_file(&expected_path);
let saved_path = save_image_generation_result(&codex_home, "session-1", "../ig/..", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"../ig/..",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, expected_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -488,9 +508,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
async fn save_image_generation_result_rejects_non_standard_base64() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(&codex_home, "session-1", "ig_urlsafe", "_-8")
.await
.expect_err("non-standard base64 should error");
let err = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_urlsafe",
"_-8",
)
.await
.expect_err("non-standard base64 should error");
assert!(matches!(err, CodexErr::InvalidRequest(_)));
}
@@ -499,7 +524,7 @@ async fn save_image_generation_result_rejects_non_base64_data_urls() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(
&codex_home,
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_svg",
"data:image/svg+xml,<svg/>",

View File

@@ -1,5 +1,7 @@
use crate::SkillsManager;
use crate::agent::AgentControl;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::LocalArtifactStore;
use crate::attestation::AttestationProvider;
use crate::codex_thread::CodexThread;
use crate::config::Config;
@@ -13,7 +15,9 @@ use crate::session::CodexSpawnArgs;
use crate::session::CodexSpawnOk;
use crate::session::INITIAL_SUBMIT_ID;
use crate::session::resolve_multi_agent_version;
use crate::shell_snapshot::LocalShellSnapshotStore;
use crate::shell_snapshot::ShellSnapshot;
use crate::shell_snapshot::ShellSnapshotStore;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_analytics::AnalyticsEventsClient;
@@ -208,6 +212,8 @@ pub(crate) struct ThreadManagerState {
mcp_manager: Arc<McpManager>,
extensions: Arc<ExtensionRegistry<Config>>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
session_source: SessionSource,
installation_id: String,
@@ -262,6 +268,98 @@ impl ThreadManager {
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
Self::new_with_artifact_store(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
/// Constructs a thread manager with an explicit generated artifact backend.
pub fn new_with_artifact_store(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
Self::new_with_storage_backends(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
artifact_store,
Arc::new(LocalShellSnapshotStore::from_codex_home(&config.codex_home)),
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
/// Constructs a thread manager with an explicit shell snapshot backend.
pub fn new_with_shell_snapshot_store(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
Self::new_with_storage_backends(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
shell_snapshot_store,
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
fn new_with_storage_backends(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
let codex_home = config.codex_home.clone();
let restriction_product = session_source.restriction_product();
@@ -287,6 +385,8 @@ impl ThreadManager {
mcp_manager,
extensions,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider,
auth_manager,
session_source,
@@ -361,8 +461,10 @@ impl ThreadManager {
restriction_product,
));
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> =
Arc::new(LocalShellSnapshotStore::from_codex_home(&skills_codex_home));
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
skills_codex_home,
skills_codex_home.clone(),
/*bundled_skills_enabled*/ true,
restriction_product,
));
@@ -376,6 +478,8 @@ impl ThreadManager {
},
state_db.clone(),
));
let artifact_store: Arc<dyn ArtifactStore> =
Arc::new(LocalArtifactStore::from_codex_home(&skills_codex_home));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -388,6 +492,8 @@ impl ThreadManager {
mcp_manager,
extensions: empty_extension_registry(),
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider: None,
auth_manager,
session_source: SessionSource::Exec,
@@ -1334,6 +1440,8 @@ impl ThreadManagerState {
environment_selections,
analytics_events_client: self.analytics_events_client.clone(),
thread_store: Arc::clone(&self.thread_store),
artifact_store: Arc::clone(&self.artifact_store),
shell_snapshot_store: Arc::clone(&self.shell_snapshot_store),
attestation_provider: self.attestation_provider.clone(),
inherited_multi_agent_version: multi_agent_version,
}))

View File

@@ -1,4 +1,6 @@
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::ArtifactWriteFuture;
use crate::config::test_config;
use crate::init_state_db;
use crate::installation_id::INSTALLATION_ID_FILENAME;
@@ -33,6 +35,23 @@ use wiremock::MockServer;
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
struct FakeArtifactStore;
impl ArtifactStore for FakeArtifactStore {
fn generated_image_path(&self, _session_id: &str, _call_id: &str) -> AbsolutePathBuf {
unreachable!("generated image path should not be requested")
}
fn write_generated_image(
&self,
_session_id: &str,
_call_id: &str,
_bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
unreachable!("generated image write should not be requested")
}
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -64,6 +83,31 @@ fn developer_interrupted_marker() -> ResponseItem {
.expect("developer interrupted marker should be enabled")
}
#[tokio::test]
async fn start_thread_uses_injected_artifact_store() {
let config = test_config().await;
let artifact_store: Arc<dyn ArtifactStore> = Arc::new(FakeArtifactStore);
let manager = ThreadManager::new_with_artifact_store(
&config,
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
Arc::clone(&artifact_store),
/*state_db*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
);
let thread = manager.start_thread(config).await.expect("start thread");
assert!(Arc::ptr_eq(
&thread.thread.codex.session.services.artifact_store,
&artifact_store
));
}
#[test]
fn truncates_before_requested_user_message() {
let items = [