Compare commits

...

2 Commits

Author SHA1 Message Date
Wendy Jiao
b018a1dade Improve memory live update prompts 2026-02-16 09:41:41 -08:00
jif-oai
47149eee84 debug: view mem status line 2026-02-15 06:49:15 -08:00
10 changed files with 277 additions and 23 deletions

4
MODULE.bazel.lock generated

File diff suppressed because one or more lines are too long

View File

@@ -1365,6 +1365,7 @@ impl Session {
&sess,
Arc::clone(&config),
&session_configuration.session_source,
Some(sess.conversation_id.to_string()),
);
Ok(sess)
@@ -3743,10 +3744,15 @@ mod handlers {
state.session_configuration.session_source.clone()
};
crate::memories::start_memories_startup_task(sess, Arc::clone(config), &session_source);
crate::memories::start_memories_startup_task(
sess,
Arc::clone(config),
&session_source,
Some(sub_id.clone()),
);
sess.send_event_raw(Event {
id: sub_id.clone(),
id: sub_id,
msg: EventMsg::Warning(WarningEvent {
message: "Memory update triggered.".to_string(),
}),

View File

@@ -8,6 +8,7 @@ use crate::error::CodexErr;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::memories::prompts::build_stage_one_input_message;
use crate::memories::start::emit_memory_progress;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::policy::should_persist_response_item_for_memories;
use codex_api::ResponseEvent;
@@ -27,6 +28,8 @@ use serde_json::Value;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tracing::info;
use tracing::warn;
@@ -79,17 +82,42 @@ struct StageOneOutput {
/// 2) build one stage-1 request context
/// 3) run stage-1 extraction jobs in parallel
/// 4) emit metrics and logs
pub(in crate::memories) async fn run(session: &Arc<Session>, config: &Config) {
pub(in crate::memories) async fn run(
session: &Arc<Session>,
config: &Config,
progress_sub_id: &Option<String>,
) {
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 1 scanning candidates",
)
.await;
// 1. Claim startup job.
let Some(claimed_candidates) = claim_startup_jobs(session, &config.memories).await else {
return;
};
let claimed_count = claimed_candidates.len();
emit_memory_progress(
session.as_ref(),
progress_sub_id,
format!("phase 1 running (0/{claimed_count} done)"),
)
.await;
if claimed_candidates.is_empty() {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_no_candidates")],
);
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 1 complete (0/0 succeeded)",
)
.await;
return;
}
@@ -97,7 +125,14 @@ pub(in crate::memories) async fn run(session: &Arc<Session>, config: &Config) {
let stage_one_context = build_request_context(session, config).await;
// 3. Run the parallel sampling.
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;
let outcomes = run_jobs(
session,
claimed_candidates,
stage_one_context,
progress_sub_id,
claimed_count,
)
.await;
// 4. Metrics and logs.
let counts = aggregate_stats(outcomes);
@@ -110,6 +145,13 @@ pub(in crate::memories) async fn run(session: &Arc<Session>, config: &Config) {
counts.succeeded_no_output,
counts.failed
);
let succeeded_count = counts.succeeded_with_output + counts.succeeded_no_output;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
format!("phase 1 complete ({succeeded_count}/{claimed_count} succeeded)"),
)
.await;
}
/// JSON schema used to constrain phase-1 model output.
@@ -207,12 +249,27 @@ async fn run_jobs(
session: &Arc<Session>,
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
stage_one_context: RequestContext,
progress_sub_id: &Option<String>,
claimed_count: usize,
) -> Vec<JobResult> {
let completed_count = Arc::new(AtomicUsize::new(0));
futures::stream::iter(claimed_candidates.into_iter())
.map(|claim| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
async move { job::run(session.as_ref(), claim, &stage_one_context).await }
let progress_sub_id = progress_sub_id.clone();
let completed_count = Arc::clone(&completed_count);
async move {
let result = job::run(session.as_ref(), claim, &stage_one_context).await;
let done = completed_count.fetch_add(1, Ordering::Relaxed) + 1;
emit_memory_progress(
session.as_ref(),
&progress_sub_id,
format!("phase 1 running ({done}/{claimed_count} done)"),
)
.await;
result
}
})
.buffer_unordered(phase_one::CONCURRENCY_LIMIT)
.collect::<Vec<_>>()

View File

@@ -6,6 +6,7 @@ use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::start::emit_memory_progress;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_config::Constrained;
@@ -35,9 +36,18 @@ struct Counters {
/// Runs memory phase 2 (aka consolidation) in strict order. The method represents the linear
/// flow of the consolidation phase.
pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
pub(super) async fn run(
session: &Arc<Session>,
config: Arc<Config>,
progress_sub_id: &Option<String>,
) {
let Some(db) = session.services.state_db.as_deref() else {
// This should not happen.
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 skipped (state db unavailable)",
)
.await;
return;
};
let root = memory_root(&config.codex_home);
@@ -52,6 +62,12 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
1,
&[("status", e)],
);
let progress = match e {
"skipped_not_dirty" => "phase 2 up to date",
"skipped_running" => "phase 2 already running",
_ => "phase 2 failed to claim global job",
};
emit_memory_progress(session.as_ref(), progress_sub_id, progress).await;
return;
}
};
@@ -61,6 +77,12 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
// If we can't get the config, we can't consolidate.
tracing::error!("failed to get agent config");
job::failed(session, db, &claim, "failed_sandbox_policy").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 failed (sandbox policy rejected)",
)
.await;
return;
};
@@ -70,6 +92,12 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
Err(err) => {
tracing::error!("failed to list stage1 outputs from global: {}", err);
job::failed(session, db, &claim, "failed_load_stage1_outputs").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 failed (could not load stage-1 outputs)",
)
.await;
return;
}
};
@@ -83,6 +111,12 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 failed (could not sync local artifacts)",
)
.await;
return;
}
// [`raw_memories.md`]
@@ -91,11 +125,23 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_rebuild_raw_memories").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 failed (could not rebuild raw memories)",
)
.await;
return;
}
if raw_memories.is_empty() {
// We check only after sync of the file system.
job::succeed(session, db, &claim, new_watermark, "succeeded_no_input").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 complete (no stage-1 outputs)",
)
.await;
return;
}
@@ -112,12 +158,25 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
Err(err) => {
tracing::error!("failed to spawn global memory consolidation agent: {err}");
job::failed(session, db, &claim, "failed_spawn_agent").await;
emit_memory_progress(
session.as_ref(),
progress_sub_id,
"phase 2 failed (could not spawn consolidation agent)",
)
.await;
return;
}
};
emit_memory_progress(session.as_ref(), progress_sub_id, "phase 2 running").await;
// 6. Spawn the agent handler.
agent::handle(session, claim, new_watermark, thread_id);
agent::handle(
session,
claim,
new_watermark,
thread_id,
progress_sub_id.clone(),
);
// 7. Metrics and logs.
let counters = Counters {
@@ -264,8 +323,18 @@ mod agent {
claim: Claim,
new_watermark: i64,
thread_id: ThreadId,
progress_sub_id: Option<String>,
) {
let Some(db) = session.services.state_db.clone() else {
let session = Arc::clone(session);
tokio::spawn(async move {
emit_memory_progress(
session.as_ref(),
&progress_sub_id,
"phase 2 failed (state db unavailable)",
)
.await;
});
return;
};
let session = session.clone();
@@ -279,6 +348,12 @@ mod agent {
Err(err) => {
tracing::error!("agent_control.subscribe_status failed: {err:?}");
job::failed(&session, &db, &claim, "failed_subscribe_status").await;
emit_memory_progress(
session.as_ref(),
&progress_sub_id,
"phase 2 failed (status subscription unavailable)",
)
.await;
return;
}
};
@@ -298,6 +373,13 @@ mod agent {
} else {
job::failed(&session, &db, &claim, "failed_agent").await;
}
let progress = match &final_status {
AgentStatus::Completed(_) => "phase 2 complete",
AgentStatus::Errored(_) | AgentStatus::NotFound => "phase 2 failed",
AgentStatus::Shutdown => "phase 2 cancelled",
AgentStatus::PendingInit | AgentStatus::Running => "phase 2 failed",
};
emit_memory_progress(session.as_ref(), &progress_sub_id, progress).await;
// Fire and forget close of the agent.
if !matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) {

View File

@@ -3,6 +3,9 @@ use crate::config::Config;
use crate::features::Feature;
use crate::memories::phase1;
use crate::memories::phase2;
use codex_protocol::protocol::BackgroundEventEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;
use tracing::warn;
@@ -15,6 +18,7 @@ pub(crate) fn start_memories_startup_task(
session: &Arc<Session>,
config: Arc<Config>,
source: &SessionSource,
progress_sub_id: Option<String>,
) {
if config.ephemeral
|| !config.features.enabled(Feature::MemoryTool)
@@ -25,6 +29,18 @@ pub(crate) fn start_memories_startup_task(
if session.services.state_db.is_none() {
warn!("state db unavailable for memories startup pipeline; skipping");
let weak_session = Arc::downgrade(session);
tokio::spawn(async move {
let Some(session) = weak_session.upgrade() else {
return;
};
emit_memory_progress(
session.as_ref(),
&progress_sub_id,
"phase 1 skipped (state db unavailable)",
)
.await;
});
return;
}
@@ -35,8 +51,27 @@ pub(crate) fn start_memories_startup_task(
};
// Run phase 1.
phase1::run(&session, &config).await;
phase1::run(&session, &config, &progress_sub_id).await;
// Run phase 2.
phase2::run(&session, config).await;
phase2::run(&session, config, &progress_sub_id).await;
});
}
/// Sends a `BackgroundEvent` describing memory-startup pipeline progress.
///
/// Every message is prefixed with `memory startup: ` so consumers (e.g. the
/// TUI status line) can distinguish these updates from other background
/// events. When `progress_sub_id` is `None`, progress reporting was not
/// requested and nothing is emitted.
pub(super) async fn emit_memory_progress(
    session: &Session,
    progress_sub_id: &Option<String>,
    message: impl Into<String>,
) {
    // Progress reporting is opt-in: without a subscription id there is no
    // destination for the event.
    let Some(sub_id) = progress_sub_id else {
        return;
    };
    let event = Event {
        id: sub_id.clone(),
        msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
            message: format!("memory startup: {}", message.into()),
        }),
    };
    session.send_event_raw(event).await;
}

