Compare commits


1 commit

Author: Ahmed Ibrahim
SHA1: 66647e7eb8
Message: prefix
Date: 2026-02-12 11:06:53 -08:00
39 changed files with 1016 additions and 1742 deletions

codex-rs/Cargo.lock (generated; 2 changed lines)
View File

@@ -1692,7 +1692,6 @@ dependencies = [
"codex-utils-home-dir",
"codex-utils-pty",
"codex-utils-readiness",
"codex-utils-sanitizer",
"codex-utils-string",
"codex-windows-sandbox",
"core-foundation 0.9.4",
@@ -1721,6 +1720,7 @@ dependencies = [
"predicates",
"pretty_assertions",
"rand 0.9.2",
"regex",
"regex-lite",
"reqwest",
"rmcp",

View File

@@ -340,6 +340,7 @@ ignored = [
"icu_provider",
"openssl-sys",
"codex-utils-readiness",
"codex-utils-sanitizer",
"codex-secrets",
]

View File

@@ -1,63 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"FuzzyFileSearchResult": {
"description": "Superset of [`codex_file_search::FileMatch`]",
"properties": {
"file_name": {
"type": "string"
},
"indices": {
"items": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
},
"type": [
"array",
"null"
]
},
"path": {
"type": "string"
},
"root": {
"type": "string"
},
"score": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"file_name",
"path",
"root",
"score"
],
"type": "object"
}
},
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"title": "FuzzyFileSearchSessionUpdatedNotification",
"type": "object"
}

View File

@@ -3684,65 +3684,6 @@
],
"type": "object"
},
"FuzzyFileSearchResult": {
"description": "Superset of [`codex_file_search::FileMatch`]",
"properties": {
"file_name": {
"type": "string"
},
"indices": {
"items": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
},
"type": [
"array",
"null"
]
},
"path": {
"type": "string"
},
"root": {
"type": "string"
},
"score": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"file_name",
"path",
"root",
"score"
],
"type": "object"
},
"FuzzyFileSearchSessionUpdatedNotification": {
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"type": "object"
},
"GhostCommit": {
"description": "Details of a ghost commit created from a repository state.",
"properties": {
@@ -8230,26 +8171,6 @@
"title": "ConfigWarningNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"fuzzyFileSearch/sessionUpdated"
],
"title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "FuzzyFileSearch/sessionUpdatedNotification",
"type": "object"
},
{
"description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
"properties": {

View File

@@ -5386,30 +5386,6 @@
],
"type": "object"
},
"FuzzyFileSearchSessionUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"title": "FuzzyFileSearchSessionUpdatedNotification",
"type": "object"
},
"GetAuthStatusParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -8449,26 +8425,6 @@
"title": "ConfigWarningNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"fuzzyFileSearch/sessionUpdated"
],
"title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "FuzzyFileSearch/sessionUpdatedNotification",
"type": "object"
},
{
"description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
"properties": {

View File

@@ -1,6 +0,0 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";
export type FuzzyFileSearchSessionUpdatedNotification = { sessionId: string, query: string, files: Array<FuzzyFileSearchResult>, };

View File

@@ -2,7 +2,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AuthStatusChangeNotification } from "./AuthStatusChangeNotification";
import type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
import type { LoginChatGptCompleteNotification } from "./LoginChatGptCompleteNotification";
import type { SessionConfiguredNotification } from "./SessionConfiguredNotification";
import type { AccountLoginCompletedNotification } from "./v2/AccountLoginCompletedNotification";
@@ -38,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };

View File

@@ -77,7 +77,6 @@ export type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload";
export type { FuzzyFileSearchParams } from "./FuzzyFileSearchParams";
export type { FuzzyFileSearchResponse } from "./FuzzyFileSearchResponse";
export type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";
export type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
export type { GetAuthStatusParams } from "./GetAuthStatusParams";
export type { GetAuthStatusResponse } from "./GetAuthStatusResponse";
export type { GetConversationSummaryParams } from "./GetConversationSummaryParams";

View File

@@ -458,21 +458,6 @@ client_request_definitions! {
params: FuzzyFileSearchParams,
response: FuzzyFileSearchResponse,
},
#[experimental("fuzzyFileSearch/sessionStart")]
FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
params: FuzzyFileSearchSessionStartParams,
response: FuzzyFileSearchSessionStartResponse,
},
#[experimental("fuzzyFileSearch/sessionUpdate")]
FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
params: FuzzyFileSearchSessionUpdateParams,
response: FuzzyFileSearchSessionUpdateResponse,
},
#[experimental("fuzzyFileSearch/sessionStop")]
FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
params: FuzzyFileSearchSessionStopParams,
response: FuzzyFileSearchSessionStopResponse,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
params: v1::ExecOneOffCommandParams,
@@ -717,47 +702,6 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStartParams {
pub session_id: String,
pub roots: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStartResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdateParams {
pub session_id: String,
pub query: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionUpdateResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStopParams {
pub session_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStopResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdatedNotification {
pub session_id: String,
pub query: String,
pub files: Vec<FuzzyFileSearchResult>,
}
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
@@ -790,7 +734,6 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
FuzzyFileSearchSessionUpdated => "fuzzyFileSearch/sessionUpdated" (FuzzyFileSearchSessionUpdatedNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

View File

@@ -1,9 +1,7 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::FuzzyFileSearchSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::fuzzy_file_search::start_fuzzy_file_search_session;
use crate::models::supported_models;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
@@ -49,12 +47,6 @@ use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStartResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStopParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStopResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateResponse;
use codex_app_server_protocol::GetAccountParams;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAccountResponse;
@@ -302,7 +294,6 @@ pub(crate) struct CodexMessageProcessor {
active_login: Arc<Mutex<Option<ActiveLogin>>>,
thread_state_manager: ThreadStateManager,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
feedback: CodexFeedback,
}
@@ -370,7 +361,6 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
thread_state_manager: ThreadStateManager::new(),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -737,18 +727,6 @@ impl CodexMessageProcessor {
self.fuzzy_file_search(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionStart { request_id, params } => {
self.fuzzy_file_search_session_start(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionUpdate { request_id, params } => {
self.fuzzy_file_search_session_update(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionStop { request_id, params } => {
self.fuzzy_file_search_session_stop(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::OneOffCommandExec { request_id, params } => {
self.exec_one_off_command(to_connection_request_id(request_id), params)
.await;
@@ -1932,7 +1910,9 @@ impl CodexMessageProcessor {
let core_dynamic_tools = if dynamic_tools.is_empty() {
Vec::new()
} else {
if let Err(message) = validate_dynamic_tools(&dynamic_tools) {
let snapshot = collect_mcp_snapshot(&config).await;
let mcp_tool_names = snapshot.tools.keys().cloned().collect::<HashSet<_>>();
if let Err(message) = validate_dynamic_tools(&dynamic_tools, &mcp_tool_names) {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
@@ -5578,89 +5558,6 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn fuzzy_file_search_session_start(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStartParams,
) {
let FuzzyFileSearchSessionStartParams { session_id, roots } = params;
if session_id.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "sessionId must not be empty".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
let session =
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone());
match session {
Ok(session) => {
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.insert(session_id, session);
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start fuzzy file search session: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn fuzzy_file_search_session_update(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionUpdateParams,
) {
let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
let found = {
let sessions = self.fuzzy_search_sessions.lock().await;
if let Some(session) = sessions.get(&session_id) {
session.update_query(query);
true
} else {
false
}
};
if !found {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("fuzzy file search session not found: {session_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
.await;
}
async fn fuzzy_file_search_session_stop(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStopParams,
) {
let FuzzyFileSearchSessionStopParams { session_id } = params;
{
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.remove(&session_id);
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
.await;
}
async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {
@@ -5815,7 +5712,10 @@ fn errors_to_info(
.collect()
}
fn validate_dynamic_tools(tools: &[ApiDynamicToolSpec]) -> Result<(), String> {
fn validate_dynamic_tools(
tools: &[ApiDynamicToolSpec],
mcp_tool_names: &HashSet<String>,
) -> Result<(), String> {
let mut seen = HashSet::new();
for tool in tools {
let name = tool.name.trim();
@@ -5831,6 +5731,9 @@ fn validate_dynamic_tools(tools: &[ApiDynamicToolSpec]) -> Result<(), String> {
if name == "mcp" || name.starts_with("mcp__") {
return Err(format!("dynamic tool name is reserved: {name}"));
}
if mcp_tool_names.contains(name) {
return Err(format!("dynamic tool name conflicts with MCP tool: {name}"));
}
if !seen.insert(name.to_string()) {
return Err(format!("duplicate dynamic tool name: {name}"));
}
@@ -6333,7 +6236,7 @@ mod tests {
description: "test".to_string(),
input_schema: json!({"type": "null"}),
}];
let err = validate_dynamic_tools(&tools).expect_err("invalid schema");
let err = validate_dynamic_tools(&tools, &HashSet::new()).expect_err("invalid schema");
assert!(err.contains("my_tool"), "unexpected error: {err}");
}
@@ -6345,7 +6248,7 @@ mod tests {
// Missing `type` is common; core sanitizes these to a supported schema.
input_schema: json!({"properties": {}}),
}];
validate_dynamic_tools(&tools).expect("valid schema");
validate_dynamic_tools(&tools, &HashSet::new()).expect("valid schema");
}
#[test]

View File

@@ -1,18 +1,12 @@
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use tracing::warn;
use crate::outgoing_message::OutgoingMessageSender;
const MATCH_LIMIT: usize = 50;
const MAX_THREADS: usize = 12;
@@ -83,148 +77,3 @@ pub(crate) async fn run_fuzzy_file_search(
files
}
pub(crate) struct FuzzyFileSearchSession {
session: file_search::FileSearchSession,
shared: Arc<SessionShared>,
}
impl FuzzyFileSearchSession {
pub(crate) fn update_query(&self, query: String) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
{
#[expect(clippy::unwrap_used)]
let mut latest_query = self.shared.latest_query.lock().unwrap();
*latest_query = query.clone();
}
self.session.update_query(&query);
}
}
impl Drop for FuzzyFileSearchSession {
fn drop(&mut self) {
self.shared.canceled.store(true, Ordering::Relaxed);
}
}
pub(crate) fn start_fuzzy_file_search_session(
session_id: String,
roots: Vec<String>,
outgoing: Arc<OutgoingMessageSender>,
) -> anyhow::Result<FuzzyFileSearchSession> {
#[expect(clippy::expect_used)]
let limit = NonZero::new(MATCH_LIMIT).expect("MATCH_LIMIT should be a valid non-zero usize");
let cores = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
let threads = cores.min(MAX_THREADS);
#[expect(clippy::expect_used)]
let threads = NonZero::new(threads.max(1)).expect("threads should be non-zero");
let search_dirs: Vec<PathBuf> = roots.iter().map(PathBuf::from).collect();
let canceled = Arc::new(AtomicBool::new(false));
let shared = Arc::new(SessionShared {
session_id,
latest_query: Mutex::new(String::new()),
outgoing,
runtime: tokio::runtime::Handle::current(),
canceled: canceled.clone(),
});
let reporter = Arc::new(SessionReporterImpl {
shared: shared.clone(),
});
let session = file_search::create_session(
search_dirs,
file_search::FileSearchOptions {
limit,
threads,
compute_indices: true,
..Default::default()
},
reporter,
Some(canceled),
)?;
Ok(FuzzyFileSearchSession { session, shared })
}
struct SessionShared {
session_id: String,
latest_query: Mutex<String>,
outgoing: Arc<OutgoingMessageSender>,
runtime: tokio::runtime::Handle,
canceled: Arc<AtomicBool>,
}
struct SessionReporterImpl {
shared: Arc<SessionShared>,
}
impl SessionReporterImpl {
fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
let query = {
#[expect(clippy::unwrap_used)]
self.shared.latest_query.lock().unwrap().clone()
};
if snapshot.query != query {
return;
}
let files = if query.is_empty() {
Vec::new()
} else {
collect_files(snapshot)
};
let notification = ServerNotification::FuzzyFileSearchSessionUpdated(
FuzzyFileSearchSessionUpdatedNotification {
session_id: self.shared.session_id.clone(),
query,
files,
},
);
let outgoing = self.shared.outgoing.clone();
self.shared.runtime.spawn(async move {
outgoing.send_server_notification(notification).await;
});
}
}
impl file_search::SessionReporter for SessionReporterImpl {
fn on_update(&self, snapshot: &file_search::FileSearchSnapshot) {
self.send_snapshot(snapshot);
}
fn on_complete(&self) {}
}
fn collect_files(snapshot: &file_search::FileSearchSnapshot) -> Vec<FuzzyFileSearchResult> {
let mut files = snapshot
.matches
.iter()
.map(|m| {
let file_name = m.path.file_name().unwrap_or_default();
FuzzyFileSearchResult {
root: m.root.to_string_lossy().to_string(),
path: m.path.to_string_lossy().to_string(),
file_name: file_name.to_string_lossy().to_string(),
score: m.score,
indices: m.indices.clone(),
}
})
.collect::<Vec<_>>();
files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
FuzzyFileSearchResult,
_,
_,
>(|f| f.score, |f| f.path.as_str()));
files
}

View File

@@ -678,78 +678,6 @@ impl McpProcess {
self.send_request("fuzzyFileSearch", Some(params)).await
}
pub async fn send_fuzzy_file_search_session_start_request(
&mut self,
session_id: &str,
roots: Vec<String>,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"roots": roots,
});
self.send_request("fuzzyFileSearch/sessionStart", Some(params))
.await
}
pub async fn start_fuzzy_file_search_session(
&mut self,
session_id: &str,
roots: Vec<String>,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_start_request(session_id, roots)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
pub async fn send_fuzzy_file_search_session_update_request(
&mut self,
session_id: &str,
query: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"query": query,
});
self.send_request("fuzzyFileSearch/sessionUpdate", Some(params))
.await
}
pub async fn update_fuzzy_file_search_session(
&mut self,
session_id: &str,
query: &str,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_update_request(session_id, query)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
pub async fn send_fuzzy_file_search_session_stop_request(
&mut self,
session_id: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
});
self.send_request("fuzzyFileSearch/sessionStop", Some(params))
.await
}
pub async fn stop_fuzzy_file_search_session(
&mut self,
session_id: &str,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_stop_request(session_id)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
async fn send_request(
&mut self,
method: &str,

View File

@@ -1,7 +1,6 @@
use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
@@ -10,130 +9,6 @@ use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const SHORT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
const STOP_GRACE_PERIOD: std::time::Duration = std::time::Duration::from_millis(250);
const SESSION_UPDATED_METHOD: &str = "fuzzyFileSearch/sessionUpdated";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FileExpectation {
Any,
Empty,
NonEmpty,
}
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
}
async fn wait_for_session_updated(
mcp: &mut McpProcess,
session_id: &str,
query: &str,
file_expectation: FileExpectation,
) -> Result<FuzzyFileSearchSessionUpdatedNotification> {
for _ in 0..20 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await??;
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload = serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
if payload.session_id != session_id || payload.query != query {
continue;
}
let files_match = match file_expectation {
FileExpectation::Any => true,
FileExpectation::Empty => payload.files.is_empty(),
FileExpectation::NonEmpty => !payload.files.is_empty(),
};
if files_match {
return Ok(payload);
}
}
anyhow::bail!(
"did not receive expected session update for sessionId={session_id}, query={query}"
);
}
async fn assert_update_request_fails_for_missing_session(
mcp: &mut McpProcess,
session_id: &str,
query: &str,
) -> Result<()> {
let request_id = mcp
.send_fuzzy_file_search_session_update_request(session_id, query)
.await?;
let err = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
format!("fuzzy file search session not found: {session_id}")
);
Ok(())
}
async fn assert_no_session_updates_for(
mcp: &mut McpProcess,
session_id: &str,
grace_period: std::time::Duration,
duration: std::time::Duration,
) -> Result<()> {
let grace_deadline = tokio::time::Instant::now() + grace_period;
loop {
let now = tokio::time::Instant::now();
if now >= grace_deadline {
break;
}
let remaining = grace_deadline - now;
match timeout(
remaining,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await
{
Err(_) => break,
Ok(Err(err)) => return Err(err),
Ok(Ok(_)) => {}
}
}
let deadline = tokio::time::Instant::now() + duration;
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
return Ok(());
}
let remaining = deadline - now;
match timeout(
remaining,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await
{
Err(_) => return Ok(()),
Ok(Err(err)) => return Err(err),
Ok(Ok(notification)) => {
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload =
serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
if payload.session_id == session_id {
anyhow::bail!("received unexpected session update after stop: {payload:?}");
}
}
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
@@ -250,215 +125,3 @@ async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_streams_updates() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-1";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
let payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].root, root_path);
assert_eq!(payload.files[0].path, "alpha.txt");
mcp.stop_fuzzy_file_search_session(session_id).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_before_start_errors() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = initialized_mcp(&codex_home).await?;
assert_update_request_fails_for_missing_session(&mut mcp, "missing", "alp").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_works_without_waiting_for_start_response()
-> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-no-wait";
let start_request_id = mcp
.send_fuzzy_file_search_session_start_request(session_id, vec![root_path.clone()])
.await?;
let update_request_id = mcp
.send_fuzzy_file_search_session_update_request(session_id, "alp")
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].root, root_path);
assert_eq!(payload.files[0].path, "alpha.txt");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_multiple_query_updates_work() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
std::fs::write(root.path().join("alphabet.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-multi-update";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
let alp_payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(
alp_payload.files.iter().all(|file| file.root == root_path),
true
);
mcp.update_fuzzy_file_search_session(session_id, "zzzz")
.await?;
let zzzz_payload =
wait_for_session_updated(&mut mcp, session_id, "zzzz", FileExpectation::Any).await?;
assert_eq!(zzzz_payload.query, "zzzz");
assert_eq!(zzzz_payload.files.is_empty(), true);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let session_id = "session-stop-fail";
let root_path = root.path().to_string_lossy().to_string();
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.stop_fuzzy_file_search_session(session_id).await?;
assert_update_request_fails_for_missing_session(&mut mcp, session_id, "alp").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
for i in 0..10_000 {
let file_path = root.path().join(format!("file-{i:04}.txt"));
std::fs::write(file_path, "contents")?;
}
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-stop-no-updates";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "file-")
.await?;
wait_for_session_updated(&mut mcp, session_id, "file-", FileExpectation::NonEmpty).await?;
mcp.stop_fuzzy_file_search_session(session_id).await?;
assert_no_session_updates_for(&mut mcp, session_id, STOP_GRACE_PERIOD, SHORT_READ_TIMEOUT)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_two_sessions_are_independent() -> Result<()> {
let codex_home = TempDir::new()?;
let root_a = TempDir::new()?;
let root_b = TempDir::new()?;
std::fs::write(root_a.path().join("alpha.txt"), "contents")?;
std::fs::write(root_b.path().join("beta.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_a_path = root_a.path().to_string_lossy().to_string();
let root_b_path = root_b.path().to_string_lossy().to_string();
let session_a = "session-a";
let session_b = "session-b";
mcp.start_fuzzy_file_search_session(session_a, vec![root_a_path.clone()])
.await?;
mcp.start_fuzzy_file_search_session(session_b, vec![root_b_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_a, "alp")
.await?;
let session_a_update =
wait_for_session_updated(&mut mcp, session_a, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(session_a_update.files.len(), 1);
assert_eq!(session_a_update.files[0].root, root_a_path);
assert_eq!(session_a_update.files[0].path, "alpha.txt");
mcp.update_fuzzy_file_search_session(session_b, "bet")
.await?;
let session_b_update =
wait_for_session_updated(&mut mcp, session_b, "bet", FileExpectation::NonEmpty).await?;
assert_eq!(session_b_update.files.len(), 1);
assert_eq!(session_b_update.files[0].root, root_b_path);
assert_eq!(session_b_update.files[0].path, "beta.txt");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_query_cleared_sends_blank_snapshot() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-clear-query";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
mcp.update_fuzzy_file_search_session(session_id, "").await?;
let payload =
wait_for_session_updated(&mut mcp, session_id, "", FileExpectation::Empty).await?;
assert_eq!(payload.files.is_empty(), true);
Ok(())
}

View File

@@ -9,6 +9,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
use app_test_support::write_models_cache_with_slug_for_originator;
use codex_app_server_protocol::ByteRange;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
@@ -59,6 +60,7 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const TEST_ORIGINATOR: &str = "codex_vscode";
const LOCAL_PRAGMATIC_TEMPLATE: &str = "You are a deeply pragmatic, effective software engineer.";
const APP_SERVER_CACHE_ORIGINATOR: &str = "codex_app_server_cache_e2e";
#[tokio::test]
async fn turn_start_sends_originator_header() -> Result<()> {
@@ -135,6 +137,89 @@ async fn turn_start_sends_originator_header() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_uses_originator_scoped_cache_slug() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Personality, true)]),
)?;
let cached_slug = "app-server-cache-slug-e2e";
write_models_cache_with_slug_for_originator(
codex_home.path(),
APP_SERVER_CACHE_ORIGINATOR,
cached_slug,
)?;
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[(
codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR,
Some(APP_SERVER_CACHE_ORIGINATOR),
)],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let requests = server
.received_requests()
.await
.expect("failed to fetch received requests");
let response_request = requests
.into_iter()
.find(|request| request.url.path().ends_with("/responses"))
.expect("expected /responses request");
let body: serde_json::Value = serde_json::from_slice(&response_request.body)
.expect("responses request body should be json");
assert_eq!(body["model"].as_str(), Some(cached_slug));
assert!(
codex_home
.path()
.join("models_cache")
.join(APP_SERVER_CACHE_ORIGINATOR)
.join("models_cache.json")
.exists()
);
Ok(())
}
#[tokio::test]
async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];

View File

@@ -49,7 +49,6 @@ codex-utils-absolute-path = { workspace = true }
codex-utils-home-dir = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-readiness = { workspace = true }
codex-utils-sanitizer = { workspace = true }
codex-utils-string = { workspace = true }
codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-sandbox-rs" }
dirs = { workspace = true }
@@ -68,6 +67,7 @@ notify = { workspace = true }
once_cell = { workspace = true }
os_info = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
regex-lite = { workspace = true }
reqwest = { workspace = true, features = ["json", "stream"] }
rmcp = { workspace = true, default-features = false, features = [

View File

@@ -1411,27 +1411,6 @@ impl Session {
state.clear_mcp_tool_selection();
}
// Merges connector IDs into the session-level explicit connector selection.
pub(crate) async fn merge_connector_selection(
&self,
connector_ids: HashSet<String>,
) -> HashSet<String> {
let mut state = self.state.lock().await;
state.merge_connector_selection(connector_ids)
}
// Returns the connector IDs currently selected for this session.
pub(crate) async fn get_connector_selection(&self) -> HashSet<String> {
let state = self.state.lock().await;
state.get_connector_selection()
}
// Clears connector IDs that were accumulated for explicit selection.
pub(crate) async fn clear_connector_selection(&self) {
let mut state = self.state.lock().await;
state.clear_connector_selection();
}
async fn record_initial_history(&self, conversation_history: InitialHistory) {
let turn_context = self.new_default_turn().await;
match conversation_history {
@@ -4077,7 +4056,7 @@ pub(crate) async fn run_turn(
.await,
);
let available_connectors = if turn_context.config.features.enabled(Feature::Apps) {
let connector_slug_counts = if turn_context.config.features.enabled(Feature::Apps) {
let mcp_tools = match sess
.services
.mcp_connection_manager
@@ -4090,16 +4069,11 @@ pub(crate) async fn run_turn(
Ok(mcp_tools) => mcp_tools,
Err(_) => return None,
};
connectors::accessible_connectors_from_mcp_tools(&mcp_tools)
let connectors = connectors::accessible_connectors_from_mcp_tools(&mcp_tools);
build_connector_slug_counts(&connectors)
} else {
Vec::new()
HashMap::new()
};
let connector_slug_counts = build_connector_slug_counts(&available_connectors);
let skill_name_counts_lower = skills_outcome
.as_ref()
.map_or_else(HashMap::new, |outcome| {
build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1
});
let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| {
collect_explicit_skill_mentions(
&input,
@@ -4108,6 +4082,7 @@ pub(crate) async fn run_turn(
&connector_slug_counts,
)
});
let explicitly_enabled_connectors = collect_explicit_app_ids(&input);
let config = turn_context.config.clone();
if config
.features
@@ -4144,15 +4119,6 @@ pub(crate) async fn run_turn(
.await;
}
let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
&skill_items,
&available_connectors,
&skill_name_counts_lower,
));
sess.merge_connector_selection(explicitly_enabled_connectors.clone())
.await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
@@ -4383,57 +4349,6 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
Ok(())
}
fn collect_explicit_app_ids_from_skill_items(
skill_items: &[ResponseItem],
connectors: &[connectors::AppInfo],
skill_name_counts_lower: &HashMap<String, usize>,
) -> HashSet<String> {
if skill_items.is_empty() || connectors.is_empty() {
return HashSet::new();
}
let skill_messages = skill_items
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => {
content.iter().find_map(|content_item| match content_item {
ContentItem::InputText { text } => Some(text.clone()),
_ => None,
})
}
_ => None,
})
.collect::<Vec<String>>();
if skill_messages.is_empty() {
return HashSet::new();
}
let mentions = collect_tool_mentions_from_messages(&skill_messages);
let mention_names_lower = mentions
.plain_names
.iter()
.map(|name| name.to_ascii_lowercase())
.collect::<HashSet<String>>();
let mut connector_ids = mentions
.paths
.iter()
.filter(|path| tool_kind_for_path(path) == ToolMentionKind::App)
.filter_map(|path| app_id_from_path(path).map(str::to_string))
.collect::<HashSet<String>>();
let connector_slug_counts = build_connector_slug_counts(connectors);
for connector in connectors {
let slug = connectors::connector_mention_slug(connector);
let connector_count = connector_slug_counts.get(&slug).copied().unwrap_or(0);
let skill_count = skill_name_counts_lower.get(&slug).copied().unwrap_or(0);
if connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&slug) {
connector_ids.insert(connector.id.clone());
}
}
connector_ids
}
fn filter_connectors_for_input(
connectors: Vec<connectors::AppInfo>,
input: &[ResponseItem],
@@ -4679,9 +4594,6 @@ async fn built_tools(
.or_cancel(cancellation_token)
.await?;
let mut effective_explicitly_enabled_connectors = explicitly_enabled_connectors.clone();
effective_explicitly_enabled_connectors.extend(sess.get_connector_selection().await);
let connectors_for_tools = if turn_context.config.features.enabled(Feature::Apps) {
let skill_name_counts_lower = skills_outcome.map_or_else(HashMap::new, |outcome| {
build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1
@@ -4690,7 +4602,7 @@ async fn built_tools(
Some(filter_connectors_for_input(
connectors,
input,
&effective_explicitly_enabled_connectors,
explicitly_enabled_connectors,
&skill_name_counts_lower,
))
} else {
@@ -5530,18 +5442,6 @@ mod tests {
}
}
fn skill_message(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn make_connector(id: &str, name: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
@@ -5674,49 +5574,6 @@ mod tests {
assert_eq!(selected, Vec::new());
}
#[test]
fn collect_explicit_app_ids_from_skill_items_includes_linked_mentions() {
let connectors = vec![make_connector("calendar", "Calendar")];
let skill_items = vec![skill_message(
"<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nuse [$calendar](app://calendar)\n</skill>",
)];
let connector_ids =
collect_explicit_app_ids_from_skill_items(&skill_items, &connectors, &HashMap::new());
assert_eq!(connector_ids, HashSet::from(["calendar".to_string()]));
}
#[test]
fn collect_explicit_app_ids_from_skill_items_resolves_unambiguous_plain_mentions() {
let connectors = vec![make_connector("calendar", "Calendar")];
let skill_items = vec![skill_message(
"<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nuse $calendar\n</skill>",
)];
let connector_ids =
collect_explicit_app_ids_from_skill_items(&skill_items, &connectors, &HashMap::new());
assert_eq!(connector_ids, HashSet::from(["calendar".to_string()]));
}
#[test]
fn collect_explicit_app_ids_from_skill_items_skips_plain_mentions_with_skill_conflicts() {
let connectors = vec![make_connector("calendar", "Calendar")];
let skill_items = vec![skill_message(
"<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nuse $calendar\n</skill>",
)];
let skill_name_counts_lower = HashMap::from([("calendar".to_string(), 1)]);
let connector_ids = collect_explicit_app_ids_from_skill_items(
&skill_items,
&connectors,
&skill_name_counts_lower,
);
assert_eq!(connector_ids, HashSet::<String>::new());
}
#[test]
fn search_tool_selection_keeps_codex_apps_tools_without_mentions() {
let selected_tool_names = vec![

View File

@@ -4,20 +4,16 @@
//! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation.
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.
mod dispatch;
mod phase1;
mod phase2;
pub(crate) mod prompts;
mod start;
mod stage_one;
mod startup;
mod storage;
#[cfg(test)]
mod tests;
/// Starts the memory startup pipeline for eligible root sessions.
/// This is the single entrypoint that `codex` uses to trigger memory startup.
///
/// This is the entry point to read and understand this module.
pub(crate) use start::start_memories_startup_task;
use std::path::Path;
use std::path::PathBuf;
mod artifacts {
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
@@ -26,12 +22,10 @@ mod artifacts {
/// Phase 1 (startup extraction).
mod phase_one {
/// Prompt used for phase 1.
pub(super) const PROMPT: &str = include_str!("../../templates/memories/stage_one_system.md");
/// Maximum number of rollout candidates processed per startup pass.
pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 8;
pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
pub(super) const CONCURRENCY_LIMIT: usize = 8;
pub(super) const CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP;
/// Fallback stage-1 rollout truncation limit (tokens) when model metadata
/// does not include a valid context window.
pub(super) const DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT: usize = 150_000;
@@ -52,8 +46,6 @@ mod phase_one {
pub(super) const JOB_LEASE_SECONDS: i64 = 3_600;
/// Backoff delay (seconds) before retrying a failed stage-1 extraction job.
pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600;
/// Maximum number of threads to scan.
pub(super) const THREAD_SCAN_LIMIT: usize = 5_000;
}
/// Phase 2 (aka `Consolidation`).
@@ -68,7 +60,7 @@ mod phase_two {
/// job.
pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600;
/// Heartbeat interval (seconds) for phase-2 running jobs.
pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 90;
pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30;
}
mod metrics {
@@ -82,9 +74,6 @@ mod metrics {
pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input";
}
use std::path::Path;
use std::path::PathBuf;
pub fn memory_root(codex_home: &Path) -> PathBuf {
codex_home.join("memories")
}
@@ -100,3 +89,8 @@ fn raw_memories_file(root: &Path) -> PathBuf {
async fn ensure_layout(root: &Path) -> std::io::Result<()> {
tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
}
/// Starts the memory startup pipeline for eligible root sessions.
///
/// This is the single entrypoint that `codex` uses to trigger memory startup.
pub(crate) use startup::start_memories_startup_task;

View File

@@ -1,466 +0,0 @@
use crate::Prompt;
use crate::RolloutRecorder;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::memories::prompts::build_stage_one_input_message;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::policy::should_persist_response_item_for_memories;
use codex_api::ResponseEvent;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::RolloutItem;
use codex_utils_sanitizer::redact_secrets;
use futures::StreamExt;
use serde::Deserialize;
use serde_json::Value;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
#[derive(Clone, Debug)]
pub(in crate::memories) struct Phase1RequestContext {
pub(in crate::memories) model_info: ModelInfo,
pub(in crate::memories) otel_manager: OtelManager,
pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
pub(in crate::memories) reasoning_summary: ReasoningSummaryConfig,
pub(in crate::memories) turn_metadata_header: Option<String>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
SucceededWithOutput,
SucceededNoOutput,
Failed,
}
struct PhaseOneOutcomeCounts {
claimed: usize,
succeeded_with_output: usize,
succeeded_no_output: usize,
failed: usize,
}
/// Phase 1 model output payload.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
struct StageOneOutput {
/// Detailed markdown raw memory for a single rollout.
#[serde(rename = "raw_memory")]
pub(crate) raw_memory: String,
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
pub(crate) rollout_summary: String,
/// Optional slug accepted from stage-1 output for forward compatibility.
/// This is currently ignored by downstream storage and naming, which remain thread-id based.
#[serde(default, rename = "rollout_slug")]
pub(crate) _rollout_slug: Option<String>,
}
/// Runs memory phase 1 in strict step order:
/// 1) claim eligible rollout jobs
/// 2) build one stage-1 request context
/// 3) run stage-1 extraction jobs in parallel
/// 4) emit metrics and logs
pub(in crate::memories) async fn run(session: &Arc<Session>) {
// 1. Claim startup job.
let Some(claimed_candidates) = claim_startup_jobs(session).await else {
return;
};
if claimed_candidates.is_empty() {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_no_candidates")],
);
return;
}
// 2. Build request.
let stage_one_context = build_request_context(session).await;
// 3. Run the parallel sampling.
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;
// 4. Metrics and logs.
let counts = count_outcomes(outcomes);
emit_metrics(session, &counts);
info!(
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
counts.claimed,
counts.succeeded_with_output + counts.succeeded_no_output,
counts.succeeded_with_output,
counts.succeeded_no_output,
counts.failed
);
}
/// JSON schema used to constrain phase-1 model output.
pub fn output_schema() -> Value {
json!({
"type": "object",
"properties": {
"rollout_summary": { "type": "string" },
"rollout_slug": { "type": "string" },
"raw_memory": { "type": "string" }
},
"required": ["rollout_summary", "rollout_slug", "raw_memory"],
"additionalProperties": false
})
}
impl Phase1RequestContext {
pub(in crate::memories) fn from_turn_context(
turn_context: &TurnContext,
turn_metadata_header: Option<String>,
) -> Self {
Self {
model_info: turn_context.model_info.clone(),
otel_manager: turn_context.otel_manager.clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: turn_context.reasoning_summary,
turn_metadata_header,
}
}
}
async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::Stage1JobClaim>> {
let Some(state_db) = session.services.state_db.as_deref() else {
// This should not happen.
warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
return None;
};
let allowed_sources = INTERACTIVE_SESSION_SOURCES
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
match state_db
.claim_stage1_jobs_for_startup(
session.conversation_id,
codex_state::Stage1StartupClaimParams {
scan_limit: phase_one::THREAD_SCAN_LIMIT,
max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP,
max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS,
min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: phase_one::JOB_LEASE_SECONDS,
},
)
.await
{
Ok(claims) => Some(claims),
Err(err) => {
warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "failed_claim")],
);
None
}
}
}
async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
let turn_context = session.new_default_turn().await;
Phase1RequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
)
}
async fn run_jobs(
session: &Arc<Session>,
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
stage_one_context: Phase1RequestContext,
) -> Vec<PhaseOneJobOutcome> {
futures::stream::iter(claimed_candidates.into_iter())
.map(|claim| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
async move { job::run(session.as_ref(), claim, &stage_one_context).await }
})
.buffer_unordered(phase_one::CONCURRENCY_LIMIT)
.collect::<Vec<_>>()
.await
}
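// Illustrative sketch, not part of the original change: `buffer_unordered`
// above is what bounds the fan-out. A self-contained analogue, assuming only
// the `futures` and `tokio` dependencies already used in this crate:
#[cfg(test)]
mod buffer_unordered_example {
use futures::StreamExt;
#[tokio::test]
async fn caps_in_flight_jobs_like_concurrency_limit() {
// Eight jobs, at most three polled concurrently (stand-in for
// `phase_one::CONCURRENCY_LIMIT`); results arrive in completion order.
let outcomes = futures::stream::iter(0..8)
.map(|job| async move { job * 2 })
.buffer_unordered(3)
.collect::<Vec<i32>>()
.await;
assert_eq!(outcomes.len(), 8);
}
}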
mod job {
use super::*;
pub(in crate::memories) async fn run(
session: &Session,
claim: codex_state::Stage1JobClaim,
stage_one_context: &Phase1RequestContext,
) -> PhaseOneJobOutcome {
let thread = claim.thread;
let stage_one_output = match sample(
session,
&thread.rollout_path,
&thread.cwd,
stage_one_context,
)
.await
{
Ok(output) => output,
Err(reason) => {
result::failed(
session,
thread.id,
&claim.ownership_token,
&reason.to_string(),
)
.await;
return PhaseOneJobOutcome::Failed;
}
};
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
return result::no_output(session, thread.id, &claim.ownership_token).await;
}
result::success(
session,
thread.id,
&claim.ownership_token,
thread.updated_at.timestamp(),
&stage_one_output.raw_memory,
&stage_one_output.rollout_summary,
)
.await
}
/// Extracts the rollout contents and performs the stage-1 sampling request.
async fn sample(
session: &Session,
rollout_path: &Path,
rollout_cwd: &Path,
stage_one_context: &Phase1RequestContext,
) -> anyhow::Result<StageOneOutput> {
let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(rollout_path).await?;
let rollout_contents = serialize_filtered_rollout_response_items(&rollout_items)?;
let prompt = Prompt {
input: vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: build_stage_one_input_message(
&stage_one_context.model_info,
rollout_path,
rollout_cwd,
&rollout_contents,
)?,
}],
end_turn: None,
phase: None,
}],
tools: Vec::new(),
parallel_tool_calls: false,
base_instructions: BaseInstructions {
text: phase_one::PROMPT.to_string(),
},
personality: None,
output_schema: Some(output_schema()),
};
let mut client_session = session.services.model_client.new_session();
let mut stream = client_session
.stream(
&prompt,
&stage_one_context.model_info,
&stage_one_context.otel_manager,
stage_one_context.reasoning_effort,
stage_one_context.reasoning_summary,
stage_one_context.turn_metadata_header.as_deref(),
)
.await?;
// TODO(jif) we should have a shared helper somewhere for this.
// Drain the stream, collecting output text until the response completes.
let mut result = String::new();
while let Some(message) = stream.next().await.transpose()? {
match message {
ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta),
ResponseEvent::OutputItemDone(item) => {
if result.is_empty()
&& let ResponseItem::Message { content, .. } = item
&& let Some(text) = crate::compact::content_items_to_text(&content)
{
result.push_str(&text);
}
}
ResponseEvent::Completed { .. } => break,
_ => {}
}
}
let mut output: StageOneOutput = serde_json::from_str(&result)?;
output.raw_memory = redact_secrets(output.raw_memory);
output.rollout_summary = redact_secrets(output.rollout_summary);
Ok(output)
}
mod result {
use super::*;
pub(in crate::memories) async fn failed(
session: &Session,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
reason: &str,
) {
tracing::warn!("Phase 1 job failed for thread {thread_id}: {reason}");
if let Some(state_db) = session.services.state_db.as_deref() {
let _ = state_db
.mark_stage1_job_failed(
thread_id,
ownership_token,
reason,
phase_one::JOB_RETRY_DELAY_SECONDS,
)
.await;
}
}
pub(in crate::memories) async fn no_output(
session: &Session,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
) -> PhaseOneJobOutcome {
let Some(state_db) = session.services.state_db.as_deref() else {
return PhaseOneJobOutcome::Failed;
};
if state_db
.mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededNoOutput
} else {
PhaseOneJobOutcome::Failed
}
}
pub(in crate::memories) async fn success(
session: &Session,
thread_id: codex_protocol::ThreadId,
ownership_token: &str,
source_updated_at: i64,
raw_memory: &str,
rollout_summary: &str,
) -> PhaseOneJobOutcome {
let Some(state_db) = session.services.state_db.as_deref() else {
return PhaseOneJobOutcome::Failed;
};
if state_db
.mark_stage1_job_succeeded(
thread_id,
ownership_token,
source_updated_at,
raw_memory,
rollout_summary,
)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededWithOutput
} else {
PhaseOneJobOutcome::Failed
}
}
}
/// Serializes filtered stage-1 memory items for prompt inclusion.
fn serialize_filtered_rollout_response_items(
items: &[RolloutItem],
) -> crate::error::Result<String> {
let filtered = items
.iter()
.filter_map(|item| {
if let RolloutItem::ResponseItem(item) = item
&& should_persist_response_item_for_memories(item)
{
Some(item.clone())
} else {
None
}
})
.collect::<Vec<_>>();
serde_json::to_string(&filtered).map_err(|err| {
CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
})
}
}
fn count_outcomes(outcomes: Vec<PhaseOneJobOutcome>) -> PhaseOneOutcomeCounts {
let succeeded_with_output = outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
.count();
let succeeded_no_output = outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
.count();
let failed = outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
.count();
PhaseOneOutcomeCounts {
claimed: outcomes.len(),
succeeded_with_output,
succeeded_no_output,
failed,
}
}
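// Illustrative sketch, not in the original diff: `count_outcomes` is a plain
// fold over the collected job results, so a small example pins its contract.
#[cfg(test)]
mod count_outcomes_example {
use super::*;
#[test]
fn buckets_every_outcome_and_counts_claims() {
let counts = count_outcomes(vec![
PhaseOneJobOutcome::SucceededWithOutput,
PhaseOneJobOutcome::SucceededNoOutput,
PhaseOneJobOutcome::Failed,
PhaseOneJobOutcome::Failed,
]);
// `claimed` is the total number of outcomes observed.
assert_eq!(counts.claimed, 4);
assert_eq!(counts.succeeded_with_output, 1);
assert_eq!(counts.succeeded_no_output, 1);
assert_eq!(counts.failed, 2);
}
}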
fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
if counts.claimed > 0 {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
counts.claimed as i64,
&[("status", "claimed")],
);
}
if counts.succeeded_with_output > 0 {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
counts.succeeded_with_output as i64,
&[("status", "succeeded")],
);
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_OUTPUT,
counts.succeeded_with_output as i64,
&[],
);
}
if counts.succeeded_no_output > 0 {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
counts.succeeded_no_output as i64,
&[("status", "succeeded_no_output")],
);
}
if counts.failed > 0 {
session.services.otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
counts.failed as i64,
&[("status", "failed")],
);
}
}

View File

@@ -30,6 +30,7 @@ struct MemoryToolDeveloperInstructionsTemplate<'a> {
}
/// Builds the consolidation subagent prompt for a specific memory root.
///
pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
let memory_root = memory_root.display().to_string();
let template = ConsolidationPromptTemplate {
@@ -73,9 +74,6 @@ pub(super) fn build_stage_one_input_message(
.render()?)
}
/// Builds the prompt used for the read path. This prompt must be appended to the developer
/// instructions. For large memory files, `memory_summary.md` is truncated at
/// [phase_one::MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT].
pub(crate) async fn build_memory_tool_developer_instructions(codex_home: &Path) -> Option<String> {
let base_path = memory_root(codex_home);
let memory_summary_path = base_path.join("memory_summary.md");

View File

@@ -0,0 +1,210 @@
use crate::error::CodexErr;
use crate::error::Result;
use once_cell::sync::Lazy;
use regex::Regex;
use serde::Deserialize;
use serde_json::Value;
use serde_json::json;
/// System prompt for stage-1 raw memory extraction.
pub(super) const RAW_MEMORY_PROMPT: &str =
include_str!("../../templates/memories/stage_one_system.md");
static OPENAI_KEY_REGEX: Lazy<Regex> = Lazy::new(|| compile_regex(r"sk-[A-Za-z0-9]{20,}"));
static AWS_ACCESS_KEY_ID_REGEX: Lazy<Regex> = Lazy::new(|| compile_regex(r"\bAKIA[0-9A-Z]{16}\b"));
static BEARER_TOKEN_REGEX: Lazy<Regex> =
Lazy::new(|| compile_regex(r"(?i)\bBearer\s+[A-Za-z0-9._\-]{16,}\b"));
static SECRET_ASSIGNMENT_REGEX: Lazy<Regex> = Lazy::new(|| {
compile_regex(r#"(?i)\b(api[_-]?key|token|secret|password)\b(\s*[:=]\s*)(["']?)[^\s"']{8,}"#)
});
/// Parsed stage-1 model output payload.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub(super) struct StageOneOutput {
/// Detailed markdown raw memory for a single rollout.
#[serde(rename = "raw_memory")]
pub(crate) raw_memory: String,
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
pub(crate) rollout_summary: String,
/// Optional slug accepted from stage-1 output for forward compatibility.
///
/// This is currently ignored by downstream storage and naming, which remain
/// thread-id based.
#[serde(default, rename = "rollout_slug")]
pub(crate) _rollout_slug: Option<String>,
}
/// JSON schema used to constrain stage-1 model output.
pub(super) fn stage_one_output_schema() -> Value {
json!({
"type": "object",
"properties": {
"rollout_summary": { "type": "string" },
"rollout_slug": { "type": "string" },
"raw_memory": { "type": "string" }
},
"required": ["rollout_summary", "rollout_slug", "raw_memory"],
"additionalProperties": false
})
}
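// Illustrative sketch, not in the original diff: `rollout_slug` appears in
// `required` even though the struct treats it as optional, presumably because
// strict structured-output schemas must list every declared property. A
// conforming reply deserializes straight into `StageOneOutput`:
#[cfg(test)]
mod schema_example {
use super::*;
#[test]
fn conforming_payload_deserializes_into_struct() {
let sample = r#"{"rollout_summary":"s","rollout_slug":"slug","raw_memory":"m"}"#;
let parsed: StageOneOutput = serde_json::from_str(sample).expect("conforming payload");
assert_eq!(parsed.rollout_summary, "s");
assert_eq!(parsed._rollout_slug, Some("slug".to_string()));
}
}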
/// Parses and normalizes stage-1 model output into a typed payload.
///
/// Accepts plain JSON objects, fenced JSON, and object snippets embedded in
/// extra text, then enforces redaction and size limits.
pub(super) fn parse_stage_one_output(raw: &str) -> Result<StageOneOutput> {
let parsed = parse_json_object_loose(raw)?;
let output: StageOneOutput = serde_json::from_value(parsed).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid stage-1 memory output JSON payload: {err}"))
})?;
normalize_stage_one_output(output)
}
fn parse_json_object_loose(raw: &str) -> Result<Value> {
let raw = raw.trim();
if let Ok(value) = serde_json::from_str::<Value>(raw)
&& value.is_object()
{
return Ok(value);
}
if let Some(fenced) = raw
.strip_prefix("```json")
.and_then(|s| s.strip_suffix("```"))
.map(str::trim)
&& let Ok(value) = serde_json::from_str::<Value>(fenced)
&& value.is_object()
{
return Ok(value);
}
if let Some(fenced) = raw
.strip_prefix("```")
.and_then(|s| s.strip_suffix("```"))
.map(str::trim)
&& let Ok(value) = serde_json::from_str::<Value>(fenced)
&& value.is_object()
{
return Ok(value);
}
if let (Some(start), Some(end)) = (raw.find('{'), raw.rfind('}'))
&& start < end
{
let snippet = &raw[start..=end];
if let Ok(value) = serde_json::from_str::<Value>(snippet)
&& value.is_object()
{
return Ok(value);
}
}
Err(CodexErr::InvalidRequest(
"unable to parse stage-1 memory JSON output".to_string(),
))
}
fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutput> {
output.raw_memory = output.raw_memory.trim().to_string();
output.rollout_summary = output.rollout_summary.trim().to_string();
output._rollout_slug = output
._rollout_slug
.map(|slug| slug.trim().to_string())
.filter(|slug| !slug.is_empty());
if output.raw_memory.is_empty() && output.rollout_summary.is_empty() {
// Empty pair is a deliberate "no meaningful signal" sentinel.
return Ok(output);
}
if output.raw_memory.is_empty() {
return Err(CodexErr::InvalidRequest(
"stage-1 memory output missing raw_memory".to_string(),
));
}
if output.rollout_summary.is_empty() {
return Err(CodexErr::InvalidRequest(
"stage-1 memory output missing rollout_summary".to_string(),
));
}
output.raw_memory = redact_secrets(&output.raw_memory);
output.rollout_summary = redact_secrets(&output.rollout_summary);
Ok(output)
}
fn redact_secrets(input: &str) -> String {
let redacted = OPENAI_KEY_REGEX.replace_all(input, "[REDACTED_SECRET]");
let redacted = AWS_ACCESS_KEY_ID_REGEX.replace_all(&redacted, "[REDACTED_SECRET]");
let redacted = BEARER_TOKEN_REGEX.replace_all(&redacted, "Bearer [REDACTED_SECRET]");
SECRET_ASSIGNMENT_REGEX
.replace_all(&redacted, "$1$2$3[REDACTED_SECRET]")
.to_string()
}
fn compile_regex(pattern: &str) -> Regex {
match Regex::new(pattern) {
Ok(regex) => regex,
// Panicking is acceptable here: the `load_regex` test compiles every pattern.
Err(err) => panic!("invalid regex pattern `{pattern}`: {err}"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn load_regex() {
// This test forces every regex to compile so an invalid pattern fails here rather than panicking at runtime.
let _ = redact_secrets("secret");
}
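// Illustrative example, not in the original diff: the loose parser also
// recovers an object embedded in surrounding prose via the `{`/`}` scan.
#[test]
fn parse_stage_one_output_accepts_object_embedded_in_text() {
let noisy = r#"Here is the result: {"raw_memory":"m","rollout_summary":"s"} done."#;
let parsed = parse_stage_one_output(noisy).expect("embedded payload");
assert_eq!(parsed.raw_memory, "m");
assert_eq!(parsed.rollout_summary, "s");
}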
#[test]
fn normalize_stage_one_output_redacts_summary() {
let output = StageOneOutput {
raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(),
rollout_summary: "password = mysecret123456\n\nsmall".to_string(),
_rollout_slug: None,
};
let normalized = normalize_stage_one_output(output).expect("normalized");
assert!(normalized.raw_memory.contains("[REDACTED_SECRET]"));
assert!(!normalized.rollout_summary.contains("mysecret123456"));
assert_eq!(
normalized.rollout_summary,
"password = [REDACTED_SECRET]\n\nsmall"
);
}
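// Illustrative example, not in the original diff: the `$1$2$3` replacement
// template keeps the matched key, separator, and opening quote, redacting
// only the value.
#[test]
fn redact_secrets_preserves_key_separator_and_quote() {
let redacted = redact_secrets("api_key: \"abcdefgh12345678\"");
assert_eq!(redacted, "api_key: \"[REDACTED_SECRET]\"");
}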
#[test]
fn normalize_stage_one_output_allows_empty_pair_for_skip() {
let output = StageOneOutput {
raw_memory: String::new(),
rollout_summary: String::new(),
_rollout_slug: None,
};
let normalized = normalize_stage_one_output(output).expect("normalized");
assert_eq!(normalized.raw_memory, "");
assert_eq!(normalized.rollout_summary, "");
}
#[test]
fn normalize_stage_one_output_rejects_partial_empty_values() {
let output = StageOneOutput {
raw_memory: String::new(),
rollout_summary: "summary".to_string(),
_rollout_slug: None,
};
let err = normalize_stage_one_output(output).expect_err("should reject");
assert_eq!(err.to_string(), "stage-1 memory output missing raw_memory");
}
}

View File

@@ -1,41 +0,0 @@
use crate::codex::Session;
use crate::config::Config;
use crate::features::Feature;
use crate::memories::phase1;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;
use tracing::warn;
/// Starts the asynchronous startup memory pipeline for an eligible root session.
///
/// The pipeline is skipped for ephemeral sessions, disabled feature flags, and
/// subagent sessions.
pub(crate) fn start_memories_startup_task(
session: &Arc<Session>,
config: Arc<Config>,
source: &SessionSource,
) {
if config.ephemeral
|| !config.features.enabled(Feature::MemoryTool)
|| matches!(source, SessionSource::SubAgent(_))
{
return;
}
if session.services.state_db.is_none() {
warn!("state db unavailable for memories startup pipeline; skipping");
return;
}
let weak_session = Arc::downgrade(session);
tokio::spawn(async move {
let Some(session) = weak_session.upgrade() else {
return;
};
// Run phase 1.
phase1::run(&session).await;
// Run phase 2.
crate::memories::dispatch::run_global_memory_consolidation(&session, config).await;
});
}

View File

@@ -4,8 +4,8 @@ use crate::config::Constrained;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::phase2::spawn_phase2_completion_task;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::startup::phase2::spawn_phase2_completion_task;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_protocol::protocol::AskForApproval;
@@ -19,8 +19,6 @@ use tracing::debug;
use tracing::info;
use tracing::warn;
//TODO(jif) clean.
fn completion_watermark(
claimed_watermark: i64,
latest_memories: &[codex_state::Stage1Output],
@@ -33,7 +31,7 @@ fn completion_watermark(
.max(claimed_watermark)
}
pub(in crate::memories) async fn run_global_memory_consolidation(
pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
) -> bool {
@@ -263,6 +261,7 @@ pub(in crate::memories) async fn run_global_memory_consolidation(
#[cfg(test)]
mod tests {
use super::completion_watermark;
use super::memory_root;
use super::run_global_memory_consolidation;
use crate::CodexAuth;
use crate::ThreadManager;
@@ -271,7 +270,6 @@ mod tests {
use crate::codex::make_session_and_context;
use crate::config::Config;
use crate::config::test_config;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use chrono::Utc;
@@ -284,7 +282,6 @@ mod tests {
use codex_state::Stage1Output;
use codex_state::ThreadMetadataBuilder;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
@@ -403,7 +400,6 @@ mod tests {
.expect("valid source_updated_at timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "rollout summary".to_string(),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: chrono::DateTime::<Utc>::from_timestamp(124, 0)
.expect("valid generated_at timestamp"),
};

View File

@@ -0,0 +1,199 @@
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::codex::Session;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::rollout::RolloutRecorder;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use futures::StreamExt;
use tracing::warn;
use crate::memories::prompts::build_stage_one_input_message;
use crate::memories::stage_one::RAW_MEMORY_PROMPT;
use crate::memories::stage_one::StageOneOutput;
use crate::memories::stage_one::parse_stage_one_output;
use crate::memories::stage_one::stage_one_output_schema;
use crate::memories::startup::StageOneRequestContext;
use crate::rollout::policy::should_persist_response_item_for_memories;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;
pub(super) async fn extract_stage_one_output(
session: &Session,
rollout_path: &Path,
rollout_cwd: &Path,
stage_one_context: &StageOneRequestContext,
) -> Result<StageOneOutput, &'static str> {
let (rollout_items, _thread_id, parse_errors) =
match RolloutRecorder::load_rollout_items(rollout_path).await {
Ok(result) => result,
Err(err) => {
warn!(
"failed to load rollout {} for memories: {err}",
rollout_path.display()
);
return Err("failed to load rollout");
}
};
if parse_errors > 0 {
warn!(
"rollout {} had {parse_errors} parse errors while preparing stage-1 memory input",
rollout_path.display()
);
}
let rollout_contents = match serialize_filtered_rollout_response_items(&rollout_items) {
Ok(contents) => contents,
Err(err) => {
warn!(
"failed to prepare filtered rollout payload {} for memories: {err}",
rollout_path.display()
);
return Err("failed to serialize filtered rollout");
}
};
let prompt = Prompt {
input: vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: build_stage_one_input_message(
&stage_one_context.model_info,
rollout_path,
rollout_cwd,
&rollout_contents,
)
.map_err(|_| "failed to build the stage-1 prompt")?,
}],
end_turn: None,
phase: None,
}],
tools: Vec::new(),
parallel_tool_calls: false,
base_instructions: BaseInstructions {
text: RAW_MEMORY_PROMPT.to_string(),
},
personality: None,
output_schema: Some(stage_one_output_schema()),
};
let mut client_session = session.services.model_client.new_session();
let mut stream = match client_session
.stream(
&prompt,
&stage_one_context.model_info,
&stage_one_context.otel_manager,
stage_one_context.reasoning_effort,
stage_one_context.reasoning_summary,
stage_one_context.turn_metadata_header.as_deref(),
)
.await
{
Ok(stream) => stream,
Err(err) => {
warn!(
"stage-1 memory request failed for rollout {}: {err}",
rollout_path.display()
);
return Err("stage-1 memory request failed");
}
};
let output_text = match collect_response_text_until_completed(&mut stream).await {
Ok(text) => text,
Err(err) => {
warn!(
"failed while waiting for stage-1 memory response for rollout {}: {err}",
rollout_path.display()
);
return Err("stage-1 memory response stream failed");
}
};
match parse_stage_one_output(&output_text) {
Ok(output) => Ok(output),
Err(err) => {
warn!(
"invalid stage-1 memory payload for rollout {}: {err}",
rollout_path.display()
);
Err("invalid stage-1 memory payload")
}
}
}
async fn collect_response_text_until_completed(stream: &mut ResponseStream) -> CodexResult<String> {
let mut output_text = String::new();
loop {
let Some(event) = stream.next().await else {
return Err(CodexErr::Stream(
"stream closed before response.completed".to_string(),
None,
));
};
match event? {
ResponseEvent::OutputTextDelta(delta) => output_text.push_str(&delta),
ResponseEvent::OutputItemDone(item) => {
if output_text.is_empty()
&& let ResponseItem::Message { content, .. } = item
&& let Some(text) = crate::compact::content_items_to_text(&content)
{
output_text.push_str(&text);
}
}
ResponseEvent::Completed { .. } => return Ok(output_text),
_ => {}
}
}
}
/// Serializes filtered stage-1 memory items for prompt inclusion.
fn serialize_filtered_rollout_response_items(
items: &[RolloutItem],
) -> crate::error::Result<String> {
let filtered = items
.iter()
.filter_map(|item| {
if let RolloutItem::ResponseItem(item) = item
&& should_persist_response_item_for_memories(item)
{
Some(item.clone())
} else {
None
}
})
.collect::<Vec<_>>();
serde_json::to_string(&filtered).map_err(|err| {
CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serialize_filtered_rollout_response_items_keeps_response_items_only() {
let input = vec![RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "user input".to_string(),
}],
end_turn: None,
phase: None,
})];
let serialized = serialize_filtered_rollout_response_items(&input).expect("serialize");
let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");
pretty_assertions::assert_eq!(parsed.len(), 1);
assert!(matches!(parsed[0], ResponseItem::Message { .. }));
}
}

View File

@@ -0,0 +1,306 @@
mod dispatch;
mod extract;
mod phase2;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use futures::StreamExt;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
SucceededWithOutput,
SucceededNoOutput,
Failed,
}
pub(super) const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000;
#[derive(Clone)]
struct StageOneRequestContext {
model_info: ModelInfo,
otel_manager: OtelManager,
reasoning_effort: Option<ReasoningEffortConfig>,
reasoning_summary: ReasoningSummaryConfig,
turn_metadata_header: Option<String>,
}
impl StageOneRequestContext {
fn from_turn_context(turn_context: &TurnContext, turn_metadata_header: Option<String>) -> Self {
Self {
model_info: turn_context.model_info.clone(),
otel_manager: turn_context.otel_manager.clone(),
reasoning_effort: turn_context.reasoning_effort,
reasoning_summary: turn_context.reasoning_summary,
turn_metadata_header,
}
}
}
/// Starts the asynchronous startup memory pipeline for an eligible root session.
///
/// The pipeline is skipped for ephemeral sessions, disabled feature flags, and
/// subagent sessions.
pub(crate) fn start_memories_startup_task(
session: &Arc<Session>,
config: Arc<Config>,
source: &SessionSource,
) {
if config.ephemeral
|| !config.features.enabled(Feature::MemoryTool)
|| matches!(source, SessionSource::SubAgent(_))
{
return;
}
let weak_session = Arc::downgrade(session);
tokio::spawn(async move {
let Some(session) = weak_session.upgrade() else {
return;
};
if let Err(err) = run_memories_startup_pipeline(&session, config).await {
warn!("memories startup pipeline failed: {err}");
}
});
}
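// Illustrative sketch, not part of the change: the `Weak` handle above lets
// the spawned task exit quietly once the session is dropped instead of
// keeping it alive. A self-contained analogue of that guard:
#[cfg(test)]
mod weak_spawn_example {
use std::sync::Arc;
#[tokio::test]
async fn upgrade_fails_after_owner_is_dropped() {
let owner = Arc::new(());
let weak = Arc::downgrade(&owner);
drop(owner);
// Mirrors the `weak_session.upgrade()` guard at the top of the task.
assert!(weak.upgrade().is_none());
}
}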
/// Runs the startup memory pipeline.
///
/// Phase 1 selects rollout candidates, performs stage-1 extraction requests in
/// parallel, persists stage-1 outputs, and enqueues consolidation work.
///
/// Phase 2 claims a global consolidation lock and spawns one consolidation agent.
pub(super) async fn run_memories_startup_pipeline(
session: &Arc<Session>,
config: Arc<Config>,
) -> CodexResult<()> {
let otel_manager = &session.services.otel_manager;
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable for memories startup pipeline; skipping");
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
return Ok(());
};
let allowed_sources = INTERACTIVE_SESSION_SOURCES
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>();
let claimed_candidates = match state_db
.claim_stage1_jobs_for_startup(
session.conversation_id,
codex_state::Stage1StartupClaimParams {
scan_limit: PHASE_ONE_THREAD_SCAN_LIMIT,
max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP,
max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS,
min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: phase_one::JOB_LEASE_SECONDS,
},
)
.await
{
Ok(claims) => claims,
Err(err) => {
warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "failed_claim")],
);
Vec::new()
}
};
let claimed_count = claimed_candidates.len();
if claimed_count == 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_no_candidates")],
);
}
let mut phase_one_outcomes = Vec::new();
if claimed_count > 0 {
let turn_context = session.new_default_turn().await;
let stage_one_context = StageOneRequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
);
phase_one_outcomes = futures::stream::iter(claimed_candidates.into_iter())
.map(|claim| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
async move {
let thread = claim.thread;
let stage_one_output = match extract::extract_stage_one_output(
session.as_ref(),
&thread.rollout_path,
&thread.cwd,
&stage_one_context,
)
.await
{
Ok(output) => output,
Err(reason) => {
if let Some(state_db) = session.services.state_db.as_deref() {
let _ = state_db
.mark_stage1_job_failed(
thread.id,
&claim.ownership_token,
reason,
phase_one::JOB_RETRY_DELAY_SECONDS,
)
.await;
}
return PhaseOneJobOutcome::Failed;
}
};
let Some(state_db) = session.services.state_db.as_deref() else {
return PhaseOneJobOutcome::Failed;
};
if stage_one_output.raw_memory.is_empty()
&& stage_one_output.rollout_summary.is_empty()
{
return if state_db
.mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededNoOutput
} else {
PhaseOneJobOutcome::Failed
};
}
if state_db
.mark_stage1_job_succeeded(
thread.id,
&claim.ownership_token,
thread.updated_at.timestamp(),
&stage_one_output.raw_memory,
&stage_one_output.rollout_summary,
)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededWithOutput
} else {
PhaseOneJobOutcome::Failed
}
}
})
.buffer_unordered(phase_one::CONCURRENCY_LIMIT)
.collect::<Vec<PhaseOneJobOutcome>>()
.await;
}
let succeeded_with_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
.count();
let succeeded_no_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
.count();
let failed_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
.count();
let succeeded_count = succeeded_with_output_count + succeeded_no_output_count;
if claimed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
claimed_count as i64,
&[("status", "claimed")],
);
}
if succeeded_with_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_with_output_count as i64,
&[("status", "succeeded")],
);
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_OUTPUT,
succeeded_with_output_count as i64,
&[],
);
}
if succeeded_no_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_no_output_count as i64,
&[("status", "succeeded_no_output")],
);
}
if failed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
failed_count as i64,
&[("status", "failed")],
);
}
info!(
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
claimed_count,
succeeded_count,
succeeded_with_output_count,
succeeded_no_output_count,
failed_count
);
let consolidation_job_count =
usize::from(dispatch::run_global_memory_consolidation(session, config).await);
info!(
"memory consolidation dispatch complete: {} job(s) scheduled",
consolidation_job_count
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::run_memories_startup_pipeline;
use crate::codex::make_session_and_context;
use crate::config::test_config;
use std::sync::Arc;
#[tokio::test]
async fn startup_pipeline_is_noop_when_state_db_is_unavailable() {
let (session, _turn_context) = make_session_and_context().await;
let session = Arc::new(session);
let config = Arc::new(test_config());
run_memories_startup_pipeline(&session, config)
.await
.expect("startup pipeline should skip cleanly without state db");
}
}

View File

@@ -11,7 +11,7 @@ use tracing::debug;
use tracing::info;
use tracing::warn;
pub(in crate::memories) fn spawn_phase2_completion_task(
pub(super) fn spawn_phase2_completion_task(
session: &Session,
ownership_token: String,
completion_watermark: i64,

View File

@@ -9,8 +9,6 @@ use crate::memories::phase_two;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
//TODO(jif) clean.
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
root: &Path,
@@ -84,8 +82,6 @@ async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> st
memory.source_updated_at.to_rfc3339()
)
.map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
writeln!(body, "cwd: {}", memory.cwd.display())
.map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
writeln!(body)
.map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
body.push_str(memory.raw_memory.trim());
@@ -140,8 +136,6 @@ async fn write_rollout_summary_for_thread(
memory.source_updated_at.to_rfc3339()
)
.map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
writeln!(body, "cwd: {}", memory.cwd.display())
.map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
writeln!(body)
.map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
body.push_str(&memory.rollout_summary);

View File

@@ -1,3 +1,4 @@
use super::stage_one::parse_stage_one_output;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::memories::ensure_layout;
@@ -10,7 +11,6 @@ use codex_protocol::ThreadId;
use codex_state::Stage1Output;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::PathBuf;
use tempfile::tempdir;
#[test]
@@ -20,9 +20,40 @@ fn memory_root_uses_shared_global_path() {
assert_eq!(memory_root(&codex_home), codex_home.join("memories"));
}
#[test]
fn parse_stage_one_output_accepts_fenced_json() {
let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\"}\n```";
let parsed = parse_stage_one_output(raw).expect("parsed");
assert!(parsed.raw_memory.contains("abc"));
assert_eq!(parsed.rollout_summary, "short");
}
#[test]
fn parse_stage_one_output_rejects_legacy_keys() {
let raw = r#"{"rawMemory":"abc","summary":"short"}"#;
assert!(parse_stage_one_output(raw).is_err());
}
#[test]
fn parse_stage_one_output_accepts_empty_pair_for_skip() {
let raw = r#"{"raw_memory":"","rollout_summary":""}"#;
let parsed = parse_stage_one_output(raw).expect("parsed");
assert_eq!(parsed.raw_memory, "");
assert_eq!(parsed.rollout_summary, "");
}
#[test]
fn parse_stage_one_output_accepts_optional_rollout_slug() {
let raw = r#"{"raw_memory":"abc","rollout_summary":"short","rollout_slug":"my-slug"}"#;
let parsed = parse_stage_one_output(raw).expect("parsed");
assert!(parsed.raw_memory.contains("abc"));
assert_eq!(parsed.rollout_summary, "short");
assert_eq!(parsed._rollout_slug, Some("my-slug".to_string()));
}
#[test]
fn stage_one_output_schema_requires_all_declared_properties() {
let schema = crate::memories::phase1::output_schema();
let schema = super::stage_one::stage_one_output_schema();
let properties = schema
.get("properties")
.and_then(Value::as_object)
@@ -66,7 +97,6 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
rollout_summary: "short summary".to_string(),
cwd: PathBuf::from("/tmp/workspace"),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
@@ -85,5 +115,4 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
.expect("read raw memories");
assert!(raw_memories.contains("raw memory"));
assert!(raw_memories.contains(&keep_id));
assert!(raw_memories.contains("cwd: /tmp/workspace"));
}

View File

@@ -137,16 +137,7 @@ impl ModelsManager {
// todo(aibrahim): see whether this can be tightened to pub(crate)
/// Look up model metadata, applying remote overrides and config adjustments.
pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
let remote = self
.get_remote_models(config)
.await
.into_iter()
.find(|m| m.slug == model);
let model = if let Some(remote) = remote {
remote
} else {
model_info::model_info_from_slug(model)
};
let model = model_info::model_info_from_slug(model);
model_info::with_config_overrides(model, config)
}

View File

@@ -37,8 +37,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool {
#[inline]
pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
match item {
ResponseItem::Message { role, .. } => role != "developer",
ResponseItem::LocalShellCall { .. }
ResponseItem::Message { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }

View File

@@ -30,7 +30,6 @@ pub(crate) struct SessionState {
/// Startup regular task pre-created during session initialization.
pub(crate) startup_regular_task: Option<RegularTask>,
pub(crate) active_mcp_tool_selection: Option<Vec<String>>,
pub(crate) active_connector_selection: HashSet<String>,
}
impl SessionState {
@@ -48,7 +47,6 @@ impl SessionState {
previous_model: None,
startup_regular_task: None,
active_mcp_tool_selection: None,
active_connector_selection: HashSet::new(),
}
}
@@ -177,25 +175,6 @@ impl SessionState {
pub(crate) fn clear_mcp_tool_selection(&mut self) {
self.active_mcp_tool_selection = None;
}
// Adds connector IDs to the active set and returns the merged selection.
pub(crate) fn merge_connector_selection<I>(&mut self, connector_ids: I) -> HashSet<String>
where
I: IntoIterator<Item = String>,
{
self.active_connector_selection.extend(connector_ids);
self.active_connector_selection.clone()
}
// Returns the current connector selection tracked on session state.
pub(crate) fn get_connector_selection(&self) -> HashSet<String> {
self.active_connector_selection.clone()
}
// Removes all currently tracked connector selections.
pub(crate) fn clear_connector_selection(&mut self) {
self.active_connector_selection.clear();
}
}
// Sometimes new snapshots don't include credits or plan information.
@@ -293,35 +272,6 @@ mod tests {
assert_eq!(state.get_mcp_tool_selection(), None);
}
#[tokio::test]
// Verifies connector merging deduplicates repeated IDs.
async fn merge_connector_selection_deduplicates_entries() {
let session_configuration = make_session_configuration_for_tests().await;
let mut state = SessionState::new(session_configuration);
let merged = state.merge_connector_selection([
"calendar".to_string(),
"calendar".to_string(),
"drive".to_string(),
]);
assert_eq!(
merged,
HashSet::from(["calendar".to_string(), "drive".to_string()])
);
}
#[tokio::test]
// Verifies clearing connector selection removes all saved IDs.
async fn clear_connector_selection_removes_entries() {
let session_configuration = make_session_configuration_for_tests().await;
let mut state = SessionState::new(session_configuration);
state.merge_connector_selection(["calendar".to_string()]);
state.clear_connector_selection();
assert_eq!(state.get_connector_selection(), HashSet::new());
}
#[tokio::test]
async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() {
let session_configuration = make_session_configuration_for_tests().await;

View File

@@ -121,7 +121,6 @@ impl Session {
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.clear_mcp_tool_selection().await;
self.clear_connector_selection().await;
self.seed_initial_context_if_needed(turn_context.as_ref())
.await;

View File

@@ -97,8 +97,6 @@ Primary inputs (always read these, if exists):
Under `{{ memory_root }}/`:
- `raw_memories.md`
- mechanical merge of `raw_memories` from Phase 1;
- source of rollout-level metadata needed for MEMORY.md header annotations;
you should be able to find `cwd` and `updated_at` there.
- `MEMORY.md`
- merged memories; produce a lightly clustered version if applicable
- `rollout_summaries/*.md`
@@ -131,8 +129,8 @@ Rules:
Clustered schema:
---
rollout_summary_files:
- <file1.md> (<annotation that includes status/usefulness, cwd, and updated_at, e.g. "success, most useful architecture walkthrough, cwd=/repo/path, updated_at=2026-02-12T10:30:00Z">)
- <file2.md> (<annotation with cwd=/..., updated_at=...>)
- <file1.md> (<a few words annotation such as "success, most useful" or "uncertain, no user feedback">)
- <file2.md> (<annotation>)
description: brief description of the shared tasks/outcomes
keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo concepts, contracts)>
---
@@ -143,9 +141,6 @@ keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo con
Schema rules (strict):
- Keep entries compact and retrieval-friendly.
- A single note block may correspond to multiple related tasks; aggregate when tasks and lessons align.
- In `rollout_summary_files`, each parenthesized annotation must include
`cwd=<path>` and `updated_at=<timestamp>` copied from that rollout summary metadata.
If missing from an individual rollout summary, recover them from `raw_memories.md`.
- If you need to reference skills, do it in the BODY as bullets, not in the header
(e.g., "- Related skill: skills/<skill-name>/SKILL.md").
- Use lowercase, hyphenated skill folder names.
@@ -343,4 +338,4 @@ Use `rg` for fast retrieval while consolidating:
- Search across memory tree:
`rg -n -i "<pattern>" "{{ memory_root }}" | head -n 50`
- Locate rollout summary files:
`rg --files "{{ memory_root }}/rollout_summaries" | head -n 200`
`rg --files "{{ memory_root }}/rollout_summaries" | head -n 200`

View File

@@ -2,8 +2,19 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]
use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
use codex_core::models_manager::client_version_to_whole;
use codex_core::test_support::all_model_presets;
use codex_protocol::openai_models::ConfigShellToolType;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ModelVisibility;
use codex_protocol::openai_models::TruncationPolicyConfig;
use codex_protocol::openai_models::default_input_modalities;
use core_test_support::responses;
use core_test_support::responses::ResponseMock;
use core_test_support::test_codex_exec::test_codex_exec;
use pretty_assertions::assert_eq;
use std::path::Path;
use wiremock::matchers::header;
/// Verify that when the server reports an error, `codex-exec` exits with a
@@ -52,3 +63,103 @@ async fn supports_originator_override() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn uses_codex_exec_scoped_cache_and_sends_cached_slug() -> anyhow::Result<()> {
let test = test_codex_exec();
let cached_slug = "exec-cache-slug-e2e";
write_models_cache_for_originator(test.home_path(), "codex_exec", cached_slug)?;
let server = responses::start_mock_server().await;
let body = responses::sse(vec![
responses::ev_response_created("response_1"),
responses::ev_assistant_message("response_1", "Hello, world!"),
responses::ev_completed("response_1"),
]);
let response_mock = responses::mount_sse_once(&server, body).await;
test.cmd_with_server(&server)
.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR)
.arg("--skip-git-repo-check")
.arg("tell me something")
.assert()
.code(0);
assert_response_model_slug(&response_mock, cached_slug);
assert!(
test.home_path()
.join("models_cache")
.join("codex_exec")
.join("models_cache.json")
.exists()
);
Ok(())
}
fn assert_response_model_slug(response_mock: &ResponseMock, expected_slug: &str) {
let request = response_mock.single_request();
let request_body = request.body_json();
assert_eq!(request_body["model"].as_str(), Some(expected_slug));
}
fn write_models_cache_for_originator(
codex_home: &Path,
originator: &str,
slug: &str,
) -> std::io::Result<()> {
let Some(first_preset) = all_model_presets()
.into_iter()
.find(|preset| preset.show_in_picker)
else {
return Err(std::io::Error::other("no visible model presets"));
};
let mut model = preset_to_info(&first_preset, 0);
model.slug = slug.to_string();
let cache_path = codex_home
.join("models_cache")
.join(originator)
.join("models_cache.json");
if let Some(parent) = cache_path.parent() {
std::fs::create_dir_all(parent)?;
}
let cache = serde_json::json!({
"fetched_at": chrono::Utc::now(),
"etag": null,
"client_version": client_version_to_whole(),
"models": [model]
});
std::fs::write(cache_path, serde_json::to_string_pretty(&cache)?)
}
fn preset_to_info(preset: &ModelPreset, priority: i32) -> ModelInfo {
ModelInfo {
slug: preset.id.clone(),
display_name: preset.display_name.clone(),
description: Some(preset.description.clone()),
default_reasoning_level: Some(preset.default_reasoning_effort),
supported_reasoning_levels: preset.supported_reasoning_efforts.clone(),
shell_type: ConfigShellToolType::ShellCommand,
visibility: if preset.show_in_picker {
ModelVisibility::List
} else {
ModelVisibility::Hide
},
supported_in_api: true,
priority,
upgrade: preset.upgrade.as_ref().map(|upgrade| upgrade.into()),
base_instructions: "base instructions".to_string(),
model_messages: None,
supports_reasoning_summaries: false,
support_verbosity: false,
default_verbosity: None,
apply_patch_tool_type: None,
truncation_policy: TruncationPolicyConfig::bytes(10_000),
supports_parallel_tool_calls: false,
context_window: Some(272_000),
auto_compact_token_limit: None,
effective_context_window_percent: 95,
experimental_supported_tools: Vec::new(),
input_modalities: default_input_modalities(),
prefer_websockets: false,
}
}

View File

@@ -139,6 +139,19 @@ impl Drop for FileSearchSession {
}
pub fn create_session(
search_directory: &Path,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
) -> anyhow::Result<FileSearchSession> {
create_session_inner(
vec![search_directory.to_path_buf()],
options,
reporter,
None,
)
}
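// Illustrative usage sketch for the new wrapper, not from the diff; the
// reporter is hypothetical since `SessionReporter`'s methods are not shown
// here (the tests below pass a test-local `RecordingReporter`):
//
// let reporter: Arc<dyn SessionReporter> = Arc::new(MyReporter::default());
// let session = create_session(Path::new("."), FileSearchOptions::default(), reporter)?;
// session.update_query("readme");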
fn create_session_inner(
search_directories: Vec<PathBuf>,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
@@ -278,7 +291,7 @@ pub fn run(
cancel_flag: Option<Arc<AtomicBool>>,
) -> anyhow::Result<FileSearchResults> {
let reporter = Arc::new(RunReporter::default());
let session = create_session(roots, options, reporter.clone(), cancel_flag)?;
let session = create_session_inner(roots, options, reporter.clone(), cancel_flag)?;
session.update_query(pattern_text);
@@ -758,13 +771,8 @@ mod tests {
fn session_scanned_file_count_is_monotonic_across_queries() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
session.update_query("file-00");
thread::sleep(Duration::from_millis(20));
@@ -783,13 +791,8 @@ mod tests {
fn session_streams_updates_before_walk_complete() {
let dir = create_temp_tree(600);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
session.update_query("file-0");
let completed = reporter.wait_for_complete(Duration::from_secs(5));
@@ -805,13 +808,8 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
session.update_query("alpha");
assert!(reporter.wait_for_complete(Duration::from_secs(5)));
@@ -836,7 +834,7 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(
let session = create_session_inner(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
@@ -865,7 +863,7 @@ mod tests {
let cancel_flag = Arc::new(AtomicBool::new(false));
let reporter_a = Arc::new(RecordingReporter::default());
let session_a = create_session(
let session_a = create_session_inner(
vec![root_a.path().to_path_buf()],
FileSearchOptions::default(),
reporter_a,
@@ -874,7 +872,7 @@ mod tests {
.expect("session_a");
let reporter_b = Arc::new(RecordingReporter::default());
let session_b = create_session(
let session_b = create_session_inner(
vec![root_b.path().to_path_buf()],
FileSearchOptions::default(),
reporter_b.clone(),
@@ -896,13 +894,8 @@ mod tests {
fn session_emits_updates_when_query_changes() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
session.update_query("zzzzzzzz");
let completed = reporter.wait_for_complete(Duration::from_secs(5));

View File

@@ -4,7 +4,6 @@ use chrono::Utc;
use codex_protocol::ThreadId;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
use std::path::PathBuf;
use super::ThreadMetadata;
@@ -15,7 +14,6 @@ pub struct Stage1Output {
pub source_updated_at: DateTime<Utc>,
pub raw_memory: String,
pub rollout_summary: String,
pub cwd: PathBuf,
pub generated_at: DateTime<Utc>,
}
@@ -25,7 +23,6 @@ pub(crate) struct Stage1OutputRow {
source_updated_at: i64,
raw_memory: String,
rollout_summary: String,
cwd: String,
generated_at: i64,
}
@@ -36,7 +33,6 @@ impl Stage1OutputRow {
source_updated_at: row.try_get("source_updated_at")?,
raw_memory: row.try_get("raw_memory")?,
rollout_summary: row.try_get("rollout_summary")?,
cwd: row.try_get("cwd")?,
generated_at: row.try_get("generated_at")?,
})
}
@@ -51,7 +47,6 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
raw_memory: row.raw_memory,
rollout_summary: row.rollout_summary,
cwd: PathBuf::from(row.cwd),
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})
}

View File

@@ -2093,10 +2093,8 @@ WHERE kind = 'memory_stage1'
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].rollout_summary, "summary b");
assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
assert_eq!(outputs[1].thread_id, thread_id_a);
assert_eq!(outputs[1].rollout_summary, "summary a");
assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
@@ -2165,7 +2163,6 @@ VALUES (?, ?, ?, ?, ?)
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].thread_id, thread_id_non_empty);
assert_eq!(outputs[0].rollout_summary, "summary");
assert_eq!(outputs[0].cwd, codex_home.join("workspace-non-empty"));
let _ = tokio::fs::remove_dir_all(codex_home).await;
}

View File

@@ -178,7 +178,6 @@ LEFT JOIN jobs
///
/// Query behavior:
/// - filters out rows where both `raw_memory` and `rollout_summary` are blank
/// - joins `threads` to include thread `cwd`
/// - orders by `source_updated_at DESC, thread_id DESC`
/// - applies `LIMIT n`
pub async fn list_stage1_outputs_for_global(
@@ -192,10 +191,7 @@ LEFT JOIN jobs
let rows = sqlx::query(
r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at
, COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
ON t.id = so.thread_id
WHERE length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0
ORDER BY so.source_updated_at DESC, so.thread_id DESC
LIMIT ?

View File

@@ -81,13 +81,12 @@ impl FileSearchManager {
session_token,
});
let session = file_search::create_session(
vec![self.search_dir.clone()],
&self.search_dir,
file_search::FileSearchOptions {
compute_indices: true,
..Default::default()
},
reporter,
None,
);
match session {
Ok(session) => st.session = Some(session),

View File

@@ -1,6 +0,0 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "sanitizer",
crate_name = "codex_utils_sanitizer",
)