Compare commits

..

2 Commits

Author          SHA1        Message                            Date
Joe Gershenson  c37ce40560  Polish pending transcript queue    2026-04-21 01:55:18 -07:00
Joe Gershenson  a514bf1683  Fix interrupt transcript ordering  2026-04-21 01:52:30 -07:00
25 changed files with 644 additions and 2779 deletions

76  MODULE.bazel.lock (generated)

File diff suppressed because one or more lines are too long

1079  codex-rs/Cargo.lock (generated)

File diff suppressed because it is too large

View File

@@ -244,7 +244,6 @@ env_logger = "0.11.9"
eventsource-stream = "0.2.3"
futures = { version = "0.3", default-features = false }
gethostname = "1.1.0"
gix = { version = "0.81.0", default-features = false, features = ["sha1"] }
glob = "0.3"
globset = "0.4"
hmac = "0.12.1"

View File

@@ -81,7 +81,6 @@ dunce = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }
gix = { workspace = true }
http = { workspace = true }
iana-time-zone = { workspace = true }
image = { workspace = true, features = ["jpeg", "png", "webp"] }

View File

@@ -17,6 +17,7 @@ use codex_hooks::UserPromptSubmitRequest;
use codex_otel::HOOK_RUN_DURATION_METRIC;
use codex_otel::HOOK_RUN_METRIC;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::models::DeveloperInstructions;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -34,6 +35,7 @@ use serde_json::Value;
use crate::event_mapping::parse_turn_item;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::PendingTurnInput;
use crate::tools::sandboxing::PermissionRequestPayload;
pub(crate) struct HookRuntimeOutcome {
@@ -46,6 +48,12 @@ pub(crate) enum PendingInputHookDisposition {
Blocked { additional_contexts: Vec<String> },
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PendingInputRecordOutcome {
Recorded,
Blocked,
}
pub(crate) enum PendingInputRecord {
UserMessage {
content: Vec<UserInput>,
@@ -268,6 +276,38 @@ pub(crate) async fn inspect_pending_input(
}
}
pub(crate) async fn inspect_pending_turn_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pending_input_item: PendingTurnInput,
) -> PendingInputHookDisposition {
match pending_input_item {
PendingTurnInput::UserInput(input) => {
let response_item: ResponseItem = ResponseInputItem::from(input.clone()).into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
sess,
turn_context,
UserMessageItem::new(&input).message(),
)
.await;
if user_prompt_submit_outcome.should_stop {
PendingInputHookDisposition::Blocked {
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}
} else {
PendingInputHookDisposition::Accepted(Box::new(PendingInputRecord::UserMessage {
content: input,
response_item,
additional_contexts: user_prompt_submit_outcome.additional_contexts,
}))
}
}
PendingTurnInput::ResponseInputItem(input) => {
inspect_pending_input(sess, turn_context, input).await
}
}
}
pub(crate) async fn record_pending_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
@@ -294,6 +334,25 @@ pub(crate) async fn record_pending_input(
}
}
pub(crate) async fn record_pending_turn_input(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
pending_input_item: PendingTurnInput,
) -> PendingInputRecordOutcome {
match inspect_pending_turn_input(sess, turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(sess, turn_context, *pending_input).await;
PendingInputRecordOutcome::Recorded
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(sess, turn_context, additional_contexts).await;
PendingInputRecordOutcome::Blocked
}
}
}
async fn run_context_injecting_hook<Fut, Outcome>(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
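
For orientation, a minimal call-site fragment (not part of the change itself; the surrounding `sess`, `turn_context`, and `pending_item` bindings are assumed to exist in the caller) showing how queued turn input can be routed through the new hook-aware helper introduced above:

```rust
// Hypothetical caller: run pending-input hooks, then branch on whether the
// input was recorded into the pending transcript queue or blocked by a hook.
match record_pending_turn_input(&sess, &turn_context, pending_item).await {
    PendingInputRecordOutcome::Recorded => {
        // The user message (or response input item) is now queued for the turn.
    }
    PendingInputRecordOutcome::Blocked => {
        // Hooks vetoed the input; only their additional contexts were recorded.
    }
}
```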

View File

@@ -83,31 +83,26 @@ What it does:
- syncs local memory artifacts under the memories root:
- `raw_memories.md` (merged raw memories, latest first)
- `rollout_summaries/` (one summary file per retained rollout)
- keeps the memories root itself as a git repository, initialized under
`~/.codex/memories/.git`
- keeps `phase2_workspace_diff.md` in the memories root `.gitignore`
- prunes stale rollout summaries that are no longer retained
- finds old resource files from memory extensions under
`memories/extensions/<extension>/resources/` for extension directories that
`memories_extensions/<extension>/resources/` for extension directories that
have an `instructions.md`, using the memory module retention window
- removes old extension resources before the consolidation agent runs, so the
deletion appears in the workspace diff
- writes `phase2_workspace_diff.md` in the memories root with the git-style diff
from the previous successful Phase 2 commit to the current worktree; this
generated file is ignored by the memories git workspace and commits
- if the memory workspace has no git changes after artifact sync/pruning, marks the job
- if there are no Phase 1 inputs or old extension resources, marks the job
successful and exits
If there is input, it then:
- spawns an internal consolidation sub-agent
- builds the Phase 2 prompt with the path to the generated workspace diff
- points the agent at `phase2_workspace_diff.md` for the detailed diff context
- builds the Phase 2 prompt with a diff of the current Phase 1 input
selection versus the last successful Phase 2 selection (`added`,
`retained`, `removed`)
- includes old extension resource paths in the prompt diff
- runs it with no approvals, no network, and local write access only
- disables collab for that agent (to prevent recursive delegation)
- watches the agent status and heartbeats the global job lease while it runs
- commits all memory workspace changes after the agent completes successfully
- marks the phase-2 job success/failure in the state DB when the agent finishes
- prunes old extension resource files after the consolidation agent completes
and the successful Phase 2 job is recorded
Selection diff behavior:
@@ -123,10 +118,8 @@ Selection diff behavior:
- rows that were previously selected but still exist outside the current top-N
selection are surfaced as `removed`
- before the agent starts, local `rollout_summaries/` and `raw_memories.md`
reflect the current selection so removed-thread summaries show up as
workspace deletions during forgetting; when removals are pending, existing
consolidated artifacts such as `MEMORY.md`, `memory_summary.md`, and
`skills/` are preserved for the agent's targeted cleanup
keep the union of the current selection and the previous successful
selection, so removed-thread evidence stays available during forgetting
Watermark behavior:
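
As a standalone illustration of the selection diff described in the documentation above, here is a minimal sketch (using plain `String` thread ids instead of the crate's `ThreadId`/`Stage1Output` types) of how the `added`/`retained`/`removed` split between the current Phase 1 selection and the last successful Phase 2 selection can be derived:

```rust
use std::collections::HashSet;

/// Sketch of the Phase 2 selection diff: compare the current Phase 1 input
/// selection against the last successful Phase 2 selection.
struct SelectionDiff {
    added: Vec<String>,    // selected now, not in the previous selection
    retained: Vec<String>, // selected now and previously
    removed: Vec<String>,  // previously selected, no longer in the current top-N
}

fn selection_diff(current: &[String], previous: &[String]) -> SelectionDiff {
    let prev: HashSet<&str> = previous.iter().map(String::as_str).collect();
    let cur: HashSet<&str> = current.iter().map(String::as_str).collect();
    SelectionDiff {
        added: current
            .iter()
            .filter(|id| !prev.contains(id.as_str()))
            .cloned()
            .collect(),
        retained: current
            .iter()
            .filter(|id| prev.contains(id.as_str()))
            .cloned()
            .collect(),
        removed: previous
            .iter()
            .filter(|id| !cur.contains(id.as_str()))
            .cloned()
            .collect(),
    }
}
```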

View File

@@ -1,15 +1,14 @@
use std::path::Path;
pub async fn clear_memory_roots_contents(codex_home: &Path) -> std::io::Result<()> {
let memory_root = codex_home.join("memories");
clear_memory_root_contents(&memory_root).await?;
let legacy_extensions_root = codex_home.join("memories_extensions");
match tokio::fs::symlink_metadata(&legacy_extensions_root).await {
Ok(_) => clear_memory_root_contents(&legacy_extensions_root).await,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err),
for memory_root in [
codex_home.join("memories"),
codex_home.join("memories_extensions"),
] {
clear_memory_root_contents(memory_root.as_path()).await?;
}
Ok(())
}
pub(crate) async fn clear_memory_root_contents(memory_root: &Path) -> std::io::Result<()> {

View File

@@ -14,7 +14,6 @@ mod storage;
#[cfg(test)]
mod tests;
pub(crate) mod usage;
mod workspace;
use codex_protocol::openai_models::ReasoningEffort;
@@ -26,7 +25,7 @@ pub use control::clear_memory_roots_contents;
pub(crate) use start::start_memories_startup_task;
mod artifacts {
pub(super) const EXTENSIONS_SUBDIR: &str = "extensions";
pub(super) const EXTENSIONS_SUBDIR: &str = "memories_extensions";
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
}
@@ -112,7 +111,7 @@ fn rollout_summaries_dir(root: &Path) -> PathBuf {
}
fn memory_extensions_root(root: &Path) -> PathBuf {
root.join(artifacts::EXTENSIONS_SUBDIR)
root.with_file_name(artifacts::EXTENSIONS_SUBDIR)
}
fn raw_memories_file(root: &Path) -> PathBuf {

View File

@@ -1,20 +1,16 @@
use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::config::Config;
use crate::memories::extensions::PendingExtensionResourceRemoval;
use crate::memories::extensions::find_old_extension_resources;
use crate::memories::extensions::remove_extension_resources;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::storage::EmptyArtifactCleanup;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::rollout_summary_file_stem;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::commit_all;
use crate::memories::workspace::has_changes;
use crate::memories::workspace::prepare_git_repo;
use crate::memories::workspace::remove_workspace_diff;
use crate::memories::workspace::write_workspace_diff;
use crate::session::emit_subagent_session_started;
use crate::session::session::Session;
use codex_config::Constrained;
@@ -28,7 +24,9 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::Stage1Output;
use codex_state::StateRuntime;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
@@ -75,12 +73,6 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
if let Err(err) = prepare_git_repo(&root).await {
tracing::error!("failed preparing memory workspace: {err}");
job::failed(session, db, &claim, "failed_prepare_workspace").await;
return;
}
// 2. Get the config for the agent
let Some(agent_config) = agent::get_config(config.clone()) else {
// If we can't get the config, we can't consolidate.
@@ -102,24 +94,15 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
}
};
let raw_memories = selection.selected.to_vec();
let artifact_memories = selection.selected.clone();
let empty_artifact_cleanup = if selection.removed.is_empty() {
EmptyArtifactCleanup::RemoveConsolidated
} else {
EmptyArtifactCleanup::PreserveConsolidated
};
let artifact_memories = artifact_memories_for_phase2(&selection);
let new_watermark = get_watermark(claim.watermark, &raw_memories);
// 4. Update the file system by syncing the raw memories with the one extracted from DB at
// step 3
// [`rollout_summaries/`]
if let Err(err) = sync_rollout_summaries_from_memories(
&root,
&artifact_memories,
artifact_memories.len(),
empty_artifact_cleanup,
)
.await
if let Err(err) =
sync_rollout_summaries_from_memories(&root, &artifact_memories, artifact_memories.len())
.await
{
tracing::error!("failed syncing local memory artifacts for global consolidation: {err}");
job::failed(session, db, &claim, "failed_sync_artifacts").await;
@@ -135,40 +118,26 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
return;
}
let pending_extension_resource_removals = find_old_extension_resources(&root).await;
remove_extension_resources(&pending_extension_resource_removals).await;
let workspace_has_changes = match has_changes(&root).await {
Ok(has_changes) => has_changes,
Err(err) => {
tracing::error!("failed checking memory workspace changes: {err}");
job::failed(session, db, &claim, "failed_workspace_status").await;
return;
}
};
if !workspace_has_changes {
if let Err(err) = remove_workspace_diff(&root).await {
tracing::warn!("failed removing stale memory workspace diff file: {err}");
}
let removed_extension_resources = pending_extension_resource_removals
.iter()
.map(|resource| resource.removed.clone())
.collect::<Vec<_>>();
if raw_memories.is_empty() && pending_extension_resource_removals.is_empty() {
// We check only after sync of the file system.
job::succeed(
session,
db,
&claim,
new_watermark,
&raw_memories,
&[],
"succeeded_no_input",
)
.await;
return;
}
if let Err(err) = write_workspace_diff(&root).await {
tracing::error!("failed writing memory workspace diff file: {err}");
job::failed(session, db, &claim, "failed_workspace_diff_file").await;
return;
}
// 5. Spawn the agent
let prompt = agent::get_prompt(config);
let prompt = agent::get_prompt(config, &selection, &removed_extension_resources);
let source = SessionSource::SubAgent(SubAgentSource::MemoryConsolidation);
let agent_control = session.services.agent_control.detached_registry();
let thread_id = match agent_control
@@ -210,7 +179,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
claim,
new_watermark,
raw_memories.clone(),
root,
pending_extension_resource_removals,
thread_id,
agent_control,
phase_two_e2e_timer,
@@ -223,6 +192,22 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
emit_metrics(session, counters);
}
fn artifact_memories_for_phase2(
selection: &codex_state::Phase2InputSelection,
) -> Vec<Stage1Output> {
let mut seen = HashSet::new();
let mut memories = selection.selected.clone();
for memory in &selection.selected {
seen.insert(rollout_summary_file_stem(memory));
}
for memory in &selection.previous_selected {
if seen.insert(rollout_summary_file_stem(memory)) {
memories.push(memory.clone());
}
}
memories
}
mod job {
use super::*;
@@ -363,9 +348,13 @@ mod agent {
Some(agent_config)
}
pub(super) fn get_prompt(config: Arc<Config>) -> Vec<UserInput> {
pub(super) fn get_prompt(
config: Arc<Config>,
selection: &codex_state::Phase2InputSelection,
removed_extension_resources: &[crate::memories::extensions::RemovedExtensionResource],
) -> Vec<UserInput> {
let root = memory_root(&config.codex_home);
let prompt = build_consolidation_prompt(&root);
let prompt = build_consolidation_prompt(&root, selection, removed_extension_resources);
vec![UserInput::Text {
text: prompt,
text_elements: vec![],
@@ -379,7 +368,7 @@ mod agent {
claim: Claim,
new_watermark: i64,
selected_outputs: Vec<codex_state::Stage1Output>,
memory_root: codex_utils_absolute_path::AbsolutePathBuf,
pending_extension_resource_removals: Vec<PendingExtensionResourceRemoval>,
thread_id: ThreadId,
agent_control: crate::agent::AgentControl,
phase_two_e2e_timer: Option<codex_otel::Timer>,
@@ -416,32 +405,7 @@ mod agent {
if let Some(token_usage) = agent_control.get_total_token_usage(thread_id).await {
emit_token_usage_metrics(&session, &token_usage);
}
// Do not commit if we lost the lock
match db
.heartbeat_global_phase2_job(&claim.token, phase_two::JOB_LEASE_SECONDS)
.await
{
Ok(true) => {}
Ok(false) => {
tracing::error!(
"lost global memory consolidation ownership before committing workspace"
);
return;
}
Err(err) => {
tracing::error!(
"failed confirming global memory consolidation ownership before committing workspace: {err}"
);
job::failed(&session, &db, &claim, "failed_confirm_ownership").await;
return;
}
}
if let Err(err) = commit_all(&memory_root).await {
tracing::error!("failed committing memory workspace: {err}");
job::failed(&session, &db, &claim, "failed_workspace_commit").await;
return;
}
if !job::succeed(
if job::succeed(
&session,
&db,
&claim,
@@ -451,9 +415,7 @@ mod agent {
)
.await
{
tracing::error!(
"failed marking global memory consolidation job succeeded after committing workspace"
);
remove_extension_resources(&pending_extension_resource_removals).await;
}
} else {
job::failed(&session, &db, &claim, "failed_agent").await;

View File

@@ -1,12 +1,18 @@
use crate::memories::extensions::EXTENSION_RESOURCE_RETENTION_DAYS;
use crate::memories::extensions::RemovedExtensionResource;
use crate::memories::memory_extensions_root;
use crate::memories::memory_root;
use crate::memories::phase_one;
use crate::memories::workspace::WORKSPACE_DIFF_FILENAME;
use crate::memories::storage::rollout_summary_file_stem_from_parts;
use codex_protocol::openai_models::ModelInfo;
use codex_state::Phase2InputSelection;
use codex_state::Stage1Output;
use codex_state::Stage1OutputRef;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
use codex_utils_template::Template;
use std::fmt::Write as _;
use std::path::Path;
use std::sync::LazyLock;
use tokio::fs;
@@ -59,9 +65,9 @@ Memory extensions (under {{ memory_extensions_root }}/):
source.
If the user has any memory extensions, you MUST read the instructions for each extension to
determine how to use the memory source. If the workspace diff shows deleted extension resource files,
remove stale memories derived only from those resources. If it has no extension folders, continue
with the standard memory inputs only.
determine how to use the memory source. If the Phase 2 diff lists removed memory extension
resources, use that extension-specific deletion diff to remove stale memories derived only from
those resources. If it has no extension folders, continue with the standard memory inputs only.
"#;
const MEMORY_EXTENSIONS_PRIMARY_INPUTS: &str = r#"
@@ -72,17 +78,20 @@ Under `{{ memory_extensions_root }}/`:
- If extension folders exist, read each instructions.md first and follow it when interpreting
that extension's memory source.
If the workspace diff shows deleted memory extension resources, use that extension-specific deletion
signal to remove stale memories derived only from those resources.
If the Phase 2 diff lists removed memory extension resources, use that extension-specific deletion
diff to remove stale memories derived only from those resources.
"#;
/// Builds the consolidation subagent prompt for a specific memory root.
pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
pub(super) fn build_consolidation_prompt(
memory_root: &Path,
selection: &Phase2InputSelection,
removed_extension_resources: &[RemovedExtensionResource],
) -> String {
let memory_extensions_root = memory_extensions_root(memory_root);
let memory_extensions_exist = memory_extensions_root.is_dir();
let memory_root = memory_root.display().to_string();
let memory_extensions_root = memory_extensions_root.display().to_string();
let phase2_workspace_diff_file = WORKSPACE_DIFF_FILENAME.to_string();
let memory_extensions_folder_structure = if memory_extensions_exist {
render_memory_extensions_block(
&MEMORY_EXTENSIONS_FOLDER_STRUCTURE_TEMPLATE,
@@ -99,6 +108,8 @@ pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
} else {
String::new()
};
let phase2_input_selection =
render_phase2_input_selection(selection, removed_extension_resources);
CONSOLIDATION_PROMPT_TEMPLATE
.render([
("memory_root", memory_root.as_str()),
@@ -110,15 +121,12 @@ pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
"memory_extensions_primary_inputs",
memory_extensions_primary_inputs.as_str(),
),
(
"phase2_workspace_diff_file",
phase2_workspace_diff_file.as_str(),
),
("phase2_input_selection", phase2_input_selection.as_str()),
])
.unwrap_or_else(|err| {
warn!("failed to render memories consolidation prompt template: {err}");
format!(
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\nRead {phase2_workspace_diff_file} first."
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\n{phase2_input_selection}"
)
})
}
@@ -132,6 +140,94 @@ fn render_memory_extensions_block(template: &Template, memory_extensions_root: &
})
}
fn render_phase2_input_selection(
selection: &Phase2InputSelection,
removed_extension_resources: &[RemovedExtensionResource],
) -> String {
let retained = selection.retained_thread_ids.len();
let added = selection.selected.len().saturating_sub(retained);
let selected = if selection.selected.is_empty() {
"- none".to_string()
} else {
selection
.selected
.iter()
.map(|item| {
render_selected_input_line(
item,
selection.retained_thread_ids.contains(&item.thread_id),
)
})
.collect::<Vec<_>>()
.join("\n")
};
let removed = if selection.removed.is_empty() {
"- none".to_string()
} else {
selection
.removed
.iter()
.map(render_removed_input_line)
.collect::<Vec<_>>()
.join("\n")
};
let mut rendered = format!(
"- selected inputs this run: {}\n- newly added since the last successful Phase 2 run: {added}\n- retained from the last successful Phase 2 run: {retained}\n- removed from the last successful Phase 2 run: {}\n\nCurrent selected Phase 1 inputs:\n{selected}\n\nRemoved from the last successful Phase 2 selection:\n{removed}\n",
selection.selected.len(),
selection.removed.len(),
);
if !removed_extension_resources.is_empty() {
rendered.push_str("\nMemory extension resources removed by retention pruning:\n");
let _ = writeln!(
rendered,
"- retention window: {EXTENSION_RESOURCE_RETENTION_DAYS} days"
);
let mut current_extension = "";
for removed_resource in removed_extension_resources {
if removed_resource.extension != current_extension {
current_extension = &removed_resource.extension;
let _ = writeln!(rendered, "- extension: {current_extension}");
}
let _ = writeln!(rendered, " - {}", removed_resource.resource_path);
}
}
rendered
}
fn render_selected_input_line(item: &Stage1Output, retained: bool) -> String {
let status = if retained { "retained" } else { "added" };
let rollout_summary_file = format!(
"rollout_summaries/{}.md",
rollout_summary_file_stem_from_parts(
item.thread_id,
item.source_updated_at,
item.rollout_slug.as_deref(),
)
);
format!(
"- [{status}] thread_id={}, rollout_summary_file={rollout_summary_file}",
item.thread_id
)
}
fn render_removed_input_line(item: &Stage1OutputRef) -> String {
let rollout_summary_file = format!(
"rollout_summaries/{}.md",
rollout_summary_file_stem_from_parts(
item.thread_id,
item.source_updated_at,
item.rollout_slug.as_deref(),
)
);
format!(
"- thread_id={}, rollout_summary_file={rollout_summary_file}",
item.thread_id
)
}
/// Builds the stage-1 user message containing rollout metadata and content.
///
/// Large rollout payloads are truncated to 70% of the active model's effective

View File

@@ -1,5 +1,7 @@
use super::*;
use crate::memories::extensions::RemovedExtensionResource;
use codex_models_manager::model_info::model_info_from_slug;
use codex_state::Phase2InputSelection;
use core_test_support::PathExt;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
@@ -56,17 +58,33 @@ fn build_stage_one_input_message_uses_default_limit_when_model_context_window_mi
}
#[test]
fn build_consolidation_prompt_includes_workspace_changes() {
fn build_consolidation_prompt_includes_removed_extension_resources() {
let temp = tempdir().unwrap();
let memory_root = temp.path().join("memories");
std::fs::create_dir_all(memory_root.join("extensions")).unwrap();
std::fs::create_dir_all(temp.path().join("memories_extensions")).unwrap();
let removed_extension_resources = vec![
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-06T11-59-59-abcd-10min-old.md".to_string(),
},
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-07T12-00-00-abcd-10min-cutoff.md".to_string(),
},
];
let prompt = build_consolidation_prompt(&memory_root);
let prompt = build_consolidation_prompt(
&memory_root,
&Phase2InputSelection::default(),
&removed_extension_resources,
);
assert!(prompt.contains("Memory workspace diff:"));
assert!(prompt.contains("phase2_workspace_diff.md"));
assert!(prompt.contains("previous successful Phase 2 commit"));
assert!(prompt.contains("extension-specific deletion"));
assert!(prompt.contains("Memory extension resources removed by retention pruning:"));
assert!(prompt.contains("- retention window: 7 days"));
assert!(prompt.contains("- extension: chronicle"));
assert!(prompt.contains(" - resources/2026-04-06T11-59-59-abcd-10min-old.md"));
assert!(prompt.contains(" - resources/2026-04-07T12-00-00-abcd-10min-cutoff.md"));
assert!(prompt.contains("extension-specific deletion diff"));
}
#[tokio::test]

View File

@@ -9,12 +9,6 @@ use crate::memories::ensure_layout;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum EmptyArtifactCleanup {
RemoveConsolidated,
PreserveConsolidated,
}
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
@@ -30,7 +24,6 @@ pub(super) async fn sync_rollout_summaries_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_consolidation: usize,
empty_artifact_cleanup: EmptyArtifactCleanup,
) -> std::io::Result<()> {
ensure_layout(root).await?;
@@ -45,7 +38,7 @@ pub(super) async fn sync_rollout_summaries_from_memories(
write_rollout_summary_for_thread(root, memory).await?;
}
if retained.is_empty() && empty_artifact_cleanup == EmptyArtifactCleanup::RemoveConsolidated {
if retained.is_empty() {
for file_name in ["MEMORY.md", "memory_summary.md"] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(path).await

View File

@@ -1,6 +1,4 @@
use super::control::clear_memory_root_contents;
use super::control::clear_memory_roots_contents;
use super::storage::EmptyArtifactCleanup;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::memories::ensure_layout;
@@ -104,59 +102,6 @@ async fn clear_memory_root_contents_preserves_root_directory() {
);
}
#[tokio::test]
async fn clear_memory_roots_contents_clears_legacy_extensions_root() {
let dir = tempdir().expect("tempdir");
let memory_root = dir.path().join("memories");
let new_extensions_root = memory_root.join("extensions");
let legacy_extensions_root = dir.path().join("memories_extensions");
tokio::fs::create_dir_all(&new_extensions_root)
.await
.expect("create new extensions dir");
tokio::fs::create_dir_all(&legacy_extensions_root)
.await
.expect("create legacy extensions dir");
tokio::fs::write(
new_extensions_root.join("stale.md"),
"new stale extension\n",
)
.await
.expect("write new extension artifact");
tokio::fs::write(
legacy_extensions_root.join("stale.md"),
"legacy stale extension\n",
)
.await
.expect("write legacy extension artifact");
clear_memory_roots_contents(dir.path())
.await
.expect("clear memory roots contents");
let mut memory_entries = tokio::fs::read_dir(&memory_root)
.await
.expect("read memory root after clear");
assert!(
memory_entries
.next_entry()
.await
.expect("read memory root entry")
.is_none(),
"memory root should be empty after clearing contents"
);
let mut legacy_entries = tokio::fs::read_dir(&legacy_extensions_root)
.await
.expect("read legacy extensions root after clear");
assert!(
legacy_entries
.next_entry()
.await
.expect("read legacy extensions root entry")
.is_none(),
"legacy extensions root should be empty after clearing contents"
);
}
#[cfg(unix)]
#[tokio::test]
async fn clear_memory_root_contents_rejects_symlinked_root() {
@@ -218,7 +163,6 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -292,70 +236,6 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
assert!(rollout_path_pos < file_pos);
}
#[tokio::test]
async fn sync_rollout_summaries_preserves_consolidated_artifacts_for_removed_inputs() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
let summaries_dir = rollout_summaries_dir(&root);
let skills_dir = root.join("skills/demo");
tokio::fs::create_dir_all(&summaries_dir)
.await
.expect("create rollout summaries dir");
tokio::fs::create_dir_all(&skills_dir)
.await
.expect("create skills dir");
let stale_summary_path = summaries_dir.join("stale.md");
let memory_path = root.join("MEMORY.md");
let memory_summary_path = root.join("memory_summary.md");
let skill_path = skills_dir.join("SKILL.md");
tokio::fs::write(&stale_summary_path, "stale rollout summary\n")
.await
.expect("write stale rollout summary");
tokio::fs::write(&memory_path, "consolidated memory\n")
.await
.expect("write memory");
tokio::fs::write(&memory_summary_path, "summary\n")
.await
.expect("write memory summary");
tokio::fs::write(&skill_path, "skill\n")
.await
.expect("write skill");
sync_rollout_summaries_from_memories(
&root,
&[],
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::PreserveConsolidated,
)
.await
.expect("sync rollout summaries");
assert!(
!tokio::fs::try_exists(&stale_summary_path)
.await
.expect("check stale summary path"),
"removed-only sync should still prune stale rollout summaries"
);
assert!(
tokio::fs::try_exists(&memory_path)
.await
.expect("check memory path"),
"removed-only sync should preserve MEMORY.md for surgical cleanup"
);
assert!(
tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary path"),
"removed-only sync should preserve memory_summary.md for surgical cleanup"
);
assert!(
tokio::fs::try_exists(&skill_path)
.await
.expect("check skill path"),
"removed-only sync should preserve skills for surgical cleanup"
);
}
#[tokio::test]
async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename() {
let dir = tempdir().expect("tempdir");
@@ -389,7 +269,6 @@ async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -493,7 +372,6 @@ task_outcome: success
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync rollout summaries");
@@ -544,10 +422,6 @@ mod phase2 {
use crate::memories::phase2;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use crate::memories::storage::EmptyArtifactCleanup;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use crate::memories::workspace::prepare_git_repo;
use crate::session::session::Session;
use crate::session::tests::make_session_and_context;
use chrono::Duration as ChronoDuration;
@@ -634,7 +508,7 @@ mod phase2 {
}
}
async fn seed_stage1_output(&self, source_updated_at: i64) -> ThreadId {
async fn seed_stage1_output(&self, source_updated_at: i64) {
let thread_id = ThreadId::new();
let mut metadata_builder = ThreadMetadataBuilder::new(
thread_id,
@@ -683,7 +557,6 @@ mod phase2 {
.expect("mark stage-1 success"),
"stage-1 success should enqueue global consolidation"
);
thread_id
}
async fn shutdown_threads(&self) {
@@ -903,7 +776,7 @@ mod phase2 {
}
#[tokio::test]
async fn dispatch_with_empty_stage1_outputs_spawns_for_workspace_changes() {
async fn dispatch_with_empty_stage1_outputs_rebuilds_local_artifacts() {
let harness = DispatchHarness::new().await;
let root = memory_root(&harness.config.codex_home);
let summaries_dir = rollout_summaries_dir(&root);
@@ -985,160 +858,13 @@ mod phase2 {
.state_db
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global job after empty consolidation dispatch");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedRunning);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 1);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_removed_only_inputs_preserves_consolidated_artifacts() {
let harness = DispatchHarness::new().await;
let source_updated_at = Utc::now().timestamp();
let thread_id = harness.seed_stage1_output(source_updated_at).await;
let root = memory_root(&harness.config.codex_home);
let selection = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection");
let selected = selection.selected.clone();
sync_rollout_summaries_from_memories(
&root,
&selected,
selected.len(),
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync selected rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected raw memories");
let memory_index_path = root.join("MEMORY.md");
let memory_summary_path = root.join("memory_summary.md");
let skill_path = root.join("skills/demo/SKILL.md");
tokio::fs::write(&memory_index_path, "consolidated memory index\n")
.await
.expect("write memory index");
tokio::fs::write(&memory_summary_path, "consolidated memory summary\n")
.await
.expect("write memory summary");
tokio::fs::create_dir_all(skill_path.parent().expect("skill parent"))
.await
.expect("create skill dir");
tokio::fs::write(&skill_path, "consolidated skill\n")
.await
.expect("write skill");
prepare_git_repo(&root)
.await
.expect("commit current memory workspace as baseline");
let claim = harness
.state_db
.try_claim_global_phase2_job(ThreadId::new(), /*lease_seconds*/ 3_600)
.await
.expect("claim global phase2 job");
let Phase2JobClaimOutcome::Claimed {
ownership_token, ..
} = claim
else {
panic!("unexpected phase2 claim outcome: {claim:?}");
};
assert!(
harness
.state_db
.mark_global_phase2_job_succeeded(&ownership_token, source_updated_at, &selected,)
.await
.expect("mark phase2 succeeded"),
"phase2 success should update selected baseline"
);
assert!(
harness
.state_db
.mark_thread_memory_mode_polluted(thread_id)
.await
.expect("mark thread polluted"),
"polluted selected thread should enqueue phase2 forgetting"
);
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 1);
assert!(
tokio::fs::try_exists(&memory_index_path)
.await
.expect("check memory index existence"),
"removed-only phase2 should preserve MEMORY.md before agent cleanup"
);
assert!(
tokio::fs::try_exists(&memory_summary_path)
.await
.expect("check memory summary existence"),
"removed-only phase2 should preserve memory_summary.md before agent cleanup"
);
assert!(
tokio::fs::try_exists(&skill_path)
.await
.expect("check skill existence"),
"removed-only phase2 should preserve skills before agent cleanup"
);
let workspace_diff = tokio::fs::read_to_string(root.join("phase2_workspace_diff.md"))
.await
.expect("read workspace diff");
assert!(
workspace_diff.contains("- D rollout_summaries/"),
"removed-only phase2 should surface deleted rollout summaries: {workspace_diff}"
);
assert!(
!workspace_diff.contains("- D MEMORY.md"),
"removed-only phase2 should not delete consolidated memory wholesale: {workspace_diff}"
);
harness.shutdown_threads().await;
}
#[tokio::test]
async fn dispatch_with_clean_workspace_preserves_selected_phase2_baseline() {
let harness = DispatchHarness::new().await;
let thread_id = harness.seed_stage1_output(Utc::now().timestamp()).await;
let root = memory_root(&harness.config.codex_home);
let selection = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection");
let selected = selection.selected.clone();
sync_rollout_summaries_from_memories(
&root,
&selected,
selected.len(),
EmptyArtifactCleanup::RemoveConsolidated,
)
.await
.expect("sync selected rollout summaries");
rebuild_raw_memories_file_from_memories(&root, &selected, selected.len())
.await
.expect("sync selected raw memories");
prepare_git_repo(&root)
.await
.expect("commit current memory workspace as baseline");
phase2::run(&harness.session, Arc::clone(&harness.config)).await;
.expect("claim global job after empty consolidation success");
pretty_assertions::assert_eq!(next_claim, Phase2JobClaimOutcome::SkippedNotDirty);
pretty_assertions::assert_eq!(harness.user_input_ops_count(), 0);
let selection = harness
.state_db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await
.expect("load phase2 input selection after clean workspace success");
pretty_assertions::assert_eq!(selection.selected.len(), 1);
pretty_assertions::assert_eq!(selection.selected[0].thread_id, thread_id);
pretty_assertions::assert_eq!(selection.retained_thread_ids, vec![thread_id]);
pretty_assertions::assert_eq!(selection.removed, Vec::new());
let thread_ids = harness.manager.list_thread_ids().await;
pretty_assertions::assert_eq!(thread_ids.len(), 0);
harness.shutdown_threads().await;
}
#[tokio::test]
@@ -1281,14 +1007,14 @@ mod phase2 {
let chronicle_resources = config
.codex_home
.join("memories/extensions/chronicle/resources");
.join("memories_extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
config
.codex_home
.join("memories/extensions/chronicle/instructions.md"),
.join("memories_extensions/chronicle/instructions.md"),
"instructions",
)
.await
@@ -1313,10 +1039,10 @@ mod phase2 {
"spawn failures should leave the job in retry backoff instead of running"
);
assert!(
!tokio::fs::try_exists(&old_file)
tokio::fs::try_exists(&old_file)
.await
.expect("check old extension resource"),
"phase2 should prune old extension resources before spawn"
"spawn failures should not prune extension resources before retry"
);
}
}

View File

@@ -1,615 +0,0 @@
use anyhow::Context;
use gix::hash::ObjectId;
use gix::objs::Tree;
use gix::objs::tree::Entry;
use gix::objs::tree::EntryKind;
use gix::objs::tree::EntryMode;
use similar::TextDiff;
use std::collections::BTreeMap;
use std::ffi::OsStr;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use tokio::task;
/// Generated diff file the Phase 2 consolidation agent reads before editing memories.
pub(super) const WORKSPACE_DIFF_FILENAME: &str = "phase2_workspace_diff.md";
const GITIGNORE_FILENAME: &str = ".gitignore";
const INITIAL_COMMIT_MESSAGE: &str =
"Initialize Codex memories git state\n\nCo-authored-by: Codex <noreply@openai.com>";
const GITIGNORE_COMMIT_MESSAGE: &str =
"Ignore generated Codex memories diff\n\nCo-authored-by: Codex <noreply@openai.com>";
const CONSOLIDATION_COMMIT_MESSAGE: &str =
"Consolidate Codex memories\n\nCo-authored-by: Codex <noreply@openai.com>";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MemoryWorkspaceFileEntry {
oid: ObjectId,
mode: EntryMode,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkspaceChangeStatus {
Added,
Modified,
Deleted,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct WorkspaceChange {
status: WorkspaceChangeStatus,
path: String,
}
/// Creates the memory root, initializes its git repository, and keeps the generated diff file in
/// `.gitignore`.
///
/// This commits an initial baseline when the repository has no `HEAD` yet. For existing clean
/// repositories that predate the generated diff file, it commits the `.gitignore` update by itself
/// so that internal housekeeping does not wake the consolidation agent.
pub(super) async fn prepare_git_repo(root: &Path) -> anyhow::Result<()> {
let root = root.to_path_buf();
task::spawn_blocking(move || {
fs::create_dir_all(&root)
.with_context(|| format!("create memories root {}", root.display()))?;
let repo = open_or_init(&root)?;
let gitignore_changed = ensure_gitignore_ignores_workspace_diff(&root)?;
if repo.head_id().is_err() && has_workspace_files(&root)? {
commit_current_tree(&repo, INITIAL_COMMIT_MESSAGE)?;
} else if gitignore_changed && only_gitignore_changed(&repo, &root)? {
commit_current_tree(&repo, GITIGNORE_COMMIT_MESSAGE)?;
}
anyhow::Ok(())
})
.await?
}
/// Returns true when the memory root differs from the current git `HEAD` tree.
pub(super) async fn has_changes(root: &Path) -> anyhow::Result<bool> {
let root = root.to_path_buf();
task::spawn_blocking(move || {
let repo = open_or_init(&root)?;
has_changes_blocking(&repo, &root)
})
.await?
}
/// Writes `phase2_workspace_diff.md` with a git-style diff from `HEAD` to the current worktree.
pub(super) async fn write_workspace_diff(root: &Path) -> anyhow::Result<()> {
let root = root.to_path_buf();
task::spawn_blocking(move || {
let repo = open_or_init(&root)?;
let head_entries = head_file_entries(&repo)?;
let current_entries = current_file_entries(&repo, &root)?;
let changes = diff_entries(&head_entries, &current_entries);
let content =
render_workspace_diff_file(&repo, &root, &head_entries, &current_entries, &changes)?;
let path = root.join(WORKSPACE_DIFF_FILENAME);
fs::write(&path, content)
.with_context(|| format!("write memory workspace diff file {}", path.display()))?;
anyhow::Ok(())
})
.await?
}
/// Commits the current memory root as the next normal git commit when it differs from `HEAD`.
pub(super) async fn commit_all(root: &Path) -> anyhow::Result<()> {
let root = root.to_path_buf();
task::spawn_blocking(move || {
let repo = open_or_init(&root)?;
commit_current_tree(&repo, CONSOLIDATION_COMMIT_MESSAGE)?;
anyhow::Ok(())
})
.await?
}
/// Removes the generated workspace diff file when no consolidation agent needs it.
pub(super) async fn remove_workspace_diff(root: &Path) -> anyhow::Result<()> {
let path = root.join(WORKSPACE_DIFF_FILENAME);
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err)
.with_context(|| format!("remove memory workspace diff file {}", path.display())),
}
}
fn open_or_init(root: &Path) -> anyhow::Result<gix::Repository> {
if root.join(".git").is_dir() {
gix::open(root).with_context(|| format!("open memories git repo {}", root.display()))
} else {
gix::init(root).with_context(|| format!("init memories git repo {}", root.display()))
}
}
fn ensure_gitignore_ignores_workspace_diff(root: &Path) -> anyhow::Result<bool> {
let path = root.join(GITIGNORE_FILENAME);
let existing = match fs::read_to_string(&path) {
Ok(existing) => existing,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => String::new(),
Err(err) => return Err(err).with_context(|| format!("read {}", path.display())),
};
if existing
.lines()
.any(|line| line.trim() == WORKSPACE_DIFF_FILENAME)
{
return Ok(false);
}
let mut updated = existing;
if !updated.is_empty() && !updated.ends_with('\n') {
updated.push('\n');
}
updated.push_str(WORKSPACE_DIFF_FILENAME);
updated.push('\n');
fs::write(&path, updated).with_context(|| format!("write {}", path.display()))?;
Ok(true)
}
fn has_workspace_files(root: &Path) -> anyhow::Result<bool> {
for entry in fs::read_dir(root).with_context(|| format!("read {}", root.display()))? {
let entry = entry?;
if entry.file_name() != OsStr::new(".git")
&& !should_ignore_workspace_path(root, &entry.path())
{
return Ok(true);
}
}
Ok(false)
}
fn has_changes_blocking(repo: &gix::Repository, root: &Path) -> anyhow::Result<bool> {
let head_entries = head_file_entries(repo)?;
let current_entries = current_file_entries(repo, root)?;
Ok(head_entries != current_entries)
}
fn only_gitignore_changed(repo: &gix::Repository, root: &Path) -> anyhow::Result<bool> {
let head_entries = head_file_entries(repo)?;
let current_entries = current_file_entries(repo, root)?;
let changes = diff_entries(&head_entries, &current_entries);
Ok(!changes.is_empty()
&& changes
.iter()
.all(|change| change.path == GITIGNORE_FILENAME))
}
fn commit_current_tree(repo: &gix::Repository, message: &str) -> anyhow::Result<bool> {
let root = repo
.workdir()
.context("memories git repo must have a worktree")?;
let tree_id = write_tree(repo, root, root)?;
let parent = repo.head_id().ok().map(gix::Id::detach);
if let Some(parent) = parent {
let parent_tree = repo
.find_commit(parent)
.context("find memories HEAD commit")?
.tree_id()
.context("load memories HEAD tree id")?
.detach();
if parent_tree == tree_id {
return Ok(false);
}
}
let signature = codex_signature();
let mut time = gix::date::parse::TimeBuf::default();
let signature_ref = signature.to_ref(&mut time);
let parents = parent.into_iter().collect::<Vec<_>>();
repo.commit_as(
signature_ref,
signature_ref,
"HEAD",
message,
tree_id,
parents,
)
.context("commit memories git repo")?;
Ok(true)
}
fn codex_signature() -> gix::actor::Signature {
gix::actor::Signature {
name: "Codex".into(),
email: "noreply@openai.com".into(),
time: gix::date::Time {
seconds: chrono::Utc::now().timestamp(),
offset: 0,
},
}
}
fn write_tree(repo: &gix::Repository, root: &Path, dir: &Path) -> anyhow::Result<ObjectId> {
let mut entries = Vec::new();
for entry in fs::read_dir(dir).with_context(|| format!("read {}", dir.display()))? {
let entry = entry?;
let path = entry.path();
let file_name = entry.file_name();
if file_name == OsStr::new(".git") || should_ignore_workspace_path(root, &path) {
continue;
}
let file_type = entry.file_type()?;
if file_type.is_dir() {
let oid = write_tree(repo, root, &path)?;
let tree = repo
.find_tree(oid)
.with_context(|| format!("load tree {}", path.display()))?;
if tree.decode()?.entries.is_empty() {
continue;
}
entries.push(Entry {
mode: EntryKind::Tree.into(),
filename: os_str_to_bstring(&file_name),
oid,
});
} else if file_type.is_file() {
let bytes = fs::read(&path).with_context(|| format!("read {}", path.display()))?;
let oid = repo
.write_blob(bytes)
.with_context(|| format!("write blob {}", path.display()))?
.detach();
entries.push(Entry {
mode: file_mode(&path, EntryKind::Blob)?,
filename: os_str_to_bstring(&file_name),
oid,
});
} else if file_type.is_symlink() {
let target =
fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
let oid = repo
.write_blob(path_to_bytes(&target))
.with_context(|| format!("write symlink blob {}", path.display()))?
.detach();
entries.push(Entry {
mode: EntryKind::Link.into(),
filename: os_str_to_bstring(&file_name),
oid,
});
}
}
entries.sort();
repo.write_object(&Tree { entries })
.context("write tree object")
.map(gix::Id::detach)
}
fn head_file_entries(
repo: &gix::Repository,
) -> anyhow::Result<BTreeMap<String, MemoryWorkspaceFileEntry>> {
let Ok(tree_id) = repo.head_tree_id() else {
return Ok(BTreeMap::new());
};
let tree = repo.find_tree(tree_id.detach()).context("load HEAD tree")?;
let mut entries = BTreeMap::new();
collect_tree_entries(repo, tree, PathBuf::new(), &mut entries)?;
Ok(entries)
}
fn collect_tree_entries(
repo: &gix::Repository,
tree: gix::Tree<'_>,
prefix: PathBuf,
entries: &mut BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> anyhow::Result<()> {
for entry in tree.iter() {
let entry = entry?;
let file_name = bstr_to_path(entry.inner.filename);
let path = prefix.join(file_name);
if entry.inner.mode.is_tree() {
let tree = repo
.find_tree(entry.inner.oid.to_owned())
.context("load child tree")?;
collect_tree_entries(repo, tree, path, entries)?;
} else {
entries.insert(
path_to_slash_string(&path),
MemoryWorkspaceFileEntry {
oid: entry.inner.oid.to_owned(),
mode: entry.inner.mode,
},
);
}
}
Ok(())
}
fn current_file_entries(
repo: &gix::Repository,
root: &Path,
) -> anyhow::Result<BTreeMap<String, MemoryWorkspaceFileEntry>> {
let mut entries = BTreeMap::new();
collect_current_entries(repo, root, root, &mut entries)?;
Ok(entries)
}
fn collect_current_entries(
repo: &gix::Repository,
root: &Path,
dir: &Path,
entries: &mut BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> anyhow::Result<()> {
for entry in fs::read_dir(dir).with_context(|| format!("read {}", dir.display()))? {
let entry = entry?;
let path = entry.path();
if path.file_name() == Some(OsStr::new(".git")) || should_ignore_workspace_path(root, &path)
{
continue;
}
let file_type = entry.file_type()?;
if file_type.is_dir() {
collect_current_entries(repo, root, &path, entries)?;
} else if file_type.is_file() {
let bytes = fs::read(&path).with_context(|| format!("read {}", path.display()))?;
entries.insert(
relative_slash_path(root, &path)?,
MemoryWorkspaceFileEntry {
oid: blob_oid(repo, &bytes)?,
mode: file_mode(&path, EntryKind::Blob)?,
},
);
} else if file_type.is_symlink() {
let target =
fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
entries.insert(
relative_slash_path(root, &path)?,
MemoryWorkspaceFileEntry {
oid: blob_oid(repo, &path_to_bytes(&target))?,
mode: EntryKind::Link.into(),
},
);
}
}
Ok(())
}
fn blob_oid(repo: &gix::Repository, bytes: &[u8]) -> anyhow::Result<ObjectId> {
gix::objs::compute_hash(repo.object_hash(), gix::objs::Kind::Blob, bytes)
.context("compute memory workspace blob oid")
}
fn diff_entries(
head: &BTreeMap<String, MemoryWorkspaceFileEntry>,
current: &BTreeMap<String, MemoryWorkspaceFileEntry>,
) -> Vec<WorkspaceChange> {
let mut entries = Vec::new();
for (path, entry) in current {
match head.get(path) {
None => entries.push(WorkspaceChange {
status: WorkspaceChangeStatus::Added,
path: path.clone(),
}),
Some(head_entry) if head_entry != entry => entries.push(WorkspaceChange {
status: WorkspaceChangeStatus::Modified,
path: path.clone(),
}),
Some(_) => {}
}
}
for path in head.keys() {
if !current.contains_key(path) {
entries.push(WorkspaceChange {
status: WorkspaceChangeStatus::Deleted,
path: path.clone(),
});
}
}
entries.sort_by(|left, right| left.path.cmp(&right.path));
entries
}
fn render_workspace_diff_file(
repo: &gix::Repository,
root: &Path,
head_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
current_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
changes: &[WorkspaceChange],
) -> anyhow::Result<String> {
let mut rendered = String::from(
"# Memory Workspace Diff\n\n\
Generated by Codex before Phase 2 memory consolidation. Read this file first and do not edit it.\n\n\
## Status\n",
);
if changes.is_empty() {
rendered.push_str("- none\n");
return Ok(rendered);
}
for change in changes {
let status = workspace_change_status_label(change.status);
rendered.push_str(&format!("- {status} {}\n", change.path));
}
rendered.push_str("\n## Diff\n\n```diff\n");
for change in changes {
rendered.push_str(&render_workspace_change_diff(
repo,
root,
head_entries,
current_entries,
change,
)?);
}
rendered.push_str("```\n");
Ok(rendered)
}
fn render_workspace_change_diff(
repo: &gix::Repository,
root: &Path,
head_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
current_entries: &BTreeMap<String, MemoryWorkspaceFileEntry>,
change: &WorkspaceChange,
) -> anyhow::Result<String> {
let old_entry = head_entries.get(&change.path);
let new_entry = current_entries.get(&change.path);
let old_bytes = old_entry
.map(|entry| read_head_blob(repo, entry))
.transpose()
.with_context(|| format!("read HEAD content for {}", change.path))?;
let new_bytes = new_entry
.map(|_| read_current_file_bytes(root, &change.path))
.transpose()
.with_context(|| format!("read current content for {}", change.path))?;
let old_text = String::from_utf8_lossy(old_bytes.as_deref().unwrap_or_default());
let new_text = String::from_utf8_lossy(new_bytes.as_deref().unwrap_or_default());
let old_header = if old_bytes.is_some() {
format!("a/{}", change.path)
} else {
"/dev/null".to_string()
};
let new_header = if new_bytes.is_some() {
format!("b/{}", change.path)
} else {
"/dev/null".to_string()
};
let mut section = format!("diff --git a/{0} b/{0}\n", change.path);
match (old_entry, new_entry) {
(None, Some(entry)) => {
section.push_str(&format!("new file mode {}\n", mode_label(entry.mode)));
}
(Some(entry), None) => {
section.push_str(&format!("deleted file mode {}\n", mode_label(entry.mode)));
}
(Some(old), Some(new)) if old.mode != new.mode => {
section.push_str(&format!(
"old mode {}\nnew mode {}\n",
mode_label(old.mode),
mode_label(new.mode)
));
}
(Some(_), Some(_)) => {}
(None, None) => return Ok(String::new()),
}
let diff = TextDiff::from_lines(&old_text, &new_text)
.unified_diff()
.context_radius(3)
.header(&old_header, &new_header)
.to_string();
section.push_str(&diff);
if !section.ends_with('\n') {
section.push('\n');
}
Ok(section)
}
fn read_head_blob(
repo: &gix::Repository,
entry: &MemoryWorkspaceFileEntry,
) -> anyhow::Result<Vec<u8>> {
let mut blob = repo.find_blob(entry.oid)?;
Ok(blob.take_data())
}
fn read_current_file_bytes(root: &Path, relative_path: &str) -> anyhow::Result<Vec<u8>> {
let path = root.join(relative_path);
let metadata =
fs::symlink_metadata(&path).with_context(|| format!("stat {}", path.display()))?;
if metadata.file_type().is_symlink() {
let target =
fs::read_link(&path).with_context(|| format!("read symlink {}", path.display()))?;
Ok(path_to_bytes(&target))
} else {
fs::read(&path).with_context(|| format!("read {}", path.display()))
}
}
fn workspace_change_status_label(status: WorkspaceChangeStatus) -> &'static str {
match status {
WorkspaceChangeStatus::Added => "A",
WorkspaceChangeStatus::Modified => "M",
WorkspaceChangeStatus::Deleted => "D",
}
}
fn mode_label(mode: EntryMode) -> &'static str {
match mode.kind() {
EntryKind::Blob => "100644",
EntryKind::BlobExecutable => "100755",
EntryKind::Link => "120000",
EntryKind::Tree => "040000",
EntryKind::Commit => "160000",
}
}
fn should_ignore_workspace_path(root: &Path, path: &Path) -> bool {
path.strip_prefix(root)
.is_ok_and(|relative| relative == Path::new(WORKSPACE_DIFF_FILENAME))
}
#[cfg(unix)]
fn file_mode(path: &Path, default: EntryKind) -> anyhow::Result<EntryMode> {
use std::os::unix::fs::PermissionsExt;
let mode = fs::metadata(path)?.permissions().mode();
Ok(if mode & 0o111 == 0 {
default.into()
} else {
EntryKind::BlobExecutable.into()
})
}
#[cfg(not(unix))]
fn file_mode(_path: &Path, default: EntryKind) -> anyhow::Result<EntryMode> {
Ok(default.into())
}
#[cfg(unix)]
fn os_str_to_bstring(value: &OsStr) -> gix::bstr::BString {
use std::os::unix::ffi::OsStrExt;
value.as_bytes().into()
}
#[cfg(not(unix))]
fn os_str_to_bstring(value: &OsStr) -> gix::bstr::BString {
value.to_string_lossy().as_bytes().into()
}
#[cfg(unix)]
fn path_to_bytes(path: &Path) -> Vec<u8> {
use std::os::unix::ffi::OsStrExt;
path.as_os_str().as_bytes().to_vec()
}
#[cfg(not(unix))]
fn path_to_bytes(path: &Path) -> Vec<u8> {
path.to_string_lossy().as_bytes().to_vec()
}
fn bstr_to_path(value: &gix::bstr::BStr) -> PathBuf {
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
PathBuf::from(OsStr::from_bytes(value))
}
#[cfg(not(unix))]
{
PathBuf::from(value.to_string())
}
}
fn relative_slash_path(root: &Path, path: &Path) -> anyhow::Result<String> {
path.strip_prefix(root)
.with_context(|| format!("strip {} from {}", root.display(), path.display()))
.map(path_to_slash_string)
}
fn path_to_slash_string(path: &Path) -> String {
path.components()
.map(|component| component.as_os_str().to_string_lossy())
.collect::<Vec<_>>()
.join("/")
}
#[cfg(test)]
#[path = "workspace_tests.rs"]
mod tests;

View File

@@ -1,187 +0,0 @@
use super::*;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::TempDir;
#[tokio::test]
async fn prepare_creates_repo_gitignore_and_initial_commit() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
fs::write(memory_root.join("MEMORY.md"), "baseline").expect("write memory");
prepare_git_repo(&memory_root).await.expect("prepare repo");
assert!(memory_root.join(".git").is_dir());
assert_eq!(
fs::read_to_string(memory_root.join(GITIGNORE_FILENAME)).expect("read gitignore"),
format!("{WORKSPACE_DIFF_FILENAME}\n")
);
assert!(!has_changes(&memory_root).await.expect("has changes"));
}
#[tokio::test]
async fn prepare_commits_gitignore_only_change_in_existing_repo() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
fs::write(memory_root.join("MEMORY.md"), "baseline").expect("write memory");
let repo = gix::init(&memory_root).expect("init repo");
commit_current_tree(&repo, INITIAL_COMMIT_MESSAGE).expect("commit baseline");
prepare_git_repo(&memory_root).await.expect("prepare repo");
assert!(!has_changes(&memory_root).await.expect("has changes"));
let repo = gix::open(&memory_root).expect("open repo");
assert!(
head_file_entries(&repo)
.expect("head entries")
.contains_key(GITIGNORE_FILENAME)
);
}
#[tokio::test]
async fn writes_diff_and_commits_workspace_changes() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
fs::write(memory_root.join("MEMORY.md"), "old").expect("write memory");
prepare_git_repo(&memory_root).await.expect("prepare repo");
fs::write(memory_root.join("MEMORY.md"), "new").expect("update memory");
fs::write(memory_root.join("memory_summary.md"), "summary").expect("write summary");
assert!(has_changes(&memory_root).await.expect("has changes"));
write_workspace_diff(&memory_root)
.await
.expect("write workspace diff file");
let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
.expect("read workspace diff file");
assert!(workspace_diff.contains("- M MEMORY.md"));
assert!(workspace_diff.contains("- A memory_summary.md"));
assert!(workspace_diff.contains("diff --git a/MEMORY.md b/MEMORY.md"));
assert!(workspace_diff.contains("-old"));
assert!(workspace_diff.contains("+new"));
assert!(workspace_diff.contains("diff --git a/memory_summary.md b/memory_summary.md"));
assert!(workspace_diff.contains("+summary"));
assert!(
has_changes(&memory_root).await.expect("has changes"),
"generated diff file should not affect workspace status"
);
commit_all(&memory_root).await.expect("commit workspace");
assert!(!has_changes(&memory_root).await.expect("has changes"));
assert!(
memory_root.join(WORKSPACE_DIFF_FILENAME).is_file(),
"generated diff file remains available but ignored after commit"
);
}
#[tokio::test]
async fn remove_workspace_diff_ignores_missing_file() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
remove_workspace_diff(&memory_root)
.await
.expect("remove missing workspace diff");
}
#[tokio::test]
async fn status_scan_does_not_write_added_file_blobs() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
prepare_git_repo(&memory_root).await.expect("prepare repo");
let added_content = b"new uncommitted memory";
fs::write(memory_root.join("MEMORY.md"), added_content).expect("write memory");
assert!(has_changes(&memory_root).await.expect("has changes"));
let repo = gix::open(&memory_root).expect("open repo");
let added_oid = blob_oid(&repo, added_content).expect("compute added oid");
assert!(
repo.find_blob(added_oid).is_err(),
"status scans should hash current files without writing loose git objects"
);
}
#[cfg(unix)]
#[tokio::test]
async fn reports_executable_bit_changes_as_modified() {
use std::os::unix::fs::PermissionsExt;
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
let path = memory_root.join("MEMORY.md");
fs::write(&path, "same content").expect("write memory");
prepare_git_repo(&memory_root).await.expect("prepare repo");
let mut permissions = fs::metadata(&path).expect("stat memory").permissions();
permissions.set_mode(permissions.mode() | 0o111);
fs::set_permissions(&path, permissions).expect("chmod memory");
assert!(has_changes(&memory_root).await.expect("has changes"));
write_workspace_diff(&memory_root)
.await
.expect("write workspace diff file");
let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
.expect("read workspace diff file");
assert!(workspace_diff.contains("- M MEMORY.md"));
assert!(workspace_diff.contains("old mode 100644"));
assert!(workspace_diff.contains("new mode 100755"));
}
#[tokio::test]
async fn commit_all_creates_normal_parented_history() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(&memory_root).expect("create memories");
fs::write(memory_root.join("MEMORY.md"), "old").expect("write memory");
prepare_git_repo(&memory_root).await.expect("prepare repo");
let first_head = gix::open(&memory_root)
.expect("open repo")
.head_id()
.expect("first head")
.detach();
fs::write(memory_root.join("MEMORY.md"), "new").expect("update memory");
commit_all(&memory_root).await.expect("commit workspace");
let repo = gix::open(&memory_root).expect("open repo");
let second_head = repo.head_id().expect("second head").detach();
assert_ne!(first_head, second_head);
let second_commit = repo.find_commit(second_head).expect("find second commit");
assert_eq!(
second_commit.parent_ids().collect::<Vec<_>>(),
vec![first_head]
);
}
#[tokio::test]
async fn workspace_diff_file_includes_deleted_head_content() {
let home = TempDir::new().expect("tempdir");
let memory_root = home.path().join("memories");
fs::create_dir_all(memory_root.join("rollout_summaries")).expect("create rollout summaries");
let summary_path = memory_root.join("rollout_summaries/deleted.md");
fs::write(
&summary_path,
"thread_id: 00000000-0000-4000-8000-000000000001\nimportant stale evidence\n",
)
.expect("write rollout summary");
prepare_git_repo(&memory_root).await.expect("prepare repo");
fs::remove_file(&summary_path).expect("delete rollout summary");
write_workspace_diff(&memory_root)
.await
.expect("write workspace diff file");
let workspace_diff = fs::read_to_string(memory_root.join(WORKSPACE_DIFF_FILENAME))
.expect("read workspace diff file");
assert!(workspace_diff.contains("- D rollout_summaries/deleted.md"));
assert!(workspace_diff.contains("deleted file mode 100644"));
assert!(workspace_diff.contains("-thread_id: 00000000-0000-4000-8000-000000000001"));
assert!(workspace_diff.contains("-important stale evidence"));
}

View File

@@ -252,6 +252,8 @@ use crate::agents_md::AgentsMdManager;
use crate::context::UserInstructions;
use crate::exec_policy::ExecPolicyUpdateError;
use crate::guardian::GuardianReviewSessionManager;
use crate::hook_runtime::PendingInputRecordOutcome;
use crate::hook_runtime::record_pending_turn_input;
use crate::mcp::McpManager;
use crate::memories;
use crate::network_policy_decision::execpolicy_network_rule_amendment;
@@ -269,6 +271,7 @@ use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::state::ActiveTurn;
use crate::state::MailboxDeliveryPhase;
use crate::state::PendingTurnInput;
use crate::state::SessionServices;
use crate::state::SessionState;
#[cfg(test)]
@@ -2888,7 +2891,7 @@ impl Session {
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
turn_state.push_pending_input(input);
turn_state.accept_mailbox_delivery_for_current_turn();
Ok(active_turn_id.clone())
}
@@ -2975,12 +2978,18 @@ impl Session {
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
#[cfg(test)]
pub async fn prepend_pending_input(&self, input: Vec<ResponseInputItem>) -> Result<(), ()> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.prepend_pending_input(input);
ts.prepend_pending_input(
input
.into_iter()
.map(PendingTurnInput::ResponseInputItem)
.collect(),
);
Ok(())
}
None => Err(()),
@@ -2991,7 +3000,7 @@ impl Session {
clippy::await_holding_invalid_type,
reason = "active turn checks and turn state updates must remain atomic"
)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
pub(crate) async fn get_pending_turn_input(&self) -> Vec<PendingTurnInput> {
let (pending_input, accepts_mailbox_delivery) = {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
@@ -3013,7 +3022,7 @@ impl Session {
mailbox_rx
.drain()
.into_iter()
.map(|mail| mail.to_response_input_item())
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
.collect::<Vec<_>>()
};
if pending_input.is_empty() {
@@ -3027,6 +3036,73 @@ impl Session {
}
}
#[cfg(test)]
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
self.get_pending_turn_input()
.await
.into_iter()
.map(ResponseInputItem::from)
.collect()
}
async fn fill_pending_input_from_mailbox_if_empty(
&self,
turn_state: &Arc<Mutex<crate::state::TurnState>>,
) {
let should_drain_mailbox = {
let ts = turn_state.lock().await;
!ts.has_pending_input() && ts.accepts_mailbox_delivery_for_current_turn()
};
if !should_drain_mailbox {
return;
}
let mailbox_items = {
let mut mailbox_rx = self.mailbox_rx.lock().await;
mailbox_rx
.drain()
.into_iter()
.map(|mail| PendingTurnInput::ResponseInputItem(mail.to_response_input_item()))
.collect::<Vec<_>>()
};
if mailbox_items.is_empty() {
return;
}
let mut ts = turn_state.lock().await;
for item in mailbox_items {
ts.push_pending_input(item);
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "pending transcript input must stay queued until hooks and history writes finish"
)]
pub(crate) async fn record_next_pending_turn_input_from_state(
self: &Arc<Self>,
turn_context: &Arc<crate::session::turn_context::TurnContext>,
turn_state: &Arc<Mutex<crate::state::TurnState>>,
) -> Option<PendingInputRecordOutcome> {
self.fill_pending_input_from_mailbox_if_empty(turn_state)
.await;
let mut ts = turn_state.lock().await;
let pending_input = ts.front_pending_input()?;
let outcome = record_pending_turn_input(self, turn_context, pending_input).await;
let _ = ts.pop_front_pending_input();
Some(outcome)
}
pub(crate) async fn record_next_pending_turn_input(
self: &Arc<Self>,
turn_context: &Arc<crate::session::turn_context::TurnContext>,
) -> Option<PendingInputRecordOutcome> {
let turn_state = self.turn_state_for_sub_id(&turn_context.sub_id).await?;
self.record_next_pending_turn_input_from_state(turn_context, &turn_state)
.await
}
/// Queue response items to be injected into the next active turn created for this session.
#[cfg(test)]
pub(crate) async fn queue_response_items_for_next_turn(&self, items: Vec<ResponseInputItem>) {
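Taken together, `fill_pending_input_from_mailbox_if_empty` and `record_next_pending_turn_input_from_state` implement a peek-record-pop queue: an item is popped only after it has been recorded, so nothing queued is lost between the hook pass and the history write. A toy, single-threaded sketch of that pattern (standard library only; hooks and the real codex types are elided, so this is illustrative rather than the crate's API):

use std::collections::VecDeque;

// Toy stand-in for PendingTurnInput; the real type carries user input
// or a serialized response item.
#[derive(Clone, Debug)]
struct Pending(String);

// Record the front item into `history`, popping it only after the write
// succeeds, mirroring the peek-then-pop pattern above.
fn record_next(queue: &mut VecDeque<Pending>, history: &mut Vec<String>) -> Option<()> {
    let front = queue.front().cloned()?;
    history.push(front.0); // "record" = append to the transcript
    queue.pop_front();     // drop it only once it has been recorded
    Some(())
}

fn main() {
    let mut queue = VecDeque::from([Pending("steer: also run the tests".into())]);
    let mut history = Vec::new();
    while record_next(&mut queue, &mut history).is_some() {}
    assert_eq!(history, vec!["steer: also run the tests".to_string()]);
}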

View File

@@ -310,7 +310,10 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
.await;
sess.spawn_task(
Arc::clone(&tc),
Vec::new(),
vec![UserInput::Text {
text: "first prompt".to_string(),
text_elements: Vec::new(),
}],
crate::tasks::RegularTask::new(),
)
.await;
@@ -326,23 +329,51 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
let second = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected turn aborted event")
.expect("channel open");
let EventMsg::TurnAborted(TurnAbortedEvent {
let aborted = loop {
let event = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("expected turn aborted event")
.expect("channel open");
if let EventMsg::TurnAborted(event) = event.msg {
break event;
}
};
let TurnAbortedEvent {
turn_id,
reason,
completed_at,
duration_ms,
}) = second.msg
else {
panic!("expected turn aborted event");
};
} = aborted;
assert_eq!(turn_id, Some(tc.sub_id.clone()));
assert_eq!(reason, TurnAbortReason::Interrupted);
assert!(completed_at.is_some());
assert!(duration_ms.is_some());
let history = sess.clone_history().await;
let expected_prompt = user_message("first prompt");
let prompt_idx = history
.raw_items()
.iter()
.position(|item| item == &expected_prompt);
let aborted_idx = history.raw_items().iter().position(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
role == "user"
&& content.iter().any(|content_item| {
let ContentItem::InputText { text } = content_item else {
return false;
};
TurnAborted::matches_text(text)
})
});
let (Some(prompt_idx), Some(aborted_idx)) = (prompt_idx, aborted_idx) else {
panic!(
"expected prompt and interrupted-turn marker in history: {:?}",
history.raw_items()
);
};
assert!(prompt_idx < aborted_idx);
}
fn test_model_client_session() -> crate::client::ModelClientSession {

View File

@@ -18,13 +18,9 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::feedback_tags;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::PendingInputRecordOutcome;
use crate::hook_runtime::emit_hook_completed_events;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::hook_runtime::run_pending_session_start_hooks;
use crate::hook_runtime::run_user_prompt_submit_hooks;
use crate::injection::ToolMentionKind;
use crate::injection::app_id_from_path;
use crate::injection::tool_kind_for_path;
@@ -75,7 +71,6 @@ use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::PlanItem;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::items::build_hook_prompt_message;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
@@ -300,30 +295,12 @@ pub(crate) async fn run_turn(
if run_pending_session_start_hooks(&sess, &turn_context).await {
return None;
}
let additional_contexts = if input.is_empty() {
Vec::new()
} else {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
let user_prompt_submit_outcome = run_user_prompt_submit_hooks(
&sess,
&turn_context,
UserMessageItem::new(&input).message(),
)
.await;
if user_prompt_submit_outcome.should_stop {
record_additional_contexts(
&sess,
&turn_context,
user_prompt_submit_outcome.additional_contexts,
)
.await;
return None;
}
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
user_prompt_submit_outcome.additional_contexts
};
if !input.is_empty()
&& sess.record_next_pending_turn_input(&turn_context).await
!= Some(PendingInputRecordOutcome::Recorded)
{
return None;
}
sess.services
.analytics_events_client
.track_app_mentioned(tracking.clone(), mentioned_app_invocations);
@@ -334,7 +311,6 @@ pub(crate) async fn run_turn(
}
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
record_additional_contexts(&sess, &turn_context, additional_contexts).await;
if !input.is_empty() {
// Track the previous-turn baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
@@ -403,7 +379,7 @@ pub(crate) async fn run_turn(
};
// Pending input is drained into history before building the next model request.
// However, we defer that drain until after sampling in two cases:
// 1. At the start of a turn, so the fresh user prompt in `input` gets sampled first.
// 1. At the start of a turn, so work queued behind the fresh prompt is sampled later.
// 2. After auto-compact, when model/tool continuation needs to resume before any steer.
let mut can_drain_pending_input = input.is_empty();
@@ -412,35 +388,18 @@ pub(crate) async fn run_turn(
break;
}
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = if can_drain_pending_input {
sess.get_pending_input().await
} else {
Vec::new()
};
let mut blocked_pending_input = false;
let mut blocked_pending_input_contexts = Vec::new();
let mut requeued_pending_input = false;
let mut accepted_pending_input = Vec::new();
if !pending_input.is_empty() {
let mut pending_input_iter = pending_input.into_iter();
while let Some(pending_input_item) = pending_input_iter.next() {
match inspect_pending_input(&sess, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
accepted_pending_input.push(*pending_input);
let mut has_accepted_pending_input = false;
if can_drain_pending_input {
// Note that pending input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
while let Some(outcome) = sess.record_next_pending_turn_input(&turn_context).await {
match outcome {
PendingInputRecordOutcome::Recorded => {
has_accepted_pending_input = true;
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
let remaining_pending_input = pending_input_iter.collect::<Vec<_>>();
if !remaining_pending_input.is_empty() {
let _ = sess.prepend_pending_input(remaining_pending_input).await;
requeued_pending_input = true;
}
blocked_pending_input_contexts = additional_contexts;
PendingInputRecordOutcome::Blocked => {
blocked_pending_input = true;
break;
}
@@ -448,14 +407,8 @@ pub(crate) async fn run_turn(
}
}
let has_accepted_pending_input = !accepted_pending_input.is_empty();
for pending_input in accepted_pending_input {
record_pending_input(&sess, &turn_context, pending_input).await;
}
record_additional_contexts(&sess, &turn_context, blocked_pending_input_contexts).await;
if blocked_pending_input && !has_accepted_pending_input {
if requeued_pending_input {
if sess.has_pending_input().await {
continue;
}
break;
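The drain gating in this hunk boils down to: skip the queued-input drain on the iteration that just recorded a fresh prompt, and pick it up on later iterations, so the transcript reads prompt, then model output, then steering input. A self-contained sketch of that ordering (plain functions and strings stand in for the session, turn loop, and model sampling; nothing here is the crate's real API):

// Simplified shape of the sampling loop's drain gating: on the first
// iteration a fresh user prompt was just recorded, so queued steering
// input is deferred until after the first model sample.
fn drain_order(initial_prompt: Option<&str>, mut queued: Vec<&str>) -> Vec<String> {
    let mut transcript = Vec::new();
    let mut can_drain = initial_prompt.is_none();
    if let Some(prompt) = initial_prompt {
        transcript.push(format!("user: {prompt}"));
    }
    for round in 0..2 {
        if can_drain {
            for item in queued.drain(..) {
                transcript.push(format!("steer: {item}"));
            }
        }
        transcript.push(format!("model: sample #{round}"));
        can_drain = true; // later rounds may pick up queued input
    }
    transcript
}

fn main() {
    let t = drain_order(Some("first prompt"), vec!["also run the tests"]);
    assert_eq!(t[0], "user: first prompt");
    assert_eq!(t[1], "model: sample #0");
    assert_eq!(t[2], "steer: also run the tests");
}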

View File

@@ -6,6 +6,7 @@ pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::MailboxDeliveryPhase;
pub(crate) use turn::PendingTurnInput;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use turn::TurnState;

View File

@@ -13,6 +13,7 @@ use codex_protocol::dynamic_tools::DynamicToolResponse;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use codex_rmcp_client::ElicitationResponse;
use rmcp::model::RequestId;
use tokio::sync::oneshot;
@@ -77,6 +78,38 @@ pub(crate) struct RunningTask {
pub(crate) _timer: Option<codex_otel::Timer>,
}
/// Input queued for ordered transcript recording during an active turn.
///
/// User prompts keep the original `UserInput` so client-visible turn items can
/// preserve UI-only spans such as `text_elements`; model-only response input
/// items can stay in their serialized Responses API form.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum PendingTurnInput {
UserInput(Vec<UserInput>),
ResponseInputItem(ResponseInputItem),
}
impl From<PendingTurnInput> for ResponseInputItem {
fn from(value: PendingTurnInput) -> Self {
match value {
PendingTurnInput::UserInput(input) => input.into(),
PendingTurnInput::ResponseInputItem(input) => input,
}
}
}
impl From<Vec<UserInput>> for PendingTurnInput {
fn from(value: Vec<UserInput>) -> Self {
Self::UserInput(value)
}
}
impl From<ResponseInputItem> for PendingTurnInput {
fn from(value: ResponseInputItem) -> Self {
Self::ResponseInputItem(value)
}
}
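To make the split between the two variants concrete, here is a stubbed analogue (types and the `ui_only_span` field are invented for illustration and are not the crate's real `UserInput`/`ResponseInputItem`): the queued user-prompt form keeps UI-only data that the serialized, model-facing form drops, which is why transcript recording happens from the queued variant before conversion.

// Stub types for illustration only; the real types are richer.
#[derive(Clone, Debug, PartialEq)]
struct StubUserInput {
    text: String,
    ui_only_span: Option<(usize, usize)>, // a text_elements-style range, kept for the UI
}

#[derive(Clone, Debug, PartialEq)]
struct StubResponseInputItem {
    text: String, // serialized, model-facing form: UI-only data is gone
}

#[derive(Clone, Debug, PartialEq)]
enum StubPendingTurnInput {
    UserInput(Vec<StubUserInput>),
    ResponseInputItem(StubResponseInputItem),
}

impl From<StubPendingTurnInput> for StubResponseInputItem {
    fn from(value: StubPendingTurnInput) -> Self {
        match value {
            // Flatten the prompt into a plain text item; the UI-only span is dropped.
            StubPendingTurnInput::UserInput(input) => StubResponseInputItem {
                text: input.into_iter().map(|u| u.text).collect::<Vec<_>>().join("\n"),
            },
            StubPendingTurnInput::ResponseInputItem(item) => item,
        }
    }
}

fn main() {
    let queued = StubPendingTurnInput::UserInput(vec![StubUserInput {
        text: "first prompt".into(),
        ui_only_span: Some((0, 5)),
    }]);
    // Converting for the model loses the UI-only span, so recording happens first.
    let model_item = StubResponseInputItem::from(queued);
    assert_eq!(model_item.text, "first prompt");
}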
impl ActiveTurn {
pub(crate) fn add_task(&mut self, task: RunningTask) {
let sub_id = task.turn_context.sub_id.clone();
@@ -101,7 +134,7 @@ pub(crate) struct TurnState {
pending_user_input: HashMap<String, oneshot::Sender<RequestUserInputResponse>>,
pending_elicitations: HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>,
pending_dynamic_tools: HashMap<String, oneshot::Sender<DynamicToolResponse>>,
pending_input: Vec<ResponseInputItem>,
pending_input: Vec<PendingTurnInput>,
mailbox_delivery_phase: MailboxDeliveryPhase,
granted_permissions: Option<PermissionProfile>,
pub(crate) tool_calls: u64,
@@ -198,11 +231,12 @@ impl TurnState {
self.pending_dynamic_tools.remove(key)
}
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
self.pending_input.push(input);
pub(crate) fn push_pending_input(&mut self, input: impl Into<PendingTurnInput>) {
self.pending_input.push(input.into());
}
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<ResponseInputItem>) {
#[cfg(test)]
pub(crate) fn prepend_pending_input(&mut self, mut input: Vec<PendingTurnInput>) {
if input.is_empty() {
return;
}
@@ -211,7 +245,19 @@ impl TurnState {
self.pending_input = input;
}
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
pub(crate) fn front_pending_input(&self) -> Option<PendingTurnInput> {
self.pending_input.first().cloned()
}
pub(crate) fn pop_front_pending_input(&mut self) -> Option<PendingTurnInput> {
if self.pending_input.is_empty() {
None
} else {
Some(self.pending_input.remove(0))
}
}
pub(crate) fn take_pending_input(&mut self) -> Vec<PendingTurnInput> {
if self.pending_input.is_empty() {
Vec::with_capacity(0)
} else {

View File

@@ -20,13 +20,11 @@ use tracing::trace;
use tracing::warn;
use crate::context::ContextualUserFragment;
use crate::hook_runtime::PendingInputHookDisposition;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
use crate::hook_runtime::record_pending_turn_input;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::PendingTurnInput;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnTokenUsageFact;
@@ -38,7 +36,6 @@ use codex_otel::TURN_MEMORY_METRIC;
use codex_otel::TURN_NETWORK_PROXY_METRIC;
use codex_otel::TURN_TOKEN_USAGE_METRIC;
use codex_otel::TURN_TOOL_CALL_METRIC;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
@@ -149,6 +146,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// Returns the tracing name for a spawned task span.
fn span_name(&self) -> &'static str;
/// Whether the submitted user input should be queued for ordered transcript recording.
fn queues_initial_input(&self) -> bool {
false
}
/// Executes the task until completion or cancellation.
///
/// Implementations typically stream protocol events using `session` and
@@ -186,6 +188,8 @@ pub(crate) trait AnySessionTask: Send + Sync + 'static {
fn span_name(&self) -> &'static str;
fn queues_initial_input(&self) -> bool;
fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
@@ -213,6 +217,10 @@ where
SessionTask::span_name(self)
}
fn queues_initial_input(&self) -> bool {
SessionTask::queues_initial_input(self)
}
fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
@@ -259,6 +267,7 @@ impl Session {
let task: Arc<dyn AnySessionTask> = Arc::new(task);
let task_kind = task.kind();
let span_name = task.span_name();
let queue_initial_input = task.queues_initial_input() && !input.is_empty();
let started_at = Instant::now();
turn_context
.turn_timing_state
@@ -270,7 +279,7 @@ impl Session {
let done = Arc::new(Notify::new());
let queued_response_items = self.take_queued_response_items_for_next_turn().await;
let mailbox_items = self.get_pending_input().await;
let mailbox_items = self.get_pending_turn_input().await;
let turn_state = {
let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
@@ -280,6 +289,9 @@ impl Session {
{
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
if queue_initial_input {
turn_state.push_pending_input(PendingTurnInput::UserInput(input.clone()));
}
for item in queued_response_items {
turn_state.push_pending_input(item);
}
@@ -398,8 +410,10 @@ impl Session {
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
if let Some(mut active_turn) = self.take_active_turn().await {
let turn_state = Arc::clone(&active_turn.turn_state);
for task in active_turn.drain_tasks() {
self.handle_task_abort(task, reason.clone()).await;
self.handle_task_abort(task, reason.clone(), Arc::clone(&turn_state))
.await;
}
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
@@ -419,7 +433,7 @@ impl Session {
.turn_metadata_state
.cancel_git_enrichment_task();
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut pending_input = Vec::<PendingTurnInput>::new();
let mut should_clear_active_turn = false;
let mut token_usage_at_turn_start = None;
let mut turn_had_memory_citation = false;
@@ -448,16 +462,7 @@ impl Session {
}
if !pending_input.is_empty() {
for pending_input_item in pending_input {
match inspect_pending_input(self, &turn_context, pending_input_item).await {
PendingInputHookDisposition::Accepted(pending_input) => {
record_pending_input(self, &turn_context, *pending_input).await;
}
PendingInputHookDisposition::Blocked {
additional_contexts,
} => {
record_additional_contexts(self, &turn_context, additional_contexts).await;
}
}
record_pending_turn_input(self, &turn_context, pending_input_item).await;
}
}
// Emit token usage metrics.
@@ -594,7 +599,12 @@ impl Session {
}
}
async fn handle_task_abort(self: &Arc<Self>, task: RunningTask, reason: TurnAbortReason) {
async fn handle_task_abort(
self: &Arc<Self>,
task: RunningTask,
reason: TurnAbortReason,
turn_state: Arc<tokio::sync::Mutex<crate::state::TurnState>>,
) {
let sub_id = task.turn_context.sub_id.clone();
if task.cancellation_token.is_cancelled() {
return;
@@ -624,6 +634,11 @@ impl Session {
if reason == TurnAbortReason::Interrupted {
self.cleanup_after_interrupt(&task.turn_context).await;
while self
.record_next_pending_turn_input_from_state(&task.turn_context, &turn_state)
.await
.is_some()
{}
let marker = interrupted_turn_history_marker();
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
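The interrupt path above now flushes any still-queued turn input into history before appending the interrupted-turn marker, which is what keeps the transcript ordered prompt-then-marker. A minimal sketch of that flush-then-mark step (the marker string and types here are placeholders, not the real `interrupted_turn_history_marker` output):

use std::collections::VecDeque;

// On interrupt, any still-queued turn input is written to history first,
// and only then is the interrupted-turn marker appended, so the transcript
// reads prompt -> marker rather than marker -> prompt.
fn flush_then_mark(mut pending: VecDeque<String>, history: &mut Vec<String>) {
    while let Some(item) = pending.pop_front() {
        history.push(item);
    }
    history.push("<turn interrupted>".to_string());
}

fn main() {
    let mut history = Vec::new();
    flush_then_mark(VecDeque::from(["user: first prompt".to_string()]), &mut history);
    assert_eq!(
        history,
        vec!["user: first prompt".to_string(), "<turn interrupted>".to_string()]
    );
}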

View File

@@ -33,6 +33,10 @@ impl SessionTask for RegularTask {
"session_task.turn"
}
fn queues_initial_input(&self) -> bool {
true
}
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,

View File

@@ -143,31 +143,29 @@ Mode selection:
- INCREMENTAL UPDATE: existing artifacts already exist and `raw_memories.md`
mostly contains new additions.
Memory workspace diff:
Incremental thread diff snapshot (computed before the current artifact sync rewrites local files):
The folder `{{ memory_root }}/` is a git repository managed by Codex. Read
`{{ phase2_workspace_diff_file }}` in this same folder first. It contains the git-style diff from
the previous successful Phase 2 commit to the current worktree. It is generated by Codex for this
run and is not part of the committed memory artifacts.
**Diff since last consolidation:**
{{ phase2_input_selection }}
Incremental update and forgetting mechanism:
- Use the git-style diff in `{{ phase2_workspace_diff_file }}` to identify relevant changed
sections and deleted inputs.
- Use the diff provided
- Do not open raw sessions / original rollout transcripts.
- For added or modified `raw_memories.md` and `rollout_summaries/*.md` files, read the changed
raw-memory sections and the corresponding rollout summaries only when needed for stronger
evidence, task placement, or conflict resolution.
- For each added thread id, search it in `raw_memories.md`, read that raw-memory section, and
read the corresponding `rollout_summaries/*.md` file only when needed for stronger evidence,
task placement, or conflict resolution.
- When scanning a raw-memory section, read the task-level `Preference signals:` subsections
first, then the rest of the task blocks.
- For deleted `rollout_summaries/*.md` or `extensions/*/resources/*.md` files, search their
filenames, paths, and thread ids (when present) in `MEMORY.md`. Delete only memory supported
by deleted inputs.
- If a `MEMORY.md` block contains both deleted and still-present evidence, do not delete the whole
block. Remove only stale references and stale local guidance, preserve shared or still-supported
content, and split or rewrite the block only if needed.
- For each removed thread id, search it in `MEMORY.md` and delete only the memory supported by
that thread. Use `thread_id=<thread_id>` in `### rollout_summary_files` when available; if not,
fall back to rollout summary filenames plus the corresponding `rollout_summaries/*.md` files.
- If a `MEMORY.md` block contains both removed and undeleted threads, do not delete the whole
block. Remove only the removed thread's references and thread-local guidance, preserve shared
or still-supported content, and split or rewrite the block only if needed to keep the undeleted
threads intact.
- After `MEMORY.md` cleanup is done, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that was only supported by deleted files.
summary/index content that was only supported by removed thread ids.
Outputs:
Under `{{ memory_root }}/`:
@@ -745,28 +743,26 @@ WORKFLOW
3. INCREMENTAL UPDATE behavior:
- Read existing `MEMORY.md` and `memory_summary.md` first for continuity and to locate
existing references that may need surgical cleanup.
- Use the injected git-style workspace changes as the first routing pass:
- added/modified `raw_memories.md` and `rollout_summaries/*.md` = ingestion queue
- deleted `rollout_summaries/*.md` and `extensions/*/resources/*.md` = forgetting /
stale-cleanup queue
- Use the injected thread-diff snapshot as the first routing pass:
- added thread ids = ingestion queue
- removed thread ids = forgetting / stale-cleanup queue
- Build an index of rollout references already present in existing `MEMORY.md` before
scanning raw memories so you can route net-new evidence into the right blocks.
- Work in this order:
1. For added or modified rollout inputs, search their paths/thread ids in `raw_memories.md`,
read those sections, and open the corresponding `rollout_summaries/*.md` files when
necessary.
1. For newly added thread ids, search them in `raw_memories.md`, read those sections, and
open the corresponding `rollout_summaries/*.md` files when necessary.
2. Route the new signal into existing `MEMORY.md` blocks or create new ones when needed.
3. For deleted inputs, search `MEMORY.md` and surgically delete or rewrite only the
unsupported memory.
4. If a block mixes deleted and still-present evidence, preserve the still-supported content;
split or rewrite the block if that is the cleanest way to delete only the stale part.
3. For removed thread ids, search `MEMORY.md` and surgically delete or rewrite only the
unsupported thread-local memory.
4. If a block mixes removed and undeleted threads, preserve the undeleted-thread content;
split or rewrite the block if that is the cleanest way to delete only the removed part.
5. After `MEMORY.md` is correct, revisit `memory_summary.md` and remove or rewrite stale
summary/index content that no longer has current support.
summary/index content that is no longer supported by undeleted threads.
- Integrate new signal into existing artifacts by:
- scanning added or modified raw-memory entries in recency order and identifying which existing blocks they should update
- scanning the newly added raw-memory entries in recency order and identifying which existing blocks they should update
- updating existing knowledge with better/newer evidence
- updating stale or contradicting guidance
- pruning or downgrading memory whose only provenance comes from deleted inputs
- pruning or downgrading memory whose only provenance comes from removed thread ids
- expanding terse old blocks when new summaries/raw memories make the task family clearer
- doing light clustering and merging if needed
- refreshing `MEMORY.md` top-of-file ordering so recent high-utility task families stay easy to find
@@ -778,8 +774,8 @@ WORKFLOW
target, keep its wording, label, and relative order mostly stable. Rewrite/reorder/rename/
split/merge only when fixing a real problem (staleness, ambiguity, schema drift, wrong
boundaries) or when meaningful new evidence materially improves retrieval clarity/searchability.
- Spend most of your deep-dive budget on added/modified inputs and on mixed blocks touched by
deleted inputs. Do not re-read unchanged older threads unless you need them for
- Spend most of your deep-dive budget on newly added thread ids and on mixed blocks touched by
removed thread ids. Do not re-read unchanged older threads unless you need them for
conflict resolution, clustering, or provenance repair.
4. Evidence deep-dive rule (both modes):
@@ -797,7 +793,8 @@ WORKFLOW
evidence, procedural detail, validation signals, and user feedback before finalizing
`MEMORY.md`.
- When deleting stale memory from a mixed block, use the relevant rollout summaries to decide
which details are uniquely supported by deleted inputs versus still-supported evidence.
which details are uniquely supported by removed threads versus still supported by undeleted
threads.
- Use `updated_at` and validation strength together to resolve stale/conflicting notes.
- For user-profile or preference claims, recurrence matters: repeated evidence across
rollouts should generally outrank a single polished but isolated summary.
@@ -814,7 +811,7 @@ WORKFLOW
- remove duplication in memory_summary, skills/, and MEMORY.md
- remove stale or low-signal blocks that are less likely to be useful in the future
- remove or rewrite blocks/task sections whose supporting rollout references point only to
deleted inputs or missing rollout summary files
removed thread ids or missing rollout summary files
- run a global rollout-reference audit on final `MEMORY.md` and fix accidental duplicate
entries / redundant repetition, while preserving intentional multi-task or multi-block
reuse when it adds distinct task-local value

View File

@@ -55,15 +55,26 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
let first = build_test_codex(&server, home.clone()).await?;
let first_request = wait_for_single_request(&first_phase2).await;
let _first_prompt = phase2_prompt_text(&first_request);
let first_workspace_diff = read_workspace_diff(&home).await?;
let first_prompt = phase2_prompt_text(&first_request);
assert!(
first_workspace_diff.contains("- A raw_memories.md"),
"expected raw memories to be added in first workspace diff: {first_workspace_diff}"
first_prompt.contains("- selected inputs this run: 1"),
"expected selected count in first prompt: {first_prompt}"
);
assert!(
first_workspace_diff.contains("rollout_a.md"),
"expected rollout A summary to be added: {first_workspace_diff}"
first_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains("- removed from the last successful Phase 2 run: 0"),
"expected removed count in first prompt: {first_prompt}"
);
assert!(
first_prompt.contains(&format!("- [added] thread_id={thread_a},")),
"expected thread A to be marked added: {first_prompt}"
);
assert!(
first_prompt.contains("Removed from the last successful Phase 2 selection:\n- none"),
"expected no removed items in first prompt: {first_prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_a).await?;
@@ -100,27 +111,34 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
let second = build_test_codex(&server, home.clone()).await?;
let second_request = wait_for_single_request(&second_phase2).await;
let _second_prompt = phase2_prompt_text(&second_request);
let second_workspace_diff = read_workspace_diff(&home).await?;
let second_prompt = phase2_prompt_text(&second_request);
assert!(
second_workspace_diff.contains("- M raw_memories.md"),
"expected raw memories to be modified in second workspace diff: {second_workspace_diff}"
second_prompt.contains("- selected inputs this run: 1"),
"expected selected count in second prompt: {second_prompt}"
);
assert!(
second_workspace_diff.contains("rollout_b.md"),
"expected rollout B summary to be added: {second_workspace_diff}"
second_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected added count in second prompt: {second_prompt}"
);
assert!(
second_workspace_diff.contains("- D rollout_summaries/"),
"expected rollout A summary to be deleted: {second_workspace_diff}"
second_prompt.contains("- removed from the last successful Phase 2 run: 1"),
"expected removed count in second prompt: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- [added] thread_id={thread_b},")),
"expected thread B to be marked added: {second_prompt}"
);
assert!(
second_prompt.contains(&format!("- thread_id={thread_a},")),
"expected thread A to be marked removed: {second_prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_b).await?;
let raw_memories = tokio::fs::read_to_string(memory_root.join("raw_memories.md")).await?;
assert!(raw_memories.contains("raw memory B"));
assert!(!raw_memories.contains("raw memory A"));
assert!(raw_memories.contains("raw memory A"));
let rollout_summaries = read_rollout_summary_bodies(&memory_root).await?;
assert_eq!(rollout_summaries.len(), 1);
assert_eq!(rollout_summaries.len(), 2);
assert!(
rollout_summaries
.iter()
@@ -132,7 +150,7 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
.any(|summary| summary.contains("git_branch: branch-rollout-b"))
);
assert!(
!rollout_summaries
rollout_summaries
.iter()
.any(|summary| summary.contains("rollout summary A"))
);
@@ -157,11 +175,11 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
)
.await?;
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories/extensions/chronicle/instructions.md"),
.join("memories_extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -189,32 +207,33 @@ async fn memories_startup_phase2_prunes_old_extension_resources_and_reports_them
let codex = build_test_codex(&server, home.clone()).await?;
let request = wait_for_single_request(&phase2).await;
let _prompt = phase2_prompt_text(&request);
let workspace_diff = read_workspace_diff(&home).await?;
let prompt = phase2_prompt_text(&request);
assert!(
workspace_diff.contains(&format!(
"- D extensions/chronicle/resources/{old_file_name}"
)),
"expected old resource deletion in workspace diff: {workspace_diff}"
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains("- retention window: 7 days"),
"expected retention window in prompt: {prompt}"
);
assert!(
prompt.contains("- extension: chronicle"),
"expected extension name in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
let old_file = home.path().join(format!(
"memories/extensions/chronicle/resources/{old_file_name}"
));
wait_for_file_removed(&old_file).await?;
assert!(
!tokio::fs::try_exists(&old_file).await?,
"old extension resource should be pruned"
);
assert!(
tokio::fs::try_exists(
home.path()
.join("memories/extensions/chronicle/resources")
.join(recent_file.file_name().expect("recent file name"))
)
.await?,
tokio::fs::try_exists(&recent_file).await?,
"recent extension resource should be retained"
);
@@ -232,11 +251,11 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
.await?;
let now = Utc::now();
let chronicle_resources = home.path().join("memories/extensions/chronicle/resources");
let chronicle_resources = home.path().join("memories_extensions/chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources).await?;
tokio::fs::write(
home.path()
.join("memories/extensions/chronicle/instructions.md"),
.join("memories_extensions/chronicle/instructions.md"),
"instructions",
)
.await?;
@@ -259,18 +278,20 @@ async fn memories_startup_phase2_processes_old_extension_resources_without_stage
let codex = build_test_codex(&server, home.clone()).await?;
let request = wait_for_single_request(&phase2).await;
let _prompt = phase2_prompt_text(&request);
let workspace_diff = read_workspace_diff(&home).await?;
let prompt = phase2_prompt_text(&request);
assert!(
workspace_diff.contains(&format!(
"- D extensions/chronicle/resources/{old_file_name}"
)),
"expected old resource deletion in workspace diff: {workspace_diff}"
prompt.contains("- selected inputs this run: 0"),
"expected no selected raw inputs in prompt: {prompt}"
);
assert!(
prompt.contains("Memory extension resources removed by retention pruning:"),
"expected extension resource prune report in prompt: {prompt}"
);
assert!(
prompt.contains(&format!(" - resources/{old_file_name}")),
"expected old resource in prompt: {prompt}"
);
let old_file = home.path().join(format!(
"memories/extensions/chronicle/resources/{old_file_name}"
));
wait_for_file_removed(&old_file).await?;
shutdown_test_codex(&codex).await?;
@@ -374,11 +395,18 @@ async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs()
let first_phase2_request = wait_for_request(&responses, /*expected_count*/ 1)
.await
.remove(0);
let _first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
let first_workspace_diff = read_workspace_diff(&home).await?;
let first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
assert!(
first_workspace_diff.contains("- A raw_memories.md"),
"expected raw memories to be added before pollution: {first_workspace_diff}"
first_phase2_prompt.contains("- selected inputs this run: 1"),
"expected seeded thread to be selected before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
"expected seeded thread to be added before pollution: {first_phase2_prompt}"
);
assert!(
first_phase2_prompt.contains(&format!("- [added] thread_id={thread_id},")),
"expected selected thread in first phase2 prompt: {first_phase2_prompt}"
);
wait_for_phase2_success(db.as_ref(), thread_id).await?;
@@ -433,39 +461,6 @@ async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs()
assert_eq!(selection.removed[0].thread_id, thread_id);
shutdown_test_codex(&resumed).await?;
let removed_phase2 = mount_sse_once(
&server,
sse(vec![
ev_response_created("resp-phase2-removed"),
ev_assistant_message("msg-phase2-removed", "phase2 removed complete"),
ev_completed("resp-phase2-removed"),
]),
)
.await;
let cleanup = build_test_codex(&server, home.clone()).await?;
let removed_request = wait_for_single_request(&removed_phase2).await;
let _removed_prompt = phase2_prompt_text(&removed_request);
let workspace_diff = read_workspace_diff(&home).await?;
assert!(
workspace_diff.contains("- D rollout_summaries/"),
"expected polluted thread rollout summary to be deleted: {workspace_diff}"
);
assert!(
workspace_diff.contains("deleted file mode 100644"),
"expected deleted file section in workspace diff: {workspace_diff}"
);
assert!(
workspace_diff.contains(&format!("-thread_id: {thread_id}")),
"expected deleted rollout summary metadata in workspace diff: {workspace_diff}"
);
assert!(
workspace_diff.contains("-rollout summary seeded for web search pollution"),
"expected deleted rollout summary content in workspace diff: {workspace_diff}"
);
wait_for_phase2_no_pending_inputs(db.as_ref()).await?;
shutdown_test_codex(&cleanup).await?;
Ok(())
}
@@ -530,10 +525,6 @@ async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
wait_for_request(mock, /*expected_count*/ 1).await.remove(0)
}
async fn read_workspace_diff(home: &TempDir) -> Result<String> {
Ok(tokio::fs::read_to_string(home.path().join("memories/phase2_workspace_diff.md")).await?)
}
async fn wait_for_file_removed(path: &Path) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
@@ -569,7 +560,7 @@ fn phase2_prompt_text(request: &ResponsesRequest) -> String {
request
.message_input_texts("user")
.into_iter()
.find(|text| text.contains("Memory workspace diff:"))
.find(|text| text.contains("Current selected Phase 1 inputs:"))
.expect("phase2 prompt text")
}
@@ -598,27 +589,6 @@ async fn wait_for_phase2_success(
}
}
async fn wait_for_phase2_no_pending_inputs(db: &codex_state::StateRuntime) -> Result<()> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let selection = db
.get_phase2_input_selection(/*n*/ 1, /*max_unused_days*/ 30)
.await?;
if selection.selected.is_empty()
&& selection.retained_thread_ids.is_empty()
&& selection.removed.is_empty()
{
return Ok(());
}
assert!(
Instant::now() < deadline,
"timed out waiting for phase2 to clear pending inputs: {selection:?}"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
async fn seed_stage1_output_for_existing_thread(
db: &codex_state::StateRuntime,
thread_id: ThreadId,

View File

@@ -57,114 +57,10 @@ mod job_control;
/// Target frame interval for UI redraw scheduling.
pub(crate) const TARGET_FRAME_INTERVAL: Duration = frame_rate_limiter::MIN_FRAME_INTERVAL;
const DISABLE_KEYBOARD_ENHANCEMENT_ENV_VAR: &str = "CODEX_TUI_DISABLE_KEYBOARD_ENHANCEMENT";
/// A type alias for the terminal type used in this application
pub type Terminal = CustomTerminal<CrosstermBackend<Stdout>>;
fn keyboard_enhancement_disabled() -> bool {
let disable_env = std::env::var(DISABLE_KEYBOARD_ENHANCEMENT_ENV_VAR).ok();
let is_wsl = running_in_wsl();
let is_vscode_terminal = is_wsl && running_in_vscode_terminal();
keyboard_enhancement_disabled_for(disable_env.as_deref(), is_wsl, is_vscode_terminal)
}
fn keyboard_enhancement_disabled_for(
disable_env: Option<&str>,
is_wsl: bool,
is_vscode_terminal: bool,
) -> bool {
if let Some(disabled) = parse_bool_env(disable_env) {
return disabled;
}
// VS Code running a WSL shell can hide TERM_PROGRAM from the Linux process
// environment, so `running_in_vscode_terminal` also probes the Windows-side
// environment through WSL interop.
is_wsl && is_vscode_terminal
}
fn parse_bool_env(value: Option<&str>) -> Option<bool> {
match value.map(str::trim) {
Some("1") => Some(true),
Some(value) if value.eq_ignore_ascii_case("true") => Some(true),
Some(value) if value.eq_ignore_ascii_case("yes") => Some(true),
Some("0") => Some(false),
Some(value) if value.eq_ignore_ascii_case("false") => Some(false),
Some(value) if value.eq_ignore_ascii_case("no") => Some(false),
_ => None,
}
}
fn running_in_wsl() -> bool {
#[cfg(target_os = "linux")]
{
crate::clipboard_paste::is_probably_wsl()
}
#[cfg(not(target_os = "linux"))]
{
false
}
}
fn running_in_vscode_terminal() -> bool {
vscode_terminal_detected(
std::env::var("TERM_PROGRAM").ok().as_deref(),
windows_term_program().as_deref(),
)
}
fn vscode_terminal_detected(
linux_term_program: Option<&str>,
windows_term_program: Option<&str>,
) -> bool {
term_program_is_vscode(linux_term_program) || term_program_is_vscode(windows_term_program)
}
fn term_program_is_vscode(value: Option<&str>) -> bool {
value.is_some_and(|value| value.eq_ignore_ascii_case("vscode"))
}
fn windows_term_program() -> Option<String> {
#[cfg(target_os = "linux")]
{
static WINDOWS_TERM_PROGRAM: std::sync::OnceLock<Option<String>> =
std::sync::OnceLock::new();
WINDOWS_TERM_PROGRAM
.get_or_init(read_windows_term_program)
.clone()
}
#[cfg(not(target_os = "linux"))]
{
None
}
}
#[cfg(target_os = "linux")]
fn read_windows_term_program() -> Option<String> {
let output = std::process::Command::new("cmd.exe")
.args(["/d", "/s", "/c", "set TERM_PROGRAM"])
.stdin(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.output()
.ok()?;
if !output.status.success() {
return None;
}
String::from_utf8_lossy(&output.stdout)
.lines()
.find_map(|line| {
line.trim_end_matches('\r')
.strip_prefix("TERM_PROGRAM=")
.map(str::to_string)
})
.filter(|value| !value.trim().is_empty())
}
fn should_emit_notification(condition: NotificationCondition, terminal_focused: bool) -> bool {
match condition {
NotificationCondition::Unfocused => !terminal_focused,
@@ -174,10 +70,7 @@ fn should_emit_notification(condition: NotificationCondition, terminal_focused:
#[cfg(test)]
mod tests {
use super::keyboard_enhancement_disabled_for;
use super::parse_bool_env;
use super::should_emit_notification;
use super::vscode_terminal_detected;
use codex_config::types::NotificationCondition;
#[test]
@@ -203,68 +96,6 @@ mod tests {
/*terminal_focused*/ false
));
}
#[test]
fn keyboard_enhancement_env_flag_parses_common_values() {
assert_eq!(parse_bool_env(Some("1")), Some(true));
assert_eq!(parse_bool_env(Some("true")), Some(true));
assert_eq!(parse_bool_env(Some("YES")), Some(true));
assert_eq!(parse_bool_env(Some("0")), Some(false));
assert_eq!(parse_bool_env(Some("false")), Some(false));
assert_eq!(parse_bool_env(Some("NO")), Some(false));
assert_eq!(parse_bool_env(Some("unexpected")), None);
assert_eq!(parse_bool_env(/*value*/ None), None);
}
#[test]
fn keyboard_enhancement_auto_disables_for_vscode_in_wsl() {
assert!(keyboard_enhancement_disabled_for(
/*disable_env*/ None, /*is_wsl*/ true, /*is_vscode_terminal*/ true
));
}
#[test]
fn keyboard_enhancement_auto_disable_requires_wsl_and_vscode() {
assert!(!keyboard_enhancement_disabled_for(
/*disable_env*/ None, /*is_wsl*/ true, /*is_vscode_terminal*/ false
));
assert!(!keyboard_enhancement_disabled_for(
/*disable_env*/ None, /*is_wsl*/ false, /*is_vscode_terminal*/ true
));
}
#[test]
fn keyboard_enhancement_env_flag_overrides_auto_detection() {
assert!(!keyboard_enhancement_disabled_for(
Some("0"),
/*is_wsl*/ true,
/*is_vscode_terminal*/ true
));
assert!(keyboard_enhancement_disabled_for(
Some("1"),
/*is_wsl*/ false,
/*is_vscode_terminal*/ false
));
}
#[test]
fn vscode_terminal_detection_uses_linux_and_windows_term_program() {
assert!(vscode_terminal_detected(
Some("vscode"),
/*windows_term_program*/ None
));
assert!(vscode_terminal_detected(
/*linux_term_program*/ None,
Some("vscode")
));
assert!(!vscode_terminal_detected(
/*linux_term_program*/ None,
Some("WindowsTerminal")
));
assert!(!vscode_terminal_detected(
/*linux_term_program*/ None, /*windows_term_program*/ None
));
}
}
pub fn set_modes() -> Result<()> {
@@ -277,16 +108,14 @@ pub fn set_modes() -> Result<()> {
// Some terminals (notably legacy Windows consoles) do not support
// keyboard enhancement flags. Attempt to enable them, but continue
// gracefully if unsupported.
if !keyboard_enhancement_disabled() {
let _ = execute!(
stdout(),
PushKeyboardEnhancementFlags(
KeyboardEnhancementFlags::DISAMBIGUATE_ESCAPE_CODES
| KeyboardEnhancementFlags::REPORT_EVENT_TYPES
| KeyboardEnhancementFlags::REPORT_ALTERNATE_KEYS
)
);
}
let _ = execute!(
stdout(),
PushKeyboardEnhancementFlags(
KeyboardEnhancementFlags::DISAMBIGUATE_ESCAPE_CODES
| KeyboardEnhancementFlags::REPORT_EVENT_TYPES
| KeyboardEnhancementFlags::REPORT_ALTERNATE_KEYS
)
);
let _ = execute!(stdout(), EnableFocusChange);
Ok(())
@@ -476,8 +305,7 @@ impl Tui {
// Detect keyboard enhancement support before any EventStream is created so the
// crossterm poller can acquire its lock without contention.
let enhanced_keys_supported =
!keyboard_enhancement_disabled() && supports_keyboard_enhancement().unwrap_or(false);
let enhanced_keys_supported = supports_keyboard_enhancement().unwrap_or(false);
// Cache this to avoid contention with the event reader.
supports_color::on_cached(supports_color::Stream::Stdout);
let _ = crate::terminal_palette::default_colors();