mirror of
https://github.com/openai/codex.git
synced 2026-06-03 11:52:03 +00:00
Compare commits
5 Commits
starr/agen
...
starr/agen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97ad22d373 | ||
|
|
acc274c12a | ||
|
|
4bbb82b331 | ||
|
|
8f4d1c2ce6 | ||
|
|
eeb7b76548 |
@@ -25,6 +25,12 @@ use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub use codex_app_server::AppServerRuntimeStorageDeps;
|
||||
pub use codex_app_server::ArtifactStore;
|
||||
pub use codex_app_server::LocalArtifactStore;
|
||||
pub use codex_app_server::LocalShellSnapshotStore;
|
||||
pub use codex_app_server::ShellSnapshotStore;
|
||||
pub use codex_app_server::ThreadManagerStorageDeps;
|
||||
pub use codex_app_server::app_server_control_socket_path;
|
||||
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
pub use codex_app_server::in_process::InProcessServerEvent;
|
||||
@@ -489,9 +495,20 @@ impl InProcessAppServerClient {
|
||||
/// internal event queue is saturated later, server requests are rejected
|
||||
/// with overload error instead of being silently dropped.
|
||||
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
|
||||
Self::start_with_runtime_storage_deps(args, AppServerRuntimeStorageDeps::default()).await
|
||||
}
|
||||
|
||||
/// Starts the in-process runtime with explicit thread storage backends.
|
||||
pub async fn start_with_runtime_storage_deps(
|
||||
args: InProcessClientStartArgs,
|
||||
runtime_storage_deps: AppServerRuntimeStorageDeps,
|
||||
) -> IoResult<Self> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let mut handle =
|
||||
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
|
||||
let mut handle = codex_app_server::in_process::start_with_runtime_storage_deps(
|
||||
args.into_runtime_start_args(),
|
||||
runtime_storage_deps,
|
||||
)
|
||||
.await?;
|
||||
let request_sender = handle.sender();
|
||||
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
|
||||
@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::AppServerRuntimeStorageDeps;
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
use crate::error_code::OVERLOADED_ERROR_CODE;
|
||||
@@ -348,8 +349,19 @@ impl InProcessClientHandle {
|
||||
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
|
||||
/// the runtime is shut down and an `InvalidData` error is returned.
|
||||
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
start_with_runtime_storage_deps(args, AppServerRuntimeStorageDeps::default()).await
|
||||
}
|
||||
|
||||
/// Starts an in-process app-server runtime with explicit thread storage backends.
|
||||
///
|
||||
/// This is the embedded-host path for supplying executor-visible artifact and
|
||||
/// shell snapshot stores without changing the existing start-argument shape.
|
||||
pub async fn start_with_runtime_storage_deps(
|
||||
args: InProcessStartArgs,
|
||||
runtime_storage_deps: AppServerRuntimeStorageDeps,
|
||||
) -> IoResult<InProcessClientHandle> {
|
||||
let initialize = args.initialize.clone();
|
||||
let client = start_uninitialized(args).await?;
|
||||
let client = start_uninitialized(args, runtime_storage_deps).await?;
|
||||
|
||||
let initialize_response = client
|
||||
.request(ClientRequest::Initialize {
|
||||
@@ -369,7 +381,10 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
async fn start_uninitialized(
|
||||
args: InProcessStartArgs,
|
||||
runtime_storage_deps: AppServerRuntimeStorageDeps,
|
||||
) -> IoResult<InProcessClientHandle> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
@@ -438,6 +453,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
runtime_storage_deps,
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let session = Arc::new(ConnectionSessionState::new());
|
||||
|
||||
@@ -6,8 +6,13 @@ use codex_config::LoaderOverrides;
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
pub use codex_core::ThreadManagerStorageDeps;
|
||||
pub use codex_core::artifact_store::ArtifactStore;
|
||||
pub use codex_core::artifact_store::LocalArtifactStore;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
pub use codex_core::shell_snapshot::LocalShellSnapshotStore;
|
||||
pub use codex_core::shell_snapshot::ShellSnapshotStore;
|
||||
use codex_login::AuthManager;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::collections::HashMap;
|
||||
@@ -416,6 +421,16 @@ impl Default for AppServerRuntimeOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage backends an app-server host can inject into managed threads.
|
||||
///
|
||||
/// Leaving these fields unset preserves the standard local `codex_home`
|
||||
/// behavior used by the existing app-server entrypoints.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct AppServerRuntimeStorageDeps {
|
||||
/// Optional executor-visible storage backends for app-server threads.
|
||||
pub thread_manager: Option<ThreadManagerStorageDeps>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn run_main_with_transport_options(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
@@ -427,6 +442,35 @@ pub async fn run_main_with_transport_options(
|
||||
session_source: SessionSource,
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
run_main_with_transport_options_and_storage_deps(
|
||||
arg0_paths,
|
||||
cli_config_overrides,
|
||||
loader_overrides,
|
||||
strict_config,
|
||||
default_analytics_enabled,
|
||||
transport,
|
||||
session_source,
|
||||
auth,
|
||||
runtime_options,
|
||||
AppServerRuntimeStorageDeps::default(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Runs app-server with explicit storage backends for managed threads.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn run_main_with_transport_options_and_storage_deps(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
loader_overrides: LoaderOverrides,
|
||||
strict_config: bool,
|
||||
default_analytics_enabled: bool,
|
||||
transport: AppServerTransport,
|
||||
session_source: SessionSource,
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
runtime_storage_deps: AppServerRuntimeStorageDeps,
|
||||
) -> IoResult<()> {
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
@@ -815,6 +859,7 @@ pub async fn run_main_with_transport_options(
|
||||
rpc_transport: analytics_rpc_transport(&transport),
|
||||
remote_control_handle: Some(remote_control_handle.clone()),
|
||||
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
||||
runtime_storage_deps,
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
||||
|
||||
@@ -272,6 +272,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
|
||||
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
|
||||
pub(crate) runtime_storage_deps: crate::AppServerRuntimeStorageDeps,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -295,6 +296,7 @@ impl MessageProcessor {
|
||||
rpc_transport,
|
||||
remote_control_handle,
|
||||
plugin_startup_tasks,
|
||||
runtime_storage_deps,
|
||||
} = args;
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
@@ -305,26 +307,45 @@ impl MessageProcessor {
|
||||
// resumed, or forked threads to a different persistence backend/root.
|
||||
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let environment_manager_for_requests = Arc::clone(&environment_manager);
|
||||
let thread_manager_storage_deps = runtime_storage_deps.thread_manager;
|
||||
let thread_manager = Arc::new_cyclic(|thread_manager| {
|
||||
ThreadManager::new(
|
||||
config.as_ref(),
|
||||
let thread_manager_storage_deps = thread_manager_storage_deps.clone();
|
||||
let extensions = thread_extensions(
|
||||
guardian_agent_spawner(thread_manager.clone()),
|
||||
app_server_extension_event_sink(outgoing.clone()),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
thread_extensions(
|
||||
guardian_agent_spawner(thread_manager.clone()),
|
||||
app_server_extension_event_sink(outgoing.clone()),
|
||||
);
|
||||
let attestation_provider = Some(app_server_attestation_provider(
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
));
|
||||
match thread_manager_storage_deps {
|
||||
Some(storage_deps) => ThreadManager::new_with_storage_deps(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
storage_deps,
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
),
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
Some(app_server_attestation_provider(
|
||||
outgoing.clone(),
|
||||
thread_state_manager.clone(),
|
||||
)),
|
||||
)
|
||||
None => ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
),
|
||||
}
|
||||
});
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
|
||||
@@ -267,6 +267,7 @@ async fn build_test_processor(
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
runtime_storage_deps: crate::AppServerRuntimeStorageDeps::default(),
|
||||
}));
|
||||
(processor, outgoing_rx)
|
||||
}
|
||||
|
||||
80
codex-rs/core/src/artifact_store.rs
Normal file
80
codex-rs/core/src/artifact_store.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use codex_protocol::error::Result;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
|
||||
|
||||
/// Stores generated artifacts and materializes executor-readable local paths.
|
||||
pub trait ArtifactStore: Send + Sync + 'static {
|
||||
/// Returns the executor-readable path exposed to generated image instructions.
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf;
|
||||
|
||||
/// Persists a generated image payload and returns its executor-readable path.
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_>;
|
||||
}
|
||||
|
||||
/// Future returned by artifact writes.
|
||||
pub type ArtifactWriteFuture<'a> =
|
||||
Pin<Box<dyn Future<Output = Result<AbsolutePathBuf>> + Send + 'a>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalArtifactStore {
|
||||
codex_home: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl LocalArtifactStore {
|
||||
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
|
||||
Self {
|
||||
codex_home: codex_home.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ArtifactStore for LocalArtifactStore {
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
|
||||
self.codex_home
|
||||
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
|
||||
.join(sanitize_artifact_path_component(session_id))
|
||||
.join(format!("{}.png", sanitize_artifact_path_component(call_id)))
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
let path = self.generated_image_path(session_id, call_id);
|
||||
Box::pin(async move {
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
Ok(path)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Turns an arbitrary identifier into a single safe path component.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is
/// replaced with `_`. An empty input yields the placeholder `generated_image`.
fn sanitize_artifact_path_component(value: &str) -> String {
    // Each input character maps to exactly one output character, so the
    // sanitized result is empty only when the input is empty.
    if value.is_empty() {
        return "generated_image".to_string();
    }

    value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
            _ => '_',
        })
        .collect()
}
|
||||
@@ -105,6 +105,8 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
environment_selections: parent_ctx.environments.clone(),
|
||||
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
|
||||
thread_store: Arc::clone(&parent_session.services.thread_store),
|
||||
artifact_store: Arc::clone(&parent_session.services.artifact_store),
|
||||
shell_snapshot_store: Arc::clone(&parent_session.services.shell_snapshot_store),
|
||||
attestation_provider: parent_session.services.attestation_provider.clone(),
|
||||
inherited_multi_agent_version: Some(MultiAgentVersion::Disabled),
|
||||
}))
|
||||
|
||||
@@ -24,6 +24,7 @@ pub use codex_thread::CodexThreadSettingsOverrides;
|
||||
pub use codex_thread::ThreadConfigSnapshot;
|
||||
pub use session::turn_context::TurnContext;
|
||||
mod agent;
|
||||
pub mod artifact_store;
|
||||
mod attestation;
|
||||
mod codex_delegate;
|
||||
mod command_canonicalization;
|
||||
@@ -112,6 +113,7 @@ pub use thread_manager::ForkSnapshot;
|
||||
pub use thread_manager::NewThread;
|
||||
pub use thread_manager::StartThreadOptions;
|
||||
pub use thread_manager::ThreadManager;
|
||||
pub use thread_manager::ThreadManagerStorageDeps;
|
||||
pub use thread_manager::ThreadShutdownReport;
|
||||
pub use thread_manager::build_models_manager;
|
||||
pub use thread_manager::thread_store_from_config;
|
||||
@@ -132,7 +134,7 @@ mod rollout;
|
||||
pub(crate) mod safety;
|
||||
mod session_rollout_init_error;
|
||||
pub mod shell;
|
||||
pub(crate) mod shell_snapshot;
|
||||
pub mod shell_snapshot;
|
||||
pub mod spawn;
|
||||
pub(crate) mod state_db_bridge;
|
||||
pub use state_db_bridge::StateDbHandle;
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::agent::AgentControl;
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::agent_status_from_event;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::build_available_skills;
|
||||
use crate::compact;
|
||||
@@ -419,6 +420,8 @@ pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) environment_selections: ResolvedTurnEnvironments,
|
||||
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
|
||||
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
pub(crate) inherited_multi_agent_version: Option<MultiAgentVersion>,
|
||||
}
|
||||
@@ -499,6 +502,8 @@ impl Codex {
|
||||
environment_selections,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider,
|
||||
inherited_multi_agent_version,
|
||||
} = args;
|
||||
@@ -645,6 +650,8 @@ impl Codex {
|
||||
environment_manager,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
parent_rollout_thread_trace,
|
||||
attestation_provider,
|
||||
multi_agent_version,
|
||||
@@ -1349,7 +1356,6 @@ impl Session {
|
||||
&self,
|
||||
previous_cwd: &AbsolutePathBuf,
|
||||
next_cwd: &AbsolutePathBuf,
|
||||
codex_home: &AbsolutePathBuf,
|
||||
session_source: &SessionSource,
|
||||
) {
|
||||
if previous_cwd == next_cwd {
|
||||
@@ -1368,7 +1374,7 @@ impl Session {
|
||||
}
|
||||
|
||||
ShellSnapshot::refresh_snapshot(
|
||||
codex_home.clone(),
|
||||
Arc::clone(&self.services.shell_snapshot_store),
|
||||
self.conversation_id,
|
||||
next_cwd.clone(),
|
||||
self.services.user_shell.as_ref().clone(),
|
||||
@@ -1389,7 +1395,6 @@ impl Session {
|
||||
previous_cwd,
|
||||
permission_profile_changed,
|
||||
next_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
) = {
|
||||
let mut state = self.state.lock().await;
|
||||
@@ -1411,7 +1416,6 @@ impl Session {
|
||||
let permission_profile_changed =
|
||||
previous_permission_profile != updated_permission_profile;
|
||||
let next_cwd = updated.cwd.clone();
|
||||
let codex_home = updated.codex_home.clone();
|
||||
let session_source = updated.session_source.clone();
|
||||
state.session_configuration = updated;
|
||||
(
|
||||
@@ -1420,18 +1424,12 @@ impl Session {
|
||||
previous_cwd,
|
||||
permission_profile_changed,
|
||||
next_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
)
|
||||
};
|
||||
|
||||
self.emit_config_changed_contributors(previous_config.as_ref(), new_config.as_ref());
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&next_cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &session_source);
|
||||
if permission_profile_changed {
|
||||
self.refresh_managed_network_proxy_for_current_permission_profile()
|
||||
.await;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::input_queue::InputQueue;
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::config::ConstraintError;
|
||||
use crate::goals::GoalRuntimeState;
|
||||
use crate::skills::SkillError;
|
||||
@@ -504,6 +505,8 @@ impl Session {
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
parent_rollout_thread_trace: ThreadTraceContext,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
multi_agent_version: Option<MultiAgentVersion>,
|
||||
@@ -859,7 +862,7 @@ impl Session {
|
||||
tx
|
||||
} else {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
Arc::clone(&shell_snapshot_store),
|
||||
thread_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
@@ -1006,6 +1009,7 @@ impl Session {
|
||||
rollout_thread_trace,
|
||||
user_shell: Arc::new(default_shell),
|
||||
shell_snapshot_tx,
|
||||
shell_snapshot_store,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
@@ -1030,6 +1034,7 @@ impl Session {
|
||||
state_db: state_db_ctx.clone(),
|
||||
live_thread: live_thread_init.as_ref().cloned(),
|
||||
thread_store: Arc::clone(&thread_store),
|
||||
artifact_store,
|
||||
attestation_provider: attestation_provider.clone(),
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::turn_context::TurnEnvironment;
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::ArtifactWriteFuture;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::config::ConfigOverrides;
|
||||
use crate::config::test_config;
|
||||
@@ -141,6 +143,7 @@ use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_rmcp_client::ElicitationAction;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::PathBufExt;
|
||||
use core_test_support::PathExt;
|
||||
use core_test_support::context_snapshot;
|
||||
@@ -190,6 +193,32 @@ struct InstructionsTestCase {
|
||||
expects_apply_patch_description: bool,
|
||||
}
|
||||
|
||||
/// Test double for `ArtifactStore` that records written payloads in memory.
#[derive(Clone)]
struct FakeArtifactStore {
    /// Every payload passed to `write_generated_image`, in call order.
    writes: Arc<std::sync::Mutex<Vec<Vec<u8>>>>,
}
|
||||
|
||||
impl ArtifactStore for FakeArtifactStore {
|
||||
fn generated_image_path(&self, session_id: &str, call_id: &str) -> AbsolutePathBuf {
|
||||
let path = format!("/fake/{session_id}/{call_id}.png");
|
||||
test_path_buf(&path).abs()
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
let writes = Arc::clone(&self.writes);
|
||||
let path = self.generated_image_path(session_id, call_id);
|
||||
Box::pin(async move {
|
||||
writes.lock().expect("lock writes").push(bytes);
|
||||
Ok(path)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -4603,6 +4632,12 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -4735,6 +4770,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
shell_snapshot_tx: watch::channel(None).0,
|
||||
shell_snapshot_store: Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: auth_manager.clone(),
|
||||
@@ -4763,6 +4801,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
attestation_provider: None,
|
||||
model_client: ModelClient::new(
|
||||
Some(auth_manager.clone()),
|
||||
@@ -4953,6 +4994,12 @@ async fn make_session_with_config_and_rx(
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
/*state_db*/ None,
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -5065,6 +5112,12 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
|
||||
.expect("state db should initialize"),
|
||||
),
|
||||
)),
|
||||
Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
/*attestation_provider*/ None,
|
||||
Some(config.multi_agent_version_from_features()),
|
||||
@@ -6825,6 +6878,9 @@ where
|
||||
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
shell_snapshot_tx: watch::channel(None).0,
|
||||
shell_snapshot_store: Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
auth_manager: Arc::clone(&auth_manager),
|
||||
@@ -6853,6 +6909,9 @@ where
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
|
||||
state_db,
|
||||
)),
|
||||
artifact_store: Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
attestation_provider: None,
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
@@ -7844,6 +7903,69 @@ async fn handle_output_item_done_records_image_save_history_message() {
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_output_item_done_uses_injected_artifact_store() {
|
||||
let (mut session, turn_context) = make_session_and_context().await;
|
||||
let writes = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
session.services.artifact_store = Arc::new(FakeArtifactStore {
|
||||
writes: Arc::clone(&writes),
|
||||
});
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context);
|
||||
let call_id = "ig_injected_store";
|
||||
let expected_saved_path = session
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session.conversation_id.to_string(), call_id);
|
||||
let item = ResponseItem::ImageGenerationCall {
|
||||
id: call_id.to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("a tiny blue square".to_string()),
|
||||
result: "Zm9v".to_string(),
|
||||
};
|
||||
|
||||
let mut ctx = HandleOutputCtx {
|
||||
sess: Arc::clone(&session),
|
||||
turn_context: Arc::clone(&turn_context),
|
||||
turn_store: Arc::new(codex_extension_api::ExtensionData::new(
|
||||
turn_context.sub_id.clone(),
|
||||
)),
|
||||
tool_runtime: test_tool_runtime(Arc::clone(&session), Arc::clone(&turn_context)),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
};
|
||||
handle_output_item_done(&mut ctx, item.clone(), /*previously_active_item*/ None)
|
||||
.await
|
||||
.expect("image generation item should succeed");
|
||||
|
||||
assert_eq!(
|
||||
writes.lock().expect("lock writes").as_slice(),
|
||||
&[b"foo".to_vec()]
|
||||
);
|
||||
let image_output_path = session
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session.conversation_id.to_string(), "<image_id>");
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.expect("generated image path should have a parent");
|
||||
let image_message: ResponseItem = crate::context::ContextualUserFragment::into(
|
||||
crate::context::ImageGenerationInstructions::new(
|
||||
image_output_dir.display(),
|
||||
image_output_path.display(),
|
||||
),
|
||||
);
|
||||
let history = session.clone_history().await;
|
||||
assert_eq!(history.raw_items(), &[image_message, item]);
|
||||
assert_eq!(
|
||||
expected_saved_path,
|
||||
test_path_buf(&format!(
|
||||
"/fake/{}/ig_injected_store.png",
|
||||
session.conversation_id
|
||||
))
|
||||
.abs()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
|
||||
@@ -702,6 +702,12 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
codex_thread_store::LocalThreadStoreConfig::from_config(&config),
|
||||
/*state_db*/ None,
|
||||
));
|
||||
let artifact_store = Arc::new(crate::artifact_store::LocalArtifactStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
));
|
||||
let shell_snapshot_store = Arc::new(
|
||||
crate::shell_snapshot::LocalShellSnapshotStore::from_codex_home(&config.codex_home),
|
||||
);
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
@@ -733,6 +739,8 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
},
|
||||
analytics_events_client: None,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider: None,
|
||||
inherited_multi_agent_version: None,
|
||||
})
|
||||
|
||||
@@ -609,7 +609,6 @@ impl Session {
|
||||
let next_permission_profile = next.permission_profile();
|
||||
let permission_profile_changed =
|
||||
previous_permission_profile != next_permission_profile;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
let previous_config = notify_config_contributors.then(|| {
|
||||
Self::build_effective_session_config(&state.session_configuration)
|
||||
@@ -622,7 +621,6 @@ impl Session {
|
||||
turn_environments,
|
||||
permission_profile_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
previous_config,
|
||||
new_config,
|
||||
@@ -637,7 +635,6 @@ impl Session {
|
||||
turn_environments,
|
||||
permission_profile_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
previous_config,
|
||||
new_config,
|
||||
@@ -661,7 +658,6 @@ impl Session {
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&session_configuration.cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::future::Future;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -35,9 +37,144 @@ const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3
|
||||
const SNAPSHOT_DIR: &str = "shell_snapshots";
|
||||
const EXCLUDED_EXPORT_VARS: &[&str] = &["PWD", "OLDPWD"];
|
||||
|
||||
/// Persists executor-local shell snapshot files and owns retention cleanup.
|
||||
/// Implementations must return local readable paths because shell execution sources snapshots.
|
||||
pub trait ShellSnapshotStore: Send + Sync + 'static {
|
||||
/// Allocates the final and temporary paths for one snapshot generation.
|
||||
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths;
|
||||
|
||||
/// Removes stale inactive snapshots according to the store's retention policy.
|
||||
fn cleanup_stale_snapshots(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> ShellSnapshotStoreFuture<'_>;
|
||||
}
|
||||
|
||||
/// Future returned by shell snapshot store cleanup work.
|
||||
pub type ShellSnapshotStoreFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Codex Home backed executor-local shell snapshot store.
|
||||
pub struct LocalShellSnapshotStore {
|
||||
codex_home: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl LocalShellSnapshotStore {
|
||||
/// Constructs a local shell snapshot store rooted at one Codex Home.
|
||||
pub fn from_codex_home(codex_home: &AbsolutePathBuf) -> Self {
|
||||
Self {
|
||||
codex_home: codex_home.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn snapshot_dir(&self) -> AbsolutePathBuf {
|
||||
self.codex_home.join(SNAPSHOT_DIR)
|
||||
}
|
||||
|
||||
async fn cleanup_stale_snapshots_impl(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Result<()> {
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
let mut entries = match fs::read_dir(&snapshot_dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let active_session_id = active_session_id.to_string();
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if !entry.file_type().await?.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
if session_id == active_session_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(&self.codex_home, session_id, state_db.as_deref())
|
||||
.await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
|
||||
Ok(modified) => modified,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to check rollout age for snapshot {}: {err:?}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if now
|
||||
.duration_since(modified)
|
||||
.ok()
|
||||
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
|
||||
{
|
||||
remove_snapshot_file(&path).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ShellSnapshotStore for LocalShellSnapshotStore {
|
||||
fn snapshot_paths(&self, session_id: ThreadId, shell_type: ShellType) -> ShellSnapshotPaths {
|
||||
let extension = match shell_type {
|
||||
ShellType::PowerShell => "ps1",
|
||||
_ => "sh",
|
||||
};
|
||||
let nonce = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or(0);
|
||||
let snapshot_dir = self.snapshot_dir();
|
||||
|
||||
ShellSnapshotPaths {
|
||||
path: snapshot_dir.join(format!("{session_id}.{nonce}.{extension}")),
|
||||
temp_path: snapshot_dir.join(format!("{session_id}.tmp-{nonce}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn cleanup_stale_snapshots(
|
||||
&self,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> ShellSnapshotStoreFuture<'_> {
|
||||
Box::pin(self.cleanup_stale_snapshots_impl(active_session_id, state_db))
|
||||
}
|
||||
}
|
||||
|
||||
/// Final and temporary local paths for one shell snapshot generation.
|
||||
pub struct ShellSnapshotPaths {
|
||||
/// Final path sourced by later shell execution.
|
||||
pub path: AbsolutePathBuf,
|
||||
/// Temporary path populated before validation and rename.
|
||||
pub temp_path: AbsolutePathBuf,
|
||||
}
|
||||
|
||||
impl ShellSnapshot {
|
||||
pub fn start_snapshotting(
|
||||
codex_home: AbsolutePathBuf,
|
||||
pub(crate) fn start_snapshotting(
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
shell: &mut Shell,
|
||||
@@ -48,7 +185,7 @@ impl ShellSnapshot {
|
||||
shell.shell_snapshot = shell_snapshot_rx;
|
||||
|
||||
Self::spawn_snapshot_task(
|
||||
codex_home,
|
||||
shell_snapshot_store,
|
||||
session_id,
|
||||
session_cwd,
|
||||
shell.clone(),
|
||||
@@ -60,8 +197,8 @@ impl ShellSnapshot {
|
||||
shell_snapshot_tx
|
||||
}
|
||||
|
||||
pub fn refresh_snapshot(
|
||||
codex_home: AbsolutePathBuf,
|
||||
pub(crate) fn refresh_snapshot(
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
shell: Shell,
|
||||
@@ -70,7 +207,7 @@ impl ShellSnapshot {
|
||||
state_db: Option<StateDbHandle>,
|
||||
) {
|
||||
Self::spawn_snapshot_task(
|
||||
codex_home,
|
||||
shell_snapshot_store,
|
||||
session_id,
|
||||
session_cwd,
|
||||
shell,
|
||||
@@ -81,7 +218,7 @@ impl ShellSnapshot {
|
||||
}
|
||||
|
||||
fn spawn_snapshot_task(
|
||||
codex_home: AbsolutePathBuf,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
session_id: ThreadId,
|
||||
session_cwd: AbsolutePathBuf,
|
||||
snapshot_shell: Shell,
|
||||
@@ -92,13 +229,24 @@ impl ShellSnapshot {
|
||||
let snapshot_span = info_span!("shell_snapshot", thread_id = %session_id);
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let cleanup_shell_snapshot_store = Arc::clone(&shell_snapshot_store);
|
||||
let cleanup_session_id = session_id;
|
||||
let cleanup_state_db = state_db.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = cleanup_shell_snapshot_store
|
||||
.cleanup_stale_snapshots(cleanup_session_id, cleanup_state_db)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
|
||||
}
|
||||
});
|
||||
|
||||
let timer = session_telemetry.start_timer("codex.shell_snapshot.duration_ms", &[]);
|
||||
let snapshot = ShellSnapshot::try_new(
|
||||
&codex_home,
|
||||
shell_snapshot_store.as_ref(),
|
||||
session_id,
|
||||
&session_cwd,
|
||||
&snapshot_shell,
|
||||
state_db,
|
||||
)
|
||||
.await
|
||||
.map(Arc::new);
|
||||
@@ -117,38 +265,13 @@ impl ShellSnapshot {
|
||||
}
|
||||
|
||||
async fn try_new(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
shell_snapshot_store: &dyn ShellSnapshotStore,
|
||||
session_id: ThreadId,
|
||||
session_cwd: &AbsolutePathBuf,
|
||||
shell: &Shell,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> std::result::Result<Self, &'static str> {
|
||||
// File to store the snapshot
|
||||
let extension = match shell.shell_type {
|
||||
ShellType::PowerShell => "ps1",
|
||||
_ => "sh",
|
||||
};
|
||||
let nonce = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.map(|duration| duration.as_nanos())
|
||||
.unwrap_or(0);
|
||||
let path = codex_home
|
||||
.join(SNAPSHOT_DIR)
|
||||
.join(format!("{session_id}.{nonce}.{extension}"));
|
||||
let temp_path = codex_home
|
||||
.join(SNAPSHOT_DIR)
|
||||
.join(format!("{session_id}.tmp-{nonce}"));
|
||||
|
||||
// Clean the (unlikely) leaked snapshot files.
|
||||
let codex_home = codex_home.clone();
|
||||
let cleanup_session_id = session_id;
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
cleanup_stale_snapshots(&codex_home, cleanup_session_id, state_db).await
|
||||
{
|
||||
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
|
||||
}
|
||||
});
|
||||
let ShellSnapshotPaths { path, temp_path } =
|
||||
shell_snapshot_store.snapshot_paths(session_id, shell.shell_type.clone());
|
||||
|
||||
// Make the new snapshot.
|
||||
if let Err(err) =
|
||||
@@ -494,72 +617,6 @@ $envVars | ForEach-Object {
|
||||
"##
|
||||
}
|
||||
|
||||
/// Removes shell snapshots that either lack a matching session rollout file or
|
||||
/// whose rollouts have not been updated within the retention window.
|
||||
/// The active session id is exempt from cleanup.
|
||||
pub async fn cleanup_stale_snapshots(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
active_session_id: ThreadId,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Result<()> {
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
|
||||
let mut entries = match fs::read_dir(&snapshot_dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let active_session_id = active_session_id.to_string();
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if !entry.file_type().await?.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let Some(session_id) = snapshot_session_id_from_file_name(&file_name) else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
if session_id == active_session_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(codex_home, session_id, state_db.as_deref()).await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
|
||||
Ok(modified) => modified,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to check rollout age for snapshot {}: {err:?}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if now
|
||||
.duration_since(modified)
|
||||
.ok()
|
||||
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
|
||||
{
|
||||
remove_snapshot_file(&path).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_snapshot_file(path: &Path) {
|
||||
if let Err(err) = fs::remove_file(path).await {
|
||||
tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path);
|
||||
|
||||
@@ -196,13 +196,13 @@ async fn try_new_creates_and_deletes_snapshot_file() -> Result<()> {
|
||||
shell_path: PathBuf::from("/bin/bash"),
|
||||
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
|
||||
};
|
||||
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
|
||||
|
||||
let snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
&shell_snapshot_store,
|
||||
ThreadId::new(),
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("snapshot should be created");
|
||||
@@ -227,25 +227,16 @@ async fn try_new_uses_distinct_generation_paths() -> Result<()> {
|
||||
shell_path: PathBuf::from("/bin/bash"),
|
||||
shell_snapshot: crate::shell::empty_shell_snapshot_receiver(),
|
||||
};
|
||||
let shell_snapshot_store = LocalShellSnapshotStore::from_codex_home(&dir.path().abs());
|
||||
|
||||
let initial_snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
session_id,
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("initial snapshot should be created");
|
||||
let refreshed_snapshot = ShellSnapshot::try_new(
|
||||
&dir.path().abs(),
|
||||
session_id,
|
||||
&dir.path().abs(),
|
||||
&shell,
|
||||
/*state_db*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("refreshed snapshot should be created");
|
||||
let initial_snapshot =
|
||||
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
|
||||
.await
|
||||
.expect("initial snapshot should be created");
|
||||
let refreshed_snapshot =
|
||||
ShellSnapshot::try_new(&shell_snapshot_store, session_id, &dir.path().abs(), &shell)
|
||||
.await
|
||||
.expect("refreshed snapshot should be created");
|
||||
let initial_path = initial_snapshot.path.clone();
|
||||
let refreshed_path = refreshed_snapshot.path.clone();
|
||||
|
||||
@@ -439,7 +430,9 @@ async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()>
|
||||
fs::write(&orphan_snapshot, "orphan").await?;
|
||||
fs::write(&invalid_snapshot, "invalid").await?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(live_snapshot.exists(), true);
|
||||
assert_eq!(orphan_snapshot.exists(), false);
|
||||
@@ -462,7 +455,9 @@ async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> {
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, ThreadId::new(), /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(ThreadId::new(), /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(stale_snapshot.exists(), false);
|
||||
Ok(())
|
||||
@@ -483,7 +478,9 @@ async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> {
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(&codex_home, active_session, /*state_db*/ None).await?;
|
||||
LocalShellSnapshotStore::from_codex_home(&codex_home)
|
||||
.cleanup_stale_snapshots(active_session, /*state_db*/ None)
|
||||
.await?;
|
||||
|
||||
assert_eq!(active_snapshot.exists(), true);
|
||||
Ok(())
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
|
||||
use crate::SkillsManager;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::client::ModelClient;
|
||||
use crate::config::NetworkProxyAuditMetadata;
|
||||
@@ -51,6 +52,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) rollout_thread_trace: ThreadTraceContext,
|
||||
pub(crate) user_shell: Arc<crate::shell::Shell>,
|
||||
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
|
||||
pub(crate) shell_snapshot_store: Arc<dyn crate::shell_snapshot::ShellSnapshotStore>,
|
||||
pub(crate) show_raw_agent_reasoning: bool,
|
||||
pub(crate) exec_policy: Arc<ExecPolicyManager>,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
@@ -74,6 +76,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) state_db: Option<StateDbHandle>,
|
||||
pub(crate) live_thread: Option<LiveThread>,
|
||||
pub(crate) thread_store: Arc<dyn ThreadStore>,
|
||||
pub(crate) artifact_store: Arc<dyn ArtifactStore>,
|
||||
pub(crate) attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
/// Session-scoped model client shared across turns.
|
||||
pub(crate) model_client: ModelClient,
|
||||
|
||||
@@ -36,34 +36,17 @@ use tracing::debug;
|
||||
use tracing::instrument;
|
||||
use tracing::warn;
|
||||
|
||||
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
#[cfg(test)]
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn image_generation_artifact_path(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
) -> AbsolutePathBuf {
|
||||
let sanitize = |value: &str| {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
};
|
||||
|
||||
codex_home
|
||||
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
|
||||
.join(sanitize(session_id))
|
||||
.join(format!("{}.png", sanitize(call_id)))
|
||||
LocalArtifactStore::from_codex_home(codex_home).generated_image_path(session_id, call_id)
|
||||
}
|
||||
|
||||
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
|
||||
@@ -108,7 +91,7 @@ pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option
|
||||
}
|
||||
|
||||
async fn save_image_generation_result(
|
||||
codex_home: &AbsolutePathBuf,
|
||||
artifact_store: &dyn ArtifactStore,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
result: &str,
|
||||
@@ -118,12 +101,9 @@ async fn save_image_generation_result(
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(format!("invalid image generation payload: {err}"))
|
||||
})?;
|
||||
let path = image_generation_artifact_path(codex_home, session_id, call_id);
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
Ok(path)
|
||||
artifact_store
|
||||
.write_generated_image(session_id, call_id, bytes)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn persist_image_generation_item(
|
||||
@@ -134,7 +114,7 @@ pub(crate) async fn persist_image_generation_item(
|
||||
image_item.saved_path = None;
|
||||
let session_id = sess.conversation_id.to_string();
|
||||
match save_image_generation_result(
|
||||
&turn_context.config.codex_home,
|
||||
sess.services.artifact_store.as_ref(),
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
&image_item.result,
|
||||
@@ -146,11 +126,10 @@ pub(crate) async fn persist_image_generation_item(
|
||||
Some(path)
|
||||
}
|
||||
Err(err) => {
|
||||
let output_path = image_generation_artifact_path(
|
||||
&turn_context.config.codex_home,
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
);
|
||||
let output_path = sess
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session_id, &image_item.id);
|
||||
let output_dir = output_path
|
||||
.parent()
|
||||
.unwrap_or_else(|| turn_context.config.codex_home.clone());
|
||||
@@ -173,8 +152,10 @@ async fn record_image_generation_instructions(
|
||||
return;
|
||||
}
|
||||
let session_id = sess.conversation_id.to_string();
|
||||
let image_output_path =
|
||||
image_generation_artifact_path(&turn_context.config.codex_home, &session_id, "<image_id>");
|
||||
let image_output_path = sess
|
||||
.services
|
||||
.artifact_store
|
||||
.generated_image_path(&session_id, "<image_id>");
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.unwrap_or_else(|| turn_context.config.codex_home.clone());
|
||||
|
||||
@@ -8,6 +8,7 @@ use super::image_generation_artifact_path;
|
||||
use super::last_assistant_message_from_item;
|
||||
use super::response_item_may_include_external_context;
|
||||
use super::save_image_generation_result;
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
use crate::session::tests::make_session_and_context;
|
||||
use crate::tools::ToolRouter;
|
||||
use crate::tools::parallel::ToolCallRuntime;
|
||||
@@ -424,10 +425,14 @@ async fn save_image_generation_result_saves_base64_to_png_in_codex_home() {
|
||||
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "ig_save_base64");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path =
|
||||
save_image_generation_result(&codex_home, "session-1", "ig_save_base64", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_save_base64",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -440,9 +445,14 @@ async fn save_image_generation_result_rejects_data_url_payload() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
|
||||
let err = save_image_generation_result(&codex_home, "session-1", "ig_456", result)
|
||||
.await
|
||||
.expect_err("data url payload should error");
|
||||
let err = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_456",
|
||||
result,
|
||||
)
|
||||
.await
|
||||
.expect_err("data url payload should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
}
|
||||
|
||||
@@ -459,9 +469,14 @@ async fn save_image_generation_result_overwrites_existing_file() {
|
||||
.expect("create image output dir");
|
||||
std::fs::write(&existing_path, b"existing").expect("seed existing image");
|
||||
|
||||
let saved_path = save_image_generation_result(&codex_home, "session-1", "ig_overwrite", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_overwrite",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, existing_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -475,9 +490,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
|
||||
let expected_path = image_generation_artifact_path(&codex_home, "session-1", "../ig/..");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path = save_image_generation_result(&codex_home, "session-1", "../ig/..", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"../ig/..",
|
||||
"Zm9v",
|
||||
)
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -488,9 +508,14 @@ async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_pa
|
||||
async fn save_image_generation_result_rejects_non_standard_base64() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
let err = save_image_generation_result(&codex_home, "session-1", "ig_urlsafe", "_-8")
|
||||
.await
|
||||
.expect_err("non-standard base64 should error");
|
||||
let err = save_image_generation_result(
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_urlsafe",
|
||||
"_-8",
|
||||
)
|
||||
.await
|
||||
.expect_err("non-standard base64 should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
}
|
||||
|
||||
@@ -499,7 +524,7 @@ async fn save_image_generation_result_rejects_non_base64_data_urls() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let codex_home = codex_home.path().abs();
|
||||
let err = save_image_generation_result(
|
||||
&codex_home,
|
||||
&LocalArtifactStore::from_codex_home(&codex_home),
|
||||
"session-1",
|
||||
"ig_svg",
|
||||
"data:image/svg+xml,<svg/>",
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::SkillsManager;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::LocalArtifactStore;
|
||||
use crate::attestation::AttestationProvider;
|
||||
use crate::codex_thread::CodexThread;
|
||||
use crate::config::Config;
|
||||
@@ -13,7 +15,9 @@ use crate::session::CodexSpawnArgs;
|
||||
use crate::session::CodexSpawnOk;
|
||||
use crate::session::INITIAL_SUBMIT_ID;
|
||||
use crate::session::resolve_multi_agent_version;
|
||||
use crate::shell_snapshot::LocalShellSnapshotStore;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::shell_snapshot::ShellSnapshotStore;
|
||||
use crate::tasks::InterruptedTurnHistoryMarker;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
@@ -173,6 +177,30 @@ pub struct ThreadManager {
|
||||
_test_codex_home_guard: Option<TempCodexHomeGuard>,
|
||||
}
|
||||
|
||||
/// Storage backends used by [`ThreadManager`] for executor-visible artifacts.
|
||||
///
|
||||
/// Cloud and embedded hosts can provide implementations here without asking
|
||||
/// core to treat `codex_home` as a remotely mounted filesystem.
|
||||
#[derive(Clone)]
|
||||
pub struct ThreadManagerStorageDeps {
|
||||
/// Stores generated artifacts and materializes executor-readable paths.
|
||||
pub artifact_store: Arc<dyn ArtifactStore>,
|
||||
/// Stores executor-local shell snapshots used by shell startup.
|
||||
pub shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
}
|
||||
|
||||
impl ThreadManagerStorageDeps {
|
||||
/// Builds the standard local backends rooted under the configured Codex home.
|
||||
pub fn local(config: &Config) -> Self {
|
||||
Self {
|
||||
artifact_store: Arc::new(LocalArtifactStore::from_codex_home(&config.codex_home)),
|
||||
shell_snapshot_store: Arc::new(LocalShellSnapshotStore::from_codex_home(
|
||||
&config.codex_home,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StartThreadOptions {
|
||||
pub config: Config,
|
||||
pub initial_history: InitialHistory,
|
||||
@@ -208,6 +236,8 @@ pub(crate) struct ThreadManagerState {
|
||||
mcp_manager: Arc<McpManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
session_source: SessionSource,
|
||||
installation_id: String,
|
||||
@@ -263,6 +293,104 @@ impl ThreadManager {
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
Self::new_with_storage_deps(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
ThreadManagerStorageDeps::local(config),
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Constructs a thread manager with an explicit generated artifact backend.
|
||||
pub fn new_with_artifact_store(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
artifact_store: Arc<dyn ArtifactStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
let mut storage_deps = ThreadManagerStorageDeps::local(config);
|
||||
storage_deps.artifact_store = artifact_store;
|
||||
Self::new_with_storage_deps(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
storage_deps,
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Constructs a thread manager with an explicit shell snapshot backend.
|
||||
pub fn new_with_shell_snapshot_store(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
shell_snapshot_store: Arc<dyn ShellSnapshotStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
let mut storage_deps = ThreadManagerStorageDeps::local(config);
|
||||
storage_deps.shell_snapshot_store = shell_snapshot_store;
|
||||
Self::new_with_storage_deps(
|
||||
config,
|
||||
auth_manager,
|
||||
session_source,
|
||||
environment_manager,
|
||||
extensions,
|
||||
analytics_events_client,
|
||||
thread_store,
|
||||
storage_deps,
|
||||
state_db,
|
||||
installation_id,
|
||||
attestation_provider,
|
||||
)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Constructs a thread manager with explicit executor-visible storage backends.
|
||||
pub fn new_with_storage_deps(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
extensions: Arc<ExtensionRegistry<Config>>,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
storage_deps: ThreadManagerStorageDeps,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> Self {
|
||||
let ThreadManagerStorageDeps {
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
} = storage_deps;
|
||||
let codex_home = config.codex_home.clone();
|
||||
let restriction_product = session_source.restriction_product();
|
||||
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
|
||||
@@ -287,6 +415,8 @@ impl ThreadManager {
|
||||
mcp_manager,
|
||||
extensions,
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider,
|
||||
auth_manager,
|
||||
session_source,
|
||||
@@ -361,8 +491,10 @@ impl ThreadManager {
|
||||
restriction_product,
|
||||
));
|
||||
let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager)));
|
||||
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> =
|
||||
Arc::new(LocalShellSnapshotStore::from_codex_home(&skills_codex_home));
|
||||
let skills_manager = Arc::new(SkillsManager::new_with_restriction_product(
|
||||
skills_codex_home,
|
||||
skills_codex_home.clone(),
|
||||
/*bundled_skills_enabled*/ true,
|
||||
restriction_product,
|
||||
));
|
||||
@@ -376,6 +508,8 @@ impl ThreadManager {
|
||||
},
|
||||
state_db.clone(),
|
||||
));
|
||||
let artifact_store: Arc<dyn ArtifactStore> =
|
||||
Arc::new(LocalArtifactStore::from_codex_home(&skills_codex_home));
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
@@ -388,6 +522,8 @@ impl ThreadManager {
|
||||
mcp_manager,
|
||||
extensions: empty_extension_registry(),
|
||||
thread_store,
|
||||
artifact_store,
|
||||
shell_snapshot_store,
|
||||
attestation_provider: None,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
@@ -1334,6 +1470,8 @@ impl ThreadManagerState {
|
||||
environment_selections,
|
||||
analytics_events_client: self.analytics_events_client.clone(),
|
||||
thread_store: Arc::clone(&self.thread_store),
|
||||
artifact_store: Arc::clone(&self.artifact_store),
|
||||
shell_snapshot_store: Arc::clone(&self.shell_snapshot_store),
|
||||
attestation_provider: self.attestation_provider.clone(),
|
||||
inherited_multi_agent_version: multi_agent_version,
|
||||
}))
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
use super::*;
|
||||
use crate::artifact_store::ArtifactStore;
|
||||
use crate::artifact_store::ArtifactWriteFuture;
|
||||
use crate::config::test_config;
|
||||
use crate::init_state_db;
|
||||
use crate::installation_id::INSTALLATION_ID_FILENAME;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::session::session::SessionSettingsUpdate;
|
||||
use crate::session::tests::make_session_and_context;
|
||||
use crate::shell_snapshot::ShellSnapshotPaths;
|
||||
use crate::shell_snapshot::ShellSnapshotStoreFuture;
|
||||
use crate::tasks::InterruptedTurnHistoryMarker;
|
||||
use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_extension_api::empty_extension_registry;
|
||||
@@ -33,6 +37,43 @@ use wiremock::MockServer;
|
||||
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
|
||||
struct FakeArtifactStore;
|
||||
|
||||
impl ArtifactStore for FakeArtifactStore {
|
||||
fn generated_image_path(&self, _session_id: &str, _call_id: &str) -> AbsolutePathBuf {
|
||||
unreachable!("generated image path should not be requested")
|
||||
}
|
||||
|
||||
fn write_generated_image(
|
||||
&self,
|
||||
_session_id: &str,
|
||||
_call_id: &str,
|
||||
_bytes: Vec<u8>,
|
||||
) -> ArtifactWriteFuture<'_> {
|
||||
unreachable!("generated image write should not be requested")
|
||||
}
|
||||
}
|
||||
|
||||
struct FakeShellSnapshotStore;
|
||||
|
||||
impl ShellSnapshotStore for FakeShellSnapshotStore {
|
||||
fn snapshot_paths(
|
||||
&self,
|
||||
_session_id: ThreadId,
|
||||
_shell_type: crate::shell::ShellType,
|
||||
) -> ShellSnapshotPaths {
|
||||
unreachable!("shell snapshot paths should not be requested")
|
||||
}
|
||||
|
||||
fn cleanup_stale_snapshots(
|
||||
&self,
|
||||
_active_session_id: ThreadId,
|
||||
_state_db: Option<StateDbHandle>,
|
||||
) -> ShellSnapshotStoreFuture<'_> {
|
||||
unreachable!("shell snapshot cleanup should not be requested")
|
||||
}
|
||||
}
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -64,6 +105,39 @@ fn developer_interrupted_marker() -> ResponseItem {
|
||||
.expect("developer interrupted marker should be enabled")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_thread_uses_injected_storage_deps() {
|
||||
let config = test_config().await;
|
||||
let artifact_store: Arc<dyn ArtifactStore> = Arc::new(FakeArtifactStore);
|
||||
let shell_snapshot_store: Arc<dyn ShellSnapshotStore> = Arc::new(FakeShellSnapshotStore);
|
||||
let manager = ThreadManager::new_with_storage_deps(
|
||||
&config,
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
empty_extension_registry(),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
ThreadManagerStorageDeps {
|
||||
artifact_store: Arc::clone(&artifact_store),
|
||||
shell_snapshot_store: Arc::clone(&shell_snapshot_store),
|
||||
},
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
/*attestation_provider*/ None,
|
||||
);
|
||||
let thread = manager.start_thread(config).await.expect("start thread");
|
||||
|
||||
assert!(Arc::ptr_eq(
|
||||
&thread.thread.codex.session.services.artifact_store,
|
||||
&artifact_store
|
||||
));
|
||||
assert!(Arc::ptr_eq(
|
||||
&thread.thread.codex.session.services.shell_snapshot_store,
|
||||
&shell_snapshot_store
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncates_before_requested_user_message() {
|
||||
let items = [
|
||||
|
||||
@@ -25,7 +25,6 @@ pub use extensions::prune_old_extension_resources;
|
||||
pub use prompts::build_consolidation_prompt;
|
||||
pub use prompts::build_stage_one_input_message;
|
||||
pub use start::start_memories_startup_task;
|
||||
pub use start::start_memories_startup_task_with_store;
|
||||
pub use storage::rebuild_raw_memories_file_from_memories;
|
||||
pub use storage::rollout_summary_file_stem;
|
||||
pub use storage::sync_rollout_summaries_from_memories;
|
||||
|
||||
@@ -18,7 +18,6 @@ use codex_protocol::protocol::TokenUsage;
|
||||
use codex_rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
use codex_rollout::should_persist_response_item_for_memories;
|
||||
use codex_secrets::redact_secrets;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
@@ -68,17 +67,12 @@ struct StageOneOutput {
|
||||
/// 2) build one stage-1 request context
|
||||
/// 3) run stage-1 extraction jobs in parallel
|
||||
/// 4) emit metrics and logs
|
||||
pub async fn run(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
let stage_one_context = build_request_context(context.as_ref(), config.as_ref()).await;
|
||||
let _phase_one_e2e_timer = stage_one_context.start_timer(MEMORY_PHASE_ONE_E2E_MS);
|
||||
|
||||
// 1. Claim startup job.
|
||||
let Some(claimed_candidates) =
|
||||
claim_startup_jobs(context.as_ref(), &config.memories, store.as_ref()).await
|
||||
let Some(claimed_candidates) = claim_startup_jobs(context.as_ref(), &config.memories).await
|
||||
else {
|
||||
return;
|
||||
};
|
||||
@@ -95,7 +89,6 @@ pub async fn run(
|
||||
let outcomes = run_jobs(
|
||||
context,
|
||||
config,
|
||||
store,
|
||||
claimed_candidates,
|
||||
stage_one_context.clone(),
|
||||
)
|
||||
@@ -115,24 +108,27 @@ pub async fn run(
|
||||
}
|
||||
|
||||
/// Prune old un-used "dead" raw memories.
|
||||
pub async fn prune(store: &dyn GeneratedMemoryStore, config: &Config) {
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
match store
|
||||
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
|
||||
.await
|
||||
{
|
||||
Ok(pruned) => {
|
||||
if pruned > 0 {
|
||||
info!(
|
||||
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
|
||||
pub async fn prune(context: &MemoryStartupContext, config: &Config) {
|
||||
if let Some(db) = context.state_db() {
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
match db
|
||||
.memories()
|
||||
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
|
||||
.await
|
||||
{
|
||||
Ok(pruned) => {
|
||||
if pruned > 0 {
|
||||
info!(
|
||||
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,14 +149,20 @@ pub fn output_schema() -> Value {
|
||||
async fn claim_startup_jobs(
|
||||
context: &MemoryStartupContext,
|
||||
memories_config: &MemoriesConfig,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
) -> Option<Vec<codex_state::Stage1JobClaim>> {
|
||||
let Some(state_db) = context.state_db() else {
|
||||
// This should not happen.
|
||||
warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
|
||||
return None;
|
||||
};
|
||||
|
||||
let allowed_sources = INTERACTIVE_SESSION_SOURCES
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match store
|
||||
match state_db
|
||||
.memories()
|
||||
.claim_stage1_jobs_for_startup(
|
||||
context.thread_id(),
|
||||
codex_state::Stage1StartupClaimParams {
|
||||
@@ -201,7 +203,6 @@ async fn build_request_context(
|
||||
async fn run_jobs(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
|
||||
stage_one_context: StageOneRequestContext,
|
||||
) -> Vec<JobResult> {
|
||||
@@ -209,17 +210,9 @@ async fn run_jobs(
|
||||
.map(|claim| {
|
||||
let context = Arc::clone(&context);
|
||||
let config = Arc::clone(&config);
|
||||
let store = Arc::clone(&store);
|
||||
let stage_one_context = stage_one_context.clone();
|
||||
async move {
|
||||
job::run(
|
||||
context.as_ref(),
|
||||
config.as_ref(),
|
||||
store.as_ref(),
|
||||
claim,
|
||||
&stage_one_context,
|
||||
)
|
||||
.await
|
||||
job::run(context.as_ref(), config.as_ref(), claim, &stage_one_context).await
|
||||
}
|
||||
})
|
||||
.buffer_unordered(crate::stage_one::CONCURRENCY_LIMIT)
|
||||
@@ -233,7 +226,6 @@ mod job {
|
||||
pub(crate) async fn run(
|
||||
context: &MemoryStartupContext,
|
||||
config: &Config,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
claim: codex_state::Stage1JobClaim,
|
||||
stage_one_context: &StageOneRequestContext,
|
||||
) -> JobResult {
|
||||
@@ -250,7 +242,7 @@ mod job {
|
||||
Ok(output) => output,
|
||||
Err(reason) => {
|
||||
result::failed(
|
||||
store,
|
||||
context,
|
||||
claimed_thread.id,
|
||||
&claim.ownership_token,
|
||||
&reason.to_string(),
|
||||
@@ -265,14 +257,15 @@ mod job {
|
||||
|
||||
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
|
||||
return JobResult {
|
||||
outcome: result::no_output(store, claimed_thread.id, &claim.ownership_token).await,
|
||||
outcome: result::no_output(context, claimed_thread.id, &claim.ownership_token)
|
||||
.await,
|
||||
token_usage,
|
||||
};
|
||||
}
|
||||
|
||||
JobResult {
|
||||
outcome: result::success(
|
||||
store,
|
||||
context,
|
||||
claimed_thread.id,
|
||||
&claim.ownership_token,
|
||||
claimed_thread.updated_at.timestamp(),
|
||||
@@ -332,28 +325,36 @@ mod job {
|
||||
use super::*;
|
||||
|
||||
pub(crate) async fn failed(
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
context: &MemoryStartupContext,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
reason: &str,
|
||||
) {
|
||||
tracing::warn!("Phase 1 job failed for thread {thread_id}: {reason}");
|
||||
let _ = store
|
||||
.mark_stage1_job_failed(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
reason,
|
||||
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
|
||||
)
|
||||
.await;
|
||||
if let Some(state_db) = context.state_db() {
|
||||
let _ = state_db
|
||||
.memories()
|
||||
.mark_stage1_job_failed(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
reason,
|
||||
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn no_output(
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
context: &MemoryStartupContext,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
) -> JobOutcome {
|
||||
if store
|
||||
let Some(state_db) = context.state_db() else {
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
.memories()
|
||||
.mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
@@ -365,7 +366,7 @@ mod job {
|
||||
}
|
||||
|
||||
pub(crate) async fn success(
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
context: &MemoryStartupContext,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
source_updated_at: i64,
|
||||
@@ -373,7 +374,12 @@ mod job {
|
||||
rollout_summary: &str,
|
||||
rollout_slug: Option<&str>,
|
||||
) -> JobOutcome {
|
||||
if store
|
||||
let Some(state_db) = context.state_db() else {
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
.memories()
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
|
||||
@@ -22,8 +22,8 @@ use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use codex_state::Stage1Output;
|
||||
use codex_state::StateRuntime;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
@@ -42,19 +42,19 @@ struct Counters {
|
||||
|
||||
/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear
|
||||
/// flow of the consolidation phase.
|
||||
pub async fn run(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
let phase_two_e2e_timer = context.start_timer(MEMORY_PHASE_TWO_E2E_MS);
|
||||
|
||||
let Some(db) = context.state_db() else {
|
||||
// This should not happen.
|
||||
return;
|
||||
};
|
||||
let root = memory_root(&config.codex_home);
|
||||
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
|
||||
// 1. Claim the global Phase 2 lock before touching the memory workspace.
|
||||
let claim = match job::claim(context.as_ref(), store.as_ref()).await {
|
||||
let claim = match job::claim(context.as_ref(), db.as_ref()).await {
|
||||
Ok(claim) => claim,
|
||||
Err(e) => {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", e)]);
|
||||
@@ -67,7 +67,7 @@ pub async fn run(
|
||||
tracing::error!("failed preparing memory workspace: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_prepare_workspace",
|
||||
)
|
||||
@@ -81,7 +81,7 @@ pub async fn run(
|
||||
tracing::error!("failed to get agent config");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_sandbox_policy",
|
||||
)
|
||||
@@ -90,7 +90,8 @@ pub async fn run(
|
||||
};
|
||||
|
||||
// 4. Load current DB-backed Phase 2 inputs.
|
||||
let raw_memories = match store
|
||||
let raw_memories = match db
|
||||
.memories()
|
||||
.get_phase2_input_selection(max_raw_memories, max_unused_days)
|
||||
.await
|
||||
{
|
||||
@@ -99,7 +100,7 @@ pub async fn run(
|
||||
tracing::error!("failed to list stage1 outputs from global: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_load_stage1_outputs",
|
||||
)
|
||||
@@ -115,7 +116,7 @@ pub async fn run(
|
||||
tracing::error!("failed syncing phase2 workspace inputs: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_sync_workspace_inputs",
|
||||
)
|
||||
@@ -130,7 +131,7 @@ pub async fn run(
|
||||
tracing::error!("failed checking memory workspace changes: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_status",
|
||||
)
|
||||
@@ -143,7 +144,7 @@ pub async fn run(
|
||||
// We check only after sync of the file system.
|
||||
job::succeed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
new_watermark,
|
||||
&raw_memories,
|
||||
@@ -158,7 +159,7 @@ pub async fn run(
|
||||
tracing::error!("failed writing memory workspace diff file: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
db.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_diff_file",
|
||||
)
|
||||
@@ -175,13 +176,7 @@ pub async fn run(
|
||||
Ok(agent) => agent,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to spawn global memory consolidation agent: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_spawn_agent",
|
||||
)
|
||||
.await;
|
||||
job::failed(context.as_ref(), db.as_ref(), &claim, "failed_spawn_agent").await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -189,7 +184,6 @@ pub async fn run(
|
||||
// 9. Hand off completion handling, heartbeats, and baseline reset.
|
||||
agent::handle(
|
||||
Arc::clone(&context),
|
||||
Arc::clone(&store),
|
||||
claim,
|
||||
new_watermark,
|
||||
raw_memories.clone(),
|
||||
@@ -221,9 +215,10 @@ mod job {
|
||||
|
||||
pub(super) async fn claim(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
db: &StateRuntime,
|
||||
) -> Result<Claim, &'static str> {
|
||||
let claim = store
|
||||
let claim = db
|
||||
.memories()
|
||||
.try_claim_global_phase2_job(context.thread_id(), crate::stage_two::JOB_LEASE_SECONDS)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -256,13 +251,13 @@ mod job {
|
||||
|
||||
pub(super) async fn failed(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
db: &StateRuntime,
|
||||
claim: &Claim,
|
||||
reason: &'static str,
|
||||
) {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
|
||||
if matches!(
|
||||
store
|
||||
db.memories()
|
||||
.mark_global_phase2_job_failed(
|
||||
&claim.token,
|
||||
reason,
|
||||
@@ -271,7 +266,8 @@ mod job {
|
||||
.await,
|
||||
Ok(false)
|
||||
) {
|
||||
let _ = store
|
||||
let _ = db
|
||||
.memories()
|
||||
.mark_global_phase2_job_failed_if_unowned(
|
||||
&claim.token,
|
||||
reason,
|
||||
@@ -283,14 +279,14 @@ mod job {
|
||||
|
||||
pub(super) async fn succeed(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
db: &StateRuntime,
|
||||
claim: &Claim,
|
||||
completion_watermark: i64,
|
||||
selected_outputs: &[codex_state::Stage1Output],
|
||||
reason: &'static str,
|
||||
) -> bool {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
|
||||
store
|
||||
db.memories()
|
||||
.mark_global_phase2_job_succeeded(&claim.token, completion_watermark, selected_outputs)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
@@ -362,7 +358,6 @@ mod agent {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn handle(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
claim: Claim,
|
||||
new_watermark: i64,
|
||||
selected_outputs: Vec<codex_state::Stage1Output>,
|
||||
@@ -370,13 +365,17 @@ mod agent {
|
||||
agent: SpawnedConsolidationAgent,
|
||||
phase_two_e2e_timer: Option<codex_otel::Timer>,
|
||||
) {
|
||||
let Some(db) = context.state_db() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _phase_two_e2e_timer = phase_two_e2e_timer;
|
||||
let SpawnedConsolidationAgent { thread_id, thread } = agent;
|
||||
|
||||
// Loop the agent until we have the final status.
|
||||
let final_status =
|
||||
loop_agent(Arc::clone(&store), claim.token.clone(), thread_id, &thread).await;
|
||||
loop_agent(db.clone(), claim.token.clone(), thread_id, &thread).await;
|
||||
|
||||
if matches!(final_status, AgentStatus::Completed(_)) {
|
||||
if let Some(token_usage) = thread
|
||||
@@ -387,7 +386,8 @@ mod agent {
|
||||
emit_token_usage_metrics(context.as_ref(), &token_usage);
|
||||
}
|
||||
// Do not reset the workspace baseline if we lost the lock.
|
||||
let still_owns_lock = match store
|
||||
let still_owns_lock = match db
|
||||
.memories()
|
||||
.heartbeat_global_phase2_job(
|
||||
&claim.token,
|
||||
crate::stage_two::JOB_LEASE_SECONDS,
|
||||
@@ -406,12 +406,7 @@ mod agent {
|
||||
false
|
||||
}
|
||||
Err(_) => {
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_confirm_ownership",
|
||||
)
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_confirm_ownership")
|
||||
.await;
|
||||
false
|
||||
}
|
||||
@@ -419,16 +414,10 @@ mod agent {
|
||||
if still_owns_lock {
|
||||
if let Err(err) = reset_memory_workspace_baseline(&memory_root).await {
|
||||
tracing::error!("failed resetting memory workspace baseline: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_commit",
|
||||
)
|
||||
.await;
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_workspace_commit").await;
|
||||
} else if !job::succeed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&db,
|
||||
&claim,
|
||||
new_watermark,
|
||||
&selected_outputs,
|
||||
@@ -442,7 +431,7 @@ mod agent {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
job::failed(context.as_ref(), store.as_ref(), &claim, "failed_agent").await;
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_agent").await;
|
||||
}
|
||||
|
||||
let cleanup_context = Arc::clone(&context);
|
||||
@@ -460,7 +449,7 @@ mod agent {
|
||||
}
|
||||
|
||||
async fn loop_agent(
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
db: Arc<StateRuntime>,
|
||||
token: String,
|
||||
thread_id: ThreadId,
|
||||
thread: &codex_core::CodexThread,
|
||||
@@ -495,7 +484,8 @@ mod agent {
|
||||
_ = status_poll_interval.tick() => {
|
||||
}
|
||||
_ = heartbeat_interval.tick() => {
|
||||
match store
|
||||
match db
|
||||
.memories()
|
||||
.heartbeat_global_phase2_job(
|
||||
&token,
|
||||
crate::stage_two::JOB_LEASE_SECONDS,
|
||||
|
||||
@@ -12,7 +12,6 @@ use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use std::sync::Arc;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -28,57 +27,27 @@ pub fn start_memories_startup_task(
|
||||
config: Arc<Config>,
|
||||
source: &SessionSource,
|
||||
) {
|
||||
if memories_startup_is_disabled(config.as_ref(), source) {
|
||||
if config.ephemeral
|
||||
|| !config.features.enabled(Feature::MemoryTool)
|
||||
|| source.is_non_root_agent()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let context = memory_startup_context(
|
||||
let context = Arc::new(MemoryStartupContext::new(
|
||||
thread_manager,
|
||||
Arc::clone(&auth_manager),
|
||||
thread_id,
|
||||
thread,
|
||||
config.as_ref(),
|
||||
source,
|
||||
);
|
||||
let Some(state_db) = context.state_db() else {
|
||||
source.clone(),
|
||||
));
|
||||
|
||||
if context.state_db().is_none() {
|
||||
warn!("state db unavailable for memories startup pipeline; skipping");
|
||||
return;
|
||||
};
|
||||
let store: Arc<dyn GeneratedMemoryStore> = Arc::new(state_db.memories().clone());
|
||||
spawn_memories_startup_task(context, auth_manager, config, store);
|
||||
}
|
||||
|
||||
/// Starts startup memory generation with an injected generated-memory store.
|
||||
pub fn start_memories_startup_task_with_store(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
config: Arc<Config>,
|
||||
source: &SessionSource,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
if memories_startup_is_disabled(config.as_ref(), source) {
|
||||
return;
|
||||
}
|
||||
|
||||
let context = memory_startup_context(
|
||||
thread_manager,
|
||||
Arc::clone(&auth_manager),
|
||||
thread_id,
|
||||
thread,
|
||||
config.as_ref(),
|
||||
source,
|
||||
);
|
||||
spawn_memories_startup_task(context, auth_manager, config, store);
|
||||
}
|
||||
|
||||
fn spawn_memories_startup_task(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let root = memory_root(&config.codex_home);
|
||||
if let Err(err) = tokio::fs::create_dir_all(&root).await {
|
||||
@@ -91,7 +60,7 @@ fn spawn_memories_startup_task(
|
||||
|
||||
// Clean memories to preserve DB size. This does not consume tokens so can be
|
||||
// done before the quota check.
|
||||
phase1::prune(store.as_ref(), &config).await;
|
||||
phase1::prune(context.as_ref(), &config).await;
|
||||
|
||||
if !guard::rate_limits_ok(&auth_manager, &config).await {
|
||||
context.counter(
|
||||
@@ -103,35 +72,8 @@ fn spawn_memories_startup_task(
|
||||
}
|
||||
|
||||
// Run phase 1.
|
||||
phase1::run(
|
||||
Arc::clone(&context),
|
||||
Arc::clone(&config),
|
||||
Arc::clone(&store),
|
||||
)
|
||||
.await;
|
||||
phase1::run(Arc::clone(&context), Arc::clone(&config)).await;
|
||||
// Run phase 2.
|
||||
phase2::run(context, config, store).await;
|
||||
phase2::run(context, config).await;
|
||||
});
|
||||
}
|
||||
|
||||
fn memory_startup_context(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
config: &Config,
|
||||
source: &SessionSource,
|
||||
) -> Arc<MemoryStartupContext> {
|
||||
Arc::new(MemoryStartupContext::new(
|
||||
thread_manager,
|
||||
auth_manager,
|
||||
thread_id,
|
||||
thread,
|
||||
config,
|
||||
source.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
fn memories_startup_is_disabled(config: &Config, source: &SessionSource) -> bool {
|
||||
config.ephemeral || !config.features.enabled(Feature::MemoryTool) || source.is_non_root_agent()
|
||||
}
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
use crate::MemoryStore;
|
||||
use crate::Phase2JobClaimOutcome;
|
||||
use crate::Stage1JobClaim;
|
||||
use crate::Stage1Output;
|
||||
use crate::Stage1StartupClaimParams;
|
||||
use codex_protocol::ThreadId;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Boxed generated-memory store future usable behind a trait object.
|
||||
pub type GeneratedMemoryStoreFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
|
||||
|
||||
macro_rules! generated_memory_store {
|
||||
($( $(#[$method_doc:meta])* fn $method:ident<$lifetime:lifetime>(
|
||||
$($arg:ident: $arg_type:ty),* $(,)?
|
||||
) -> $output:ty; )*) => {
|
||||
/// Persistence boundary for generated-memory startup job and output state.
|
||||
///
|
||||
/// Implementations own the stage-1 extraction rows plus the singleton
|
||||
/// phase-2 consolidation lease. Callers rely on [`MemoryStore`]
|
||||
/// ownership-token semantics: successful writes return `false` after
|
||||
/// the caller loses the relevant job.
|
||||
pub trait GeneratedMemoryStore: Send + Sync {
|
||||
$( $(#[$method_doc])* fn $method<$lifetime>(
|
||||
&$lifetime self,
|
||||
$($arg: $arg_type),*
|
||||
) -> GeneratedMemoryStoreFuture<$lifetime, $output>; )*
|
||||
}
|
||||
|
||||
impl GeneratedMemoryStore for MemoryStore {
|
||||
$( fn $method<$lifetime>(
|
||||
&$lifetime self,
|
||||
$($arg: $arg_type),*
|
||||
) -> GeneratedMemoryStoreFuture<$lifetime, $output> {
|
||||
Box::pin(MemoryStore::$method(self, $($arg),*))
|
||||
} )*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
generated_memory_store! {
|
||||
/// Prunes stale generated stage-1 outputs that are no longer retained.
|
||||
fn prune_stage1_outputs_for_retention<'a>(
|
||||
max_unused_days: i64,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<usize>;
|
||||
/// Claims eligible startup stage-1 extraction jobs.
|
||||
fn claim_stage1_jobs_for_startup<'a>(
|
||||
current_thread_id: ThreadId,
|
||||
params: Stage1StartupClaimParams<'a>,
|
||||
) -> anyhow::Result<Vec<Stage1JobClaim>>;
|
||||
/// Marks an owned stage-1 extraction job successful with generated output.
|
||||
fn mark_stage1_job_succeeded<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
source_updated_at: i64,
|
||||
raw_memory: &'a str,
|
||||
rollout_summary: &'a str,
|
||||
rollout_slug: Option<&'a str>,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks an owned stage-1 extraction job successful without output.
|
||||
fn mark_stage1_job_succeeded_no_output<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks an owned stage-1 extraction job failed with retry backoff.
|
||||
fn mark_stage1_job_failed<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Claims the singleton global phase-2 consolidation lease.
|
||||
fn try_claim_global_phase2_job<'a>(
|
||||
worker_id: ThreadId,
|
||||
lease_seconds: i64,
|
||||
) -> anyhow::Result<Phase2JobClaimOutcome>;
|
||||
/// Returns the current generated-memory inputs for phase-2 consolidation.
|
||||
fn get_phase2_input_selection<'a>(
|
||||
n: usize,
|
||||
max_unused_days: i64,
|
||||
) -> anyhow::Result<Vec<Stage1Output>>;
|
||||
/// Extends the owned singleton global phase-2 consolidation lease.
|
||||
fn heartbeat_global_phase2_job<'a>(
|
||||
ownership_token: &'a str,
|
||||
lease_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks the owned singleton global phase-2 consolidation job successful.
|
||||
fn mark_global_phase2_job_succeeded<'a>(
|
||||
ownership_token: &'a str,
|
||||
completed_watermark: i64,
|
||||
selected_outputs: &'a [Stage1Output],
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks the owned singleton global phase-2 consolidation job failed.
|
||||
fn mark_global_phase2_job_failed<'a>(
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Finalizes a failed singleton phase-2 job after ownership may be lost.
|
||||
fn mark_global_phase2_job_failed_if_unowned<'a>(
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
}
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
mod audit;
|
||||
mod extract;
|
||||
mod generated_memory_store;
|
||||
pub mod log_db;
|
||||
mod migrations;
|
||||
mod model;
|
||||
@@ -28,8 +27,6 @@ pub use audit::read_thread_state_audit_rows;
|
||||
/// Most consumers should prefer [`StateRuntime`].
|
||||
pub use extract::apply_rollout_item;
|
||||
pub use extract::rollout_item_affects_thread_metadata;
|
||||
pub use generated_memory_store::GeneratedMemoryStore;
|
||||
pub use generated_memory_store::GeneratedMemoryStoreFuture;
|
||||
pub use model::AgentJob;
|
||||
pub use model::AgentJobCreateParams;
|
||||
pub use model::AgentJobItem;
|
||||
|
||||
Reference in New Issue
Block a user