From feae389942710d155a03b940fa59ea9049b80f0a Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 13 Feb 2026 12:59:17 +0000 Subject: [PATCH 1/5] Lower missing rollout log level (#11722) Fix this: https://github.com/openai/codex/issues/11634 --- codex-rs/core/src/rollout/list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core/src/rollout/list.rs b/codex-rs/core/src/rollout/list.rs index af9223119c..9b2b00c78b 100644 --- a/codex-rs/core/src/rollout/list.rs +++ b/codex-rs/core/src/rollout/list.rs @@ -1214,7 +1214,7 @@ async fn find_thread_path_by_id_str_in_subdir( let found = results.matches.into_iter().next().map(|m| m.full_path()); if let Some(found_path) = found.as_ref() { - tracing::error!("state db missing rollout path for thread {id_str}"); + tracing::debug!("state db missing rollout path for thread {id_str}"); state_db::record_discrepancy("find_thread_path_by_id_str_in_subdir", "falling_back"); state_db::read_repair_rollout_path( state_db_ctx.as_deref(), From 36541876f4de782ff212f3fed2203e6811722075 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 13 Feb 2026 13:21:11 +0000 Subject: [PATCH 2/5] chore: streamline phase 2 (#11712) --- .../schema/json/ServerNotification.json | 3 +- .../codex_app_server_protocol.schemas.json | 6 +- .../v1/GetConversationSummaryResponse.json | 3 +- .../json/v1/ListConversationsResponse.json | 3 +- .../schema/json/v2/ThreadForkResponse.json | 3 +- .../schema/json/v2/ThreadListResponse.json | 3 +- .../schema/json/v2/ThreadReadResponse.json | 3 +- .../schema/json/v2/ThreadResumeResponse.json | 3 +- .../json/v2/ThreadRollbackResponse.json | 3 +- .../schema/json/v2/ThreadStartResponse.json | 3 +- .../json/v2/ThreadStartedNotification.json | 3 +- .../json/v2/ThreadUnarchiveResponse.json | 3 +- .../schema/typescript/SubAgentSource.ts | 2 +- codex-rs/codex-api/src/requests/headers.rs | 3 + codex-rs/core/src/client.rs | 3 + codex-rs/core/src/memories/dispatch.rs | 688 ----------------- 
codex-rs/core/src/memories/mod.rs | 3 - codex-rs/core/src/memories/phase2.rs | 700 +++++++++--------- codex-rs/core/src/memories/start.rs | 3 +- codex-rs/core/src/memories/tests.rs | 495 +++++++++++++ codex-rs/protocol/src/protocol.rs | 2 + 21 files changed, 864 insertions(+), 1074 deletions(-) delete mode 100644 codex-rs/core/src/memories/dispatch.rs diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index b4b271eadd..3277e5650f 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -5997,7 +5997,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 581869a60e..d808219e40 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -9126,7 +9126,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, @@ -14480,7 +14481,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json b/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json index 954ac28ed1..a54b7ddac1 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/GetConversationSummaryResponse.json @@ -113,7 +113,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json 
b/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json index b7e3b8f8f1..bb5a0f4861 100644 --- a/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v1/ListConversationsResponse.json @@ -113,7 +113,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 10b9d50007..de1823e9de 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 6fe793cc19..96e3761978 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 444db89431..e0d4cc4f7a 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 9b2fb74003..4385565f96 100644 --- 
a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index b6ec97be10..9f81c5ea7b 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index 0d22e08810..fd75b56a28 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -666,7 +666,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index 6a9324409e..3ad16408ee 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index 2c1c9950fe..3f8bd66861 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ 
b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -472,7 +472,8 @@ { "enum": [ "review", - "compact" + "compact", + "memory_consolidation" ], "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts index d6da7a466b..269a411be9 100644 --- a/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts +++ b/codex-rs/app-server-protocol/schema/typescript/SubAgentSource.ts @@ -3,4 +3,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { ThreadId } from "./ThreadId"; -export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | { "other": string }; +export type SubAgentSource = "review" | "compact" | { "thread_spawn": { parent_thread_id: ThreadId, depth: number, } } | "memory_consolidation" | { "other": string }; diff --git a/codex-rs/codex-api/src/requests/headers.rs b/codex-rs/codex-api/src/requests/headers.rs index 02f8c61c31..d1ab834109 100644 --- a/codex-rs/codex-api/src/requests/headers.rs +++ b/codex-rs/codex-api/src/requests/headers.rs @@ -17,6 +17,9 @@ pub(crate) fn subagent_header(source: &Option) -> Option match sub { codex_protocol::protocol::SubAgentSource::Review => Some("review".to_string()), codex_protocol::protocol::SubAgentSource::Compact => Some("compact".to_string()), + codex_protocol::protocol::SubAgentSource::MemoryConsolidation => { + Some("memory_consolidation".to_string()) + } codex_protocol::protocol::SubAgentSource::ThreadSpawn { .. 
} => { Some("collab_spawn".to_string()) } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index bbf9b26c98..ec0d1e54f0 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -325,6 +325,9 @@ impl ModelClient { let subagent = match sub { crate::protocol::SubAgentSource::Review => "review".to_string(), crate::protocol::SubAgentSource::Compact => "compact".to_string(), + crate::protocol::SubAgentSource::MemoryConsolidation => { + "memory_consolidation".to_string() + } crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(), crate::protocol::SubAgentSource::Other(label) => label.clone(), }; diff --git a/codex-rs/core/src/memories/dispatch.rs b/codex-rs/core/src/memories/dispatch.rs deleted file mode 100644 index bfa6daa5a8..0000000000 --- a/codex-rs/core/src/memories/dispatch.rs +++ /dev/null @@ -1,688 +0,0 @@ -use crate::codex::Session; -use crate::config::Config; -use crate::config::Constrained; -use crate::memories::memory_root; -use crate::memories::metrics; -use crate::memories::phase_two; -use crate::memories::phase2::spawn_phase2_completion_task; -use crate::memories::prompts::build_consolidation_prompt; -use crate::memories::storage::rebuild_raw_memories_file_from_memories; -use crate::memories::storage::sync_rollout_summaries_from_memories; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::SandboxPolicy; -use codex_protocol::protocol::SessionSource; -use codex_protocol::protocol::SubAgentSource; -use codex_protocol::user_input::UserInput; -use codex_utils_absolute_path::AbsolutePathBuf; -use std::sync::Arc; -use tracing::debug; -use tracing::info; -use tracing::warn; - -//TODO(jif) clean. 
- -fn completion_watermark( - claimed_watermark: i64, - latest_memories: &[codex_state::Stage1Output], -) -> i64 { - latest_memories - .iter() - .map(|memory| memory.source_updated_at.timestamp()) - .max() - .unwrap_or(claimed_watermark) - .max(claimed_watermark) -} - -pub(in crate::memories) async fn run_global_memory_consolidation( - session: &Arc, - config: Arc, -) -> bool { - let otel_manager = &session.services.otel_manager; - let Some(state_db) = session.services.state_db.as_deref() else { - warn!("state db unavailable; skipping global memory consolidation"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_state_db_unavailable")], - ); - return false; - }; - - let claim = match state_db - .try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS) - .await - { - Ok(claim) => claim, - Err(err) => { - warn!("state db try_claim_global_phase2_job failed during memories startup: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_claim")], - ); - return false; - } - }; - let (ownership_token, claimed_watermark) = match claim { - codex_state::Phase2JobClaimOutcome::Claimed { - ownership_token, - input_watermark, - } => { - otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]); - (ownership_token, input_watermark) - } - codex_state::Phase2JobClaimOutcome::SkippedNotDirty => { - debug!("memory phase-2 global lock is up-to-date; skipping consolidation"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_not_dirty")], - ); - return false; - } - codex_state::Phase2JobClaimOutcome::SkippedRunning => { - debug!("memory phase-2 global consolidation already running; skipping"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "skipped_running")], - ); - return false; - } - }; - - let root = memory_root(&config.codex_home); - let consolidation_config = { - let mut 
consolidation_config = config.as_ref().clone(); - consolidation_config.cwd = root.clone(); - consolidation_config.permissions.approval_policy = - Constrained::allow_only(AskForApproval::Never); - let mut writable_roots = Vec::new(); - match AbsolutePathBuf::from_absolute_path(consolidation_config.codex_home.clone()) { - Ok(codex_home) => writable_roots.push(codex_home), - Err(err) => warn!( - "memory phase-2 consolidation could not add codex_home writable root {}: {err}", - consolidation_config.codex_home.display() - ), - } - let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite { - writable_roots, - read_only_access: Default::default(), - network_access: false, - exclude_tmpdir_env_var: false, - exclude_slash_tmp: false, - }; - if let Err(err) = consolidation_config - .permissions - .sandbox_policy - .set(consolidation_sandbox_policy) - { - warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_sandbox_policy")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "consolidation sandbox policy was rejected by constraints", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - consolidation_config - }; - - let latest_memories = match state_db - .list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) - .await - { - Ok(memories) => memories, - Err(err) => { - warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_load_stage1_outputs")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed to read stage-1 outputs before global consolidation", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - }; - if !latest_memories.is_empty() { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_INPUT, - 
latest_memories.len() as i64, - &[], - ); - } - let completion_watermark = completion_watermark(claimed_watermark, &latest_memories); - if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await { - warn!("failed syncing local memory artifacts for global consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_sync_artifacts")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed syncing local memory artifacts", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - - if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await { - warn!("failed rebuilding raw memories aggregate for global consolidation: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_rebuild_raw_memories")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed rebuilding raw memories aggregate", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - return false; - } - if latest_memories.is_empty() { - debug!("memory phase-2 has no stage-1 outputs; finalized local memory artifacts"); - let _ = state_db - .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark) - .await; - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "succeeded_no_input")], - ); - return false; - } - - let prompt = build_consolidation_prompt(&root); - let input = vec![UserInput::Text { - text: prompt, - text_elements: vec![], - }]; - let source = SessionSource::SubAgent(SubAgentSource::Other( - phase_two::MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(), - )); - - match session - .services - .agent_control - .spawn_agent(consolidation_config, input, Some(source)) - .await - { - Ok(consolidation_agent_id) => { - info!( - "memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}" - ); - otel_manager.counter( - 
metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "agent_spawned")], - ); - spawn_phase2_completion_task( - session.as_ref(), - ownership_token, - completion_watermark, - consolidation_agent_id, - ); - true - } - Err(err) => { - warn!("failed to spawn global memory consolidation agent: {err}"); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_spawn_agent")], - ); - let _ = state_db - .mark_global_phase2_job_failed( - &ownership_token, - "failed to spawn consolidation agent", - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await; - false - } - } -} - -#[cfg(test)] -mod tests { - use super::completion_watermark; - use super::run_global_memory_consolidation; - use crate::CodexAuth; - use crate::ThreadManager; - use crate::agent::control::AgentControl; - use crate::codex::Session; - use crate::codex::make_session_and_context; - use crate::config::Config; - use crate::config::test_config; - use crate::memories::memory_root; - use crate::memories::raw_memories_file; - use crate::memories::rollout_summaries_dir; - use chrono::Utc; - use codex_protocol::ThreadId; - use codex_protocol::protocol::AskForApproval; - use codex_protocol::protocol::Op; - use codex_protocol::protocol::SandboxPolicy; - use codex_protocol::protocol::SessionSource; - use codex_state::Phase2JobClaimOutcome; - use codex_state::Stage1Output; - use codex_state::ThreadMetadataBuilder; - use pretty_assertions::assert_eq; - use std::path::PathBuf; - use std::sync::Arc; - use tempfile::TempDir; - - struct DispatchHarness { - _codex_home: TempDir, - config: Arc, - session: Arc, - manager: ThreadManager, - state_db: Arc, - } - - impl DispatchHarness { - async fn new() -> Self { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let mut config = test_config(); - config.codex_home = codex_home.path().to_path_buf(); - config.cwd = config.codex_home.clone(); - let config = Arc::new(config); - - let state_db = codex_state::StateRuntime::init( - 
config.codex_home.clone(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("initialize state db"); - - let manager = ThreadManager::with_models_provider_and_home_for_tests( - CodexAuth::from_api_key("dummy"), - config.model_provider.clone(), - config.codex_home.clone(), - ); - let (mut session, _turn_context) = make_session_and_context().await; - session.services.state_db = Some(Arc::clone(&state_db)); - session.services.agent_control = manager.agent_control(); - - Self { - _codex_home: codex_home, - config, - session: Arc::new(session), - manager, - state_db, - } - } - - async fn seed_stage1_output(&self, source_updated_at: i64) { - let thread_id = ThreadId::new(); - let mut metadata_builder = ThreadMetadataBuilder::new( - thread_id, - self.config - .codex_home - .join(format!("rollout-{thread_id}.jsonl")), - Utc::now(), - SessionSource::Cli, - ); - metadata_builder.cwd = self.config.cwd.clone(); - metadata_builder.model_provider = Some(self.config.model_provider_id.clone()); - let metadata = metadata_builder.build(&self.config.model_provider_id); - - self.state_db - .upsert_thread(&metadata) - .await - .expect("upsert thread metadata"); - - let claim = self - .state_db - .try_claim_stage1_job( - thread_id, - self.session.conversation_id, - source_updated_at, - 3_600, - 64, - ) - .await - .expect("claim stage-1 job"); - let ownership_token = match claim { - codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, - other => panic!("unexpected stage-1 claim outcome: {other:?}"), - }; - assert!( - self.state_db - .mark_stage1_job_succeeded( - thread_id, - &ownership_token, - source_updated_at, - "raw memory", - "rollout summary", - ) - .await - .expect("mark stage-1 success"), - "stage-1 success should enqueue global consolidation" - ); - } - - async fn shutdown_threads(&self) { - self.manager - .remove_and_close_all_threads() - .await - .expect("shutdown spawned threads"); - } - - fn user_input_ops_count(&self) -> usize 
{ - self.manager - .captured_ops() - .into_iter() - .filter(|(_, op)| matches!(op, Op::UserInput { .. })) - .count() - } - } - - #[test] - fn completion_watermark_never_regresses_below_claimed_input_watermark() { - let stage1_output = Stage1Output { - thread_id: ThreadId::new(), - source_updated_at: chrono::DateTime::::from_timestamp(123, 0) - .expect("valid source_updated_at timestamp"), - raw_memory: "raw memory".to_string(), - rollout_summary: "rollout summary".to_string(), - cwd: PathBuf::from("/tmp/workspace"), - generated_at: chrono::DateTime::::from_timestamp(124, 0) - .expect("valid generated_at timestamp"), - }; - - let completion = completion_watermark(1_000, &[stage1_output]); - assert_eq!(completion, 1_000); - } - - #[tokio::test] - async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() { - let harness = DispatchHarness::new().await; - harness.seed_stage1_output(100).await; - - let stale_claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 0) - .await - .expect("claim stale global lock"); - assert!( - matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. 
}), - "stale lock precondition should be claimed" - ); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - scheduled, - "dispatch should reclaim stale lock and spawn one agent" - ); - - let running_claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim while running"); - assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); - - let user_input_ops = harness.user_input_ops_count(); - assert_eq!(user_input_ops, 1); - let thread_ids = harness.manager.list_thread_ids().await; - assert_eq!(thread_ids.len(), 1); - let subagent = harness - .manager - .get_thread(thread_ids[0]) - .await - .expect("get consolidation thread"); - let config_snapshot = subagent.config_snapshot().await; - assert_eq!(config_snapshot.approval_policy, AskForApproval::Never); - assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home)); - match config_snapshot.sandbox_policy { - SandboxPolicy::WorkspaceWrite { writable_roots, .. 
} => { - assert!( - writable_roots - .iter() - .any(|root| root.as_path() == harness.config.codex_home.as_path()), - "consolidation subagent should have codex_home as writable root" - ); - } - other => panic!("unexpected sandbox policy: {other:?}"), - } - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_schedules_only_one_agent_while_lock_is_running() { - let harness = DispatchHarness::new().await; - harness.seed_stage1_output(200).await; - - let first_run = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - let second_run = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - - assert!(first_run, "first dispatch should schedule consolidation"); - assert!( - !second_run, - "second dispatch should skip while the global lock is running" - ); - - let user_input_ops = harness.user_input_ops_count(); - assert_eq!(user_input_ops, 1); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_with_dirty_job_and_no_stage1_outputs_skips_spawn_and_clears_dirty_flag() { - let harness = DispatchHarness::new().await; - harness - .state_db - .enqueue_global_consolidation(999) - .await - .expect("enqueue global consolidation"); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - !scheduled, - "dispatch should not spawn when no stage-1 outputs are available" - ); - assert_eq!(harness.user_input_ops_count(), 0); - - let claim = harness - .state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global job after empty dispatch"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "empty dispatch should finalize global job as up-to-date" - ); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() { - let harness = DispatchHarness::new().await; - let root = 
memory_root(&harness.config.codex_home); - let summaries_dir = rollout_summaries_dir(&root); - tokio::fs::create_dir_all(&summaries_dir) - .await - .expect("create rollout summaries dir"); - - let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new())); - tokio::fs::write(&stale_summary_path, "stale summary\n") - .await - .expect("write stale rollout summary"); - let raw_memories_path = raw_memories_file(&root); - tokio::fs::write(&raw_memories_path, "stale raw memories\n") - .await - .expect("write stale raw memories"); - let memory_index_path = root.join("MEMORY.md"); - tokio::fs::write(&memory_index_path, "stale memory index\n") - .await - .expect("write stale memory index"); - let memory_summary_path = root.join("memory_summary.md"); - tokio::fs::write(&memory_summary_path, "stale memory summary\n") - .await - .expect("write stale memory summary"); - let stale_skill_file = root.join("skills/demo/SKILL.md"); - tokio::fs::create_dir_all( - stale_skill_file - .parent() - .expect("skills subdirectory parent should exist"), - ) - .await - .expect("create stale skills dir"); - tokio::fs::write(&stale_skill_file, "stale skill\n") - .await - .expect("write stale skill"); - - harness - .state_db - .enqueue_global_consolidation(999) - .await - .expect("enqueue global consolidation"); - - let scheduled = - run_global_memory_consolidation(&harness.session, Arc::clone(&harness.config)).await; - assert!( - !scheduled, - "dispatch should skip subagent spawn when no stage-1 outputs are available" - ); - - assert!( - !tokio::fs::try_exists(&stale_summary_path) - .await - .expect("check stale summary existence"), - "empty consolidation should prune stale rollout summary files" - ); - let raw_memories = tokio::fs::read_to_string(&raw_memories_path) - .await - .expect("read rebuilt raw memories"); - assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n"); - assert!( - !tokio::fs::try_exists(&memory_index_path) - .await - .expect("check memory index 
existence"), - "empty consolidation should remove stale MEMORY.md" - ); - assert!( - !tokio::fs::try_exists(&memory_summary_path) - .await - .expect("check memory summary existence"), - "empty consolidation should remove stale memory_summary.md" - ); - assert!( - !tokio::fs::try_exists(&stale_skill_file) - .await - .expect("check stale skill existence"), - "empty consolidation should remove stale skills artifacts" - ); - assert!( - !tokio::fs::try_exists(root.join("skills")) - .await - .expect("check skills dir existence"), - "empty consolidation should remove stale skills directory" - ); - - harness.shutdown_threads().await; - } - - #[tokio::test] - async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let mut config = test_config(); - config.codex_home = codex_home.path().to_path_buf(); - config.cwd = config.codex_home.clone(); - let config = Arc::new(config); - - let state_db = codex_state::StateRuntime::init( - config.codex_home.clone(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("initialize state db"); - - let (mut session, _turn_context) = make_session_and_context().await; - session.services.state_db = Some(Arc::clone(&state_db)); - session.services.agent_control = AgentControl::default(); - let session = Arc::new(session); - - let thread_id = ThreadId::new(); - let mut metadata_builder = ThreadMetadataBuilder::new( - thread_id, - config.codex_home.join(format!("rollout-{thread_id}.jsonl")), - Utc::now(), - SessionSource::Cli, - ); - metadata_builder.cwd = config.cwd.clone(); - metadata_builder.model_provider = Some(config.model_provider_id.clone()); - let metadata = metadata_builder.build(&config.model_provider_id); - state_db - .upsert_thread(&metadata) - .await - .expect("upsert thread metadata"); - - let claim = state_db - .try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64) - .await - .expect("claim stage-1 job"); - let 
ownership_token = match claim { - codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, - other => panic!("unexpected stage-1 claim outcome: {other:?}"), - }; - assert!( - state_db - .mark_stage1_job_succeeded( - thread_id, - &ownership_token, - 100, - "raw memory", - "rollout summary", - ) - .await - .expect("mark stage-1 success"), - "stage-1 success should enqueue global consolidation" - ); - - let scheduled = run_global_memory_consolidation(&session, Arc::clone(&config)).await; - assert!( - !scheduled, - "dispatch should return false when consolidation subagent cannot be spawned" - ); - - let retry_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global job after spawn failure"); - assert_eq!( - retry_claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "spawn failures should leave the job in retry backoff instead of running" - ); - } -} diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 179c90cbb5..2892f1faae 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -4,7 +4,6 @@ //! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation. //! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent. -mod dispatch; mod phase1; mod phase2; pub(crate) mod prompts; @@ -58,8 +57,6 @@ mod phase_one { /// Phase 2 (aka `Consolidation`). mod phase_two { - /// Subagent source label used to identify consolidation tasks. - pub(super) const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation"; /// Maximum number of recent raw memories retained for global consolidation. pub(super) const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; /// Lease duration (seconds) for phase-2 consolidation job ownership. 
diff --git a/codex-rs/core/src/memories/phase2.rs b/codex-rs/core/src/memories/phase2.rs index ef0f4642b6..a43094a726 100644 --- a/codex-rs/core/src/memories/phase2.rs +++ b/codex-rs/core/src/memories/phase2.rs @@ -1,136 +1,351 @@ use crate::agent::AgentStatus; use crate::agent::status::is_final as is_final_agent_status; use crate::codex::Session; +use crate::config::Config; +use crate::memories::memory_root; use crate::memories::metrics; use crate::memories::phase_two; +use crate::memories::prompts::build_consolidation_prompt; +use crate::memories::storage::rebuild_raw_memories_file_from_memories; +use crate::memories::storage::sync_rollout_summaries_from_memories; +use codex_config::Constrained; use codex_protocol::ThreadId; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use codex_protocol::user_input::UserInput; +use codex_state::StateRuntime; +use codex_utils_absolute_path::AbsolutePathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::watch; -use tracing::debug; -use tracing::info; use tracing::warn; -pub(in crate::memories) fn spawn_phase2_completion_task( - session: &Session, - ownership_token: String, - completion_watermark: i64, - consolidation_agent_id: ThreadId, -) { - let state_db = session.services.state_db.clone(); - let agent_control = session.services.agent_control.clone(); - let otel_manager = session.services.otel_manager.clone(); - - tokio::spawn(async move { - let Some(state_db) = state_db else { - return; - }; - - let status_rx = match agent_control.subscribe_status(consolidation_agent_id).await { - Ok(status_rx) => status_rx, - Err(err) => { - warn!( - "failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}" - ); - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_subscribe_status")], - ); - 
mark_phase2_failed_with_recovery( - state_db.as_ref(), - &ownership_token, - "failed to subscribe to consolidation agent status", - ) - .await; - return; - } - }; - - let final_status = run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token, - completion_watermark, - consolidation_agent_id, - status_rx, - ) - .await; - if matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "failed_agent_unavailable")], - ); - return; - } - if is_phase2_success(&final_status) { - otel_manager.counter( - metrics::MEMORY_PHASE_TWO_JOBS, - 1, - &[("status", "succeeded")], - ); - } else { - otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "failed")]); - } - - tokio::spawn(async move { - if let Err(err) = agent_control.shutdown_agent(consolidation_agent_id).await { - warn!( - "failed to auto-close global memory consolidation agent {consolidation_agent_id}: {err}" - ); - } - }); - }); +#[derive(Debug, Clone, Default)] +struct Claim { + token: String, + watermark: i64, } -async fn run_phase2_completion_task( - state_db: Arc, - ownership_token: String, - completion_watermark: i64, - consolidation_agent_id: ThreadId, - mut status_rx: watch::Receiver, -) -> AgentStatus { - let final_status = { +#[derive(Debug, Clone, Default)] +struct Counters { + input: i64, +} + +/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear +/// flow of the consolidation phase. +pub(super) async fn run(session: &Arc, config: Arc) { + let Some(db) = session.services.state_db.as_deref() else { + // This should not happen. + return; + }; + let root = memory_root(&config.codex_home); + + // 1. Claim the job. + let claim = match job::claim(session, db).await { + Ok(claim) => claim, + Err(e) => { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", e)], + ); + return; + } + }; + + // 2. 
Get the config for the agent + let Some(agent_config) = agent::get_config(config.clone()) else { + // If we can't get the config, we can't consolidate. + tracing::error!("failed to get agent config"); + job::failed(session, db, &claim, "failed_sandbox_policy").await; + return; + }; + + // 3. Query the memories + let raw_memories = match db + .list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) + .await + { + Ok(memories) => memories, + Err(err) => { + tracing::error!("failed to list stage1 outputs from global: {}", err); + job::failed(session, db, &claim, "failed_load_stage1_outputs").await; + return; + } + }; + let new_watermark = get_watermark(claim.watermark, &raw_memories); + + // 4. Update the file system by syncing the raw memories with the one extracted from DB at + // step 3 + // [`rollout_summaries/`] + if let Err(err) = sync_rollout_summaries_from_memories(&root, &raw_memories).await { + tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); + job::failed(session, db, &claim, "failed_sync_artifacts").await; + return; + } + // [`raw_memories.md`] + if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &raw_memories).await { + tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); + job::failed(session, db, &claim, "failed_rebuild_raw_memories").await; + return; + } + if raw_memories.is_empty() { + // We check only after sync of the file system. + job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await; + return; + } + + // 5. 
Spawn the agent + let prompt = agent::get_prompt(config); + let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation); + let thread_id = match session + .services + .agent_control + .spawn_agent(agent_config, prompt, Some(source)) + .await + { + Ok(thread_id) => thread_id, + Err(err) => { + tracing::error!("failed to spawn global memory consolidation agent: {err}"); + job::failed(session, db, &claim, "failed_spawn_agent").await; + return; + } + }; + + // 6. Spawn the agent handler. + agent::handle(session, claim, new_watermark, thread_id); + + // 7. Metrics and logs. + let counters = Counters { + input: raw_memories.len() as i64, + }; + emit_metrics(session, counters); +} + +mod job { + use super::*; + + pub(super) async fn claim( + session: &Arc, + db: &StateRuntime, + ) -> Result { + let otel_manager = &session.services.otel_manager; + let claim = db + .try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS) + .await + .map_err(|e| { + tracing::error!("failed to claim job: {}", e); + "failed_claim" + })?; + let (token, watermark) = match claim { + codex_state::Phase2JobClaimOutcome::Claimed { + ownership_token, + input_watermark, + } => { + otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]); + (ownership_token, input_watermark) + } + codex_state::Phase2JobClaimOutcome::SkippedNotDirty => return Err("skipped_not_dirty"), + codex_state::Phase2JobClaimOutcome::SkippedRunning => return Err("skipped_running"), + }; + + Ok(Claim { token, watermark }) + } + + pub(super) async fn failed( + session: &Arc, + db: &StateRuntime, + claim: &Claim, + reason: &'static str, + ) { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", reason)], + ); + if matches!( + db.mark_global_phase2_job_failed( + &claim.token, + reason, + phase_two::JOB_RETRY_DELAY_SECONDS, + ) + .await, + Ok(false) + ) { + let _ = db + .mark_global_phase2_job_failed_if_unowned( + &claim.token, + 
reason, + phase_two::JOB_RETRY_DELAY_SECONDS, + ) + .await; + } + } + + pub(super) async fn succeed( + session: &Arc, + db: &StateRuntime, + claim: &Claim, + completion_watermark: i64, + reason: &'static str, + ) { + session.services.otel_manager.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", reason)], + ); + let _ = db + .mark_global_phase2_job_succeeded(&claim.token, completion_watermark) + .await; + } +} + +mod agent { + use super::*; + + pub(super) fn get_config(config: Arc) -> Option { + let root = memory_root(&config.codex_home); + let mut consolidation_config = config.as_ref().clone(); + + consolidation_config.cwd = root; + // Approval policy + consolidation_config.permissions.approval_policy = + Constrained::allow_only(AskForApproval::Never); + + // Sandbox policy + let mut writable_roots = Vec::new(); + match AbsolutePathBuf::from_absolute_path(consolidation_config.codex_home.clone()) { + Ok(codex_home) => writable_roots.push(codex_home), + Err(err) => warn!( + "memory phase-2 consolidation could not add codex_home writable root {}: {err}", + consolidation_config.codex_home.display() + ), + } + // The consolidation agent only needs local codex_home write access and no network. + let consolidation_sandbox_policy = SandboxPolicy::WorkspaceWrite { + writable_roots, + read_only_access: Default::default(), + network_access: false, + exclude_tmpdir_env_var: false, + exclude_slash_tmp: false, + }; + consolidation_config + .permissions + .sandbox_policy + .set(consolidation_sandbox_policy) + .ok()?; + + Some(consolidation_config) + } + + pub(super) fn get_prompt(config: Arc) -> Vec { + let root = memory_root(&config.codex_home); + let prompt = build_consolidation_prompt(&root); + vec![UserInput::Text { + text: prompt, + text_elements: vec![], + }] + } + + /// Handle the agent while it is running. 
+ pub(super) fn handle( + session: &Arc, + claim: Claim, + new_watermark: i64, + thread_id: ThreadId, + ) { + let Some(db) = session.services.state_db.clone() else { + return; + }; + let session = session.clone(); + + tokio::spawn(async move { + let agent_control = session.services.agent_control.clone(); + + // TODO(jif) we might have a very small race here. + let rx = match agent_control.subscribe_status(thread_id).await { + Ok(rx) => rx, + Err(err) => { + tracing::error!("agent_control.subscribe_status failed: {err:?}"); + job::failed(&session, &db, &claim, "failed_subscribe_status").await; + return; + } + }; + + // Loop the agent until we have the final status. + let final_status = loop_agent( + db.clone(), + claim.token.clone(), + new_watermark, + thread_id, + rx, + ) + .await; + + if matches!(final_status, AgentStatus::Completed(_)) { + job::succeed(&session, &db, &claim, new_watermark, "succeeded").await; + } else { + job::failed(&session, &db, &claim, "failed_agent").await; + } + + // Fire and forget close of the agent. + if !matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) { + tokio::spawn(async move { + if let Err(err) = agent_control.shutdown_agent(thread_id).await { + warn!( + "failed to auto-close global memory consolidation agent {thread_id}: {err}" + ); + } + }); + } else { + tracing::warn!("The agent was already gone"); + } + }); + } + + async fn loop_agent( + db: Arc, + token: String, + _new_watermark: i64, + thread_id: ThreadId, + mut rx: watch::Receiver, + ) -> AgentStatus { let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(phase_two::JOB_HEARTBEAT_SECONDS)); heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - let status = status_rx.borrow().clone(); + let status = rx.borrow().clone(); if is_final_agent_status(&status) { break status; } tokio::select! 
{ - changed = status_rx.changed() => { - if changed.is_err() { - warn!( - "lost status updates for global memory consolidation agent {consolidation_agent_id}" + update = rx.changed() => { + if update.is_err() { + tracing::warn!( + "lost status updates for global memory consolidation agent {thread_id}" ); break status; } } _ = heartbeat_interval.tick() => { - match state_db + match db .heartbeat_global_phase2_job( - &ownership_token, + &token, phase_two::JOB_LEASE_SECONDS, ) .await { Ok(true) => {} Ok(false) => { - warn!( - "memory phase-2 heartbeat lost global ownership; finalizing as failure" - ); break AgentStatus::Errored( "lost global phase-2 ownership during heartbeat".to_string(), ); } Err(err) => { - warn!( - "state db heartbeat_global_phase2_job failed during memories startup: {err}" - ); break AgentStatus::Errored(format!( "phase-2 heartbeat update failed: {err}" )); @@ -139,281 +354,30 @@ async fn run_phase2_completion_task( } } } - }; + } +} - let phase2_success = is_phase2_success(&final_status); - info!( - "memory phase-2 global consolidation complete: agent_id={consolidation_agent_id} success={phase2_success} final_status={final_status:?}" +pub(super) fn get_watermark( + claimed_watermark: i64, + latest_memories: &[codex_state::Stage1Output], +) -> i64 { + latest_memories + .iter() + .map(|memory| memory.source_updated_at.timestamp()) + .max() + .unwrap_or(claimed_watermark) + .max(claimed_watermark) // todo double check the claimed here. 
+} + +fn emit_metrics(session: &Arc, counters: Counters) { + let otel = session.services.otel_manager.clone(); + if counters.input > 0 { + otel.counter(metrics::MEMORY_PHASE_TWO_INPUT, counters.input, &[]); + } + + otel.counter( + metrics::MEMORY_PHASE_TWO_JOBS, + 1, + &[("status", "agent_spawned")], ); - - if phase2_success { - match state_db - .mark_global_phase2_job_succeeded(&ownership_token, completion_watermark) - .await - { - Ok(true) => {} - Ok(false) => { - debug!( - "memory phase-2 success finalization skipped after global ownership changed" - ); - } - Err(err) => { - warn!( - "state db mark_global_phase2_job_succeeded failed during memories startup: {err}" - ); - } - } - return final_status; - } - - let failure_reason = phase2_failure_reason(&final_status); - mark_phase2_failed_with_recovery(state_db.as_ref(), &ownership_token, &failure_reason).await; - warn!( - "memory phase-2 global consolidation agent finished with non-success status: agent_id={consolidation_agent_id} final_status={final_status:?}" - ); - final_status -} - -async fn mark_phase2_failed_with_recovery( - state_db: &codex_state::StateRuntime, - ownership_token: &str, - failure_reason: &str, -) { - match state_db - .mark_global_phase2_job_failed( - ownership_token, - failure_reason, - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await - { - Ok(true) => {} - Ok(false) => match state_db - .mark_global_phase2_job_failed_if_unowned( - ownership_token, - failure_reason, - phase_two::JOB_RETRY_DELAY_SECONDS, - ) - .await - { - Ok(true) => { - debug!( - "memory phase-2 failure finalization applied fallback update for unowned running job" - ); - } - Ok(false) => { - debug!( - "memory phase-2 failure finalization skipped after global ownership changed" - ); - } - Err(err) => { - warn!( - "state db mark_global_phase2_job_failed_if_unowned failed during memories startup: {err}" - ); - } - }, - Err(err) => { - warn!("state db mark_global_phase2_job_failed failed during memories startup: {err}"); - } - } 
-} - -fn is_phase2_success(final_status: &AgentStatus) -> bool { - matches!(final_status, AgentStatus::Completed(_)) -} - -fn phase2_failure_reason(final_status: &AgentStatus) -> String { - format!("consolidation agent finished with status {final_status:?}") -} - -#[cfg(test)] -mod tests { - use super::is_phase2_success; - use super::phase2_failure_reason; - use super::run_phase2_completion_task; - use crate::agent::AgentStatus; - use codex_protocol::ThreadId; - use codex_state::Phase2JobClaimOutcome; - use pretty_assertions::assert_eq; - use std::sync::Arc; - - #[test] - fn phase2_success_only_for_completed_status() { - assert!(is_phase2_success(&AgentStatus::Completed(None))); - assert!(!is_phase2_success(&AgentStatus::Running)); - assert!(!is_phase2_success(&AgentStatus::Errored( - "oops".to_string() - ))); - } - - #[test] - fn phase2_failure_reason_includes_status() { - let status = AgentStatus::Errored("boom".to_string()); - let reason = phase2_failure_reason(&status); - assert!(reason.contains("consolidation agent finished with status")); - assert!(reason.contains("boom")); - } - - #[tokio::test] - async fn phase2_completion_marks_succeeded_for_completed_status() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - let owner = ThreadId::new(); - state_db - .enqueue_global_consolidation(123) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(owner, 3_600) - .await - .expect("claim global phase-2 job"); - let ownership_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. 
- } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Completed(None)); - run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token.clone(), - 123, - ThreadId::new(), - status_rx, - ) - .await; - - let up_to_date_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim up-to-date global job"); - assert_eq!(up_to_date_claim, Phase2JobClaimOutcome::SkippedNotDirty); - - state_db - .enqueue_global_consolidation(124) - .await - .expect("enqueue advanced consolidation watermark"); - let rerun_claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim rerun global job"); - assert!( - matches!(rerun_claim, Phase2JobClaimOutcome::Claimed { .. }), - "advanced watermark should be claimable after success finalization" - ); - } - - #[tokio::test] - async fn phase2_completion_marks_failed_when_status_updates_are_lost() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - state_db - .enqueue_global_consolidation(456) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global phase-2 job"); - let ownership_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. 
- } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running); - drop(status_tx); - run_phase2_completion_task( - Arc::clone(&state_db), - ownership_token, - 456, - ThreadId::new(), - status_rx, - ) - .await; - - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim after failure finalization"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedNotDirty, - "failure finalization should leave global job in retry-backoff, not running ownership" - ); - } - - #[tokio::test] - async fn phase2_completion_heartbeat_loss_does_not_steal_active_other_owner() { - let codex_home = tempfile::tempdir().expect("create temp codex home"); - let state_db = Arc::new( - codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await - .expect("initialize state runtime"), - ); - state_db - .enqueue_global_consolidation(789) - .await - .expect("enqueue global consolidation"); - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim global phase-2 job"); - let claimed_token = match claim { - Phase2JobClaimOutcome::Claimed { - ownership_token, .. 
- } => ownership_token, - other => panic!("unexpected phase-2 claim outcome: {other:?}"), - }; - - let (_status_tx, status_rx) = tokio::sync::watch::channel(AgentStatus::Running); - run_phase2_completion_task( - Arc::clone(&state_db), - "non-owner-token".to_string(), - 789, - ThreadId::new(), - status_rx, - ) - .await; - - let claim = state_db - .try_claim_global_phase2_job(ThreadId::new(), 3_600) - .await - .expect("claim after heartbeat ownership loss"); - assert_eq!( - claim, - Phase2JobClaimOutcome::SkippedRunning, - "heartbeat ownership-loss handling should not steal a live owner lease" - ); - assert_eq!( - state_db - .mark_global_phase2_job_succeeded(claimed_token.as_str(), 789) - .await - .expect("mark original owner success"), - true, - "the original owner should still be able to finalize" - ); - } } diff --git a/codex-rs/core/src/memories/start.rs b/codex-rs/core/src/memories/start.rs index 5b47b6c2bd..3ac5fae85d 100644 --- a/codex-rs/core/src/memories/start.rs +++ b/codex-rs/core/src/memories/start.rs @@ -2,6 +2,7 @@ use crate::codex::Session; use crate::config::Config; use crate::features::Feature; use crate::memories::phase1; +use crate::memories::phase2; use codex_protocol::protocol::SessionSource; use std::sync::Arc; use tracing::warn; @@ -36,6 +37,6 @@ pub(crate) fn start_memories_startup_task( // Run phase 1. phase1::run(&session).await; // Run phase 2. 
- crate::memories::dispatch::run_global_memory_consolidation(&session, config).await; + phase2::run(&session, config).await; }); } diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 1baea576c2..82e4517a5f 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -87,3 +87,498 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only assert!(raw_memories.contains(&keep_id)); assert!(raw_memories.contains("cwd: /tmp/workspace")); } + +mod phase2 { + use crate::CodexAuth; + use crate::ThreadManager; + use crate::agent::AgentControl; + use crate::codex::Session; + use crate::codex::make_session_and_context; + use crate::config::Config; + use crate::config::test_config; + use crate::memories::memory_root; + use crate::memories::phase2; + use crate::memories::raw_memories_file; + use crate::memories::rollout_summaries_dir; + use chrono::Utc; + use codex_config::Constrained; + use codex_protocol::ThreadId; + use codex_protocol::protocol::AskForApproval; + use codex_protocol::protocol::Op; + use codex_protocol::protocol::SandboxPolicy; + use codex_protocol::protocol::SessionSource; + use codex_state::Phase2JobClaimOutcome; + use codex_state::Stage1Output; + use codex_state::ThreadMetadataBuilder; + use std::path::PathBuf; + use std::sync::Arc; + use tempfile::TempDir; + + fn stage1_output_with_source_updated_at(source_updated_at: i64) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + source_updated_at: chrono::DateTime::::from_timestamp(source_updated_at, 0) + .expect("valid source_updated_at timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "rollout summary".to_string(), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: chrono::DateTime::::from_timestamp(source_updated_at + 1, 0) + .expect("valid generated_at timestamp"), + } + } + + struct DispatchHarness { + _codex_home: TempDir, + config: Arc, + session: Arc, + manager: 
ThreadManager, + state_db: Arc, + } + + impl DispatchHarness { + async fn new() -> Self { + let codex_home = tempfile::tempdir().expect("create temp codex home"); + let mut config = test_config(); + config.codex_home = codex_home.path().to_path_buf(); + config.cwd = config.codex_home.clone(); + let config = Arc::new(config); + + let state_db = codex_state::StateRuntime::init( + config.codex_home.clone(), + config.model_provider_id.clone(), + None, + ) + .await + .expect("initialize state db"); + + let manager = ThreadManager::with_models_provider_and_home_for_tests( + CodexAuth::from_api_key("dummy"), + config.model_provider.clone(), + config.codex_home.clone(), + ); + let (mut session, _turn_context) = make_session_and_context().await; + session.services.state_db = Some(Arc::clone(&state_db)); + session.services.agent_control = manager.agent_control(); + + Self { + _codex_home: codex_home, + config, + session: Arc::new(session), + manager, + state_db, + } + } + + async fn seed_stage1_output(&self, source_updated_at: i64) { + let thread_id = ThreadId::new(); + let mut metadata_builder = ThreadMetadataBuilder::new( + thread_id, + self.config + .codex_home + .join(format!("rollout-{thread_id}.jsonl")), + Utc::now(), + SessionSource::Cli, + ); + metadata_builder.cwd = self.config.cwd.clone(); + metadata_builder.model_provider = Some(self.config.model_provider_id.clone()); + let metadata = metadata_builder.build(&self.config.model_provider_id); + + self.state_db + .upsert_thread(&metadata) + .await + .expect("upsert thread metadata"); + + let claim = self + .state_db + .try_claim_stage1_job( + thread_id, + self.session.conversation_id, + source_updated_at, + 3_600, + 64, + ) + .await + .expect("claim stage-1 job"); + let ownership_token = match claim { + codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stage-1 claim outcome: {other:?}"), + }; + assert!( + self.state_db + .mark_stage1_job_succeeded( + 
thread_id, + &ownership_token, + source_updated_at, + "raw memory", + "rollout summary", + ) + .await + .expect("mark stage-1 success"), + "stage-1 success should enqueue global consolidation" + ); + } + + async fn shutdown_threads(&self) { + self.manager + .remove_and_close_all_threads() + .await + .expect("shutdown spawned threads"); + } + + fn user_input_ops_count(&self) -> usize { + self.manager + .captured_ops() + .into_iter() + .filter(|(_, op)| matches!(op, Op::UserInput { .. })) + .count() + } + } + + #[test] + fn completion_watermark_never_regresses_below_claimed_input_watermark() { + let stage1_output = stage1_output_with_source_updated_at(123); + + let completion = phase2::get_watermark(1_000, &[stage1_output]); + pretty_assertions::assert_eq!(completion, 1_000); + } + + #[test] + fn completion_watermark_uses_claimed_watermark_when_there_are_no_memories() { + let completion = phase2::get_watermark(777, &[]); + pretty_assertions::assert_eq!(completion, 777); + } + + #[test] + fn completion_watermark_uses_latest_memory_timestamp_when_it_is_newer() { + let older = stage1_output_with_source_updated_at(123); + let newer = stage1_output_with_source_updated_at(456); + + let completion = phase2::get_watermark(200, &[older, newer]); + pretty_assertions::assert_eq!(completion, 456); + } + + #[tokio::test] + async fn dispatch_skips_when_global_job_is_not_dirty() { + let harness = DispatchHarness::new().await; + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_skips_when_global_job_is_already_running() { + let harness = DispatchHarness::new().await; + harness + .state_db + .enqueue_global_consolidation(123) + .await + .expect("enqueue global consolidation"); + let claimed = harness + .state_db + 
.try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim running global lock"); + assert!( + matches!(claimed, Phase2JobClaimOutcome::Claimed { .. }), + "precondition should claim the running lock" + ); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let running_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim while lock is still running"); + pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_reclaims_stale_global_lock_and_starts_consolidation() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + + let stale_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 0) + .await + .expect("claim stale global lock"); + assert!( + matches!(stale_claim, Phase2JobClaimOutcome::Claimed { .. 
}), + "stale lock precondition should be claimed" + ); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let running_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim while running"); + pretty_assertions::assert_eq!(running_claim, Phase2JobClaimOutcome::SkippedRunning); + + let user_input_ops = harness.user_input_ops_count(); + pretty_assertions::assert_eq!(user_input_ops, 1); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 1); + let subagent = harness + .manager + .get_thread(thread_ids[0]) + .await + .expect("get consolidation thread"); + let config_snapshot = subagent.config_snapshot().await; + pretty_assertions::assert_eq!(config_snapshot.approval_policy, AskForApproval::Never); + pretty_assertions::assert_eq!(config_snapshot.cwd, memory_root(&harness.config.codex_home)); + match config_snapshot.sandbox_policy { + SandboxPolicy::WorkspaceWrite { writable_roots, .. 
} => { + assert!( + writable_roots + .iter() + .any(|root| root.as_path() == harness.config.codex_home.as_path()), + "consolidation subagent should have codex_home as writable root" + ); + } + other => panic!("unexpected sandbox policy: {other:?}"), + } + + harness.shutdown_threads().await; + } + + #[tokio::test] + async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() { + let harness = DispatchHarness::new().await; + let root = memory_root(&harness.config.codex_home); + let summaries_dir = rollout_summaries_dir(&root); + tokio::fs::create_dir_all(&summaries_dir) + .await + .expect("create rollout summaries dir"); + + let stale_summary_path = summaries_dir.join(format!("{}.md", ThreadId::new())); + tokio::fs::write(&stale_summary_path, "stale summary\n") + .await + .expect("write stale rollout summary"); + let raw_memories_path = raw_memories_file(&root); + tokio::fs::write(&raw_memories_path, "stale raw memories\n") + .await + .expect("write stale raw memories"); + let memory_index_path = root.join("MEMORY.md"); + tokio::fs::write(&memory_index_path, "stale memory index\n") + .await + .expect("write stale memory index"); + let memory_summary_path = root.join("memory_summary.md"); + tokio::fs::write(&memory_summary_path, "stale memory summary\n") + .await + .expect("write stale memory summary"); + let stale_skill_file = root.join("skills/demo/SKILL.md"); + tokio::fs::create_dir_all( + stale_skill_file + .parent() + .expect("skills subdirectory parent should exist"), + ) + .await + .expect("create stale skills dir"); + tokio::fs::write(&stale_skill_file, "stale skill\n") + .await + .expect("write stale skill"); + + harness + .state_db + .enqueue_global_consolidation(999) + .await + .expect("enqueue global consolidation"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + assert!( + !tokio::fs::try_exists(&stale_summary_path) + .await + .expect("check stale summary existence"), + "empty consolidation should prune stale rollout 
summary files" + ); + let raw_memories = tokio::fs::read_to_string(&raw_memories_path) + .await + .expect("read rebuilt raw memories"); + pretty_assertions::assert_eq!(raw_memories, "# Raw Memories\n\nNo raw memories yet.\n"); + assert!( + !tokio::fs::try_exists(&memory_index_path) + .await + .expect("check memory index existence"), + "empty consolidation should remove stale MEMORY.md" + ); + assert!( + !tokio::fs::try_exists(&memory_summary_path) + .await + .expect("check memory summary existence"), + "empty consolidation should remove stale memory_summary.md" + ); + assert!( + !tokio::fs::try_exists(&stale_skill_file) + .await + .expect("check stale skill existence"), + "empty consolidation should remove stale skills artifacts" + ); + assert!( + !tokio::fs::try_exists(root.join("skills")) + .await + .expect("check skills dir existence"), + "empty consolidation should remove stale skills directory" + ); + let next_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after empty consolidation success"); + pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + + harness.shutdown_threads().await; + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_sandbox_policy_cannot_be_overridden() { + let harness = DispatchHarness::new().await; + harness + .state_db + .enqueue_global_consolidation(99) + .await + .expect("enqueue global consolidation"); + let mut constrained_config = harness.config.as_ref().clone(); + constrained_config.permissions.sandbox_policy = + Constrained::allow_only(SandboxPolicy::DangerFullAccess); + + phase2::run(&harness.session, Arc::new(constrained_config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + 
.await + .expect("claim global job after sandbox policy failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_syncing_artifacts_fails() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + let root = memory_root(&harness.config.codex_home); + tokio::fs::write(&root, "not a directory") + .await + .expect("create file at memory root"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after sync failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_rebuilding_raw_memories_fails() { + let harness = DispatchHarness::new().await; + harness.seed_stage1_output(100).await; + let root = memory_root(&harness.config.codex_home); + tokio::fs::create_dir_all(raw_memories_file(&root)) + .await + .expect("create raw_memories.md as a directory"); + + phase2::run(&harness.session, Arc::clone(&harness.config)).await; + + let retry_claim = harness + .state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + .expect("claim global job after rebuild failure"); + pretty_assertions::assert_eq!(retry_claim, Phase2JobClaimOutcome::SkippedNotDirty); + pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0); + let thread_ids = harness.manager.list_thread_ids().await; + 
pretty_assertions::assert_eq!(thread_ids.len(), 0); + } + + #[tokio::test] + async fn dispatch_marks_job_for_retry_when_spawn_agent_fails() { + let codex_home = tempfile::tempdir().expect("create temp codex home"); + let mut config = test_config(); + config.codex_home = codex_home.path().to_path_buf(); + config.cwd = config.codex_home.clone(); + let config = Arc::new(config); + + let state_db = codex_state::StateRuntime::init( + config.codex_home.clone(), + config.model_provider_id.clone(), + None, + ) + .await + .expect("initialize state db"); + + let (mut session, _turn_context) = make_session_and_context().await; + session.services.state_db = Some(Arc::clone(&state_db)); + session.services.agent_control = AgentControl::default(); + let session = Arc::new(session); + + let thread_id = ThreadId::new(); + let mut metadata_builder = ThreadMetadataBuilder::new( + thread_id, + config.codex_home.join(format!("rollout-{thread_id}.jsonl")), + Utc::now(), + SessionSource::Cli, + ); + metadata_builder.cwd = config.cwd.clone(); + metadata_builder.model_provider = Some(config.model_provider_id.clone()); + let metadata = metadata_builder.build(&config.model_provider_id); + state_db + .upsert_thread(&metadata) + .await + .expect("upsert thread metadata"); + + let claim = state_db + .try_claim_stage1_job(thread_id, session.conversation_id, 100, 3_600, 64) + .await + .expect("claim stage-1 job"); + let ownership_token = match claim { + codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected stage-1 claim outcome: {other:?}"), + }; + assert!( + state_db + .mark_stage1_job_succeeded( + thread_id, + &ownership_token, + 100, + "raw memory", + "rollout summary", + ) + .await + .expect("mark stage-1 success"), + "stage-1 success should enqueue global consolidation" + ); + + phase2::run(&session, Arc::clone(&config)).await; + + let retry_claim = state_db + .try_claim_global_phase2_job(ThreadId::new(), 3_600) + .await + 
.expect("claim global job after spawn failure"); + pretty_assertions::assert_eq!( + retry_claim, + Phase2JobClaimOutcome::SkippedNotDirty, + "spawn failures should leave the job in retry backoff instead of running" + ); + } +} diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 3b65aa05c4..e44bf4c1d8 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1825,6 +1825,7 @@ pub enum SubAgentSource { parent_thread_id: ThreadId, depth: i32, }, + MemoryConsolidation, Other(String), } @@ -1846,6 +1847,7 @@ impl fmt::Display for SubAgentSource { match self { SubAgentSource::Review => f.write_str("review"), SubAgentSource::Compact => f.write_str("compact"), + SubAgentSource::MemoryConsolidation => f.write_str("memory_consolidation"), SubAgentSource::ThreadSpawn { parent_thread_id, depth, From e00080cea31982f1b2d78d084217beccfe813de7 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 13 Feb 2026 14:18:15 +0000 Subject: [PATCH 3/5] feat: memories config (#11731) --- codex-rs/core/config.schema.json | 45 +++++++++++++++++ codex-rs/core/src/config/mod.rs | 56 +++++++++++++++++++++ codex-rs/core/src/config/types.rs | 72 +++++++++++++++++++++++++++ codex-rs/core/src/memories/mod.rs | 12 ++--- codex-rs/core/src/memories/phase1.rs | 37 ++++++++++---- codex-rs/core/src/memories/phase2.rs | 37 ++++++++------ codex-rs/core/src/memories/start.rs | 2 +- codex-rs/core/src/memories/storage.rs | 15 ++++-- codex-rs/core/src/memories/tests.rs | 21 +++++--- 9 files changed, 253 insertions(+), 44 deletions(-) diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index b95a36a78e..c712d51fa4 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -434,6 +434,43 @@ } ] }, + "MemoriesToml": { + "additionalProperties": false, + "description": "Memories settings loaded from config.toml.", + "properties": { + "max_raw_memories_for_global": { + "description": "Maximum 
number of recent raw memories retained for global consolidation.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "max_rollout_age_days": { + "description": "Maximum age of the threads used for memories.", + "format": "int64", + "type": "integer" + }, + "max_rollouts_per_startup": { + "description": "Maximum number of rollout candidates processed per pass.", + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "min_rollout_idle_hours": { + "description": "Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.", + "format": "int64", + "type": "integer" + }, + "phase_1_model": { + "description": "Model used for thread summarisation.", + "type": "string" + }, + "phase_2_model": { + "description": "Model used for memory consolidation.", + "type": "string" + } + }, + "type": "object" + }, "ModeKind": { "description": "Initial collaboration mode to use when the TUI starts.", "enum": [ @@ -1481,6 +1518,14 @@ "description": "Definition for MCP servers that Codex can reach out to for tool calls.", "type": "object" }, + "memories": { + "allOf": [ + { + "$ref": "#/definitions/MemoriesToml" + } + ], + "description": "Memories subsystem settings." 
+ }, "model": { "description": "Optional override of model selection.", "type": "string" diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index d5de27a40d..5cfdcf0441 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -7,6 +7,8 @@ use crate::config::types::History; use crate::config::types::McpServerConfig; use crate::config::types::McpServerDisabledReason; use crate::config::types::McpServerTransportConfig; +use crate::config::types::MemoriesConfig; +use crate::config::types::MemoriesToml; use crate::config::types::Notice; use crate::config::types::NotificationMethod; use crate::config::types::Notifications; @@ -289,6 +291,9 @@ pub struct Config { /// Maximum number of agent threads that can be open concurrently. pub agent_max_threads: Option, + /// Memories subsystem settings. + pub memories: MemoriesConfig, + /// Directory containing all Codex state (defaults to `~/.codex` but can be /// overridden by the `CODEX_HOME` environment variable). pub codex_home: PathBuf, @@ -1006,6 +1011,9 @@ pub struct ConfigToml { /// Agent-related settings (thread limits, etc.). pub agents: Option, + /// Memories subsystem settings. + pub memories: Option, + /// User-level skill config entries keyed by SKILL.md path. 
pub skills: Option, @@ -1771,6 +1779,7 @@ impl Config { .collect(), tool_output_token_limit: cfg.tool_output_token_limit, agent_max_threads, + memories: cfg.memories.unwrap_or_default().into(), codex_home, log_dir, config_layer_stack, @@ -1985,6 +1994,8 @@ mod tests { use crate::config::types::FeedbackConfigToml; use crate::config::types::HistoryPersistence; use crate::config::types::McpServerTransportConfig; + use crate::config::types::MemoriesConfig; + use crate::config::types::MemoriesToml; use crate::config::types::NotificationMethod; use crate::config::types::Notifications; use crate::config_loader::RequirementSource; @@ -2068,6 +2079,47 @@ persistence = "none" }), history_no_persistence_cfg.history ); + + let memories = r#" +[memories] +max_raw_memories_for_global = 512 +max_rollout_age_days = 42 +max_rollouts_per_startup = 9 +min_rollout_idle_hours = 24 +phase_1_model = "gpt-5-mini" +phase_2_model = "gpt-5" +"#; + let memories_cfg = + toml::from_str::(memories).expect("TOML deserialization should succeed"); + assert_eq!( + Some(MemoriesToml { + max_raw_memories_for_global: Some(512), + max_rollout_age_days: Some(42), + max_rollouts_per_startup: Some(9), + min_rollout_idle_hours: Some(24), + phase_1_model: Some("gpt-5-mini".to_string()), + phase_2_model: Some("gpt-5".to_string()), + }), + memories_cfg.memories + ); + + let config = Config::load_from_base_config_with_overrides( + memories_cfg, + ConfigOverrides::default(), + tempdir().expect("tempdir").path().to_path_buf(), + ) + .expect("load config from memories settings"); + assert_eq!( + config.memories, + MemoriesConfig { + max_raw_memories_for_global: 512, + max_rollout_age_days: 42, + max_rollouts_per_startup: 9, + min_rollout_idle_hours: 24, + phase_1_model: Some("gpt-5-mini".to_string()), + phase_2_model: Some("gpt-5".to_string()), + } + ); } #[test] @@ -4047,6 +4099,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: 
DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4156,6 +4209,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4263,6 +4317,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), @@ -4356,6 +4411,7 @@ model_verbosity = "high" project_doc_fallback_filenames: Vec::new(), tool_output_token_limit: None, agent_max_threads: DEFAULT_AGENT_MAX_THREADS, + memories: MemoriesConfig::default(), codex_home: fixture.codex_home(), log_dir: fixture.codex_home().join("log"), config_layer_stack: Default::default(), diff --git a/codex-rs/core/src/config/types.rs b/codex-rs/core/src/config/types.rs index 5f54c1df28..80c16281b0 100644 --- a/codex-rs/core/src/config/types.rs +++ b/codex-rs/core/src/config/types.rs @@ -23,6 +23,10 @@ use serde::Serialize; use serde::de::Error as SerdeError; pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev"; +pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8; +pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30; +pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12; +pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)] #[serde(rename_all = "kebab-case")] @@ -353,6 +357,74 @@ pub struct FeedbackConfigToml { pub enabled: Option, } +/// Memories settings loaded from config.toml. 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[schemars(deny_unknown_fields)] +pub struct MemoriesToml { + /// Maximum number of recent raw memories retained for global consolidation. + pub max_raw_memories_for_global: Option, + /// Maximum age of the threads used for memories. + pub max_rollout_age_days: Option, + /// Maximum number of rollout candidates processed per pass. + pub max_rollouts_per_startup: Option, + /// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended. + pub min_rollout_idle_hours: Option, + /// Model used for thread summarisation. + pub phase_1_model: Option, + /// Model used for memory consolidation. + pub phase_2_model: Option, +} + +/// Effective memories settings after defaults are applied. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MemoriesConfig { + pub max_raw_memories_for_global: usize, + pub max_rollout_age_days: i64, + pub max_rollouts_per_startup: usize, + pub min_rollout_idle_hours: i64, + pub phase_1_model: Option, + pub phase_2_model: Option, +} + +impl Default for MemoriesConfig { + fn default() -> Self { + Self { + max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS, + max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP, + min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS, + phase_1_model: None, + phase_2_model: None, + } + } +} + +impl From for MemoriesConfig { + fn from(toml: MemoriesToml) -> Self { + let defaults = Self::default(); + Self { + max_raw_memories_for_global: toml + .max_raw_memories_for_global + .unwrap_or(defaults.max_raw_memories_for_global) + .min(4096), + max_rollout_age_days: toml + .max_rollout_age_days + .unwrap_or(defaults.max_rollout_age_days) + .clamp(0, 90), + max_rollouts_per_startup: toml + .max_rollouts_per_startup + .unwrap_or(defaults.max_rollouts_per_startup) + .min(128), + min_rollout_idle_hours: 
toml + .min_rollout_idle_hours + .unwrap_or(defaults.min_rollout_idle_hours) + .clamp(1, 48), + phase_1_model: toml.phase_1_model, + phase_2_model: toml.phase_2_model, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum AppDisabledReason { diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 2892f1faae..22fe0fb9c9 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -25,10 +25,10 @@ mod artifacts { /// Phase 1 (startup extraction). mod phase_one { + /// Default model used for phase 1. + pub(super) const MODEL: &str = "gpt-5.3-codex-spark"; /// Prompt used for phase 1. pub(super) const PROMPT: &str = include_str!("../../templates/memories/stage_one_system.md"); - /// Maximum number of rollout candidates processed per startup pass. - pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 8; /// Concurrency cap for startup memory extraction and consolidation scheduling. pub(super) const CONCURRENCY_LIMIT: usize = 8; /// Fallback stage-1 rollout truncation limit (tokens) when model metadata @@ -43,10 +43,6 @@ mod phase_one { /// Keeping this below 100% leaves room for system instructions, prompt /// framing, and model output. pub(super) const CONTEXT_WINDOW_PERCENT: i64 = 70; - /// Maximum rollout age considered for phase-1 extraction. - pub(super) const MAX_ROLLOUT_AGE_DAYS: i64 = 30; - /// Minimum rollout idle time required before phase-1 extraction. - pub(super) const MIN_ROLLOUT_IDLE_HOURS: i64 = 12; /// Lease duration (seconds) for phase-1 job ownership. pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; /// Backoff delay (seconds) before retrying a failed stage-1 extraction job. @@ -57,8 +53,8 @@ mod phase_one { /// Phase 2 (aka `Consolidation`). mod phase_two { - /// Maximum number of recent raw memories retained for global consolidation. 
- pub(super) const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; + /// Default model used for phase 2. + pub(super) const MODEL: &str = "gpt-5.3-codex"; /// Lease duration (seconds) for phase-2 consolidation job ownership. pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; /// Backoff delay (seconds) before retrying a failed phase-2 consolidation diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index e11d290097..a0cf88cc84 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -2,6 +2,8 @@ use crate::Prompt; use crate::RolloutRecorder; use crate::codex::Session; use crate::codex::TurnContext; +use crate::config::Config; +use crate::config::types::MemoriesConfig; use crate::error::CodexErr; use crate::memories::metrics; use crate::memories::phase_one; @@ -78,9 +80,9 @@ struct StageOneOutput { /// 2) build one stage-1 request context /// 3) run stage-1 extraction jobs in parallel /// 4) emit metrics and logs -pub(in crate::memories) async fn run(session: &Arc) { +pub(in crate::memories) async fn run(session: &Arc, config: &Config) { // 1. Claim startup job. - let Some(claimed_candidates) = claim_startup_jobs(session).await else { + let Some(claimed_candidates) = claim_startup_jobs(session, &config.memories).await else { return; }; if claimed_candidates.is_empty() { @@ -93,7 +95,7 @@ pub(in crate::memories) async fn run(session: &Arc) { } // 2. Build request. - let stage_one_context = build_request_context(session).await; + let stage_one_context = build_request_context(session, config).await; // 3. Run the parallel sampling. 
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await; @@ -129,18 +131,22 @@ impl RequestContext { pub(in crate::memories) fn from_turn_context( turn_context: &TurnContext, turn_metadata_header: Option, + model_info: ModelInfo, ) -> Self { Self { - model_info: turn_context.model_info.clone(), + model_info, + turn_metadata_header, otel_manager: turn_context.otel_manager.clone(), reasoning_effort: turn_context.reasoning_effort, reasoning_summary: turn_context.reasoning_summary, - turn_metadata_header, } } } -async fn claim_startup_jobs(session: &Arc) -> Option> { +async fn claim_startup_jobs( + session: &Arc, + memories_config: &MemoriesConfig, +) -> Option> { let Some(state_db) = session.services.state_db.as_deref() else { // This should not happen. warn!("state db unavailable while claiming phase-1 startup jobs; skipping"); @@ -157,9 +163,9 @@ async fn claim_startup_jobs(session: &Arc) -> Option) -> Option) -> RequestContext { +async fn build_request_context(session: &Arc, config: &Config) -> RequestContext { + let model_name = config + .memories + .phase_1_model + .clone() + .unwrap_or(phase_one::MODEL.to_string()); + let model = session + .services + .models_manager + .get_model_info(&model_name, config) + .await; let turn_context = session.new_default_turn().await; RequestContext::from_turn_context( turn_context.as_ref(), turn_context.resolve_turn_metadata_header().await, + model, ) } diff --git a/codex-rs/core/src/memories/phase2.rs b/codex-rs/core/src/memories/phase2.rs index a43094a726..28048498ce 100644 --- a/codex-rs/core/src/memories/phase2.rs +++ b/codex-rs/core/src/memories/phase2.rs @@ -41,6 +41,7 @@ pub(super) async fn run(session: &Arc, config: Arc) { return; }; let root = memory_root(&config.codex_home); + let max_raw_memories = config.memories.max_raw_memories_for_global; // 1. Claim the job. let claim = match job::claim(session, db).await { @@ -64,10 +65,7 @@ pub(super) async fn run(session: &Arc, config: Arc) { }; // 3. 
Query the memories - let raw_memories = match db - .list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) - .await - { + let raw_memories = match db.list_stage1_outputs_for_global(max_raw_memories).await { Ok(memories) => memories, Err(err) => { tracing::error!("failed to list stage1 outputs from global: {}", err); @@ -80,13 +78,17 @@ pub(super) async fn run(session: &Arc, config: Arc) { // 4. Update the file system by syncing the raw memories with the one extracted from DB at // step 3 // [`rollout_summaries/`] - if let Err(err) = sync_rollout_summaries_from_memories(&root, &raw_memories).await { + if let Err(err) = + sync_rollout_summaries_from_memories(&root, &raw_memories, max_raw_memories).await + { tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); job::failed(session, db, &claim, "failed_sync_artifacts").await; return; } // [`raw_memories.md`] - if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &raw_memories).await { + if let Err(err) = + rebuild_raw_memories_file_from_memories(&root, &raw_memories, max_raw_memories).await + { tracing::error!("failed syncing local memory artifacts for global consolidation: {err}"); job::failed(session, db, &claim, "failed_rebuild_raw_memories").await; return; @@ -207,20 +209,19 @@ mod agent { pub(super) fn get_config(config: Arc) -> Option { let root = memory_root(&config.codex_home); - let mut consolidation_config = config.as_ref().clone(); + let mut agent_config = config.as_ref().clone(); - consolidation_config.cwd = root; + agent_config.cwd = root; // Approval policy - consolidation_config.permissions.approval_policy = - Constrained::allow_only(AskForApproval::Never); + agent_config.permissions.approval_policy = Constrained::allow_only(AskForApproval::Never); // Sandbox policy let mut writable_roots = Vec::new(); - match AbsolutePathBuf::from_absolute_path(consolidation_config.codex_home.clone()) { + match 
AbsolutePathBuf::from_absolute_path(agent_config.codex_home.clone()) { Ok(codex_home) => writable_roots.push(codex_home), Err(err) => warn!( "memory phase-2 consolidation could not add codex_home writable root {}: {err}", - consolidation_config.codex_home.display() + agent_config.codex_home.display() ), } // The consolidation agent only needs local codex_home write access and no network. @@ -231,13 +232,21 @@ mod agent { exclude_tmpdir_env_var: false, exclude_slash_tmp: false, }; - consolidation_config + agent_config .permissions .sandbox_policy .set(consolidation_sandbox_policy) .ok()?; - Some(consolidation_config) + agent_config.model = Some( + config + .memories + .phase_2_model + .clone() + .unwrap_or(phase_two::MODEL.to_string()), + ); + + Some(agent_config) } pub(super) fn get_prompt(config: Arc) -> Vec { diff --git a/codex-rs/core/src/memories/start.rs b/codex-rs/core/src/memories/start.rs index 3ac5fae85d..b93846857f 100644 --- a/codex-rs/core/src/memories/start.rs +++ b/codex-rs/core/src/memories/start.rs @@ -35,7 +35,7 @@ pub(crate) fn start_memories_startup_task( }; // Run phase 1. - phase1::run(&session).await; + phase1::run(&session, &config).await; // Run phase 2. 
phase2::run(&session, config).await; }); diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index dd39815b36..be888d3679 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -5,7 +5,6 @@ use std::path::Path; use tracing::warn; use crate::memories::ensure_layout; -use crate::memories::phase_two; use crate::memories::raw_memories_file; use crate::memories::rollout_summaries_dir; @@ -15,21 +14,23 @@ use crate::memories::rollout_summaries_dir; pub(super) async fn rebuild_raw_memories_file_from_memories( root: &Path, memories: &[Stage1Output], + max_raw_memories_for_global: usize, ) -> std::io::Result<()> { ensure_layout(root).await?; - rebuild_raw_memories_file(root, memories).await + rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await } /// Syncs canonical rollout summary files from DB-backed stage-1 output rows. pub(super) async fn sync_rollout_summaries_from_memories( root: &Path, memories: &[Stage1Output], + max_raw_memories_for_global: usize, ) -> std::io::Result<()> { ensure_layout(root).await?; let retained = memories .iter() - .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(max_raw_memories_for_global) .collect::>(); let keep = retained .iter() @@ -62,10 +63,14 @@ pub(super) async fn sync_rollout_summaries_from_memories( Ok(()) } -async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> { +async fn rebuild_raw_memories_file( + root: &Path, + memories: &[Stage1Output], + max_raw_memories_for_global: usize, +) -> std::io::Result<()> { let retained = memories .iter() - .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(max_raw_memories_for_global) .collect::>(); let mut body = String::from("# Raw Memories\n\n"); diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 82e4517a5f..5e0068565a 100644 --- a/codex-rs/core/src/memories/tests.rs +++ 
b/codex-rs/core/src/memories/tests.rs @@ -1,5 +1,6 @@ use super::storage::rebuild_raw_memories_file_from_memories; use super::storage::sync_rollout_summaries_from_memories; +use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL; use crate::memories::ensure_layout; use crate::memories::memory_root; use crate::memories::raw_memories_file; @@ -70,12 +71,20 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"), }]; - sync_rollout_summaries_from_memories(&root, &memories) - .await - .expect("sync rollout summaries"); - rebuild_raw_memories_file_from_memories(&root, &memories) - .await - .expect("rebuild raw memories"); + sync_rollout_summaries_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("sync rollout summaries"); + rebuild_raw_memories_file_from_memories( + &root, + &memories, + DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("rebuild raw memories"); assert!(keep_path.is_file()); assert!(!drop_path.exists()); From bc80a4a8edd397b077c374341a5c072b41b77484 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 13 Feb 2026 15:16:57 +0000 Subject: [PATCH 4/5] feat: increase windows workers stack (#11736) Switched arg0 runtime initialization from tokio::runtime::Runtime::new() to an explicit multi-thread builder that sets the thread stack size to 16MiB. This is only for Windows for now but we might need to do this for others in the future. 
This is required because Codex becomes quite large and Windows tends to consume stack a little bit faster (this is a known thing even though everyone seems to have a different theory about it) --- codex-rs/arg0/src/lib.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/codex-rs/arg0/src/lib.rs b/codex-rs/arg0/src/lib.rs index 9c455ddbba..9563737839 100644 --- a/codex-rs/arg0/src/lib.rs +++ b/codex-rs/arg0/src/lib.rs @@ -12,6 +12,8 @@ const LINUX_SANDBOX_ARG0: &str = "codex-linux-sandbox"; const APPLY_PATCH_ARG0: &str = "apply_patch"; const MISSPELLED_APPLY_PATCH_ARG0: &str = "applypatch"; const LOCK_FILENAME: &str = ".lock"; +#[cfg(target_os = "windows")] +const WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES: usize = 16 * 1024 * 1024; /// Keeps the per-session PATH entry alive and locked for the process lifetime. pub struct Arg0PathEntryGuard { @@ -112,7 +114,7 @@ where // Regular invocation – create a Tokio runtime and execute the provided // async entry-point. - let runtime = tokio::runtime::Runtime::new()?; + let runtime = build_runtime()?; runtime.block_on(async move { let codex_linux_sandbox_exe: Option = if cfg!(target_os = "linux") { std::env::current_exe().ok() @@ -124,6 +126,18 @@ where }) } +fn build_runtime() -> anyhow::Result { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all(); + #[cfg(target_os = "windows")] + { + // Defensive hardening: Windows worker threads have lower effective + // stack headroom, so use a larger stack for runtime workers. + builder.thread_stack_size(WINDOWS_TOKIO_WORKER_STACK_SIZE_BYTES); + } + Ok(builder.build()?) + } + const ILLEGAL_ENV_VAR_PREFIX: &str = "CODEX_"; /// Load env vars from ~/.codex/.env. 
From db66d827befa5d5eee161834b88969a83d0b3c2f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 13 Feb 2026 15:24:03 +0000 Subject: [PATCH 5/5] feat: add slug in name (#11739) --- codex-rs/core/src/memories/phase1.rs | 11 ++- codex-rs/core/src/memories/storage.rs | 93 +++++++++++++++++-- codex-rs/core/src/memories/tests.rs | 92 +++++++++++++++++- .../0009_stage1_outputs_rollout_slug.sql | 2 + codex-rs/state/src/model/memories.rs | 4 + codex-rs/state/src/runtime.rs | 42 ++++++++- codex-rs/state/src/runtime/memories.rs | 15 ++- 7 files changed, 237 insertions(+), 22 deletions(-) create mode 100644 codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index a0cf88cc84..1d5fc9e506 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -69,10 +69,9 @@ struct StageOneOutput { /// Compact summary line used for routing and indexing. #[serde(rename = "rollout_summary")] pub(crate) rollout_summary: String, - /// Optional slug accepted from stage-1 output for forward compatibility. - /// This is currently ignored by downstream storage and naming, which remain thread-id based. + /// Optional slug used to derive rollout summary artifact filenames. 
#[serde(default, rename = "rollout_slug")] - pub(crate) _rollout_slug: Option, + pub(crate) rollout_slug: Option, } /// Runs memory phase 1 in strict step order: @@ -122,7 +121,7 @@ pub fn output_schema() -> Value { "rollout_slug": { "type": "string" }, "raw_memory": { "type": "string" } }, - "required": ["rollout_summary", "rollout_slug", "raw_memory"], + "required": ["rollout_summary", "raw_memory"], "additionalProperties": false }) } @@ -268,6 +267,7 @@ mod job { thread.updated_at.timestamp(), &stage_one_output.raw_memory, &stage_one_output.rollout_summary, + stage_one_output.rollout_slug.as_deref(), ) .await, token_usage, @@ -348,6 +348,7 @@ mod job { let mut output: StageOneOutput = serde_json::from_str(&result)?; output.raw_memory = redact_secrets(output.raw_memory); output.rollout_summary = redact_secrets(output.rollout_summary); + output.rollout_slug = output.rollout_slug.map(redact_secrets); Ok((output, token_usage)) } @@ -401,6 +402,7 @@ mod job { source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> JobOutcome { let Some(state_db) = session.services.state_db.as_deref() else { return JobOutcome::Failed; @@ -413,6 +415,7 @@ mod job { source_updated_at, raw_memory, rollout_summary, + rollout_slug, ) .await .unwrap_or(false) diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index be888d3679..a596ea6f0c 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -34,7 +34,7 @@ pub(super) async fn sync_rollout_summaries_from_memories( .collect::>(); let keep = retained .iter() - .map(|memory| memory.thread_id.to_string()) + .map(|memory| rollout_summary_file_stem(memory)) .collect::>(); prune_rollout_summaries(root, &keep).await?; @@ -113,10 +113,10 @@ async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet) -> std::i let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { continue; }; - let Some(thread_id) = 
extract_thread_id_from_rollout_summary_filename(file_name) else { + let Some(stem) = file_name.strip_suffix(".md") else { continue; }; - if !keep.contains(thread_id) + if !keep.contains(stem) && let Err(err) = tokio::fs::remove_file(&path).await && err.kind() != std::io::ErrorKind::NotFound { @@ -134,7 +134,8 @@ async fn write_rollout_summary_for_thread( root: &Path, memory: &Stage1Output, ) -> std::io::Result<()> { - let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id)); + let file_stem = rollout_summary_file_stem(memory); + let path = rollout_summaries_dir(root).join(format!("{file_stem}.md")); let mut body = String::new(); writeln!(body, "thread_id: {}", memory.thread_id) @@ -155,7 +156,85 @@ async fn write_rollout_summary_for_thread( tokio::fs::write(path, body).await } -fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> { - let stem = file_name.strip_suffix(".md")?; - if stem.is_empty() { None } else { Some(stem) } +fn rollout_summary_file_stem(memory: &Stage1Output) -> String { + const ROLLOUT_SLUG_MAX_LEN: usize = 20; + + let thread_id = memory.thread_id.to_string(); + let Some(raw_slug) = memory.rollout_slug.as_deref() else { + return thread_id; + }; + + let mut slug = String::with_capacity(ROLLOUT_SLUG_MAX_LEN); + for ch in raw_slug.chars() { + if slug.len() >= ROLLOUT_SLUG_MAX_LEN { + break; + } + + if ch.is_ascii_alphanumeric() { + slug.push(ch.to_ascii_lowercase()); + } else { + slug.push('_'); + } + } + + while slug.ends_with('_') { + slug.pop(); + } + + if slug.is_empty() { + thread_id + } else { + format!("{thread_id}-{slug}") + } +} + +#[cfg(test)] +mod tests { + use super::rollout_summary_file_stem; + use chrono::TimeZone; + use chrono::Utc; + use codex_protocol::ThreadId; + use codex_state::Stage1Output; + use pretty_assertions::assert_eq; + use std::path::PathBuf; + + fn stage1_output_with_slug(rollout_slug: Option<&str>) -> Stage1Output { + Stage1Output { + thread_id: ThreadId::new(), + 
source_updated_at: Utc.timestamp_opt(123, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "summary".to_string(), + rollout_slug: rollout_slug.map(ToString::to_string), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(124, 0).single().expect("timestamp"), + } + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_missing() { + let memory = stage1_output_with_slug(None); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } + + #[test] + fn rollout_summary_file_stem_sanitizes_and_truncates_slug() { + let memory = + stage1_output_with_slug(Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!( + rollout_summary_file_stem(&memory), + format!("{thread_id}-unsafe_slug_with_spa") + ); + } + + #[test] + fn rollout_summary_file_stem_uses_thread_id_when_slug_is_empty() { + let memory = stage1_output_with_slug(Some("")); + let thread_id = memory.thread_id.to_string(); + + assert_eq!(rollout_summary_file_stem(&memory), thread_id); + } } diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 5e0068565a..900cc21840 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -22,7 +22,7 @@ fn memory_root_uses_shared_global_path() { } #[test] -fn stage_one_output_schema_requires_all_declared_properties() { +fn stage_one_output_schema_keeps_rollout_slug_optional() { let schema = crate::memories::phase1::output_schema(); let properties = schema .get("properties") @@ -33,16 +33,17 @@ fn stage_one_output_schema_requires_all_declared_properties() { .and_then(Value::as_array) .expect("required array"); - let mut property_keys = properties.keys().map(String::as_str).collect::>(); - property_keys.sort_unstable(); - let mut required_keys = required .iter() .map(|key| key.as_str().expect("required key 
string")) .collect::<Vec<_>>(); required_keys.sort_unstable(); - assert_eq!(required_keys, property_keys); + assert!( + properties.contains_key("rollout_slug"), + "schema should declare rollout_slug" + ); + assert_eq!(required_keys, vec!["raw_memory", "rollout_summary"]); } #[tokio::test] @@ -67,6 +68,7 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"), raw_memory: "raw memory".to_string(), rollout_summary: "short summary".to_string(), + rollout_slug: None, cwd: PathBuf::from("/tmp/workspace"), generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"), }]; @@ -97,6 +99,83 @@ assert!(raw_memories.contains("cwd: /tmp/workspace")); } +#[tokio::test] +async fn sync_rollout_summaries_uses_thread_id_and_sanitized_slug_filename() { + let dir = tempdir().expect("tempdir"); + let root = dir.path().join("memory"); + ensure_layout(&root).await.expect("ensure layout"); + + let thread_id = ThreadId::new(); + let stale_unslugged_path = rollout_summaries_dir(&root).join(format!("{thread_id}.md")); + let stale_old_slug_path = + rollout_summaries_dir(&root).join(format!("{thread_id}--old-slug.md")); + tokio::fs::write(&stale_unslugged_path, "stale") + .await + .expect("write stale unslugged file"); + tokio::fs::write(&stale_old_slug_path, "stale") + .await + .expect("write stale old-slug file"); + + let memories = vec![Stage1Output { + thread_id, + source_updated_at: Utc.timestamp_opt(200, 0).single().expect("timestamp"), + raw_memory: "raw memory".to_string(), + rollout_summary: "short summary".to_string(), + rollout_slug: Some("Unsafe Slug/With Spaces & Symbols + EXTRA_LONG_12345".to_string()), + cwd: PathBuf::from("/tmp/workspace"), + generated_at: Utc.timestamp_opt(201, 0).single().expect("timestamp"), + }]; + + sync_rollout_summaries_from_memories( + &root, + &memories, + 
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL, + ) + .await + .expect("sync rollout summaries"); + + let mut dir = tokio::fs::read_dir(rollout_summaries_dir(&root)) + .await + .expect("open rollout summaries dir"); + let mut files = Vec::new(); + while let Some(entry) = dir.next_entry().await.expect("read dir entry") { + files.push(entry.file_name().to_string_lossy().to_string()); + } + files.sort_unstable(); + + assert_eq!(files.len(), 1); + let file_name = &files[0]; + let stem = file_name + .strip_suffix(".md") + .expect("rollout summary file should end with .md"); + let slug = stem + .strip_prefix(&format!("{thread_id}-")) + .expect("rollout summary filename should include thread id and slug"); + assert!(slug.len() <= 20, "slug should be capped at 20 chars"); + assert!( + slug.chars() + .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'), + "slug should be file-safe lowercase ascii with underscores" + ); + + let summary = tokio::fs::read_to_string(rollout_summaries_dir(&root).join(file_name)) + .await + .expect("read rollout summary"); + assert!(summary.contains(&format!("thread_id: {thread_id}"))); + assert!( + !tokio::fs::try_exists(&stale_unslugged_path) + .await + .expect("check stale unslugged path"), + "slugged sync should prune stale unslugged filename for same thread" + ); + assert!( + !tokio::fs::try_exists(&stale_old_slug_path) + .await + .expect("check stale old slug path"), + "slugged sync should prune stale slugged filename for same thread" + ); +} + mod phase2 { use crate::CodexAuth; use crate::ThreadManager; @@ -130,6 +209,7 @@ mod phase2 { .expect("valid source_updated_at timestamp"), raw_memory: "raw memory".to_string(), rollout_summary: "rollout summary".to_string(), + rollout_slug: None, cwd: PathBuf::from("/tmp/workspace"), generated_at: chrono::DateTime::<Utc>::from_timestamp(source_updated_at + 1, 0) .expect("valid generated_at timestamp"), @@ -220,6 +300,7 @@ mod phase2 { source_updated_at, "raw memory", "rollout summary", + 
None, ) .await .expect("mark stage-1 success"), @@ -572,6 +653,7 @@ mod phase2 { 100, "raw memory", "rollout summary", + None, ) .await .expect("mark stage-1 success"), diff --git a/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql new file mode 100644 index 0000000000..9b3a1e077d --- /dev/null +++ b/codex-rs/state/migrations/0009_stage1_outputs_rollout_slug.sql @@ -0,0 +1,2 @@ +ALTER TABLE stage1_outputs +ADD COLUMN rollout_slug TEXT; diff --git a/codex-rs/state/src/model/memories.rs b/codex-rs/state/src/model/memories.rs index aff702d673..813c999395 100644 --- a/codex-rs/state/src/model/memories.rs +++ b/codex-rs/state/src/model/memories.rs @@ -15,6 +15,7 @@ pub struct Stage1Output { pub source_updated_at: DateTime<Utc>, pub raw_memory: String, pub rollout_summary: String, + pub rollout_slug: Option<String>, pub cwd: PathBuf, pub generated_at: DateTime<Utc>, } @@ -25,6 +26,7 @@ pub(crate) struct Stage1OutputRow { source_updated_at: i64, raw_memory: String, rollout_summary: String, + rollout_slug: Option<String>, cwd: String, generated_at: i64, } @@ -36,6 +38,7 @@ impl Stage1OutputRow { source_updated_at: row.try_get("source_updated_at")?, raw_memory: row.try_get("raw_memory")?, rollout_summary: row.try_get("rollout_summary")?, + rollout_slug: row.try_get("rollout_slug")?, cwd: row.try_get("cwd")?, generated_at: row.try_get("generated_at")?, }) @@ -51,6 +54,7 @@ impl TryFrom<Stage1OutputRow> for Stage1Output { source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?, raw_memory: row.raw_memory, rollout_summary: row.rollout_summary, + rollout_slug: row.rollout_slug, cwd: PathBuf::from(row.cwd), generated_at: epoch_seconds_to_datetime(row.generated_at)?, }) diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 77930f8001..4cc9c63b17 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -1143,7 +1143,14 @@ WHERE id = 1 assert!( runtime - 
.mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "stage1 success should finalize for current token" @@ -1492,6 +1499,7 @@ WHERE id = 1 up_to_date.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark up-to-date thread succeeded"), @@ -1715,6 +1723,7 @@ WHERE kind = 'memory_stage1' claim.thread.updated_at.timestamp(), "raw", "summary", + None, ) .await .expect("mark first-batch stage1 success"), @@ -1766,7 +1775,14 @@ WHERE kind = 'memory_stage1' }; assert!( runtime - .mark_stage1_job_succeeded(thread_id, ownership_token.as_str(), 100, "raw", "sum") + .mark_stage1_job_succeeded( + thread_id, + ownership_token.as_str(), + 100, + "raw", + "sum", + None, + ) .await .expect("mark stage1 succeeded"), "mark stage1 succeeded should write stage1_outputs" @@ -2058,6 +2074,7 @@ WHERE kind = 'memory_stage1' 100, "raw memory a", "summary a", + None, ) .await .expect("mark stage1 succeeded a"), @@ -2080,6 +2097,7 @@ WHERE kind = 'memory_stage1' 101, "raw memory b", "summary b", + Some("rollout-b"), ) .await .expect("mark stage1 succeeded b"), @@ -2093,9 +2111,11 @@ WHERE kind = 'memory_stage1' assert_eq!(outputs.len(), 2); assert_eq!(outputs[0].thread_id, thread_id_b); assert_eq!(outputs[0].rollout_summary, "summary b"); + assert_eq!(outputs[0].rollout_slug.as_deref(), Some("rollout-b")); assert_eq!(outputs[0].cwd, codex_home.join("workspace-b")); assert_eq!(outputs[1].thread_id, thread_id_a); assert_eq!(outputs[1].rollout_summary, "summary a"); + assert_eq!(outputs[1].rollout_slug, None); assert_eq!(outputs[1].cwd, codex_home.join("workspace-a")); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -2208,7 +2228,14 @@ VALUES (?, ?, ?, ?, ?) 
}; assert!( runtime - .mark_stage1_job_succeeded(thread_a, token_a.as_str(), 100, "raw-a", "summary-a") + .mark_stage1_job_succeeded( + thread_a, + token_a.as_str(), + 100, + "raw-a", + "summary-a", + None, + ) .await .expect("mark stage1 succeeded a"), "stage1 success should persist output for thread a" @@ -2224,7 +2251,14 @@ VALUES (?, ?, ?, ?, ?) }; assert!( runtime - .mark_stage1_job_succeeded(thread_b, token_b.as_str(), 101, "raw-b", "summary-b") + .mark_stage1_job_succeeded( + thread_b, + token_b.as_str(), + 101, + "raw-b", + "summary-b", + None, + ) .await .expect("mark stage1 succeeded b"), "stage1 success should persist output for thread b" diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 4d1f6ffd59..c2a7f1fe91 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -191,7 +191,13 @@ LEFT JOIN jobs let rows = sqlx::query( r#" -SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at +SELECT + so.thread_id, + so.source_updated_at, + so.raw_memory, + so.rollout_summary, + so.rollout_slug, + so.generated_at , COALESCE(t.cwd, '') AS cwd FROM stage1_outputs AS so LEFT JOIN threads AS t @@ -407,6 +413,7 @@ WHERE kind = ? AND job_key = ? /// - sets `status='done'` and `last_success_watermark = input_watermark` /// - upserts `stage1_outputs` for the thread, replacing existing output only /// when `source_updated_at` is newer or equal + /// - persists optional `rollout_slug` for rollout summary artifact naming /// - enqueues/advances the global phase-2 job watermark using /// `source_updated_at` pub async fn mark_stage1_job_succeeded( @@ -416,6 +423,7 @@ WHERE kind = ? AND job_key = ? 
source_updated_at: i64, raw_memory: &str, rollout_summary: &str, + rollout_slug: Option<&str>, ) -> anyhow::Result<bool> { let now = Utc::now().timestamp(); let thread_id = thread_id.to_string(); @@ -454,12 +462,14 @@ INSERT INTO stage1_outputs ( source_updated_at, raw_memory, rollout_summary, + rollout_slug, generated_at -) VALUES (?, ?, ?, ?, ?) +) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(thread_id) DO UPDATE SET source_updated_at = excluded.source_updated_at, raw_memory = excluded.raw_memory, rollout_summary = excluded.rollout_summary, + rollout_slug = excluded.rollout_slug, generated_at = excluded.generated_at WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at "#, @@ -468,6 +478,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at .bind(source_updated_at) .bind(raw_memory) .bind(rollout_summary) + .bind(rollout_slug) .bind(now) .execute(&mut *tx) .await?;