Compare commits

...

9 Commits

Author SHA1 Message Date
Ahmed Ibrahim
ad1a8040c7 fix: use platform python for notify test 2026-03-02 10:22:12 -07:00
jif-oai
9a42a56d8f chore: /multiagent alias for /agent (#13249)
Add a `/multi-agents` alias for `/agent` and update the wording
2026-03-02 16:51:54 +00:00
daveaitel-openai
c2e126f92a core: reuse parent shell snapshot for thread-spawn subagents (#13052)
## Summary
- reuse the parent shell snapshot when spawning/forking/resuming
`SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })` sessions
- plumb inherited snapshot through `AgentControl -> ThreadManager ->
Codex::spawn -> SessionConfiguration`
- skip shell snapshot refresh on cwd updates for thread-spawn subagents
so inherited snapshots are not replaced

## Why
- avoids per-subagent shell snapshot creation and cleanup work
- keeps thread-spawn subagents on the parent snapshot path, matching the
intended parent/child snapshot model

## Validation
- `just fmt` (in `codex-rs`)
- `cargo test -p codex-core --no-run`
- `cargo test -p codex-core spawn_agent -- --nocapture`
- `cargo test -p codex-core --test all
suite::agent_jobs::spawn_agents_on_csv_runs_and_exports`

## Notes
- full `cargo test -p codex-core --test all` was left running separately
for broader verification

Co-authored-by: Codex <noreply@openai.com>
2026-03-02 15:53:15 +00:00
jif-oai
2a5bcc053f fix: esc in /agent (#13131)
Fix https://github.com/openai/codex/issues/13093
2026-03-02 15:49:06 +00:00
jif-oai
1905597017 feat: update memories config names (#13237) 2026-03-02 15:25:39 +00:00
jif-oai
b649953845 feat: polluted memories (#13008)
Add a feature flag to disable memory creation for "polluted" threads
2026-03-02 11:57:32 +00:00
jif-oai
b08bdd91e3 fix: /status when sub-agent (#13130)
Fix https://github.com/openai/codex/issues/13066
2026-03-02 11:57:15 +00:00
gabec-openai
9685e7d6d1 Improve subagent contrast in TUI (#13197)
## Summary
- raise contrast for subagent transcript labels and fallback states
- remove low-contrast dim styling from role tags and error details
- make the closed-agent picker dot readable in dark theme

## Validation
- just fmt
- just fix -p codex-tui
- cargo test -p codex-tui

Co-authored-by: Codex <noreply@openai.com>
2026-03-02 12:16:49 +01:00
Eric Traut
d94f0b6ce7 Fix issue deduplication workflow for Codex issues (#13215)
Fixes #13203

Summary
- split the duplicate-finding workflow into two jobs so we gather all
issues first
- add an open-issue fallback job that runs only when the full scan finds
nothing
- centralize final selection so `comment-on-issue` always sees the best
dedupe output
2026-03-01 22:45:50 -07:00
35 changed files with 1282 additions and 144 deletions

View File

@@ -7,15 +7,17 @@ on:
- labeled
jobs:
gather-duplicates:
name: Identify potential duplicates
gather-duplicates-all:
name: Identify potential duplicates (all issues)
# Prevent runs on forks (requires OpenAI API key, wastes Actions minutes)
if: github.repository == 'openai/codex' && (github.event.action == 'opened' || (github.event.action == 'labeled' && github.event.label.name == 'codex-deduplicate'))
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
codex_output: ${{ steps.select-final.outputs.codex_output }}
issues_json: ${{ steps.normalize-all.outputs.issues_json }}
reason: ${{ steps.normalize-all.outputs.reason }}
has_matches: ${{ steps.normalize-all.outputs.has_matches }}
steps:
- uses: actions/checkout@v6
@@ -29,7 +31,6 @@ jobs:
CURRENT_ISSUE_FILE=codex-current-issue.json
EXISTING_ALL_FILE=codex-existing-issues-all.json
EXISTING_OPEN_FILE=codex-existing-issues-open.json
gh issue list --repo "$REPO" \
--json number,title,body,createdAt,updatedAt,state,labels \
@@ -47,22 +48,6 @@ jobs:
}]' \
> "$EXISTING_ALL_FILE"
gh issue list --repo "$REPO" \
--json number,title,body,createdAt,updatedAt,state,labels \
--limit 1000 \
--state open \
--search "sort:created-desc" \
| jq '[.[] | {
number,
title,
body: ((.body // "")[0:4000]),
createdAt,
updatedAt,
state,
labels: ((.labels // []) | map(.name))
}]' \
> "$EXISTING_OPEN_FILE"
gh issue view "$ISSUE_NUMBER" \
--repo "$REPO" \
--json number,title,body \
@@ -71,7 +56,6 @@ jobs:
echo "Prepared duplicate detection input files."
echo "all_issue_count=$(jq 'length' "$EXISTING_ALL_FILE")"
echo "open_issue_count=$(jq 'length' "$EXISTING_OPEN_FILE")"
# Prompt instructions are intentionally inline in this workflow. The old
# .github/prompts/issue-deduplicator.txt file is obsolete and removed.
@@ -158,9 +142,59 @@ jobs:
echo "has_matches=$has_matches"
} >> "$GITHUB_OUTPUT"
gather-duplicates-open:
name: Identify potential duplicates (open issues fallback)
# Pass 1 may drop sudo on the runner, so run the fallback in a fresh job.
needs: gather-duplicates-all
if: ${{ needs.gather-duplicates-all.result == 'success' && needs.gather-duplicates-all.outputs.has_matches != 'true' }}
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
issues_json: ${{ steps.normalize-open.outputs.issues_json }}
reason: ${{ steps.normalize-open.outputs.reason }}
has_matches: ${{ steps.normalize-open.outputs.has_matches }}
steps:
- uses: actions/checkout@v6
- name: Prepare Codex inputs
env:
GH_TOKEN: ${{ github.token }}
REPO: ${{ github.repository }}
ISSUE_NUMBER: ${{ github.event.issue.number }}
run: |
set -eo pipefail
CURRENT_ISSUE_FILE=codex-current-issue.json
EXISTING_OPEN_FILE=codex-existing-issues-open.json
gh issue list --repo "$REPO" \
--json number,title,body,createdAt,updatedAt,state,labels \
--limit 1000 \
--state open \
--search "sort:created-desc" \
| jq '[.[] | {
number,
title,
body: ((.body // "")[0:4000]),
createdAt,
updatedAt,
state,
labels: ((.labels // []) | map(.name))
}]' \
> "$EXISTING_OPEN_FILE"
gh issue view "$ISSUE_NUMBER" \
--repo "$REPO" \
--json number,title,body \
| jq '{number, title, body: ((.body // "")[0:4000])}' \
> "$CURRENT_ISSUE_FILE"
echo "Prepared fallback duplicate detection input files."
echo "open_issue_count=$(jq 'length' "$EXISTING_OPEN_FILE")"
- id: codex-open
name: Find duplicates (pass 2, open issues)
if: ${{ steps.normalize-all.outputs.has_matches != 'true' }}
uses: openai/codex-action@main
with:
openai-api-key: ${{ secrets.CODEX_OPENAI_API_KEY }}
@@ -200,7 +234,6 @@ jobs:
- id: normalize-open
name: Normalize pass 2 output
if: ${{ steps.normalize-all.outputs.has_matches != 'true' }}
env:
CODEX_OUTPUT: ${{ steps.codex-open.outputs.final-message }}
CURRENT_ISSUE_NUMBER: ${{ github.event.issue.number }}
@@ -243,15 +276,27 @@ jobs:
echo "has_matches=$has_matches"
} >> "$GITHUB_OUTPUT"
select-final:
name: Select final duplicate set
needs:
- gather-duplicates-all
- gather-duplicates-open
if: ${{ always() && needs.gather-duplicates-all.result == 'success' && (needs.gather-duplicates-open.result == 'success' || needs.gather-duplicates-open.result == 'skipped') }}
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
codex_output: ${{ steps.select-final.outputs.codex_output }}
steps:
- id: select-final
name: Select final duplicate set
env:
PASS1_ISSUES: ${{ steps.normalize-all.outputs.issues_json }}
PASS1_REASON: ${{ steps.normalize-all.outputs.reason }}
PASS2_ISSUES: ${{ steps.normalize-open.outputs.issues_json }}
PASS2_REASON: ${{ steps.normalize-open.outputs.reason }}
PASS1_HAS_MATCHES: ${{ steps.normalize-all.outputs.has_matches }}
PASS2_HAS_MATCHES: ${{ steps.normalize-open.outputs.has_matches }}
PASS1_ISSUES: ${{ needs.gather-duplicates-all.outputs.issues_json }}
PASS1_REASON: ${{ needs.gather-duplicates-all.outputs.reason }}
PASS2_ISSUES: ${{ needs.gather-duplicates-open.outputs.issues_json }}
PASS2_REASON: ${{ needs.gather-duplicates-open.outputs.reason }}
PASS1_HAS_MATCHES: ${{ needs.gather-duplicates-all.outputs.has_matches }}
PASS2_HAS_MATCHES: ${{ needs.gather-duplicates-open.outputs.has_matches }}
run: |
set -eo pipefail
@@ -289,8 +334,8 @@ jobs:
comment-on-issue:
name: Comment with potential duplicates
needs: gather-duplicates
if: ${{ needs.gather-duplicates.result != 'skipped' }}
needs: select-final
if: ${{ needs.select-final.result != 'skipped' }}
runs-on: ubuntu-latest
permissions:
contents: read
@@ -299,7 +344,7 @@ jobs:
- name: Comment on issue
uses: actions/github-script@v8
env:
CODEX_OUTPUT: ${{ needs.gather-duplicates.outputs.codex_output }}
CODEX_OUTPUT: ${{ needs.select-final.outputs.codex_output }}
with:
github-token: ${{ github.token }}
script: |

View File

@@ -84,6 +84,7 @@ pub fn create_fake_rollout_with_source(
model_provider: model_provider.map(str::to_string),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let payload = serde_json::to_value(SessionMetaLine {
meta,
@@ -165,6 +166,7 @@ pub fn create_fake_rollout_with_text_elements(
model_provider: model_provider.map(str::to_string),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let payload = serde_json::to_value(SessionMetaLine {
meta,

View File

@@ -207,12 +207,13 @@ tmp_path.replace(payload_path)
let notify_script = notify_script
.to_str()
.expect("notify script path should be valid UTF-8");
let notify_command = if cfg!(windows) { "python" } else { "python3" };
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
"never",
&format!(
"notify = [\"python3\", {}]",
"notify = [\"{notify_command}\", {}]",
toml_basic_string(notify_script)
),
)?;
@@ -261,7 +262,12 @@ tmp_path.replace(payload_path)
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
let notify_timeout = if cfg!(windows) {
Duration::from_secs(15)
} else {
Duration::from_secs(5)
};
fs_wait::wait_for_path_exists(&notify_file, notify_timeout).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
assert_eq!(payload["client"], "xcode");

View File

@@ -616,11 +616,19 @@
"additionalProperties": false,
"description": "Memories settings loaded from config.toml.",
"properties": {
"consolidation_model": {
"description": "Model used for memory consolidation.",
"type": "string"
},
"extract_model": {
"description": "Model used for thread summarisation.",
"type": "string"
},
"generate_memories": {
"description": "When `false`, newly created threads are stored with `memory_mode = \"disabled\"` in the state DB.",
"type": "boolean"
},
"max_raw_memories_for_global": {
"max_raw_memories_for_consolidation": {
"description": "Maximum number of recent raw memories retained for global consolidation.",
"format": "uint",
"minimum": 0.0,
@@ -647,13 +655,9 @@
"format": "int64",
"type": "integer"
},
"phase_1_model": {
"description": "Model used for thread summarisation.",
"type": "string"
},
"phase_2_model": {
"description": "Model used for memory consolidation.",
"type": "string"
"no_memories_if_mcp_or_web_search": {
"description": "When `true`, web searches and MCP tool calls mark the thread `memory_mode` as `\"polluted\"`.",
"type": "boolean"
},
"use_memories": {
"description": "When `false`, skip injecting memory usage instructions into developer prompts.",

View File

@@ -7,6 +7,7 @@ use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
use crate::shell_snapshot::ShellSnapshot;
use crate::state_db;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
@@ -83,6 +84,9 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let inherited_shell_snapshot = self
.inherited_shell_snapshot_for_source(&state, session_source.as_ref())
.await;
let session_source = match session_source {
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
@@ -161,6 +165,7 @@ impl AgentControl {
self.clone(),
session_source,
false,
inherited_shell_snapshot,
)
.await?
} else {
@@ -171,6 +176,7 @@ impl AgentControl {
session_source,
false,
None,
inherited_shell_snapshot,
)
.await?
}
@@ -235,6 +241,9 @@ impl AgentControl {
other => other,
};
let notification_source = session_source.clone();
let inherited_shell_snapshot = self
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
.await;
let rollout_path =
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
.await?
@@ -246,6 +255,7 @@ impl AgentControl {
rollout_path,
self.clone(),
session_source,
inherited_shell_snapshot,
)
.await?;
reservation.commit(resumed_thread.thread_id);
@@ -431,6 +441,22 @@ impl AgentControl {
.upgrade()
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
}
async fn inherited_shell_snapshot_for_source(
&self,
state: &Arc<ThreadManagerState>,
session_source: Option<&SessionSource>,
) -> Option<Arc<ShellSnapshot>> {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
return None;
};
let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
parent_thread.codex.session.user_shell().shell_snapshot()
}
}
#[cfg(test)]
mod tests {

View File

@@ -345,6 +345,7 @@ impl Codex {
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -475,6 +476,7 @@ impl Codex {
session_source,
dynamic_tools,
persist_extended_history,
inherited_shell_snapshot,
};
// Generate a unique ID for the lifetime of this Codex session.
@@ -865,6 +867,7 @@ pub(crate) struct SessionConfiguration {
session_source: SessionSource,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
}
impl SessionConfiguration {
@@ -1383,13 +1386,19 @@ impl Session {
};
// Create the mutable state for the Session.
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
conversation_id,
session_configuration.cwd.clone(),
&mut default_shell,
otel_manager.clone(),
)
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
let (tx, rx) = watch::channel(Some(snapshot));
default_shell.shell_snapshot = rx;
tx
} else {
ShellSnapshot::start_snapshotting(
config.codex_home.clone(),
conversation_id,
session_configuration.cwd.clone(),
&mut default_shell,
otel_manager.clone(),
)
}
} else {
let (tx, rx) = watch::channel(None);
default_shell.shell_snapshot = rx;
@@ -1978,6 +1987,7 @@ impl Session {
previous_cwd: &Path,
next_cwd: &Path,
codex_home: &Path,
session_source: &SessionSource,
) {
if previous_cwd == next_cwd {
return;
@@ -1987,6 +1997,13 @@ impl Session {
return;
}
if matches!(
session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
return;
}
ShellSnapshot::refresh_snapshot(
codex_home.to_path_buf(),
self.conversation_id,
@@ -2008,10 +2025,16 @@ impl Session {
let previous_cwd = state.session_configuration.cwd.clone();
let next_cwd = updated.cwd.clone();
let codex_home = updated.codex_home.clone();
let session_source = updated.session_source.clone();
state.session_configuration = updated;
drop(state);
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &codex_home);
self.maybe_refresh_shell_snapshot_for_cwd(
&previous_cwd,
&next_cwd,
&codex_home,
&session_source,
);
Ok(())
}
@@ -2027,7 +2050,13 @@ impl Session {
sub_id: String,
updates: SessionSettingsUpdate,
) -> ConstraintResult<Arc<TurnContext>> {
let (session_configuration, sandbox_policy_changed, previous_cwd, codex_home) = {
let (
session_configuration,
sandbox_policy_changed,
previous_cwd,
codex_home,
session_source,
) = {
let mut state = self.state.lock().await;
match state.session_configuration.clone().apply(&updates) {
Ok(next) => {
@@ -2035,8 +2064,15 @@ impl Session {
let sandbox_policy_changed =
state.session_configuration.sandbox_policy != next.sandbox_policy;
let codex_home = next.codex_home.clone();
let session_source = next.session_source.clone();
state.session_configuration = next.clone();
(next, sandbox_policy_changed, previous_cwd, codex_home)
(
next,
sandbox_policy_changed,
previous_cwd,
codex_home,
session_source,
)
}
Err(err) => {
drop(state);
@@ -2057,6 +2093,7 @@ impl Session {
&previous_cwd,
&session_configuration.cwd,
&codex_home,
&session_source,
);
Ok(self
@@ -7667,6 +7704,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let mut state = SessionState::new(session_configuration);
@@ -7760,6 +7798,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let mut state = SessionState::new(session_configuration);
@@ -8072,6 +8111,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
}
}
@@ -8126,6 +8166,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let (tx_event, _rx_event) = async_channel::unbounded();
@@ -8216,6 +8257,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(
@@ -8383,6 +8425,7 @@ mod tests {
session_source: SessionSource::Exec,
dynamic_tools,
persist_extended_history: false,
inherited_shell_snapshot: None,
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(

View File

@@ -62,6 +62,7 @@ pub(crate) async fn run_codex_thread_interactive(
Vec::new(),
false,
None,
None,
)
.await?;
let codex = Arc::new(codex);

View File

@@ -2505,29 +2505,31 @@ persistence = "none"
let memories = r#"
[memories]
no_memories_if_mcp_or_web_search = true
generate_memories = false
use_memories = false
max_raw_memories_for_global = 512
max_raw_memories_for_consolidation = 512
max_unused_days = 21
max_rollout_age_days = 42
max_rollouts_per_startup = 9
min_rollout_idle_hours = 24
phase_1_model = "gpt-5-mini"
phase_2_model = "gpt-5"
extract_model = "gpt-5-mini"
consolidation_model = "gpt-5"
"#;
let memories_cfg =
toml::from_str::<ConfigToml>(memories).expect("TOML deserialization should succeed");
assert_eq!(
Some(MemoriesToml {
no_memories_if_mcp_or_web_search: Some(true),
generate_memories: Some(false),
use_memories: Some(false),
max_raw_memories_for_global: Some(512),
max_raw_memories_for_consolidation: Some(512),
max_unused_days: Some(21),
max_rollout_age_days: Some(42),
max_rollouts_per_startup: Some(9),
min_rollout_idle_hours: Some(24),
phase_1_model: Some("gpt-5-mini".to_string()),
phase_2_model: Some("gpt-5".to_string()),
extract_model: Some("gpt-5-mini".to_string()),
consolidation_model: Some("gpt-5".to_string()),
}),
memories_cfg.memories
);
@@ -2541,15 +2543,16 @@ phase_2_model = "gpt-5"
assert_eq!(
config.memories,
MemoriesConfig {
no_memories_if_mcp_or_web_search: true,
generate_memories: false,
use_memories: false,
max_raw_memories_for_global: 512,
max_raw_memories_for_consolidation: 512,
max_unused_days: 21,
max_rollout_age_days: 42,
max_rollouts_per_startup: 9,
min_rollout_idle_hours: 24,
phase_1_model: Some("gpt-5-mini".to_string()),
phase_2_model: Some("gpt-5".to_string()),
extract_model: Some("gpt-5-mini".to_string()),
consolidation_model: Some("gpt-5".to_string()),
}
);
}

View File

@@ -26,7 +26,7 @@ pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 256;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256;
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
@@ -371,12 +371,14 @@ pub struct FeedbackConfigToml {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)]
#[schemars(deny_unknown_fields)]
pub struct MemoriesToml {
/// When `true`, web searches and MCP tool calls mark the thread `memory_mode` as `"polluted"`.
pub no_memories_if_mcp_or_web_search: Option<bool>,
/// When `false`, newly created threads are stored with `memory_mode = "disabled"` in the state DB.
pub generate_memories: Option<bool>,
/// When `false`, skip injecting memory usage instructions into developer prompts.
pub use_memories: Option<bool>,
/// Maximum number of recent raw memories retained for global consolidation.
pub max_raw_memories_for_global: Option<usize>,
pub max_raw_memories_for_consolidation: Option<usize>,
/// Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.
pub max_unused_days: Option<i64>,
/// Maximum age of the threads used for memories.
@@ -386,37 +388,39 @@ pub struct MemoriesToml {
/// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.
pub min_rollout_idle_hours: Option<i64>,
/// Model used for thread summarisation.
pub phase_1_model: Option<String>,
pub extract_model: Option<String>,
/// Model used for memory consolidation.
pub phase_2_model: Option<String>,
pub consolidation_model: Option<String>,
}
/// Effective memories settings after defaults are applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemoriesConfig {
pub no_memories_if_mcp_or_web_search: bool,
pub generate_memories: bool,
pub use_memories: bool,
pub max_raw_memories_for_global: usize,
pub max_raw_memories_for_consolidation: usize,
pub max_unused_days: i64,
pub max_rollout_age_days: i64,
pub max_rollouts_per_startup: usize,
pub min_rollout_idle_hours: i64,
pub phase_1_model: Option<String>,
pub phase_2_model: Option<String>,
pub extract_model: Option<String>,
pub consolidation_model: Option<String>,
}
impl Default for MemoriesConfig {
fn default() -> Self {
Self {
no_memories_if_mcp_or_web_search: false,
generate_memories: true,
use_memories: true,
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
max_raw_memories_for_consolidation: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
max_unused_days: DEFAULT_MEMORIES_MAX_UNUSED_DAYS,
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
phase_1_model: None,
phase_2_model: None,
extract_model: None,
consolidation_model: None,
}
}
}
@@ -425,11 +429,14 @@ impl From<MemoriesToml> for MemoriesConfig {
fn from(toml: MemoriesToml) -> Self {
let defaults = Self::default();
Self {
no_memories_if_mcp_or_web_search: toml
.no_memories_if_mcp_or_web_search
.unwrap_or(defaults.no_memories_if_mcp_or_web_search),
generate_memories: toml.generate_memories.unwrap_or(defaults.generate_memories),
use_memories: toml.use_memories.unwrap_or(defaults.use_memories),
max_raw_memories_for_global: toml
.max_raw_memories_for_global
.unwrap_or(defaults.max_raw_memories_for_global)
max_raw_memories_for_consolidation: toml
.max_raw_memories_for_consolidation
.unwrap_or(defaults.max_raw_memories_for_consolidation)
.min(4096),
max_unused_days: toml
.max_unused_days
@@ -447,8 +454,8 @@ impl From<MemoriesToml> for MemoriesConfig {
.min_rollout_idle_hours
.unwrap_or(defaults.min_rollout_idle_hours)
.clamp(1, 48),
phase_1_model: toml.phase_1_model,
phase_2_model: toml.phase_2_model,
extract_model: toml.extract_model,
consolidation_model: toml.consolidation_model,
}
}
}

View File

@@ -15,6 +15,7 @@ use crate::protocol::EventMsg;
use crate::protocol::McpInvocation;
use crate::protocol::McpToolCallBeginEvent;
use crate::protocol::McpToolCallEndEvent;
use crate::state_db;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputPayload;
@@ -121,6 +122,7 @@ pub(crate) async fn handle_mcp_tool_call(
});
notify_mcp_tool_call_event(sess.as_ref(), turn_context, tool_call_begin_event)
.await;
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context).await;
let start = Instant::now();
let result = sess
@@ -189,6 +191,7 @@ pub(crate) async fn handle_mcp_tool_call(
invocation: invocation.clone(),
});
notify_mcp_tool_call_event(sess.as_ref(), turn_context, tool_call_begin_event).await;
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context).await;
let start = Instant::now();
// Perform the tool call.
@@ -224,6 +227,22 @@ pub(crate) async fn handle_mcp_tool_call(
ResponseInputItem::McpToolCallOutput { call_id, result }
}
async fn maybe_mark_thread_memory_mode_polluted(sess: &Session, turn_context: &TurnContext) {
if !turn_context
.config
.memories
.no_memories_if_mcp_or_web_search
{
return;
}
state_db::mark_thread_memory_mode_polluted(
sess.services.state_db.as_deref(),
sess.conversation_id,
"mcp_tool_call",
)
.await;
}
fn sanitize_mcp_tool_result_for_model(
supports_image_input: bool,
result: Result<CallToolResult, String>,

View File

@@ -193,7 +193,7 @@ async fn claim_startup_jobs(
async fn build_request_context(session: &Arc<Session>, config: &Config) -> RequestContext {
let model_name = config
.memories
.phase_1_model
.extract_model
.clone()
.unwrap_or(phase_one::MODEL.to_string());
let model = session

View File

@@ -52,7 +52,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
return;
};
let root = memory_root(&config.codex_home);
let max_raw_memories = config.memories.max_raw_memories_for_global;
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
let max_unused_days = config.memories.max_unused_days;
// 1. Claim the job.
@@ -294,7 +294,7 @@ mod agent {
agent_config.model = Some(
config
.memories
.phase_2_model
.consolidation_model
.clone()
.unwrap_or(phase_two::MODEL.to_string()),
);

View File

@@ -13,21 +13,21 @@ use crate::memories::rollout_summaries_dir;
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
max_raw_memories_for_consolidation: usize,
) -> std::io::Result<()> {
ensure_layout(root).await?;
rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await
rebuild_raw_memories_file(root, memories, max_raw_memories_for_consolidation).await
}
/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
pub(super) async fn sync_rollout_summaries_from_memories(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
max_raw_memories_for_consolidation: usize,
) -> std::io::Result<()> {
ensure_layout(root).await?;
let retained = retained_memories(memories, max_raw_memories_for_global);
let retained = retained_memories(memories, max_raw_memories_for_consolidation);
let keep = retained
.iter()
.map(rollout_summary_file_stem)
@@ -62,9 +62,9 @@ pub(super) async fn sync_rollout_summaries_from_memories(
async fn rebuild_raw_memories_file(
root: &Path,
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
max_raw_memories_for_consolidation: usize,
) -> std::io::Result<()> {
let retained = retained_memories(memories, max_raw_memories_for_global);
let retained = retained_memories(memories, max_raw_memories_for_consolidation);
let mut body = String::from("# Raw Memories\n\n");
if retained.is_empty() {
@@ -155,9 +155,9 @@ async fn write_rollout_summary_for_thread(
fn retained_memories(
memories: &[Stage1Output],
max_raw_memories_for_global: usize,
max_raw_memories_for_consolidation: usize,
) -> &[Stage1Output] {
&memories[..memories.len().min(max_raw_memories_for_global)]
&memories[..memories.len().min(max_raw_memories_for_consolidation)]
}
fn raw_memories_format_error(err: std::fmt::Error) -> std::io::Error {

View File

@@ -1,6 +1,6 @@
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL;
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION;
use crate::memories::ensure_layout;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
@@ -95,14 +95,14 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("rebuild raw memories");
@@ -201,7 +201,7 @@ async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename(
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("sync rollout summaries");
@@ -304,14 +304,14 @@ task_outcome: success
sync_rollout_summaries_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("sync rollout summaries");
rebuild_raw_memories_file_from_memories(
&root,
&memories,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
)
.await
.expect("rebuild raw memories");

View File

@@ -177,6 +177,7 @@ mod tests {
model_provider: None,
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
};

View File

@@ -129,6 +129,13 @@ pub(crate) async fn extract_metadata_from_rollout(
}
Ok(ExtractionOutcome {
metadata,
memory_mode: items.iter().rev().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
RolloutItem::ResponseItem(_)
| RolloutItem::Compacted(_)
| RolloutItem::TurnContext(_)
| RolloutItem::EventMsg(_) => None,
}),
parse_errors,
})
}
@@ -272,6 +279,7 @@ pub(crate) async fn backfill_sessions(
);
}
let mut metadata = outcome.metadata;
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
if rollout.archived && metadata.archived_at.is_none() {
let fallback_archived_at = metadata.updated_at;
metadata.archived_at = file_modified_time_utc(&rollout.path)
@@ -282,6 +290,17 @@ pub(crate) async fn backfill_sessions(
stats.failed = stats.failed.saturating_add(1);
warn!("failed to upsert rollout {}: {err}", rollout.path.display());
} else {
if let Err(err) = runtime
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
.await
{
stats.failed = stats.failed.saturating_add(1);
warn!(
"failed to restore memory mode for {}: {err}",
rollout.path.display()
);
continue;
}
stats.upserted = stats.upserted.saturating_add(1);
if let Ok(meta_line) =
rollout::list::read_session_meta_line(&rollout.path).await
@@ -519,6 +538,7 @@ mod tests {
model_provider: Some("openai".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let session_meta_line = SessionMetaLine {
meta: session_meta,
@@ -543,9 +563,71 @@ mod tests {
expected.updated_at = file_modified_time_utc(&path).await.expect("mtime");
assert_eq!(outcome.metadata, expected);
assert_eq!(outcome.memory_mode, None);
assert_eq!(outcome.parse_errors, 0);
}
/// Verifies that when a rollout file contains more than one `SessionMeta`
/// line, `extract_metadata_from_rollout` surfaces the `memory_mode` from the
/// most recent meta line ("polluted") rather than the original one (`None`).
#[tokio::test]
async fn extract_metadata_from_rollout_returns_latest_memory_mode() {
    let dir = tempdir().expect("tempdir");
    let uuid = Uuid::new_v4();
    let id = ThreadId::from_string(&uuid.to_string()).expect("thread id");
    // File name follows the rollout naming convention: rollout-<timestamp>-<uuid>.jsonl.
    let path = dir
        .path()
        .join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl"));
    // First meta line: no explicit memory mode recorded.
    let session_meta = SessionMeta {
        id,
        forked_from_id: None,
        timestamp: "2026-01-27T12:34:56Z".to_string(),
        cwd: dir.path().to_path_buf(),
        originator: "cli".to_string(),
        cli_version: "0.0.0".to_string(),
        source: SessionSource::default(),
        agent_nickname: None,
        agent_role: None,
        model_provider: Some("openai".to_string()),
        base_instructions: None,
        dynamic_tools: None,
        memory_mode: None,
    };
    // Second meta line: identical except the memory mode flips to "polluted".
    let polluted_meta = SessionMeta {
        memory_mode: Some("polluted".to_string()),
        ..session_meta.clone()
    };
    let lines = vec![
        RolloutLine {
            timestamp: "2026-01-27T12:34:56Z".to_string(),
            item: RolloutItem::SessionMeta(SessionMetaLine {
                meta: session_meta,
                git: None,
            }),
        },
        RolloutLine {
            timestamp: "2026-01-27T12:35:00Z".to_string(),
            item: RolloutItem::SessionMeta(SessionMetaLine {
                meta: polluted_meta,
                git: None,
            }),
        },
    ];
    // Write the rollout as JSONL, one serialized RolloutLine per line.
    let mut file = File::create(&path).expect("create rollout");
    for line in lines {
        writeln!(
            file,
            "{}",
            serde_json::to_string(&line).expect("serialize rollout line")
        )
        .expect("write rollout line");
    }
    let outcome = extract_metadata_from_rollout(&path, "openai", None)
        .await
        .expect("extract");
    // The newest meta line wins.
    assert_eq!(outcome.memory_mode.as_deref(), Some("polluted"));
}
#[test]
fn builder_from_items_falls_back_to_filename() {
let dir = tempdir().expect("tempdir");
@@ -669,6 +751,7 @@ mod tests {
model_provider: Some("test-provider".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
};
let session_meta_line = SessionMetaLine {
meta: session_meta,

View File

@@ -412,6 +412,8 @@ impl RolloutRecorder {
} else {
Some(dynamic_tools)
},
memory_mode: (!config.memories.generate_memories)
.then_some("disabled".to_string()),
};
(

View File

@@ -1109,6 +1109,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
model_provider: Some("test-provider".into()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
}),

View File

@@ -337,6 +337,19 @@ pub async fn persist_dynamic_tools(
}
}
/// Best-effort: flags `thread_id` as memory-polluted in the state DB.
///
/// A missing runtime is a silent no-op; a DB failure is logged (tagged with
/// `stage` for diagnostics) and swallowed so callers are never interrupted.
pub async fn mark_thread_memory_mode_polluted(
    context: Option<&codex_state::StateRuntime>,
    thread_id: ThreadId,
    stage: &str,
) {
    let Some(runtime) = context else { return };
    match runtime.mark_thread_memory_mode_polluted(thread_id).await {
        Ok(_) => {}
        Err(err) => {
            warn!("state db mark_thread_memory_mode_polluted failed during {stage}: {err}");
        }
    }
}
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
pub async fn reconcile_rollout(
context: Option<&codex_state::StateRuntime>,
@@ -375,6 +388,7 @@ pub async fn reconcile_rollout(
}
};
let mut metadata = outcome.metadata;
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
match archived_only {
Some(true) if metadata.archived_at.is_none() => {
@@ -392,6 +406,16 @@ pub async fn reconcile_rollout(
);
return;
}
if let Err(err) = ctx
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
.await
{
warn!(
"state db reconcile_rollout memory_mode update failed {}: {err}",
rollout_path.display()
);
return;
}
if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await {
persist_dynamic_tools(
Some(ctx),

View File

@@ -58,9 +58,31 @@ pub(crate) async fn record_completed_response_item(
) {
sess.record_conversation_items(turn_context, std::slice::from_ref(item))
.await;
maybe_mark_thread_memory_mode_polluted_from_web_search(sess, turn_context, item).await;
record_stage1_output_usage_for_completed_item(turn_context, item).await;
}
/// Marks the session's thread as memory-polluted when a web-search call item
/// completes and the `no_memories_if_mcp_or_web_search` setting is enabled;
/// otherwise does nothing.
async fn maybe_mark_thread_memory_mode_polluted_from_web_search(
    sess: &Session,
    turn_context: &TurnContext,
    item: &ResponseItem,
) {
    // Pollution tracking is opt-in via config.
    let pollution_enabled = turn_context
        .config
        .memories
        .no_memories_if_mcp_or_web_search;
    if !pollution_enabled {
        return;
    }
    // Only a completed web-search call triggers pollution here.
    if !matches!(item, ResponseItem::WebSearchCall { .. }) {
        return;
    }
    state_db::mark_thread_memory_mode_polluted(
        sess.services.state_db.as_deref(),
        sess.conversation_id,
        "record_completed_response_item",
    )
    .await;
}
async fn record_stage1_output_usage_for_completed_item(
turn_context: &TurnContext,
item: &ResponseItem,

View File

@@ -20,6 +20,7 @@ use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::shell_snapshot::ShellSnapshot;
use crate::skills::SkillsManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
@@ -479,6 +480,7 @@ impl ThreadManagerState {
self.session_source.clone(),
false,
None,
None,
)
.await
}
@@ -490,6 +492,7 @@ impl ThreadManagerState {
session_source: SessionSource,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
@@ -500,6 +503,7 @@ impl ThreadManagerState {
Vec::new(),
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
)
.await
}
@@ -510,6 +514,7 @@ impl ThreadManagerState {
rollout_path: PathBuf,
agent_control: AgentControl,
session_source: SessionSource,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
self.spawn_thread_with_source(
@@ -521,6 +526,7 @@ impl ThreadManagerState {
Vec::new(),
false,
None,
inherited_shell_snapshot,
)
.await
}
@@ -532,6 +538,7 @@ impl ThreadManagerState {
agent_control: AgentControl,
session_source: SessionSource,
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
@@ -542,6 +549,7 @@ impl ThreadManagerState {
Vec::new(),
persist_extended_history,
None,
inherited_shell_snapshot,
)
.await
}
@@ -567,6 +575,7 @@ impl ThreadManagerState {
dynamic_tools,
persist_extended_history,
metrics_service_name,
None,
)
.await
}
@@ -582,6 +591,7 @@ impl ThreadManagerState {
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
) -> CodexResult<NewThread> {
let watch_registration = self
.file_watcher
@@ -602,6 +612,7 @@ impl ThreadManagerState {
dynamic_tools,
persist_extended_history,
metrics_service_name,
inherited_shell_snapshot,
)
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)

View File

@@ -11,7 +11,9 @@ use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::ev_web_search_call_done;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::TestCodex;
@@ -157,11 +159,161 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
Ok(())
}
/// End-to-end check that a thread selected for phase-2 memory consolidation
/// is demoted into the "removed" phase-2 inputs after a web-search call
/// pollutes it (with `no_memories_if_mcp_or_web_search` enabled).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs() -> Result<()> {
    let server = start_mock_server().await;
    let home = Arc::new(TempDir::new()?);
    let db = init_state_db(&home).await?;
    // Stage 1: run an initial turn so the thread exists in the state DB.
    let mut initial_builder = test_codex().with_home(home.clone()).with_config(|config| {
        config.features.enable(Feature::Sqlite);
        config.features.enable(Feature::MemoryTool);
        config.memories.max_raw_memories_for_consolidation = 1;
        config.memories.no_memories_if_mcp_or_web_search = true;
    });
    let initial = initial_builder.build(&server).await?;
    mount_sse_once(
        &server,
        sse(vec![
            ev_response_created("resp-initial-1"),
            ev_assistant_message("msg-initial-1", "initial turn complete"),
            ev_completed("resp-initial-1"),
        ]),
    )
    .await;
    initial.submit_turn("hello before memories").await?;
    let rollout_path = initial
        .session_configured
        .rollout_path
        .clone()
        .expect("rollout path");
    let thread_id = initial.session_configured.session_id;
    // Poll until the thread's metadata lands in the DB (persistence is async).
    let updated_at = {
        let deadline = Instant::now() + Duration::from_secs(10);
        loop {
            if let Some(metadata) = db.get_thread(thread_id).await? {
                break metadata.updated_at;
            }
            assert!(
                Instant::now() < deadline,
                "timed out waiting for thread metadata for {thread_id}"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    // Stage 2: seed a stage-1 output so the thread is phase-2 eligible.
    seed_stage1_output_for_existing_thread(
        db.as_ref(),
        thread_id,
        updated_at.timestamp(),
        "raw memory seeded for web search pollution",
        "rollout summary seeded for web search pollution",
        Some("pollution-rollout"),
    )
    .await?;
    shutdown_test_codex(&initial).await?;
    // Two queued SSE responses: one for the phase-2 prompt on resume, one
    // for the user turn that performs the polluting web search.
    let responses = mount_sse_sequence(
        &server,
        vec![
            sse(vec![
                ev_response_created("resp-phase2-1"),
                ev_assistant_message("msg-phase2-1", "phase2 complete"),
                ev_completed("resp-phase2-1"),
            ]),
            sse(vec![
                ev_response_created("resp-web-1"),
                ev_web_search_call_done("ws-1", "completed", "weather seattle"),
                ev_completed("resp-web-1"),
            ]),
        ],
    )
    .await;
    // Stage 3: resume the session with the same memory configuration.
    let mut resumed_builder = test_codex().with_home(home.clone()).with_config(|config| {
        config.features.enable(Feature::Sqlite);
        config.features.enable(Feature::MemoryTool);
        config.memories.max_raw_memories_for_consolidation = 1;
        config.memories.no_memories_if_mcp_or_web_search = true;
    });
    let resumed = resumed_builder
        .resume(&server, home.clone(), rollout_path.clone())
        .await?;
    // Before pollution the seeded thread must be selected/added in phase 2.
    let first_phase2_request = wait_for_request(&responses, 1).await.remove(0);
    let first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
    assert!(
        first_phase2_prompt.contains("- selected inputs this run: 1"),
        "expected seeded thread to be selected before pollution: {first_phase2_prompt}"
    );
    assert!(
        first_phase2_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
        "expected seeded thread to be added before pollution: {first_phase2_prompt}"
    );
    assert!(
        first_phase2_prompt.contains(&format!("- [added] thread_id={thread_id},")),
        "expected selected thread in first phase2 prompt: {first_phase2_prompt}"
    );
    wait_for_phase2_success(db.as_ref(), thread_id).await?;
    // Stage 4: trigger the polluting web search and wait for the mode flip.
    resumed
        .submit_turn("search the web for weather seattle")
        .await?;
    assert_eq!(
        {
            let deadline = Instant::now() + Duration::from_secs(10);
            loop {
                let memory_mode = db.get_thread_memory_mode(thread_id).await?;
                if memory_mode.as_deref() == Some("polluted") {
                    break memory_mode;
                }
                assert!(
                    Instant::now() < deadline,
                    "timed out waiting for polluted memory mode for {thread_id}"
                );
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
        .as_deref(),
        Some("polluted")
    );
    // Stage 5: the polluted thread must leave the selection and appear as removed.
    let selection = {
        let deadline = Instant::now() + Duration::from_secs(10);
        loop {
            let selection = db.get_phase2_input_selection(1, 30).await?;
            if selection.selected.is_empty()
                && selection.retained_thread_ids.is_empty()
                && selection.removed.len() == 1
                && selection.removed[0].thread_id == thread_id
            {
                break selection;
            }
            assert!(
                Instant::now() < deadline,
                "timed out waiting for polluted thread to move into removed phase2 inputs: \
                 {selection:?}"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    assert_eq!(responses.requests().len(), 2);
    assert!(selection.selected.is_empty());
    assert_eq!(selection.retained_thread_ids, Vec::<ThreadId>::new());
    assert_eq!(selection.removed.len(), 1);
    assert_eq!(selection.removed[0].thread_id, thread_id);
    shutdown_test_codex(&resumed).await?;
    Ok(())
}
/// Builds a `TestCodex` against `server` with the sqlite and memory-tool
/// features enabled and both raw-memory limits pinned to 1.
async fn build_test_codex(server: &wiremock::MockServer, home: Arc<TempDir>) -> Result<TestCodex> {
    let mut codex_builder = test_codex().with_home(home).with_config(|config| {
        config.memories.max_raw_memories_for_consolidation = 1;
        config.memories.max_raw_memories_for_global = 1;
        config.features.enable(Feature::MemoryTool);
        config.features.enable(Feature::Sqlite);
    });
    codex_builder.build(server).await
}
@@ -195,46 +347,33 @@ async fn seed_stage1_output(
let metadata = metadata_builder.build("test-provider");
db.upsert_thread(&metadata).await?;
let claim = db
.try_claim_stage1_job(
thread_id,
ThreadId::new(),
updated_at.timestamp(),
3_600,
64,
)
.await?;
let ownership_token = match claim {
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
};
assert!(
db.mark_stage1_job_succeeded(
thread_id,
&ownership_token,
updated_at.timestamp(),
raw_memory,
rollout_summary,
Some(rollout_slug),
)
.await?,
"stage-1 success should enqueue global consolidation"
);
seed_stage1_output_for_existing_thread(
db,
thread_id,
updated_at.timestamp(),
raw_memory,
rollout_summary,
Some(rollout_slug),
)
.await?;
Ok(thread_id)
}
/// Waits until the mock has recorded at least one request and returns the first.
async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
    let mut requests = wait_for_request(mock, 1).await;
    requests.remove(0)
}
async fn wait_for_request(mock: &ResponseMock, expected_count: usize) -> Vec<ResponsesRequest> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
let requests = mock.requests();
if let Some(request) = requests.into_iter().next() {
return request;
if requests.len() >= expected_count {
return requests;
}
assert!(
Instant::now() < deadline,
"timed out waiting for phase2 request"
"timed out waiting for {expected_count} phase2 requests"
);
tokio::time::sleep(Duration::from_millis(50)).await;
}
@@ -272,6 +411,39 @@ async fn wait_for_phase2_success(
}
}
/// Claims a stage-1 job for `thread_id` and immediately marks it succeeded
/// with the supplied memory/summary payload, asserting both steps work.
///
/// Panics on an unexpected claim outcome; propagates DB errors via `?`.
async fn seed_stage1_output_for_existing_thread(
    db: &codex_state::StateRuntime,
    thread_id: ThreadId,
    updated_at: i64,
    raw_memory: &str,
    rollout_summary: &str,
    rollout_slug: Option<&str>,
) -> Result<()> {
    let claim_owner = ThreadId::new();
    let outcome = db
        .try_claim_stage1_job(thread_id, claim_owner, updated_at, 3_600, 64)
        .await?;
    let ownership_token = match outcome {
        codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
        other => panic!("unexpected stage-1 claim outcome: {other:?}"),
    };
    let succeeded = db
        .mark_stage1_job_succeeded(
            thread_id,
            &ownership_token,
            updated_at,
            raw_memory,
            rollout_summary,
            rollout_slug,
        )
        .await?;
    assert!(succeeded, "stage-1 success should enqueue global consolidation");
    Ok(())
}
async fn read_rollout_summary_bodies(memory_root: &Path) -> Result<Vec<String>> {
let mut dir = tokio::fs::read_dir(memory_root.join("rollout_summaries")).await?;
let mut summaries = Vec::new();

View File

@@ -71,6 +71,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
model_provider: None,
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
};
@@ -114,6 +115,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re
model_provider: None,
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
};

View File

@@ -1,23 +1,36 @@
use anyhow::Result;
use codex_core::config::types::McpServerConfig;
use codex_core::config::types::McpServerTransportConfig;
use codex_core::features::Feature;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::ev_web_search_call_done;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::stdio_server_bin;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::collections::HashMap;
use std::fs;
use tokio::time::Duration;
use tracing_subscriber::prelude::*;
@@ -128,6 +141,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
model_provider: None,
base_instructions: None,
dynamic_tools: Some(dynamic_tools_for_hook),
memory_mode: None,
},
git: None,
};
@@ -253,6 +267,148 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
Ok(())
}
/// A completed web-search call should flip the thread's `memory_mode` to
/// "polluted" in the state DB when `no_memories_if_mcp_or_web_search` is set.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn web_search_marks_thread_memory_mode_polluted_when_configured() -> Result<()> {
    let server = start_mock_server().await;
    // One SSE response containing a finished web-search call.
    mount_sse_sequence(
        &server,
        vec![responses::sse(vec![
            ev_response_created("resp-1"),
            ev_web_search_call_done("ws-1", "completed", "weather seattle"),
            ev_completed("resp-1"),
        ])],
    )
    .await;
    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Sqlite);
        config.memories.no_memories_if_mcp_or_web_search = true;
    });
    let test = builder.build(&server).await?;
    let db = test.codex.state_db().expect("state db enabled");
    let thread_id = test.session_configured.session_id;
    test.submit_turn("search the web").await?;
    // Poll (up to 100 * 25ms) for the asynchronous DB update to land.
    let mut memory_mode = None;
    for _ in 0..100 {
        memory_mode = db.get_thread_memory_mode(thread_id).await?;
        if memory_mode.as_deref() == Some("polluted") {
            break;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
    assert_eq!(memory_mode.as_deref(), Some("polluted"));
    Ok(())
}
/// An MCP tool call should flip the thread's `memory_mode` to "polluted"
/// in the state DB when `no_memories_if_mcp_or_web_search` is set. Uses a
/// real stdio MCP echo server, so the test is skipped without network.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = start_mock_server().await;
    let call_id = "call-123";
    let server_name = "rmcp";
    // Tool name follows the "mcp__<server>__<tool>" routing convention.
    let tool_name = format!("mcp__{server_name}__echo");
    // First model response requests the MCP tool call...
    mount_sse_once(
        &server,
        responses::sse(vec![
            ev_response_created("resp-1"),
            ev_function_call(call_id, &tool_name, "{\"message\":\"ping\"}"),
            ev_completed("resp-1"),
        ]),
    )
    .await;
    // ...second response completes the turn after the tool output.
    mount_sse_once(
        &server,
        responses::sse(vec![
            responses::ev_assistant_message("msg-1", "rmcp echo tool completed."),
            ev_completed("resp-2"),
        ]),
    )
    .await;
    let rmcp_test_server_bin = stdio_server_bin()?;
    let mut builder = test_codex().with_config(move |config| {
        config.features.enable(Feature::Sqlite);
        config.memories.no_memories_if_mcp_or_web_search = true;
        // Register the stdio MCP echo server under `server_name`.
        let mut servers = config.mcp_servers.get().clone();
        servers.insert(
            server_name.to_string(),
            McpServerConfig {
                transport: McpServerTransportConfig::Stdio {
                    command: rmcp_test_server_bin,
                    args: Vec::new(),
                    env: Some(HashMap::from([(
                        "MCP_TEST_VALUE".to_string(),
                        "propagated-env".to_string(),
                    )])),
                    env_vars: Vec::new(),
                    cwd: None,
                },
                enabled: true,
                required: false,
                disabled_reason: None,
                startup_timeout_sec: Some(Duration::from_secs(10)),
                tool_timeout_sec: None,
                enabled_tools: None,
                disabled_tools: None,
                scopes: None,
                oauth_resource: None,
            },
        );
        config
            .mcp_servers
            .set(servers)
            .expect("test mcp servers should accept any configuration");
    });
    let test = builder.build(&server).await?;
    let db = test.codex.state_db().expect("state db enabled");
    let thread_id = test.session_configured.session_id;
    // Submit a read-only turn that causes the model to call the echo tool.
    test.codex
        .submit(Op::UserTurn {
            items: vec![UserInput::Text {
                text: "call the rmcp echo tool".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
            cwd: test.cwd_path().to_path_buf(),
            approval_policy: AskForApproval::Never,
            sandbox_policy: SandboxPolicy::new_read_only_policy(),
            model: test.session_configured.model.clone(),
            effort: None,
            summary: None,
            collaboration_mode: None,
            personality: None,
        })
        .await?;
    // Wait for the tool call to finish, then for the turn to complete
    // (surfacing any error event as a test failure).
    wait_for_event(&test.codex, |event| {
        matches!(event, EventMsg::McpToolCallEnd(_))
    })
    .await;
    wait_for_event_match(&test.codex, |event| match event {
        EventMsg::Error(err) => Some(Err(anyhow::anyhow!(err.message.clone()))),
        EventMsg::TurnComplete(_) => Some(Ok(())),
        _ => None,
    })
    .await?;
    // Poll (up to 100 * 25ms) for the asynchronous DB update to land.
    let mut memory_mode = None;
    for _ in 0..100 {
        memory_mode = db.get_thread_memory_mode(thread_id).await?;
        if memory_mode.as_deref() == Some("polluted") {
            break;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
    assert_eq!(memory_mode.as_deref(), Some("polluted"));
    Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn tool_call_logs_include_thread_id() -> Result<()> {
let server = start_mock_server().await;

View File

@@ -2060,6 +2060,8 @@ pub struct SessionMeta {
pub base_instructions: Option<BaseInstructions>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dynamic_tools: Option<Vec<DynamicToolSpec>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_mode: Option<String>,
}
impl Default for SessionMeta {
@@ -2077,6 +2079,7 @@ impl Default for SessionMeta {
model_provider: None,
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
}
}
}

View File

@@ -242,6 +242,7 @@ mod tests {
model_provider: Some("openai".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
}),

View File

@@ -45,6 +45,8 @@ pub struct ThreadsPage {
/// Result of scanning a rollout file for thread metadata.
pub struct ExtractionOutcome {
    /// The extracted thread metadata.
    pub metadata: ThreadMetadata,
    /// The explicit thread memory mode from rollout metadata, if present.
    pub memory_mode: Option<String>,
    /// The number of rollout lines that failed to parse.
    pub parse_errors: usize,
}

View File

@@ -277,7 +277,8 @@ SELECT
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0
WHERE t.memory_mode = 'enabled'
AND (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
ORDER BY so.source_updated_at DESC, so.thread_id DESC
LIMIT ?
"#,
@@ -304,11 +305,13 @@ LIMIT ?
/// `thread_id DESC`
/// - previously selected rows are identified by `selected_for_phase2 = 1`
/// - `previous_selected` contains the current persisted rows that belonged
/// to the last successful phase-2 baseline
/// to the last successful phase-2 baseline, even if those threads are no
/// longer memory-eligible
/// - `retained_thread_ids` records which current rows still match the exact
/// snapshot selected in the last successful phase-2 run
/// - removed rows are previously selected rows that are still present in
/// `stage1_outputs` but fall outside the current top-`n` selection
/// `stage1_outputs` but are no longer in the current selection, including
/// threads that are no longer memory-eligible
pub async fn get_phase2_input_selection(
&self,
n: usize,
@@ -336,7 +339,8 @@ SELECT
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
WHERE t.memory_mode = 'enabled'
AND (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
AND (
(so.last_usage IS NOT NULL AND so.last_usage >= ?)
OR (so.last_usage IS NULL AND so.source_updated_at >= ?)
@@ -421,6 +425,51 @@ ORDER BY so.source_updated_at DESC, so.thread_id DESC
})
}
/// Marks a thread as polluted and enqueues phase-2 forgetting when the
/// thread participated in the last successful phase-2 baseline.
///
/// Returns `Ok(true)` when the thread transitioned to polluted, `Ok(false)`
/// when it was already polluted (or the row did not match).
pub async fn mark_thread_memory_mode_polluted(
    &self,
    thread_id: ThreadId,
) -> anyhow::Result<bool> {
    let now = Utc::now().timestamp();
    let thread_id = thread_id.to_string();
    let mut tx = self.pool.begin().await?;
    // Flip the mode only when it is not already polluted, so repeated calls
    // are idempotent and do not re-enqueue consolidation.
    let update = sqlx::query(
        r#"
UPDATE threads
SET memory_mode = 'polluted'
WHERE id = ? AND memory_mode != 'polluted'
"#,
    )
    .bind(thread_id.as_str())
    .execute(&mut *tx)
    .await?;
    if update.rows_affected() == 0 {
        tx.commit().await?;
        return Ok(false);
    }
    // Only threads that were part of the last phase-2 baseline need a
    // forgetting pass; a missing stage1_outputs row counts as not selected.
    let was_selected = sqlx::query_scalar::<_, i64>(
        r#"
SELECT selected_for_phase2
FROM stage1_outputs
WHERE thread_id = ?
"#,
    )
    .bind(thread_id.as_str())
    .fetch_optional(&mut *tx)
    .await?
    .unwrap_or(0);
    if was_selected != 0 {
        enqueue_global_consolidation_with_executor(&mut *tx, now).await?;
    }
    tx.commit().await?;
    Ok(true)
}
/// Attempts to claim a stage-1 job for a thread at `source_updated_at`.
///
/// Claim semantics:
@@ -2569,6 +2618,71 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Seeds two threads with successful stage-1 outputs, marks one polluted,
/// and verifies `list_stage1_outputs_for_global` only returns the
/// memory-enabled thread.
#[tokio::test]
async fn list_stage1_outputs_for_global_skips_polluted_threads() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");
    let thread_id_enabled =
        ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let thread_id_polluted =
        ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
    // Give both threads an identical stage-1 output via claim + success.
    for (thread_id, workspace) in [
        (thread_id_enabled, "workspace-enabled"),
        (thread_id_polluted, "workspace-polluted"),
    ] {
        runtime
            .upsert_thread(&test_thread_metadata(
                &codex_home,
                thread_id,
                codex_home.join(workspace),
            ))
            .await
            .expect("upsert thread");
        let claim = runtime
            .try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
            .await
            .expect("claim stage1");
        let ownership_token = match claim {
            Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
            other => panic!("unexpected stage1 claim outcome: {other:?}"),
        };
        assert!(
            runtime
                .mark_stage1_job_succeeded(
                    thread_id,
                    ownership_token.as_str(),
                    100,
                    "raw memory",
                    "summary",
                    None,
                )
                .await
                .expect("mark stage1 succeeded"),
            "stage1 success should persist output"
        );
    }
    // Pollute the second thread; it must disappear from the global listing.
    runtime
        .set_thread_memory_mode(thread_id_polluted, "polluted")
        .await
        .expect("mark thread polluted");
    let outputs = runtime
        .list_stage1_outputs_for_global(10)
        .await
        .expect("list stage1 outputs for global");
    assert_eq!(outputs.len(), 1);
    assert_eq!(outputs[0].thread_id, thread_id_enabled);
    // Best-effort cleanup of the temp dir; errors are ignored.
    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn get_phase2_input_selection_reports_added_retained_and_removed_rows() {
let codex_home = unique_temp_dir();
@@ -2681,6 +2795,197 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// After a successful phase-2 run that selected two threads, polluting one
/// must: keep it in `previous_selected` (the baseline), drop it from the new
/// selection and the retained set, and report it under `removed`.
#[tokio::test]
async fn get_phase2_input_selection_marks_polluted_previous_selection_as_removed() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");
    let thread_id_enabled =
        ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let thread_id_polluted =
        ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
    // Seed stage-1 outputs for both threads at distinct timestamps.
    for (thread_id, updated_at) in [(thread_id_enabled, 100), (thread_id_polluted, 101)] {
        runtime
            .upsert_thread(&test_thread_metadata(
                &codex_home,
                thread_id,
                codex_home.join(thread_id.to_string()),
            ))
            .await
            .expect("upsert thread");
        let claim = runtime
            .try_claim_stage1_job(thread_id, owner, updated_at, 3600, 64)
            .await
            .expect("claim stage1");
        let ownership_token = match claim {
            Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
            other => panic!("unexpected stage1 claim outcome: {other:?}"),
        };
        assert!(
            runtime
                .mark_stage1_job_succeeded(
                    thread_id,
                    ownership_token.as_str(),
                    updated_at,
                    &format!("raw-{updated_at}"),
                    &format!("summary-{updated_at}"),
                    None,
                )
                .await
                .expect("mark stage1 succeeded"),
            "stage1 success should persist output"
        );
    }
    // Run a full phase-2 cycle so both threads form the success baseline.
    let claim = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2");
    let (ownership_token, input_watermark) = match claim {
        Phase2JobClaimOutcome::Claimed {
            ownership_token,
            input_watermark,
        } => (ownership_token, input_watermark),
        other => panic!("unexpected phase2 claim outcome: {other:?}"),
    };
    let selected_outputs = runtime
        .list_stage1_outputs_for_global(10)
        .await
        .expect("list stage1 outputs for global");
    assert!(
        runtime
            .mark_global_phase2_job_succeeded(
                ownership_token.as_str(),
                input_watermark,
                &selected_outputs,
            )
            .await
            .expect("mark phase2 success"),
        "phase2 success should persist selected rows"
    );
    // Pollute the second thread after it was part of the baseline.
    runtime
        .set_thread_memory_mode(thread_id_polluted, "polluted")
        .await
        .expect("mark thread polluted");
    let selection = runtime
        .get_phase2_input_selection(2, 36_500)
        .await
        .expect("load phase2 input selection");
    // Only the still-enabled thread is selected now.
    assert_eq!(selection.selected.len(), 1);
    assert_eq!(selection.selected[0].thread_id, thread_id_enabled);
    // The baseline still records both threads, polluted or not.
    assert_eq!(selection.previous_selected.len(), 2);
    assert!(
        selection
            .previous_selected
            .iter()
            .any(|item| item.thread_id == thread_id_enabled)
    );
    assert!(
        selection
            .previous_selected
            .iter()
            .any(|item| item.thread_id == thread_id_polluted)
    );
    assert_eq!(selection.retained_thread_ids, vec![thread_id_enabled]);
    // The polluted thread is reported as removed from the selection.
    assert_eq!(selection.removed.len(), 1);
    assert_eq!(selection.removed[0].thread_id, thread_id_polluted);
    // Best-effort cleanup of the temp dir; errors are ignored.
    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
/// Polluting a thread that was part of the last successful phase-2 baseline
/// must enqueue a new phase-2 (forgetting) job — verified by a subsequent
/// claim succeeding.
#[tokio::test]
async fn mark_thread_memory_mode_polluted_enqueues_phase2_for_selected_threads() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
        .await
        .expect("initialize runtime");
    let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
    let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
    runtime
        .upsert_thread(&test_thread_metadata(
            &codex_home,
            thread_id,
            codex_home.join("workspace"),
        ))
        .await
        .expect("upsert thread");
    // Seed a stage-1 output via claim + success.
    let claim = runtime
        .try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
        .await
        .expect("claim stage1");
    let ownership_token = match claim {
        Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
        other => panic!("unexpected stage1 claim outcome: {other:?}"),
    };
    assert!(
        runtime
            .mark_stage1_job_succeeded(
                thread_id,
                ownership_token.as_str(),
                100,
                "raw",
                "summary",
                None,
            )
            .await
            .expect("mark stage1 succeeded"),
        "stage1 success should persist output"
    );
    // Run a phase-2 cycle so the thread joins the success baseline.
    let phase2_claim = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2");
    let (phase2_token, input_watermark) = match phase2_claim {
        Phase2JobClaimOutcome::Claimed {
            ownership_token,
            input_watermark,
        } => (ownership_token, input_watermark),
        other => panic!("unexpected phase2 claim outcome: {other:?}"),
    };
    let selected_outputs = runtime
        .list_stage1_outputs_for_global(10)
        .await
        .expect("list stage1 outputs");
    assert!(
        runtime
            .mark_global_phase2_job_succeeded(
                phase2_token.as_str(),
                input_watermark,
                &selected_outputs,
            )
            .await
            .expect("mark phase2 success"),
        "phase2 success should persist selected rows"
    );
    // Pollution of a baseline thread should transition it and enqueue work.
    assert!(
        runtime
            .mark_thread_memory_mode_polluted(thread_id)
            .await
            .expect("mark thread polluted"),
        "thread should transition to polluted"
    );
    // A fresh claim succeeding proves a new phase-2 job was enqueued.
    let next_claim = runtime
        .try_claim_global_phase2_job(owner, 3600)
        .await
        .expect("claim phase2 after pollution");
    assert!(matches!(next_claim, Phase2JobClaimOutcome::Claimed { .. }));
    // Best-effort cleanup of the temp dir; errors are ignored.
    let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn get_phase2_input_selection_treats_regenerated_selected_rows_as_added() {
let codex_home = unique_temp_dir();

View File

@@ -35,6 +35,14 @@ WHERE id = ?
.transpose()
}
/// Returns the `memory_mode` stored for `id`, or `None` when the thread row
/// is missing or the column could not be read from the row.
pub async fn get_thread_memory_mode(&self, id: ThreadId) -> anyhow::Result<Option<String>> {
    let maybe_row = sqlx::query("SELECT memory_mode FROM threads WHERE id = ?")
        .bind(id.to_string())
        .fetch_optional(self.pool.as_ref())
        .await?;
    match maybe_row {
        Some(row) => Ok(row.try_get("memory_mode").ok()),
        None => Ok(None),
    }
}
/// Get dynamic tools for a thread, if present.
pub async fn get_dynamic_tools(
&self,
@@ -199,6 +207,19 @@ FROM threads
.await
}
/// Overwrites the `memory_mode` of `thread_id`.
///
/// Returns whether a row was actually updated (`false` when the thread does
/// not exist in the `threads` table).
pub async fn set_thread_memory_mode(
    &self,
    thread_id: ThreadId,
    memory_mode: &str,
) -> anyhow::Result<bool> {
    let update = sqlx::query("UPDATE threads SET memory_mode = ? WHERE id = ?")
        .bind(memory_mode)
        .bind(thread_id.to_string());
    let outcome = update.execute(self.pool.as_ref()).await?;
    Ok(outcome.rows_affected() != 0)
}
async fn upsert_thread_with_creation_memory_mode(
&self,
metadata: &crate::ThreadMetadata,
@@ -357,6 +378,16 @@ ON CONFLICT(thread_id, position) DO NOTHING
}
return Err(err);
}
if let Some(memory_mode) = extract_memory_mode(items)
&& let Err(err) = self
.set_thread_memory_mode(builder.id, memory_mode.as_str())
.await
{
if let Some(otel) = otel {
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "set_thread_memory_mode")]);
}
return Err(err);
}
let dynamic_tools = extract_dynamic_tools(items);
if let Some(dynamic_tools) = dynamic_tools
&& let Err(err) = self
@@ -438,6 +469,16 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
})
}
/// Scan rollout items newest-first and return the most recent `memory_mode`
/// recorded on a `SessionMeta` line, if any.
///
/// The match is kept exhaustive on purpose so adding a `RolloutItem`
/// variant forces this function to be revisited.
pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
    for item in items.iter().rev() {
        match item {
            RolloutItem::SessionMeta(meta_line) => {
                // A SessionMeta line without a memory_mode does not stop the
                // scan; keep walking toward older items.
                if let Some(mode) = &meta_line.meta.memory_mode {
                    return Some(mode.clone());
                }
            }
            RolloutItem::ResponseItem(_)
            | RolloutItem::Compacted(_)
            | RolloutItem::TurnContext(_)
            | RolloutItem::EventMsg(_) => {}
        }
    }
    None
}
pub(super) fn push_thread_filters<'a>(
builder: &mut QueryBuilder<'a, Sqlite>,
archived_only: bool,
@@ -518,7 +559,11 @@ mod tests {
use super::*;
use crate::runtime::test_support::test_thread_metadata;
use crate::runtime::test_support::unique_temp_dir;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
#[tokio::test]
async fn upsert_thread_keeps_creation_memory_mode_for_existing_rows() {
@@ -557,4 +602,56 @@ mod tests {
.expect("memory mode should remain readable");
assert_eq!(memory_mode, "disabled");
}
// Replaying rollout items that contain a `SessionMeta` line with a
// `memory_mode` should persist that mode onto the thread row, readable
// afterwards via `get_thread_memory_mode`.
#[tokio::test]
async fn apply_rollout_items_restores_memory_mode_from_session_meta() {
// Isolated codex home so the sqlite state db does not collide with other tests.
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("state db should initialize");
let thread_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000456").expect("valid thread id");
let metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
// Seed the thread row first; apply_rollout_items updates an existing thread.
runtime
.upsert_thread(&metadata)
.await
.expect("initial upsert should succeed");
let builder = ThreadMetadataBuilder::new(
thread_id,
metadata.rollout_path.clone(),
metadata.created_at,
SessionSource::Cli,
);
// Single SessionMeta rollout item carrying the memory_mode under test.
let items = vec![RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: thread_id,
forked_from_id: None,
timestamp: metadata.created_at.to_rfc3339(),
cwd: PathBuf::new(),
originator: String::new(),
cli_version: String::new(),
source: SessionSource::Cli,
agent_nickname: None,
agent_role: None,
model_provider: None,
base_instructions: None,
dynamic_tools: None,
memory_mode: Some("polluted".to_string()),
},
git: None,
})];
runtime
.apply_rollout_items(&builder, &items, None, None)
.await
.expect("apply_rollout_items should succeed");
// The mode recorded in the rollout must now be visible on the thread row.
let memory_mode = runtime
.get_thread_memory_mode(thread_id)
.await
.expect("memory mode should load");
assert_eq!(memory_mode.as_deref(), Some("polluted"));
}
}

View File

@@ -1246,8 +1246,8 @@ impl App {
.collect();
self.chat_widget.show_selection_view(SelectionViewParams {
title: Some("Agents".to_string()),
subtitle: Some("Select a thread to focus".to_string()),
title: Some("Multi-agents".to_string()),
subtitle: Some("Select an agent to watch".to_string()),
footer_hint: Some(standard_popup_hint_line()),
items,
initial_selected_idx,

View File

@@ -342,7 +342,7 @@ mod tests {
CommandItem::UserPrompt(_) => None,
})
.collect();
assert_eq!(cmds, vec!["model", "mention", "mcp"]);
assert_eq!(cmds, vec!["model", "mention", "mcp", "multi-agents"]);
}
#[test]

View File

@@ -129,6 +129,7 @@ pub(crate) enum CancellationEvent {
NotHandled,
}
use crate::bottom_pane::prompt_args::parse_slash_name;
pub(crate) use chat_composer::ChatComposer;
pub(crate) use chat_composer::ChatComposerConfig;
pub(crate) use chat_composer::InputResult;
@@ -398,11 +399,20 @@ impl BottomPane {
self.request_redraw();
InputResult::None
} else {
let is_agent_command = self
.composer_text()
.lines()
.next()
.and_then(parse_slash_name)
.is_some_and(|(name, _, _)| name == "agent");
// If a task is running and a status line is visible, allow Esc to
// send an interrupt even while the composer has focus.
// When a popup is active, prefer dismissing it over interrupting the task.
if key_event.code == KeyCode::Esc
&& matches!(key_event.kind, KeyEventKind::Press | KeyEventKind::Repeat)
&& self.is_task_running
&& !is_agent_command
&& !self.composer.popup_active()
&& let Some(status) = &self.status
{
@@ -1593,6 +1603,90 @@ mod tests {
assert_eq!(pane.composer_text(), "/");
}
// Regression test: while a task is running and the composer's first line is
// an `/agent` command (with the popup hidden), Esc must keep editing the
// command text instead of sending `Op::Interrupt`.
#[test]
fn esc_with_agent_command_without_popup_does_not_interrupt_task() {
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();
let tx = AppEventSender::new(tx_raw);
let mut pane = BottomPane::new(BottomPaneParams {
app_event_tx: tx,
frame_requester: FrameRequester::test_dummy(),
has_input_focus: true,
enhanced_keys_supported: false,
placeholder_text: "Ask Codex to do anything".to_string(),
disable_paste_burst: false,
animations_enabled: true,
skills: Some(Vec::new()),
});
// A running task is the precondition that previously made Esc interrupt.
pane.set_task_running(true);
// Repro: `/agent ` hides the popup (cursor past command name). Esc should
// keep editing command text instead of interrupting the running task.
pane.insert_str("/agent ");
assert!(
!pane.composer.popup_active(),
"expected command popup to be hidden after entering `/agent `"
);
pane.handle_key_event(KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
// Drain all emitted app events and assert none of them is an interrupt.
while let Ok(ev) = rx.try_recv() {
assert!(
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
"expected Esc to not send Op::Interrupt while typing `/agent`"
);
}
// The composer text must be untouched by the Esc press.
assert_eq!(pane.composer_text(), "/agent ");
}
// Regression test: pressing Esc to dismiss the agent picker must not cause
// the subsequent Esc *release* event to interrupt the running task.
#[test]
fn esc_release_after_dismissing_agent_picker_does_not_interrupt_task() {
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();
let tx = AppEventSender::new(tx_raw);
let mut pane = BottomPane::new(BottomPaneParams {
app_event_tx: tx,
frame_requester: FrameRequester::test_dummy(),
has_input_focus: true,
enhanced_keys_supported: false,
placeholder_text: "Ask Codex to do anything".to_string(),
disable_paste_burst: false,
animations_enabled: true,
skills: Some(Vec::new()),
});
pane.set_task_running(true);
// Open the agent picker so the first Esc press has a popup to dismiss.
pane.show_selection_view(SelectionViewParams {
title: Some("Agents".to_string()),
items: vec![SelectionItem {
name: "Main".to_string(),
..Default::default()
}],
..Default::default()
});
// First event: Esc press — should only dismiss the picker.
pane.handle_key_event(KeyEvent::new_with_kind(
KeyCode::Esc,
KeyModifiers::NONE,
KeyEventKind::Press,
));
// Second event: the matching Esc release — must be a no-op, not an interrupt.
pane.handle_key_event(KeyEvent::new_with_kind(
KeyCode::Esc,
KeyModifiers::NONE,
KeyEventKind::Release,
));
while let Ok(ev) = rx.try_recv() {
assert!(
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
"expected Esc release after dismissing agent picker to not interrupt"
);
}
assert!(
pane.no_modal_or_popup_active(),
"expected Esc press to dismiss the agent picker"
);
}
#[test]
fn esc_interrupts_running_task_when_no_popup() {
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();

View File

@@ -1147,6 +1147,10 @@ impl ChatWidget {
Some(event.reasoning_effort),
None,
);
if let Some(mask) = self.active_collaboration_mask.as_mut() {
mask.model = Some(model_for_header.clone());
mask.reasoning_effort = Some(event.reasoning_effort);
}
self.refresh_model_display();
self.sync_personality_command_enabled();
let startup_tooltip_override = self.startup_tooltip_override.take();
@@ -3605,7 +3609,7 @@ impl ChatWidget {
}
self.open_collaboration_modes_popup();
}
SlashCommand::Agent => {
SlashCommand::Agent | SlashCommand::MultiAgents => {
self.app_event_tx.send(AppEvent::OpenAgentPicker);
}
SlashCommand::Approvals => {

View File

@@ -38,7 +38,7 @@ struct AgentLabel<'a> {
pub(crate) fn agent_picker_status_dot_spans(is_closed: bool) -> Vec<Span<'static>> {
let dot = if is_closed {
"".dark_gray()
"".into()
} else {
"".green()
};
@@ -283,16 +283,16 @@ fn agent_label_spans(agent: AgentLabel<'_>) -> Vec<Span<'static>> {
let role = agent.role.map(str::trim).filter(|role| !role.is_empty());
if let Some(nickname) = nickname {
spans.push(Span::from(nickname.to_string()).light_blue().bold());
spans.push(Span::from(nickname.to_string()).cyan().bold());
} else if let Some(thread_id) = agent.thread_id {
spans.push(Span::from(thread_id.to_string()).dim());
spans.push(Span::from(thread_id.to_string()).cyan());
} else {
spans.push(Span::from("agent").dim());
spans.push(Span::from("agent").cyan());
}
if let Some(role) = role {
spans.push(Span::from(" ").dim());
spans.push(Span::from(format!("[{role}]")).dim());
spans.push(Span::from(format!("[{role}]")));
}
spans
@@ -346,7 +346,7 @@ fn wait_complete_lines(
agent_statuses: &[CollabAgentStatusEntry],
) -> Vec<Line<'static>> {
if statuses.is_empty() && agent_statuses.is_empty() {
return vec![Line::from(Span::from("No agents completed yet").dim())];
return vec![Line::from(Span::from("No agents completed yet"))];
}
let entries = if agent_statuses.is_empty() {
@@ -409,7 +409,7 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> {
fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> {
match status {
AgentStatus::PendingInit => vec![Span::from("Pending init").dim()],
AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()],
AgentStatus::Running => vec![Span::from("Running").cyan().bold()],
AgentStatus::Completed(message) => {
let mut spans = vec![Span::from("Completed").green()];
@@ -433,11 +433,11 @@ fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> {
);
if !error_preview.is_empty() {
spans.push(Span::from(" - ").dim());
spans.push(Span::from(error_preview).dim());
spans.push(Span::from(error_preview));
}
spans
}
AgentStatus::Shutdown => vec![Span::from("Shutdown").dim()],
AgentStatus::Shutdown => vec![Span::from("Shutdown")],
AgentStatus::NotFound => vec![Span::from("Not found").red()],
}
}
@@ -553,10 +553,11 @@ mod tests {
let lines = cell.display_lines(200);
let title = &lines[0];
assert_eq!(title.spans[2].content.as_ref(), "Robie");
assert_eq!(title.spans[2].style.fg, Some(Color::LightBlue));
assert_eq!(title.spans[2].style.fg, Some(Color::Cyan));
assert!(title.spans[2].style.add_modifier.contains(Modifier::BOLD));
assert_eq!(title.spans[4].content.as_ref(), "[explorer]");
assert!(title.spans[4].style.add_modifier.contains(Modifier::DIM));
assert_eq!(title.spans[4].style.fg, None);
assert!(!title.spans[4].style.add_modifier.contains(Modifier::DIM));
}
fn cell_to_text(cell: &PlainHistoryCell) -> String {

View File

@@ -53,6 +53,7 @@ pub enum SlashCommand {
Realtime,
Settings,
TestApproval,
MultiAgents,
// Debugging commands.
#[strum(serialize = "debug-m-drop")]
MemoryDrop,
@@ -93,7 +94,7 @@ impl SlashCommand {
SlashCommand::Settings => "configure realtime microphone/speaker",
SlashCommand::Plan => "switch to Plan mode",
SlashCommand::Collab => "change collaboration mode (experimental)",
SlashCommand::Agent => "switch the active agent thread",
SlashCommand::Agent | SlashCommand::MultiAgents => "switch the active agent thread",
SlashCommand::Approvals => "choose what Codex is allowed to do",
SlashCommand::Permissions => "choose what Codex is allowed to do",
SlashCommand::ElevateSandbox => "set up elevated agent sandbox",
@@ -167,7 +168,7 @@ impl SlashCommand {
SlashCommand::Realtime => true,
SlashCommand::Settings => true,
SlashCommand::Collab => true,
SlashCommand::Agent => true,
SlashCommand::Agent | SlashCommand::MultiAgents => true,
SlashCommand::Statusline => false,
SlashCommand::Theme => false,
}