Compare commits

...

8 Commits

Author SHA1 Message Date
starr-openai
bccff8a142 trim runtime storage compatibility shims 2026-06-03 10:54:33 -07:00
starr-openai
8555969120 polish runtime storage seam naming 2026-06-03 10:52:23 -07:00
starr-openai
7826705ff5 polish runtime storage seam naming 2026-06-03 10:48:43 -07:00
starr-openai
97ad22d373 refactor expose thread storage deps 2026-06-03 00:33:16 -07:00
starr-openai
acc274c12a refactor expose runtime storage deps 2026-06-03 00:29:37 -07:00
starr-openai
4bbb82b331 refactor app-server runtime storage deps 2026-06-03 00:23:08 -07:00
starr-openai
8f4d1c2ce6 refactor shell snapshot store seam 2026-06-03 00:19:03 -07:00
starr-openai
eeb7b76548 refactor generated image artifact store seam 2026-06-03 00:18:11 -07:00
20 changed files with 811 additions and 218 deletions

View File

@@ -25,6 +25,12 @@ use std::io::Result as IoResult;
use std::sync::Arc;
use std::time::Duration;
pub use codex_app_server::AppServerRuntimeStorageOverrides;
pub use codex_app_server::ArtifactStore;
pub use codex_app_server::LocalArtifactStore;
pub use codex_app_server::LocalShellSnapshotStore;
pub use codex_app_server::ShellSnapshotStore;
pub use codex_app_server::ThreadManagerStorageDeps;
pub use codex_app_server::app_server_control_socket_path;
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
pub use codex_app_server::in_process::InProcessServerEvent;
@@ -489,9 +495,24 @@ impl InProcessAppServerClient {
/// internal event queue is saturated later, server requests are rejected
/// with overload error instead of being silently dropped.
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
Self::start_with_runtime_storage_overrides(
args,
AppServerRuntimeStorageOverrides::default(),
)
.await
}
/// Starts the in-process runtime with optional thread storage overrides.
pub async fn start_with_runtime_storage_overrides(
args: InProcessClientStartArgs,
runtime_storage_overrides: AppServerRuntimeStorageOverrides,
) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
let mut handle = codex_app_server::in_process::start_with_runtime_storage_overrides(
args.into_runtime_start_args(),
runtime_storage_overrides,
)
.await?;
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);

View File

@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::AppServerRuntimeStorageOverrides;
use crate::analytics_utils::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::error_code::OVERLOADED_ERROR_CODE;
@@ -348,8 +349,19 @@ impl InProcessClientHandle {
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
/// the runtime is shut down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
start_with_runtime_storage_overrides(args, AppServerRuntimeStorageOverrides::default()).await
}
/// Starts an in-process app-server runtime with optional thread storage overrides.
///
/// This is the embedded-host path for replacing the standard local artifact
/// and shell snapshot stores without changing the existing start-argument shape.
pub async fn start_with_runtime_storage_overrides(
args: InProcessStartArgs,
runtime_storage_overrides: AppServerRuntimeStorageOverrides,
) -> IoResult<InProcessClientHandle> {
let initialize = args.initialize.clone();
let client = start_uninitialized(args).await?;
let client = start_uninitialized(args, runtime_storage_overrides).await?;
let initialize_response = client
.request(ClientRequest::Initialize {
@@ -369,7 +381,10 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
Ok(client)
}
async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
async fn start_uninitialized(
args: InProcessStartArgs,
runtime_storage_overrides: AppServerRuntimeStorageOverrides,
) -> IoResult<InProcessClientHandle> {
let channel_capacity = args.channel_capacity.max(1);
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
@@ -438,6 +453,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
rpc_transport: AppServerRpcTransport::InProcess,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
runtime_storage_overrides,
}));
let mut thread_created_rx = processor.thread_created_receiver();
let session = Arc::new(ConnectionSessionState::new());

View File

@@ -6,8 +6,13 @@ use codex_config::LoaderOverrides;
use codex_config::NoopThreadConfigLoader;
use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
pub use codex_core::ThreadManagerStorageDeps;
pub use codex_core::artifact_store::ArtifactStore;
pub use codex_core::artifact_store::LocalArtifactStore;
use codex_core::config::Config;
use codex_core::resolve_installation_id;
pub use codex_core::shell_snapshot::LocalShellSnapshotStore;
pub use codex_core::shell_snapshot::ShellSnapshotStore;
use codex_login::AuthManager;
use codex_utils_cli::CliConfigOverrides;
use std::collections::HashMap;
@@ -416,6 +421,16 @@ impl Default for AppServerRuntimeOptions {
}
}
/// Optional storage overrides an app-server host can inject into managed threads.
///
/// Leaving these fields unset lets each managed thread construct its standard
/// local [`ThreadManagerStorageDeps`] from the thread config.
#[derive(Clone, Default)]
pub struct AppServerRuntimeStorageOverrides {
/// Replaces the thread manager's standard local storage bundle when set.
pub thread_manager: Option<ThreadManagerStorageDeps>,
}
#[allow(clippy::too_many_arguments)]
pub async fn run_main_with_transport_options(
arg0_paths: Arg0DispatchPaths,
@@ -427,6 +442,35 @@ pub async fn run_main_with_transport_options(
session_source: SessionSource,
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
run_main_with_transport_options_and_storage_overrides(
arg0_paths,
cli_config_overrides,
loader_overrides,
strict_config,
default_analytics_enabled,
transport,
session_source,
auth,
runtime_options,
AppServerRuntimeStorageOverrides::default(),
)
.await
}
/// Runs app-server with optional storage overrides for managed threads.
#[allow(clippy::too_many_arguments)]
pub async fn run_main_with_transport_options_and_storage_overrides(
arg0_paths: Arg0DispatchPaths,
cli_config_overrides: CliConfigOverrides,
loader_overrides: LoaderOverrides,
strict_config: bool,
default_analytics_enabled: bool,
transport: AppServerTransport,
session_source: SessionSource,
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
runtime_storage_overrides: AppServerRuntimeStorageOverrides,
) -> IoResult<()> {
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
@@ -815,6 +859,7 @@ pub async fn run_main_with_transport_options(
rpc_transport: analytics_rpc_transport(&transport),
remote_control_handle: Some(remote_control_handle.clone()),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
runtime_storage_overrides,
}));
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();

