mirror of
https://github.com/openai/codex.git
synced 2026-06-04 12:22:15 +00:00
Compare commits
2 Commits
abhinav/pr
...
starr/agen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8806787833 | ||
|
|
479d29d6e1 |
@@ -25,6 +25,7 @@ pub use extensions::prune_old_extension_resources;
|
||||
pub use prompts::build_consolidation_prompt;
|
||||
pub use prompts::build_stage_one_input_message;
|
||||
pub use start::start_memories_startup_task;
|
||||
pub use start::start_memories_startup_task_with_store;
|
||||
pub use storage::rebuild_raw_memories_file_from_memories;
|
||||
pub use storage::rollout_summary_file_stem;
|
||||
pub use storage::sync_rollout_summaries_from_memories;
|
||||
|
||||
@@ -18,6 +18,7 @@ use codex_protocol::protocol::TokenUsage;
|
||||
use codex_rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
use codex_rollout::should_persist_response_item_for_memories;
|
||||
use codex_secrets::redact_secrets;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
@@ -67,12 +68,17 @@ struct StageOneOutput {
|
||||
/// 2) build one stage-1 request context
|
||||
/// 3) run stage-1 extraction jobs in parallel
|
||||
/// 4) emit metrics and logs
|
||||
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
pub async fn run(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
let stage_one_context = build_request_context(context.as_ref(), config.as_ref()).await;
|
||||
let _phase_one_e2e_timer = stage_one_context.start_timer(MEMORY_PHASE_ONE_E2E_MS);
|
||||
|
||||
// 1. Claim startup job.
|
||||
let Some(claimed_candidates) = claim_startup_jobs(context.as_ref(), &config.memories).await
|
||||
let Some(claimed_candidates) =
|
||||
claim_startup_jobs(context.as_ref(), &config.memories, store.as_ref()).await
|
||||
else {
|
||||
return;
|
||||
};
|
||||
@@ -89,6 +95,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
let outcomes = run_jobs(
|
||||
context,
|
||||
config,
|
||||
store,
|
||||
claimed_candidates,
|
||||
stage_one_context.clone(),
|
||||
)
|
||||
@@ -108,27 +115,24 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
}
|
||||
|
||||
/// Prune old un-used "dead" raw memories.
|
||||
pub async fn prune(context: &MemoryStartupContext, config: &Config) {
|
||||
if let Some(db) = context.state_db() {
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
match db
|
||||
.memories()
|
||||
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
|
||||
.await
|
||||
{
|
||||
Ok(pruned) => {
|
||||
if pruned > 0 {
|
||||
info!(
|
||||
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
|
||||
pub async fn prune(store: &dyn GeneratedMemoryStore, config: &Config) {
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
match store
|
||||
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
|
||||
.await
|
||||
{
|
||||
Ok(pruned) => {
|
||||
if pruned > 0 {
|
||||
info!(
|
||||
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,20 +153,14 @@ pub fn output_schema() -> Value {
|
||||
async fn claim_startup_jobs(
|
||||
context: &MemoryStartupContext,
|
||||
memories_config: &MemoriesConfig,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
) -> Option<Vec<codex_state::Stage1JobClaim>> {
|
||||
let Some(state_db) = context.state_db() else {
|
||||
// This should not happen.
|
||||
warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
|
||||
return None;
|
||||
};
|
||||
|
||||
let allowed_sources = INTERACTIVE_SESSION_SOURCES
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
match state_db
|
||||
.memories()
|
||||
match store
|
||||
.claim_stage1_jobs_for_startup(
|
||||
context.thread_id(),
|
||||
codex_state::Stage1StartupClaimParams {
|
||||
@@ -203,6 +201,7 @@ async fn build_request_context(
|
||||
async fn run_jobs(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
|
||||
stage_one_context: StageOneRequestContext,
|
||||
) -> Vec<JobResult> {
|
||||
@@ -210,9 +209,17 @@ async fn run_jobs(
|
||||
.map(|claim| {
|
||||
let context = Arc::clone(&context);
|
||||
let config = Arc::clone(&config);
|
||||
let store = Arc::clone(&store);
|
||||
let stage_one_context = stage_one_context.clone();
|
||||
async move {
|
||||
job::run(context.as_ref(), config.as_ref(), claim, &stage_one_context).await
|
||||
job::run(
|
||||
context.as_ref(),
|
||||
config.as_ref(),
|
||||
store.as_ref(),
|
||||
claim,
|
||||
&stage_one_context,
|
||||
)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.buffer_unordered(crate::stage_one::CONCURRENCY_LIMIT)
|
||||
@@ -226,6 +233,7 @@ mod job {
|
||||
pub(crate) async fn run(
|
||||
context: &MemoryStartupContext,
|
||||
config: &Config,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
claim: codex_state::Stage1JobClaim,
|
||||
stage_one_context: &StageOneRequestContext,
|
||||
) -> JobResult {
|
||||
@@ -242,7 +250,7 @@ mod job {
|
||||
Ok(output) => output,
|
||||
Err(reason) => {
|
||||
result::failed(
|
||||
context,
|
||||
store,
|
||||
claimed_thread.id,
|
||||
&claim.ownership_token,
|
||||
&reason.to_string(),
|
||||
@@ -257,15 +265,14 @@ mod job {
|
||||
|
||||
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
|
||||
return JobResult {
|
||||
outcome: result::no_output(context, claimed_thread.id, &claim.ownership_token)
|
||||
.await,
|
||||
outcome: result::no_output(store, claimed_thread.id, &claim.ownership_token).await,
|
||||
token_usage,
|
||||
};
|
||||
}
|
||||
|
||||
JobResult {
|
||||
outcome: result::success(
|
||||
context,
|
||||
store,
|
||||
claimed_thread.id,
|
||||
&claim.ownership_token,
|
||||
claimed_thread.updated_at.timestamp(),
|
||||
@@ -325,36 +332,28 @@ mod job {
|
||||
use super::*;
|
||||
|
||||
pub(crate) async fn failed(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
reason: &str,
|
||||
) {
|
||||
tracing::warn!("Phase 1 job failed for thread {thread_id}: {reason}");
|
||||
if let Some(state_db) = context.state_db() {
|
||||
let _ = state_db
|
||||
.memories()
|
||||
.mark_stage1_job_failed(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
reason,
|
||||
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let _ = store
|
||||
.mark_stage1_job_failed(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
reason,
|
||||
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn no_output(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
) -> JobOutcome {
|
||||
let Some(state_db) = context.state_db() else {
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
.memories()
|
||||
if store
|
||||
.mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
@@ -366,7 +365,7 @@ mod job {
|
||||
}
|
||||
|
||||
pub(crate) async fn success(
|
||||
context: &MemoryStartupContext,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
source_updated_at: i64,
|
||||
@@ -374,12 +373,7 @@ mod job {
|
||||
rollout_summary: &str,
|
||||
rollout_slug: Option<&str>,
|
||||
) -> JobOutcome {
|
||||
let Some(state_db) = context.state_db() else {
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
.memories()
|
||||
if store
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token,
|
||||
|
||||
@@ -22,8 +22,8 @@ use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use codex_state::Stage1Output;
|
||||
use codex_state::StateRuntime;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
@@ -42,19 +42,19 @@ struct Counters {
|
||||
|
||||
/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear
|
||||
/// flow of the consolidation phase.
|
||||
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
pub async fn run(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
let phase_two_e2e_timer = context.start_timer(MEMORY_PHASE_TWO_E2E_MS);
|
||||
|
||||
let Some(db) = context.state_db() else {
|
||||
// This should not happen.
|
||||
return;
|
||||
};
|
||||
let root = memory_root(&config.codex_home);
|
||||
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
|
||||
// 1. Claim the global Phase 2 lock before touching the memory workspace.
|
||||
let claim = match job::claim(context.as_ref(), db.as_ref()).await {
|
||||
let claim = match job::claim(context.as_ref(), store.as_ref()).await {
|
||||
Ok(claim) => claim,
|
||||
Err(e) => {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", e)]);
|
||||
@@ -67,7 +67,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed preparing memory workspace: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_prepare_workspace",
|
||||
)
|
||||
@@ -81,7 +81,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed to get agent config");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_sandbox_policy",
|
||||
)
|
||||
@@ -90,8 +90,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
};
|
||||
|
||||
// 4. Load current DB-backed Phase 2 inputs.
|
||||
let raw_memories = match db
|
||||
.memories()
|
||||
let raw_memories = match store
|
||||
.get_phase2_input_selection(max_raw_memories, max_unused_days)
|
||||
.await
|
||||
{
|
||||
@@ -100,7 +99,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed to list stage1 outputs from global: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_load_stage1_outputs",
|
||||
)
|
||||
@@ -116,7 +115,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed syncing phase2 workspace inputs: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_sync_workspace_inputs",
|
||||
)
|
||||
@@ -131,7 +130,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed checking memory workspace changes: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_status",
|
||||
)
|
||||
@@ -144,7 +143,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
// We check only after sync of the file system.
|
||||
job::succeed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
new_watermark,
|
||||
&raw_memories,
|
||||
@@ -159,7 +158,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
tracing::error!("failed writing memory workspace diff file: {err}");
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
db.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_diff_file",
|
||||
)
|
||||
@@ -176,7 +175,13 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
Ok(agent) => agent,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to spawn global memory consolidation agent: {err}");
|
||||
job::failed(context.as_ref(), db.as_ref(), &claim, "failed_spawn_agent").await;
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_spawn_agent",
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -184,6 +189,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
|
||||
// 9. Hand off completion handling, heartbeats, and baseline reset.
|
||||
agent::handle(
|
||||
Arc::clone(&context),
|
||||
Arc::clone(&store),
|
||||
claim,
|
||||
new_watermark,
|
||||
raw_memories.clone(),
|
||||
@@ -215,10 +221,9 @@ mod job {
|
||||
|
||||
pub(super) async fn claim(
|
||||
context: &MemoryStartupContext,
|
||||
db: &StateRuntime,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
) -> Result<Claim, &'static str> {
|
||||
let claim = db
|
||||
.memories()
|
||||
let claim = store
|
||||
.try_claim_global_phase2_job(context.thread_id(), crate::stage_two::JOB_LEASE_SECONDS)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -251,13 +256,13 @@ mod job {
|
||||
|
||||
pub(super) async fn failed(
|
||||
context: &MemoryStartupContext,
|
||||
db: &StateRuntime,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
claim: &Claim,
|
||||
reason: &'static str,
|
||||
) {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
|
||||
if matches!(
|
||||
db.memories()
|
||||
store
|
||||
.mark_global_phase2_job_failed(
|
||||
&claim.token,
|
||||
reason,
|
||||
@@ -266,8 +271,7 @@ mod job {
|
||||
.await,
|
||||
Ok(false)
|
||||
) {
|
||||
let _ = db
|
||||
.memories()
|
||||
let _ = store
|
||||
.mark_global_phase2_job_failed_if_unowned(
|
||||
&claim.token,
|
||||
reason,
|
||||
@@ -279,14 +283,14 @@ mod job {
|
||||
|
||||
pub(super) async fn succeed(
|
||||
context: &MemoryStartupContext,
|
||||
db: &StateRuntime,
|
||||
store: &dyn GeneratedMemoryStore,
|
||||
claim: &Claim,
|
||||
completion_watermark: i64,
|
||||
selected_outputs: &[codex_state::Stage1Output],
|
||||
reason: &'static str,
|
||||
) -> bool {
|
||||
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
|
||||
db.memories()
|
||||
store
|
||||
.mark_global_phase2_job_succeeded(&claim.token, completion_watermark, selected_outputs)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
@@ -358,6 +362,7 @@ mod agent {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn handle(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
claim: Claim,
|
||||
new_watermark: i64,
|
||||
selected_outputs: Vec<codex_state::Stage1Output>,
|
||||
@@ -365,17 +370,13 @@ mod agent {
|
||||
agent: SpawnedConsolidationAgent,
|
||||
phase_two_e2e_timer: Option<codex_otel::Timer>,
|
||||
) {
|
||||
let Some(db) = context.state_db() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _phase_two_e2e_timer = phase_two_e2e_timer;
|
||||
let SpawnedConsolidationAgent { thread_id, thread } = agent;
|
||||
|
||||
// Loop the agent until we have the final status.
|
||||
let final_status =
|
||||
loop_agent(db.clone(), claim.token.clone(), thread_id, &thread).await;
|
||||
loop_agent(Arc::clone(&store), claim.token.clone(), thread_id, &thread).await;
|
||||
|
||||
if matches!(final_status, AgentStatus::Completed(_)) {
|
||||
if let Some(token_usage) = thread
|
||||
@@ -386,8 +387,7 @@ mod agent {
|
||||
emit_token_usage_metrics(context.as_ref(), &token_usage);
|
||||
}
|
||||
// Do not reset the workspace baseline if we lost the lock.
|
||||
let still_owns_lock = match db
|
||||
.memories()
|
||||
let still_owns_lock = match store
|
||||
.heartbeat_global_phase2_job(
|
||||
&claim.token,
|
||||
crate::stage_two::JOB_LEASE_SECONDS,
|
||||
@@ -406,7 +406,12 @@ mod agent {
|
||||
false
|
||||
}
|
||||
Err(_) => {
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_confirm_ownership")
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_confirm_ownership",
|
||||
)
|
||||
.await;
|
||||
false
|
||||
}
|
||||
@@ -414,10 +419,16 @@ mod agent {
|
||||
if still_owns_lock {
|
||||
if let Err(err) = reset_memory_workspace_baseline(&memory_root).await {
|
||||
tracing::error!("failed resetting memory workspace baseline: {err}");
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_workspace_commit").await;
|
||||
job::failed(
|
||||
context.as_ref(),
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
"failed_workspace_commit",
|
||||
)
|
||||
.await;
|
||||
} else if !job::succeed(
|
||||
context.as_ref(),
|
||||
&db,
|
||||
store.as_ref(),
|
||||
&claim,
|
||||
new_watermark,
|
||||
&selected_outputs,
|
||||
@@ -431,7 +442,7 @@ mod agent {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
job::failed(context.as_ref(), &db, &claim, "failed_agent").await;
|
||||
job::failed(context.as_ref(), store.as_ref(), &claim, "failed_agent").await;
|
||||
}
|
||||
|
||||
let cleanup_context = Arc::clone(&context);
|
||||
@@ -449,7 +460,7 @@ mod agent {
|
||||
}
|
||||
|
||||
async fn loop_agent(
|
||||
db: Arc<StateRuntime>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
token: String,
|
||||
thread_id: ThreadId,
|
||||
thread: &codex_core::CodexThread,
|
||||
@@ -484,8 +495,7 @@ mod agent {
|
||||
_ = status_poll_interval.tick() => {
|
||||
}
|
||||
_ = heartbeat_interval.tick() => {
|
||||
match db
|
||||
.memories()
|
||||
match store
|
||||
.heartbeat_global_phase2_job(
|
||||
&token,
|
||||
crate::stage_two::JOB_LEASE_SECONDS,
|
||||
|
||||
@@ -12,6 +12,7 @@ use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::GeneratedMemoryStore;
|
||||
use std::sync::Arc;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -27,27 +28,57 @@ pub fn start_memories_startup_task(
|
||||
config: Arc<Config>,
|
||||
source: &SessionSource,
|
||||
) {
|
||||
if config.ephemeral
|
||||
|| !config.features.enabled(Feature::MemoryTool)
|
||||
|| source.is_non_root_agent()
|
||||
{
|
||||
if memories_startup_is_disabled(config.as_ref(), source) {
|
||||
return;
|
||||
}
|
||||
|
||||
let context = Arc::new(MemoryStartupContext::new(
|
||||
let context = memory_startup_context(
|
||||
thread_manager,
|
||||
Arc::clone(&auth_manager),
|
||||
thread_id,
|
||||
thread,
|
||||
config.as_ref(),
|
||||
source.clone(),
|
||||
));
|
||||
|
||||
if context.state_db().is_none() {
|
||||
source,
|
||||
);
|
||||
let Some(state_db) = context.state_db() else {
|
||||
warn!("state db unavailable for memories startup pipeline; skipping");
|
||||
return;
|
||||
};
|
||||
let store: Arc<dyn GeneratedMemoryStore> = Arc::new(state_db.memories().clone());
|
||||
spawn_memories_startup_task(context, auth_manager, config, store);
|
||||
}
|
||||
|
||||
/// Starts startup memory generation with an injected generated-memory store.
|
||||
pub fn start_memories_startup_task_with_store(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
config: Arc<Config>,
|
||||
source: &SessionSource,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
if memories_startup_is_disabled(config.as_ref(), source) {
|
||||
return;
|
||||
}
|
||||
|
||||
let context = memory_startup_context(
|
||||
thread_manager,
|
||||
Arc::clone(&auth_manager),
|
||||
thread_id,
|
||||
thread,
|
||||
config.as_ref(),
|
||||
source,
|
||||
);
|
||||
spawn_memories_startup_task(context, auth_manager, config, store);
|
||||
}
|
||||
|
||||
fn spawn_memories_startup_task(
|
||||
context: Arc<MemoryStartupContext>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
config: Arc<Config>,
|
||||
store: Arc<dyn GeneratedMemoryStore>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let root = memory_root(&config.codex_home);
|
||||
if let Err(err) = tokio::fs::create_dir_all(&root).await {
|
||||
@@ -60,7 +91,7 @@ pub fn start_memories_startup_task(
|
||||
|
||||
// Clean memories to preserve DB size. This does not consume tokens so can be
|
||||
// done before the quota check.
|
||||
phase1::prune(context.as_ref(), &config).await;
|
||||
phase1::prune(store.as_ref(), &config).await;
|
||||
|
||||
if !guard::rate_limits_ok(&auth_manager, &config).await {
|
||||
context.counter(
|
||||
@@ -72,8 +103,35 @@ pub fn start_memories_startup_task(
|
||||
}
|
||||
|
||||
// Run phase 1.
|
||||
phase1::run(Arc::clone(&context), Arc::clone(&config)).await;
|
||||
phase1::run(
|
||||
Arc::clone(&context),
|
||||
Arc::clone(&config),
|
||||
Arc::clone(&store),
|
||||
)
|
||||
.await;
|
||||
// Run phase 2.
|
||||
phase2::run(context, config).await;
|
||||
phase2::run(context, config, store).await;
|
||||
});
|
||||
}
|
||||
|
||||
fn memory_startup_context(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
config: &Config,
|
||||
source: &SessionSource,
|
||||
) -> Arc<MemoryStartupContext> {
|
||||
Arc::new(MemoryStartupContext::new(
|
||||
thread_manager,
|
||||
auth_manager,
|
||||
thread_id,
|
||||
thread,
|
||||
config,
|
||||
source.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
fn memories_startup_is_disabled(config: &Config, source: &SessionSource) -> bool {
|
||||
config.ephemeral || !config.features.enabled(Feature::MemoryTool) || source.is_non_root_agent()
|
||||
}
|
||||
|
||||
106
codex-rs/state/src/generated_memory_store.rs
Normal file
106
codex-rs/state/src/generated_memory_store.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use crate::MemoryStore;
|
||||
use crate::Phase2JobClaimOutcome;
|
||||
use crate::Stage1JobClaim;
|
||||
use crate::Stage1Output;
|
||||
use crate::Stage1StartupClaimParams;
|
||||
use codex_protocol::ThreadId;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Boxed generated-memory store future usable behind a trait object.
|
||||
pub type GeneratedMemoryStoreFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
|
||||
|
||||
macro_rules! generated_memory_store {
|
||||
($( $(#[$method_doc:meta])* fn $method:ident<$lifetime:lifetime>(
|
||||
$($arg:ident: $arg_type:ty),* $(,)?
|
||||
) -> $output:ty; )*) => {
|
||||
/// Persistence boundary for generated-memory startup job and output state.
|
||||
///
|
||||
/// Implementations own the stage-1 extraction rows plus the singleton
|
||||
/// phase-2 consolidation lease. Callers rely on [`MemoryStore`]
|
||||
/// ownership-token semantics: successful writes return `false` after
|
||||
/// the caller loses the relevant job.
|
||||
pub trait GeneratedMemoryStore: Send + Sync + 'static {
|
||||
$( $(#[$method_doc])* fn $method<$lifetime>(
|
||||
&$lifetime self,
|
||||
$($arg: $arg_type),*
|
||||
) -> GeneratedMemoryStoreFuture<$lifetime, $output>; )*
|
||||
}
|
||||
|
||||
impl GeneratedMemoryStore for MemoryStore {
|
||||
$( fn $method<$lifetime>(
|
||||
&$lifetime self,
|
||||
$($arg: $arg_type),*
|
||||
) -> GeneratedMemoryStoreFuture<$lifetime, $output> {
|
||||
Box::pin(MemoryStore::$method(self, $($arg),*))
|
||||
} )*
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
generated_memory_store! {
|
||||
/// Prunes stale generated stage-1 outputs that are no longer retained.
|
||||
fn prune_stage1_outputs_for_retention<'a>(
|
||||
max_unused_days: i64,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<usize>;
|
||||
/// Claims eligible startup stage-1 extraction jobs.
|
||||
fn claim_stage1_jobs_for_startup<'a>(
|
||||
current_thread_id: ThreadId,
|
||||
params: Stage1StartupClaimParams<'a>,
|
||||
) -> anyhow::Result<Vec<Stage1JobClaim>>;
|
||||
/// Marks an owned stage-1 extraction job successful with generated output.
|
||||
fn mark_stage1_job_succeeded<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
source_updated_at: i64,
|
||||
raw_memory: &'a str,
|
||||
rollout_summary: &'a str,
|
||||
rollout_slug: Option<&'a str>,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks an owned stage-1 extraction job successful without output.
|
||||
fn mark_stage1_job_succeeded_no_output<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks an owned stage-1 extraction job failed with retry backoff.
|
||||
fn mark_stage1_job_failed<'a>(
|
||||
thread_id: ThreadId,
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Claims the singleton global phase-2 consolidation lease.
|
||||
fn try_claim_global_phase2_job<'a>(
|
||||
worker_id: ThreadId,
|
||||
lease_seconds: i64,
|
||||
) -> anyhow::Result<Phase2JobClaimOutcome>;
|
||||
/// Returns the current generated-memory inputs for phase-2 consolidation.
|
||||
fn get_phase2_input_selection<'a>(
|
||||
n: usize,
|
||||
max_unused_days: i64,
|
||||
) -> anyhow::Result<Vec<Stage1Output>>;
|
||||
/// Extends the owned singleton global phase-2 consolidation lease.
|
||||
fn heartbeat_global_phase2_job<'a>(
|
||||
ownership_token: &'a str,
|
||||
lease_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks the owned singleton global phase-2 consolidation job successful.
|
||||
fn mark_global_phase2_job_succeeded<'a>(
|
||||
ownership_token: &'a str,
|
||||
completed_watermark: i64,
|
||||
selected_outputs: &'a [Stage1Output],
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Marks the owned singleton global phase-2 consolidation job failed.
|
||||
fn mark_global_phase2_job_failed<'a>(
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
/// Finalizes a failed singleton phase-2 job after ownership may be lost.
|
||||
fn mark_global_phase2_job_failed_if_unowned<'a>(
|
||||
ownership_token: &'a str,
|
||||
failure_reason: &'a str,
|
||||
retry_delay_seconds: i64,
|
||||
) -> anyhow::Result<bool>;
|
||||
}
|
||||
@@ -6,6 +6,7 @@
|
||||
|
||||
mod audit;
|
||||
mod extract;
|
||||
mod generated_memory_store;
|
||||
pub mod log_db;
|
||||
mod migrations;
|
||||
mod model;
|
||||
@@ -27,6 +28,8 @@ pub use audit::read_thread_state_audit_rows;
|
||||
/// Most consumers should prefer [`StateRuntime`].
|
||||
pub use extract::apply_rollout_item;
|
||||
pub use extract::rollout_item_affects_thread_metadata;
|
||||
pub use generated_memory_store::GeneratedMemoryStore;
|
||||
pub use generated_memory_store::GeneratedMemoryStoreFuture;
|
||||
pub use model::AgentJob;
|
||||
pub use model::AgentJobCreateParams;
|
||||
pub use model::AgentJobItem;
|
||||
|
||||
Reference in New Issue
Block a user