Compare commits

...

2 Commits

Author SHA1 Message Date
starr-openai
8806787833 polish generated memory store seam 2026-06-03 10:50:44 -07:00
starr-openai
479d29d6e1 Extract generated memory store seam 2026-06-03 01:10:48 -07:00
6 changed files with 283 additions and 111 deletions

View File

@@ -25,6 +25,7 @@ pub use extensions::prune_old_extension_resources;
pub use prompts::build_consolidation_prompt;
pub use prompts::build_stage_one_input_message;
pub use start::start_memories_startup_task;
pub use start::start_memories_startup_task_with_store;
pub use storage::rebuild_raw_memories_file_from_memories;
pub use storage::rollout_summary_file_stem;
pub use storage::sync_rollout_summaries_from_memories;

View File

@@ -18,6 +18,7 @@ use codex_protocol::protocol::TokenUsage;
use codex_rollout::INTERACTIVE_SESSION_SOURCES;
use codex_rollout::should_persist_response_item_for_memories;
use codex_secrets::redact_secrets;
use codex_state::GeneratedMemoryStore;
use futures::StreamExt;
use serde::Deserialize;
use serde_json::Value;
@@ -67,12 +68,17 @@ struct StageOneOutput {
/// 2) build one stage-1 request context
/// 3) run stage-1 extraction jobs in parallel
/// 4) emit metrics and logs
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
pub async fn run(
context: Arc<MemoryStartupContext>,
config: Arc<Config>,
store: Arc<dyn GeneratedMemoryStore>,
) {
let stage_one_context = build_request_context(context.as_ref(), config.as_ref()).await;
let _phase_one_e2e_timer = stage_one_context.start_timer(MEMORY_PHASE_ONE_E2E_MS);
// 1. Claim startup job.
let Some(claimed_candidates) = claim_startup_jobs(context.as_ref(), &config.memories).await
let Some(claimed_candidates) =
claim_startup_jobs(context.as_ref(), &config.memories, store.as_ref()).await
else {
return;
};
@@ -89,6 +95,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
let outcomes = run_jobs(
context,
config,
store,
claimed_candidates,
stage_one_context.clone(),
)
@@ -108,27 +115,24 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
}
/// Prune old un-used "dead" raw memories.
pub async fn prune(context: &MemoryStartupContext, config: &Config) {
if let Some(db) = context.state_db() {
let max_unused_days = config.memories.max_unused_days;
match db
.memories()
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
.await
{
Ok(pruned) => {
if pruned > 0 {
info!(
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
);
}
}
Err(err) => {
warn!(
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
pub async fn prune(store: &dyn GeneratedMemoryStore, config: &Config) {
let max_unused_days = config.memories.max_unused_days;
match store
.prune_stage1_outputs_for_retention(max_unused_days, crate::stage_one::PRUNE_BATCH_SIZE)
.await
{
Ok(pruned) => {
if pruned > 0 {
info!(
"memory startup pruned {pruned} stale stage-1 output row(s) older than {max_unused_days} days"
);
}
}
Err(err) => {
warn!(
"memories db prune_stage1_outputs_for_retention failed during memories startup: {err}"
);
}
}
}
@@ -149,20 +153,14 @@ pub fn output_schema() -> Value {
async fn claim_startup_jobs(
context: &MemoryStartupContext,
memories_config: &MemoriesConfig,
store: &dyn GeneratedMemoryStore,
) -> Option<Vec<codex_state::Stage1JobClaim>> {
let Some(state_db) = context.state_db() else {
// This should not happen.
warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
return None;
};
let allowed_sources = INTERACTIVE_SESSION_SOURCES
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
match state_db
.memories()
match store
.claim_stage1_jobs_for_startup(
context.thread_id(),
codex_state::Stage1StartupClaimParams {
@@ -203,6 +201,7 @@ async fn build_request_context(
async fn run_jobs(
context: Arc<MemoryStartupContext>,
config: Arc<Config>,
store: Arc<dyn GeneratedMemoryStore>,
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
stage_one_context: StageOneRequestContext,
) -> Vec<JobResult> {
@@ -210,9 +209,17 @@ async fn run_jobs(
.map(|claim| {
let context = Arc::clone(&context);
let config = Arc::clone(&config);
let store = Arc::clone(&store);
let stage_one_context = stage_one_context.clone();
async move {
job::run(context.as_ref(), config.as_ref(), claim, &stage_one_context).await
job::run(
context.as_ref(),
config.as_ref(),
store.as_ref(),
claim,
&stage_one_context,
)
.await
}
})
.buffer_unordered(crate::stage_one::CONCURRENCY_LIMIT)
@@ -226,6 +233,7 @@ mod job {
pub(crate) async fn run(
context: &MemoryStartupContext,
config: &Config,
store: &dyn GeneratedMemoryStore,
claim: codex_state::Stage1JobClaim,
stage_one_context: &StageOneRequestContext,
) -> JobResult {
@@ -242,7 +250,7 @@ mod job {
Ok(output) => output,
Err(reason) => {
result::failed(
context,
store,
claimed_thread.id,
&claim.ownership_token,
&reason.to_string(),
@@ -257,15 +265,14 @@ mod job {
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
return JobResult {
outcome: result::no_output(context, claimed_thread.id, &claim.ownership_token)
.await,
outcome: result::no_output(store, claimed_thread.id, &claim.ownership_token).await,
token_usage,
};
}
JobResult {
outcome: result::success(
context,
store,
claimed_thread.id,
&claim.ownership_token,
claimed_thread.updated_at.timestamp(),
@@ -325,36 +332,28 @@ mod job {
use super::*;
pub(crate) async fn failed(
context: &MemoryStartupContext,
store: &dyn GeneratedMemoryStore,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
reason: &str,
) {
tracing::warn!("Phase 1 job failed for thread {thread_id}: {reason}");
if let Some(state_db) = context.state_db() {
let _ = state_db
.memories()
.mark_stage1_job_failed(
thread_id,
ownership_token,
reason,
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
)
.await;
}
let _ = store
.mark_stage1_job_failed(
thread_id,
ownership_token,
reason,
crate::stage_one::JOB_RETRY_DELAY_SECONDS,
)
.await;
}
pub(crate) async fn no_output(
context: &MemoryStartupContext,
store: &dyn GeneratedMemoryStore,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
) -> JobOutcome {
let Some(state_db) = context.state_db() else {
return JobOutcome::Failed;
};
if state_db
.memories()
if store
.mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
.await
.unwrap_or(false)
@@ -366,7 +365,7 @@ mod job {
}
pub(crate) async fn success(
context: &MemoryStartupContext,
store: &dyn GeneratedMemoryStore,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
source_updated_at: i64,
@@ -374,12 +373,7 @@ mod job {
rollout_summary: &str,
rollout_slug: Option<&str>,
) -> JobOutcome {
let Some(state_db) = context.state_db() else {
return JobOutcome::Failed;
};
if state_db
.memories()
if store
.mark_stage1_job_succeeded(
thread_id,
ownership_token,

View File

@@ -22,8 +22,8 @@ use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::GeneratedMemoryStore;
use codex_state::Stage1Output;
use codex_state::StateRuntime;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
@@ -42,19 +42,19 @@ struct Counters {
/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear
/// flow of the consolidation phase.
pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
pub async fn run(
context: Arc<MemoryStartupContext>,
config: Arc<Config>,
store: Arc<dyn GeneratedMemoryStore>,
) {
let phase_two_e2e_timer = context.start_timer(MEMORY_PHASE_TWO_E2E_MS);
let Some(db) = context.state_db() else {
// This should not happen.
return;
};
let root = memory_root(&config.codex_home);
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
let max_unused_days = config.memories.max_unused_days;
// 1. Claim the global Phase 2 lock before touching the memory workspace.
let claim = match job::claim(context.as_ref(), db.as_ref()).await {
let claim = match job::claim(context.as_ref(), store.as_ref()).await {
Ok(claim) => claim,
Err(e) => {
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", e)]);
@@ -67,7 +67,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed preparing memory workspace: {err}");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_prepare_workspace",
)
@@ -81,7 +81,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed to get agent config");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_sandbox_policy",
)
@@ -90,8 +90,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
};
// 4. Load current DB-backed Phase 2 inputs.
let raw_memories = match db
.memories()
let raw_memories = match store
.get_phase2_input_selection(max_raw_memories, max_unused_days)
.await
{
@@ -100,7 +99,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed to list stage1 outputs from global: {err}");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_load_stage1_outputs",
)
@@ -116,7 +115,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed syncing phase2 workspace inputs: {err}");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_sync_workspace_inputs",
)
@@ -131,7 +130,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed checking memory workspace changes: {err}");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_workspace_status",
)
@@ -144,7 +143,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
// We check only after sync of the file system.
job::succeed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
new_watermark,
&raw_memories,
@@ -159,7 +158,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
tracing::error!("failed writing memory workspace diff file: {err}");
job::failed(
context.as_ref(),
db.as_ref(),
store.as_ref(),
&claim,
"failed_workspace_diff_file",
)
@@ -176,7 +175,13 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
Ok(agent) => agent,
Err(err) => {
tracing::error!("failed to spawn global memory consolidation agent: {err}");
job::failed(context.as_ref(), db.as_ref(), &claim, "failed_spawn_agent").await;
job::failed(
context.as_ref(),
store.as_ref(),
&claim,
"failed_spawn_agent",
)
.await;
return;
}
};
@@ -184,6 +189,7 @@ pub async fn run(context: Arc<MemoryStartupContext>, config: Arc<Config>) {
// 9. Hand off completion handling, heartbeats, and baseline reset.
agent::handle(
Arc::clone(&context),
Arc::clone(&store),
claim,
new_watermark,
raw_memories.clone(),
@@ -215,10 +221,9 @@ mod job {
pub(super) async fn claim(
context: &MemoryStartupContext,
db: &StateRuntime,
store: &dyn GeneratedMemoryStore,
) -> Result<Claim, &'static str> {
let claim = db
.memories()
let claim = store
.try_claim_global_phase2_job(context.thread_id(), crate::stage_two::JOB_LEASE_SECONDS)
.await
.map_err(|e| {
@@ -251,13 +256,13 @@ mod job {
pub(super) async fn failed(
context: &MemoryStartupContext,
db: &StateRuntime,
store: &dyn GeneratedMemoryStore,
claim: &Claim,
reason: &'static str,
) {
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
if matches!(
db.memories()
store
.mark_global_phase2_job_failed(
&claim.token,
reason,
@@ -266,8 +271,7 @@ mod job {
.await,
Ok(false)
) {
let _ = db
.memories()
let _ = store
.mark_global_phase2_job_failed_if_unowned(
&claim.token,
reason,
@@ -279,14 +283,14 @@ mod job {
pub(super) async fn succeed(
context: &MemoryStartupContext,
db: &StateRuntime,
store: &dyn GeneratedMemoryStore,
claim: &Claim,
completion_watermark: i64,
selected_outputs: &[codex_state::Stage1Output],
reason: &'static str,
) -> bool {
context.counter(MEMORY_PHASE_TWO_JOBS, /*inc*/ 1, &[("status", reason)]);
db.memories()
store
.mark_global_phase2_job_succeeded(&claim.token, completion_watermark, selected_outputs)
.await
.unwrap_or(false)
@@ -358,6 +362,7 @@ mod agent {
#[allow(clippy::too_many_arguments)]
pub(super) fn handle(
context: Arc<MemoryStartupContext>,
store: Arc<dyn GeneratedMemoryStore>,
claim: Claim,
new_watermark: i64,
selected_outputs: Vec<codex_state::Stage1Output>,
@@ -365,17 +370,13 @@ mod agent {
agent: SpawnedConsolidationAgent,
phase_two_e2e_timer: Option<codex_otel::Timer>,
) {
let Some(db) = context.state_db() else {
return;
};
tokio::spawn(async move {
let _phase_two_e2e_timer = phase_two_e2e_timer;
let SpawnedConsolidationAgent { thread_id, thread } = agent;
// Loop the agent until we have the final status.
let final_status =
loop_agent(db.clone(), claim.token.clone(), thread_id, &thread).await;
loop_agent(Arc::clone(&store), claim.token.clone(), thread_id, &thread).await;
if matches!(final_status, AgentStatus::Completed(_)) {
if let Some(token_usage) = thread
@@ -386,8 +387,7 @@ mod agent {
emit_token_usage_metrics(context.as_ref(), &token_usage);
}
// Do not reset the workspace baseline if we lost the lock.
let still_owns_lock = match db
.memories()
let still_owns_lock = match store
.heartbeat_global_phase2_job(
&claim.token,
crate::stage_two::JOB_LEASE_SECONDS,
@@ -406,7 +406,12 @@ mod agent {
false
}
Err(_) => {
job::failed(context.as_ref(), &db, &claim, "failed_confirm_ownership")
job::failed(
context.as_ref(),
store.as_ref(),
&claim,
"failed_confirm_ownership",
)
.await;
false
}
@@ -414,10 +419,16 @@ mod agent {
if still_owns_lock {
if let Err(err) = reset_memory_workspace_baseline(&memory_root).await {
tracing::error!("failed resetting memory workspace baseline: {err}");
job::failed(context.as_ref(), &db, &claim, "failed_workspace_commit").await;
job::failed(
context.as_ref(),
store.as_ref(),
&claim,
"failed_workspace_commit",
)
.await;
} else if !job::succeed(
context.as_ref(),
&db,
store.as_ref(),
&claim,
new_watermark,
&selected_outputs,
@@ -431,7 +442,7 @@ mod agent {
}
}
} else {
job::failed(context.as_ref(), &db, &claim, "failed_agent").await;
job::failed(context.as_ref(), store.as_ref(), &claim, "failed_agent").await;
}
let cleanup_context = Arc::clone(&context);
@@ -449,7 +460,7 @@ mod agent {
}
async fn loop_agent(
db: Arc<StateRuntime>,
store: Arc<dyn GeneratedMemoryStore>,
token: String,
thread_id: ThreadId,
thread: &codex_core::CodexThread,
@@ -484,8 +495,7 @@ mod agent {
_ = status_poll_interval.tick() => {
}
_ = heartbeat_interval.tick() => {
match db
.memories()
match store
.heartbeat_global_phase2_job(
&token,
crate::stage_two::JOB_LEASE_SECONDS,

View File

@@ -12,6 +12,7 @@ use codex_features::Feature;
use codex_login::AuthManager;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_state::GeneratedMemoryStore;
use std::sync::Arc;
use tracing::warn;
@@ -27,27 +28,57 @@ pub fn start_memories_startup_task(
config: Arc<Config>,
source: &SessionSource,
) {
if config.ephemeral
|| !config.features.enabled(Feature::MemoryTool)
|| source.is_non_root_agent()
{
if memories_startup_is_disabled(config.as_ref(), source) {
return;
}
let context = Arc::new(MemoryStartupContext::new(
let context = memory_startup_context(
thread_manager,
Arc::clone(&auth_manager),
thread_id,
thread,
config.as_ref(),
source.clone(),
));
if context.state_db().is_none() {
source,
);
let Some(state_db) = context.state_db() else {
warn!("state db unavailable for memories startup pipeline; skipping");
return;
};
let store: Arc<dyn GeneratedMemoryStore> = Arc::new(state_db.memories().clone());
spawn_memories_startup_task(context, auth_manager, config, store);
}
/// Starts startup memory generation with an injected generated-memory store.
pub fn start_memories_startup_task_with_store(
thread_manager: Arc<ThreadManager>,
auth_manager: Arc<AuthManager>,
thread_id: ThreadId,
thread: Arc<CodexThread>,
config: Arc<Config>,
source: &SessionSource,
store: Arc<dyn GeneratedMemoryStore>,
) {
if memories_startup_is_disabled(config.as_ref(), source) {
return;
}
let context = memory_startup_context(
thread_manager,
Arc::clone(&auth_manager),
thread_id,
thread,
config.as_ref(),
source,
);
spawn_memories_startup_task(context, auth_manager, config, store);
}
fn spawn_memories_startup_task(
context: Arc<MemoryStartupContext>,
auth_manager: Arc<AuthManager>,
config: Arc<Config>,
store: Arc<dyn GeneratedMemoryStore>,
) {
tokio::spawn(async move {
let root = memory_root(&config.codex_home);
if let Err(err) = tokio::fs::create_dir_all(&root).await {
@@ -60,7 +91,7 @@ pub fn start_memories_startup_task(
// Clean memories to make preserve DB size. This does not consume tokens so can be
// done before the quota check.
phase1::prune(context.as_ref(), &config).await;
phase1::prune(store.as_ref(), &config).await;
if !guard::rate_limits_ok(&auth_manager, &config).await {
context.counter(
@@ -72,8 +103,35 @@ pub fn start_memories_startup_task(
}
// Run phase 1.
phase1::run(Arc::clone(&context), Arc::clone(&config)).await;
phase1::run(
Arc::clone(&context),
Arc::clone(&config),
Arc::clone(&store),
)
.await;
// Run phase 2.
phase2::run(context, config).await;
phase2::run(context, config, store).await;
});
}
fn memory_startup_context(
thread_manager: Arc<ThreadManager>,
auth_manager: Arc<AuthManager>,
thread_id: ThreadId,
thread: Arc<CodexThread>,
config: &Config,
source: &SessionSource,
) -> Arc<MemoryStartupContext> {
Arc::new(MemoryStartupContext::new(
thread_manager,
auth_manager,
thread_id,
thread,
config,
source.clone(),
))
}
fn memories_startup_is_disabled(config: &Config, source: &SessionSource) -> bool {
config.ephemeral || !config.features.enabled(Feature::MemoryTool) || source.is_non_root_agent()
}

View File

@@ -0,0 +1,106 @@
use crate::MemoryStore;
use crate::Phase2JobClaimOutcome;
use crate::Stage1JobClaim;
use crate::Stage1Output;
use crate::Stage1StartupClaimParams;
use codex_protocol::ThreadId;
use std::future::Future;
use std::pin::Pin;
/// Boxed generated-memory store future usable behind a trait object.
pub type GeneratedMemoryStoreFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
macro_rules! generated_memory_store {
($( $(#[$method_doc:meta])* fn $method:ident<$lifetime:lifetime>(
$($arg:ident: $arg_type:ty),* $(,)?
) -> $output:ty; )*) => {
/// Persistence boundary for generated-memory startup job and output state.
///
/// Implementations own the stage-1 extraction rows plus the singleton
/// phase-2 consolidation lease. Callers rely on [`MemoryStore`]
/// ownership-token semantics: successful writes return `false` after
/// the caller loses the relevant job.
pub trait GeneratedMemoryStore: Send + Sync + 'static {
$( $(#[$method_doc])* fn $method<$lifetime>(
&$lifetime self,
$($arg: $arg_type),*
) -> GeneratedMemoryStoreFuture<$lifetime, $output>; )*
}
impl GeneratedMemoryStore for MemoryStore {
$( fn $method<$lifetime>(
&$lifetime self,
$($arg: $arg_type),*
) -> GeneratedMemoryStoreFuture<$lifetime, $output> {
Box::pin(MemoryStore::$method(self, $($arg),*))
} )*
}
};
}
generated_memory_store! {
/// Prunes stale generated stage-1 outputs that are no longer retained.
fn prune_stage1_outputs_for_retention<'a>(
max_unused_days: i64,
limit: usize,
) -> anyhow::Result<usize>;
/// Claims eligible startup stage-1 extraction jobs.
fn claim_stage1_jobs_for_startup<'a>(
current_thread_id: ThreadId,
params: Stage1StartupClaimParams<'a>,
) -> anyhow::Result<Vec<Stage1JobClaim>>;
/// Marks an owned stage-1 extraction job successful with generated output.
fn mark_stage1_job_succeeded<'a>(
thread_id: ThreadId,
ownership_token: &'a str,
source_updated_at: i64,
raw_memory: &'a str,
rollout_summary: &'a str,
rollout_slug: Option<&'a str>,
) -> anyhow::Result<bool>;
/// Marks an owned stage-1 extraction job successful without output.
fn mark_stage1_job_succeeded_no_output<'a>(
thread_id: ThreadId,
ownership_token: &'a str,
) -> anyhow::Result<bool>;
/// Marks an owned stage-1 extraction job failed with retry backoff.
fn mark_stage1_job_failed<'a>(
thread_id: ThreadId,
ownership_token: &'a str,
failure_reason: &'a str,
retry_delay_seconds: i64,
) -> anyhow::Result<bool>;
/// Claims the singleton global phase-2 consolidation lease.
fn try_claim_global_phase2_job<'a>(
worker_id: ThreadId,
lease_seconds: i64,
) -> anyhow::Result<Phase2JobClaimOutcome>;
/// Returns the current generated-memory inputs for phase-2 consolidation.
fn get_phase2_input_selection<'a>(
n: usize,
max_unused_days: i64,
) -> anyhow::Result<Vec<Stage1Output>>;
/// Extends the owned singleton global phase-2 consolidation lease.
fn heartbeat_global_phase2_job<'a>(
ownership_token: &'a str,
lease_seconds: i64,
) -> anyhow::Result<bool>;
/// Marks the owned singleton global phase-2 consolidation job successful.
fn mark_global_phase2_job_succeeded<'a>(
ownership_token: &'a str,
completed_watermark: i64,
selected_outputs: &'a [Stage1Output],
) -> anyhow::Result<bool>;
/// Marks the owned singleton global phase-2 consolidation job failed.
fn mark_global_phase2_job_failed<'a>(
ownership_token: &'a str,
failure_reason: &'a str,
retry_delay_seconds: i64,
) -> anyhow::Result<bool>;
/// Finalizes a failed singleton phase-2 job after ownership may be lost.
fn mark_global_phase2_job_failed_if_unowned<'a>(
ownership_token: &'a str,
failure_reason: &'a str,
retry_delay_seconds: i64,
) -> anyhow::Result<bool>;
}

View File

@@ -6,6 +6,7 @@
mod audit;
mod extract;
mod generated_memory_store;
pub mod log_db;
mod migrations;
mod model;
@@ -27,6 +28,8 @@ pub use audit::read_thread_state_audit_rows;
/// Most consumers should prefer [`StateRuntime`].
pub use extract::apply_rollout_item;
pub use extract::rollout_item_affects_thread_metadata;
pub use generated_memory_store::GeneratedMemoryStore;
pub use generated_memory_store::GeneratedMemoryStoreFuture;
pub use model::AgentJob;
pub use model::AgentJobCreateParams;
pub use model::AgentJobItem;