View File

@@ -272,6 +272,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) rpc_transport: AppServerRpcTransport,
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
pub(crate) runtime_storage_overrides: crate::AppServerRuntimeStorageOverrides,
}
impl MessageProcessor {
@@ -295,6 +296,7 @@ impl MessageProcessor {
rpc_transport,
remote_control_handle,
plugin_startup_tasks,
runtime_storage_overrides,
} = args;
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
@@ -305,26 +307,45 @@ impl MessageProcessor {
// resumed, or forked threads to a different persistence backend/root.
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let environment_manager_for_requests = Arc::clone(&environment_manager);
let thread_manager_storage_override = runtime_storage_overrides.thread_manager;
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
config.as_ref(),
let thread_manager_storage_override = thread_manager_storage_override.clone();
let extensions = thread_extensions(
guardian_agent_spawner(thread_manager.clone()),
app_server_extension_event_sink(outgoing.clone()),
auth_manager.clone(),
session_source,
environment_manager,
thread_extensions(
guardian_agent_spawner(thread_manager.clone()),
app_server_extension_event_sink(outgoing.clone()),
);
let attestation_provider = Some(app_server_attestation_provider(
outgoing.clone(),
thread_state_manager.clone(),
));
match thread_manager_storage_override {
Some(storage_deps) => ThreadManager::new_with_storage_deps(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
extensions,
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
storage_deps,
state_db.clone(),
installation_id,
attestation_provider,
),
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
installation_id,
Some(app_server_attestation_provider(
outgoing.clone(),
thread_state_manager.clone(),
)),
)
None => ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
extensions,
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
installation_id,
attestation_provider,
),
}
});
thread_manager
.plugins_manager()

View File

@@ -267,6 +267,7 @@ async fn build_test_processor(
rpc_transport: AppServerRpcTransport::Stdio,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
runtime_storage_overrides: crate::AppServerRuntimeStorageOverrides::default(),
}));
(processor, outgoing_rx)
}

View File

@@ -0,0 +1,80 @@
use std::future::Future;
use std::pin::Pin;
use codex_protocol::error::Result;
use codex_utils_absolute_path::AbsolutePathBuf;
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
/// Stores generated artifacts and materializes executor-readable local paths.
pub trait ArtifactStore: Send + Sync + 'static {
/// Returns the executor-readable path exposed to generated image instructions.
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf;
/// Persists a generated image payload and returns its executor-readable path.
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_>;
}
/// Future returned by artifact writes.
pub type ArtifactWriteFuture<'a> =
Pin<Box<dyn Future<Output = Result<AbsolutePathBuf>> + Send + 'a>>;
#[derive(Clone)]
pub struct LocalArtifactStore {
codex_home: AbsolutePathBuf,
}
impl LocalArtifactStore {
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
Self {
codex_home: codex_home.clone(),
}
}
}
impl ArtifactStore for LocalArtifactStore {
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
self.codex_home
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
.join(sanitize_artifact_path_component(session_id))
.join(format!("{}.png", sanitize_artifact_path_component(call_id)))
}
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
let path = self.generated_image_path(session_id, call_id);
Box::pin(async move {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, bytes).await?;
Ok(path)
})
}
}
fn sanitize_artifact_path_component(value: &str) -> String {
let mut sanitized: String = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'_'
}
})
.collect();
if sanitized.is_empty() {
sanitized = "generated_image".to_string();
}
sanitized
}

View File

@@ -105,6 +105,8 @@ pub(crate) async fn run_codex_thread_interactive(
environment_selections: parent_ctx.environments.clone(),
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
thread_store: Arc::clone(&parent_session.services.thread_store),
artifact_store: Arc::clone(&parent_session.services.artifact_store),
shell_snapshot_store: Arc::clone(&parent_session.services.shell_snapshot_store),
attestation_provider: parent_session.services.attestation_provider.clone(),
inherited_multi_agent_version: Some(MultiAgentVersion::Disabled),
}))

View File

@@ -24,6 +24,7 @@ pub use codex_thread::CodexThreadSettingsOverrides;
pub use codex_thread::ThreadConfigSnapshot;
pub use session::turn_context::TurnContext;
mod agent;
pub mod artifact_store;
mod attestation;
mod codex_delegate;
mod command_canonicalization;
@@ -112,6 +113,7 @@ pub use thread_manager::ForkSnapshot;
pub use thread_manager::NewThread;
pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadManagerStorageDeps;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::build_models_manager;
pub use thread_manager::thread_store_from_config;
@@ -132,7 +134,7 @@ mod rollout;
pub(crate) mod safety;
mod session_rollout_init_error;
pub mod shell;
pub(crate) mod shell_snapshot;
pub mod shell_snapshot;
pub mod spawn;
pub(crate) mod state_db_bridge;
pub use state_db_bridge::StateDbHandle;

