Merge branch 'main' into jif/investigate-shell-snapshot-issue-for

jif-oai
2026-02-13 15:43:13 +00:00
committed by GitHub
32 changed files with 1356 additions and 1128 deletions

View File

@@ -5997,7 +5997,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -9126,7 +9126,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},
@@ -14480,7 +14481,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -113,7 +113,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -113,7 +113,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -666,7 +666,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -472,7 +472,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -472,7 +472,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -666,7 +666,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -472,7 +472,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -666,7 +666,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -472,7 +472,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -472,7 +472,8 @@
{
"enum": [
"review",
"compact"
"compact",
"memory_consolidation"
],
"type": "string"
},

View File

@@ -3,4 +3,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadId } from "./ThreadId";
export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | { "other": string };
export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | "memory_consolidation" | { "other": string };

View File

@@ -12,6 +12,8 @@ const LINUX_SANDBOX_ARG0: &str = "codex-linux-sandbox";
const APPLY_PATCH_ARG0: &str = "apply_patch";
const MISSPELLED_APPLY_PATCH_ARG0: &str = "applypatch";
const LOCK_FILENAME: &str = ".lock";
#[cfg(target_os = "windows")]
const WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES: usize = 16 * 1024 * 1024;
/// Keeps the per-session PATH entry alive and locked for the process lifetime.
pub struct Arg0PathEntryGuard {
@@ -112,7 +114,7 @@ where
// Regular invocations create a Tokio runtime and execute the provided
// async entry-point.
-let runtime = tokio::runtime::Runtime::new()?;
+let runtime = build_runtime()?;
runtime.block_on(async move {
let codex_linux_sandbox_exe: Option<PathBuf> = if cfg!(target_os = "linux") {
std::env::current_exe().ok()
@@ -124,6 +126,18 @@ where
})
}
fn build_runtime() -> anyhow::Result<tokio::runtime::Runtime> {
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.enable_all();
#[cfg(target_os = "windows")]
{
// Defensive hardening: Windows worker threads have lower effective
// stack headroom, so use a larger stack for runtime workers.
builder.thread_stack_size(WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES);
}
Ok(builder.build()?)
}
const ILLEGAL_ENV_VAR_PREFIX: &str = "CODEX_";
/// Load env vars from ~/.codex/.env.
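As a standalone sanity check of the builder pattern added above, a minimal sketch follows; the 16 MiB figure mirrors WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES, but applying it unconditionally is this sketch's simplification, not the commit's cfg-gated behavior.

// Minimal sketch: a multi-thread Tokio runtime with an explicit worker stack
// size. Unlike the cfg-gated code above, the size is set unconditionally.
fn main() -> std::io::Result<()> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.thread_stack_size(16 * 1024 * 1024);
    let runtime = builder.build()?;
    runtime.block_on(async {
        println!("runtime started with enlarged worker stacks");
    });
    Ok(())
}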

View File

@@ -17,6 +17,9 @@ pub(crate) fn subagent_header(source: &Option<SessionSource>) -> Option<String>
match sub {
codex_protocol::protocol::SubAgentSource::Review => Some("review".to_string()),
codex_protocol::protocol::SubAgentSource::Compact => Some("compact".to_string()),
codex_protocol::protocol::SubAgentSource::MemoryConsolidation => {
Some("memory_consolidation".to_string())
}
codex_protocol::protocol::SubAgentSource::ThreadSpawn { .. } => {
Some("collab_spawn".to_string())
}

View File

@@ -434,6 +434,43 @@
}
]
},
"MemoriesToml": {
"additionalProperties": false,
"description": "Memories settings loaded from config.toml.",
"properties": {
"max_raw_memories_for_global": {
"description": "Maximum number of recent raw memories retained for global consolidation.",
"format": "uint",
"minimum": 0.0,
"type": "integer"
},
"max_rollout_age_days": {
"description": "Maximum age of the threads used for memories.",
"format": "int64",
"type": "integer"
},
"max_rollouts_per_startup": {
"description": "Maximum number of rollout candidates processed per pass.",
"format": "uint",
"minimum": 0.0,
"type": "integer"
},
"min_rollout_idle_hours": {
"description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.",
"format": "int64",
"type": "integer"
},
"phase_1_model": {
"description": "Model used for thread summarisation.",
"type": "string"
},
"phase_2_model": {
"description": "Model used for memory consolidation.",
"type": "string"
}
},
"type": "object"
},
"ModeKind": {
"description": "Initial collaboration mode to use when the TUI starts.",
"enum": [
@@ -1481,6 +1518,14 @@
"description": "Definition for MCP servers that Codex can reach out to for tool calls.",
"type": "object"
},
"memories": {
"allOf": [
{
"$ref": "#/definitions/MemoriesToml"
}
],
"description": "Memories subsystem settings."
},
"model": {
"description": "Optional override of model selection.",
"type": "string"

View File

@@ -325,6 +325,9 @@ impl ModelClient {
let subagent = match sub {
crate::protocol::SubAgentSource::Review => "review".to_string(),
crate::protocol::SubAgentSource::Compact => "compact".to_string(),
crate::protocol::SubAgentSource::MemoryConsolidation => {
"memory_consolidation".to_string()
}
crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(),
crate::protocol::SubAgentSource::Other(label) => label.clone(),
};

View File

@@ -7,6 +7,8 @@ use crate::config::types::History;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerDisabledReason;
use crate::config::types::McpServerTransportConfig;
use crate::config::types::MemoriesConfig;
use crate::config::types::MemoriesToml;
use crate::config::types::Notice;
use crate::config::types::NotificationMethod;
use crate::config::types::Notifications;
@@ -289,6 +291,9 @@ pub struct Config {
/// Maximum number of agent threads that can be open concurrently.
pub agent_max_threads: Option<usize>,
/// Memories subsystem settings.
pub memories: MemoriesConfig,
/// Directory containing all Codex state (defaults to `~/.codex` but can be
/// overridden by the `CODEX_HOME` environment variable).
pub codex_home: PathBuf,
@@ -1006,6 +1011,9 @@ pub struct ConfigToml {
/// Agent-related settings (thread limits, etc.).
pub agents: Option<AgentsToml>,
/// Memories subsystem settings.
pub memories: Option<MemoriesToml>,
/// User-level skill config entries keyed by SKILL.md path.
pub skills: Option<SkillsConfig>,
@@ -1771,6 +1779,7 @@ impl Config {
.collect(),
tool_output_token_limit: cfg.tool_output_token_limit,
agent_max_threads,
memories: cfg.memories.unwrap_or_default().into(),
codex_home,
log_dir,
config_layer_stack,
@@ -1985,6 +1994,8 @@ mod tests {
use crate::config::types::FeedbackConfigToml;
use crate::config::types::HistoryPersistence;
use crate::config::types::McpServerTransportConfig;
use crate::config::types::MemoriesConfig;
use crate::config::types::MemoriesToml;
use crate::config::types::NotificationMethod;
use crate::config::types::Notifications;
use crate::config_loader::RequirementSource;
@@ -2068,6 +2079,47 @@ persistence = "none"
}),
history_no_persistence_cfg.history
);
let memories = r#"
[memories]
max_raw_memories_for_global = 512
max_rollout_age_days = 42
max_rollouts_per_startup = 9
min_rollout_idle_hours = 24
phase_1_model = "gpt-5-mini"
phase_2_model = "gpt-5"
"#;
let memories_cfg =
toml::from_str::<ConfigToml>(memories).expect("TOML deserialization should succeed");
assert_eq!(
Some(MemoriesToml {
max_raw_memories_for_global: Some(512),
max_rollout_age_days: Some(42),
max_rollouts_per_startup: Some(9),
min_rollout_idle_hours: Some(24),
phase_1_model: Some("gpt-5-mini".to_string()),
phase_2_model: Some("gpt-5".to_string()),
}),
memories_cfg.memories
);
let config = Config::load_from_base_config_with_overrides(
memories_cfg,
ConfigOverrides::default(),
tempdir().expect("tempdir").path().to_path_buf(),
)
.expect("load config from memories settings");
assert_eq!(
config.memories,
MemoriesConfig {
max_raw_memories_for_global: 512,
max_rollout_age_days: 42,
max_rollouts_per_startup: 9,
min_rollout_idle_hours: 24,
phase_1_model: Some("gpt-5-mini".to_string()),
phase_2_model: Some("gpt-5".to_string()),
}
);
}
#[test]
@@ -4047,6 +4099,7 @@ model_verbosity = "high"
project_doc_fallback_filenames: Vec::new(),
tool_output_token_limit: None,
agent_max_threads: DEFAULT_AGENT_MAX_THREADS,
memories: MemoriesConfig::default(),
codex_home: fixture.codex_home(),
log_dir: fixture.codex_home().join("log"),
config_layer_stack: Default::default(),
@@ -4156,6 +4209,7 @@ model_verbosity = "high"
project_doc_fallback_filenames: Vec::new(),
tool_output_token_limit: None,
agent_max_threads: DEFAULT_AGENT_MAX_THREADS,
memories: MemoriesConfig::default(),
codex_home: fixture.codex_home(),
log_dir: fixture.codex_home().join("log"),
config_layer_stack: Default::default(),
@@ -4263,6 +4317,7 @@ model_verbosity = "high"
project_doc_fallback_filenames: Vec::new(),
tool_output_token_limit: None,
agent_max_threads: DEFAULT_AGENT_MAX_THREADS,
memories: MemoriesConfig::default(),
codex_home: fixture.codex_home(),
log_dir: fixture.codex_home().join("log"),
config_layer_stack: Default::default(),
@@ -4356,6 +4411,7 @@ model_verbosity = "high"
project_doc_fallback_filenames: Vec::new(),
tool_output_token_limit: None,
agent_max_threads: DEFAULT_AGENT_MAX_THREADS,
memories: MemoriesConfig::default(),
codex_home: fixture.codex_home(),
log_dir: fixture.codex_home().join("log"),
config_layer_stack: Default::default(),

View File

@@ -23,6 +23,10 @@ use serde::Serialize;
use serde::de::Error as SerdeError;
pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "kebab-case")]
@@ -353,6 +357,74 @@ pub struct FeedbackConfigToml {
pub enabled: Option<bool>,
}
/// Memories settings loaded from config.toml.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct MemoriesToml {
/// Maximum number of recent raw memories retained for global consolidation.
pub max_raw_memories_for_global: Option<usize>,
/// Maximum age of the threads used for memories.
pub max_rollout_age_days: Option<i64>,
/// Maximum number of rollout candidates processed per pass.
pub max_rollouts_per_startup: Option<usize>,
/// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.
pub min_rollout_idle_hours: Option<i64>,
/// Model used for thread summarisation.
pub phase_1_model: Option<String>,
/// Model used for memory consolidation.
pub phase_2_model: Option<String>,
}
/// Effective memories settings after defaults are applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoriesConfig {
pub max_raw_memories_for_global: usize,
pub max_rollout_age_days: i64,
pub max_rollouts_per_startup: usize,
pub min_rollout_idle_hours: i64,
pub phase_1_model: Option<String>,
pub phase_2_model: Option<String>,
}
impl Default for MemoriesConfig {
fn default() -> Self {
Self {
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
phase_1_model: None,
phase_2_model: None,
}
}
}
impl From<MemoriesToml> for MemoriesConfig {
fn from(toml: MemoriesToml) -> Self {
let defaults = Self::default();
Self {
max_raw_memories_for_global: toml
.max_raw_memories_for_global
.unwrap_or(defaults.max_raw_memories_for_global)
.min(4096),
max_rollout_age_days: toml
.max_rollout_age_days
.unwrap_or(defaults.max_rollout_age_days)
.clamp(0, 90),
max_rollouts_per_startup: toml
.max_rollouts_per_startup
.unwrap_or(defaults.max_rollouts_per_startup)
.min(128),
min_rollout_idle_hours: toml
.min_rollout_idle_hours
.unwrap_or(defaults.min_rollout_idle_hours)
.clamp(1, 48),
phase_1_model: toml.phase_1_model,
phase_2_model: toml.phase_2_model,
}
}
}
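A hedged unit-test sketch of the conversion above, with bounds read straight from the From impl; it shows out-of-range TOML values being pulled back into the supported windows.

#[test]
fn memories_toml_out_of_range_values_are_clamped() {
    let toml = MemoriesToml {
        max_raw_memories_for_global: Some(10_000), // capped by .min(4096)
        max_rollout_age_days: Some(365),           // clamped to 0..=90
        max_rollouts_per_startup: Some(1_000),     // capped by .min(128)
        min_rollout_idle_hours: Some(0),           // clamped to 1..=48
        phase_1_model: None,
        phase_2_model: None,
    };
    let cfg = MemoriesConfig::from(toml);
    assert_eq!(cfg.max_raw_memories_for_global, 4096);
    assert_eq!(cfg.max_rollout_age_days, 90);
    assert_eq!(cfg.max_rollouts_per_startup, 128);
    assert_eq!(cfg.min_rollout_idle_hours, 1);
}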
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum AppDisabledReason {

View File

@@ -1,688 +0,0 @@
use crate::codex::Session;
use crate::config::Config;
use crate::config::Constrained;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::phase2::spawn_phase2_completion_task;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::sync::Arc;
use tracing::debug;
use tracing::info;
use tracing::warn;
//TODO(jif) clean.
fn completion_watermark(
claimed_watermark: i64,
latest_memories: &[codex_state::Stage1Output],
) -> i64 {
latest_memories
.iter()
.map(|memory| memory.source_updated_at.timestamp())
.max()
.unwrap_or(claimed_watermark)
.max(claimed_watermark)
}
pub(in crate::memories) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
) -> bool {
let otel_manager = &session.services.otel_manager;
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable; skipping global memory consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
return false;
};
let claim = match state_db
.try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS)
.await
{
Ok(claim) => claim,
Err(err) => {
warn!("state db try_claim_global_phase2_job failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_claim")],
);
return false;
}
};
let (ownership_token, claimed_watermark) = match claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]);
(ownership_token, input_watermark)
}
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => {
debug!("memory phase-2 global lock is up-to-date; skipping consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_not_dirty")],
);
return false;
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => {
debug!("memory phase-2 global consolidation already running; skipping");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_running")],
);
return false;
}
};
let root = memory_root(&config.codex_home);
let consolidation_config = {
let mut consolidation_config = config.as_ref().clone();
consolidation_config.cwd = root.clone();
consolidation_config.permissions.approval_policy =
Constrained::allow_only(AskForApproval::Never);
let mut writable_roots = Vec::new();
match AbsolutePathBuf::from_absolute_path(consolidation_config.codex_home.clone()) {
Ok(codex_home) => writable_roots.push(codex_home),
Err(err) => warn!(
"memory phase-2 consolidation could not add codex_home writable root {}: {err}",
consolidation_config.codex_home.display()
),
}
let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots,
read_only_access: Default::default(),
network_access: false,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
};
if let Err(err) = consolidation_config
.permissions
.sandbox_policy
.set(consolidation_sandbox_policy)
{
warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sandbox_policy")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"consolidation sandbox policy was rejected by constraints",
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
return false;
}
consolidation_config
};
let latest_memories = match state_db
.list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL)
.await
{
Ok(memories) => memories,
Err(err) => {
warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_load_stage1_outputs")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed to read stage-1 outputs before global consolidation",
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
return false;
}
};
if !latest_memories.is_empty() {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_INPUT,
latest_memories.len() as i64,
&[],
);
}
let completion_watermark = completion_watermark(claimed_watermark, &latest_memories);
if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await {
warn!("failed syncing local memory artifacts for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sync_artifacts")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed syncing local memory artifacts",
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
return false;
}
if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await {
warn!("failed rebuilding raw memories aggregate for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_rebuild_raw_memories")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed rebuilding raw memories aggregate",
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
return false;
}
if latest_memories.is_empty() {
debug!("memory phase-2 has no stage-1 outputs; finalized local memory artifacts");
let _ = state_db
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await;
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "succeeded_no_input")],
);
return false;
}
let prompt = build_consolidation_prompt(&root);
let input = vec![UserInput::Text {
text: prompt,
text_elements: vec![],
}];
let source = SessionSource::SubAgent(SubAgentSource::Other(
phase_two::MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(),
));
match session
.services
.agent_control
.spawn_agent(consolidation_config, input, Some(source))
.await
{
Ok(consolidation_agent_id) => {
info!(
"memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}"
);
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "agent_spawned")],
);
spawn_phase2_completion_task(
session.as_ref(),
ownership_token,
completion_watermark,
consolidation_agent_id,
);
true
}
Err(err) => {
warn!("failed to spawn global memory consolidation agent: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_spawn_agent")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
"failed to spawn consolidation agent",
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
false
}
}
}
#[cfg(test)]
mod tests {
use super::completion_watermark;
use super::run_global_memory_consolidation;
use crate::CodexAuth;
use crate::ThreadManager;
use crate::agent::control::AgentControl;
use crate::codex::Session;
use crate::codex::make_session_and_context;
use crate::config::Config;
use crate::config::test_config;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_state::Phase2JobClaimOutcome;
use codex_state::Stage1Output;
use codex_state::ThreadMetadataBuilder;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
struct DispatchHarness {
_codex_home: TempDir,
config: Arc<Config>,
session: Arc<Session>,
manager: ThreadManager,
state_db: Arc<codex_state::StateRuntime>,
}
impl DispatchHarness {
async fn new() -> Self {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let mut config = test_config();
config.codex_home = codex_home.path().to_path_buf();
config.cwd = config.codex_home.clone();
let config = Arc::new(config);
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let (mut session, _turn_context) = make_session_and_context().await;
session.services.state_db = Some(Arc::clone(&state_db));
session.services.agent_control = manager.agent_control();
Self {
_codex_home: codex_home,
config,
session: Arc::new(session),
manager,
state_db,
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
self.config
.codex_home
.join(format!("rollout-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
metadata_builder.cwd = self.config.cwd.clone();
metadata_builder.model_provider = Some(self.config.model_provider_id.clone());
let metadata = metadata_builder.build(&self.config.model_provider_id);
self.state_db
.upsert_thread(&metadata)
.await
.expect("upsert thread metadata");
let claim = self
.state_db
.try_claim_stage1_job(
thread_id,
self.session.conversation_id,
source_updated_at,
3_600,
64,
)
.await
.expect("claim stage-1 job");
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
self.state_db
.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
source_updated_at,
"raw memory",
"rollout summary",
)
.await
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
}
async fn shutdown_threads(&self) {
self.manager
.remove_and_close_all_threads()
.await
.expect("shutdown spawned threads");
}
fn user_input_ops_count(&self) -> usize {
self.manager
.captured_ops()
.into_iter()
.filter(|(_, op)| matches!(op, Op::UserInput { .. }))
.count()
}
}
#[test]
fn completion_watermark_never_regresses_below_claimed_input_watermark() {
let stage1_output = Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: chrono::DateTime::<Utc>::from_timestamp(123, 0)
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: chrono::DateTime::<Utc>::from_timestamp(124, 0)
.expect("valid generated_at timestamp"),
};
let completion = completion_watermark(1_000, &[stage1_output]);
assert_eq!(completion, 1_000);
}
#[tokio::test]
async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
let stale_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 0)
.await
.expect("claim stale global lock");
assert!(
matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. }),
"stale lock precondition should be claimed"
);
let scheduled =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(
scheduled,
"dispatch should reclaim stale lock and spawn one agent"
);
let running_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim while running");
assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);
let user_input_ops = harness.user_input_ops_count();
assert_eq!(user_input_ops, 1);
let thread_ids = harness.manager.list_thread_ids().await;
assert_eq!(thread_ids.len(), 1);
let subagent = harness
.manager
.get_thread(thread_ids[0])
.await
.expect("get consolidation thread");
let config_snapshot = subagent.config_snapshot().await;
assert_eq!(config_snapshot.approval_policy, AskForApproval::Never);
assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home));
match config_snapshot.sandbox_policy {
SandboxPolicy::WorkspaceWrite { writable_roots, .. } => {
assert!(
writable_roots
.iter()
.any(|root| root.as_path() == harness.config.codex_home.as_path()),
"consolidation subagent should have codex_home as writable root"
);
}
other => panic!("unexpected sandbox policy: {other:?}"),
}
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_schedules_only_one_agent_while_lock_is_running() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(200).await;
let first_run =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
let second_run =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(first_run, "first dispatch should schedule consolidation");
assert!(
!second_run,
"second dispatch should skip while the global lock is running"
);
let user_input_ops = harness.user_input_ops_count();
assert_eq!(user_input_ops, 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_dirty_job_and_no_stage1_outputs_skips_spawn_and_clears_dirty_flag() {
let harness = DispatchHarness::new().await;
harness
.state_db
.enqueue_global_consolidation(999)
.await
.expect("enqueue global consolidation");
let scheduled =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(
!scheduled,
"dispatch should not spawn when no stage-1 outputs are available"
);
assert_eq!(harness.user_input_ops_count(), 0);
let claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after empty dispatch");
assert_eq!(
claim,
Phase2JobClaimOutcome::SkippedNotDirty,
"empty dispatch should finalize global job as up-to-date"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
tokio::fs::create_dir_all(&summaries_dir)
.await
.expect("create rollout summaries dir");
let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new()));
tokio::fs::write(&stale_summary_path, "stale summary\n")
.await
.expect("write stale rollout summary");
let raw_memories_path = raw_memories_file(&root);
tokio::fs::write(&raw_memories_path, "stale raw memories\n")
.await
.expect("write stale raw memories");
let memory_index_path = root.join("MEMORY.md");
tokio::fs::write(&memory_index_path, "stale memory index\n")
.await
.expect("write stale memory index");
let memory_summary_path = root.join("memory_summary.md");
tokio::fs::write(&memory_summary_path, "stale memory summary\n")
.await
.expect("write stale memory summary");
let stale_skill_file = root.join("skills/demo/SKILL.md");
tokio::fs::create_dir_all(
stale_skill_file
.parent()
.expect("skills subdirectory parent should exist"),
)
.await
.expect("create stale skills dir");
tokio::fs::write(&stale_skill_file, "stale skill\n")
.await
.expect("write stale skill");
harness
.state_db
.enqueue_global_consolidation(999)
.await
.expect("enqueue global consolidation");
let scheduled =
run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await;
assert!(
!scheduled,
"dispatch should skip subagent spawn when no stage-1 outputs are available"
);
assert!(
!tokio::fs::try_exists(&stale_summary_path)
.await
.expect("check stale summary existence"),
"empty consolidation should prune stale rollout summary files"
);
let raw_memories = tokio::fs::read_to_string(&raw_memories_path)
.await
.expect("read rebuilt raw memories");
assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n");
assert!(
!tokio::fs::try_exists(&memory_index_path)
.await
.expect("check memory index existence"),
"empty consolidation should remove stale MEMORY.md"
);
assert!(
!tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary existence"),
"empty consolidation should remove stale memory_summary.md"
);
assert!(
!tokio::fs::try_exists(&stale_skill_file)
.await
.expect("check stale skill existence"),
"empty consolidation should remove stale skills artifacts"
);
assert!(
!tokio::fs::try_exists(root.join("skills"))
.await
.expect("check skills dir existence"),
"empty consolidation should remove stale skills directory"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let mut config = test_config();
config.codex_home = codex_home.path().to_path_buf();
config.cwd = config.codex_home.clone();
let config = Arc::new(config);
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
let (mut session, _turn_context) = make_session_and_context().await;
session.services.state_db = Some(Arc::clone(&state_db));
session.services.agent_control = AgentControl::default();
let session = Arc::new(session);
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
config.codex_home.join(format!("rollout-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
metadata_builder.cwd = config.cwd.clone();
metadata_builder.model_provider = Some(config.model_provider_id.clone());
let metadata = metadata_builder.build(&config.model_provider_id);
state_db
.upsert_thread(&metadata)
.await
.expect("upsert thread metadata");
let claim = state_db
.try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64)
.await
.expect("claim stage-1 job");
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
state_db
.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
100,
"raw memory",
"rollout summary",
)
.await
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
let scheduled = run_global_memory_consolidation(&session, Arc::clone(&config)).await;
assert!(
!scheduled,
"dispatch should return false when consolidation subagent cannot be spawned"
);
let retry_claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after spawn failure");
assert_eq!(
retry_claim,
Phase2JobClaimOutcome::SkippedNotDirty,
"spawn failures should leave the job in retry backoff instead of running"
);
}
}

View File

@@ -4,7 +4,6 @@
//! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation.
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.
-mod dispatch;
mod phase1;
mod phase2;
pub(crate) mod prompts;
@@ -26,10 +25,10 @@ mod artifacts {
/// Phase 1 (startup extraction).
mod phase_one {
+/// Default model used for phase 1.
+pub(super) const MODEL: &str = "gpt-5.3-codex-spark";
/// Prompt used for phase 1.
pub(super) const PROMPT: &str = include_str!("../../templates/memories/stage_one_system.md");
-/// Maximum number of rollout candidates processed per startup pass.
-pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 8;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
pub(super) const CONCURRENCY_LIMIT: usize = 8;
/// Fallback stage-1 rollout truncation limit (tokens) when model metadata
@@ -44,10 +43,6 @@ mod phase_one {
/// Keeping this below 100% leaves room for system instructions, prompt
/// framing, and model output.
pub(super) const CONTEXT_WINDOW_PERCENT: i64 = 70;
-/// Maximum rollout age considered for phase-1 extraction.
-pub(super) const MAX_ROLLOUT_AGE_DAYS: i64 = 30;
-/// Minimum rollout idle time required before phase-1 extraction.
-pub(super) const MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
/// Lease duration (seconds) for phase-1 job ownership.
pub(super) const JOB_LEASE_SECONDS: i64 = 3_600;
/// Backoff delay (seconds) before retrying a failed stage-1 extraction job.
@@ -58,10 +53,8 @@ mod phase_one {
/// Phase 2 (aka `Consolidation`).
mod phase_two {
-/// Subagent source label used to identify consolidation tasks.
-pub(super) const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation";
-/// Maximum number of recent raw memories retained for global consolidation.
-pub(super) const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
+/// Default model used for phase 2.
+pub(super) const MODEL: &str = "gpt-5.3-codex";
/// Lease duration (seconds) for phase-2 consolidation job ownership.
pub(super) const JOB_LEASE_SECONDS: i64 = 3_600;
/// Backoff delay (seconds) before retrying a failed phase-2 consolidation

View File

@@ -2,6 +2,8 @@ use crate::Prompt;
use crate::RolloutRecorder;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::config::types::MemoriesConfig;
use crate::error::CodexErr;
use crate::memories::metrics;
use crate::memories::phase_one;
@@ -67,10 +69,9 @@ struct StageOneOutput {
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
pub(crate) rollout_summary: String,
-/// Optional slug accepted from stage-1 output for forward compatibility.
-/// This is currently ignored by downstream storage and naming, which remain thread-id based.
+/// Optional slug used to derive rollout summary artifact filenames.
#[serde(default, rename = "rollout_slug")]
-pub(crate) _rollout_slug: Option<String>,
+pub(crate) rollout_slug: Option<String>,
}
/// Runs memory phase 1 in strict step order:
@@ -78,9 +79,9 @@ struct StageOneOutput {
/// 2) build one stage-1 request context
/// 3) run stage-1 extraction jobs in parallel
/// 4) emit metrics and logs
-pub(in crate::memories) async fn run(session: &Arc<Session>) {
+pub(in crate::memories) async fn run(session: &Arc<Session>, config: &Config) {
// 1. Claim startup job.
-let Some(claimed_candidates) = claim_startup_jobs(session).await else {
+let Some(claimed_candidates) = claim_startup_jobs(session, &config.memories).await else {
return;
};
if claimed_candidates.is_empty() {
@@ -93,7 +94,7 @@ pub(in crate::memories) async fn run(session: &Arc<Session>) {
}
// 2. Build request.
-let stage_one_context = build_request_context(session).await;
+let stage_one_context = build_request_context(session, config).await;
// 3. Run the parallel sampling.
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;
@@ -120,7 +121,7 @@ pub fn output_schema() -> Value {
"rollout_slug": { "type": "string" },
"raw_memory": { "type": "string" }
},
"required": ["rollout_summary", "rollout_slug", "raw_memory"],
"required": ["rollout_summary", "raw_memory"],
"additionalProperties": false
})
}
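For illustration, a payload (values invented) that satisfies this schema now that rollout_slug is no longer required:

// Hypothetical stage-1 output; only the two required fields must be present.
let output = serde_json::json!({
    "rollout_summary": "Tuned the memories startup pass",
    "raw_memory": "User prefers small, reviewable commits.",
    // "rollout_slug": "memories-startup-pass"  // optional since this change
});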
@@ -129,18 +130,22 @@ impl RequestContext {
pub(in crate::memories) fn from_turn_context(
turn_context: &TurnContext,
turn_metadata_header: Option<String>,
+model_info: ModelInfo,
) -> Self {
Self {
-model_info: turn_context.model_info.clone(),
+model_info,
+turn_metadata_header,
otel_manager: turn_context.otel_manager.clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: turn_context.reasoning_summary,
-turn_metadata_header,
}
}
}
-async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::Stage1JobClaim>> {
+async fn claim_startup_jobs(
+session: &Arc<Session>,
+memories_config: &MemoriesConfig,
+) -> Option<Vec<codex_state::Stage1JobClaim>> {
let Some(state_db) = session.services.state_db.as_deref() else {
// This should not happen.
warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
@@ -157,9 +162,9 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
session.conversation_id,
codex_state::Stage1StartupClaimParams {
scan_limit: phase_one::THREAD_SCAN_LIMIT,
-max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP,
-max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS,
-min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS,
+max_claimed: memories_config.max_rollouts_per_startup,
+max_age_days: memories_config.max_rollout_age_days,
+min_rollout_idle_hours: memories_config.min_rollout_idle_hours,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: phase_one::JOB_LEASE_SECONDS,
},
@@ -179,11 +184,22 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
}
}
-async fn build_request_context(session: &Arc<Session>) -> RequestContext {
+async fn build_request_context(session: &Arc<Session>, config: &Config) -> RequestContext {
let model_name = config
.memories
.phase_1_model
.clone()
.unwrap_or(phase_one::MODEL.to_string());
let model = session
.services
.models_manager
.get_model_info(&model_name, config)
.await;
let turn_context = session.new_default_turn().await;
RequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
model,
)
}
@@ -251,6 +267,7 @@ mod job {
thread.updated_at.timestamp(),
&stage_one_output.raw_memory,
&stage_one_output.rollout_summary,
stage_one_output.rollout_slug.as_deref(),
)
.await,
token_usage,
@@ -331,6 +348,7 @@ mod job {
let mut output: StageOneOutput = serde_json::from_str(&result)?;
output.raw_memory = redact_secrets(output.raw_memory);
output.rollout_summary = redact_secrets(output.rollout_summary);
output.rollout_slug = output.rollout_slug.map(redact_secrets);
Ok((output, token_usage))
}
@@ -384,6 +402,7 @@ mod job {
source_updated_at: i64,
raw_memory: &str,
rollout_summary: &str,
rollout_slug: Option<&str>,
) -> JobOutcome {
let Some(state_db) = session.services.state_db.as_deref() else {
return JobOutcome::Failed;
@@ -396,6 +415,7 @@ mod job {
source_updated_at,
raw_memory,
rollout_summary,
rollout_slug,
)
.await
.unwrap_or(false)

View File

@@ -1,136 +1,360 @@
use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::codex::Session;
use crate::config::Config;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_config::Constrained;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
use codex_state::StateRuntime;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::debug;
use tracing::info;
use tracing::warn;
pub(in crate::memories) fn spawn_phase2_completion_task(
session: &Session,
ownership_token: String,
completion_watermark: i64,
consolidation_agent_id: ThreadId,
) {
let state_db = session.services.state_db.clone();
let agent_control = session.services.agent_control.clone();
let otel_manager = session.services.otel_manager.clone();
tokio::spawn(async move {
let Some(state_db) = state_db else {
return;
};
let status_rx = match agent_control.subscribe_status(consolidation_agent_id).await {
Ok(status_rx) => status_rx,
Err(err) => {
warn!(
"failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}"
);
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_subscribe_status")],
);
mark_phase2_failed_with_recovery(
state_db.as_ref(),
&ownership_token,
"failed to subscribe to consolidation agent status",
)
.await;
return;
}
};
let final_status = run_phase2_completion_task(
Arc::clone(&state_db),
ownership_token,
completion_watermark,
consolidation_agent_id,
status_rx,
)
.await;
if matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_agent_unavailable")],
);
return;
}
if is_phase2_success(&final_status) {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "succeeded")],
);
} else {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "failed")]);
}
tokio::spawn(async move {
if let Err(err) = agent_control.shutdown_agent(consolidation_agent_id).await {
warn!(
"failed to auto-close global memory consolidation agent {consolidation_agent_id}: {err}"
);
}
});
});
+#[derive(Debug, Clone, Default)]
+struct Claim {
+token: String,
+watermark: i64,
+}
-async fn run_phase2_completion_task(
-state_db: Arc<codex_state::StateRuntime>,
-ownership_token: String,
-completion_watermark: i64,
-consolidation_agent_id: ThreadId,
-mut status_rx: watch::Receiver<AgentStatus>,
-) -> AgentStatus {
-let final_status = {
+#[derive(Debug, Clone, Default)]
+struct Counters {
+input: i64,
+}
/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear
/// flow of the consolidation phase.
pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
let Some(db) = session.services.state_db.as_deref() else {
// This should not happen.
return;
};
let root = memory_root(&config.codex_home);
let max_raw_memories = config.memories.max_raw_memories_for_global;
// 1. Claim the job.
let claim = match job::claim(session, db).await {
Ok(claim) => claim,
Err(e) => {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", e)],
);
return;
}
};
// 2. Get the config for the agent
let Some(agent_config) = agent::get_config(config.clone()) else {
// If we can't get the config, we can't consolidate.
tracing::error!("failed to get agent config");
job::failed(session, db, &claim, "failed_sandbox_policy").await;
return;
};
// 3. Query the memories
let raw_memories = match db.list_stage1_outputs_for_global(max_raw_memories).await {
Ok(memories) => memories,
Err(err) => {
tracing::error!("failed to list stage1 outputs from global: {}", err);
job::failed(session, db, &claim, "failed_load_stage1_outputs").await;
return;
}
};
let new_watermark = get_watermark(claim.watermark, &raw_memories);
// 4. Update the file system by syncing the raw memories with the ones
// extracted from the DB at step 3.
// [`rollout_summaries/`]
if let Err(err) =
sync_rollout_summaries_from_memories(&root, &raw_memories, max_raw_memories).await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
return;
}
// [`raw_memories.md`]
if let Err(err) =
rebuild_raw_memories_file_from_memories(&root, &raw_memories, max_raw_memories).await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_rebuild_raw_memories").await;
return;
}
if raw_memories.is_empty() {
// Checked only after the file-system sync so that empty runs still refresh local artifacts.
job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await;
return;
}
// 5. Spawn the agent
let prompt = agent::get_prompt(config);
let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation);
let thread_id = match session
.services
.agent_control
.spawn_agent(agent_config, prompt, Some(source))
.await
{
Ok(thread_id) => thread_id,
Err(err) => {
tracing::error!("failed to spawn global memory consolidation agent: {err}");
job::failed(session, db, &claim, "failed_spawn_agent").await;
return;
}
};
// 6. Spawn the agent handler.
agent::handle(session, claim, new_watermark, thread_id);
// 7. Metrics and logs.
let counters = Counters {
input: raw_memories.len() as i64,
};
emit_metrics(session, counters);
}
mod job {
use super::*;
pub(super) async fn claim(
session: &Arc<Session>,
db: &StateRuntime,
) -> Result<Claim, &'static str> {
let otel_manager = &session.services.otel_manager;
let claim = db
.try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS)
.await
.map_err(|e| {
tracing::error!("failed to claim job: {}", e);
"failed_claim"
})?;
let (token, watermark) = match claim {
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]);
(ownership_token, input_watermark)
}
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => return Err("skipped_not_dirty"),
codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"),
};
Ok(Claim { token, watermark })
}
pub(super) async fn failed(
session: &Arc<Session>,
db: &StateRuntime,
claim: &Claim,
reason: &'static str,
) {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", reason)],
);
if matches!(
db.mark_global_phase2_job_failed(
&claim.token,
reason,
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await,
Ok(false)
) {
let _ = db
.mark_global_phase2_job_failed_if_unowned(
&claim.token,
reason,
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await;
}
}
pub(super) async fn succeed(
session: &Arc<Session>,
db: &StateRuntime,
claim: &Claim,
completion_watermark: i64,
reason: &'static str,
) {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", reason)],
);
let _ = db
.mark_global_phase2_job_succeeded(&claim.token, completion_watermark)
.await;
}
}
mod agent {
use super::*;
pub(super) fn get_config(config: Arc<Config>) -> Option<Config> {
let root = memory_root(&config.codex_home);
let mut agent_config = config.as_ref().clone();
agent_config.cwd = root;
// Approval policy
agent_config.permissions.approval_policy = Constrained::allow_only(AskForApproval::Never);
// Sandbox policy
let mut writable_roots = Vec::new();
match AbsolutePathBuf::from_absolute_path(agent_config.codex_home.clone()) {
Ok(codex_home) => writable_roots.push(codex_home),
Err(err) => warn!(
"memory phase-2 consolidation could not add codex_home writable root {}: {err}",
agent_config.codex_home.display()
),
}
// The consolidation agent only needs local codex_home write access and no network.
let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots,
read_only_access: Default::default(),
network_access: false,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
};
agent_config
.permissions
.sandbox_policy
.set(consolidation_sandbox_policy)
.ok()?;
agent_config.model = Some(
config
.memories
.phase_2_model
.clone()
.unwrap_or(phase_two::MODEL.to_string()),
);
Some(agent_config)
}
pub(super) fn get_prompt(config: Arc<Config>) -> Vec<UserInput> {
let root = memory_root(&config.codex_home);
let prompt = build_consolidation_prompt(&root);
vec![UserInput::Text {
text: prompt,
text_elements: vec![],
}]
}
/// Handle the agent while it is running.
pub(super) fn handle(
session: &Arc<Session>,
claim: Claim,
new_watermark: i64,
thread_id: ThreadId,
) {
let Some(db) = session.services.state_db.clone() else {
return;
};
let session = session.clone();
tokio::spawn(async move {
let agent_control = session.services.agent_control.clone();
// TODO(jif) we might have a very small race here.
let rx = match agent_control.subscribe_status(thread_id).await {
Ok(rx) => rx,
Err(err) => {
tracing::error!("agent_control.subscribe_status failed: {err:?}");
job::failed(&session, &db, &claim, "failed_subscribe_status").await;
return;
}
};
// Watch the agent's status until it reaches a final state.
let final_status = loop_agent(
db.clone(),
claim.token.clone(),
new_watermark,
thread_id,
rx,
)
.await;
if matches!(final_status, AgentStatus::Completed(_)) {
job::succeed(&session, &db, &claim, new_watermark, "succeeded").await;
} else {
job::failed(&session, &db, &claim, "failed_agent").await;
}
// Fire and forget close of the agent.
if !matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) {
tokio::spawn(async move {
if let Err(err) = agent_control.shutdown_agent(thread_id).await {
warn!(
"failed to auto-close global memory consolidation agent {thread_id}: {err}"
);
}
});
} else {
tracing::warn!("The agent was already gone");
}
});
}
async fn loop_agent(
db: Arc<StateRuntime>,
token: String,
_new_watermark: i64,
thread_id: ThreadId,
mut rx: watch::Receiver<AgentStatus>,
) -> AgentStatus {
let mut heartbeat_interval =
tokio::time::interval(Duration::from_secs(phase_two::JOB_HEARTBEAT_SECONDS));
heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
-let status = status_rx.borrow().clone();
+let status = rx.borrow().clone();
if is_final_agent_status(&status) {
break status;
}
tokio::select! {
-changed = status_rx.changed() => {
-if changed.is_err() {
-warn!(
-"lost status updates for global memory consolidation agent {consolidation_agent_id}"
+update = rx.changed() => {
+if update.is_err() {
+tracing::warn!(
+"lost status updates for global memory consolidation agent {thread_id}"
);
break status;
}
}
_ = heartbeat_interval.tick() => {
-match state_db
+match db
.heartbeat_global_phase2_job(
-&ownership_token,
+&token,
phase_two::JOB_LEASE_SECONDS,
)
.await
{
Ok(true) => {}
Ok(false) => {
warn!(
"memory phase-2 heartbeat lost global ownership; finalizing as failure"
);
break AgentStatus::Errored(
"lost global phase-2 ownership during heartbeat".to_string(),
);
}
Err(err) => {
warn!(
"state db heartbeat_global_phase2_job failed during memories startup: {err}"
);
break AgentStatus::Errored(format!(
"phase-2 heartbeat update failed: {err}"
));
@@ -139,281 +363,30 @@ async fn run_phase2_completion_task(
}
}
}
};
}
}
-let phase2_success = is_phase2_success(&final_status);
-info!(
-"memory phase-2 global consolidation complete: agent_id={consolidation_agent_id} success={phase2_success} final_status={final_status:?}"
-);
+pub(super) fn get_watermark(
+claimed_watermark: i64,
+latest_memories: &[codex_state::Stage1Output],
+) -> i64 {
+latest_memories
+.iter()
+.map(|memory| memory.source_updated_at.timestamp())
+.max()
+.unwrap_or(claimed_watermark)
+.max(claimed_watermark) // todo double check the claimed here.
+}
+fn emit_metrics(session: &Arc<Session>, counters: Counters) {
+let otel = session.services.otel_manager.clone();
+if counters.input > 0 {
+otel.counter(metrics::MEMORY_PHASE_TWO_INPUT, counters.input, &[]);
+}
+otel.counter(
+metrics::MEMORY_PHASE_TWO_JOBS,
+1,
+&[("status", "agent_spawned")],
+);
+}
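A small sketch of the invariant get_watermark encodes, mirroring the assertion kept in the deleted dispatch tests: the result is the maximum of the claimed watermark and the newest source_updated_at, so it never regresses below the claim.

// Sketch only; field values are illustrative.
let memory = codex_state::Stage1Output {
    thread_id: ThreadId::new(),
    source_updated_at: chrono::DateTime::<chrono::Utc>::from_timestamp(123, 0).expect("valid ts"),
    raw_memory: "raw memory".to_string(),
    rollout_summary: "rollout summary".to_string(),
    cwd: std::path::PathBuf::from("/tmp/workspace"),
    generated_at: chrono::DateTime::<chrono::Utc>::from_timestamp(124, 0).expect("valid ts"),
};
// An old memory (123) cannot drag the watermark below the claim (1_000)...
assert_eq!(get_watermark(1_000, &[memory]), 1_000);
// ...and with no memories at all, the claim is returned unchanged.
assert_eq!(get_watermark(1_000, &[]), 1_000);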
if phase2_success {
match state_db
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await
{
Ok(true) => {}
Ok(false) => {
debug!(
"memory phase-2 success finalization skipped after global ownership changed"
);
}
Err(err) => {
warn!(
"state db mark_global_phase2_job_succeeded failed during memories startup: {err}"
);
}
}
return final_status;
}
let failure_reason = phase2_failure_reason(&final_status);
mark_phase2_failed_with_recovery(state_db.as_ref(), &ownership_token, &failure_reason).await;
warn!(
"memory phase-2 global consolidation agent finished with non-success status: agent_id={consolidation_agent_id} final_status={final_status:?}"
);
final_status
}
async fn mark_phase2_failed_with_recovery(
state_db: &codex_state::StateRuntime,
ownership_token: &str,
failure_reason: &str,
) {
match state_db
.mark_global_phase2_job_failed(
ownership_token,
failure_reason,
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await
{
Ok(true) => {}
Ok(false) => match state_db
.mark_global_phase2_job_failed_if_unowned(
ownership_token,
failure_reason,
phase_two::JOB_RETRY_DELAY_SECONDS,
)
.await
{
Ok(true) => {
debug!(
"memory phase-2 failure finalization applied fallback update for unowned running job"
);
}
Ok(false) => {
debug!(
"memory phase-2 failure finalization skipped after global ownership changed"
);
}
Err(err) => {
warn!(
"state db mark_global_phase2_job_failed_if_unowned failed during memories startup: {err}"
);
}
},
Err(err) => {
warn!("state db mark_global_phase2_job_failed failed during memories startup: {err}");
}
}
}
fn is_phase2_success(final_status: &AgentStatus) -> bool {
matches!(final_status, AgentStatus::Completed(_))
}
fn phase2_failure_reason(final_status: &AgentStatus) -> String {
format!("consolidation agent finished with status {final_status:?}")
}
#[cfg(test)]
mod tests {
use super::is_phase2_success;
use super::phase2_failure_reason;
use super::run_phase2_completion_task;
use crate::agent::AgentStatus;
use codex_protocol::ThreadId;
use codex_state::Phase2JobClaimOutcome;
use pretty_assertions::assert_eq;
use std::sync::Arc;
#[test]
fn phase2_success_only_for_completed_status() {
assert!(is_phase2_success(&AgentStatus::Completed(None)));
assert!(!is_phase2_success(&AgentStatus::Running));
assert!(!is_phase2_success(&AgentStatus::Errored(
"oops".to_string()
)));
}
#[test]
fn phase2_failure_reason_includes_status() {
let status = AgentStatus::Errored("boom".to_string());
let reason = phase2_failure_reason(&status);
assert!(reason.contains("consolidation agent finished with status"));
assert!(reason.contains("boom"));
}
#[tokio::test]
async fn phase2_completion_marks_succeeded_for_completed_status() {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let state_db = Arc::new(
codex_state::StateRuntime::init(
codex_home.path().to_path_buf(),
"test-provider".to_string(),
None,
)
.await
.expect("initialize state runtime"),
);
let owner = ThreadId::new();
state_db
.enqueue_global_consolidation(123)
.await
.expect("enqueue global consolidation");
let claim = state_db
.try_claim_global_phase2_job(owner, 3_600)
.await
.expect("claim global phase-2 job");
let ownership_token = match claim {
Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} => ownership_token,
other => panic!("unexpected phase-2 claim outcome: {other:?}"),
};
let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Completed(None));
run_phase2_completion_task(
Arc::clone(&state_db),
ownership_token.clone(),
123,
ThreadId::new(),
status_rx,
)
.await;
let up_to_date_claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim up-to-date global job");
assert_eq!(up_to_date_claim, Phase2JobClaimOutcome::SkippedNotDirty);
state_db
.enqueue_global_consolidation(124)
.await
.expect("enqueue advanced consolidation watermark");
let rerun_claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim rerun global job");
assert!(
matches!(rerun_claim, Phase2JobClaimOutcome::Claimed { .. }),
"advanced watermark should be claimable after success finalization"
);
}
#[tokio::test]
async fn phase2_completion_marks_failed_when_status_updates_are_lost() {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let state_db = Arc::new(
codex_state::StateRuntime::init(
codex_home.path().to_path_buf(),
"test-provider".to_string(),
None,
)
.await
.expect("initialize state runtime"),
);
state_db
.enqueue_global_consolidation(456)
.await
.expect("enqueue global consolidation");
let claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global phase-2 job");
let ownership_token = match claim {
Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} => ownership_token,
other => panic!("unexpected phase-2 claim outcome: {other:?}"),
};
let (status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running);
drop(status_tx);
run_phase2_completion_task(
Arc::clone(&state_db),
ownership_token,
456,
ThreadId::new(),
status_rx,
)
.await;
let claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim after failure finalization");
assert_eq!(
claim,
Phase2JobClaimOutcome::SkippedNotDirty,
"failure finalization should leave global job in retry-backoff, not running ownership"
);
}
#[tokio::test]
async fn phase2_completion_heartbeat_loss_does_not_steal_active_other_owner() {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let state_db = Arc::new(
codex_state::StateRuntime::init(
codex_home.path().to_path_buf(),
"test-provider".to_string(),
None,
)
.await
.expect("initialize state runtime"),
);
state_db
.enqueue_global_consolidation(789)
.await
.expect("enqueue global consolidation");
let claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global phase-2 job");
let claimed_token = match claim {
Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} => ownership_token,
other => panic!("unexpected phase-2 claim outcome: {other:?}"),
};
let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running);
run_phase2_completion_task(
Arc::clone(&state_db),
"non-owner-token".to_string(),
789,
ThreadId::new(),
status_rx,
)
.await;
let claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim after heartbeat ownership loss");
assert_eq!(
claim,
Phase2JobClaimOutcome::SkippedRunning,
"heartbeat ownership-loss handling should not steal a live owner lease"
);
assert!(
state_db
.mark_global_phase2_job_succeeded(claimed_token.as_str(), 789)
.await
.expect("mark original owner success"),
"the original owner should still be able to finalize"
);
}
}
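For context on the three terminal paths the tests above exercise (completed status finalizes success, a dropped status channel finalizes failure, a non-owner token must not steal a live lease), here is a minimal sketch of the watch-channel wait a completion task of this shape performs. It assumes a simplified Status enum in place of AgentStatus; every name below is illustrative, not the crate's API.

use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Status {
    Running,
    Completed,
    Errored(String),
}

// Wait for a terminal status. A channel closed while still Running means
// the agent's status updates were lost, which is treated as failure.
async fn await_terminal(mut rx: watch::Receiver<Status>) -> Result<(), String> {
    loop {
        match rx.borrow_and_update().clone() {
            Status::Completed => return Ok(()),
            Status::Errored(e) => return Err(e),
            Status::Running => {
                if rx.changed().await.is_err() {
                    return Err("status channel closed before completion".to_string());
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Status::Running);
    tx.send(Status::Completed).expect("receiver still alive");
    assert!(await_terminal(rx).await.is_ok());
}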

View File

@@ -2,6 +2,7 @@ use crate::codex::Session;
use crate::config::Config;
use crate::features::Feature;
use crate::memories::phase1;
use crate::memories::phase2;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;
use tracing::warn;
@@ -34,8 +35,8 @@ pub(crate) fn start_memories_startup_task(
};
// Run phase 1.
phase1::run(&session).await;
phase1::run(&session, &config).await;
// Run phase 2.
crate::memories::dispatch::run_global_memory_consolidation(&session, config).await;
phase2::run(&session, config).await;
});
}

View File

@@ -5,7 +5,6 @@ use std::path::Path;
use tracing::warn;
use crate::memories::ensure_layout;
use crate::memories::phase_two;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
@@ -15,25 +14,27 @@ use crate::memories::rollout_summaries_dir;
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
) -> std::io::Result<()> {
ensure_layout(root).await?;
rebuild_raw_memories_file(root, memories).await
rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await
}
/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
pub(super) async fn sync_rollout_summaries_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
) -> std::io::Result<()> {
ensure_layout(root).await?;
let retained = memories
.iter()
.take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL)
.take(max_raw_memories_for_global)
.collect::<Vec<_>>();
let keep = retained
.iter()
.map(|memory| memory.thread_id.to_string())
.map(|memory| rollout_summary_file_stem(memory))
.collect::<BTreeSet<_>>();
prune_rollout_summaries(root, &keep).await?;
@@ -62,10 +63,14 @@ pub(super) async fn sync_rollout_summaries_from_memories(
Ok(())
}
async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> {
async fn rebuild_raw_memories_file(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
) -> std::io::Result<()> {
let retained = memories
.iter()
.take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL)
.take(max_raw_memories_for_global)
.collect::<Vec<_>>();
let mut body = String::from("# Raw Memories\n\n");
@@ -108,10 +113,10 @@ async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet<String>) -> std::i
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
let Some(thread_id) = extract_thread_id_from_rollout_summary_filename(file_name) else {
let Some(stem) = file_name.strip_suffix(".md") else {
continue;
};
if !keep.contains(thread_id)
if !keep.contains(stem)
&& let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
@@ -129,7 +134,8 @@ async fn write_rollout_summary_for_thread(
root: &Path,
memory: &Stage1Output,
) -> std::io::Result<()> {
let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id));
let file_stem = rollout_summary_file_stem(memory);
let path = rollout_summaries_dir(root).join(format!("{file_stem}.md"));
let mut body = String::new();
writeln!(body, "thread_id: {}", memory.thread_id)
@@ -150,7 +156,85 @@ async fn write_rollout_summary_for_thread(
tokio::fs::write(path, body).await
}
fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> {
let stem = file_name.strip_suffix(".md")?;
if stem.is_empty() { None } else { Some(stem) }
fn rollout_summary_file_stem(memory: &Stage1Output) -> String {
const ROLLOUT_SLUG_MAX_LEN: usize = 20;
let thread_id = memory.thread_id.to_string();
let Some(raw_slug) = memory.rollout_slug.as_deref() else {
return thread_id;
};
let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN);
for ch in raw_slug.chars() {
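// Byte-based length check is exact here: every char pushed below is one-byte ASCII.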
if slug.len() >= ROLLOUT_SLUG_MAX_LEN {
break;
}
if ch.is_ascii_alphanumeric() {
slug.push(ch.to_ascii_lowercase());
} else {
slug.push('_');
}
}
while slug.ends_with('_') {
slug.pop();
}
if slug.is_empty() {
thread_id
} else {
format!("{thread_id}-{slug}")
}
}
#[cfg(test)]
mod tests {
use super::rollout_summary_file_stem;
use chrono::TimeZone;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_state::Stage1Output;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output {
Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "summary".to_string(),
rollout_slug: rollout_slug.map(ToString::to_string),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"),
}
}
#[test]
fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() {
let memory = stage1_output_with_slug(None);
let thread_id = memory.thread_id.to_string();
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
}
#[test]
fn rollout_summary_file_stem_sanitizes_and_truncates_slug() {
let memory =
stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345"));
let thread_id = memory.thread_id.to_string();
assert_eq!(
rollout_summary_file_stem(&memory),
format!("{thread_id}-unsafe_slug_with_spa")
);
}
#[test]
fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() {
let memory = stage1_output_with_slug(Some(""));
let thread_id = memory.thread_id.to_string();
assert_eq!(rollout_summary_file_stem(&memory), thread_id);
}
}

View File

@@ -1,5 +1,6 @@
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL;
use crate::memories::ensure_layout;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
@@ -21,7 +22,7 @@ fn memory_root_uses_shared_global_path() {
}
#[test]
fn stage_one_output_schema_requires_all_declared_properties() {
fn stage_one_output_schema_keeps_rollout_slug_optional() {
let schema = crate::memories::phase1::output_schema();
let properties = schema
.get("properties")
@@ -32,16 +33,17 @@ fn stage_one_output_schema_requires_all_declared_properties() {
.and_then(Value::as_array)
.expect("required array");
let mut property_keys = properties.keys().map(String::as_str).collect::<Vec<_>>();
property_keys.sort_unstable();
let mut required_keys = required
.iter()
.map(|key| key.as_str().expect("required key string"))
.collect::<Vec<_>>();
required_keys.sort_unstable();
assert_eq!(required_keys, property_keys);
assert!(
properties.contains_key("rollout_slug"),
"schema should declare rollout_slug"
);
assert_eq!(required_keys, vec!["raw_memory", "rollout_summary"]);
}
#[tokio::test]
@@ -66,16 +68,25 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: None,
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
sync_rollout_summaries_from_memories(&root, &memories)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &memories)
.await
.expect("rebuild raw memories");
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("rebuild raw memories");
assert!(keep_path.is_file());
assert!(!drop_path.exists());
@@ -87,3 +98,578 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(raw_memories.contains(&keep_id));
assert!(raw_memories.contains("cwd: /tmp/workspace"));
}
#[tokio::test]
async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
let thread_id = ThreadId::new();
let stale_unslugged_path = rollout_summaries_dir(&root).join(format!("{thread_id}.md"));
let stale_old_slug_path =
rollout_summaries_dir(&root).join(format!("{thread_id}--old-slug.md"));
tokio::fs::write(&stale_unslugged_path, "stale")
.await
.expect("write stale unslugged file");
tokio::fs::write(&stale_old_slug_path, "stale")
.await
.expect("write stale old-slug file");
let memories = vec![Stage1Output {
thread_id,
source_updated_at: Utc.timestamp_opt(200, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"),
}];
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
)
.await
.expect("sync rollout summaries");
let mut entries = tokio::fs::read_dir(rollout_summaries_dir(&root))
.await
.expect("open rollout summaries dir");
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await.expect("read dir entry") {
files.push(entry.file_name().to_string_lossy().to_string());
}
files.sort_unstable();
assert_eq!(files.len(), 1);
let file_name = &files[0];
let stem = file_name
.strip_suffix(".md")
.expect("rollout summary file should end with .md");
let slug = stem
.strip_prefix(&format!("{thread_id}-"))
.expect("rollout summary filename should include thread id and slug");
assert!(slug.len() <= 20, "slug should be capped at 20 chars");
assert!(
slug.chars()
.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'),
"slug should be file-safe lowercase ascii with underscores"
);
let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name))
.await
.expect("read rollout summary");
assert!(summary.contains(&format!("thread_id: {thread_id}")));
assert!(
!tokio::fs::try_exists(&stale_unslugged_path)
.await
.expect("check stale unslugged path"),
"slugged sync should prune stale unslugged filename for same thread"
);
assert!(
!tokio::fs::try_exists(&stale_old_slug_path)
.await
.expect("check stale old slug path"),
"slugged sync should prune stale slugged filename for same thread"
);
}
mod phase2 {
use crate::CodexAuth;
use crate::ThreadManager;
use crate::agent::AgentControl;
use crate::codex::Session;
use crate::codex::make_session_and_context;
use crate::config::Config;
use crate::config::test_config;
use crate::memories::memory_root;
use crate::memories::phase2;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use chrono::Utc;
use codex_config::Constrained;
use codex_protocol::ThreadId;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_state::Phase2JobClaimOutcome;
use codex_state::Stage1Output;
use codex_state::ThreadMetadataBuilder;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
fn stage1_output_with_source_updated_at(source_updated_at: i64) -> Stage1Output {
Stage1Output {
thread_id: ThreadId::new(),
source_updated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at, 0)
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
rollout_slug: None,
cwd: PathBuf::from("/tmp/workspace"),
generated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at + 1, 0)
.expect("valid generated_at timestamp"),
}
}
struct DispatchHarness {
_codex_home: TempDir,
config: Arc<Config>,
session: Arc<Session>,
manager: ThreadManager,
state_db: Arc<codex_state::StateRuntime>,
}
impl DispatchHarness {
async fn new() -> Self {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let mut config = test_config();
config.codex_home = codex_home.path().to_path_buf();
config.cwd = config.codex_home.clone();
let config = Arc::new(config);
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let (mut session, _turn_context) = make_session_and_context().await;
session.services.state_db = Some(Arc::clone(&state_db));
session.services.agent_control = manager.agent_control();
Self {
_codex_home: codex_home,
config,
session: Arc::new(session),
manager,
state_db,
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
self.config
.codex_home
.join(format!("rollout-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
metadata_builder.cwd = self.config.cwd.clone();
metadata_builder.model_provider = Some(self.config.model_provider_id.clone());
let metadata = metadata_builder.build(&self.config.model_provider_id);
self.state_db
.upsert_thread(&metadata)
.await
.expect("upsert thread metadata");
let claim = self
.state_db
.try_claim_stage1_job(
thread_id,
self.session.conversation_id,
source_updated_at,
3_600,
64,
)
.await
.expect("claim stage-1 job");
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
self.state_db
.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
source_updated_at,
"raw memory",
"rollout summary",
None,
)
.await
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
}
async fn shutdown_threads(&self) {
self.manager
.remove_and_close_all_threads()
.await
.expect("shutdown spawned threads");
}
fn user_input_ops_count(&self) -> usize {
self.manager
.captured_ops()
.into_iter()
.filter(|(_, op)| matches!(op, Op::UserInput { .. }))
.count()
}
}
#[test]
fn completion_watermark_never_regresses_below_claimed_input_watermark() {
let stage1_output = stage1_output_with_source_updated_at(123);
let completion = phase2::get_watermark(1_000, &[stage1_output]);
pretty_assertions::assert_eq!(completion, 1_000);
}
#[test]
fn completion_watermark_uses_claimed_watermark_when_there_are_no_memories() {
let completion = phase2::get_watermark(777, &[]);
pretty_assertions::assert_eq!(completion, 777);
}
#[test]
fn completion_watermark_uses_latest_memory_timestamp_when_it_is_newer() {
let older = stage1_output_with_source_updated_at(123);
let newer = stage1_output_with_source_updated_at(456);
let completion = phase2::get_watermark(200, &[older, newer]);
pretty_assertions::assert_eq!(completion, 456);
}
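// Taken together, the three tests above pin down the completion-watermark
// rule: never below the claimed input watermark, advanced to the newest
// memory timestamp when that is later. As a standalone sketch (illustrative
// only; the real helper takes &[Stage1Output], with i64 epoch timestamps
// standing in here):
fn completion_watermark_sketch(claimed: i64, memory_timestamps: &[i64]) -> i64 {
    memory_timestamps
        .iter()
        .copied()
        .max()
        .map_or(claimed, |latest| latest.max(claimed))
}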
#[tokio::test]
async fn dispatch_skips_when_global_job_is_not_dirty() {
let harness = DispatchHarness::new().await;
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_skips_when_global_job_is_already_running() {
let harness = DispatchHarness::new().await;
harness
.state_db
.enqueue_global_consolidation(123)
.await
.expect("enqueue global consolidation");
let claimed = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim running global lock");
assert!(
matches!(claimed, Phase2JobClaimOutcome::Claimed { .. }),
"precondition should claim the running lock"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
let running_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim while lock is still running");
pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
let stale_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 0)
.await
.expect("claim stale global lock");
assert!(
matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. }),
"stale lock precondition should be claimed"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
let running_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim while running");
pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning);
let user_input_ops = harness.user_input_ops_count();
pretty_assertions::assert_eq!(user_input_ops, 1);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 1);
let subagent = harness
.manager
.get_thread(thread_ids[0])
.await
.expect("get consolidation thread");
let config_snapshot = subagent.config_snapshot().await;
pretty_assertions::assert_eq!(config_snapshot.approval_policy, AskForApproval::Never);
pretty_assertions::assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home));
match config_snapshot.sandbox_policy {
SandboxPolicy::WorkspaceWrite { writable_roots, .. } => {
assert!(
writable_roots
.iter()
.any(|root| root.as_path() == harness.config.codex_home.as_path()),
"consolidation subagent should have codex_home as writable root"
);
}
other => panic!("unexpected sandbox policy: {other:?}"),
}
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
tokio::fs::create_dir_all(&summaries_dir)
.await
.expect("create rollout summaries dir");
let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new()));
tokio::fs::write(&stale_summary_path, "stale summary\n")
.await
.expect("write stale rollout summary");
let raw_memories_path = raw_memories_file(&root);
tokio::fs::write(&raw_memories_path, "stale raw memories\n")
.await
.expect("write stale raw memories");
let memory_index_path = root.join("MEMORY.md");
tokio::fs::write(&memory_index_path, "stale memory index\n")
.await
.expect("write stale memory index");
let memory_summary_path = root.join("memory_summary.md");
tokio::fs::write(&memory_summary_path, "stale memory summary\n")
.await
.expect("write stale memory summary");
let stale_skill_file = root.join("skills/demo/SKILL.md");
tokio::fs::create_dir_all(
stale_skill_file
.parent()
.expect("skills subdirectory parent should exist"),
)
.await
.expect("create stale skills dir");
tokio::fs::write(&stale_skill_file, "stale skill\n")
.await
.expect("write stale skill");
harness
.state_db
.enqueue_global_consolidation(999)
.await
.expect("enqueue global consolidation");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
assert!(
!tokio::fs::try_exists(&stale_summary_path)
.await
.expect("check stale summary existence"),
"empty consolidation should prune stale rollout summary files"
);
let raw_memories = tokio::fs::read_to_string(&raw_memories_path)
.await
.expect("read rebuilt raw memories");
pretty_assertions::assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n");
assert!(
!tokio::fs::try_exists(&memory_index_path)
.await
.expect("check memory index existence"),
"empty consolidation should remove stale MEMORY.md"
);
assert!(
!tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary existence"),
"empty consolidation should remove stale memory_summary.md"
);
assert!(
!tokio::fs::try_exists(&stale_skill_file)
.await
.expect("check stale skill existence"),
"empty consolidation should remove stale skills artifacts"
);
assert!(
!tokio::fs::try_exists(root.join("skills"))
.await
.expect("check skills dir existence"),
"empty consolidation should remove stale skills directory"
);
let next_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after empty consolidation success");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_sandbox_policy_cannot_be_overridden() {
let harness = DispatchHarness::new().await;
harness
.state_db
.enqueue_global_consolidation(99)
.await
.expect("enqueue global consolidation");
let mut constrained_config = harness.config.as_ref().clone();
constrained_config.permissions.sandbox_policy =
Constrained::allow_only(SandboxPolicy::DangerFullAccess);
phase2::run(&harness.session, Arc::new(constrained_config)).await;
let retry_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after sandbox policy failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_syncing_artifacts_fails() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
let root = memory_root(&harness.config.codex_home);
tokio::fs::write(&root, "not a directory")
.await
.expect("create file at memory root");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
let retry_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after sync failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_rebuilding_raw_memories_fails() {
let harness = DispatchHarness::new().await;
harness.seed_stage1_output(100).await;
let root = memory_root(&harness.config.codex_home);
tokio::fs::create_dir_all(raw_memories_file(&root))
.await
.expect("create raw_memories.md as a directory");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
let retry_claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after rebuild failure");
pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
}
#[tokio::test]
async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() {
let codex_home = tempfile::tempdir().expect("create temp codex home");
let mut config = test_config();
config.codex_home = codex_home.path().to_path_buf();
config.cwd = config.codex_home.clone();
let config = Arc::new(config);
let state_db = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
None,
)
.await
.expect("initialize state db");
let (mut session, _turn_context) = make_session_and_context().await;
session.services.state_db = Some(Arc::clone(&state_db));
session.services.agent_control = AgentControl::default();
let session = Arc::new(session);
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
config.codex_home.join(format!("rollout-{thread_id}.jsonl")),
Utc::now(),
SessionSource::Cli,
);
metadata_builder.cwd = config.cwd.clone();
metadata_builder.model_provider = Some(config.model_provider_id.clone());
let metadata = metadata_builder.build(&config.model_provider_id);
state_db
.upsert_thread(&metadata)
.await
.expect("upsert thread metadata");
let claim = state_db
.try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64)
.await
.expect("claim stage-1 job");
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
state_db
.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
100,
"raw memory",
"rollout summary",
None,
)
.await
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
phase2::run(&session, Arc::clone(&config)).await;
let retry_claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)
.await
.expect("claim global job after spawn failure");
pretty_assertions::assert_eq!(
retry_claim,
Phase2JobClaimOutcome::SkippedNotDirty,
"spawn failures should leave the job in retry backoff instead of running"
);
}
}

View File

@@ -1214,7 +1214,7 @@ async fn find_thread_path_by_id_str_in_subdir(
let found = results.matches.into_iter().next().map(|m| m.full_path());
if let Some(found_path) = found.as_ref() {
tracing::error!("state db missing rollout path for thread {id_str}");
tracing::debug!("state db missing rollout path for thread {id_str}");
state_db::record_discrepancy("find_thread_path_by_id_str_in_subdir", "falling_back");
state_db::read_repair_rollout_path(
state_db_ctx.as_deref(),

View File

@@ -1825,6 +1825,7 @@ pub enum SubAgentSource {
parent_thread_id: ThreadId,
depth: i32,
},
MemoryConsolidation,
Other(String),
}
@@ -1846,6 +1847,7 @@ impl fmt::Display for SubAgentSource {
match self {
SubAgentSource::Review => f.write_str("review"),
SubAgentSource::Compact => f.write_str("compact"),
SubAgentSource::MemoryConsolidation => f.write_str("memory_consolidation"),
SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,

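One detail the two hunks above keep in sync: the new variant's Display string and its serialized wire form are the same bare string. A standalone round-trip sketch, assuming the enum keeps a snake_case serde derive for its unit variants (the mirror enum below is illustrative, not the crate's type):

use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
enum SourceSketch {
    Review,
    Compact,
    MemoryConsolidation,
}

fn main() {
    let json = serde_json::to_string(&SourceSketch::MemoryConsolidation).expect("serialize");
    // Unit variants serialize to their bare renamed string, matching Display.
    assert_eq!(json, "\"memory_consolidation\"");
}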
View File

@@ -0,0 +1,2 @@
ALTER TABLE stage1_outputs
ADD COLUMN rollout_slug TEXT;

View File

@@ -15,6 +15,7 @@ pub struct Stage1Output {
pub source_updated_at: DateTime<Utc>,
pub raw_memory: String,
pub rollout_summary: String,
pub rollout_slug: Option<String>,
pub cwd: PathBuf,
pub generated_at: DateTime<Utc>,
}
@@ -25,6 +26,7 @@ pub(crate) struct Stage1OutputRow {
source_updated_at: i64,
raw_memory: String,
rollout_summary: String,
rollout_slug: Option<String>,
cwd: String,
generated_at: i64,
}
@@ -36,6 +38,7 @@ impl Stage1OutputRow {
source_updated_at: row.try_get("source_updated_at")?,
raw_memory: row.try_get("raw_memory")?,
rollout_summary: row.try_get("rollout_summary")?,
rollout_slug: row.try_get("rollout_slug")?,
cwd: row.try_get("cwd")?,
generated_at: row.try_get("generated_at")?,
})
@@ -51,6 +54,7 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
raw_memory: row.raw_memory,
rollout_summary: row.rollout_summary,
rollout_slug: row.rollout_slug,
cwd: PathBuf::from(row.cwd),
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})

View File

@@ -1143,7 +1143,14 @@ WHERE id = 1
assert!(
runtime
.mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum")
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
100,
"raw",
"sum",
None,
)
.await
.expect("mark stage1 succeeded"),
"stage1 success should finalize for current token"
@@ -1492,6 +1499,7 @@ WHERE id = 1
up_to_date.updated_at.timestamp(),
"raw",
"summary",
None,
)
.await
.expect("mark up-to-date thread succeeded"),
@@ -1715,6 +1723,7 @@ WHERE kind = 'memory_stage1'
claim.thread.updated_at.timestamp(),
"raw",
"summary",
None,
)
.await
.expect("mark first-batch stage1 success"),
@@ -1766,7 +1775,14 @@ WHERE kind = 'memory_stage1'
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum")
.mark_stage1_job_succeeded(
thread_id,
ownership_token.as_str(),
100,
"raw",
"sum",
None,
)
.await
.expect("mark stage1 succeeded"),
"mark stage1 succeeded should write stage1_outputs"
@@ -2058,6 +2074,7 @@ WHERE kind = 'memory_stage1'
100,
"raw memory a",
"summary a",
None,
)
.await
.expect("mark stage1 succeeded a"),
@@ -2080,6 +2097,7 @@ WHERE kind = 'memory_stage1'
101,
"raw memory b",
"summary b",
Some("rollout-b"),
)
.await
.expect("mark stage1 succeeded b"),
@@ -2093,9 +2111,11 @@ WHERE kind = 'memory_stage1'
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].rollout_summary, "summary b");
assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b"));
assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
assert_eq!(outputs[1].thread_id, thread_id_a);
assert_eq!(outputs[1].rollout_summary, "summary a");
assert_eq!(outputs[1].rollout_slug, None);
assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
@@ -2208,7 +2228,14 @@ VALUES (?, ?, ?, ?, ?)
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_a, token_a.as_str(), 100, "raw-a", "summary-a")
.mark_stage1_job_succeeded(
thread_a,
token_a.as_str(),
100,
"raw-a",
"summary-a",
None,
)
.await
.expect("mark stage1 succeeded a"),
"stage1 success should persist output for thread a"
@@ -2224,7 +2251,14 @@ VALUES (?, ?, ?, ?, ?)
};
assert!(
runtime
.mark_stage1_job_succeeded(thread_b, token_b.as_str(), 101, "raw-b", "summary-b")
.mark_stage1_job_succeeded(
thread_b,
token_b.as_str(),
101,
"raw-b",
"summary-b",
None,
)
.await
.expect("mark stage1 succeeded b"),
"stage1 success should persist output for thread b"

View File

@@ -191,7 +191,13 @@ LEFT JOIN jobs
let rows = sqlx::query(
r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at
SELECT
so.thread_id,
so.source_updated_at,
so.raw_memory,
so.rollout_summary,
so.rollout_slug,
so.generated_at
, COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
@@ -407,6 +413,7 @@ WHERE kind = ? AND job_key = ?
/// - sets `status='done'` and `last_success_watermark = input_watermark`
/// - upserts `stage1_outputs` for the thread, replacing existing output only
/// when `source_updated_at` is newer or equal
/// - persists optional `rollout_slug` for rollout summary artifact naming
/// - enqueues/advances the global phase-2 job watermark using
/// `source_updated_at`
pub async fn mark_stage1_job_succeeded(
@@ -416,6 +423,7 @@ WHERE kind = ? AND job_key = ?
source_updated_at: i64,
raw_memory: &str,
rollout_summary: &str,
rollout_slug: Option<&str>,
) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
let thread_id = thread_id.to_string();
@@ -454,12 +462,14 @@ INSERT INTO stage1_outputs (
source_updated_at,
raw_memory,
rollout_summary,
rollout_slug,
generated_at
) VALUES (?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
source_updated_at = excluded.source_updated_at,
raw_memory = excluded.raw_memory,
rollout_summary = excluded.rollout_summary,
rollout_slug = excluded.rollout_slug,
generated_at = excluded.generated_at
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
"#,
@@ -468,6 +478,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
.bind(source_updated_at)
.bind(raw_memory)
.bind(rollout_summary)
.bind(rollout_slug)
.bind(now)
.execute(&mut *tx)
.await?;
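A note on the upsert above: the WHERE guard on the DO UPDATE clause makes the write monotonic, so a stale or replayed stage-1 completion can never roll a row back to older output. A minimal standalone sketch of that pattern, assuming sqlx with an in-memory SQLite database (table and column names are illustrative):

use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;
    sqlx::query("CREATE TABLE outputs (id TEXT PRIMARY KEY, body TEXT, updated_at INTEGER)")
        .execute(&pool)
        .await?;
    let upsert = r#"
INSERT INTO outputs (id, body, updated_at) VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
    body = excluded.body,
    updated_at = excluded.updated_at
WHERE excluded.updated_at >= outputs.updated_at
"#;
    // The newer write lands first; the stale replay below is a no-op.
    sqlx::query(upsert).bind("t1").bind("new").bind(2_i64).execute(&pool).await?;
    sqlx::query(upsert).bind("t1").bind("stale").bind(1_i64).execute(&pool).await?;
    let body: String = sqlx::query_scalar("SELECT body FROM outputs WHERE id = 't1'")
        .fetch_one(&pool)
        .await?;
    assert_eq!(body, "new");
    Ok(())
}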