View File

@@ -368,7 +368,7 @@ mod phase2 {
async fn dispatch_skips_when_global_job_is_not_dirty() {
let harness = DispatchHarness::new().await;
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let thread_ids = harness.manager.list_thread_ids().await;
@@ -393,7 +393,7 @@ mod phase2 {
"precondition should claim the running lock"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
let running_claim = harness
.state_db
@@ -421,7 +421,7 @@ mod phase2 {
"stale lock precondition should be claimed"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
let running_claim = harness
.state_db
@@ -500,7 +500,7 @@ mod phase2 {
.await
.expect("enqueue global consolidation");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
assert!(
!tokio::fs::try_exists(&stale_summary_path)
@@ -561,7 +561,7 @@ mod phase2 {
constrained_config.permissions.sandbox_policy =
Constrained::allow_only(SandboxPolicy::DangerFullAccess);
phase2::run(&harness.session, Arc::new(constrained_config)).await;
phase2::run(&harness.session, Arc::new(constrained_config), &None).await;
let retry_claim = harness
.state_db
@@ -583,7 +583,7 @@ mod phase2 {
.await
.expect("create file at memory root");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
let retry_claim = harness
.state_db
@@ -605,7 +605,7 @@ mod phase2 {
.await
.expect("create raw_memories.md as a directory");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
phase2::run(&harness.session, Arc::clone(&harness.config), &None).await;
let retry_claim = harness
.state_db
@@ -677,7 +677,7 @@ mod phase2 {
"stage-1 success should enqueue global consolidation"
);
phase2::run(&session, Arc::clone(&config)).await;
phase2::run(&session, Arc::clone(&config), &None).await;
let retry_claim = state_db
.try_claim_global_phase2_job(ThreadId::new(), 3_600)

View File

@@ -36,11 +36,20 @@ under {{ base_path }}/rollout_summaries/ and {{ base_path }}/skills/.
During execution: if you hit repeated errors, confusing behavior, or you suspect
there is relevant prior context, it is worth redoing the quick memory pass.
When to update memory:
When to update memory (automatic, same turn):
- Treat memory as guidance, not truth: if memory conflicts with the current
repo state, tool outputs, environment, or user feedback, the current state
wins. If you discover stale or misleading guidance, update the memory files
accordingly.
wins.
- Memory is writable. You are authorized to edit {{ base_path }}/MEMORY.md and {{ base_path }}/memory_summary.md when stale guidance is detected.
- If any memory fact conflicts with current evidence (repo state, tool output, or user correction), you MUST update memory in the same turn. Do not wait for a separate user prompt.
- Required behavior after detecting stale memory:
1) Verify the correct replacement using local evidence.
2) Edit memory files immediately:
- Always update {{ base_path }}/MEMORY.md.
- Also update {{ base_path }}/memory_summary.md if the correction affects reusable guidance.
3) Continue the task using the corrected memory.
4) In the final response, include a short note: Memory updated: <old> -> <new>.
- Only ask a clarifying question instead of editing when the replacement is ambiguous (multiple plausible targets with low confidence).
- When the user explicitly asks you to remember something or update the memory, you
should revise the files accordingly. Usually you should directly update
memory_summary.md (such as general tips and user profile section) and MEMORY.md.

View File

@@ -91,6 +91,9 @@ pub(crate) enum StatusLineItem {
/// Full session UUID.
SessionId,
/// Most recent memory startup pipeline progress update.
MemoryProgress,
}
impl StatusLineItem {
@@ -124,6 +127,9 @@ impl StatusLineItem {
StatusLineItem::SessionId => {
"Current session identifier (omitted until session starts)"
}
StatusLineItem::MemoryProgress => {
"Memory startup phase progress (omitted until available)"
}
}
}
@@ -148,6 +154,7 @@ impl StatusLineItem {
StatusLineItem::TotalInputTokens => "17,588 in",
StatusLineItem::TotalOutputTokens => "265 out",
StatusLineItem::SessionId => "019c19bd-ceb6-73b0-adc8-8ec0397b85cf",
StatusLineItem::MemoryProgress => "memory p2 running",
}
}
}

View File

@@ -612,6 +612,8 @@ pub(crate) struct ChatWidget {
session_network_proxy: Option<codex_core::protocol::SessionNetworkProxyRuntime>,
// Shared latch so we only warn once about invalid status-line item IDs.
status_line_invalid_items_warned: Arc<AtomicBool>,
// Most recent memory startup progress message (without prefix).
memory_progress_status: Option<String>,
// Cached git branch name for the status line (None if unknown).
status_line_branch: Option<String>,
// CWD used to resolve the cached branch; change resets branch state.
@@ -2109,11 +2111,28 @@ impl ChatWidget {
fn on_background_event(&mut self, message: String) {
debug!("BackgroundEvent: {message}");
if let Some(progress) = Self::memory_progress_from_background_event(&message) {
self.memory_progress_status = Some(progress);
self.refresh_status_line();
}
self.bottom_pane.ensure_status_indicator();
self.bottom_pane.set_interrupt_hint_visible(true);
self.set_status_header(message);
}
/// Extracts the progress text from a background-event message of the form
/// `memory startup: <progress>` (label match is case-insensitive and
/// whitespace-tolerant).
///
/// Returns `None` when the message is not a memory-startup update or when the
/// progress portion after the colon is blank.
fn memory_progress_from_background_event(message: &str) -> Option<String> {
    let (label, rest) = message.split_once(':')?;
    if !label.trim().eq_ignore_ascii_case("memory startup") {
        return None;
    }
    let progress = rest.trim();
    (!progress.is_empty()).then(|| progress.to_string())
}
fn on_undo_started(&mut self, event: UndoStartedEvent) {
self.bottom_pane.ensure_status_indicator();
self.bottom_pane.set_interrupt_hint_visible(false);
@@ -2656,6 +2675,7 @@ impl ChatWidget {
current_cwd,
session_network_proxy: None,
status_line_invalid_items_warned,
memory_progress_status: None,
status_line_branch: None,
status_line_branch_cwd: None,
status_line_branch_pending: false,
@@ -2823,6 +2843,7 @@ impl ChatWidget {
current_cwd,
session_network_proxy: None,
status_line_invalid_items_warned,
memory_progress_status: None,
status_line_branch: None,
status_line_branch_cwd: None,
status_line_branch_pending: false,
@@ -2979,6 +3000,7 @@ impl ChatWidget {
current_cwd,
session_network_proxy: None,
status_line_invalid_items_warned,
memory_progress_status: None,
status_line_branch: None,
status_line_branch_cwd: None,
status_line_branch_pending: false,
@@ -4449,6 +4471,7 @@ impl ChatWidget {
format_tokens_compact(self.status_line_total_usage().output_tokens)
)),
StatusLineItem::SessionId => self.thread_id.map(|id| id.to_string()),
StatusLineItem::MemoryProgress => self.memory_progress_status.clone(),
}
}

View File

@@ -1127,6 +1127,7 @@ async fn make_chatwidget_manual(
current_cwd: None,
session_network_proxy: None,
status_line_invalid_items_warned: Arc::new(AtomicBool::new(false)),
memory_progress_status: None,
status_line_branch: None,
status_line_branch_cwd: None,
status_line_branch_pending: false,
@@ -6123,6 +6124,40 @@ async fn status_line_branch_refreshes_after_interrupt() {
assert!(chat.status_line_branch_pending);
}
#[tokio::test]
async fn status_line_memory_progress_updates_from_background_events() {
    let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;

    // Configure the status line to show only the memory-progress item.
    chat.config.tui_status_line = Some(vec!["memory-progress".to_string()]);
    chat.refresh_status_line();

    // Before any background event arrives, the item has no value.
    assert_eq!(
        chat.status_line_value_for_item(&StatusLineItem::MemoryProgress),
        None
    );

    // A "memory startup:"-prefixed background event populates the item.
    let progress_event = Event {
        id: "bg-1".into(),
        msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
            message: "memory startup: phase 1 running".to_string(),
        }),
    };
    chat.handle_codex_event(progress_event);
    assert_eq!(
        chat.status_line_value_for_item(&StatusLineItem::MemoryProgress),
        Some("phase 1 running".to_string())
    );

    // An unrelated background event must not clobber the last progress value.
    let unrelated_event = Event {
        id: "bg-2".into(),
        msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
            message: "Waiting for `vim`".to_string(),
        }),
    };
    chat.handle_codex_event(unrelated_event);
    assert_eq!(
        chat.status_line_value_for_item(&StatusLineItem::MemoryProgress),
        Some("phase 1 running".to_string())
    );
}
#[tokio::test]
async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;