diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index b4b271eadd..3277e5650f 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -5997,7 +5997,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 581869a60e..d808219e40 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -9126,7 +9126,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, @@ -14480,7 +14481,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json b/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json index 954ac28ed1..a54b7ddac1 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json @@ -113,7 +113,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json b/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json index b7e3b8f8f1..bb5a0f4861 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json @@ -113,7 +113,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 10b9d50007..de1823e9de 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 6fe793cc19..96e3761978 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 444db89431..e0d4cc4f7a 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 9b2fb74003..4385565f96 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index b6ec97be10..9f81c5ea7b 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index 0d22e08810..fd75b56a28 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index 6a9324409e..3ad16408ee 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index 2c1c9950fe..3f8bd66861 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts index d6da7a466b..269a411be9 100644 --- a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts +++ b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts @@ -3,4 +3,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ThreadId } from "./ThreadId"; -export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | { "other": string }; +export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | "memory_consolidation" | { "other": string }; diff --git a/codex-rs/arg0/src/lib.rs b/codex-rs/arg0/src/lib.rs index 9c455ddbba..9563737839 100644 --- a/codex-rs/arg0/src/lib.rs +++ b/codex-rs/arg0/src/lib.rs @@ -12,6 +12,8 @@ const LINUX_SANDBOX_ARG0: &str = "codex-linux-sandbox"; const APPLY_PATCH_ARG0: &str = "apply_patch"; const MISSPELLED_APPLY_PATCH_ARG0: &str = "applypatch"; const LOCK_FILENAME: &str = ".lock"; +#[cfg(target_os = "windows")] +const WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES: usize = 16 * 1024 * 1024; /// Keeps the per-session PATH entry alive and locked for the process lifetime. pub struct Arg0PathEntryGuard { @@ -112,7 +114,7 @@ where // Regular invocation – create a Tokio runtime and execute the provided // async entry-point. - let runtime = tokio::runtime::Runtime::new()?; + let runtime = build_runtime()?; runtime.block_on(async move { let codex_linux_sandbox_exe: Option = if cfg!(target_os = "linux") { std::env::current_exe().ok() @@ -124,6 +126,18 @@ where }) } +fn build_runtime() -> anyhow::Result { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all(); + #[cfg(target_os = "windows")] + { + // Defensive hardening: Windows worker threads have lower effective + // stack headroom, so use a larger stack for runtime workers. + builder.thread_stack_size(WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES); + } + Ok(builder.build()?) +} + const ILLEGAL_ENV_VAR_PREFIX: &str = "CODEX_"; /// Load env vars from ~/.codex/.env. diff --git a/codex-rs/codex-api/src/requests/headers.rs b/codex-rs/codex-api/src/requests/headers.rs index 02f8c61c31..d1ab834109 100644 --- a/codex-rs/codex-api/src/requests/headers.rs +++ b/codex-rs/codex-api/src/requests/headers.rs @@ -17,6 +17,9 @@ pub(crate) fn subagent_header(source: &Option) -> Option match sub { codex_protocol::protocol::SubAgentSource::Review => Some("review".to_string()), codex_protocol::protocol::SubAgentSource::Compact => Some("compact".to_string()), + codex_protocol::protocol::SubAgentSource::MemoryConsolidation => { + Some("memory_consolidation".to_string()) + } codex_protocol::protocol::SubAgentSource::ThreadSpawn { .. } => { Some("collab_spawn".to_string()) } diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index b95a36a78e..c712d51fa4 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -434,6 +434,43 @@ } ] }, + "MemoriesToml": { + "additionalProperties": false, + "description": "Memories settings loaded from config.toml.", + "properties": { + "max_raw_memories_for_global": { + "description": "Maximum number of recent raw memories retained for global consolidation.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "max_rollout_age_days": { + "description": "Maximum age of the threads used for memories.", + "format": "int64", + "type": "integer" + }, + "max_rollouts_per_startup": { + "description": "Maximum number of rollout candidates processed per pass.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "min_rollout_idle_hours": { + "description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.", + "format": "int64", + "type": "integer" + }, + "phase_1_model": { + "description": "Model used for thread summarisation.", + "type": "string" + }, + "phase_2_model": { + "description": "Model used for memory consolidation.", + "type": "string" + } + }, + "type": "object" + }, "ModeKind": { "description": "Initial collaboration mode to use when the TUI starts.", "enum": [ @@ -1481,6 +1518,14 @@ "description": "Definition for MCP servers that Codex can reach out to for tool calls.", "type": "object" }, + "memories": { + "allOf": [ + { + "$ref": "#/definitions/MemoriesToml" + } + ], + "description": "Memories subsystem settings." + }, "model": { "description": "Optional override of model selection.", "type": "string" diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index bbf9b26c98..ec0d1e54f0 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -325,6 +325,9 @@ impl ModelClient { let subagent = match sub { crate::protocol::SubAgentSource::Review => "review".to_string(), crate::protocol::SubAgentSource::Compact => "compact".to_string(), + crate::protocol::SubAgentSource::MemoryConsolidation => { + "memory_consolidation".to_string() + } crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(), crate::protocol::SubAgentSource::Other(label) => label.clone(), }; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index d5de27a40d..5cfdcf0441 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -7,6 +7,8 @@ use crate::config::types::History; use crate::config::types::McpServerConfig; use crate::config::types::McpServerDisabledReason; use crate::config::types::McpServerTransportConfig; +use crate::config::types::MemoriesConfig; +use crate::config::types::MemoriesToml; use crate::config::types::Notice; use crate::config::types::NotificationMethod; use crate::config::types::Notifications; @@ -289,6 +291,9 @@ pub struct Config { /// Maximum number of agent threads that can be open concurrently. pub agent_max_threads: Option, + /// Memories subsystem settings. + pub memories: MemoriesConfig, + /// Directory containing all Codex state (defaults to `~/.codex` but can be /// overridden by the `CODEX_HOME` environment variable). pub codex_home: PathBuf, @@ -1006,6 +1011,9 @@ pub struct ConfigToml { /// Agent-related settings (thread limits, etc.). pub agents: Option, + /// Memories subsystem settings. + pub memories: Option, + /// User-level skill config entries keyed by SKILL.md path. pub skills: Option, @@ -1771,6 +1779,7 @@ impl Config { .collect(), tool_output_token_limit: cfg.tool_output_token_limit, agent_max_threads, + memories: cfg.memories.unwrap_or_default().into(), codex_home, log_dir, config_layer_stack, @@ -1985,6 +1994,8 @@ mod tests { use crate::config::types::FeedbackConfigToml; use crate::config::types::HistoryPersistence; use crate::config::types::McpServerTransportConfig; + use crate::config::types::MemoriesConfig; + use crate::config::types::MemoriesToml; use crate::config::types::NotificationMethod; use crate::config::types::Notifications; use crate::config_loader::RequirementSource; @@ -2068,6 +2079,47 @@ persistence = "none" }), history_no_persistence_cfg.history ); + + let memories = r#" +[memories] +max_raw_memories_for_global = 512 +max_rollout_age_days = 42 +max_rollouts_per_startup = 9 +min_rollout_idle_hours = 24 +phase_1_model = "gpt-5-mini" +phase_2_model = "gpt-5" +"#; + let memories_cfg = + toml::from_str::(memories).expect("TOML deserialization should succeed"); + assert_eq!( + Some(MemoriesToml { + max_raw_memories_for_global: Some(512), + max_rollout_age_days: Some(42), + max_rollouts_per_startup: Some(9), + min_rollout_idle_hours: Some(24), + phase_1_model: Some("gpt-5-mini".to_string()), + phase_2_model: Some("gpt-5".to_string()), + }), + memories_cfg.memories + ); + + let config = Config::load_from_base_config_with_overrides( + memories_cfg, + ConfigOverrides::default(), + tempdir().expect("tempdir").path().to_path_buf(), + ) + .expect("load config from memories settings"); + assert_eq!( + config.memories, + MemoriesConfig { + max_raw_memories_for_global: 512, + max_rollout_age_days: 42, + max_rollouts_per_startup: 9, + min_rollout_idle_hours: 24, + phase_1_model: Some("gpt-5-mini".to_string()), + phase_2_model: Some("gpt-5".to_string()), + } + ); } #[test] @@ -4047,6 +4099,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4156,6 +4209,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4263,6 +4317,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4356,6 +4411,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), diff --git a/codex-rs/core/src/config/types.rs b/codex-rs/core/src/config/types.rs index 5f54c1df28..80c16281b0 100644 --- a/codex-rs/core/src/config/types.rs +++ b/codex-rs/core/src/config/types.rs @@ -23,6 +23,10 @@ use serde::Serialize; use serde::de::Error as SerdeError; pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev"; +pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8; +pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30; +pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12; +pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)] #[serde(rename_all = "kebab-case")] @@ -353,6 +357,74 @@ pub struct FeedbackConfigToml { pub enabled: Option, } +/// Memories settings loaded from config.toml. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[schemars(deny_unknown_fields)] +pub struct MemoriesToml { + /// Maximum number of recent raw memories retained for global consolidation. + pub max_raw_memories_for_global: Option, + /// Maximum age of the threads used for memories. + pub max_rollout_age_days: Option, + /// Maximum number of rollout candidates processed per pass. + pub max_rollouts_per_startup: Option, + /// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended. + pub min_rollout_idle_hours: Option, + /// Model used for thread summarisation. + pub phase_1_model: Option, + /// Model used for memory consolidation. + pub phase_2_model: Option, +} + +/// Effective memories settings after defaults are applied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemoriesConfig { + pub max_raw_memories_for_global: usize, + pub max_rollout_age_days: i64, + pub max_rollouts_per_startup: usize, + pub min_rollout_idle_hours: i64, + pub phase_1_model: Option, + pub phase_2_model: Option, +} + +impl Default for MemoriesConfig { + fn default() -> Self { + Self { + max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS, + max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP, + min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS, + phase_1_model: None, + phase_2_model: None, + } + } +} + +impl From for MemoriesConfig { + fn from(toml: MemoriesToml) -> Self { + let defaults = Self::default(); + Self { + max_raw_memories_for_global: toml + .max_raw_memories_for_global + .unwrap_or(defaults.max_raw_memories_for_global) + .min(4096), + max_rollout_age_days: toml + .max_rollout_age_days + .unwrap_or(defaults.max_rollout_age_days) + .clamp(0, 90), + max_rollouts_per_startup: toml + .max_rollouts_per_startup + .unwrap_or(defaults.max_rollouts_per_startup) + .min(128), + min_rollout_idle_hours: toml + .min_rollout_idle_hours + .unwrap_or(defaults.min_rollout_idle_hours) + .clamp(1, 48), + phase_1_model: toml.phase_1_model, + phase_2_model: toml.phase_2_model, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum AppDisabledReason { diff --git a/codex-rs/core/src/memories/dispatch.rs b/codex-rs/core/src/memories/dispatch.rs deleted file mode 100644 index bfa6daa5a8..0000000000 --- a/codex-rs/core/src/memories/dispatch.rs +++ /dev/null @@ -1,688 +0,0 @@ -use crate::codex::Session; -use crate::config::Config; -use crate::config::Constrained; -use crate::memories::memory_root; -use crate::memories::metrics; -use crate::memories::phase_two; -use crate::memories::phase2::spawn_phase2_completion_task; -use crate::memories::prompts::build_consolidation_prompt; -use crate::memories::storage::rebuild_raw_memories_file_from_memories; -use crate::memories::storage::sync_rollout_summaries_from_memories; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::SandboxPolicy; -use codex_protocol::protocol::SessionSource; -use codex_protocol::protocol::SubAgentSource; -use codex_protocol::user_input::UserInput; -use codex_utils_absolute_path::AbsolutePathBuf; -use std::sync::Arc; -use tracing::debug; -use tracing::info; -use tracing::warn; - -//TODO(jif) clean. - -fn completion_watermark( - claimed_watermark: i64, - latest_memories: &[codex_state::Stage1Output], -) -> i64 { - latest_memories - .iter() - .map(|memory| memory.source_updated_at.timestamp()) - .max() - .unwrap_or(claimed_watermark) - .max(claimed_watermark) -} - -pub(in crate::memories) async fn run_global_memory_consolidation( - session: &Arc, - config: Arc, -) -> bool { - let otel_manager = &session.services.otel_manager; - let Some(state_db) = session.services.state_db.as_deref() else { - warn!("state db unavailable; skipping global memory consolidation"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_state_db_unavailable")], - ); - return false; - }; - - let claim = match state_db - .try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS) - .await - { - Ok(claim) => claim, - Err(err) => { - warn!("state db try_claim_global_phase2_job failed during memories startup: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_claim")], - ); - return false; - } - }; - let (ownership_token, claimed_watermark) = match claim { - codex_state::Phase2JobClaimOutcome::Claimed { - ownership_token, - input_watermark, - } => { - otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]); - (ownership_token, input_watermark) - } - codex_state::Phase2JobClaimOutcome::SkippedNotDirty => { - debug!("memory phase-2 global lock is up-to-date; skipping consolidation"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_not_dirty")], - ); - return false; - } - codex_state::Phase2JobClaimOutcome::SkippedRunning => { - debug!("memory phase-2 global consolidation already running; skipping"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_running")], - ); - return false; - } - }; - - let root = memory_root(&config.codex_home); - let consolidation_config = { - let mut consolidation_config = config.as_ref().clone(); - consolidation_config.cwd = root.clone(); - consolidation_config.permissions.approval_policy = - Constrained::allow_only(AskForApproval::Never); - let mut writable_roots = Vec::new(); - match AbsolutePathBuf::from_absolute_path(consolidation_config.codex_home.clone()) { - Ok(codex_home) => writable_roots.push(codex_home), - Err(err) => warn!( - "memory phase-2 consolidation could not add codex_home writable root {}: {err}", - consolidation_config.codex_home.display() - ), - } - let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite { - writable_roots, - read_only_access: Default::default(), - network_access: false, - exclude_tmpdir_env_var: false, - exclude_slash_tmp: false, - }; - if let Err(err) = consolidation_config - .permissions - .sandbox_policy - .set(consolidation_sandbox_policy) - { - warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_sandbox_policy")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "consolidation sandbox policy was rejected by constraints", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - consolidation_config - }; - - let latest_memories = match state_db - .list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) - .await - { - Ok(memories) => memories, - Err(err) => { - warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_load_stage1_outputs")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed to read stage-1 outputs before global consolidation", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - }; - if !latest_memories.is_empty() { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_INPUT, - latest_memories.len() as i64, - &[], - ); - } - let completion_watermark = completion_watermark(claimed_watermark, &latest_memories); - if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await { - warn!("failed syncing local memory artifacts for global consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_sync_artifacts")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed syncing local memory artifacts", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - - if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await { - warn!("failed rebuilding raw memories aggregate for global consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_rebuild_raw_memories")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed rebuilding raw memories aggregate", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - if latest_memories.is_empty() { - debug!("memory phase-2 has no stage-1 outputs; finalized local memory artifacts"); - let _ = state_db - .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark) - .await; - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "succeeded_no_input")], - ); - return false; - } - - let prompt = build_consolidation_prompt(&root); - let input = vec![UserInput::Text { - text: prompt, - text_elements: vec![], - }]; - let source = SessionSource::SubAgent(SubAgentSource::Other( - phase_two::MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(), - )); - - match session - .services - .agent_control - .spawn_agent(consolidation_config, input, Some(source)) - .await - { - Ok(consolidation_agent_id) => { - info!( - "memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}" - ); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "agent_spawned")], - ); - spawn_phase2_completion_task( - session.as_ref(), - ownership_token, - completion_watermark, - consolidation_agent_id, - ); - true - } - Err(err) => { - warn!("failed to spawn global memory consolidation agent: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_spawn_agent")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed to spawn consolidation agent", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - false - } - } -} - -#[cfg(test)] -mod tests { - use super::completion_watermark; - use super::run_global_memory_consolidation; - use crate::CodexAuth; - use crate::ThreadManager; - use crate::agent::control::AgentControl; - use crate::codex::Session; - use crate::codex::make_session_and_context; - use crate::config::Config; - use crate::config::test_config; - use crate::memories::memory_root; - use crate::memories::raw_memories_file; - use crate::memories::rollout_summaries_dir; - use chrono::Utc; - use codex_protocol::ThreadId; - use codex_protocol::protocol::AskForApproval; - use codex_protocol::protocol::Op; - use codex_protocol::protocol::SandboxPolicy; - use codex_protocol::protocol::SessionSource; - use codex_state::Phase2JobClaimOutcome; - use codex_state::Stage1Output; - use codex_state::ThreadMetadataBuilder; - use pretty_assertions::assert_eq; - use std::path::PathBuf; - use std::sync::Arc; - use tempfile::TempDir; - - struct DispatchHarness { - _codex_home: TempDir, - config: Arc, - session: Arc, - manager: ThreadManager, - state_db: Arc, - } - - impl DispatchHarness { - async fn new() -> Self { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let mut config = test_config(); - config.codex_home = codex_home.path().to_path_buf(); - config.cwd = config.codex_home.clone(); - let config = Arc::new(config); - - let state_db = codex_state::StateRuntime::init( - config.codex_home.clone(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("initialize state db"); - - let manager = ThreadManager::with_models_provider_and_home_for_tests( - CodexAuth::from_api_key("dummy"), - config.model_provider.clone(), - config.codex_home.clone(), - ); - let (mut session, _turn_context) = make_session_and_context().await; - session.services.state_db = Some(Arc::clone(&state_db)); - session.services.agent_control = manager.agent_control(); - - Self { - _codex_home: codex_home, - config, - session: Arc::new(session), - manager, - state_db, - } - } - - async fn seed_stage1_output(&self, source_updated_at: i64) { - let thread_id = ThreadId::new(); - let mut metadata_builder = ThreadMetadataBuilder::new( - thread_id, - self.config - .codex_home - .join(format!("rollout-{thread_id}.jsonl")), - Utc::now(), - SessionSource::Cli, - ); - metadata_builder.cwd = self.config.cwd.clone(); - metadata_builder.model_provider = Some(self.config.model_provider_id.clone()); - let metadata = metadata_builder.build(&self.config.model_provider_id); - - self.state_db - .upsert_thread(&metadata) - .await - .expect("upsert thread metadata"); - - let claim = self - .state_db - .try_claim_stage1_job( - thread_id, - self.session.conversation_id, - source_updated_at, - 3_600, - 64, - ) - .await - .expect("claim stage-1 job"); - let ownership_token = match claim { - codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, - other => panic!("unexpected stage-1 claim outcome: {other:?}"), - }; - assert!( - self.state_db - .mark_stage1_job_succeeded( - thread_id, - &ownership_token, - source_updated_at, - "raw memory", - "rollout summary", - ) - .await - .expect("mark stage-1 success"), - "stage-1 success should enqueue global consolidation" - ); - } - - async fn shutdown_threads(&self) { - self.manager - .remove_and_close_all_threads() - .await - .expect("shutdown spawned threads"); - } - - fn user_input_ops_count(&self) -> usize { - self.manager - .captured_ops() - .into_iter() - .filter(|(_, op)| matches!(op, Op::UserInput { .. })) - .count() - } - } - - #[test] - fn completion_watermark_never_regresses_below_claimed_input_watermark() { - let stage1_output = Stage1Output { - thread_id: ThreadId::new(), - source_updated_at: chrono::DateTime::::from_timestamp(123, 0) - .expect("valid source_updated_at timestamp"), - raw_memory: "raw memory".to_string(), - rollout_summary: "rollout summary".to_string(), - cwd: PathBuf::from("/tmp/workspace"), - generated_at: chrono::DateTime::::from_timestamp(124, 0) - .expect("valid generated_at timestamp"), - }; - - let completion = completion_watermark(1_000, &[stage1_output]); - assert_eq!(completion, 1_000); - } - - #[tokio::test] - async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() { - let harness = DispatchHarness::new().await; - harness.seed_stage1_output(100).await; - - let stale_claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 0) - .await - .expect("claim stale global lock"); - assert!( - matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. }), - "stale lock precondition should be claimed" - ); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - scheduled, - "dispatch should reclaim stale lock and spawn one agent" - ); - - let running_claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim while running"); - assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); - - let user_input_ops = harness.user_input_ops_count(); - assert_eq!(user_input_ops, 1); - let thread_ids = harness.manager.list_thread_ids().await; - assert_eq!(thread_ids.len(), 1); - let subagent = harness - .manager - .get_thread(thread_ids[0]) - .await - .expect("get consolidation thread"); - let config_snapshot = subagent.config_snapshot().await; - assert_eq!(config_snapshot.approval_policy, AskForApproval::Never); - assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home)); - match config_snapshot.sandbox_policy { - SandboxPolicy::WorkspaceWrite { writable_roots, .. } => { - assert!( - writable_roots - .iter() - .any(|root| root.as_path() == harness.config.codex_home.as_path()), - "consolidation subagent should have codex_home as writable root" - ); - } - other => panic!("unexpected sandbox policy: {other:?}"), - } - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_schedules_only_one_agent_while_lock_is_running() { - let harness = DispatchHarness::new().await; - harness.seed_stage1_output(200).await; - - let first_run = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - let second_run = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - - assert!(first_run, "first dispatch should schedule consolidation"); - assert!( - !second_run, - "second dispatch should skip while the global lock is running" - ); - - let user_input_ops = harness.user_input_ops_count(); - assert_eq!(user_input_ops, 1); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_with_dirty_job_and_no_stage1_outputs_skips_spawn_and_clears_dirty_flag() { - let harness = DispatchHarness::new().await; - harness - .state_db - .enqueue_global_consolidation(999) - .await - .expect("enqueue global consolidation"); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - !scheduled, - "dispatch should not spawn when no stage-1 outputs are available" - ); - assert_eq!(harness.user_input_ops_count(), 0); - - let claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global job after empty dispatch"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "empty dispatch should finalize global job as up-to-date" - ); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() { - let harness = DispatchHarness::new().await; - let root = memory_root(&harness.config.codex_home); - let summaries_dir = rollout_summaries_dir(&root); - tokio::fs::create_dir_all(&summaries_dir) - .await - .expect("create rollout summaries dir"); - - let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new())); - tokio::fs::write(&stale_summary_path, "stale summary\n") - .await - .expect("write stale rollout summary"); - let raw_memories_path = raw_memories_file(&root); - tokio::fs::write(&raw_memories_path, "stale raw memories\n") - .await - .expect("write stale raw memories"); - let memory_index_path = root.join("MEMORY.md"); - tokio::fs::write(&memory_index_path, "stale memory index\n") - .await - .expect("write stale memory index"); - let memory_summary_path = root.join("memory_summary.md"); - tokio::fs::write(&memory_summary_path, "stale memory summary\n") - .await - .expect("write stale memory summary"); - let stale_skill_file = root.join("skills/demo/SKILL.md"); - tokio::fs::create_dir_all( - stale_skill_file - .parent() - .expect("skills subdirectory parent should exist"), - ) - .await - .expect("create stale skills dir"); - tokio::fs::write(&stale_skill_file, "stale skill\n") - .await - .expect("write stale skill"); - - harness - .state_db - .enqueue_global_consolidation(999) - .await - .expect("enqueue global consolidation"); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - !scheduled, - "dispatch should skip subagent spawn when no stage-1 outputs are available" - ); - - assert!( - !tokio::fs::try_exists(&stale_summary_path) - .await - .expect("check stale summary existence"), - "empty consolidation should prune stale rollout summary files" - ); - let raw_memories = tokio::fs::read_to_string(&raw_memories_path) - .await - .expect("read rebuilt raw memories"); - assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n"); - assert!( - !tokio::fs::try_exists(&memory_index_path) - .await - .expect("check memory index existence"), - "empty consolidation should remove stale MEMORY.md" - ); - assert!( - !tokio::fs::try_exists(&memory_summary_path) - .await - .expect("check memory summary existence"), - "empty consolidation should remove stale memory_summary.md" - ); - assert!( - !tokio::fs::try_exists(&stale_skill_file) - .await - .expect("check stale skill existence"), - "empty consolidation should remove stale skills artifacts" - ); - assert!( - !tokio::fs::try_exists(root.join("skills")) - .await - .expect("check skills dir existence"), - "empty consolidation should remove stale skills directory" - ); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let mut config = test_config(); - config.codex_home = codex_home.path().to_path_buf(); - config.cwd = config.codex_home.clone(); - let config = Arc::new(config); - - let state_db = codex_state::StateRuntime::init( - config.codex_home.clone(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("initialize state db"); - - let (mut session, _turn_context) = make_session_and_context().await; - session.services.state_db = Some(Arc::clone(&state_db)); - session.services.agent_control = AgentControl::default(); - let session = Arc::new(session); - - let thread_id = ThreadId::new(); - let mut metadata_builder = ThreadMetadataBuilder::new( - thread_id, - config.codex_home.join(format!("rollout-{thread_id}.jsonl")), - Utc::now(), - SessionSource::Cli, - ); - metadata_builder.cwd = config.cwd.clone(); - metadata_builder.model_provider = Some(config.model_provider_id.clone()); - let metadata = metadata_builder.build(&config.model_provider_id); - state_db - .upsert_thread(&metadata) - .await - .expect("upsert thread metadata"); - - let claim = state_db - .try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64) - .await - .expect("claim stage-1 job"); - let ownership_token = match claim { - codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, - other => panic!("unexpected stage-1 claim outcome: {other:?}"), - }; - assert!( - state_db - .mark_stage1_job_succeeded( - thread_id, - &ownership_token, - 100, - "raw memory", - "rollout summary", - ) - .await - .expect("mark stage-1 success"), - "stage-1 success should enqueue global consolidation" - ); - - let scheduled = run_global_memory_consolidation(&session, Arc::clone(&config)).await; - assert!( - !scheduled, - "dispatch should return false when consolidation subagent cannot be spawned" - ); - - let retry_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global job after spawn failure"); - assert_eq!( - retry_claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "spawn failures should leave the job in retry backoff instead of running" - ); - } -} diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 179c90cbb5..22fe0fb9c9 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -4,7 +4,6 @@ //! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation. //! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent. -mod dispatch; mod phase1; mod phase2; pub(crate) mod prompts; @@ -26,10 +25,10 @@ mod artifacts { /// Phase 1 (startup extraction). mod phase_one { + /// Default model used for phase 1. + pub(super) const MODEL: &str = "gpt-5.3-codex-spark"; /// Prompt used for phase 1. pub(super) const PROMPT: &str = include_str!("../../templates/memories/stage_one_system.md"); - /// Maximum number of rollout candidates processed per startup pass. - pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 8; /// Concurrency cap for startup memory extraction and consolidation scheduling. pub(super) const CONCURRENCY_LIMIT: usize = 8; /// Fallback stage-1 rollout truncation limit (tokens) when model metadata @@ -44,10 +43,6 @@ mod phase_one { /// Keeping this below 100% leaves room for system instructions, prompt /// framing, and model output. pub(super) const CONTEXT_WINDOW_PERCENT: i64 = 70; - /// Maximum rollout age considered for phase-1 extraction. - pub(super) const MAX_ROLLOUT_AGE_DAYS: i64 = 30; - /// Minimum rollout idle time required before phase-1 extraction. - pub(super) const MIN_ROLLOUT_IDLE_HOURS: i64 = 12; /// Lease duration (seconds) for phase-1 job ownership. pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; /// Backoff delay (seconds) before retrying a failed stage-1 extraction job. @@ -58,10 +53,8 @@ mod phase_one { /// Phase 2 (aka `Consolidation`). mod phase_two { - /// Subagent source label used to identify consolidation tasks. - pub(super) const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation"; - /// Maximum number of recent raw memories retained for global consolidation. - pub(super) const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; + /// Default model used for phase 2. + pub(super) const MODEL: &str = "gpt-5.3-codex"; /// Lease duration (seconds) for phase-2 consolidation job ownership. pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; /// Backoff delay (seconds) before retrying a failed phase-2 consolidation diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index e11d290097..1d5fc9e506 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -2,6 +2,8 @@ use crate::Prompt; use crate::RolloutRecorder; use crate::codex::Session; use crate::codex::TurnContext; +use crate::config::Config; +use crate::config::types::MemoriesConfig; use crate::error::CodexErr; use crate::memories::metrics; use crate::memories::phase_one; @@ -67,10 +69,9 @@ struct StageOneOutput { /// Compact summary line used for routing and indexing. #[serde(rename = "rollout_summary")] pub(crate) rollout_summary: String, - /// Optional slug accepted from stage-1 output for forward compatibility. - /// This is currently ignored by downstream storage and naming, which remain thread-id based. + /// Optional slug used to derive rollout summary artifact filenames. #[serde(default, rename = "rollout_slug")] - pub(crate) _rollout_slug: Option, + pub(crate) rollout_slug: Option, } /// Runs memory phase 1 in strict step order: @@ -78,9 +79,9 @@ struct StageOneOutput { /// 2) build one stage-1 request context /// 3) run stage-1 extraction jobs in parallel /// 4) emit metrics and logs -pub(in crate::memories) async fn run(session: &Arc) { +pub(in crate::memories) async fn run(session: &Arc, config: &Config) { // 1. Claim startup job. - let Some(claimed_candidates) = claim_startup_jobs(session).await else { + let Some(claimed_candidates) = claim_startup_jobs(session, &config.memories).await else { return; }; if claimed_candidates.is_empty() { @@ -93,7 +94,7 @@ pub(in crate::memories) async fn run(session: &Arc) { } // 2. Build request. - let stage_one_context = build_request_context(session).await; + let stage_one_context = build_request_context(session, config).await; // 3. Run the parallel sampling. let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await; @@ -120,7 +121,7 @@ pub fn output_schema() -> Value { "rollout_slug": { "type": "string" }, "raw_memory": { "type": "string" } }, - "required": ["rollout_summary", "rollout_slug", "raw_memory"], + "required": ["rollout_summary", "raw_memory"], "additionalProperties": false }) } @@ -129,18 +130,22 @@ impl RequestContext { pub(in crate::memories) fn from_turn_context( turn_context: &TurnContext, turn_metadata_header: Option, + model_info: ModelInfo, ) -> Self { Self { - model_info: turn_context.model_info.clone(), + model_info, + turn_metadata_header, otel_manager: turn_context.otel_manager.clone(), reasoning_effort: turn_context.reasoning_effort, reasoning_summary: turn_context.reasoning_summary, - turn_metadata_header, } } } -async fn claim_startup_jobs(session: &Arc) -> Option> { +async fn claim_startup_jobs( + session: &Arc, + memories_config: &MemoriesConfig, +) -> Option> { let Some(state_db) = session.services.state_db.as_deref() else { // This should not happen. warn!("state db unavailable while claiming phase-1 startup jobs; skipping"); @@ -157,9 +162,9 @@ async fn claim_startup_jobs(session: &Arc) -> Option) -> Option) -> RequestContext { +async fn build_request_context(session: &Arc, config: &Config) -> RequestContext { + let model_name = config + .memories + .phase_1_model + .clone() + .unwrap_or(phase_one::MODEL.to_string()); + let model = session + .services + .models_manager + .get_model_info(&model_name, config) + .await; let turn_context = session.new_default_turn().await; RequestContext::from_turn_context( turn_context.as_ref(), turn_context.resolve_turn_metadata_header().await, + model, ) } @@ -251,6 +267,7 @@ mod job { thread.updated_at.timestamp(), &stage_one_output.raw_memory, &stage_one_output.rollout_summary, + stage_one_output.rollout_slug.as_deref(), ) .await, token_usage, @@ -331,6 +348,7 @@ mod job { let mut output: StageOneOutput = serde_json::from_str(&result)?; output.raw_memory = redact_secrets(output.raw_memory); output.rollout_summary = redact_secrets(output.rollout_summary); + output.rollout_slug = output.rollout_slug.map(redact_secrets); Ok((output, token_usage)) } @@ -384,6 +402,7 @@ mod job { source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> JobOutcome { let Some(state_db) = session.services.state_db.as_deref() else { return JobOutcome::Failed; @@ -396,6 +415,7 @@ mod job { source_updated_at, raw_memory, rollout_summary, + rollout_slug, ) .await .unwrap_or(false) diff --git a/codex-rs/core/src/memories/phase2.rs b/codex-rs/core/src/memories/phase2.rs index ef0f4642b6..28048498ce 100644 --- a/codex-rs/core/src/memories/phase2.rs +++ b/codex-rs/core/src/memories/phase2.rs @@ -1,136 +1,360 @@ use crate::agent::AgentStatus; use crate::agent::status::is_final as is_final_agent_status; use crate::codex::Session; +use crate::config::Config; +use crate::memories::memory_root; use crate::memories::metrics; use crate::memories::phase_two; +use crate::memories::prompts::build_consolidation_prompt; +use crate::memories::storage::rebuild_raw_memories_file_from_memories; +use crate::memories::storage::sync_rollout_summaries_from_memories; +use codex_config::Constrained; use codex_protocol::ThreadId; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use codex_protocol::user_input::UserInput; +use codex_state::StateRuntime; +use codex_utils_absolute_path::AbsolutePathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; -use tracing::debug; -use tracing::info; use tracing::warn; -pub(in crate::memories) fn spawn_phase2_completion_task( - session: &Session, - ownership_token: String, - completion_watermark: i64, - consolidation_agent_id: ThreadId, -) { - let state_db = session.services.state_db.clone(); - let agent_control = session.services.agent_control.clone(); - let otel_manager = session.services.otel_manager.clone(); - - tokio::spawn(async move { - let Some(state_db) = state_db else { - return; - }; - - let status_rx = match agent_control.subscribe_status(consolidation_agent_id).await { - Ok(status_rx) => status_rx, - Err(err) => { - warn!( - "failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}" - ); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_subscribe_status")], - ); - mark_phase2_failed_with_recovery( - state_db.as_ref(), - &ownership_token, - "failed to subscribe to consolidation agent status", - ) - .await; - return; - } - }; - - let final_status = run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token, - completion_watermark, - consolidation_agent_id, - status_rx, - ) - .await; - if matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_agent_unavailable")], - ); - return; - } - if is_phase2_success(&final_status) { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "succeeded")], - ); - } else { - otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "failed")]); - } - - tokio::spawn(async move { - if let Err(err) = agent_control.shutdown_agent(consolidation_agent_id).await { - warn!( - "failed to auto-close global memory consolidation agent {consolidation_agent_id}: {err}" - ); - } - }); - }); +#[derive(Debug, Clone, Default)] +struct Claim { + token: String, + watermark: i64, } -async fn run_phase2_completion_task( - state_db: Arc, - ownership_token: String, - completion_watermark: i64, - consolidation_agent_id: ThreadId, - mut status_rx: watch::Receiver, -) -> AgentStatus { - let final_status = { +#[derive(Debug, Clone, Default)] +struct Counters { + input: i64, +} + +/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear +/// flow of the consolidation phase. +pub(super) async fn run(session: &Arc, config: Arc) { + let Some(db) = session.services.state_db.as_deref() else { + // This should not happen. + return; + }; + let root = memory_root(&config.codex_home); + let max_raw_memories = config.memories.max_raw_memories_for_global; + + // 1. Claim the job. + let claim = match job::claim(session, db).await { + Ok(claim) => claim, + Err(e) => { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", e)], + ); + return; + } + }; + + // 2. Get the config for the agent + let Some(agent_config) = agent::get_config(config.clone()) else { + // If we can't get the config, we can't consolidate. + tracing::error!("failed to get agent config"); + job::failed(session, db, &claim, "failed_sandbox_policy").await; + return; + }; + + // 3. Query the memories + let raw_memories = match db.list_stage1_outputs_for_global(max_raw_memories).await { + Ok(memories) => memories, + Err(err) => { + tracing::error!("failed to list stage1 outputs from global: {}", err); + job::failed(session, db, &claim, "failed_load_stage1_outputs").await; + return; + } + }; + let new_watermark = get_watermark(claim.watermark, &raw_memories); + + // 4. Update the file system by syncing the raw memories with the one extracted from DB at + // step 3 + // [`rollout_summaries/`] + if let Err(err) = + sync_rollout_summaries_from_memories(&root, &raw_memories, max_raw_memories).await + { + tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); + job::failed(session, db, &claim, "failed_sync_artifacts").await; + return; + } + // [`raw_memories.md`] + if let Err(err) = + rebuild_raw_memories_file_from_memories(&root, &raw_memories, max_raw_memories).await + { + tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); + job::failed(session, db, &claim, "failed_rebuild_raw_memories").await; + return; + } + if raw_memories.is_empty() { + // We check only after sync of the file system. + job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await; + return; + } + + // 5. Spawn the agent + let prompt = agent::get_prompt(config); + let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation); + let thread_id = match session + .services + .agent_control + .spawn_agent(agent_config, prompt, Some(source)) + .await + { + Ok(thread_id) => thread_id, + Err(err) => { + tracing::error!("failed to spawn global memory consolidation agent: {err}"); + job::failed(session, db, &claim, "failed_spawn_agent").await; + return; + } + }; + + // 6. Spawn the agent handler. + agent::handle(session, claim, new_watermark, thread_id); + + // 7. Metrics and logs. + let counters = Counters { + input: raw_memories.len() as i64, + }; + emit_metrics(session, counters); +} + +mod job { + use super::*; + + pub(super) async fn claim( + session: &Arc, + db: &StateRuntime, + ) -> Result { + let otel_manager = &session.services.otel_manager; + let claim = db + .try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS) + .await + .map_err(|e| { + tracing::error!("failed to claim job: {}", e); + "failed_claim" + })?; + let (token, watermark) = match claim { + codex_state::Phase2JobClaimOutcome::Claimed { + ownership_token, + input_watermark, + } => { + otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]); + (ownership_token, input_watermark) + } + codex_state::Phase2JobClaimOutcome::SkippedNotDirty => return Err("skipped_not_dirty"), + codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"), + }; + + Ok(Claim { token, watermark }) + } + + pub(super) async fn failed( + session: &Arc, + db: &StateRuntime, + claim: &Claim, + reason: &'static str, + ) { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", reason)], + ); + if matches!( + db.mark_global_phase2_job_failed( + &claim.token, + reason, + phase_two::JOB_RETRY_DELAY_SECONDS, + ) + .await, + Ok(false) + ) { + let _ = db + .mark_global_phase2_job_failed_if_unowned( + &claim.token, + reason, + phase_two::JOB_RETRY_DELAY_SECONDS, + ) + .await; + } + } + + pub(super) async fn succeed( + session: &Arc, + db: &StateRuntime, + claim: &Claim, + completion_watermark: i64, + reason: &'static str, + ) { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", reason)], + ); + let _ = db + .mark_global_phase2_job_succeeded(&claim.token, completion_watermark) + .await; + } +} + +mod agent { + use super::*; + + pub(super) fn get_config(config: Arc) -> Option { + let root = memory_root(&config.codex_home); + let mut agent_config = config.as_ref().clone(); + + agent_config.cwd = root; + // Approval policy + agent_config.permissions.approval_policy = Constrained::allow_only(AskForApproval::Never); + + // Sandbox policy + let mut writable_roots = Vec::new(); + match AbsolutePathBuf::from_absolute_path(agent_config.codex_home.clone()) { + Ok(codex_home) => writable_roots.push(codex_home), + Err(err) => warn!( + "memory phase-2 consolidation could not add codex_home writable root {}: {err}", + agent_config.codex_home.display() + ), + } + // The consolidation agent only needs local codex_home write access and no network. + let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite { + writable_roots, + read_only_access: Default::default(), + network_access: false, + exclude_tmpdir_env_var: false, + exclude_slash_tmp: false, + }; + agent_config + .permissions + .sandbox_policy + .set(consolidation_sandbox_policy) + .ok()?; + + agent_config.model = Some( + config + .memories + .phase_2_model + .clone() + .unwrap_or(phase_two::MODEL.to_string()), + ); + + Some(agent_config) + } + + pub(super) fn get_prompt(config: Arc) -> Vec { + let root = memory_root(&config.codex_home); + let prompt = build_consolidation_prompt(&root); + vec![UserInput::Text { + text: prompt, + text_elements: vec![], + }] + } + + /// Handle the agent while it is running. + pub(super) fn handle( + session: &Arc, + claim: Claim, + new_watermark: i64, + thread_id: ThreadId, + ) { + let Some(db) = session.services.state_db.clone() else { + return; + }; + let session = session.clone(); + + tokio::spawn(async move { + let agent_control = session.services.agent_control.clone(); + + // TODO(jif) we might have a very small race here. + let rx = match agent_control.subscribe_status(thread_id).await { + Ok(rx) => rx, + Err(err) => { + tracing::error!("agent_control.subscribe_status failed: {err:?}"); + job::failed(&session, &db, &claim, "failed_subscribe_status").await; + return; + } + }; + + // Loop the agent until we have the final status. + let final_status = loop_agent( + db.clone(), + claim.token.clone(), + new_watermark, + thread_id, + rx, + ) + .await; + + if matches!(final_status, AgentStatus::Completed(_)) { + job::succeed(&session, &db, &claim, new_watermark, "succeeded").await; + } else { + job::failed(&session, &db, &claim, "failed_agent").await; + } + + // Fire and forget close of the agent. + if !matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) { + tokio::spawn(async move { + if let Err(err) = agent_control.shutdown_agent(thread_id).await { + warn!( + "failed to auto-close global memory consolidation agent {thread_id}: {err}" + ); + } + }); + } else { + tracing::warn!("The agent was already gone"); + } + }); + } + + async fn loop_agent( + db: Arc, + token: String, + _new_watermark: i64, + thread_id: ThreadId, + mut rx: watch::Receiver, + ) -> AgentStatus { let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(phase_two::JOB_HEARTBEAT_SECONDS)); heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - let status = status_rx.borrow().clone(); + let status = rx.borrow().clone(); if is_final_agent_status(&status) { break status; } tokio::select! { - changed = status_rx.changed() => { - if changed.is_err() { - warn!( - "lost status updates for global memory consolidation agent {consolidation_agent_id}" + update = rx.changed() => { + if update.is_err() { + tracing::warn!( + "lost status updates for global memory consolidation agent {thread_id}" ); break status; } } _ = heartbeat_interval.tick() => { - match state_db + match db .heartbeat_global_phase2_job( - &ownership_token, + &token, phase_two::JOB_LEASE_SECONDS, ) .await { Ok(true) => {} Ok(false) => { - warn!( - "memory phase-2 heartbeat lost global ownership; finalizing as failure" - ); break AgentStatus::Errored( "lost global phase-2 ownership during heartbeat".to_string(), ); } Err(err) => { - warn!( - "state db heartbeat_global_phase2_job failed during memories startup: {err}" - ); break AgentStatus::Errored(format!( "phase-2 heartbeat update failed: {err}" )); @@ -139,281 +363,30 @@ async fn run_phase2_completion_task( } } } - }; + } +} - let phase2_success = is_phase2_success(&final_status); - info!( - "memory phase-2 global consolidation complete: agent_id={consolidation_agent_id} success={phase2_success} final_status={final_status:?}" +pub(super) fn get_watermark( + claimed_watermark: i64, + latest_memories: &[codex_state::Stage1Output], +) -> i64 { + latest_memories + .iter() + .map(|memory| memory.source_updated_at.timestamp()) + .max() + .unwrap_or(claimed_watermark) + .max(claimed_watermark) // todo double check the claimed here. +} + +fn emit_metrics(session: &Arc, counters: Counters) { + let otel = session.services.otel_manager.clone(); + if counters.input > 0 { + otel.counter(metrics::MEMORY_PHASE_TWO_INPUT, counters.input, &[]); + } + + otel.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "agent_spawned")], ); - - if phase2_success { - match state_db - .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark) - .await - { - Ok(true) => {} - Ok(false) => { - debug!( - "memory phase-2 success finalization skipped after global ownership changed" - ); - } - Err(err) => { - warn!( - "state db mark_global_phase2_job_succeeded failed during memories startup: {err}" - ); - } - } - return final_status; - } - - let failure_reason = phase2_failure_reason(&final_status); - mark_phase2_failed_with_recovery(state_db.as_ref(), &ownership_token, &failure_reason).await; - warn!( - "memory phase-2 global consolidation agent finished with non-success status: agent_id={consolidation_agent_id} final_status={final_status:?}" - ); - final_status -} - -async fn mark_phase2_failed_with_recovery( - state_db: &codex_state::StateRuntime, - ownership_token: &str, - failure_reason: &str, -) { - match state_db - .mark_global_phase2_job_failed( - ownership_token, - failure_reason, - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await - { - Ok(true) => {} - Ok(false) => match state_db - .mark_global_phase2_job_failed_if_unowned( - ownership_token, - failure_reason, - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await - { - Ok(true) => { - debug!( - "memory phase-2 failure finalization applied fallback update for unowned running job" - ); - } - Ok(false) => { - debug!( - "memory phase-2 failure finalization skipped after global ownership changed" - ); - } - Err(err) => { - warn!( - "state db mark_global_phase2_job_failed_if_unowned failed during memories startup: {err}" - ); - } - }, - Err(err) => { - warn!("state db mark_global_phase2_job_failed failed during memories startup: {err}"); - } - } -} - -fn is_phase2_success(final_status: &AgentStatus) -> bool { - matches!(final_status, AgentStatus::Completed(_)) -} - -fn phase2_failure_reason(final_status: &AgentStatus) -> String { - format!("consolidation agent finished with status {final_status:?}") -} - -#[cfg(test)] -mod tests { - use super::is_phase2_success; - use super::phase2_failure_reason; - use super::run_phase2_completion_task; - use crate::agent::AgentStatus; - use codex_protocol::ThreadId; - use codex_state::Phase2JobClaimOutcome; - use pretty_assertions::assert_eq; - use std::sync::Arc; - - #[test] - fn phase2_success_only_for_completed_status() { - assert!(is_phase2_success(&AgentStatus::Completed(None))); - assert!(!is_phase2_success(&AgentStatus::Running)); - assert!(!is_phase2_success(&AgentStatus::Errored( - "oops".to_string() - ))); - } - - #[test] - fn phase2_failure_reason_includes_status() { - let status = AgentStatus::Errored("boom".to_string()); - let reason = phase2_failure_reason(&status); - assert!(reason.contains("consolidation agent finished with status")); - assert!(reason.contains("boom")); - } - - #[tokio::test] - async fn phase2_completion_marks_succeeded_for_completed_status() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - let owner = ThreadId::new(); - state_db - .enqueue_global_consolidation(123) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(owner, 3_600) - .await - .expect("claim global phase-2 job"); - let ownership_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. - } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Completed(None)); - run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token.clone(), - 123, - ThreadId::new(), - status_rx, - ) - .await; - - let up_to_date_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim up-to-date global job"); - assert_eq!(up_to_date_claim, Phase2JobClaimOutcome::SkippedNotDirty); - - state_db - .enqueue_global_consolidation(124) - .await - .expect("enqueue advanced consolidation watermark"); - let rerun_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim rerun global job"); - assert!( - matches!(rerun_claim, Phase2JobClaimOutcome::Claimed { .. }), - "advanced watermark should be claimable after success finalization" - ); - } - - #[tokio::test] - async fn phase2_completion_marks_failed_when_status_updates_are_lost() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - state_db - .enqueue_global_consolidation(456) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global phase-2 job"); - let ownership_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. - } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running); - drop(status_tx); - run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token, - 456, - ThreadId::new(), - status_rx, - ) - .await; - - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim after failure finalization"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "failure finalization should leave global job in retry-backoff, not running ownership" - ); - } - - #[tokio::test] - async fn phase2_completion_heartbeat_loss_does_not_steal_active_other_owner() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - state_db - .enqueue_global_consolidation(789) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global phase-2 job"); - let claimed_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. - } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running); - run_phase2_completion_task( - Arc::clone(&state_db), - "non-owner-token".to_string(), - 789, - ThreadId::new(), - status_rx, - ) - .await; - - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim after heartbeat ownership loss"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedRunning, - "heartbeat ownership-loss handling should not steal a live owner lease" - ); - assert_eq!( - state_db - .mark_global_phase2_job_succeeded(claimed_token.as_str(), 789) - .await - .expect("mark original owner success"), - true, - "the original owner should still be able to finalize" - ); - } } diff --git a/codex-rs/core/src/memories/start.rs b/codex-rs/core/src/memories/start.rs index 5b47b6c2bd..b93846857f 100644 --- a/codex-rs/core/src/memories/start.rs +++ b/codex-rs/core/src/memories/start.rs @@ -2,6 +2,7 @@ use crate::codex::Session; use crate::config::Config; use crate::features::Feature; use crate::memories::phase1; +use crate::memories::phase2; use codex_protocol::protocol::SessionSource; use std::sync::Arc; use tracing::warn; @@ -34,8 +35,8 @@ pub(crate) fn start_memories_startup_task( }; // Run phase 1. - phase1::run(&session).await; + phase1::run(&session, &config).await; // Run phase 2. - crate::memories::dispatch::run_global_memory_consolidation(&session, config).await; + phase2::run(&session, config).await; }); } diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index dd39815b36..a596ea6f0c 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -5,7 +5,6 @@ use std::path::Path; use tracing::warn; use crate::memories::ensure_layout; -use crate::memories::phase_two; use crate::memories::raw_memories_file; use crate::memories::rollout_summaries_dir; @@ -15,25 +14,27 @@ use crate::memories::rollout_summaries_dir; pub(super) async fn rebuild_raw_memories_file_from_memories( root: &Path, memories: &[Stage1Output], + max_raw_memories_for_global: usize, ) -> std::io::Result<()> { ensure_layout(root).await?; - rebuild_raw_memories_file(root, memories).await + rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await } /// Syncs canonical rollout summary files from DB-backed stage-1 output rows. pub(super) async fn sync_rollout_summaries_from_memories( root: &Path, memories: &[Stage1Output], + max_raw_memories_for_global: usize, ) -> std::io::Result<()> { ensure_layout(root).await?; let retained = memories .iter() - .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(max_raw_memories_for_global) .collect::>(); let keep = retained .iter() - .map(|memory| memory.thread_id.to_string()) + .map(|memory| rollout_summary_file_stem(memory)) .collect::>(); prune_rollout_summaries(root, &keep).await?; @@ -62,10 +63,14 @@ pub(super) async fn sync_rollout_summaries_from_memories( Ok(()) } -async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> { +async fn rebuild_raw_memories_file( + root: &Path, + memories: &[Stage1Output], + max_raw_memories_for_global: usize, +) -> std::io::Result<()> { let retained = memories .iter() - .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(max_raw_memories_for_global) .collect::>(); let mut body = String::from("# Raw Memories\n\n"); @@ -108,10 +113,10 @@ async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet) -> std::i let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { continue; }; - let Some(thread_id) = extract_thread_id_from_rollout_summary_filename(file_name) else { + let Some(stem) = file_name.strip_suffix(".md") else { continue; }; - if !keep.contains(thread_id) + if !keep.contains(stem) && let Err(err) = tokio::fs::remove_file(&path).await && err.kind() != std::io::ErrorKind::NotFound { @@ -129,7 +134,8 @@ async fn write_rollout_summary_for_thread( root: &Path, memory: &Stage1Output, ) -> std::io::Result<()> { - let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id)); + let file_stem = rollout_summary_file_stem(memory); + let path = rollout_summaries_dir(root).join(format!("{file_stem}.md")); let mut body = String::new(); writeln!(body, "thread_id: {}", memory.thread_id) @@ -150,7 +156,85 @@ async fn write_rollout_summary_for_thread( tokio::fs::write(path, body).await } -fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> { - let stem = file_name.strip_suffix(".md")?; - if stem.is_empty() { None } else { Some(stem) } +fn rollout_summary_file_stem(memory: &Stage1Output) -> String { + const ROLLOUT_SLUG_MAX_LEN: usize = 20; + + let thread_id = memory.thread_id.to_string(); + let Some(raw_slug) = memory.rollout_slug.as_deref() else { + return thread_id; + }; + + let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN); + for ch in raw_slug.chars() { + if slug.len() >= ROLLOUT_SLUG_MAX_LEN { + break; + } + + if ch.is_ascii_alphanumeric() { + slug.push(ch.to_ascii_lowercase()); + } else { + slug.push('_'); + } + } + + while slug.ends_with('_') { + slug.pop(); + } + + if slug.is_empty() { + thread_id + } else { + format!("{thread_id}-{slug}") + } +} + +#[cfg(test)] +mod tests { + use super::rollout_summary_file_stem; + use chrono::TimeZone; + use chrono::Utc; + use codex_protocol::ThreadId; + use codex_state::Stage1Output; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "summary".to_string(), + rollout_slug: rollout_slug.map(ToString::to_string), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"), + } + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() { + let memory = stage1_output_with_slug(None); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } + + #[test] + fn rollout_summary_file_stem_sanitizes_and_truncates_slug() { + let memory = + stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!( + rollout_summary_file_stem(&memory), + format!("{thread_id}-unsafe_slug_with_spa") + ); + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() { + let memory = stage1_output_with_slug(Some("")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } } diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 1baea576c2..900cc21840 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -1,5 +1,6 @@ use super::storage::rebuild_raw_memories_file_from_memories; use super::storage::sync_rollout_summaries_from_memories; +use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL; use crate::memories::ensure_layout; use crate::memories::memory_root; use crate::memories::raw_memories_file; @@ -21,7 +22,7 @@ fn memory_root_uses_shared_global_path() { } #[test] -fn stage_one_output_schema_requires_all_declared_properties() { +fn stage_one_output_schema_keeps_rollout_slug_optional() { let schema = crate::memories::phase1::output_schema(); let properties = schema .get("properties") @@ -32,16 +33,17 @@ fn stage_one_output_schema_requires_all_declared_properties() { .and_then(Value::as_array) .expect("required array"); - let mut property_keys = properties.keys().map(String::as_str).collect::>(); - property_keys.sort_unstable(); - let mut required_keys = required .iter() .map(|key| key.as_str().expect("required key string")) .collect::>(); required_keys.sort_unstable(); - assert_eq!(required_keys, property_keys); + assert!( + properties.contains_key("rollout_slug"), + "schema should declare rollout_slug" + ); + assert_eq!(required_keys, vec!["raw_memory", "rollout_summary"]); } #[tokio::test] @@ -66,16 +68,25 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"), raw_memory: "raw memory".to_string(), rollout_summary: "short summary".to_string(), + rollout_slug: None, cwd: PathBuf::from("/tmp/workspace"), generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"), }]; - sync_rollout_summaries_from_memories(&root, &memories) - .await - .expect("sync rollout summaries"); - rebuild_raw_memories_file_from_memories(&root, &memories) - .await - .expect("rebuild raw memories"); + sync_rollout_summaries_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("sync rollout summaries"); + rebuild_raw_memories_file_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("rebuild raw memories"); assert!(keep_path.is_file()); assert!(!drop_path.exists()); @@ -87,3 +98,578 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only assert!(raw_memories.contains(&keep_id)); assert!(raw_memories.contains("cwd: /tmp/workspace")); } + +#[tokio::test] +async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() { + let dir = tempdir().expect("tempdir"); + let root = dir.path().join("memory"); + ensure_layout(&root).await.expect("ensure layout"); + + let thread_id = ThreadId::new(); + let stale_unslugged_path = rollout_summaries_dir(&root).join(format!("{thread_id}.md")); + let stale_old_slug_path = + rollout_summaries_dir(&root).join(format!("{thread_id}--old-slug.md")); + tokio::fs::write(&stale_unslugged_path, "stale") + .await + .expect("write stale unslugged file"); + tokio::fs::write(&stale_old_slug_path, "stale") + .await + .expect("write stale old-slug file"); + + let memories = vec![Stage1Output { + thread_id, + source_updated_at: Utc.timestamp_opt(200, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "short summary".to_string(), + rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"), + }]; + + sync_rollout_summaries_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("sync rollout summaries"); + + let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root)) + .await + .expect("open rollout summaries dir"); + let mut files = Vec::new(); + while let Some(entry) = dir.next_entry().await.expect("read dir entry") { + files.push(entry.file_name().to_string_lossy().to_string()); + } + files.sort_unstable(); + + assert_eq!(files.len(), 1); + let file_name = &files[0]; + let stem = file_name + .strip_suffix(".md") + .expect("rollout summary file should end with .md"); + let slug = stem + .strip_prefix(&format!("{thread_id}-")) + .expect("rollout summary filename should include thread id and slug"); + assert!(slug.len() <= 20, "slug should be capped at 20 chars"); + assert!( + slug.chars() + .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'), + "slug should be file-safe lowercase ascii with underscores" + ); + + let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name)) + .await + .expect("read rollout summary"); + assert!(summary.contains(&format!("thread_id: {thread_id}"))); + assert!( + !tokio::fs::try_exists(&stale_unslugged_path) + .await + .expect("check stale unslugged path"), + "slugged sync should prune stale unslugged filename for same thread" + ); + assert!( + !tokio::fs::try_exists(&stale_old_slug_path) + .await + .expect("check stale old slug path"), + "slugged sync should prune stale slugged filename for same thread" + ); +} + +mod phase2 { + use crate::CodexAuth; + use crate::ThreadManager; + use crate::agent::AgentControl; + use crate::codex::Session; + use crate::codex::make_session_and_context; + use crate::config::Config; + use crate::config::test_config; + use crate::memories::memory_root; + use crate::memories::phase2; + use crate::memories::raw_memories_file; + use crate::memories::rollout_summaries_dir; + use chrono::Utc; + use codex_config::Constrained; + use codex_protocol::ThreadId; + use codex_protocol::protocol::AskForApproval; + use codex_protocol::protocol::Op; + use codex_protocol::protocol::SandboxPolicy; + use codex_protocol::protocol::SessionSource; + use codex_state::Phase2JobClaimOutcome; + use codex_state::Stage1Output; + use codex_state::ThreadMetadataBuilder; + use std::path::PathBuf; + use std::sync::Arc; + use tempfile::TempDir; + + fn stage1_output_with_source_updated_at(source_updated_at: i64) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + source_updated_at: chrono::DateTime::::from_timestamp(source_updated_at, 0) + .expect("valid source_updated_at timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "rollout summary".to_string(), + rollout_slug: None, + cwd: PathBuf::from("/tmp/workspace"), + generated_at: chrono::DateTime::::from_timestamp(source_updated_at + 1, 0) + .expect("valid generated_at timestamp"), + } + } + + struct DispatchHarness { + _codex_home: TempDir, + config: Arc, + session: Arc, + manager: ThreadManager, + state_db: Arc, + } + + impl DispatchHarness { + async fn new() -> Self { + let codex_home = tempfile::tempdir().expect("create temp codex home"); + let mut config = test_config(); + config.codex_home = codex_home.path().to_path_buf(); + config.cwd = config.codex_home.clone(); + let config = Arc::new(config); + + let state_db = codex_state::StateRuntime::init( + config.codex_home.clone(), + config.model_provider_id.clone(), + None, + ) + .await + .expect("initialize state db"); + + let manager = ThreadManager::with_models_provider_and_home_for_tests( + CodexAuth::from_api_key("dummy"), + config.model_provider.clone(), + config.codex_home.clone(), + ); + let (mut session, _turn_context) = make_session_and_context().await; + session.services.state_db = Some(Arc::clone(&state_db)); + session.services.agent_control = manager.agent_control(); + + Self { + _codex_home: codex_home, + config, + session: Arc::new(session), + manager, + state_db, + } + } + + async fn seed_stage1_output(&self, source_updated_at: i64) { + let thread_id = ThreadId::new(); + let mut metadata_builder = ThreadMetadataBuilder::new( + thread_id, + self.config + .codex_home + .join(format!("rollout-{thread_id}.jsonl")), + Utc::now(), + SessionSource::Cli, + ); + metadata_builder.cwd = self.config.cwd.clone(); + metadata_builder.model_provider = Some(self.config.model_provider_id.clone()); + let metadata = metadata_builder.build(&self.config.model_provider_id); + + self.state_db + .upsert_thread(&metadata) + .await + .expect("upsert thread metadata"); + + let claim = self + .state_db + .try_claim_stage1_job( + thread_id, + self.session.conversation_id, + source_updated_at, + 3_600, + 64, + ) + .await + .expect("claim stage-1 job"); + let ownership_token = match claim { + codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stage-1 claim outcome: {other:?}"), + }; + assert!( + self.state_db + .mark_stage1_job_succeeded( + thread_id, + &ownership_token, + source_updated_at, + "raw memory", + "rollout summary", + None, + ) + .await + .expect("mark stage-1 success"), + "stage-1 success should enqueue global consolidation" + ); + } + + async fn shutdown_threads(&self) { + self.manager + .remove_and_close_all_threads() + .await + .expect("shutdown spawned threads"); + } + + fn user_input_ops_count(&self) -> usize { + self.manager + .captured_ops() + .into_iter() + .filter(|(_, op)| matches!(op, Op::UserInput { .. })) + .count() + } + } + + #[test] + fn completion_watermark_never_regresses_below_claimed_input_watermark() { + let stage1_output = stage1_output_with_source_updated_at(123); + + let completion = phase2::get_watermark(1_000, &[stage1_output]); + pretty_assertions::assert_eq!(completion, 1_000); + } + + #[test] + fn completion_watermark_uses_claimed_watermark_when_there_are_no_memories() { + let completion = phase2::get_watermark(777, &[]); + pretty_assertions::assert_eq!(completion, 777); + } + + #[test] + fn completion_watermark_uses_latest_memory_timestamp_when_it_is_newer() { + let older = stage1_output_with_source_updated_at(123); + let newer = stage1_output_with_source_updated_at(456); + + let completion = phase2::get_watermark(200, &[older, newer]); + pretty_assertions::assert_eq!(completion, 456); + } + + #[tokio::test] + async fn dispatch_skips_when_global_job_is_not_dirty() { + let harness = DispatchHarness::new().await; + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_skips_when_global_job_is_already_running() { + let harness = DispatchHarness::new().await; + harness + .state_db + .enqueue_global_consolidation(123) + .await + .expect("enqueue global consolidation"); + let claimed = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim running global lock"); + assert!( + matches!(claimed, Phase2JobClaimOutcome::Claimed { .. }), + "precondition should claim the running lock" + ); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let running_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim while lock is still running"); + pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + + let stale_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 0) + .await + .expect("claim stale global lock"); + assert!( + matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. }), + "stale lock precondition should be claimed" + ); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let running_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim while running"); + pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); + + let user_input_ops = harness.user_input_ops_count(); + pretty_assertions::assert_eq!(user_input_ops, 1); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 1); + let subagent = harness + .manager + .get_thread(thread_ids[0]) + .await + .expect("get consolidation thread"); + let config_snapshot = subagent.config_snapshot().await; + pretty_assertions::assert_eq!(config_snapshot.approval_policy, AskForApproval::Never); + pretty_assertions::assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home)); + match config_snapshot.sandbox_policy { + SandboxPolicy::WorkspaceWrite { writable_roots, .. } => { + assert!( + writable_roots + .iter() + .any(|root| root.as_path() == harness.config.codex_home.as_path()), + "consolidation subagent should have codex_home as writable root" + ); + } + other => panic!("unexpected sandbox policy: {other:?}"), + } + + harness.shutdown_threads().await; + } + + #[tokio::test] + async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() { + let harness = DispatchHarness::new().await; + let root = memory_root(&harness.config.codex_home); + let summaries_dir = rollout_summaries_dir(&root); + tokio::fs::create_dir_all(&summaries_dir) + .await + .expect("create rollout summaries dir"); + + let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new())); + tokio::fs::write(&stale_summary_path, "stale summary\n") + .await + .expect("write stale rollout summary"); + let raw_memories_path = raw_memories_file(&root); + tokio::fs::write(&raw_memories_path, "stale raw memories\n") + .await + .expect("write stale raw memories"); + let memory_index_path = root.join("MEMORY.md"); + tokio::fs::write(&memory_index_path, "stale memory index\n") + .await + .expect("write stale memory index"); + let memory_summary_path = root.join("memory_summary.md"); + tokio::fs::write(&memory_summary_path, "stale memory summary\n") + .await + .expect("write stale memory summary"); + let stale_skill_file = root.join("skills/demo/SKILL.md"); + tokio::fs::create_dir_all( + stale_skill_file + .parent() + .expect("skills subdirectory parent should exist"), + ) + .await + .expect("create stale skills dir"); + tokio::fs::write(&stale_skill_file, "stale skill\n") + .await + .expect("write stale skill"); + + harness + .state_db + .enqueue_global_consolidation(999) + .await + .expect("enqueue global consolidation"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + assert!( + !tokio::fs::try_exists(&stale_summary_path) + .await + .expect("check stale summary existence"), + "empty consolidation should prune stale rollout summary files" + ); + let raw_memories = tokio::fs::read_to_string(&raw_memories_path) + .await + .expect("read rebuilt raw memories"); + pretty_assertions::assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n"); + assert!( + !tokio::fs::try_exists(&memory_index_path) + .await + .expect("check memory index existence"), + "empty consolidation should remove stale MEMORY.md" + ); + assert!( + !tokio::fs::try_exists(&memory_summary_path) + .await + .expect("check memory summary existence"), + "empty consolidation should remove stale memory_summary.md" + ); + assert!( + !tokio::fs::try_exists(&stale_skill_file) + .await + .expect("check stale skill existence"), + "empty consolidation should remove stale skills artifacts" + ); + assert!( + !tokio::fs::try_exists(root.join("skills")) + .await + .expect("check skills dir existence"), + "empty consolidation should remove stale skills directory" + ); + let next_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after empty consolidation success"); + pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + + harness.shutdown_threads().await; + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_sandbox_policy_cannot_be_overridden() { + let harness = DispatchHarness::new().await; + harness + .state_db + .enqueue_global_consolidation(99) + .await + .expect("enqueue global consolidation"); + let mut constrained_config = harness.config.as_ref().clone(); + constrained_config.permissions.sandbox_policy = + Constrained::allow_only(SandboxPolicy::DangerFullAccess); + + phase2::run(&harness.session, Arc::new(constrained_config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after sandbox policy failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_syncing_artifacts_fails() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + let root = memory_root(&harness.config.codex_home); + tokio::fs::write(&root, "not a directory") + .await + .expect("create file at memory root"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after sync failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_rebuilding_raw_memories_fails() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + let root = memory_root(&harness.config.codex_home); + tokio::fs::create_dir_all(raw_memories_file(&root)) + .await + .expect("create raw_memories.md as a directory"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after rebuild failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() { + let codex_home = tempfile::tempdir().expect("create temp codex home"); + let mut config = test_config(); + config.codex_home = codex_home.path().to_path_buf(); + config.cwd = config.codex_home.clone(); + let config = Arc::new(config); + + let state_db = codex_state::StateRuntime::init( + config.codex_home.clone(), + config.model_provider_id.clone(), + None, + ) + .await + .expect("initialize state db"); + + let (mut session, _turn_context) = make_session_and_context().await; + session.services.state_db = Some(Arc::clone(&state_db)); + session.services.agent_control = AgentControl::default(); + let session = Arc::new(session); + + let thread_id = ThreadId::new(); + let mut metadata_builder = ThreadMetadataBuilder::new( + thread_id, + config.codex_home.join(format!("rollout-{thread_id}.jsonl")), + Utc::now(), + SessionSource::Cli, + ); + metadata_builder.cwd = config.cwd.clone(); + metadata_builder.model_provider = Some(config.model_provider_id.clone()); + let metadata = metadata_builder.build(&config.model_provider_id); + state_db + .upsert_thread(&metadata) + .await + .expect("upsert thread metadata"); + + let claim = state_db + .try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64) + .await + .expect("claim stage-1 job"); + let ownership_token = match claim { + codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stage-1 claim outcome: {other:?}"), + }; + assert!( + state_db + .mark_stage1_job_succeeded( + thread_id, + &ownership_token, + 100, + "raw memory", + "rollout summary", + None, + ) + .await + .expect("mark stage-1 success"), + "stage-1 success should enqueue global consolidation" + ); + + phase2::run(&session, Arc::clone(&config)).await; + + let retry_claim = state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after spawn failure"); + pretty_assertions::assert_eq!( + retry_claim, + Phase2JobClaimOutcome::SkippedNotDirty, + "spawn failures should leave the job in retry backoff instead of running" + ); + } +} diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index af9223119c..9b2b00c78b 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -1214,7 +1214,7 @@ async fn find_thread_path_by_id_str_in_subdir( let found = results.matches.into_iter().next().map(|m| m.full_path()); if let Some(found_path) = found.as_ref() { - tracing::error!("state db missing rollout path for thread {id_str}"); + tracing::debug!("state db missing rollout path for thread {id_str}"); state_db::record_discrepancy("find_thread_path_by_id_str_in_subdir", "falling_back"); state_db::read_repair_rollout_path( state_db_ctx.as_deref(), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 3b65aa05c4..e44bf4c1d8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1825,6 +1825,7 @@ pub enum SubAgentSource { parent_thread_id: ThreadId, depth: i32, }, + MemoryConsolidation, Other(String), } @@ -1846,6 +1847,7 @@ impl fmt::Display for SubAgentSource { match self { SubAgentSource::Review => f.write_str("review"), SubAgentSource::Compact => f.write_str("compact"), + SubAgentSource::MemoryConsolidation => f.write_str("memory_consolidation"), SubAgentSource::ThreadSpawn { parent_thread_id, depth, diff --git a/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql new file mode 100644 index 0000000000..9b3a1e077d --- /dev/null +++ b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql @@ -0,0 +1,2 @@ +ALTER TABLE stage1_outputs +ADD COLUMN rollout_slug TEXT; diff --git a/codex-rs/state/src/model/memories.rs b/codex-rs/state/src/model/memories.rs index aff702d673..813c999395 100644 --- a/codex-rs/state/src/model/memories.rs +++ b/codex-rs/state/src/model/memories.rs @@ -15,6 +15,7 @@ pub struct Stage1Output { pub source_updated_at: DateTime, pub raw_memory: String, pub rollout_summary: String, + pub rollout_slug: Option, pub cwd: PathBuf, pub generated_at: DateTime, } @@ -25,6 +26,7 @@ pub(crate) struct Stage1OutputRow { source_updated_at: i64, raw_memory: String, rollout_summary: String, + rollout_slug: Option, cwd: String, generated_at: i64, } @@ -36,6 +38,7 @@ impl Stage1OutputRow { source_updated_at: row.try_get("source_updated_at")?, raw_memory: row.try_get("raw_memory")?, rollout_summary: row.try_get("rollout_summary")?, + rollout_slug: row.try_get("rollout_slug")?, cwd: row.try_get("cwd")?, generated_at: row.try_get("generated_at")?, }) @@ -51,6 +54,7 @@ impl TryFrom for Stage1Output { source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?, raw_memory: row.raw_memory, rollout_summary: row.rollout_summary, + rollout_slug: row.rollout_slug, cwd: PathBuf::from(row.cwd), generated_at: epoch_seconds_to_datetime(row.generated_at)?, }) diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 77930f8001..4cc9c63b17 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -1143,7 +1143,14 @@ WHERE id = 1 assert!( runtime - .mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "stage1 success should finalize for current token" @@ -1492,6 +1499,7 @@ WHERE id = 1 up_to_date.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark up-to-date thread succeeded"), @@ -1715,6 +1723,7 @@ WHERE kind = 'memory_stage1' claim.thread.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark first-batch stage1 success"), @@ -1766,7 +1775,14 @@ WHERE kind = 'memory_stage1' }; assert!( runtime - .mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "mark stage1 succeeded should write stage1_outputs" @@ -2058,6 +2074,7 @@ WHERE kind = 'memory_stage1' 100, "raw memory a", "summary a", + None, ) .await .expect("mark stage1 succeeded a"), @@ -2080,6 +2097,7 @@ WHERE kind = 'memory_stage1' 101, "raw memory b", "summary b", + Some("rollout-b"), ) .await .expect("mark stage1 succeeded b"), @@ -2093,9 +2111,11 @@ WHERE kind = 'memory_stage1' assert_eq!(outputs.len(), 2); assert_eq!(outputs[0].thread_id, thread_id_b); assert_eq!(outputs[0].rollout_summary, "summary b"); + assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b")); assert_eq!(outputs[0].cwd, codex_home.join("workspace-b")); assert_eq!(outputs[1].thread_id, thread_id_a); assert_eq!(outputs[1].rollout_summary, "summary a"); + assert_eq!(outputs[1].rollout_slug, None); assert_eq!(outputs[1].cwd, codex_home.join("workspace-a")); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -2208,7 +2228,14 @@ VALUES (?, ?, ?, ?, ?) }; assert!( runtime - .mark_stage1_job_succeeded(thread_a, token_a.as_str(), 100, "raw-a", "summary-a") + .mark_stage1_job_succeeded( + thread_a, + token_a.as_str(), + 100, + "raw-a", + "summary-a", + None, + ) .await .expect("mark stage1 succeeded a"), "stage1 success should persist output for thread a" @@ -2224,7 +2251,14 @@ VALUES (?, ?, ?, ?, ?) }; assert!( runtime - .mark_stage1_job_succeeded(thread_b, token_b.as_str(), 101, "raw-b", "summary-b") + .mark_stage1_job_succeeded( + thread_b, + token_b.as_str(), + 101, + "raw-b", + "summary-b", + None, + ) .await .expect("mark stage1 succeeded b"), "stage1 success should persist output for thread b" diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 4d1f6ffd59..c2a7f1fe91 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -191,7 +191,13 @@ LEFT JOIN jobs let rows = sqlx::query( r#" -SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at +SELECT + so.thread_id, + so.source_updated_at, + so.raw_memory, + so.rollout_summary, + so.rollout_slug, + so.generated_at , COALESCE(t.cwd, '') AS cwd FROM stage1_outputs AS so LEFT JOIN threads AS t @@ -407,6 +413,7 @@ WHERE kind = ? AND job_key = ? /// - sets `status='done'` and `last_success_watermark = input_watermark` /// - upserts `stage1_outputs` for the thread, replacing existing output only /// when `source_updated_at` is newer or equal + /// - persists optional `rollout_slug` for rollout summary artifact naming /// - enqueues/advances the global phase-2 job watermark using /// `source_updated_at` pub async fn mark_stage1_job_succeeded( @@ -416,6 +423,7 @@ WHERE kind = ? AND job_key = ? source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> anyhow::Result { let now = Utc::now().timestamp(); let thread_id = thread_id.to_string(); @@ -454,12 +462,14 @@ INSERT INTO stage1_outputs ( source_updated_at, raw_memory, rollout_summary, + rollout_slug, generated_at -) VALUES (?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(thread_id) DO UPDATE SET source_updated_at = excluded.source_updated_at, raw_memory = excluded.raw_memory, rollout_summary = excluded.rollout_summary, + rollout_slug = excluded.rollout_slug, generated_at = excluded.generated_at WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at "#, @@ -468,6 +478,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at .bind(source_updated_at) .bind(raw_memory) .bind(rollout_summary) + .bind(rollout_slug) .bind(now) .execute(&mut *tx) .await?;