mirror of
https://github.com/openai/codex.git
synced 2026-03-02 20:53:19 +00:00
Compare commits
9 Commits
dev/mousse
...
fix/notify
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ad1a8040c7 | ||
|
|
9a42a56d8f | ||
|
|
c2e126f92a | ||
|
|
2a5bcc053f | ||
|
|
1905597017 | ||
|
|
b649953845 | ||
|
|
b08bdd91e3 | ||
|
|
9685e7d6d1 | ||
|
|
d94f0b6ce7 |
109
.github/workflows/issue-deduplicator.yml
vendored
109
.github/workflows/issue-deduplicator.yml
vendored
@@ -7,15 +7,17 @@ on:
|
||||
- labeled
|
||||
|
||||
jobs:
|
||||
gather-duplicates:
|
||||
name: Identify potential duplicates
|
||||
gather-duplicates-all:
|
||||
name: Identify potential duplicates (all issues)
|
||||
# Prevent runs on forks (requires OpenAI API key, wastes Actions minutes)
|
||||
if: github.repository == 'openai/codex' && (github.event.action == 'opened' || (github.event.action == 'labeled' && github.event.label.name == 'codex-deduplicate'))
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
outputs:
|
||||
codex_output: ${{ steps.select-final.outputs.codex_output }}
|
||||
issues_json: ${{ steps.normalize-all.outputs.issues_json }}
|
||||
reason: ${{ steps.normalize-all.outputs.reason }}
|
||||
has_matches: ${{ steps.normalize-all.outputs.has_matches }}
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
@@ -29,7 +31,6 @@ jobs:
|
||||
|
||||
CURRENT_ISSUE_FILE=codex-current-issue.json
|
||||
EXISTING_ALL_FILE=codex-existing-issues-all.json
|
||||
EXISTING_OPEN_FILE=codex-existing-issues-open.json
|
||||
|
||||
gh issue list --repo "$REPO" \
|
||||
--json number,title,body,createdAt,updatedAt,state,labels \
|
||||
@@ -47,22 +48,6 @@ jobs:
|
||||
}]' \
|
||||
> "$EXISTING_ALL_FILE"
|
||||
|
||||
gh issue list --repo "$REPO" \
|
||||
--json number,title,body,createdAt,updatedAt,state,labels \
|
||||
--limit 1000 \
|
||||
--state open \
|
||||
--search "sort:created-desc" \
|
||||
| jq '[.[] | {
|
||||
number,
|
||||
title,
|
||||
body: ((.body // "")[0:4000]),
|
||||
createdAt,
|
||||
updatedAt,
|
||||
state,
|
||||
labels: ((.labels // []) | map(.name))
|
||||
}]' \
|
||||
> "$EXISTING_OPEN_FILE"
|
||||
|
||||
gh issue view "$ISSUE_NUMBER" \
|
||||
--repo "$REPO" \
|
||||
--json number,title,body \
|
||||
@@ -71,7 +56,6 @@ jobs:
|
||||
|
||||
echo "Prepared duplicate detection input files."
|
||||
echo "all_issue_count=$(jq 'length' "$EXISTING_ALL_FILE")"
|
||||
echo "open_issue_count=$(jq 'length' "$EXISTING_OPEN_FILE")"
|
||||
|
||||
# Prompt instructions are intentionally inline in this workflow. The old
|
||||
# .github/prompts/issue-deduplicator.txt file is obsolete and removed.
|
||||
@@ -158,9 +142,59 @@ jobs:
|
||||
echo "has_matches=$has_matches"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
|
||||
gather-duplicates-open:
|
||||
name: Identify potential duplicates (open issues fallback)
|
||||
# Pass 1 may drop sudo on the runner, so run the fallback in a fresh job.
|
||||
needs: gather-duplicates-all
|
||||
if: ${{ needs.gather-duplicates-all.result == 'success' && needs.gather-duplicates-all.outputs.has_matches != 'true' }}
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
outputs:
|
||||
issues_json: ${{ steps.normalize-open.outputs.issues_json }}
|
||||
reason: ${{ steps.normalize-open.outputs.reason }}
|
||||
has_matches: ${{ steps.normalize-open.outputs.has_matches }}
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
|
||||
- name: Prepare Codex inputs
|
||||
env:
|
||||
GH_TOKEN: ${{ github.token }}
|
||||
REPO: ${{ github.repository }}
|
||||
ISSUE_NUMBER: ${{ github.event.issue.number }}
|
||||
run: |
|
||||
set -eo pipefail
|
||||
|
||||
CURRENT_ISSUE_FILE=codex-current-issue.json
|
||||
EXISTING_OPEN_FILE=codex-existing-issues-open.json
|
||||
|
||||
gh issue list --repo "$REPO" \
|
||||
--json number,title,body,createdAt,updatedAt,state,labels \
|
||||
--limit 1000 \
|
||||
--state open \
|
||||
--search "sort:created-desc" \
|
||||
| jq '[.[] | {
|
||||
number,
|
||||
title,
|
||||
body: ((.body // "")[0:4000]),
|
||||
createdAt,
|
||||
updatedAt,
|
||||
state,
|
||||
labels: ((.labels // []) | map(.name))
|
||||
}]' \
|
||||
> "$EXISTING_OPEN_FILE"
|
||||
|
||||
gh issue view "$ISSUE_NUMBER" \
|
||||
--repo "$REPO" \
|
||||
--json number,title,body \
|
||||
| jq '{number, title, body: ((.body // "")[0:4000])}' \
|
||||
> "$CURRENT_ISSUE_FILE"
|
||||
|
||||
echo "Prepared fallback duplicate detection input files."
|
||||
echo "open_issue_count=$(jq 'length' "$EXISTING_OPEN_FILE")"
|
||||
|
||||
- id: codex-open
|
||||
name: Find duplicates (pass 2, open issues)
|
||||
if: ${{ steps.normalize-all.outputs.has_matches != 'true' }}
|
||||
uses: openai/codex-action@main
|
||||
with:
|
||||
openai-api-key: ${{ secrets.CODEX_OPENAI_API_KEY }}
|
||||
@@ -200,7 +234,6 @@ jobs:
|
||||
|
||||
- id: normalize-open
|
||||
name: Normalize pass 2 output
|
||||
if: ${{ steps.normalize-all.outputs.has_matches != 'true' }}
|
||||
env:
|
||||
CODEX_OUTPUT: ${{ steps.codex-open.outputs.final-message }}
|
||||
CURRENT_ISSUE_NUMBER: ${{ github.event.issue.number }}
|
||||
@@ -243,15 +276,27 @@ jobs:
|
||||
echo "has_matches=$has_matches"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
|
||||
select-final:
|
||||
name: Select final duplicate set
|
||||
needs:
|
||||
- gather-duplicates-all
|
||||
- gather-duplicates-open
|
||||
if: ${{ always() && needs.gather-duplicates-all.result == 'success' && (needs.gather-duplicates-open.result == 'success' || needs.gather-duplicates-open.result == 'skipped') }}
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
outputs:
|
||||
codex_output: ${{ steps.select-final.outputs.codex_output }}
|
||||
steps:
|
||||
- id: select-final
|
||||
name: Select final duplicate set
|
||||
env:
|
||||
PASS1_ISSUES: ${{ steps.normalize-all.outputs.issues_json }}
|
||||
PASS1_REASON: ${{ steps.normalize-all.outputs.reason }}
|
||||
PASS2_ISSUES: ${{ steps.normalize-open.outputs.issues_json }}
|
||||
PASS2_REASON: ${{ steps.normalize-open.outputs.reason }}
|
||||
PASS1_HAS_MATCHES: ${{ steps.normalize-all.outputs.has_matches }}
|
||||
PASS2_HAS_MATCHES: ${{ steps.normalize-open.outputs.has_matches }}
|
||||
PASS1_ISSUES: ${{ needs.gather-duplicates-all.outputs.issues_json }}
|
||||
PASS1_REASON: ${{ needs.gather-duplicates-all.outputs.reason }}
|
||||
PASS2_ISSUES: ${{ needs.gather-duplicates-open.outputs.issues_json }}
|
||||
PASS2_REASON: ${{ needs.gather-duplicates-open.outputs.reason }}
|
||||
PASS1_HAS_MATCHES: ${{ needs.gather-duplicates-all.outputs.has_matches }}
|
||||
PASS2_HAS_MATCHES: ${{ needs.gather-duplicates-open.outputs.has_matches }}
|
||||
run: |
|
||||
set -eo pipefail
|
||||
|
||||
@@ -289,8 +334,8 @@ jobs:
|
||||
|
||||
comment-on-issue:
|
||||
name: Comment with potential duplicates
|
||||
needs: gather-duplicates
|
||||
if: ${{ needs.gather-duplicates.result != 'skipped' }}
|
||||
needs: select-final
|
||||
if: ${{ needs.select-final.result != 'skipped' }}
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
@@ -299,7 +344,7 @@ jobs:
|
||||
- name: Comment on issue
|
||||
uses: actions/github-script@v8
|
||||
env:
|
||||
CODEX_OUTPUT: ${{ needs.gather-duplicates.outputs.codex_output }}
|
||||
CODEX_OUTPUT: ${{ needs.select-final.outputs.codex_output }}
|
||||
with:
|
||||
github-token: ${{ github.token }}
|
||||
script: |
|
||||
|
||||
@@ -84,6 +84,7 @@ pub fn create_fake_rollout_with_source(
|
||||
model_provider: model_provider.map(str::to_string),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
};
|
||||
let payload = serde_json::to_value(SessionMetaLine {
|
||||
meta,
|
||||
@@ -165,6 +166,7 @@ pub fn create_fake_rollout_with_text_elements(
|
||||
model_provider: model_provider.map(str::to_string),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
};
|
||||
let payload = serde_json::to_value(SessionMetaLine {
|
||||
meta,
|
||||
|
||||
@@ -207,12 +207,13 @@ tmp_path.replace(payload_path)
|
||||
let notify_script = notify_script
|
||||
.to_str()
|
||||
.expect("notify script path should be valid UTF-8");
|
||||
let notify_command = if cfg!(windows) { "python" } else { "python3" };
|
||||
create_config_toml_with_extra(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&format!(
|
||||
"notify = [\"python3\", {}]",
|
||||
"notify = [\"{notify_command}\", {}]",
|
||||
toml_basic_string(notify_script)
|
||||
),
|
||||
)?;
|
||||
@@ -261,7 +262,12 @@ tmp_path.replace(payload_path)
|
||||
)
|
||||
.await??;
|
||||
|
||||
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
|
||||
let notify_timeout = if cfg!(windows) {
|
||||
Duration::from_secs(15)
|
||||
} else {
|
||||
Duration::from_secs(5)
|
||||
};
|
||||
fs_wait::wait_for_path_exists(&notify_file, notify_timeout).await?;
|
||||
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
|
||||
let payload: Value = serde_json::from_str(&payload_raw)?;
|
||||
assert_eq!(payload["client"], "xcode");
|
||||
|
||||
@@ -616,11 +616,19 @@
|
||||
"additionalProperties": false,
|
||||
"description": "Memories settings loaded from config.toml.",
|
||||
"properties": {
|
||||
"consolidation_model": {
|
||||
"description": "Model used for memory consolidation.",
|
||||
"type": "string"
|
||||
},
|
||||
"extract_model": {
|
||||
"description": "Model used for thread summarisation.",
|
||||
"type": "string"
|
||||
},
|
||||
"generate_memories": {
|
||||
"description": "When `false`, newly created threads are stored with `memory_mode = \"disabled\"` in the state DB.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"max_raw_memories_for_global": {
|
||||
"max_raw_memories_for_consolidation": {
|
||||
"description": "Maximum number of recent raw memories retained for global consolidation.",
|
||||
"format": "uint",
|
||||
"minimum": 0.0,
|
||||
@@ -647,13 +655,9 @@
|
||||
"format": "int64",
|
||||
"type": "integer"
|
||||
},
|
||||
"phase_1_model": {
|
||||
"description": "Model used for thread summarisation.",
|
||||
"type": "string"
|
||||
},
|
||||
"phase_2_model": {
|
||||
"description": "Model used for memory consolidation.",
|
||||
"type": "string"
|
||||
"no_memories_if_mcp_or_web_search": {
|
||||
"description": "When `true`, web searches and MCP tool calls mark the thread `memory_mode` as `\"polluted\"`.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"use_memories": {
|
||||
"description": "When `false`, skip injecting memory usage instructions into developer prompts.",
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::find_thread_path_by_id_str;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::session_prefix::format_subagent_context_line;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::state_db;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -83,6 +84,9 @@ impl AgentControl {
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
let inherited_shell_snapshot = self
|
||||
.inherited_shell_snapshot_for_source(&state, session_source.as_ref())
|
||||
.await;
|
||||
let session_source = match session_source {
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
@@ -161,6 +165,7 @@ impl AgentControl {
|
||||
self.clone(),
|
||||
session_source,
|
||||
false,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
@@ -171,6 +176,7 @@ impl AgentControl {
|
||||
session_source,
|
||||
false,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
@@ -235,6 +241,9 @@ impl AgentControl {
|
||||
other => other,
|
||||
};
|
||||
let notification_source = session_source.clone();
|
||||
let inherited_shell_snapshot = self
|
||||
.inherited_shell_snapshot_for_source(&state, Some(&session_source))
|
||||
.await;
|
||||
let rollout_path =
|
||||
find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
|
||||
.await?
|
||||
@@ -246,6 +255,7 @@ impl AgentControl {
|
||||
rollout_path,
|
||||
self.clone(),
|
||||
session_source,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?;
|
||||
reservation.commit(resumed_thread.thread_id);
|
||||
@@ -431,6 +441,22 @@ impl AgentControl {
|
||||
.upgrade()
|
||||
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
|
||||
}
|
||||
|
||||
async fn inherited_shell_snapshot_for_source(
|
||||
&self,
|
||||
state: &Arc<ThreadManagerState>,
|
||||
session_source: Option<&SessionSource>,
|
||||
) -> Option<Arc<ShellSnapshot>> {
|
||||
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
})) = session_source
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let parent_thread = state.get_thread(*parent_thread_id).await.ok()?;
|
||||
parent_thread.codex.session.user_shell().shell_snapshot()
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -345,6 +345,7 @@ impl Codex {
|
||||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
@@ -475,6 +476,7 @@ impl Codex {
|
||||
session_source,
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
inherited_shell_snapshot,
|
||||
};
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
@@ -865,6 +867,7 @@ pub(crate) struct SessionConfiguration {
|
||||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
}
|
||||
|
||||
impl SessionConfiguration {
|
||||
@@ -1383,13 +1386,19 @@ impl Session {
|
||||
};
|
||||
// Create the mutable state for the Session.
|
||||
let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
conversation_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
otel_manager.clone(),
|
||||
)
|
||||
if let Some(snapshot) = session_configuration.inherited_shell_snapshot.clone() {
|
||||
let (tx, rx) = watch::channel(Some(snapshot));
|
||||
default_shell.shell_snapshot = rx;
|
||||
tx
|
||||
} else {
|
||||
ShellSnapshot::start_snapshotting(
|
||||
config.codex_home.clone(),
|
||||
conversation_id,
|
||||
session_configuration.cwd.clone(),
|
||||
&mut default_shell,
|
||||
otel_manager.clone(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
let (tx, rx) = watch::channel(None);
|
||||
default_shell.shell_snapshot = rx;
|
||||
@@ -1978,6 +1987,7 @@ impl Session {
|
||||
previous_cwd: &Path,
|
||||
next_cwd: &Path,
|
||||
codex_home: &Path,
|
||||
session_source: &SessionSource,
|
||||
) {
|
||||
if previous_cwd == next_cwd {
|
||||
return;
|
||||
@@ -1987,6 +1997,13 @@ impl Session {
|
||||
return;
|
||||
}
|
||||
|
||||
if matches!(
|
||||
session_source,
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
ShellSnapshot::refresh_snapshot(
|
||||
codex_home.to_path_buf(),
|
||||
self.conversation_id,
|
||||
@@ -2008,10 +2025,16 @@ impl Session {
|
||||
let previous_cwd = state.session_configuration.cwd.clone();
|
||||
let next_cwd = updated.cwd.clone();
|
||||
let codex_home = updated.codex_home.clone();
|
||||
let session_source = updated.session_source.clone();
|
||||
state.session_configuration = updated;
|
||||
drop(state);
|
||||
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(&previous_cwd, &next_cwd, &codex_home);
|
||||
self.maybe_refresh_shell_snapshot_for_cwd(
|
||||
&previous_cwd,
|
||||
&next_cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2027,7 +2050,13 @@ impl Session {
|
||||
sub_id: String,
|
||||
updates: SessionSettingsUpdate,
|
||||
) -> ConstraintResult<Arc<TurnContext>> {
|
||||
let (session_configuration, sandbox_policy_changed, previous_cwd, codex_home) = {
|
||||
let (
|
||||
session_configuration,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
) = {
|
||||
let mut state = self.state.lock().await;
|
||||
match state.session_configuration.clone().apply(&updates) {
|
||||
Ok(next) => {
|
||||
@@ -2035,8 +2064,15 @@ impl Session {
|
||||
let sandbox_policy_changed =
|
||||
state.session_configuration.sandbox_policy != next.sandbox_policy;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
state.session_configuration = next.clone();
|
||||
(next, sandbox_policy_changed, previous_cwd, codex_home)
|
||||
(
|
||||
next,
|
||||
sandbox_policy_changed,
|
||||
previous_cwd,
|
||||
codex_home,
|
||||
session_source,
|
||||
)
|
||||
}
|
||||
Err(err) => {
|
||||
drop(state);
|
||||
@@ -2057,6 +2093,7 @@ impl Session {
|
||||
&previous_cwd,
|
||||
&session_configuration.cwd,
|
||||
&codex_home,
|
||||
&session_source,
|
||||
);
|
||||
|
||||
Ok(self
|
||||
@@ -7667,6 +7704,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
@@ -7760,6 +7798,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
@@ -8072,6 +8111,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8126,6 +8166,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
|
||||
let (tx_event, _rx_event) = async_channel::unbounded();
|
||||
@@ -8216,6 +8257,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools: Vec::new(),
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
@@ -8383,6 +8425,7 @@ mod tests {
|
||||
session_source: SessionSource::Exec,
|
||||
dynamic_tools,
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
|
||||
@@ -62,6 +62,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
||||
@@ -2505,29 +2505,31 @@ persistence = "none"
|
||||
|
||||
let memories = r#"
|
||||
[memories]
|
||||
no_memories_if_mcp_or_web_search = true
|
||||
generate_memories = false
|
||||
use_memories = false
|
||||
max_raw_memories_for_global = 512
|
||||
max_raw_memories_for_consolidation = 512
|
||||
max_unused_days = 21
|
||||
max_rollout_age_days = 42
|
||||
max_rollouts_per_startup = 9
|
||||
min_rollout_idle_hours = 24
|
||||
phase_1_model = "gpt-5-mini"
|
||||
phase_2_model = "gpt-5"
|
||||
extract_model = "gpt-5-mini"
|
||||
consolidation_model = "gpt-5"
|
||||
"#;
|
||||
let memories_cfg =
|
||||
toml::from_str::<ConfigToml>(memories).expect("TOML deserialization should succeed");
|
||||
assert_eq!(
|
||||
Some(MemoriesToml {
|
||||
no_memories_if_mcp_or_web_search: Some(true),
|
||||
generate_memories: Some(false),
|
||||
use_memories: Some(false),
|
||||
max_raw_memories_for_global: Some(512),
|
||||
max_raw_memories_for_consolidation: Some(512),
|
||||
max_unused_days: Some(21),
|
||||
max_rollout_age_days: Some(42),
|
||||
max_rollouts_per_startup: Some(9),
|
||||
min_rollout_idle_hours: Some(24),
|
||||
phase_1_model: Some("gpt-5-mini".to_string()),
|
||||
phase_2_model: Some("gpt-5".to_string()),
|
||||
extract_model: Some("gpt-5-mini".to_string()),
|
||||
consolidation_model: Some("gpt-5".to_string()),
|
||||
}),
|
||||
memories_cfg.memories
|
||||
);
|
||||
@@ -2541,15 +2543,16 @@ phase_2_model = "gpt-5"
|
||||
assert_eq!(
|
||||
config.memories,
|
||||
MemoriesConfig {
|
||||
no_memories_if_mcp_or_web_search: true,
|
||||
generate_memories: false,
|
||||
use_memories: false,
|
||||
max_raw_memories_for_global: 512,
|
||||
max_raw_memories_for_consolidation: 512,
|
||||
max_unused_days: 21,
|
||||
max_rollout_age_days: 42,
|
||||
max_rollouts_per_startup: 9,
|
||||
min_rollout_idle_hours: 24,
|
||||
phase_1_model: Some("gpt-5-mini".to_string()),
|
||||
phase_2_model: Some("gpt-5".to_string()),
|
||||
extract_model: Some("gpt-5-mini".to_string()),
|
||||
consolidation_model: Some("gpt-5".to_string()),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
|
||||
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 256;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256;
|
||||
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]
|
||||
@@ -371,12 +371,14 @@ pub struct FeedbackConfigToml {
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)]
|
||||
#[schemars(deny_unknown_fields)]
|
||||
pub struct MemoriesToml {
|
||||
/// When `true`, web searches and MCP tool calls mark the thread `memory_mode` as `"polluted"`.
|
||||
pub no_memories_if_mcp_or_web_search: Option<bool>,
|
||||
/// When `false`, newly created threads are stored with `memory_mode = "disabled"` in the state DB.
|
||||
pub generate_memories: Option<bool>,
|
||||
/// When `false`, skip injecting memory usage instructions into developer prompts.
|
||||
pub use_memories: Option<bool>,
|
||||
/// Maximum number of recent raw memories retained for global consolidation.
|
||||
pub max_raw_memories_for_global: Option<usize>,
|
||||
pub max_raw_memories_for_consolidation: Option<usize>,
|
||||
/// Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.
|
||||
pub max_unused_days: Option<i64>,
|
||||
/// Maximum age of the threads used for memories.
|
||||
@@ -386,37 +388,39 @@ pub struct MemoriesToml {
|
||||
/// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.
|
||||
pub min_rollout_idle_hours: Option<i64>,
|
||||
/// Model used for thread summarisation.
|
||||
pub phase_1_model: Option<String>,
|
||||
pub extract_model: Option<String>,
|
||||
/// Model used for memory consolidation.
|
||||
pub phase_2_model: Option<String>,
|
||||
pub consolidation_model: Option<String>,
|
||||
}
|
||||
|
||||
/// Effective memories settings after defaults are applied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MemoriesConfig {
|
||||
pub no_memories_if_mcp_or_web_search: bool,
|
||||
pub generate_memories: bool,
|
||||
pub use_memories: bool,
|
||||
pub max_raw_memories_for_global: usize,
|
||||
pub max_raw_memories_for_consolidation: usize,
|
||||
pub max_unused_days: i64,
|
||||
pub max_rollout_age_days: i64,
|
||||
pub max_rollouts_per_startup: usize,
|
||||
pub min_rollout_idle_hours: i64,
|
||||
pub phase_1_model: Option<String>,
|
||||
pub phase_2_model: Option<String>,
|
||||
pub extract_model: Option<String>,
|
||||
pub consolidation_model: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for MemoriesConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
no_memories_if_mcp_or_web_search: false,
|
||||
generate_memories: true,
|
||||
use_memories: true,
|
||||
max_raw_memories_for_global: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
max_raw_memories_for_consolidation: DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
max_unused_days: DEFAULT_MEMORIES_MAX_UNUSED_DAYS,
|
||||
max_rollout_age_days: DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS,
|
||||
max_rollouts_per_startup: DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
|
||||
min_rollout_idle_hours: DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS,
|
||||
phase_1_model: None,
|
||||
phase_2_model: None,
|
||||
extract_model: None,
|
||||
consolidation_model: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -425,11 +429,14 @@ impl From<MemoriesToml> for MemoriesConfig {
|
||||
fn from(toml: MemoriesToml) -> Self {
|
||||
let defaults = Self::default();
|
||||
Self {
|
||||
no_memories_if_mcp_or_web_search: toml
|
||||
.no_memories_if_mcp_or_web_search
|
||||
.unwrap_or(defaults.no_memories_if_mcp_or_web_search),
|
||||
generate_memories: toml.generate_memories.unwrap_or(defaults.generate_memories),
|
||||
use_memories: toml.use_memories.unwrap_or(defaults.use_memories),
|
||||
max_raw_memories_for_global: toml
|
||||
.max_raw_memories_for_global
|
||||
.unwrap_or(defaults.max_raw_memories_for_global)
|
||||
max_raw_memories_for_consolidation: toml
|
||||
.max_raw_memories_for_consolidation
|
||||
.unwrap_or(defaults.max_raw_memories_for_consolidation)
|
||||
.min(4096),
|
||||
max_unused_days: toml
|
||||
.max_unused_days
|
||||
@@ -447,8 +454,8 @@ impl From<MemoriesToml> for MemoriesConfig {
|
||||
.min_rollout_idle_hours
|
||||
.unwrap_or(defaults.min_rollout_idle_hours)
|
||||
.clamp(1, 48),
|
||||
phase_1_model: toml.phase_1_model,
|
||||
phase_2_model: toml.phase_2_model,
|
||||
extract_model: toml.extract_model,
|
||||
consolidation_model: toml.consolidation_model,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::McpInvocation;
|
||||
use crate::protocol::McpToolCallBeginEvent;
|
||||
use crate::protocol::McpToolCallEndEvent;
|
||||
use crate::state_db;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
@@ -121,6 +122,7 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
});
|
||||
notify_mcp_tool_call_event(sess.as_ref(), turn_context, tool_call_begin_event)
|
||||
.await;
|
||||
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context).await;
|
||||
|
||||
let start = Instant::now();
|
||||
let result = sess
|
||||
@@ -189,6 +191,7 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
invocation: invocation.clone(),
|
||||
});
|
||||
notify_mcp_tool_call_event(sess.as_ref(), turn_context, tool_call_begin_event).await;
|
||||
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context).await;
|
||||
|
||||
let start = Instant::now();
|
||||
// Perform the tool call.
|
||||
@@ -224,6 +227,22 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
ResponseInputItem::McpToolCallOutput { call_id, result }
|
||||
}
|
||||
|
||||
async fn maybe_mark_thread_memory_mode_polluted(sess: &Session, turn_context: &TurnContext) {
|
||||
if !turn_context
|
||||
.config
|
||||
.memories
|
||||
.no_memories_if_mcp_or_web_search
|
||||
{
|
||||
return;
|
||||
}
|
||||
state_db::mark_thread_memory_mode_polluted(
|
||||
sess.services.state_db.as_deref(),
|
||||
sess.conversation_id,
|
||||
"mcp_tool_call",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn sanitize_mcp_tool_result_for_model(
|
||||
supports_image_input: bool,
|
||||
result: Result<CallToolResult, String>,
|
||||
|
||||
@@ -193,7 +193,7 @@ async fn claim_startup_jobs(
|
||||
async fn build_request_context(session: &Arc<Session>, config: &Config) -> RequestContext {
|
||||
let model_name = config
|
||||
.memories
|
||||
.phase_1_model
|
||||
.extract_model
|
||||
.clone()
|
||||
.unwrap_or(phase_one::MODEL.to_string());
|
||||
let model = session
|
||||
|
||||
@@ -52,7 +52,7 @@ pub(super) async fn run(session: &Arc<Session>, config: Arc<Config>) {
|
||||
return;
|
||||
};
|
||||
let root = memory_root(&config.codex_home);
|
||||
let max_raw_memories = config.memories.max_raw_memories_for_global;
|
||||
let max_raw_memories = config.memories.max_raw_memories_for_consolidation;
|
||||
let max_unused_days = config.memories.max_unused_days;
|
||||
|
||||
// 1. Claim the job.
|
||||
@@ -294,7 +294,7 @@ mod agent {
|
||||
agent_config.model = Some(
|
||||
config
|
||||
.memories
|
||||
.phase_2_model
|
||||
.consolidation_model
|
||||
.clone()
|
||||
.unwrap_or(phase_two::MODEL.to_string()),
|
||||
);
|
||||
|
||||
@@ -13,21 +13,21 @@ use crate::memories::rollout_summaries_dir;
|
||||
pub(super) async fn rebuild_raw_memories_file_from_memories(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
max_raw_memories_for_consolidation: usize,
|
||||
) -> std::io::Result<()> {
|
||||
ensure_layout(root).await?;
|
||||
rebuild_raw_memories_file(root, memories, max_raw_memories_for_global).await
|
||||
rebuild_raw_memories_file(root, memories, max_raw_memories_for_consolidation).await
|
||||
}
|
||||
|
||||
/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
|
||||
pub(super) async fn sync_rollout_summaries_from_memories(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
max_raw_memories_for_consolidation: usize,
|
||||
) -> std::io::Result<()> {
|
||||
ensure_layout(root).await?;
|
||||
|
||||
let retained = retained_memories(memories, max_raw_memories_for_global);
|
||||
let retained = retained_memories(memories, max_raw_memories_for_consolidation);
|
||||
let keep = retained
|
||||
.iter()
|
||||
.map(rollout_summary_file_stem)
|
||||
@@ -62,9 +62,9 @@ pub(super) async fn sync_rollout_summaries_from_memories(
|
||||
async fn rebuild_raw_memories_file(
|
||||
root: &Path,
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
max_raw_memories_for_consolidation: usize,
|
||||
) -> std::io::Result<()> {
|
||||
let retained = retained_memories(memories, max_raw_memories_for_global);
|
||||
let retained = retained_memories(memories, max_raw_memories_for_consolidation);
|
||||
let mut body = String::from("# Raw Memories\n\n");
|
||||
|
||||
if retained.is_empty() {
|
||||
@@ -155,9 +155,9 @@ async fn write_rollout_summary_for_thread(
|
||||
|
||||
fn retained_memories(
|
||||
memories: &[Stage1Output],
|
||||
max_raw_memories_for_global: usize,
|
||||
max_raw_memories_for_consolidation: usize,
|
||||
) -> &[Stage1Output] {
|
||||
&memories[..memories.len().min(max_raw_memories_for_global)]
|
||||
&memories[..memories.len().min(max_raw_memories_for_consolidation)]
|
||||
}
|
||||
|
||||
fn raw_memories_format_error(err: std::fmt::Error) -> std::io::Error {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use super::storage::rebuild_raw_memories_file_from_memories;
|
||||
use super::storage::sync_rollout_summaries_from_memories;
|
||||
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL;
|
||||
use crate::config::types::DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION;
|
||||
use crate::memories::ensure_layout;
|
||||
use crate::memories::memory_root;
|
||||
use crate::memories::raw_memories_file;
|
||||
@@ -95,14 +95,14 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
|
||||
sync_rollout_summaries_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
rebuild_raw_memories_file_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
)
|
||||
.await
|
||||
.expect("rebuild raw memories");
|
||||
@@ -201,7 +201,7 @@ async fn sync_rollout_summaries_uses_timestamp_hash_and_sanitized_slug_filename(
|
||||
sync_rollout_summaries_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
@@ -304,14 +304,14 @@ task_outcome: success
|
||||
sync_rollout_summaries_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
)
|
||||
.await
|
||||
.expect("sync rollout summaries");
|
||||
rebuild_raw_memories_file_from_memories(
|
||||
&root,
|
||||
&memories,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL,
|
||||
DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
)
|
||||
.await
|
||||
.expect("rebuild raw memories");
|
||||
|
||||
@@ -177,6 +177,7 @@ mod tests {
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
|
||||
@@ -129,6 +129,13 @@ pub(crate) async fn extract_metadata_from_rollout(
|
||||
}
|
||||
Ok(ExtractionOutcome {
|
||||
metadata,
|
||||
memory_mode: items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
}),
|
||||
parse_errors,
|
||||
})
|
||||
}
|
||||
@@ -272,6 +279,7 @@ pub(crate) async fn backfill_sessions(
|
||||
);
|
||||
}
|
||||
let mut metadata = outcome.metadata;
|
||||
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
|
||||
if rollout.archived && metadata.archived_at.is_none() {
|
||||
let fallback_archived_at = metadata.updated_at;
|
||||
metadata.archived_at = file_modified_time_utc(&rollout.path)
|
||||
@@ -282,6 +290,17 @@ pub(crate) async fn backfill_sessions(
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!("failed to upsert rollout {}: {err}", rollout.path.display());
|
||||
} else {
|
||||
if let Err(err) = runtime
|
||||
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
|
||||
.await
|
||||
{
|
||||
stats.failed = stats.failed.saturating_add(1);
|
||||
warn!(
|
||||
"failed to restore memory mode for {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
stats.upserted = stats.upserted.saturating_add(1);
|
||||
if let Ok(meta_line) =
|
||||
rollout::list::read_session_meta_line(&rollout.path).await
|
||||
@@ -519,6 +538,7 @@ mod tests {
|
||||
model_provider: Some("openai".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
};
|
||||
let session_meta_line = SessionMetaLine {
|
||||
meta: session_meta,
|
||||
@@ -543,9 +563,71 @@ mod tests {
|
||||
expected.updated_at = file_modified_time_utc(&path).await.expect("mtime");
|
||||
|
||||
assert_eq!(outcome.metadata, expected);
|
||||
assert_eq!(outcome.memory_mode, None);
|
||||
assert_eq!(outcome.parse_errors, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn extract_metadata_from_rollout_returns_latest_memory_mode() {
|
||||
let dir = tempdir().expect("tempdir");
|
||||
let uuid = Uuid::new_v4();
|
||||
let id = ThreadId::from_string(&uuid.to_string()).expect("thread id");
|
||||
let path = dir
|
||||
.path()
|
||||
.join(format!("rollout-2026-01-27T12-34-56-{uuid}.jsonl"));
|
||||
|
||||
let session_meta = SessionMeta {
|
||||
id,
|
||||
forked_from_id: None,
|
||||
timestamp: "2026-01-27T12:34:56Z".to_string(),
|
||||
cwd: dir.path().to_path_buf(),
|
||||
originator: "cli".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::default(),
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("openai".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
};
|
||||
let polluted_meta = SessionMeta {
|
||||
memory_mode: Some("polluted".to_string()),
|
||||
..session_meta.clone()
|
||||
};
|
||||
let lines = vec![
|
||||
RolloutLine {
|
||||
timestamp: "2026-01-27T12:34:56Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: session_meta,
|
||||
git: None,
|
||||
}),
|
||||
},
|
||||
RolloutLine {
|
||||
timestamp: "2026-01-27T12:35:00Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: polluted_meta,
|
||||
git: None,
|
||||
}),
|
||||
},
|
||||
];
|
||||
let mut file = File::create(&path).expect("create rollout");
|
||||
for line in lines {
|
||||
writeln!(
|
||||
file,
|
||||
"{}",
|
||||
serde_json::to_string(&line).expect("serialize rollout line")
|
||||
)
|
||||
.expect("write rollout line");
|
||||
}
|
||||
|
||||
let outcome = extract_metadata_from_rollout(&path, "openai", None)
|
||||
.await
|
||||
.expect("extract");
|
||||
|
||||
assert_eq!(outcome.memory_mode.as_deref(), Some("polluted"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn builder_from_items_falls_back_to_filename() {
|
||||
let dir = tempdir().expect("tempdir");
|
||||
@@ -669,6 +751,7 @@ mod tests {
|
||||
model_provider: Some("test-provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
};
|
||||
let session_meta_line = SessionMetaLine {
|
||||
meta: session_meta,
|
||||
|
||||
@@ -412,6 +412,8 @@ impl RolloutRecorder {
|
||||
} else {
|
||||
Some(dynamic_tools)
|
||||
},
|
||||
memory_mode: (!config.memories.generate_memories)
|
||||
.then_some("disabled".to_string()),
|
||||
};
|
||||
|
||||
(
|
||||
|
||||
@@ -1109,6 +1109,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
|
||||
model_provider: Some("test-provider".into()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
|
||||
@@ -337,6 +337,19 @@ pub async fn persist_dynamic_tools(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn mark_thread_memory_mode_polluted(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
thread_id: ThreadId,
|
||||
stage: &str,
|
||||
) {
|
||||
let Some(ctx) = context else {
|
||||
return;
|
||||
};
|
||||
if let Err(err) = ctx.mark_thread_memory_mode_polluted(thread_id).await {
|
||||
warn!("state db mark_thread_memory_mode_polluted failed during {stage}: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Reconcile rollout items into SQLite, falling back to scanning the rollout file.
|
||||
pub async fn reconcile_rollout(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
@@ -375,6 +388,7 @@ pub async fn reconcile_rollout(
|
||||
}
|
||||
};
|
||||
let mut metadata = outcome.metadata;
|
||||
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
|
||||
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
|
||||
match archived_only {
|
||||
Some(true) if metadata.archived_at.is_none() => {
|
||||
@@ -392,6 +406,16 @@ pub async fn reconcile_rollout(
|
||||
);
|
||||
return;
|
||||
}
|
||||
if let Err(err) = ctx
|
||||
.set_thread_memory_mode(metadata.id, memory_mode.as_str())
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"state db reconcile_rollout memory_mode update failed {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
if let Ok(meta_line) = crate::rollout::list::read_session_meta_line(rollout_path).await {
|
||||
persist_dynamic_tools(
|
||||
Some(ctx),
|
||||
|
||||
@@ -58,9 +58,31 @@ pub(crate) async fn record_completed_response_item(
|
||||
) {
|
||||
sess.record_conversation_items(turn_context, std::slice::from_ref(item))
|
||||
.await;
|
||||
maybe_mark_thread_memory_mode_polluted_from_web_search(sess, turn_context, item).await;
|
||||
record_stage1_output_usage_for_completed_item(turn_context, item).await;
|
||||
}
|
||||
|
||||
async fn maybe_mark_thread_memory_mode_polluted_from_web_search(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
item: &ResponseItem,
|
||||
) {
|
||||
if !turn_context
|
||||
.config
|
||||
.memories
|
||||
.no_memories_if_mcp_or_web_search
|
||||
|| !matches!(item, ResponseItem::WebSearchCall { .. })
|
||||
{
|
||||
return;
|
||||
}
|
||||
state_db::mark_thread_memory_mode_polluted(
|
||||
sess.services.state_db.as_deref(),
|
||||
sess.conversation_id,
|
||||
"record_completed_response_item",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn record_stage1_output_usage_for_completed_item(
|
||||
turn_context: &TurnContext,
|
||||
item: &ResponseItem,
|
||||
|
||||
@@ -20,6 +20,7 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::truncation;
|
||||
use crate::shell_snapshot::ShellSnapshot;
|
||||
use crate::skills::SkillsManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
@@ -479,6 +480,7 @@ impl ThreadManagerState {
|
||||
self.session_source.clone(),
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -490,6 +492,7 @@ impl ThreadManagerState {
|
||||
session_source: SessionSource,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
@@ -500,6 +503,7 @@ impl ThreadManagerState {
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -510,6 +514,7 @@ impl ThreadManagerState {
|
||||
rollout_path: PathBuf,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
self.spawn_thread_with_source(
|
||||
@@ -521,6 +526,7 @@ impl ThreadManagerState {
|
||||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -532,6 +538,7 @@ impl ThreadManagerState {
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
persist_extended_history: bool,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
@@ -542,6 +549,7 @@ impl ThreadManagerState {
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
None,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -567,6 +575,7 @@ impl ThreadManagerState {
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -582,6 +591,7 @@ impl ThreadManagerState {
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
metrics_service_name: Option<String>,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let watch_registration = self
|
||||
.file_watcher
|
||||
@@ -602,6 +612,7 @@ impl ThreadManagerState {
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
metrics_service_name,
|
||||
inherited_shell_snapshot,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_thread_spawn(codex, thread_id, watch_registration)
|
||||
|
||||
@@ -11,7 +11,9 @@ use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::ev_web_search_call_done;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
@@ -157,11 +159,161 @@ async fn memories_startup_phase2_tracks_added_and_removed_inputs_across_runs() -
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn web_search_pollution_moves_selected_thread_into_removed_phase2_inputs() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let home = Arc::new(TempDir::new()?);
|
||||
let db = init_state_db(&home).await?;
|
||||
|
||||
let mut initial_builder = test_codex().with_home(home.clone()).with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
config.features.enable(Feature::MemoryTool);
|
||||
config.memories.max_raw_memories_for_consolidation = 1;
|
||||
config.memories.no_memories_if_mcp_or_web_search = true;
|
||||
});
|
||||
let initial = initial_builder.build(&server).await?;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-initial-1"),
|
||||
ev_assistant_message("msg-initial-1", "initial turn complete"),
|
||||
ev_completed("resp-initial-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
initial.submit_turn("hello before memories").await?;
|
||||
let rollout_path = initial
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("rollout path");
|
||||
let thread_id = initial.session_configured.session_id;
|
||||
let updated_at = {
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
if let Some(metadata) = db.get_thread(thread_id).await? {
|
||||
break metadata.updated_at;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for thread metadata for {thread_id}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
};
|
||||
|
||||
seed_stage1_output_for_existing_thread(
|
||||
db.as_ref(),
|
||||
thread_id,
|
||||
updated_at.timestamp(),
|
||||
"raw memory seeded for web search pollution",
|
||||
"rollout summary seeded for web search pollution",
|
||||
Some("pollution-rollout"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
shutdown_test_codex(&initial).await?;
|
||||
|
||||
let responses = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-phase2-1"),
|
||||
ev_assistant_message("msg-phase2-1", "phase2 complete"),
|
||||
ev_completed("resp-phase2-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-web-1"),
|
||||
ev_web_search_call_done("ws-1", "completed", "weather seattle"),
|
||||
ev_completed("resp-web-1"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut resumed_builder = test_codex().with_home(home.clone()).with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
config.features.enable(Feature::MemoryTool);
|
||||
config.memories.max_raw_memories_for_consolidation = 1;
|
||||
config.memories.no_memories_if_mcp_or_web_search = true;
|
||||
});
|
||||
let resumed = resumed_builder
|
||||
.resume(&server, home.clone(), rollout_path.clone())
|
||||
.await?;
|
||||
|
||||
let first_phase2_request = wait_for_request(&responses, 1).await.remove(0);
|
||||
let first_phase2_prompt = phase2_prompt_text(&first_phase2_request);
|
||||
assert!(
|
||||
first_phase2_prompt.contains("- selected inputs this run: 1"),
|
||||
"expected seeded thread to be selected before pollution: {first_phase2_prompt}"
|
||||
);
|
||||
assert!(
|
||||
first_phase2_prompt.contains("- newly added since the last successful Phase 2 run: 1"),
|
||||
"expected seeded thread to be added before pollution: {first_phase2_prompt}"
|
||||
);
|
||||
assert!(
|
||||
first_phase2_prompt.contains(&format!("- [added] thread_id={thread_id},")),
|
||||
"expected selected thread in first phase2 prompt: {first_phase2_prompt}"
|
||||
);
|
||||
|
||||
wait_for_phase2_success(db.as_ref(), thread_id).await?;
|
||||
|
||||
resumed
|
||||
.submit_turn("search the web for weather seattle")
|
||||
.await?;
|
||||
assert_eq!(
|
||||
{
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
let memory_mode = db.get_thread_memory_mode(thread_id).await?;
|
||||
if memory_mode.as_deref() == Some("polluted") {
|
||||
break memory_mode;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for polluted memory mode for {thread_id}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
.as_deref(),
|
||||
Some("polluted")
|
||||
);
|
||||
|
||||
let selection = {
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
let selection = db.get_phase2_input_selection(1, 30).await?;
|
||||
if selection.selected.is_empty()
|
||||
&& selection.retained_thread_ids.is_empty()
|
||||
&& selection.removed.len() == 1
|
||||
&& selection.removed[0].thread_id == thread_id
|
||||
{
|
||||
break selection;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for polluted thread to move into removed phase2 inputs: \
|
||||
{selection:?}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
};
|
||||
assert_eq!(responses.requests().len(), 2);
|
||||
assert!(selection.selected.is_empty());
|
||||
assert_eq!(selection.retained_thread_ids, Vec::<ThreadId>::new());
|
||||
assert_eq!(selection.removed.len(), 1);
|
||||
assert_eq!(selection.removed[0].thread_id, thread_id);
|
||||
|
||||
shutdown_test_codex(&resumed).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_test_codex(server: &wiremock::MockServer, home: Arc<TempDir>) -> Result<TestCodex> {
|
||||
let mut builder = test_codex().with_home(home).with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
config.features.enable(Feature::MemoryTool);
|
||||
config.memories.max_raw_memories_for_global = 1;
|
||||
config.memories.max_raw_memories_for_consolidation = 1;
|
||||
});
|
||||
builder.build(server).await
|
||||
}
|
||||
@@ -195,46 +347,33 @@ async fn seed_stage1_output(
|
||||
let metadata = metadata_builder.build("test-provider");
|
||||
db.upsert_thread(&metadata).await?;
|
||||
|
||||
let claim = db
|
||||
.try_claim_stage1_job(
|
||||
thread_id,
|
||||
ThreadId::new(),
|
||||
updated_at.timestamp(),
|
||||
3_600,
|
||||
64,
|
||||
)
|
||||
.await?;
|
||||
let ownership_token = match claim {
|
||||
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
|
||||
};
|
||||
|
||||
assert!(
|
||||
db.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
&ownership_token,
|
||||
updated_at.timestamp(),
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
Some(rollout_slug),
|
||||
)
|
||||
.await?,
|
||||
"stage-1 success should enqueue global consolidation"
|
||||
);
|
||||
seed_stage1_output_for_existing_thread(
|
||||
db,
|
||||
thread_id,
|
||||
updated_at.timestamp(),
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
Some(rollout_slug),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(thread_id)
|
||||
}
|
||||
|
||||
async fn wait_for_single_request(mock: &ResponseMock) -> ResponsesRequest {
|
||||
wait_for_request(mock, 1).await.remove(0)
|
||||
}
|
||||
|
||||
async fn wait_for_request(mock: &ResponseMock, expected_count: usize) -> Vec<ResponsesRequest> {
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
loop {
|
||||
let requests = mock.requests();
|
||||
if let Some(request) = requests.into_iter().next() {
|
||||
return request;
|
||||
if requests.len() >= expected_count {
|
||||
return requests;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"timed out waiting for phase2 request"
|
||||
"timed out waiting for {expected_count} phase2 requests"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
@@ -272,6 +411,39 @@ async fn wait_for_phase2_success(
|
||||
}
|
||||
}
|
||||
|
||||
async fn seed_stage1_output_for_existing_thread(
|
||||
db: &codex_state::StateRuntime,
|
||||
thread_id: ThreadId,
|
||||
updated_at: i64,
|
||||
raw_memory: &str,
|
||||
rollout_summary: &str,
|
||||
rollout_slug: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let owner = ThreadId::new();
|
||||
let claim = db
|
||||
.try_claim_stage1_job(thread_id, owner, updated_at, 3_600, 64)
|
||||
.await?;
|
||||
let ownership_token = match claim {
|
||||
codex_state::Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage-1 claim outcome: {other:?}"),
|
||||
};
|
||||
|
||||
assert!(
|
||||
db.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
&ownership_token,
|
||||
updated_at,
|
||||
raw_memory,
|
||||
rollout_summary,
|
||||
rollout_slug,
|
||||
)
|
||||
.await?,
|
||||
"stage-1 success should enqueue global consolidation"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_rollout_summary_bodies(memory_root: &Path) -> Result<Vec<String>> {
|
||||
let mut dir = tokio::fs::read_dir(memory_root.join("rollout_summaries")).await?;
|
||||
let mut summaries = Vec::new();
|
||||
|
||||
@@ -71,6 +71,7 @@ async fn write_rollout_with_user_event(dir: &Path, thread_id: ThreadId) -> io::R
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
@@ -114,6 +115,7 @@ async fn write_rollout_with_meta_only(dir: &Path, thread_id: ThreadId) -> io::Re
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
|
||||
@@ -1,23 +1,36 @@
|
||||
use anyhow::Result;
|
||||
use codex_core::config::types::McpServerConfig;
|
||||
use codex_core::config::types::McpServerTransportConfig;
|
||||
use codex_core::features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::ev_web_search_call_done;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::stdio_server_bin;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use tokio::time::Duration;
|
||||
use tracing_subscriber::prelude::*;
|
||||
@@ -128,6 +141,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: Some(dynamic_tools_for_hook),
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
};
|
||||
@@ -253,6 +267,148 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn web_search_marks_thread_memory_mode_polluted_when_configured() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![responses::sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_web_search_call_done("ws-1", "completed", "weather seattle"),
|
||||
ev_completed("resp-1"),
|
||||
])],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
config.memories.no_memories_if_mcp_or_web_search = true;
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let thread_id = test.session_configured.session_id;
|
||||
|
||||
test.submit_turn("search the web").await?;
|
||||
|
||||
let mut memory_mode = None;
|
||||
for _ in 0..100 {
|
||||
memory_mode = db.get_thread_memory_mode(thread_id).await?;
|
||||
if memory_mode.as_deref() == Some("polluted") {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
|
||||
assert_eq!(memory_mode.as_deref(), Some("polluted"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let call_id = "call-123";
|
||||
let server_name = "rmcp";
|
||||
let tool_name = format!("mcp__{server_name}__echo");
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, &tool_name, "{\"message\":\"ping\"}"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_assistant_message("msg-1", "rmcp echo tool completed."),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let rmcp_test_server_bin = stdio_server_bin()?;
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
config.features.enable(Feature::Sqlite);
|
||||
config.memories.no_memories_if_mcp_or_web_search = true;
|
||||
|
||||
let mut servers = config.mcp_servers.get().clone();
|
||||
servers.insert(
|
||||
server_name.to_string(),
|
||||
McpServerConfig {
|
||||
transport: McpServerTransportConfig::Stdio {
|
||||
command: rmcp_test_server_bin,
|
||||
args: Vec::new(),
|
||||
env: Some(HashMap::from([(
|
||||
"MCP_TEST_VALUE".to_string(),
|
||||
"propagated-env".to_string(),
|
||||
)])),
|
||||
env_vars: Vec::new(),
|
||||
cwd: None,
|
||||
},
|
||||
enabled: true,
|
||||
required: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(10)),
|
||||
tool_timeout_sec: None,
|
||||
enabled_tools: None,
|
||||
disabled_tools: None,
|
||||
scopes: None,
|
||||
oauth_resource: None,
|
||||
},
|
||||
);
|
||||
config
|
||||
.mcp_servers
|
||||
.set(servers)
|
||||
.expect("test mcp servers should accept any configuration");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let db = test.codex.state_db().expect("state db enabled");
|
||||
let thread_id = test.session_configured.session_id;
|
||||
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "call the rmcp echo tool".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd_path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
model: test.session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::McpToolCallEnd(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event_match(&test.codex, |event| match event {
|
||||
EventMsg::Error(err) => Some(Err(anyhow::anyhow!(err.message.clone()))),
|
||||
EventMsg::TurnComplete(_) => Some(Ok(())),
|
||||
_ => None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut memory_mode = None;
|
||||
for _ in 0..100 {
|
||||
memory_mode = db.get_thread_memory_mode(thread_id).await?;
|
||||
if memory_mode.as_deref() == Some("polluted") {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
}
|
||||
|
||||
assert_eq!(memory_mode.as_deref(), Some("polluted"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn tool_call_logs_include_thread_id() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
|
||||
@@ -2060,6 +2060,8 @@ pub struct SessionMeta {
|
||||
pub base_instructions: Option<BaseInstructions>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub dynamic_tools: Option<Vec<DynamicToolSpec>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub memory_mode: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for SessionMeta {
|
||||
@@ -2077,6 +2079,7 @@ impl Default for SessionMeta {
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,6 +242,7 @@ mod tests {
|
||||
model_provider: Some("openai".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
|
||||
@@ -45,6 +45,8 @@ pub struct ThreadsPage {
|
||||
pub struct ExtractionOutcome {
|
||||
/// The extracted thread metadata.
|
||||
pub metadata: ThreadMetadata,
|
||||
/// The explicit thread memory mode from rollout metadata, if present.
|
||||
pub memory_mode: Option<String>,
|
||||
/// The number of rollout lines that failed to parse.
|
||||
pub parse_errors: usize,
|
||||
}
|
||||
|
||||
@@ -277,7 +277,8 @@ SELECT
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
ON t.id = so.thread_id
|
||||
WHERE length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0
|
||||
WHERE t.memory_mode = 'enabled'
|
||||
AND (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
|
||||
ORDER BY so.source_updated_at DESC, so.thread_id DESC
|
||||
LIMIT ?
|
||||
"#,
|
||||
@@ -304,11 +305,13 @@ LIMIT ?
|
||||
/// `thread_id DESC`
|
||||
/// - previously selected rows are identified by `selected_for_phase2 = 1`
|
||||
/// - `previous_selected` contains the current persisted rows that belonged
|
||||
/// to the last successful phase-2 baseline
|
||||
/// to the last successful phase-2 baseline, even if those threads are no
|
||||
/// longer memory-eligible
|
||||
/// - `retained_thread_ids` records which current rows still match the exact
|
||||
/// snapshot selected in the last successful phase-2 run
|
||||
/// - removed rows are previously selected rows that are still present in
|
||||
/// `stage1_outputs` but fall outside the current top-`n` selection
|
||||
/// `stage1_outputs` but are no longer in the current selection, including
|
||||
/// threads that are no longer memory-eligible
|
||||
pub async fn get_phase2_input_selection(
|
||||
&self,
|
||||
n: usize,
|
||||
@@ -336,7 +339,8 @@ SELECT
|
||||
FROM stage1_outputs AS so
|
||||
LEFT JOIN threads AS t
|
||||
ON t.id = so.thread_id
|
||||
WHERE (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
|
||||
WHERE t.memory_mode = 'enabled'
|
||||
AND (length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0)
|
||||
AND (
|
||||
(so.last_usage IS NOT NULL AND so.last_usage >= ?)
|
||||
OR (so.last_usage IS NULL AND so.source_updated_at >= ?)
|
||||
@@ -421,6 +425,51 @@ ORDER BY so.source_updated_at DESC, so.thread_id DESC
|
||||
})
|
||||
}
|
||||
|
||||
/// Marks a thread as polluted and enqueues phase-2 forgetting when the
|
||||
/// thread participated in the last successful phase-2 baseline.
|
||||
pub async fn mark_thread_memory_mode_polluted(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> anyhow::Result<bool> {
|
||||
let now = Utc::now().timestamp();
|
||||
let thread_id = thread_id.to_string();
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let rows_affected = sqlx::query(
|
||||
r#"
|
||||
UPDATE threads
|
||||
SET memory_mode = 'polluted'
|
||||
WHERE id = ? AND memory_mode != 'polluted'
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
if rows_affected == 0 {
|
||||
tx.commit().await?;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let selected_for_phase2 = sqlx::query_scalar::<_, i64>(
|
||||
r#"
|
||||
SELECT selected_for_phase2
|
||||
FROM stage1_outputs
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.as_str())
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.unwrap_or(0);
|
||||
if selected_for_phase2 != 0 {
|
||||
enqueue_global_consolidation_with_executor(&mut *tx, now).await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Attempts to claim a stage-1 job for a thread at `source_updated_at`.
|
||||
///
|
||||
/// Claim semantics:
|
||||
@@ -2569,6 +2618,71 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_stage1_outputs_for_global_skips_polluted_threads() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id_enabled =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let thread_id_polluted =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
|
||||
for (thread_id, workspace) in [
|
||||
(thread_id_enabled, "workspace-enabled"),
|
||||
(thread_id_polluted, "workspace-polluted"),
|
||||
] {
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join(workspace),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
100,
|
||||
"raw memory",
|
||||
"summary",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded"),
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
}
|
||||
|
||||
runtime
|
||||
.set_thread_memory_mode(thread_id_polluted, "polluted")
|
||||
.await
|
||||
.expect("mark thread polluted");
|
||||
|
||||
let outputs = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list stage1 outputs for global");
|
||||
assert_eq!(outputs.len(), 1);
|
||||
assert_eq!(outputs[0].thread_id, thread_id_enabled);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_phase2_input_selection_reports_added_retained_and_removed_rows() {
|
||||
let codex_home = unique_temp_dir();
|
||||
@@ -2681,6 +2795,197 @@ VALUES (?, ?, ?, ?, ?)
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_phase2_input_selection_marks_polluted_previous_selection_as_removed() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id_enabled =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let thread_id_polluted =
|
||||
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
|
||||
for (thread_id, updated_at) in [(thread_id_enabled, 100), (thread_id_polluted, 101)] {
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join(thread_id.to_string()),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, updated_at, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
updated_at,
|
||||
&format!("raw-{updated_at}"),
|
||||
&format!("summary-{updated_at}"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded"),
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
}
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2");
|
||||
let (ownership_token, input_watermark) = match claim {
|
||||
Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("unexpected phase2 claim outcome: {other:?}"),
|
||||
};
|
||||
let selected_outputs = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list stage1 outputs for global");
|
||||
assert!(
|
||||
runtime
|
||||
.mark_global_phase2_job_succeeded(
|
||||
ownership_token.as_str(),
|
||||
input_watermark,
|
||||
&selected_outputs,
|
||||
)
|
||||
.await
|
||||
.expect("mark phase2 success"),
|
||||
"phase2 success should persist selected rows"
|
||||
);
|
||||
|
||||
runtime
|
||||
.set_thread_memory_mode(thread_id_polluted, "polluted")
|
||||
.await
|
||||
.expect("mark thread polluted");
|
||||
|
||||
let selection = runtime
|
||||
.get_phase2_input_selection(2, 36_500)
|
||||
.await
|
||||
.expect("load phase2 input selection");
|
||||
|
||||
assert_eq!(selection.selected.len(), 1);
|
||||
assert_eq!(selection.selected[0].thread_id, thread_id_enabled);
|
||||
assert_eq!(selection.previous_selected.len(), 2);
|
||||
assert!(
|
||||
selection
|
||||
.previous_selected
|
||||
.iter()
|
||||
.any(|item| item.thread_id == thread_id_enabled)
|
||||
);
|
||||
assert!(
|
||||
selection
|
||||
.previous_selected
|
||||
.iter()
|
||||
.any(|item| item.thread_id == thread_id_polluted)
|
||||
);
|
||||
assert_eq!(selection.retained_thread_ids, vec![thread_id_enabled]);
|
||||
assert_eq!(selection.removed.len(), 1);
|
||||
assert_eq!(selection.removed[0].thread_id, thread_id_polluted);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_thread_memory_mode_polluted_enqueues_phase2_for_selected_threads() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
|
||||
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
|
||||
runtime
|
||||
.upsert_thread(&test_thread_metadata(
|
||||
&codex_home,
|
||||
thread_id,
|
||||
codex_home.join("workspace"),
|
||||
))
|
||||
.await
|
||||
.expect("upsert thread");
|
||||
|
||||
let claim = runtime
|
||||
.try_claim_stage1_job(thread_id, owner, 100, 3600, 64)
|
||||
.await
|
||||
.expect("claim stage1");
|
||||
let ownership_token = match claim {
|
||||
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
|
||||
other => panic!("unexpected stage1 claim outcome: {other:?}"),
|
||||
};
|
||||
assert!(
|
||||
runtime
|
||||
.mark_stage1_job_succeeded(
|
||||
thread_id,
|
||||
ownership_token.as_str(),
|
||||
100,
|
||||
"raw",
|
||||
"summary",
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("mark stage1 succeeded"),
|
||||
"stage1 success should persist output"
|
||||
);
|
||||
|
||||
let phase2_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2");
|
||||
let (phase2_token, input_watermark) = match phase2_claim {
|
||||
Phase2JobClaimOutcome::Claimed {
|
||||
ownership_token,
|
||||
input_watermark,
|
||||
} => (ownership_token, input_watermark),
|
||||
other => panic!("unexpected phase2 claim outcome: {other:?}"),
|
||||
};
|
||||
let selected_outputs = runtime
|
||||
.list_stage1_outputs_for_global(10)
|
||||
.await
|
||||
.expect("list stage1 outputs");
|
||||
assert!(
|
||||
runtime
|
||||
.mark_global_phase2_job_succeeded(
|
||||
phase2_token.as_str(),
|
||||
input_watermark,
|
||||
&selected_outputs,
|
||||
)
|
||||
.await
|
||||
.expect("mark phase2 success"),
|
||||
"phase2 success should persist selected rows"
|
||||
);
|
||||
|
||||
assert!(
|
||||
runtime
|
||||
.mark_thread_memory_mode_polluted(thread_id)
|
||||
.await
|
||||
.expect("mark thread polluted"),
|
||||
"thread should transition to polluted"
|
||||
);
|
||||
|
||||
let next_claim = runtime
|
||||
.try_claim_global_phase2_job(owner, 3600)
|
||||
.await
|
||||
.expect("claim phase2 after pollution");
|
||||
assert!(matches!(next_claim, Phase2JobClaimOutcome::Claimed { .. }));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_phase2_input_selection_treats_regenerated_selected_rows_as_added() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
@@ -35,6 +35,14 @@ WHERE id = ?
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn get_thread_memory_mode(&self, id: ThreadId) -> anyhow::Result<Option<String>> {
|
||||
let row = sqlx::query("SELECT memory_mode FROM threads WHERE id = ?")
|
||||
.bind(id.to_string())
|
||||
.fetch_optional(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(row.and_then(|row| row.try_get("memory_mode").ok()))
|
||||
}
|
||||
|
||||
/// Get dynamic tools for a thread, if present.
|
||||
pub async fn get_dynamic_tools(
|
||||
&self,
|
||||
@@ -199,6 +207,19 @@ FROM threads
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_thread_memory_mode(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
memory_mode: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
let result = sqlx::query("UPDATE threads SET memory_mode = ? WHERE id = ?")
|
||||
.bind(memory_mode)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
async fn upsert_thread_with_creation_memory_mode(
|
||||
&self,
|
||||
metadata: &crate::ThreadMetadata,
|
||||
@@ -357,6 +378,16 @@ ON CONFLICT(thread_id, position) DO NOTHING
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
if let Some(memory_mode) = extract_memory_mode(items)
|
||||
&& let Err(err) = self
|
||||
.set_thread_memory_mode(builder.id, memory_mode.as_str())
|
||||
.await
|
||||
{
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "set_thread_memory_mode")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
let dynamic_tools = extract_dynamic_tools(items);
|
||||
if let Some(dynamic_tools) = dynamic_tools
|
||||
&& let Err(err) = self
|
||||
@@ -438,6 +469,16 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
|
||||
items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn push_thread_filters<'a>(
|
||||
builder: &mut QueryBuilder<'a, Sqlite>,
|
||||
archived_only: bool,
|
||||
@@ -518,7 +559,11 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::runtime::test_support::test_thread_metadata;
|
||||
use crate::runtime::test_support::unique_temp_dir;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[tokio::test]
|
||||
async fn upsert_thread_keeps_creation_memory_mode_for_existing_rows() {
|
||||
@@ -557,4 +602,56 @@ mod tests {
|
||||
.expect("memory mode should remain readable");
|
||||
assert_eq!(memory_mode, "disabled");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_rollout_items_restores_memory_mode_from_session_meta() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let thread_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000456").expect("valid thread id");
|
||||
let metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
|
||||
|
||||
runtime
|
||||
.upsert_thread(&metadata)
|
||||
.await
|
||||
.expect("initial upsert should succeed");
|
||||
|
||||
let builder = ThreadMetadataBuilder::new(
|
||||
thread_id,
|
||||
metadata.rollout_path.clone(),
|
||||
metadata.created_at,
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let items = vec![RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: thread_id,
|
||||
forked_from_id: None,
|
||||
timestamp: metadata.created_at.to_rfc3339(),
|
||||
cwd: PathBuf::new(),
|
||||
originator: String::new(),
|
||||
cli_version: String::new(),
|
||||
source: SessionSource::Cli,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: None,
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: Some("polluted".to_string()),
|
||||
},
|
||||
git: None,
|
||||
})];
|
||||
|
||||
runtime
|
||||
.apply_rollout_items(&builder, &items, None, None)
|
||||
.await
|
||||
.expect("apply_rollout_items should succeed");
|
||||
|
||||
let memory_mode = runtime
|
||||
.get_thread_memory_mode(thread_id)
|
||||
.await
|
||||
.expect("memory mode should load");
|
||||
assert_eq!(memory_mode.as_deref(), Some("polluted"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1246,8 +1246,8 @@ impl App {
|
||||
.collect();
|
||||
|
||||
self.chat_widget.show_selection_view(SelectionViewParams {
|
||||
title: Some("Agents".to_string()),
|
||||
subtitle: Some("Select a thread to focus".to_string()),
|
||||
title: Some("Multi-agents".to_string()),
|
||||
subtitle: Some("Select an agent to watch".to_string()),
|
||||
footer_hint: Some(standard_popup_hint_line()),
|
||||
items,
|
||||
initial_selected_idx,
|
||||
|
||||
@@ -342,7 +342,7 @@ mod tests {
|
||||
CommandItem::UserPrompt(_) => None,
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(cmds, vec!["model", "mention", "mcp"]);
|
||||
assert_eq!(cmds, vec!["model", "mention", "mcp", "multi-agents"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -129,6 +129,7 @@ pub(crate) enum CancellationEvent {
|
||||
NotHandled,
|
||||
}
|
||||
|
||||
use crate::bottom_pane::prompt_args::parse_slash_name;
|
||||
pub(crate) use chat_composer::ChatComposer;
|
||||
pub(crate) use chat_composer::ChatComposerConfig;
|
||||
pub(crate) use chat_composer::InputResult;
|
||||
@@ -398,11 +399,20 @@ impl BottomPane {
|
||||
self.request_redraw();
|
||||
InputResult::None
|
||||
} else {
|
||||
let is_agent_command = self
|
||||
.composer_text()
|
||||
.lines()
|
||||
.next()
|
||||
.and_then(parse_slash_name)
|
||||
.is_some_and(|(name, _, _)| name == "agent");
|
||||
|
||||
// If a task is running and a status line is visible, allow Esc to
|
||||
// send an interrupt even while the composer has focus.
|
||||
// When a popup is active, prefer dismissing it over interrupting the task.
|
||||
if key_event.code == KeyCode::Esc
|
||||
&& matches!(key_event.kind, KeyEventKind::Press | KeyEventKind::Repeat)
|
||||
&& self.is_task_running
|
||||
&& !is_agent_command
|
||||
&& !self.composer.popup_active()
|
||||
&& let Some(status) = &self.status
|
||||
{
|
||||
@@ -1593,6 +1603,90 @@ mod tests {
|
||||
assert_eq!(pane.composer_text(), "/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn esc_with_agent_command_without_popup_does_not_interrupt_task() {
|
||||
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let mut pane = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: tx,
|
||||
frame_requester: FrameRequester::test_dummy(),
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported: false,
|
||||
placeholder_text: "Ask Codex to do anything".to_string(),
|
||||
disable_paste_burst: false,
|
||||
animations_enabled: true,
|
||||
skills: Some(Vec::new()),
|
||||
});
|
||||
|
||||
pane.set_task_running(true);
|
||||
|
||||
// Repro: `/agent ` hides the popup (cursor past command name). Esc should
|
||||
// keep editing command text instead of interrupting the running task.
|
||||
pane.insert_str("/agent ");
|
||||
assert!(
|
||||
!pane.composer.popup_active(),
|
||||
"expected command popup to be hidden after entering `/agent `"
|
||||
);
|
||||
|
||||
pane.handle_key_event(KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
|
||||
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
assert!(
|
||||
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
|
||||
"expected Esc to not send Op::Interrupt while typing `/agent`"
|
||||
);
|
||||
}
|
||||
assert_eq!(pane.composer_text(), "/agent ");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn esc_release_after_dismissing_agent_picker_does_not_interrupt_task() {
|
||||
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let mut pane = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: tx,
|
||||
frame_requester: FrameRequester::test_dummy(),
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported: false,
|
||||
placeholder_text: "Ask Codex to do anything".to_string(),
|
||||
disable_paste_burst: false,
|
||||
animations_enabled: true,
|
||||
skills: Some(Vec::new()),
|
||||
});
|
||||
|
||||
pane.set_task_running(true);
|
||||
pane.show_selection_view(SelectionViewParams {
|
||||
title: Some("Agents".to_string()),
|
||||
items: vec![SelectionItem {
|
||||
name: "Main".to_string(),
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
pane.handle_key_event(KeyEvent::new_with_kind(
|
||||
KeyCode::Esc,
|
||||
KeyModifiers::NONE,
|
||||
KeyEventKind::Press,
|
||||
));
|
||||
pane.handle_key_event(KeyEvent::new_with_kind(
|
||||
KeyCode::Esc,
|
||||
KeyModifiers::NONE,
|
||||
KeyEventKind::Release,
|
||||
));
|
||||
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
assert!(
|
||||
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
|
||||
"expected Esc release after dismissing agent picker to not interrupt"
|
||||
);
|
||||
}
|
||||
assert!(
|
||||
pane.no_modal_or_popup_active(),
|
||||
"expected Esc press to dismiss the agent picker"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn esc_interrupts_running_task_when_no_popup() {
|
||||
let (tx_raw, mut rx) = unbounded_channel::<AppEvent>();
|
||||
|
||||
@@ -1147,6 +1147,10 @@ impl ChatWidget {
|
||||
Some(event.reasoning_effort),
|
||||
None,
|
||||
);
|
||||
if let Some(mask) = self.active_collaboration_mask.as_mut() {
|
||||
mask.model = Some(model_for_header.clone());
|
||||
mask.reasoning_effort = Some(event.reasoning_effort);
|
||||
}
|
||||
self.refresh_model_display();
|
||||
self.sync_personality_command_enabled();
|
||||
let startup_tooltip_override = self.startup_tooltip_override.take();
|
||||
@@ -3605,7 +3609,7 @@ impl ChatWidget {
|
||||
}
|
||||
self.open_collaboration_modes_popup();
|
||||
}
|
||||
SlashCommand::Agent => {
|
||||
SlashCommand::Agent | SlashCommand::MultiAgents => {
|
||||
self.app_event_tx.send(AppEvent::OpenAgentPicker);
|
||||
}
|
||||
SlashCommand::Approvals => {
|
||||
|
||||
@@ -38,7 +38,7 @@ struct AgentLabel<'a> {
|
||||
|
||||
pub(crate) fn agent_picker_status_dot_spans(is_closed: bool) -> Vec<Span<'static>> {
|
||||
let dot = if is_closed {
|
||||
"•".dark_gray()
|
||||
"•".into()
|
||||
} else {
|
||||
"•".green()
|
||||
};
|
||||
@@ -283,16 +283,16 @@ fn agent_label_spans(agent: AgentLabel<'_>) -> Vec<Span<'static>> {
|
||||
let role = agent.role.map(str::trim).filter(|role| !role.is_empty());
|
||||
|
||||
if let Some(nickname) = nickname {
|
||||
spans.push(Span::from(nickname.to_string()).light_blue().bold());
|
||||
spans.push(Span::from(nickname.to_string()).cyan().bold());
|
||||
} else if let Some(thread_id) = agent.thread_id {
|
||||
spans.push(Span::from(thread_id.to_string()).dim());
|
||||
spans.push(Span::from(thread_id.to_string()).cyan());
|
||||
} else {
|
||||
spans.push(Span::from("agent").dim());
|
||||
spans.push(Span::from("agent").cyan());
|
||||
}
|
||||
|
||||
if let Some(role) = role {
|
||||
spans.push(Span::from(" ").dim());
|
||||
spans.push(Span::from(format!("[{role}]")).dim());
|
||||
spans.push(Span::from(format!("[{role}]")));
|
||||
}
|
||||
|
||||
spans
|
||||
@@ -346,7 +346,7 @@ fn wait_complete_lines(
|
||||
agent_statuses: &[CollabAgentStatusEntry],
|
||||
) -> Vec<Line<'static>> {
|
||||
if statuses.is_empty() && agent_statuses.is_empty() {
|
||||
return vec![Line::from(Span::from("No agents completed yet").dim())];
|
||||
return vec![Line::from(Span::from("No agents completed yet"))];
|
||||
}
|
||||
|
||||
let entries = if agent_statuses.is_empty() {
|
||||
@@ -409,7 +409,7 @@ fn status_summary_line(status: &AgentStatus) -> Line<'static> {
|
||||
|
||||
fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> {
|
||||
match status {
|
||||
AgentStatus::PendingInit => vec![Span::from("Pending init").dim()],
|
||||
AgentStatus::PendingInit => vec![Span::from("Pending init").cyan()],
|
||||
AgentStatus::Running => vec![Span::from("Running").cyan().bold()],
|
||||
AgentStatus::Completed(message) => {
|
||||
let mut spans = vec![Span::from("Completed").green()];
|
||||
@@ -433,11 +433,11 @@ fn status_summary_spans(status: &AgentStatus) -> Vec<Span<'static>> {
|
||||
);
|
||||
if !error_preview.is_empty() {
|
||||
spans.push(Span::from(" - ").dim());
|
||||
spans.push(Span::from(error_preview).dim());
|
||||
spans.push(Span::from(error_preview));
|
||||
}
|
||||
spans
|
||||
}
|
||||
AgentStatus::Shutdown => vec![Span::from("Shutdown").dim()],
|
||||
AgentStatus::Shutdown => vec![Span::from("Shutdown")],
|
||||
AgentStatus::NotFound => vec![Span::from("Not found").red()],
|
||||
}
|
||||
}
|
||||
@@ -553,10 +553,11 @@ mod tests {
|
||||
let lines = cell.display_lines(200);
|
||||
let title = &lines[0];
|
||||
assert_eq!(title.spans[2].content.as_ref(), "Robie");
|
||||
assert_eq!(title.spans[2].style.fg, Some(Color::LightBlue));
|
||||
assert_eq!(title.spans[2].style.fg, Some(Color::Cyan));
|
||||
assert!(title.spans[2].style.add_modifier.contains(Modifier::BOLD));
|
||||
assert_eq!(title.spans[4].content.as_ref(), "[explorer]");
|
||||
assert!(title.spans[4].style.add_modifier.contains(Modifier::DIM));
|
||||
assert_eq!(title.spans[4].style.fg, None);
|
||||
assert!(!title.spans[4].style.add_modifier.contains(Modifier::DIM));
|
||||
}
|
||||
|
||||
fn cell_to_text(cell: &PlainHistoryCell) -> String {
|
||||
|
||||
@@ -53,6 +53,7 @@ pub enum SlashCommand {
|
||||
Realtime,
|
||||
Settings,
|
||||
TestApproval,
|
||||
MultiAgents,
|
||||
// Debugging commands.
|
||||
#[strum(serialize = "debug-m-drop")]
|
||||
MemoryDrop,
|
||||
@@ -93,7 +94,7 @@ impl SlashCommand {
|
||||
SlashCommand::Settings => "configure realtime microphone/speaker",
|
||||
SlashCommand::Plan => "switch to Plan mode",
|
||||
SlashCommand::Collab => "change collaboration mode (experimental)",
|
||||
SlashCommand::Agent => "switch the active agent thread",
|
||||
SlashCommand::Agent | SlashCommand::MultiAgents => "switch the active agent thread",
|
||||
SlashCommand::Approvals => "choose what Codex is allowed to do",
|
||||
SlashCommand::Permissions => "choose what Codex is allowed to do",
|
||||
SlashCommand::ElevateSandbox => "set up elevated agent sandbox",
|
||||
@@ -167,7 +168,7 @@ impl SlashCommand {
|
||||
SlashCommand::Realtime => true,
|
||||
SlashCommand::Settings => true,
|
||||
SlashCommand::Collab => true,
|
||||
SlashCommand::Agent => true,
|
||||
SlashCommand::Agent | SlashCommand::MultiAgents => true,
|
||||
SlashCommand::Statusline => false,
|
||||
SlashCommand::Theme => false,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user