View File

@@ -13,6 +13,7 @@ use crate::agent::AgentControl;
use crate::agent::AgentStatus;
use crate::agent::agent_status_from_event;
use crate::agent::status::is_final;
use crate::artifact_store::ArtifactStore;
use crate::attestation::AttestationProvider;
use crate::build_available_skills;
use crate::compact;
@@ -419,6 +420,8 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) environment_selections: ResolvedTurnEnvironments,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
pub(crate) inherited_multi_agent_version: Option<MultiAgentVersion>,
}
@@ -499,6 +502,8 @@ impl Codex {
environment_selections,
analytics_events_client,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider,
inherited_multi_agent_version,
} = args;
@@ -645,6 +650,8 @@ impl Codex {
environment_manager,
analytics_events_client,
thread_store,
artifact_store,
shell_snapshot_store,
parent_rollout_thread_trace,
attestation_provider,
multi_agent_version,
@@ -1349,7 +1356,6 @@ impl Session {
&self,
previous_cwd: &AbsolutePathBuf,
next_cwd: &AbsolutePathBuf,
codex_home: &AbsolutePathBuf,
session_source: &SessionSource,
) {
if previous_cwd == next_cwd {
@@ -1368,7 +1374,7 @@ impl Session {
}
ShellSnapshot::refresh_snapshot(
codex_home.clone(),
Arc::clone(&self.services.shell_snapshot_store),
self.conversation_id,
next_cwd.clone(),
self.services.user_shell.as_ref().clone(),
@@ -1389,7 +1395,6 @@ impl Session {
previous_cwd,
permission_profile_changed,
next_cwd,
codex_home,
session_source,
) = {
let mut state = self.state.lock().await;
@@ -1411,7 +1416,6 @@ impl Session {
let permission_profile_changed =
previous_permission_profile != updated_permission_profile;
let next_cwd = updated.cwd.clone();
let codex_home = updated.codex_home.clone();
let session_source = updated.session_source.clone();
state.session_configuration = updated;
(
@@ -1420,18 +1424,12 @@ impl Session {
previous_cwd,
permission_profile_changed,
next_cwd,
codex_home,
session_source,
)
};
self.emit_config_changed_contributors(previous_config.as_ref(), new_config.as_ref());
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&next_cwd,
&codex_home,
&session_source,
);
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &session_source);
if permission_profile_changed {
self.refresh_managed_network_proxy_for_current_permission_profile()
.await;

View File

@@ -1,5 +1,6 @@
use super::input_queue::InputQueue;
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::config::ConstraintError;
use crate::goals::GoalRuntimeState;
use crate::skills::SkillError;
@@ -504,6 +505,8 @@ impl Session {
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
parent_rollout_thread_trace: ThreadTraceContext,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
multi_agent_version: Option<MultiAgentVersion>,
@@ -859,7 +862,7 @@ impl Session {
tx
} else {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
Arc::clone(&shell_snapshot_store),
thread_id,
session_configuration.cwd.clone(),
&mut default_shell,
@@ -1006,6 +1009,7 @@ impl Session {
rollout_thread_trace,
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
shell_snapshot_store,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
@@ -1030,6 +1034,7 @@ impl Session {
state_db: state_db_ctx.clone(),
live_thread: live_thread_init.as_ref().cloned(),
thread_store: Arc::clone(&thread_store),
artifact_store,
attestation_provider: attestation_provider.clone(),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),

View File

@@ -1,5 +1,7 @@
use super::turn_context::TurnEnvironment;
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::ArtifactWriteFuture;
use crate::config::ConfigBuilder;
use crate::config::ConfigOverrides;
use crate::config::test_config;
@@ -141,6 +143,7 @@ use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathBufExt;
use core_test_support::PathExt;
use core_test_support::context_snapshot;
@@ -190,6 +193,32 @@ struct InstructionsTestCase {
expects_apply_patch_description: bool,
}
#[derive(Clone)]
struct FakeArtifactStore {
writes: Arc<std::sync::Mutex<Vec<Vec<u8>>>>,
}
impl ArtifactStore for FakeArtifactStore {
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
let path = format!("/fake/{session_id}/{call_id}.png");
test_path_buf(&path).abs()
}
fn write_generated_image(
&self,
session_id: &str,
call_id: &str,
bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
let writes = Arc::clone(&self.writes);
let path = self.generated_image_path(session_id, call_id);
Box::pin(async move {
writes.lock().expect("lock writes").push(bytes);
Ok(path)
})
}
}
fn user_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -4603,6 +4632,12 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -4735,6 +4770,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
shell_snapshot_store: Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: auth_manager.clone(),
@@ -4763,6 +4801,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
attestation_provider: None,
model_client: ModelClient::new(
Some(auth_manager.clone()),
@@ -4953,6 +4994,12 @@ async fn make_session_with_config_and_rx(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
/*state_db*/ None,
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -5065,6 +5112,12 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
.expect("state db should initialize"),
),
)),
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
codex_rollout_trace::ThreadTraceContext::disabled(),
/*attestation_provider*/ None,
Some(config.multi_agent_version_from_features()),
@@ -6825,6 +6878,9 @@ where
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
shell_snapshot_store: Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
auth_manager: Arc::clone(&auth_manager),
@@ -6853,6 +6909,9 @@ where
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
state_db,
)),
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
)),
attestation_provider: None,
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
@@ -7844,6 +7903,69 @@ async fn handle_output_item_done_records_image_save_history_message() {
let _ = std::fs::remove_file(&expected_saved_path);
}
#[tokio::test]
async fn handle_output_item_done_uses_injected_artifact_store() {
let (mut session, turn_context) = make_session_and_context().await;
let writes = Arc::new(std::sync::Mutex::new(Vec::new()));
session.services.artifact_store = Arc::new(FakeArtifactStore {
writes: Arc::clone(&writes),
});
let session = Arc::new(session);
let turn_context = Arc::new(turn_context);
let call_id = "ig_injected_store";
let expected_saved_path = session
.services
.artifact_store
.generated_image_path(&session.conversation_id.to_string(), call_id);
let item = ResponseItem::ImageGenerationCall {
id: call_id.to_string(),
status: "completed".to_string(),
revised_prompt: Some("a tiny blue square".to_string()),
result: "Zm9v".to_string(),
};
let mut ctx = HandleOutputCtx {
sess: Arc::clone(&session),
turn_context: Arc::clone(&turn_context),
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
turn_context.sub_id.clone(),
)),
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
cancellation_token: CancellationToken::new(),
};
handle_output_item_done(&mut ctx, item.clone(), /*previously_active_item*/ None)
.await
.expect("image generation item should succeed");
assert_eq!(
writes.lock().expect("lock writes").as_slice(),
&[b"foo".to_vec()]
);
let image_output_path = session
.services
.artifact_store
.generated_image_path(&session.conversation_id.to_string(), "<image_id>");
let image_output_dir = image_output_path
.parent()
.expect("generated image path should have a parent");
let image_message: ResponseItem = crate::context::ContextualUserFragment::into(
crate::context::ImageGenerationInstructions::new(
image_output_dir.display(),
image_output_path.display(),
),
);
let history = session.clone_history().await;
assert_eq!(history.raw_items(), &[image_message, item]);
assert_eq!(
expected_saved_path,
test_path_buf(&format!(
"/fake/{}/ig_injected_store.png",
session.conversation_id
))
.abs()
);
}
#[tokio::test]
async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
let (session, turn_context) = make_session_and_context().await;

View File

@@ -702,6 +702,12 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
codex_thread_store::LocalThreadStoreConfig::from_config(&config),
/*state_db*/ None,
));
let artifact_store = Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
&config.codex_home,
));
let shell_snapshot_store = Arc::new(
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
);
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
config,
@@ -733,6 +739,8 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
},
analytics_events_client: None,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider: None,
inherited_multi_agent_version: None,
})

View File

@@ -609,7 +609,6 @@ impl Session {
let next_permission_profile = next.permission_profile();
let permission_profile_changed =
previous_permission_profile != next_permission_profile;
let codex_home = next.codex_home.clone();
let session_source = next.session_source.clone();
let previous_config = notify_config_contributors.then(|| {
Self::build_effective_session_config(&state.session_configuration)
@@ -622,7 +621,6 @@ impl Session {
turn_environments,
permission_profile_changed,
previous_cwd,
codex_home,
session_source,
previous_config,
new_config,
@@ -637,7 +635,6 @@ impl Session {
turn_environments,
permission_profile_changed,
previous_cwd,
codex_home,
session_source,
previous_config,
new_config,
@@ -661,7 +658,6 @@ impl Session {
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&session_configuration.cwd,
&codex_home,
&session_source,
);

View File

@@ -1,5 +1,7 @@
use std::future::Future;
use std::io::ErrorKind;
use std::path::Path;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
@@ -35,9 +37,144 @@ const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3
const SNAPSHOT_DIR: &str = "shell_snapshots";
const EXCLUDED_EXPORT_VARS: &[&str] = &["PWD", "OLDPWD"];
/// Persists executor-local shell snapshot files and owns retention cleanup.
/// Implementations must return local readable paths because shell execution sources snapshots.
pub trait ShellSnapshotStore: Send + Sync + 'static {
/// Allocates the final and temporary paths for one snapshot generation.
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths;
/// Removes stale inactive snapshots according to the store's retention policy.
fn cleanup_stale_snapshots(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> ShellSnapshotStoreFuture<'_>;
}
/// Future returned by shell snapshot store cleanup work.
pub type ShellSnapshotStoreFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
#[derive(Clone)]
/// Codex Home backed executor-local shell snapshot store.
pub struct LocalShellSnapshotStore {
codex_home: AbsolutePathBuf,
}
impl LocalShellSnapshotStore {
/// Constructs a local shell snapshot store rooted at one Codex Home.
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
Self {
codex_home: codex_home.clone(),
}
}
fn snapshot_dir(&self) -> AbsolutePathBuf {
self.codex_home.join(SNAPSHOT_DIR)
}
async fn cleanup_stale_snapshots_impl(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> Result<()> {
let snapshot_dir = self.snapshot_dir();
let mut entries = match fs::read_dir(&snapshot_dir).await {
Ok(entries) => entries,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err.into()),
};
let now = SystemTime::now();
let active_session_id = active_session_id.to_string();
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_file() {
continue;
}
let path = entry.path();
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
remove_snapshot_file(&path).await;
continue;
};
if session_id == active_session_id {
continue;
}
let rollout_path =
find_thread_path_by_id_str(&self.codex_home, session_id, state_db.as_deref())
.await?;
let Some(rollout_path) = rollout_path else {
remove_snapshot_file(&path).await;
continue;
};
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
Ok(modified) => modified,
Err(err) => {
tracing::warn!(
"Failed to check rollout age for snapshot {}: {err:?}",
path.display()
);
continue;
}
};
if now
.duration_since(modified)
.ok()
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
{
remove_snapshot_file(&path).await;
}
}
Ok(())
}
}
impl ShellSnapshotStore for LocalShellSnapshotStore {
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths {
let extension = match shell_type {
ShellType::PowerShell => "ps1",
_ => "sh",
};
let nonce = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
let snapshot_dir = self.snapshot_dir();
ShellSnapshotPaths {
path: snapshot_dir.join(format!("{session_id}.{nonce}.{extension}")),
temp_path: snapshot_dir.join(format!("{session_id}.tmp-{nonce}")),
}
}
fn cleanup_stale_snapshots(
&self,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> ShellSnapshotStoreFuture<'_> {
Box::pin(self.cleanup_stale_snapshots_impl(active_session_id, state_db))
}
}
/// Final and temporary local paths for one shell snapshot generation.
pub struct ShellSnapshotPaths {
/// Final path sourced by later shell execution.
pub path: AbsolutePathBuf,
/// Temporary path populated before validation and rename.
pub temp_path: AbsolutePathBuf,
}
impl ShellSnapshot {
pub fn start_snapshotting(
codex_home: AbsolutePathBuf,
pub(crate) fn start_snapshotting(
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
shell: &mut Shell,
@@ -48,7 +185,7 @@ impl ShellSnapshot {
shell.shell_snapshot = shell_snapshot_rx;
Self::spawn_snapshot_task(
codex_home,
shell_snapshot_store,
session_id,
session_cwd,
shell.clone(),
@@ -60,8 +197,8 @@ impl ShellSnapshot {
shell_snapshot_tx
}
pub fn refresh_snapshot(
codex_home: AbsolutePathBuf,
pub(crate) fn refresh_snapshot(
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
shell: Shell,
@@ -70,7 +207,7 @@ impl ShellSnapshot {
state_db: Option<StateDbHandle>,
) {
Self::spawn_snapshot_task(
codex_home,
shell_snapshot_store,
session_id,
session_cwd,
shell,
@@ -81,7 +218,7 @@ impl ShellSnapshot {
}
fn spawn_snapshot_task(
codex_home: AbsolutePathBuf,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
session_id: ThreadId,
session_cwd: AbsolutePathBuf,
snapshot_shell: Shell,
@@ -92,13 +229,24 @@ impl ShellSnapshot {
let snapshot_span = info_span!("shell_snapshot", thread_id = %session_id);
tokio::spawn(
async move {
let cleanup_shell_snapshot_store = Arc::clone(&shell_snapshot_store);
let cleanup_session_id = session_id;
let cleanup_state_db = state_db.clone();
tokio::spawn(async move {
if let Err(err) = cleanup_shell_snapshot_store
.cleanup_stale_snapshots(cleanup_session_id, cleanup_state_db)
.await
{
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
}
});
let timer = session_telemetry.start_timer("codex.shell_snapshot.duration_ms", &[]);
let snapshot = ShellSnapshot::try_new(
&codex_home,
shell_snapshot_store.as_ref(),
session_id,
&session_cwd,
&snapshot_shell,
state_db,
)
.await
.map(Arc::new);
@@ -117,38 +265,13 @@ impl ShellSnapshot {
}
async fn try_new(
codex_home: &AbsolutePathBuf,
shell_snapshot_store: &dyn ShellSnapshotStore,
session_id: ThreadId,
session_cwd: &AbsolutePathBuf,
shell: &Shell,
state_db: Option<StateDbHandle>,
) -> std::result::Result<Self, &'static str> {
// File to store the snapshot
let extension = match shell.shell_type {
ShellType::PowerShell => "ps1",
_ => "sh",
};
let nonce = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
let path = codex_home
.join(SNAPSHOT_DIR)
.join(format!("{session_id}.{nonce}.{extension}"));
let temp_path = codex_home
.join(SNAPSHOT_DIR)
.join(format!("{session_id}.tmp-{nonce}"));
// Clean the (unlikely) leaked snapshot files.
let codex_home = codex_home.clone();
let cleanup_session_id = session_id;
tokio::spawn(async move {
if let Err(err) =
cleanup_stale_snapshots(&codex_home, cleanup_session_id, state_db).await
{
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
}
});
let ShellSnapshotPaths { path, temp_path } =
shell_snapshot_store.snapshot_paths(session_id, shell.shell_type.clone());
// Make the new snapshot.
if let Err(err) =
@@ -494,72 +617,6 @@ $envVars | ForEach-Object {
"##
}
/// Removes shell snapshots that either lack a matching session rollout file or
/// whose rollouts have not been updated within the retention window.
/// The active session id is exempt from cleanup.
pub async fn cleanup_stale_snapshots(
codex_home: &AbsolutePathBuf,
active_session_id: ThreadId,
state_db: Option<StateDbHandle>,
) -> Result<()> {
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
let mut entries = match fs::read_dir(&snapshot_dir).await {
Ok(entries) => entries,
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
Err(err) => return Err(err.into()),
};
let now = SystemTime::now();
let active_session_id = active_session_id.to_string();
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_file() {
continue;
}
let path = entry.path();
let file_name = entry.file_name();
let file_name = file_name.to_string_lossy();
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
remove_snapshot_file(&path).await;
continue;
};
if session_id == active_session_id {
continue;
}
let rollout_path =
find_thread_path_by_id_str(codex_home, session_id, state_db.as_deref()).await?;
let Some(rollout_path) = rollout_path else {
remove_snapshot_file(&path).await;
continue;
};
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
Ok(modified) => modified,
Err(err) => {
tracing::warn!(
"Failed to check rollout age for snapshot {}: {err:?}",
path.display()
);
continue;
}
};
if now
.duration_since(modified)
.ok()
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
{
remove_snapshot_file(&path).await;
}
}
Ok(())
}
async fn remove_snapshot_file(path: &Path) {
if let Err(err) = fs::remove_file(path).await {
tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path);

View File

@@ -196,13 +196,13 @@ async fn try_new_creates_and_deletes_snapshot_file() -> Result<()> {
shell_path: PathBuf::from("/bin/bash"),
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
};
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
let snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
&shell_snapshot_store,
ThreadId::new(),
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("snapshot should be created");
@@ -227,25 +227,16 @@ async fn try_new_uses_distinct_generation_paths() -> Result<()> {
shell_path: PathBuf::from("/bin/bash"),
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
};
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
let initial_snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
session_id,
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("initial snapshot should be created");
let refreshed_snapshot = ShellSnapshot::try_new(
&dir.path().abs(),
session_id,
&dir.path().abs(),
&shell,
/*state_db*/ None,
)
.await
.expect("refreshed snapshot should be created");
let initial_snapshot =
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
.await
.expect("initial snapshot should be created");
let refreshed_snapshot =
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
.await
.expect("refreshed snapshot should be created");
let initial_path = initial_snapshot.path.clone();
let refreshed_path = refreshed_snapshot.path.clone();
@@ -439,7 +430,9 @@ async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()>
fs::write(&orphan_snapshot, "orphan").await?;
fs::write(&invalid_snapshot, "invalid").await?;
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
.await?;
assert_eq!(live_snapshot.exists(), true);
assert_eq!(orphan_snapshot.exists(), false);
@@ -462,7 +455,9 @@ async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> {
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
.await?;
assert_eq!(stale_snapshot.exists(), false);
Ok(())
@@ -483,7 +478,9 @@ async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> {
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
cleanup_stale_snapshots(&codex_home, active_session, /*state_db*/ None).await?;
LocalShellSnapshotStore::from_codex_home(&codex_home)
.cleanup_stale_snapshots(active_session, /*state_db*/ None)
.await?;
assert_eq!(active_snapshot.exists(), true);
Ok(())

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::SkillsManager;
use crate::agent::AgentControl;
use crate::artifact_store::ArtifactStore;
use crate::attestation::AttestationProvider;
use crate::client::ModelClient;
use crate::config::NetworkProxyAuditMetadata;
@@ -51,6 +52,7 @@ pub(crate) struct SessionServices {
pub(crate) rollout_thread_trace: ThreadTraceContext,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
pub(crate) show_raw_agent_reasoning: bool,
pub(crate) exec_policy: Arc<ExecPolicyManager>,
pub(crate) auth_manager: Arc<AuthManager>,
@@ -74,6 +76,7 @@ pub(crate) struct SessionServices {
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) live_thread: Option<LiveThread>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,

View File

@@ -36,34 +36,17 @@ use tracing::debug;
use tracing::instrument;
use tracing::warn;
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
use crate::artifact_store::ArtifactStore;
#[cfg(test)]
use crate::artifact_store::LocalArtifactStore;
#[cfg(test)]
pub(crate) fn image_generation_artifact_path(
codex_home: &AbsolutePathBuf,
session_id: &str,
call_id: &str,
) -> AbsolutePathBuf {
let sanitize = |value: &str| {
let mut sanitized: String = value
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
ch
} else {
'_'
}
})
.collect();
if sanitized.is_empty() {
sanitized = "generated_image".to_string();
}
sanitized
};
codex_home
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
.join(sanitize(session_id))
.join(format!("{}.png", sanitize(call_id)))
LocalArtifactStore::from_codex_home(codex_home).generated_image_path(session_id, call_id)
}
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
@@ -108,7 +91,7 @@ pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option
}
async fn save_image_generation_result(
codex_home: &AbsolutePathBuf,
artifact_store: &dyn ArtifactStore,
session_id: &str,
call_id: &str,
result: &str,
@@ -118,12 +101,9 @@ async fn save_image_generation_result(
.map_err(|err| {
CodexErr::InvalidRequest(format!("invalid image generation payload: {err}"))
})?;
let path = image_generation_artifact_path(codex_home, session_id, call_id);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, bytes).await?;
Ok(path)
artifact_store
.write_generated_image(session_id, call_id, bytes)
.await
}
pub(crate) async fn persist_image_generation_item(
@@ -134,7 +114,7 @@ pub(crate) async fn persist_image_generation_item(
image_item.saved_path = None;
let session_id = sess.conversation_id.to_string();
match save_image_generation_result(
&turn_context.config.codex_home,
sess.services.artifact_store.as_ref(),
&session_id,
&image_item.id,
&image_item.result,
@@ -146,11 +126,10 @@ pub(crate) async fn persist_image_generation_item(
Some(path)
}
Err(err) => {
let output_path = image_generation_artifact_path(
&turn_context.config.codex_home,
&session_id,
&image_item.id,
);
let output_path = sess
.services
.artifact_store
.generated_image_path(&session_id, &image_item.id);
let output_dir = output_path
.parent()
.unwrap_or_else(|| turn_context.config.codex_home.clone());
@@ -173,8 +152,10 @@ async fn record_image_generation_instructions(
return;
}
let session_id = sess.conversation_id.to_string();
let image_output_path =
image_generation_artifact_path(&turn_context.config.codex_home, &session_id, "<image_id>");
let image_output_path = sess
.services
.artifact_store
.generated_image_path(&session_id, "<image_id>");
let image_output_dir = image_output_path
.parent()
.unwrap_or_else(|| turn_context.config.codex_home.clone());

View File

@@ -8,6 +8,7 @@ use super::image_generation_artifact_path;
use super::last_assistant_message_from_item;
use super::response_item_may_include_external_context;
use super::save_image_generation_result;
use crate::artifact_store::LocalArtifactStore;
use crate::session::tests::make_session_and_context;
use crate::tools::ToolRouter;
use crate::tools::parallel::ToolCallRuntime;
@@ -424,10 +425,14 @@ async fn save_image_generation_result_saves_base64_to_png_in_codex_home() {
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "ig_save_base64");
let _ = std::fs::remove_file(&expected_path);
let saved_path =
save_image_generation_result(&codex_home, "session-1", "ig_save_base64", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_save_base64",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, expected_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -440,9 +445,14 @@ async fn save_image_generation_result_rejects_data_url_payload() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(&codex_home, "session-1", "ig_456", result)
.await
.expect_err("data url payload should error");
let err = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_456",
result,
)
.await
.expect_err("data url payload should error");
assert!(matches!(err, CodexErr::InvalidRequest(_)));
}
@@ -459,9 +469,14 @@ async fn save_image_generation_result_overwrites_existing_file() {
.expect("create image output dir");
std::fs::write(&existing_path, b"existing").expect("seed existing image");
let saved_path = save_image_generation_result(&codex_home, "session-1", "ig_overwrite", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_overwrite",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, existing_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -475,9 +490,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "../ig/..");
let _ = std::fs::remove_file(&expected_path);
let saved_path = save_image_generation_result(&codex_home, "session-1", "../ig/..", "Zm9v")
.await
.expect("image should be saved");
let saved_path = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"../ig/..",
"Zm9v",
)
.await
.expect("image should be saved");
assert_eq!(saved_path, expected_path);
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
@@ -488,9 +508,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
async fn save_image_generation_result_rejects_non_standard_base64() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(&codex_home, "session-1", "ig_urlsafe", "_-8")
.await
.expect_err("non-standard base64 should error");
let err = save_image_generation_result(
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_urlsafe",
"_-8",
)
.await
.expect_err("non-standard base64 should error");
assert!(matches!(err, CodexErr::InvalidRequest(_)));
}
@@ -499,7 +524,7 @@ async fn save_image_generation_result_rejects_non_base64_data_urls() {
let codex_home = tempfile::tempdir().expect("create codex home");
let codex_home = codex_home.path().abs();
let err = save_image_generation_result(
&codex_home,
&LocalArtifactStore::from_codex_home(&codex_home),
"session-1",
"ig_svg",
"data:image/svg+xml,<svg/>",

View File

@@ -1,5 +1,7 @@
use crate::SkillsManager;
use crate::agent::AgentControl;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::LocalArtifactStore;
use crate::attestation::AttestationProvider;
use crate::codex_thread::CodexThread;
use crate::config::Config;
@@ -13,7 +15,9 @@ use crate::session::CodexSpawnArgs;
use crate::session::CodexSpawnOk;
use crate::session::INITIAL_SUBMIT_ID;
use crate::session::resolve_multi_agent_version;
use crate::shell_snapshot::LocalShellSnapshotStore;
use crate::shell_snapshot::ShellSnapshot;
use crate::shell_snapshot::ShellSnapshotStore;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_analytics::AnalyticsEventsClient;
@@ -173,6 +177,31 @@ pub struct ThreadManager {
_test_codex_home_guard: Option<TempCodexHomeGuard>,
}
/// Required storage backends owned by [`ThreadManager`].
///
/// The normal constructor builds this bundle with [`Self::local`]. Cloud and
/// embedded hosts can replace the full bundle here without asking core to
/// treat `codex_home` as a remotely mounted filesystem.
#[derive(Clone)]
pub struct ThreadManagerStorageDeps {
/// Stores generated artifacts and materializes executor-readable paths.
pub artifact_store: Arc<dyn ArtifactStore>,
/// Stores executor-local shell snapshots used by shell startup.
pub shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
}
impl ThreadManagerStorageDeps {
/// Builds the standard local backends rooted under the configured Codex home.
pub fn local(config: &Config) -> Self {
Self {
artifact_store: Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
shell_snapshot_store: Arc::new(LocalShellSnapshotStore::from_codex_home(
&config.codex_home,
)),
}
}
}
pub struct StartThreadOptions {
pub config: Config,
pub initial_history: InitialHistory,
@@ -208,6 +237,8 @@ pub(crate) struct ThreadManagerState {
mcp_manager: Arc<McpManager>,
extensions: Arc<ExtensionRegistry<Config>>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
session_source: SessionSource,
installation_id: String,
@@ -263,6 +294,104 @@ impl ThreadManager {
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
Self::new_with_storage_deps(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
ThreadManagerStorageDeps::local(config),
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
/// Constructs a thread manager with an explicit generated artifact backend.
pub fn new_with_artifact_store(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
artifact_store: Arc<dyn ArtifactStore>,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
let mut storage_deps = ThreadManagerStorageDeps::local(config);
storage_deps.artifact_store = artifact_store;
Self::new_with_storage_deps(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
storage_deps,
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
/// Constructs a thread manager with an explicit shell snapshot backend.
pub fn new_with_shell_snapshot_store(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
let mut storage_deps = ThreadManagerStorageDeps::local(config);
storage_deps.shell_snapshot_store = shell_snapshot_store;
Self::new_with_storage_deps(
config,
auth_manager,
session_source,
environment_manager,
extensions,
analytics_events_client,
thread_store,
storage_deps,
state_db,
installation_id,
attestation_provider,
)
}
#[allow(clippy::too_many_arguments)]
/// Constructs a thread manager with its required storage backend bundle.
pub fn new_with_storage_deps(
config: &Config,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
extensions: Arc<ExtensionRegistry<Config>>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
storage_deps: ThreadManagerStorageDeps,
state_db: Option<StateDbHandle>,
installation_id: String,
attestation_provider: Option<Arc<dyn AttestationProvider>>,
) -> Self {
let ThreadManagerStorageDeps {
artifact_store,
shell_snapshot_store,
} = storage_deps;
let codex_home = config.codex_home.clone();
let restriction_product = session_source.restriction_product();
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
@@ -287,6 +416,8 @@ impl ThreadManager {
mcp_manager,
extensions,
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider,
auth_manager,
session_source,
@@ -361,8 +492,10 @@ impl ThreadManager {
restriction_product,
));
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> =
Arc::new(LocalShellSnapshotStore::from_codex_home(&skills_codex_home));
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
skills_codex_home,
skills_codex_home.clone(),
/*bundled_skills_enabled*/ true,
restriction_product,
));
@@ -376,6 +509,8 @@ impl ThreadManager {
},
state_db.clone(),
));
let artifact_store: Arc<dyn ArtifactStore> =
Arc::new(LocalArtifactStore::from_codex_home(&skills_codex_home));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -388,6 +523,8 @@ impl ThreadManager {
mcp_manager,
extensions: empty_extension_registry(),
thread_store,
artifact_store,
shell_snapshot_store,
attestation_provider: None,
auth_manager,
session_source: SessionSource::Exec,
@@ -1334,6 +1471,8 @@ impl ThreadManagerState {
environment_selections,
analytics_events_client: self.analytics_events_client.clone(),
thread_store: Arc::clone(&self.thread_store),
artifact_store: Arc::clone(&self.artifact_store),
shell_snapshot_store: Arc::clone(&self.shell_snapshot_store),
attestation_provider: self.attestation_provider.clone(),
inherited_multi_agent_version: multi_agent_version,
}))

View File

@@ -1,10 +1,14 @@
use super::*;
use crate::artifact_store::ArtifactStore;
use crate::artifact_store::ArtifactWriteFuture;
use crate::config::test_config;
use crate::init_state_db;
use crate::installation_id::INSTALLATION_ID_FILENAME;
use crate::rollout::RolloutRecorder;
use crate::session::session::SessionSettingsUpdate;
use crate::session::tests::make_session_and_context;
use crate::shell_snapshot::ShellSnapshotPaths;
use crate::shell_snapshot::ShellSnapshotStoreFuture;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_extension_api::empty_extension_registry;
@@ -33,6 +37,43 @@ use wiremock::MockServer;
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
struct FakeArtifactStore;
impl ArtifactStore for FakeArtifactStore {
fn generated_image_path(&self, _session_id: &str, _call_id: &str) -> AbsolutePathBuf {
unreachable!("generated image path should not be requested")
}
fn write_generated_image(
&self,
_session_id: &str,
_call_id: &str,
_bytes: Vec<u8>,
) -> ArtifactWriteFuture<'_> {
unreachable!("generated image write should not be requested")
}
}
struct FakeShellSnapshotStore;
impl ShellSnapshotStore for FakeShellSnapshotStore {
fn snapshot_paths(
&self,
_session_id: ThreadId,
_shell_type: crate::shell::ShellType,
) -> ShellSnapshotPaths {
unreachable!("shell snapshot paths should not be requested")
}
fn cleanup_stale_snapshots(
&self,
_active_session_id: ThreadId,
_state_db: Option<StateDbHandle>,
) -> ShellSnapshotStoreFuture<'_> {
unreachable!("shell snapshot cleanup should not be requested")
}
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -64,6 +105,39 @@ fn developer_interrupted_marker() -> ResponseItem {
.expect("developer interrupted marker should be enabled")
}
#[tokio::test]
async fn start_thread_uses_injected_storage_deps() {
let config = test_config().await;
let artifact_store: Arc<dyn ArtifactStore> = Arc::new(FakeArtifactStore);
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> = Arc::new(FakeShellSnapshotStore);
let manager = ThreadManager::new_with_storage_deps(
&config,
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store_from_config(&config, /*state_db*/ None),
ThreadManagerStorageDeps {
artifact_store: Arc::clone(&artifact_store),
shell_snapshot_store: Arc::clone(&shell_snapshot_store),
},
/*state_db*/ None,
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
);
let thread = manager.start_thread(config).await.expect("start thread");
assert!(Arc::ptr_eq(
&thread.thread.codex.session.services.artifact_store,
&artifact_store
));
assert!(Arc::ptr_eq(
&thread.thread.codex.session.services.shell_snapshot_store,
&shell_snapshot_store
));
}
#[test]
fn truncates_before_requested_user_message() {
let items = [