Mirror of https://github.com/openai/codex.git
Synced 2026-02-13 12:23:51 +00:00

Compare commits: remove/pre...remove_pre (1 commit)
| Author | SHA1 | Date |
|---|---|---|
|  | 66647e7eb8 |  |
codex-rs/Cargo.lock (generated)
@@ -1692,7 +1692,6 @@ dependencies = [
 "codex-utils-home-dir",
 "codex-utils-pty",
 "codex-utils-readiness",
 "codex-utils-sanitizer",
 "codex-utils-string",
 "codex-windows-sandbox",
 "core-foundation 0.9.4",
@@ -1721,6 +1720,7 @@ dependencies = [
 "predicates",
 "pretty_assertions",
 "rand 0.9.2",
 "regex",
 "regex-lite",
 "reqwest",
 "rmcp",

@@ -340,6 +340,7 @@ ignored = [
 "icu_provider",
 "openssl-sys",
 "codex-utils-readiness",
 "codex-utils-sanitizer",
 "codex-secrets",
]

@@ -1,63 +0,0 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "definitions": {
    "FuzzyFileSearchResult": {
      "description": "Superset of [`codex_file_search::FileMatch`]",
      "properties": {
        "file_name": {
          "type": "string"
        },
        "indices": {
          "items": {
            "format": "uint32",
            "minimum": 0.0,
            "type": "integer"
          },
          "type": [
            "array",
            "null"
          ]
        },
        "path": {
          "type": "string"
        },
        "root": {
          "type": "string"
        },
        "score": {
          "format": "uint32",
          "minimum": 0.0,
          "type": "integer"
        }
      },
      "required": [
        "file_name",
        "path",
        "root",
        "score"
      ],
      "type": "object"
    }
  },
  "properties": {
    "files": {
      "items": {
        "$ref": "#/definitions/FuzzyFileSearchResult"
      },
      "type": "array"
    },
    "query": {
      "type": "string"
    },
    "sessionId": {
      "type": "string"
    }
  },
  "required": [
    "files",
    "query",
    "sessionId"
  ],
  "title": "FuzzyFileSearchSessionUpdatedNotification",
  "type": "object"
}
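For orientation, a notification payload conforming to the removed schema above would have looked like the following minimal sketch; the concrete values are hypothetical:

// Hypothetical example of a payload matching the removed
// FuzzyFileSearchSessionUpdatedNotification schema; values are illustrative.
use serde_json::json;

fn main() {
    let notification = json!({
        "sessionId": "session-1",
        "query": "alp",
        "files": [{
            "root": "/home/user/project",
            "path": "alpha.txt",
            "file_name": "alpha.txt",
            "score": 100,
            "indices": [0, 1, 2]
        }]
    });
    println!("{notification}");
}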
@@ -3684,65 +3684,6 @@
      ],
      "type": "object"
    },
    "FuzzyFileSearchResult": {
      "description": "Superset of [`codex_file_search::FileMatch`]",
      "properties": {
        "file_name": {
          "type": "string"
        },
        "indices": {
          "items": {
            "format": "uint32",
            "minimum": 0.0,
            "type": "integer"
          },
          "type": [
            "array",
            "null"
          ]
        },
        "path": {
          "type": "string"
        },
        "root": {
          "type": "string"
        },
        "score": {
          "format": "uint32",
          "minimum": 0.0,
          "type": "integer"
        }
      },
      "required": [
        "file_name",
        "path",
        "root",
        "score"
      ],
      "type": "object"
    },
    "FuzzyFileSearchSessionUpdatedNotification": {
      "properties": {
        "files": {
          "items": {
            "$ref": "#/definitions/FuzzyFileSearchResult"
          },
          "type": "array"
        },
        "query": {
          "type": "string"
        },
        "sessionId": {
          "type": "string"
        }
      },
      "required": [
        "files",
        "query",
        "sessionId"
      ],
      "type": "object"
    },
    "GhostCommit": {
      "description": "Details of a ghost commit created from a repository state.",
      "properties": {
@@ -8230,26 +8171,6 @@
      "title": "ConfigWarningNotification",
      "type": "object"
    },
    {
      "properties": {
        "method": {
          "enum": [
            "fuzzyFileSearch/sessionUpdated"
          ],
          "title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
          "type": "string"
        },
        "params": {
          "$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
        }
      },
      "required": [
        "method",
        "params"
      ],
      "title": "FuzzyFileSearch/sessionUpdatedNotification",
      "type": "object"
    },
    {
      "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
      "properties": {

@@ -5386,30 +5386,6 @@
    ],
    "type": "object"
  },
  "FuzzyFileSearchSessionUpdatedNotification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "properties": {
      "files": {
        "items": {
          "$ref": "#/definitions/FuzzyFileSearchResult"
        },
        "type": "array"
      },
      "query": {
        "type": "string"
      },
      "sessionId": {
        "type": "string"
      }
    },
    "required": [
      "files",
      "query",
      "sessionId"
    ],
    "title": "FuzzyFileSearchSessionUpdatedNotification",
    "type": "object"
  },
  "GetAuthStatusParams": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "properties": {
@@ -8449,26 +8425,6 @@
    "title": "ConfigWarningNotification",
    "type": "object"
  },
  {
    "properties": {
      "method": {
        "enum": [
          "fuzzyFileSearch/sessionUpdated"
        ],
        "title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
        "type": "string"
      },
      "params": {
        "$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
      }
    },
    "required": [
      "method",
      "params"
    ],
    "title": "FuzzyFileSearch/sessionUpdatedNotification",
    "type": "object"
  },
  {
    "description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
    "properties": {

@@ -1,6 +0,0 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!

// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";

export type FuzzyFileSearchSessionUpdatedNotification = { sessionId: string, query: string, files: Array<FuzzyFileSearchResult>, };
@@ -2,7 +2,6 @@

// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AuthStatusChangeNotification } from "./AuthStatusChangeNotification";
import type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
import type { LoginChatGptCompleteNotification } from "./LoginChatGptCompleteNotification";
import type { SessionConfiguredNotification } from "./SessionConfiguredNotification";
import type { AccountLoginCompletedNotification } from "./v2/AccountLoginCompletedNotification";
@@ -38,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
 * Notification sent from the server to the client.
 */
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };

@@ -77,7 +77,6 @@ export type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload";
export type { FuzzyFileSearchParams } from "./FuzzyFileSearchParams";
export type { FuzzyFileSearchResponse } from "./FuzzyFileSearchResponse";
export type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";
export type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
export type { GetAuthStatusParams } from "./GetAuthStatusParams";
export type { GetAuthStatusResponse } from "./GetAuthStatusResponse";
export type { GetConversationSummaryParams } from "./GetConversationSummaryParams";

@@ -458,21 +458,6 @@ client_request_definitions! {
        params: FuzzyFileSearchParams,
        response: FuzzyFileSearchResponse,
    },
    #[experimental("fuzzyFileSearch/sessionStart")]
    FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
        params: FuzzyFileSearchSessionStartParams,
        response: FuzzyFileSearchSessionStartResponse,
    },
    #[experimental("fuzzyFileSearch/sessionUpdate")]
    FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
        params: FuzzyFileSearchSessionUpdateParams,
        response: FuzzyFileSearchSessionUpdateResponse,
    },
    #[experimental("fuzzyFileSearch/sessionStop")]
    FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
        params: FuzzyFileSearchSessionStopParams,
        response: FuzzyFileSearchSessionStopResponse,
    },
    /// Execute a command (argv vector) under the server's sandbox.
    ExecOneOffCommand {
        params: v1::ExecOneOffCommandParams,
@@ -717,47 +702,6 @@ pub struct FuzzyFileSearchResponse {
    pub files: Vec<FuzzyFileSearchResult>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStartParams {
    pub session_id: String,
    pub roots: Vec<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStartResponse {}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdateParams {
    pub session_id: String,
    pub query: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionUpdateResponse {}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStopParams {
    pub session_id: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStopResponse {}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdatedNotification {
    pub session_id: String,
    pub query: String,
    pub files: Vec<FuzzyFileSearchResult>,
}

server_notification_definitions! {
    /// NEW NOTIFICATIONS
    Error => "error" (v2::ErrorNotification),
@@ -790,7 +734,6 @@ server_notification_definitions! {
    ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
    DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
    ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
    FuzzyFileSearchSessionUpdated => "fuzzyFileSearch/sessionUpdated" (FuzzyFileSearchSessionUpdatedNotification),

    /// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
    WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),

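Note the #[serde(rename_all = "camelCase")] attributes above: snake_case Rust fields such as session_id crossed the wire as sessionId. A minimal round-trip sketch, assuming the removed structs were still in scope:

// Minimal sketch, assuming FuzzyFileSearchSessionUpdateParams (removed above)
// is in scope; shows the camelCase wire format produced by serde.
fn camel_case_demo() -> serde_json::Result<()> {
    let params = FuzzyFileSearchSessionUpdateParams {
        session_id: "session-1".to_string(),
        query: "alp".to_string(),
    };
    let wire = serde_json::to_string(&params)?;
    assert_eq!(wire, r#"{"sessionId":"session-1","query":"alp"}"#);
    Ok(())
}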
@@ -1,9 +1,7 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::FuzzyFileSearchSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::fuzzy_file_search::start_fuzzy_file_search_session;
use crate::models::supported_models;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
@@ -49,12 +47,6 @@ use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStartResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStopParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStopResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateResponse;
use codex_app_server_protocol::GetAccountParams;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAccountResponse;
@@ -302,7 +294,6 @@ pub(crate) struct CodexMessageProcessor {
    active_login: Arc<Mutex<Option<ActiveLogin>>>,
    thread_state_manager: ThreadStateManager,
    pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
    fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
    feedback: CodexFeedback,
}

@@ -370,7 +361,6 @@ impl CodexMessageProcessor {
            active_login: Arc::new(Mutex::new(None)),
            thread_state_manager: ThreadStateManager::new(),
            pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
            fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
            feedback,
        }
    }
@@ -737,18 +727,6 @@
                self.fuzzy_file_search(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::FuzzyFileSearchSessionStart { request_id, params } => {
                self.fuzzy_file_search_session_start(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::FuzzyFileSearchSessionUpdate { request_id, params } => {
                self.fuzzy_file_search_session_update(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::FuzzyFileSearchSessionStop { request_id, params } => {
                self.fuzzy_file_search_session_stop(to_connection_request_id(request_id), params)
                    .await;
            }
            ClientRequest::OneOffCommandExec { request_id, params } => {
                self.exec_one_off_command(to_connection_request_id(request_id), params)
                    .await;
@@ -1932,7 +1910,9 @@
        let core_dynamic_tools = if dynamic_tools.is_empty() {
            Vec::new()
        } else {
            if let Err(message) = validate_dynamic_tools(&dynamic_tools) {
            let snapshot = collect_mcp_snapshot(&config).await;
            let mcp_tool_names = snapshot.tools.keys().cloned().collect::<HashSet<_>>();
            if let Err(message) = validate_dynamic_tools(&dynamic_tools, &mcp_tool_names) {
                let error = JSONRPCErrorError {
                    code: INVALID_REQUEST_ERROR_CODE,
                    message,
@@ -5578,89 +5558,6 @@
        self.outgoing.send_response(request_id, response).await;
    }

    async fn fuzzy_file_search_session_start(
        &mut self,
        request_id: ConnectionRequestId,
        params: FuzzyFileSearchSessionStartParams,
    ) {
        let FuzzyFileSearchSessionStartParams { session_id, roots } = params;
        if session_id.is_empty() {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: "sessionId must not be empty".to_string(),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }

        let session =
            start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone());
        match session {
            Ok(session) => {
                let mut sessions = self.fuzzy_search_sessions.lock().await;
                sessions.insert(session_id, session);
                self.outgoing
                    .send_response(request_id, FuzzyFileSearchSessionStartResponse {})
                    .await;
            }
            Err(err) => {
                let error = JSONRPCErrorError {
                    code: INTERNAL_ERROR_CODE,
                    message: format!("failed to start fuzzy file search session: {err}"),
                    data: None,
                };
                self.outgoing.send_error(request_id, error).await;
            }
        }
    }

    async fn fuzzy_file_search_session_update(
        &mut self,
        request_id: ConnectionRequestId,
        params: FuzzyFileSearchSessionUpdateParams,
    ) {
        let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
        let found = {
            let sessions = self.fuzzy_search_sessions.lock().await;
            if let Some(session) = sessions.get(&session_id) {
                session.update_query(query);
                true
            } else {
                false
            }
        };
        if !found {
            let error = JSONRPCErrorError {
                code: INVALID_REQUEST_ERROR_CODE,
                message: format!("fuzzy file search session not found: {session_id}"),
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
            return;
        }

        self.outgoing
            .send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
            .await;
    }

    async fn fuzzy_file_search_session_stop(
        &mut self,
        request_id: ConnectionRequestId,
        params: FuzzyFileSearchSessionStopParams,
    ) {
        let FuzzyFileSearchSessionStopParams { session_id } = params;
        {
            let mut sessions = self.fuzzy_search_sessions.lock().await;
            sessions.remove(&session_id);
        }

        self.outgoing
            .send_response(request_id, FuzzyFileSearchSessionStopResponse {})
            .await;
    }

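From the client's point of view, the removed handlers above formed a three-call lifecycle: start a session, stream query updates, then stop. A sketch of the corresponding JSON-RPC traffic (request ids and values are illustrative):

// Sketch of the removed JSON-RPC lifecycle; ids and values are illustrative.
use serde_json::json;

fn main() {
    let start = json!({
        "id": 1, "method": "fuzzyFileSearch/sessionStart",
        "params": {"sessionId": "session-1", "roots": ["/home/user/project"]}
    });
    let update = json!({
        "id": 2, "method": "fuzzyFileSearch/sessionUpdate",
        "params": {"sessionId": "session-1", "query": "alp"}
    });
    let stop = json!({
        "id": 3, "method": "fuzzyFileSearch/sessionStop",
        "params": {"sessionId": "session-1"}
    });
    for msg in [start, update, stop] {
        println!("{msg}");
    }
}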
    async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
        if !self.config.feedback_enabled {
            let error = JSONRPCErrorError {
@@ -5815,7 +5712,10 @@ fn errors_to_info(
        .collect()
}

fn validate_dynamic_tools(tools: &[ApiDynamicToolSpec]) -> Result<(), String> {
fn validate_dynamic_tools(
    tools: &[ApiDynamicToolSpec],
    mcp_tool_names: &HashSet<String>,
) -> Result<(), String> {
    let mut seen = HashSet::new();
    for tool in tools {
        let name = tool.name.trim();
@@ -5831,6 +5731,9 @@ fn validate_dynamic_tools(tools: &[ApiDynamicToolSpec]) -> Result<(), String> {
        if name == "mcp" || name.starts_with("mcp__") {
            return Err(format!("dynamic tool name is reserved: {name}"));
        }
        if mcp_tool_names.contains(name) {
            return Err(format!("dynamic tool name conflicts with MCP tool: {name}"));
        }
        if !seen.insert(name.to_string()) {
            return Err(format!("duplicate dynamic tool name: {name}"));
        }
@@ -6333,7 +6236,7 @@ mod tests {
            description: "test".to_string(),
            input_schema: json!({"type": "null"}),
        }];
        let err = validate_dynamic_tools(&tools).expect_err("invalid schema");
        let err = validate_dynamic_tools(&tools, &HashSet::new()).expect_err("invalid schema");
        assert!(err.contains("my_tool"), "unexpected error: {err}");
    }

@@ -6345,7 +6248,7 @@ mod tests {
            // Missing `type` is common; core sanitizes these to a supported schema.
            input_schema: json!({"properties": {}}),
        }];
        validate_dynamic_tools(&tools).expect("valid schema");
        validate_dynamic_tools(&tools, &HashSet::new()).expect("valid schema");
    }

    #[test]

@@ -1,18 +1,12 @@
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use tracing::warn;

use crate::outgoing_message::OutgoingMessageSender;

const MATCH_LIMIT: usize = 50;
const MAX_THREADS: usize = 12;

@@ -83,148 +77,3 @@ pub(crate) async fn run_fuzzy_file_search(

    files
}

pub(crate) struct FuzzyFileSearchSession {
    session: file_search::FileSearchSession,
    shared: Arc<SessionShared>,
}

impl FuzzyFileSearchSession {
    pub(crate) fn update_query(&self, query: String) {
        if self.shared.canceled.load(Ordering::Relaxed) {
            return;
        }
        {
            #[expect(clippy::unwrap_used)]
            let mut latest_query = self.shared.latest_query.lock().unwrap();
            *latest_query = query.clone();
        }
        self.session.update_query(&query);
    }
}

impl Drop for FuzzyFileSearchSession {
    fn drop(&mut self) {
        self.shared.canceled.store(true, Ordering::Relaxed);
    }
}

pub(crate) fn start_fuzzy_file_search_session(
    session_id: String,
    roots: Vec<String>,
    outgoing: Arc<OutgoingMessageSender>,
) -> anyhow::Result<FuzzyFileSearchSession> {
    #[expect(clippy::expect_used)]
    let limit = NonZero::new(MATCH_LIMIT).expect("MATCH_LIMIT should be a valid non-zero usize");
    let cores = std::thread::available_parallelism()
        .map(std::num::NonZero::get)
        .unwrap_or(1);
    let threads = cores.min(MAX_THREADS);
    #[expect(clippy::expect_used)]
    let threads = NonZero::new(threads.max(1)).expect("threads should be non-zero");
    let search_dirs: Vec<PathBuf> = roots.iter().map(PathBuf::from).collect();
    let canceled = Arc::new(AtomicBool::new(false));

    let shared = Arc::new(SessionShared {
        session_id,
        latest_query: Mutex::new(String::new()),
        outgoing,
        runtime: tokio::runtime::Handle::current(),
        canceled: canceled.clone(),
    });

    let reporter = Arc::new(SessionReporterImpl {
        shared: shared.clone(),
    });
    let session = file_search::create_session(
        search_dirs,
        file_search::FileSearchOptions {
            limit,
            threads,
            compute_indices: true,
            ..Default::default()
        },
        reporter,
        Some(canceled),
    )?;

    Ok(FuzzyFileSearchSession { session, shared })
}

struct SessionShared {
    session_id: String,
    latest_query: Mutex<String>,
    outgoing: Arc<OutgoingMessageSender>,
    runtime: tokio::runtime::Handle,
    canceled: Arc<AtomicBool>,
}

struct SessionReporterImpl {
    shared: Arc<SessionShared>,
}

impl SessionReporterImpl {
    fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) {
        if self.shared.canceled.load(Ordering::Relaxed) {
            return;
        }

        let query = {
            #[expect(clippy::unwrap_used)]
            self.shared.latest_query.lock().unwrap().clone()
        };
        if snapshot.query != query {
            return;
        }

        let files = if query.is_empty() {
            Vec::new()
        } else {
            collect_files(snapshot)
        };

        let notification = ServerNotification::FuzzyFileSearchSessionUpdated(
            FuzzyFileSearchSessionUpdatedNotification {
                session_id: self.shared.session_id.clone(),
                query,
                files,
            },
        );
        let outgoing = self.shared.outgoing.clone();
        self.shared.runtime.spawn(async move {
            outgoing.send_server_notification(notification).await;
        });
    }
}

impl file_search::SessionReporter for SessionReporterImpl {
    fn on_update(&self, snapshot: &file_search::FileSearchSnapshot) {
        self.send_snapshot(snapshot);
    }

    fn on_complete(&self) {}
}

fn collect_files(snapshot: &file_search::FileSearchSnapshot) -> Vec<FuzzyFileSearchResult> {
    let mut files = snapshot
        .matches
        .iter()
        .map(|m| {
            let file_name = m.path.file_name().unwrap_or_default();
            FuzzyFileSearchResult {
                root: m.root.to_string_lossy().to_string(),
                path: m.path.to_string_lossy().to_string(),
                file_name: file_name.to_string_lossy().to_string(),
                score: m.score,
                indices: m.indices.clone(),
            }
        })
        .collect::<Vec<_>>();

    files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
        FuzzyFileSearchResult,
        _,
        _,
    >(|f| f.score, |f| f.path.as_str()));
    files
}

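Taken together, the removed module was driven as below; a minimal sketch, assuming a running Tokio runtime, an Arc<OutgoingMessageSender> named outgoing, and an enclosing function returning anyhow::Result<()>:

// Minimal usage sketch of the removed session API (assumptions as above).
let session = start_fuzzy_file_search_session(
    "session-1".to_string(),
    vec!["/home/user/project".to_string()],
    outgoing.clone(),
)?;
session.update_query("alp".to_string());
// Dropping the session flips the shared `canceled` flag, which suppresses
// any further fuzzyFileSearch/sessionUpdated notifications.
drop(session);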
@@ -678,78 +678,6 @@ impl McpProcess {
        self.send_request("fuzzyFileSearch", Some(params)).await
    }

    pub async fn send_fuzzy_file_search_session_start_request(
        &mut self,
        session_id: &str,
        roots: Vec<String>,
    ) -> anyhow::Result<i64> {
        let params = serde_json::json!({
            "sessionId": session_id,
            "roots": roots,
        });
        self.send_request("fuzzyFileSearch/sessionStart", Some(params))
            .await
    }

    pub async fn start_fuzzy_file_search_session(
        &mut self,
        session_id: &str,
        roots: Vec<String>,
    ) -> anyhow::Result<JSONRPCResponse> {
        let request_id = self
            .send_fuzzy_file_search_session_start_request(session_id, roots)
            .await?;
        self.read_stream_until_response_message(RequestId::Integer(request_id))
            .await
    }

    pub async fn send_fuzzy_file_search_session_update_request(
        &mut self,
        session_id: &str,
        query: &str,
    ) -> anyhow::Result<i64> {
        let params = serde_json::json!({
            "sessionId": session_id,
            "query": query,
        });
        self.send_request("fuzzyFileSearch/sessionUpdate", Some(params))
            .await
    }

    pub async fn update_fuzzy_file_search_session(
        &mut self,
        session_id: &str,
        query: &str,
    ) -> anyhow::Result<JSONRPCResponse> {
        let request_id = self
            .send_fuzzy_file_search_session_update_request(session_id, query)
            .await?;
        self.read_stream_until_response_message(RequestId::Integer(request_id))
            .await
    }

    pub async fn send_fuzzy_file_search_session_stop_request(
        &mut self,
        session_id: &str,
    ) -> anyhow::Result<i64> {
        let params = serde_json::json!({
            "sessionId": session_id,
        });
        self.send_request("fuzzyFileSearch/sessionStop", Some(params))
            .await
    }

    pub async fn stop_fuzzy_file_search_session(
        &mut self,
        session_id: &str,
    ) -> anyhow::Result<JSONRPCResponse> {
        let request_id = self
            .send_fuzzy_file_search_session_stop_request(session_id)
            .await?;
        self.read_stream_until_response_message(RequestId::Integer(request_id))
            .await
    }

    async fn send_request(
        &mut self,
        method: &str,

@@ -1,7 +1,6 @@
use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
@@ -10,130 +9,6 @@ use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const SHORT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
const STOP_GRACE_PERIOD: std::time::Duration = std::time::Duration::from_millis(250);
const SESSION_UPDATED_METHOD: &str = "fuzzyFileSearch/sessionUpdated";

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FileExpectation {
    Any,
    Empty,
    NonEmpty,
}

async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    Ok(mcp)
}

async fn wait_for_session_updated(
    mcp: &mut McpProcess,
    session_id: &str,
    query: &str,
    file_expectation: FileExpectation,
) -> Result<FuzzyFileSearchSessionUpdatedNotification> {
    for _ in 0..20 {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
        )
        .await??;
        let params = notification
            .params
            .ok_or_else(|| anyhow!("missing notification params"))?;
        let payload = serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
        if payload.session_id != session_id || payload.query != query {
            continue;
        }
        let files_match = match file_expectation {
            FileExpectation::Any => true,
            FileExpectation::Empty => payload.files.is_empty(),
            FileExpectation::NonEmpty => !payload.files.is_empty(),
        };
        if files_match {
            return Ok(payload);
        }
    }
    anyhow::bail!(
        "did not receive expected session update for sessionId={session_id}, query={query}"
    );
}

async fn assert_update_request_fails_for_missing_session(
    mcp: &mut McpProcess,
    session_id: &str,
    query: &str,
) -> Result<()> {
    let request_id = mcp
        .send_fuzzy_file_search_session_update_request(session_id, query)
        .await?;
    let err = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
    )
    .await??;
    assert_eq!(err.error.code, -32600);
    assert_eq!(
        err.error.message,
        format!("fuzzy file search session not found: {session_id}")
    );
    Ok(())
}

async fn assert_no_session_updates_for(
    mcp: &mut McpProcess,
    session_id: &str,
    grace_period: std::time::Duration,
    duration: std::time::Duration,
) -> Result<()> {
    let grace_deadline = tokio::time::Instant::now() + grace_period;
    loop {
        let now = tokio::time::Instant::now();
        if now >= grace_deadline {
            break;
        }
        let remaining = grace_deadline - now;
        match timeout(
            remaining,
            mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
        )
        .await
        {
            Err(_) => break,
            Ok(Err(err)) => return Err(err),
            Ok(Ok(_)) => {}
        }
    }

    let deadline = tokio::time::Instant::now() + duration;
    loop {
        let now = tokio::time::Instant::now();
        if now >= deadline {
            return Ok(());
        }
        let remaining = deadline - now;
        match timeout(
            remaining,
            mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
        )
        .await
        {
            Err(_) => return Ok(()),
            Ok(Err(err)) => return Err(err),
            Ok(Ok(notification)) => {
                let params = notification
                    .params
                    .ok_or_else(|| anyhow!("missing notification params"))?;
                let payload =
                    serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
                if payload.session_id == session_id {
                    anyhow::bail!("received unexpected session update after stop: {payload:?}");
                }
            }
        }
    }
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
@@ -250,215 +125,3 @@ async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_streams_updates() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_path = root.path().to_string_lossy().to_string();
    let session_id = "session-1";

    mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
        .await?;
    mcp.update_fuzzy_file_search_session(session_id, "alp")
        .await?;

    let payload =
        wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
    assert_eq!(payload.files.len(), 1);
    assert_eq!(payload.files[0].root, root_path);
    assert_eq!(payload.files[0].path, "alpha.txt");

    mcp.stop_fuzzy_file_search_session(session_id).await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_before_start_errors() -> Result<()> {
    let codex_home = TempDir::new()?;
    let mut mcp = initialized_mcp(&codex_home).await?;
    assert_update_request_fails_for_missing_session(&mut mcp, "missing", "alp").await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_works_without_waiting_for_start_response()
-> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_path = root.path().to_string_lossy().to_string();
    let session_id = "session-no-wait";

    let start_request_id = mcp
        .send_fuzzy_file_search_session_start_request(session_id, vec![root_path.clone()])
        .await?;
    let update_request_id = mcp
        .send_fuzzy_file_search_session_update_request(session_id, "alp")
        .await?;

    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
    )
    .await??;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
    )
    .await??;

    let payload =
        wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
    assert_eq!(payload.files.len(), 1);
    assert_eq!(payload.files[0].root, root_path);
    assert_eq!(payload.files[0].path, "alpha.txt");

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_multiple_query_updates_work() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.txt"), "contents")?;
    std::fs::write(root.path().join("alphabet.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_path = root.path().to_string_lossy().to_string();
    let session_id = "session-multi-update";
    mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
        .await?;

    mcp.update_fuzzy_file_search_session(session_id, "alp")
        .await?;
    let alp_payload =
        wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
    assert_eq!(
        alp_payload.files.iter().all(|file| file.root == root_path),
        true
    );

    mcp.update_fuzzy_file_search_session(session_id, "zzzz")
        .await?;
    let zzzz_payload =
        wait_for_session_updated(&mut mcp, session_id, "zzzz", FileExpectation::Any).await?;
    assert_eq!(zzzz_payload.query, "zzzz");
    assert_eq!(zzzz_payload.files.is_empty(), true);

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let session_id = "session-stop-fail";
    let root_path = root.path().to_string_lossy().to_string();
    mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
        .await?;
    mcp.stop_fuzzy_file_search_session(session_id).await?;

    assert_update_request_fails_for_missing_session(&mut mcp, session_id, "alp").await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    for i in 0..10_000 {
        let file_path = root.path().join(format!("file-{i:04}.txt"));
        std::fs::write(file_path, "contents")?;
    }
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_path = root.path().to_string_lossy().to_string();
    let session_id = "session-stop-no-updates";
    mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
        .await?;
    mcp.update_fuzzy_file_search_session(session_id, "file-")
        .await?;
    wait_for_session_updated(&mut mcp, session_id, "file-", FileExpectation::NonEmpty).await?;

    mcp.stop_fuzzy_file_search_session(session_id).await?;

    assert_no_session_updates_for(&mut mcp, session_id, STOP_GRACE_PERIOD, SHORT_READ_TIMEOUT)
        .await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_two_sessions_are_independent() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root_a = TempDir::new()?;
    let root_b = TempDir::new()?;
    std::fs::write(root_a.path().join("alpha.txt"), "contents")?;
    std::fs::write(root_b.path().join("beta.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_a_path = root_a.path().to_string_lossy().to_string();
    let root_b_path = root_b.path().to_string_lossy().to_string();
    let session_a = "session-a";
    let session_b = "session-b";

    mcp.start_fuzzy_file_search_session(session_a, vec![root_a_path.clone()])
        .await?;
    mcp.start_fuzzy_file_search_session(session_b, vec![root_b_path.clone()])
        .await?;

    mcp.update_fuzzy_file_search_session(session_a, "alp")
        .await?;

    let session_a_update =
        wait_for_session_updated(&mut mcp, session_a, "alp", FileExpectation::NonEmpty).await?;
    assert_eq!(session_a_update.files.len(), 1);
    assert_eq!(session_a_update.files[0].root, root_a_path);
    assert_eq!(session_a_update.files[0].path, "alpha.txt");

    mcp.update_fuzzy_file_search_session(session_b, "bet")
        .await?;
    let session_b_update =
        wait_for_session_updated(&mut mcp, session_b, "bet", FileExpectation::NonEmpty).await?;
    assert_eq!(session_b_update.files.len(), 1);
    assert_eq!(session_b_update.files[0].root, root_b_path);
    assert_eq!(session_b_update.files[0].path, "beta.txt");

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_query_cleared_sends_blank_snapshot() -> Result<()> {
    let codex_home = TempDir::new()?;
    let root = TempDir::new()?;
    std::fs::write(root.path().join("alpha.txt"), "contents")?;
    let mut mcp = initialized_mcp(&codex_home).await?;

    let root_path = root.path().to_string_lossy().to_string();
    let session_id = "session-clear-query";
    mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
        .await?;

    mcp.update_fuzzy_file_search_session(session_id, "alp")
        .await?;
    wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;

    mcp.update_fuzzy_file_search_session(session_id, "").await?;
    let payload =
        wait_for_session_updated(&mut mcp, session_id, "", FileExpectation::Empty).await?;
    assert_eq!(payload.files.is_empty(), true);

    Ok(())
}

@@ -9,6 +9,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
use app_test_support::write_models_cache_with_slug_for_originator;
use codex_app_server_protocol::ByteRange;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
@@ -59,6 +60,7 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const TEST_ORIGINATOR: &str = "codex_vscode";
const LOCAL_PRAGMATIC_TEMPLATE: &str = "You are a deeply pragmatic, effective software engineer.";
const APP_SERVER_CACHE_ORIGINATOR: &str = "codex_app_server_cache_e2e";

#[tokio::test]
async fn turn_start_sends_originator_header() -> Result<()> {
@@ -135,6 +137,89 @@ async fn turn_start_sends_originator_header() -> Result<()> {
    Ok(())
}

#[tokio::test]
async fn turn_start_uses_originator_scoped_cache_slug() -> Result<()> {
    let responses = vec![create_final_assistant_message_sse_response("Done")?];
    let server = create_mock_responses_server_sequence_unchecked(responses).await;

    let codex_home = TempDir::new()?;
    create_config_toml(
        codex_home.path(),
        &server.uri(),
        "never",
        &BTreeMap::from([(Feature::Personality, true)]),
    )?;
    let cached_slug = "app-server-cache-slug-e2e";
    write_models_cache_with_slug_for_originator(
        codex_home.path(),
        APP_SERVER_CACHE_ORIGINATOR,
        cached_slug,
    )?;

    let mut mcp = McpProcess::new_with_env(
        codex_home.path(),
        &[(
            codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR,
            Some(APP_SERVER_CACHE_ORIGINATOR),
        )],
    )
    .await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    let thread_req = mcp
        .send_thread_start_request(ThreadStartParams::default())
        .await?;
    let thread_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
    )
    .await??;
    let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;

    let turn_req = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id,
            input: vec![V2UserInput::Text {
                text: "Hello".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
    )
    .await??;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;

    let requests = server
        .received_requests()
        .await
        .expect("failed to fetch received requests");
    let response_request = requests
        .into_iter()
        .find(|request| request.url.path().ends_with("/responses"))
        .expect("expected /responses request");
    let body: serde_json::Value = serde_json::from_slice(&response_request.body)
        .expect("responses request body should be json");
    assert_eq!(body["model"].as_str(), Some(cached_slug));
    assert!(
        codex_home
            .path()
            .join("models_cache")
            .join(APP_SERVER_CACHE_ORIGINATOR)
            .join("models_cache.json")
            .exists()
    );

    Ok(())
}

#[tokio::test]
async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
    let responses = vec![create_final_assistant_message_sse_response("Done")?];

@@ -49,7 +49,6 @@ codex-utils-absolute-path = { workspace = true }
codex-utils-home-dir = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-readiness = { workspace = true }
codex-utils-sanitizer = { workspace = true }
codex-utils-string = { workspace = true }
codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-sandbox-rs" }
dirs = { workspace = true }
@@ -68,6 +67,7 @@ notify = { workspace = true }
once_cell = { workspace = true }
os_info = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
regex-lite = { workspace = true }
reqwest = { workspace = true, features = ["json", "stream"] }
rmcp = { workspace = true, default-features = false, features = [

@@ -1411,27 +1411,6 @@ impl Session {
        state.clear_mcp_tool_selection();
    }

    // Merges connector IDs into the session-level explicit connector selection.
    pub(crate) async fn merge_connector_selection(
        &self,
        connector_ids: HashSet<String>,
    ) -> HashSet<String> {
        let mut state = self.state.lock().await;
        state.merge_connector_selection(connector_ids)
    }

    // Returns the connector IDs currently selected for this session.
    pub(crate) async fn get_connector_selection(&self) -> HashSet<String> {
        let state = self.state.lock().await;
        state.get_connector_selection()
    }

    // Clears connector IDs that were accumulated for explicit selection.
    pub(crate) async fn clear_connector_selection(&self) {
        let mut state = self.state.lock().await;
        state.clear_connector_selection();
    }

    async fn record_initial_history(&self, conversation_history: InitialHistory) {
        let turn_context = self.new_default_turn().await;
        match conversation_history {
@@ -4077,7 +4056,7 @@ pub(crate) async fn run_turn(
            .await,
    );

    let available_connectors = if turn_context.config.features.enabled(Feature::Apps) {
    let connector_slug_counts = if turn_context.config.features.enabled(Feature::Apps) {
        let mcp_tools = match sess
            .services
            .mcp_connection_manager
@@ -4090,16 +4069,11 @@
            Ok(mcp_tools) => mcp_tools,
            Err(_) => return None,
        };
        connectors::accessible_connectors_from_mcp_tools(&mcp_tools)
        let connectors = connectors::accessible_connectors_from_mcp_tools(&mcp_tools);
        build_connector_slug_counts(&connectors)
    } else {
        Vec::new()
        HashMap::new()
    };
    let connector_slug_counts = build_connector_slug_counts(&available_connectors);
    let skill_name_counts_lower = skills_outcome
        .as_ref()
        .map_or_else(HashMap::new, |outcome| {
            build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1
        });
    let mentioned_skills = skills_outcome.as_ref().map_or_else(Vec::new, |outcome| {
        collect_explicit_skill_mentions(
            &input,
@@ -4108,6 +4082,7 @@
            &connector_slug_counts,
        )
    });
    let explicitly_enabled_connectors = collect_explicit_app_ids(&input);
    let config = turn_context.config.clone();
    if config
        .features
@@ -4144,15 +4119,6 @@
        .await;
    }

    let mut explicitly_enabled_connectors = collect_explicit_app_ids(&input);
    explicitly_enabled_connectors.extend(collect_explicit_app_ids_from_skill_items(
        &skill_items,
        &available_connectors,
        &skill_name_counts_lower,
    ));
    sess.merge_connector_selection(explicitly_enabled_connectors.clone())
        .await;

    let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone());
    let response_item: ResponseItem = initial_input_for_turn.clone().into();
    sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
@@ -4383,57 +4349,6 @@ async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>)
    Ok(())
}

fn collect_explicit_app_ids_from_skill_items(
    skill_items: &[ResponseItem],
    connectors: &[connectors::AppInfo],
    skill_name_counts_lower: &HashMap<String, usize>,
) -> HashSet<String> {
    if skill_items.is_empty() || connectors.is_empty() {
        return HashSet::new();
    }

    let skill_messages = skill_items
        .iter()
        .filter_map(|item| match item {
            ResponseItem::Message { content, .. } => {
                content.iter().find_map(|content_item| match content_item {
                    ContentItem::InputText { text } => Some(text.clone()),
                    _ => None,
                })
            }
            _ => None,
        })
        .collect::<Vec<String>>();
    if skill_messages.is_empty() {
        return HashSet::new();
    }

    let mentions = collect_tool_mentions_from_messages(&skill_messages);
    let mention_names_lower = mentions
        .plain_names
        .iter()
        .map(|name| name.to_ascii_lowercase())
        .collect::<HashSet<String>>();
    let mut connector_ids = mentions
        .paths
        .iter()
        .filter(|path| tool_kind_for_path(path) == ToolMentionKind::App)
        .filter_map(|path| app_id_from_path(path).map(str::to_string))
        .collect::<HashSet<String>>();

    let connector_slug_counts = build_connector_slug_counts(connectors);
    for connector in connectors {
        let slug = connectors::connector_mention_slug(connector);
        let connector_count = connector_slug_counts.get(&slug).copied().unwrap_or(0);
        let skill_count = skill_name_counts_lower.get(&slug).copied().unwrap_or(0);
        if connector_count == 1 && skill_count == 0 && mention_names_lower.contains(&slug) {
            connector_ids.insert(connector.id.clone());
        }
    }

    connector_ids
}

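For orientation, the slug counts consulted above act as a frequency table over connector mention slugs, so only an unambiguous slug (count == 1, and no skill of the same name) resolves a plain mention to a connector. A minimal sketch of that idea using a hypothetical helper, not the crate's actual build_connector_slug_counts:

use std::collections::HashMap;

// Hypothetical stand-in for the counting step: tally how many connectors
// share each mention slug so callers can detect ambiguous plain mentions.
fn tally_slugs(slugs: &[String]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for slug in slugs {
        *counts.entry(slug.clone()).or_insert(0usize) += 1;
    }
    counts
}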
fn filter_connectors_for_input(
    connectors: Vec<connectors::AppInfo>,
    input: &[ResponseItem],
@@ -4679,9 +4594,6 @@ async fn built_tools(
        .or_cancel(cancellation_token)
        .await?;

    let mut effective_explicitly_enabled_connectors = explicitly_enabled_connectors.clone();
    effective_explicitly_enabled_connectors.extend(sess.get_connector_selection().await);

    let connectors_for_tools = if turn_context.config.features.enabled(Feature::Apps) {
        let skill_name_counts_lower = skills_outcome.map_or_else(HashMap::new, |outcome| {
            build_skill_name_counts(&outcome.skills, &outcome.disabled_paths).1
@@ -4690,7 +4602,7 @@
        Some(filter_connectors_for_input(
            connectors,
            input,
            &effective_explicitly_enabled_connectors,
            explicitly_enabled_connectors,
            &skill_name_counts_lower,
        ))
    } else {
@@ -5530,18 +5442,6 @@ mod tests {
        }
    }

    fn skill_message(text: &str) -> ResponseItem {
        ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: text.to_string(),
            }],
            end_turn: None,
            phase: None,
        }
    }

    fn make_connector(id: &str, name: &str) -> AppInfo {
        AppInfo {
            id: id.to_string(),
@@ -5674,49 +5574,6 @@ mod tests {
        assert_eq!(selected, Vec::new());
    }

    #[test]
    fn collect_explicit_app_ids_from_skill_items_includes_linked_mentions() {
        let connectors = vec![make_connector("calendar", "Calendar")];
        let skill_items = vec![skill_message(
            "…\n</skill>",
        )];

        let connector_ids =
            collect_explicit_app_ids_from_skill_items(&skill_items, &connectors, &HashMap::new());

        assert_eq!(connector_ids, HashSet::from(["calendar".to_string()]));
    }

    #[test]
    fn collect_explicit_app_ids_from_skill_items_resolves_unambiguous_plain_mentions() {
        let connectors = vec![make_connector("calendar", "Calendar")];
        let skill_items = vec![skill_message(
            "<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nuse $calendar\n</skill>",
        )];

        let connector_ids =
            collect_explicit_app_ids_from_skill_items(&skill_items, &connectors, &HashMap::new());

        assert_eq!(connector_ids, HashSet::from(["calendar".to_string()]));
    }

    #[test]
    fn collect_explicit_app_ids_from_skill_items_skips_plain_mentions_with_skill_conflicts() {
        let connectors = vec![make_connector("calendar", "Calendar")];
        let skill_items = vec![skill_message(
            "<skill>\n<name>demo</name>\n<path>/tmp/skills/demo/SKILL.md</path>\nuse $calendar\n</skill>",
        )];
        let skill_name_counts_lower = HashMap::from([("calendar".to_string(), 1)]);

        let connector_ids = collect_explicit_app_ids_from_skill_items(
            &skill_items,
            &connectors,
            &skill_name_counts_lower,
        );

        assert_eq!(connector_ids, HashSet::<String>::new());
    }

    #[test]
    fn search_tool_selection_keeps_codex_apps_tools_without_mentions() {
        let selected_tool_names = vec![

@@ -4,20 +4,16 @@
//! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation.
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.

mod dispatch;
mod phase1;
mod phase2;
pub(crate) mod prompts;
mod start;
mod stage_one;
mod startup;
mod storage;

#[cfg(test)]
mod tests;

/// Starts the memory startup pipeline for eligible root sessions.
/// This is the single entrypoint that `codex` uses to trigger memory startup.
///
/// This is the entry point to read and understand this module.
pub(crate) use start::start_memories_startup_task;
use std::path::Path;
use std::path::PathBuf;

mod artifacts {
    pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
@@ -26,12 +22,10 @@ mod artifacts {

/// Phase 1 (startup extraction).
mod phase_one {
    /// Prompt used for phase 1.
    pub(super) const PROMPT: &str = include_str!("../../templates/memories/stage_one_system.md");
    /// Maximum number of rollout candidates processed per startup pass.
    pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 8;
    pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
    /// Concurrency cap for startup memory extraction and consolidation scheduling.
    pub(super) const CONCURRENCY_LIMIT: usize = 8;
    pub(super) const CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP;
    /// Fallback stage-1 rollout truncation limit (tokens) when model metadata
    /// does not include a valid context window.
    pub(super) const DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT: usize = 150_000;
@@ -52,8 +46,6 @@ mod phase_one {
    pub(super) const JOB_LEASE_SECONDS: i64 = 3_600;
    /// Backoff delay (seconds) before retrying a failed stage-1 extraction job.
    pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600;
    /// Maximum number of threads to scan.
    pub(super) const THREAD_SCAN_LIMIT: usize = 5_000;
}

/// Phase 2 (aka `Consolidation`).
@@ -68,7 +60,7 @@ mod phase_two {
    /// job.
    pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600;
    /// Heartbeat interval (seconds) for phase-2 running jobs.
    pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 90;
    pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30;
}

mod metrics {
@@ -82,9 +74,6 @@ mod metrics {
    pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input";
}

use std::path::Path;
use std::path::PathBuf;

pub fn memory_root(codex_home: &Path) -> PathBuf {
    codex_home.join("memories")
}
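// Illustrative sketch, not part of the diff (the home path is hypothetical):
// all memory artifacts live under one shared directory inside the Codex home.
fn _memory_root_example() {
    let root = memory_root(Path::new("/home/alice/.codex"));
    assert_eq!(root, PathBuf::from("/home/alice/.codex/memories"));
}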
@@ -100,3 +89,8 @@ fn raw_memories_file(root: &Path) -> PathBuf {
async fn ensure_layout(root: &Path) -> std::io::Result<()> {
    tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
}

/// Starts the memory startup pipeline for eligible root sessions.
///
/// This is the single entrypoint that `codex` uses to trigger memory startup.
pub(crate) use startup::start_memories_startup_task;

@@ -1,466 +0,0 @@
use crate::Prompt;
use crate::RolloutRecorder;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::memories::prompts::build_stage_one_input_message;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use crate::rollout::policy::should_persist_response_item_for_memories;
use codex_api::ResponseEvent;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::RolloutItem;
use codex_utils_sanitizer::redact_secrets;
use futures::StreamExt;
use serde::Deserialize;
use serde_json::Value;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
use tracing::info;
use tracing::warn;

#[derive(Clone, Debug)]
pub(in crate::memories) struct Phase1RequestContext {
    pub(in crate::memories) model_info: ModelInfo,
    pub(in crate::memories) otel_manager: OtelManager,
    pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
    pub(in crate::memories) reasoning_summary: ReasoningSummaryConfig,
    pub(in crate::memories) turn_metadata_header: Option<String>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
    SucceededWithOutput,
    SucceededNoOutput,
    Failed,
}

struct PhaseOneOutcomeCounts {
    claimed: usize,
    succeeded_with_output: usize,
    succeeded_no_output: usize,
    failed: usize,
}

/// Phase 1 model output payload.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
struct StageOneOutput {
    /// Detailed markdown raw memory for a single rollout.
    #[serde(rename = "raw_memory")]
    pub(crate) raw_memory: String,
    /// Compact summary line used for routing and indexing.
    #[serde(rename = "rollout_summary")]
    pub(crate) rollout_summary: String,
    /// Optional slug accepted from stage-1 output for forward compatibility.
    /// This is currently ignored by downstream storage and naming, which remain thread-id based.
    #[serde(default, rename = "rollout_slug")]
    pub(crate) _rollout_slug: Option<String>,
}

/// Runs memory phase 1 in strict step order:
/// 1) claim eligible rollout jobs
/// 2) build one stage-1 request context
/// 3) run stage-1 extraction jobs in parallel
/// 4) emit metrics and logs
pub(in crate::memories) async fn run(session: &Arc<Session>) {
    // 1. Claim startup job.
    let Some(claimed_candidates) = claim_startup_jobs(session).await else {
        return;
    };
    if claimed_candidates.is_empty() {
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            1,
            &[("status", "skipped_no_candidates")],
        );
        return;
    }

    // 2. Build request.
    let stage_one_context = build_request_context(session).await;

    // 3. Run the parallel sampling.
    let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;

    // 4. Metrics and logs.
    let counts = count_outcomes(outcomes);
    emit_metrics(session, &counts);
    info!(
        "memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
        counts.claimed,
        counts.succeeded_with_output + counts.succeeded_no_output,
        counts.succeeded_with_output,
        counts.succeeded_no_output,
        counts.failed
    );
}

/// JSON schema used to constrain phase-1 model output.
pub fn output_schema() -> Value {
    json!({
        "type": "object",
        "properties": {
            "rollout_summary": { "type": "string" },
            "rollout_slug": { "type": "string" },
            "raw_memory": { "type": "string" }
        },
        "required": ["rollout_summary", "rollout_slug", "raw_memory"],
        "additionalProperties": false
    })
}

impl Phase1RequestContext {
    pub(in crate::memories) fn from_turn_context(
        turn_context: &TurnContext,
        turn_metadata_header: Option<String>,
    ) -> Self {
        Self {
            model_info: turn_context.model_info.clone(),
            otel_manager: turn_context.otel_manager.clone(),
            reasoning_effort: turn_context.reasoning_effort,
            reasoning_summary: turn_context.reasoning_summary,
            turn_metadata_header,
        }
    }
}

async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::Stage1JobClaim>> {
    let Some(state_db) = session.services.state_db.as_deref() else {
        // This should not happen.
        warn!("state db unavailable while claiming phase-1 startup jobs; skipping");
        return None;
    };

    let allowed_sources = INTERACTIVE_SESSION_SOURCES
        .iter()
        .map(ToString::to_string)
        .collect::<Vec<_>>();

    match state_db
        .claim_stage1_jobs_for_startup(
            session.conversation_id,
            codex_state::Stage1StartupClaimParams {
                scan_limit: phase_one::THREAD_SCAN_LIMIT,
                max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP,
                max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS,
                min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS,
                allowed_sources: allowed_sources.as_slice(),
                lease_seconds: phase_one::JOB_LEASE_SECONDS,
            },
        )
        .await
    {
        Ok(claims) => Some(claims),
        Err(err) => {
            warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
            session.services.otel_manager.counter(
                metrics::MEMORY_PHASE_ONE_JOBS,
                1,
                &[("status", "failed_claim")],
            );
            None
        }
    }
}

async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
    let turn_context = session.new_default_turn().await;
    Phase1RequestContext::from_turn_context(
        turn_context.as_ref(),
        turn_context.resolve_turn_metadata_header().await,
    )
}

async fn run_jobs(
    session: &Arc<Session>,
    claimed_candidates: Vec<codex_state::Stage1JobClaim>,
    stage_one_context: Phase1RequestContext,
) -> Vec<PhaseOneJobOutcome> {
    futures::stream::iter(claimed_candidates.into_iter())
        .map(|claim| {
            let session = Arc::clone(session);
            let stage_one_context = stage_one_context.clone();
            async move { job::run(session.as_ref(), claim, &stage_one_context).await }
        })
        .buffer_unordered(phase_one::CONCURRENCY_LIMIT)
        .collect::<Vec<_>>()
        .await
}

mod job {
    use super::*;

    pub(in crate::memories) async fn run(
        session: &Session,
        claim: codex_state::Stage1JobClaim,
        stage_one_context: &Phase1RequestContext,
    ) -> PhaseOneJobOutcome {
        let thread = claim.thread;
        let stage_one_output = match sample(
            session,
            &thread.rollout_path,
            &thread.cwd,
            stage_one_context,
        )
        .await
        {
            Ok(output) => output,
            Err(reason) => {
                result::failed(
                    session,
                    thread.id,
                    &claim.ownership_token,
                    &reason.to_string(),
                )
                .await;
                return PhaseOneJobOutcome::Failed;
            }
        };

        if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
            return result::no_output(session, thread.id, &claim.ownership_token).await;
        }

        result::success(
            session,
            thread.id,
            &claim.ownership_token,
            thread.updated_at.timestamp(),
            &stage_one_output.raw_memory,
            &stage_one_output.rollout_summary,
        )
        .await
    }

    /// Extract the rollout and perform the actual sampling.
    async fn sample(
        session: &Session,
        rollout_path: &Path,
        rollout_cwd: &Path,
        stage_one_context: &Phase1RequestContext,
    ) -> anyhow::Result<StageOneOutput> {
        let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(rollout_path).await?;
        let rollout_contents = serialize_filtered_rollout_response_items(&rollout_items)?;

        let prompt = Prompt {
            input: vec![ResponseItem::Message {
                id: None,
                role: "user".to_string(),
                content: vec![ContentItem::InputText {
                    text: build_stage_one_input_message(
                        &stage_one_context.model_info,
                        rollout_path,
                        rollout_cwd,
                        &rollout_contents,
                    )?,
                }],
                end_turn: None,
                phase: None,
            }],
            tools: Vec::new(),
            parallel_tool_calls: false,
            base_instructions: BaseInstructions {
                text: phase_one::PROMPT.to_string(),
            },
            personality: None,
            output_schema: Some(output_schema()),
        };

        let mut client_session = session.services.model_client.new_session();
        let mut stream = client_session
            .stream(
                &prompt,
                &stage_one_context.model_info,
                &stage_one_context.otel_manager,
                stage_one_context.reasoning_effort,
                stage_one_context.reasoning_summary,
                stage_one_context.turn_metadata_header.as_deref(),
            )
            .await?;

        // TODO(jif) we should have a shared helper somewhere for this.
        // Unwrap the stream.
        let mut result = String::new();
        while let Some(message) = stream.next().await.transpose()? {
            match message {
                ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta),
                ResponseEvent::OutputItemDone(item) => {
                    if result.is_empty()
                        && let ResponseItem::Message { content, .. } = item
                        && let Some(text) = crate::compact::content_items_to_text(&content)
                    {
                        result.push_str(&text);
                    }
                }
                ResponseEvent::Completed { .. } => break,
                _ => {}
            }
        }

        let mut output: StageOneOutput = serde_json::from_str(&result)?;
        output.raw_memory = redact_secrets(output.raw_memory);
        output.rollout_summary = redact_secrets(output.rollout_summary);

        Ok(output)
    }

    mod result {
        use super::*;

        pub(in crate::memories) async fn failed(
            session: &Session,
            thread_id: codex_protocol::ThreadId,
            ownership_token: &str,
            reason: &str,
        ) {
            tracing::warn!("Phase 1 job failed for thread {thread_id}: {reason}");
            if let Some(state_db) = session.services.state_db.as_deref() {
                let _ = state_db
                    .mark_stage1_job_failed(
                        thread_id,
                        ownership_token,
                        reason,
                        phase_one::JOB_RETRY_DELAY_SECONDS,
                    )
                    .await;
            }
        }

        pub(in crate::memories) async fn no_output(
            session: &Session,
            thread_id: codex_protocol::ThreadId,
            ownership_token: &str,
        ) -> PhaseOneJobOutcome {
            let Some(state_db) = session.services.state_db.as_deref() else {
                return PhaseOneJobOutcome::Failed;
            };

            if state_db
                .mark_stage1_job_succeeded_no_output(thread_id, ownership_token)
                .await
                .unwrap_or(false)
            {
                PhaseOneJobOutcome::SucceededNoOutput
            } else {
                PhaseOneJobOutcome::Failed
            }
        }

        pub(in crate::memories) async fn success(
            session: &Session,
            thread_id: codex_protocol::ThreadId,
            ownership_token: &str,
            source_updated_at: i64,
            raw_memory: &str,
            rollout_summary: &str,
        ) -> PhaseOneJobOutcome {
            let Some(state_db) = session.services.state_db.as_deref() else {
                return PhaseOneJobOutcome::Failed;
            };

            if state_db
                .mark_stage1_job_succeeded(
                    thread_id,
                    ownership_token,
                    source_updated_at,
                    raw_memory,
                    rollout_summary,
                )
                .await
                .unwrap_or(false)
            {
                PhaseOneJobOutcome::SucceededWithOutput
            } else {
                PhaseOneJobOutcome::Failed
            }
        }
    }

    /// Serializes filtered stage-1 memory items for prompt inclusion.
    fn serialize_filtered_rollout_response_items(
        items: &[RolloutItem],
    ) -> crate::error::Result<String> {
        let filtered = items
            .iter()
            .filter_map(|item| {
                if let RolloutItem::ResponseItem(item) = item
                    && should_persist_response_item_for_memories(item)
                {
                    Some(item.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        serde_json::to_string(&filtered).map_err(|err| {
            CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
        })
    }
}

fn count_outcomes(outcomes: Vec<PhaseOneJobOutcome>) -> PhaseOneOutcomeCounts {
    let succeeded_with_output = outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
        .count();
    let succeeded_no_output = outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
        .count();
    let failed = outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
        .count();

    PhaseOneOutcomeCounts {
        claimed: outcomes.len(),
        succeeded_with_output,
        succeeded_no_output,
        failed,
    }
}

fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
    if counts.claimed > 0 {
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            counts.claimed as i64,
            &[("status", "claimed")],
        );
    }
    if counts.succeeded_with_output > 0 {
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            counts.succeeded_with_output as i64,
            &[("status", "succeeded")],
        );
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_OUTPUT,
            counts.succeeded_with_output as i64,
            &[],
        );
    }
    if counts.succeeded_no_output > 0 {
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            counts.succeeded_no_output as i64,
            &[("status", "succeeded_no_output")],
        );
    }
    if counts.failed > 0 {
        session.services.otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            counts.failed as i64,
            &[("status", "failed")],
        );
    }
}
@@ -30,6 +30,7 @@ struct MemoryToolDeveloperInstructionsTemplate<'a> {
}

/// Builds the consolidation subagent prompt for a specific memory root.
///
pub(super) fn build_consolidation_prompt(memory_root: &Path) -> String {
    let memory_root = memory_root.display().to_string();
    let template = ConsolidationPromptTemplate {
@@ -73,9 +74,6 @@ pub(super) fn build_stage_one_input_message(
        .render()?)
}

/// Builds the prompt used for the read path. This prompt must be added to the developer
/// instructions. For large memory files, `memory_summary.md` is truncated at
/// [phase_one::MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT].
pub(crate) async fn build_memory_tool_developer_instructions(codex_home: &Path) -> Option<String> {
    let base_path = memory_root(codex_home);
    let memory_summary_path = base_path.join("memory_summary.md");

210
codex-rs/core/src/memories/stage_one.rs
Normal file
@@ -0,0 +1,210 @@
use crate::error::CodexErr;
use crate::error::Result;
use once_cell::sync::Lazy;
use regex::Regex;
use serde::Deserialize;
use serde_json::Value;
use serde_json::json;

/// System prompt for stage-1 raw memory extraction.
pub(super) const RAW_MEMORY_PROMPT: &str =
    include_str!("../../templates/memories/stage_one_system.md");

static OPENAI_KEY_REGEX: Lazy<Regex> = Lazy::new(|| compile_regex(r"sk-[A-Za-z0-9]{20,}"));
static AWS_ACCESS_KEY_ID_REGEX: Lazy<Regex> = Lazy::new(|| compile_regex(r"\bAKIA[0-9A-Z]{16}\b"));
static BEARER_TOKEN_REGEX: Lazy<Regex> =
    Lazy::new(|| compile_regex(r"(?i)\bBearer\s+[A-Za-z0-9._\-]{16,}\b"));
static SECRET_ASSIGNMENT_REGEX: Lazy<Regex> = Lazy::new(|| {
    compile_regex(r#"(?i)\b(api[_-]?key|token|secret|password)\b(\s*[:=]\s*)(["']?)[^\s"']{8,}"#)
});

/// Parsed stage-1 model output payload.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub(super) struct StageOneOutput {
    /// Detailed markdown raw memory for a single rollout.
    #[serde(rename = "raw_memory")]
    pub(crate) raw_memory: String,
    /// Compact summary line used for routing and indexing.
    #[serde(rename = "rollout_summary")]
    pub(crate) rollout_summary: String,
    /// Optional slug accepted from stage-1 output for forward compatibility.
    ///
    /// This is currently ignored by downstream storage and naming, which remain
    /// thread-id based.
    #[serde(default, rename = "rollout_slug")]
    pub(crate) _rollout_slug: Option<String>,
}

/// JSON schema used to constrain stage-1 model output.
pub(super) fn stage_one_output_schema() -> Value {
    json!({
        "type": "object",
        "properties": {
            "rollout_summary": { "type": "string" },
            "rollout_slug": { "type": "string" },
            "raw_memory": { "type": "string" }
        },
        "required": ["rollout_summary", "rollout_slug", "raw_memory"],
        "additionalProperties": false
    })
}
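// Illustrative sketch, not part of the diff: a payload that satisfies the
// schema above (all field values are hypothetical).
fn _stage_one_payload_example() -> Value {
    json!({
        "rollout_summary": "success, refactored the sandbox policy, cwd=/repo",
        "rollout_slug": "sandbox-policy-refactor",
        "raw_memory": "## What happened\n- tightened sandbox rules\n- added tests"
    })
}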

/// Parses and normalizes stage-1 model output into a typed payload.
///
/// Accepts plain JSON objects, fenced JSON, and object snippets embedded in
/// extra text, then enforces redaction and size limits.
pub(super) fn parse_stage_one_output(raw: &str) -> Result<StageOneOutput> {
    let parsed = parse_json_object_loose(raw)?;
    let output: StageOneOutput = serde_json::from_value(parsed).map_err(|err| {
        CodexErr::InvalidRequest(format!("invalid stage-1 memory output JSON payload: {err}"))
    })?;
    normalize_stage_one_output(output)
}

fn parse_json_object_loose(raw: &str) -> Result<Value> {
    let raw = raw.trim();

    if let Ok(value) = serde_json::from_str::<Value>(raw)
        && value.is_object()
    {
        return Ok(value);
    }

    if let Some(fenced) = raw
        .strip_prefix("```json")
        .and_then(|s| s.strip_suffix("```"))
        .map(str::trim)
        && let Ok(value) = serde_json::from_str::<Value>(fenced)
        && value.is_object()
    {
        return Ok(value);
    }

    if let Some(fenced) = raw
        .strip_prefix("```")
        .and_then(|s| s.strip_suffix("```"))
        .map(str::trim)
        && let Ok(value) = serde_json::from_str::<Value>(fenced)
        && value.is_object()
    {
        return Ok(value);
    }

    if let (Some(start), Some(end)) = (raw.find('{'), raw.rfind('}'))
        && start < end
    {
        let snippet = &raw[start..=end];
        if let Ok(value) = serde_json::from_str::<Value>(snippet)
            && value.is_object()
        {
            return Ok(value);
        }
    }

    Err(CodexErr::InvalidRequest(
        "unable to parse stage-1 memory JSON output".to_string(),
    ))
}
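// Illustrative sketch, not part of the diff: the three input shapes the loose
// parser above accepts (payload contents are hypothetical).
fn _parse_json_object_loose_examples() {
    let plain = r#"{"raw_memory":"m","rollout_summary":"s"}"#;
    let fenced = "```json\n{\"raw_memory\":\"m\",\"rollout_summary\":\"s\"}\n```";
    let embedded = "Sure! {\"raw_memory\":\"m\",\"rollout_summary\":\"s\"} Done.";
    for raw in [plain, fenced, embedded] {
        assert!(parse_stage_one_output(raw).is_ok());
    }
}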

fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutput> {
    output.raw_memory = output.raw_memory.trim().to_string();
    output.rollout_summary = output.rollout_summary.trim().to_string();
    output._rollout_slug = output
        ._rollout_slug
        .map(|slug| slug.trim().to_string())
        .filter(|slug| !slug.is_empty());

    if output.raw_memory.is_empty() && output.rollout_summary.is_empty() {
        // Empty pair is a deliberate "no meaningful signal" sentinel.
        return Ok(output);
    }

    if output.raw_memory.is_empty() {
        return Err(CodexErr::InvalidRequest(
            "stage-1 memory output missing raw_memory".to_string(),
        ));
    }
    if output.rollout_summary.is_empty() {
        return Err(CodexErr::InvalidRequest(
            "stage-1 memory output missing rollout_summary".to_string(),
        ));
    }

    output.raw_memory = redact_secrets(&output.raw_memory);
    output.rollout_summary = redact_secrets(&output.rollout_summary);

    Ok(output)
}

fn redact_secrets(input: &str) -> String {
    let redacted = OPENAI_KEY_REGEX.replace_all(input, "[REDACTED_SECRET]");
    let redacted = AWS_ACCESS_KEY_ID_REGEX.replace_all(&redacted, "[REDACTED_SECRET]");
    let redacted = BEARER_TOKEN_REGEX.replace_all(&redacted, "Bearer [REDACTED_SECRET]");

    SECRET_ASSIGNMENT_REGEX
        .replace_all(&redacted, "$1$2$3[REDACTED_SECRET]")
        .to_string()
}
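// Illustrative sketch, not part of the diff: what the redaction pass above
// does to a few representative inputs (sample secrets are fake).
fn _redact_secrets_examples() {
    assert_eq!(
        redact_secrets("key sk-abcdefghijklmnopqrstuvwxyz"),
        "key [REDACTED_SECRET]"
    );
    assert_eq!(
        redact_secrets("Authorization: Bearer abcdefghijklmnop"),
        "Authorization: Bearer [REDACTED_SECRET]"
    );
    assert_eq!(
        redact_secrets("api_key = \"supersecret123\""),
        "api_key = \"[REDACTED_SECRET]\""
    );
}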

fn compile_regex(pattern: &str) -> Regex {
    match Regex::new(pattern) {
        Ok(regex) => regex,
        // Panicking is acceptable: the `load_regex` test compiles every pattern.
        Err(err) => panic!("invalid regex pattern `{pattern}`: {err}"),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn load_regex() {
        // This test exists solely to compile every regex, so the panic in
        // `compile_regex` can never fire at runtime.
        let _ = redact_secrets("secret");
    }

    #[test]
    fn normalize_stage_one_output_redacts_summary() {
        let output = StageOneOutput {
            raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(),
            rollout_summary: "password = mysecret123456\n\nsmall".to_string(),
            _rollout_slug: None,
        };

        let normalized = normalize_stage_one_output(output).expect("normalized");

        assert!(normalized.raw_memory.contains("[REDACTED_SECRET]"));
        assert!(!normalized.rollout_summary.contains("mysecret123456"));
        assert_eq!(
            normalized.rollout_summary,
            "password = [REDACTED_SECRET]\n\nsmall"
        );
    }

    #[test]
    fn normalize_stage_one_output_allows_empty_pair_for_skip() {
        let output = StageOneOutput {
            raw_memory: String::new(),
            rollout_summary: String::new(),
            _rollout_slug: None,
        };

        let normalized = normalize_stage_one_output(output).expect("normalized");
        assert_eq!(normalized.raw_memory, "");
        assert_eq!(normalized.rollout_summary, "");
    }

    #[test]
    fn normalize_stage_one_output_rejects_partial_empty_values() {
        let output = StageOneOutput {
            raw_memory: String::new(),
            rollout_summary: "summary".to_string(),
            _rollout_slug: None,
        };

        let err = normalize_stage_one_output(output).expect_err("should reject");
        assert_eq!(err.to_string(), "stage-1 memory output missing raw_memory");
    }
}
@@ -1,41 +0,0 @@
use crate::codex::Session;
use crate::config::Config;
use crate::features::Feature;
use crate::memories::phase1;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;
use tracing::warn;

/// Starts the asynchronous startup memory pipeline for an eligible root session.
///
/// The pipeline is skipped for ephemeral sessions, disabled feature flags, and
/// subagent sessions.
pub(crate) fn start_memories_startup_task(
    session: &Arc<Session>,
    config: Arc<Config>,
    source: &SessionSource,
) {
    if config.ephemeral
        || !config.features.enabled(Feature::MemoryTool)
        || matches!(source, SessionSource::SubAgent(_))
    {
        return;
    }

    if session.services.state_db.is_none() {
        warn!("state db unavailable for memories startup pipeline; skipping");
        return;
    }

    let weak_session = Arc::downgrade(session);
    tokio::spawn(async move {
        let Some(session) = weak_session.upgrade() else {
            return;
        };

        // Run phase 1.
        phase1::run(&session).await;
        // Run phase 2.
        crate::memories::dispatch::run_global_memory_consolidation(&session, config).await;
    });
}
@@ -4,8 +4,8 @@ use crate::config::Constrained;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::phase2::spawn_phase2_completion_task;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::startup::phase2::spawn_phase2_completion_task;
use crate::memories::storage::rebuild_raw_memories_file_from_memories;
use crate::memories::storage::sync_rollout_summaries_from_memories;
use codex_protocol::protocol::AskForApproval;
@@ -19,8 +19,6 @@ use tracing::debug;
use tracing::info;
use tracing::warn;

//TODO(jif) clean.

fn completion_watermark(
    claimed_watermark: i64,
    latest_memories: &[codex_state::Stage1Output],
@@ -33,7 +31,7 @@
        .max(claimed_watermark)
}

pub(in crate::memories) async fn run_global_memory_consolidation(
pub(super) async fn run_global_memory_consolidation(
    session: &Arc<Session>,
    config: Arc<Config>,
) -> bool {
@@ -263,6 +261,7 @@ pub(in crate::memories) async fn run_global_memory_consolidation(
#[cfg(test)]
mod tests {
    use super::completion_watermark;
    use super::memory_root;
    use super::run_global_memory_consolidation;
    use crate::CodexAuth;
    use crate::ThreadManager;
@@ -271,7 +270,6 @@ mod tests {
    use crate::codex::make_session_and_context;
    use crate::config::Config;
    use crate::config::test_config;
    use crate::memories::memory_root;
    use crate::memories::raw_memories_file;
    use crate::memories::rollout_summaries_dir;
    use chrono::Utc;
@@ -284,7 +282,6 @@ mod tests {
    use codex_state::Stage1Output;
    use codex_state::ThreadMetadataBuilder;
    use pretty_assertions::assert_eq;
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::TempDir;

@@ -403,7 +400,6 @@ mod tests {
            .expect("valid source_updated_at timestamp"),
        raw_memory: "raw memory".to_string(),
        rollout_summary: "rollout summary".to_string(),
        cwd: PathBuf::from("/tmp/workspace"),
        generated_at: chrono::DateTime::<Utc>::from_timestamp(124, 0)
            .expect("valid generated_at timestamp"),
    };
199
codex-rs/core/src/memories/startup/extract.rs
Normal file
@@ -0,0 +1,199 @@
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::codex::Session;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::rollout::RolloutRecorder;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use futures::StreamExt;
use tracing::warn;

use crate::memories::prompts::build_stage_one_input_message;
use crate::memories::stage_one::RAW_MEMORY_PROMPT;
use crate::memories::stage_one::StageOneOutput;
use crate::memories::stage_one::parse_stage_one_output;
use crate::memories::stage_one::stage_one_output_schema;
use crate::memories::startup::StageOneRequestContext;
use crate::rollout::policy::should_persist_response_item_for_memories;
use codex_protocol::protocol::RolloutItem;
use std::path::Path;

pub(super) async fn extract_stage_one_output(
    session: &Session,
    rollout_path: &Path,
    rollout_cwd: &Path,
    stage_one_context: &StageOneRequestContext,
) -> Result<StageOneOutput, &'static str> {
    let (rollout_items, _thread_id, parse_errors) =
        match RolloutRecorder::load_rollout_items(rollout_path).await {
            Ok(result) => result,
            Err(err) => {
                warn!(
                    "failed to load rollout {} for memories: {err}",
                    rollout_path.display()
                );
                return Err("failed to load rollout");
            }
        };
    if parse_errors > 0 {
        warn!(
            "rollout {} had {parse_errors} parse errors while preparing stage-1 memory input",
            rollout_path.display()
        );
    }

    let rollout_contents = match serialize_filtered_rollout_response_items(&rollout_items) {
        Ok(contents) => contents,
        Err(err) => {
            warn!(
                "failed to prepare filtered rollout payload {} for memories: {err}",
                rollout_path.display()
            );
            return Err("failed to serialize filtered rollout");
        }
    };

    let prompt = Prompt {
        input: vec![ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: build_stage_one_input_message(
                    &stage_one_context.model_info,
                    rollout_path,
                    rollout_cwd,
                    &rollout_contents,
                )
                .map_err(|_e| "error while building the prompt")?,
            }],
            end_turn: None,
            phase: None,
        }],
        tools: Vec::new(),
        parallel_tool_calls: false,
        base_instructions: BaseInstructions {
            text: RAW_MEMORY_PROMPT.to_string(),
        },
        personality: None,
        output_schema: Some(stage_one_output_schema()),
    };

    let mut client_session = session.services.model_client.new_session();
    let mut stream = match client_session
        .stream(
            &prompt,
            &stage_one_context.model_info,
            &stage_one_context.otel_manager,
            stage_one_context.reasoning_effort,
            stage_one_context.reasoning_summary,
            stage_one_context.turn_metadata_header.as_deref(),
        )
        .await
    {
        Ok(stream) => stream,
        Err(err) => {
            warn!(
                "stage-1 memory request failed for rollout {}: {err}",
                rollout_path.display()
            );
            return Err("stage-1 memory request failed");
        }
    };

    let output_text = match collect_response_text_until_completed(&mut stream).await {
        Ok(text) => text,
        Err(err) => {
            warn!(
                "failed while waiting for stage-1 memory response for rollout {}: {err}",
                rollout_path.display()
            );
            return Err("stage-1 memory response stream failed");
        }
    };

    match parse_stage_one_output(&output_text) {
        Ok(output) => Ok(output),
        Err(err) => {
            warn!(
                "invalid stage-1 memory payload for rollout {}: {err}",
                rollout_path.display()
            );
            Err("invalid stage-1 memory payload")
        }
    }
}

async fn collect_response_text_until_completed(stream: &mut ResponseStream) -> CodexResult<String> {
    let mut output_text = String::new();

    loop {
        let Some(event) = stream.next().await else {
            return Err(CodexErr::Stream(
                "stream closed before response.completed".to_string(),
                None,
            ));
        };

        match event? {
            ResponseEvent::OutputTextDelta(delta) => output_text.push_str(&delta),
            ResponseEvent::OutputItemDone(item) => {
                if output_text.is_empty()
                    && let ResponseItem::Message { content, .. } = item
                    && let Some(text) = crate::compact::content_items_to_text(&content)
                {
                    output_text.push_str(&text);
                }
            }
            ResponseEvent::Completed { .. } => return Ok(output_text),
            _ => {}
        }
    }
}
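// Illustrative sketch, not part of the diff: a hypothetical event sequence
// and what the collector above returns for it.
//
//   OutputTextDelta("{\"raw_memory\"")  -> text deltas are appended in order
//   OutputTextDelta(":\"...\"}")        -> output_text now holds the full JSON
//   OutputItemDone(Message { .. })      -> ignored, deltas were already captured
//   Completed { .. }                    -> returns Ok(output_text)
//
// The Message fallback only kicks in when no delta arrived at all.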

/// Serializes filtered stage-1 memory items for prompt inclusion.
fn serialize_filtered_rollout_response_items(
    items: &[RolloutItem],
) -> crate::error::Result<String> {
    let filtered = items
        .iter()
        .filter_map(|item| {
            if let RolloutItem::ResponseItem(item) = item
                && should_persist_response_item_for_memories(item)
            {
                Some(item.clone())
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    serde_json::to_string(&filtered).map_err(|err| {
        CodexErr::InvalidRequest(format!("failed to serialize rollout memory: {err}"))
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn serialize_filtered_rollout_response_items_keeps_response_items_only() {
        let input = vec![RolloutItem::ResponseItem(ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: "user input".to_string(),
            }],
            end_turn: None,
            phase: None,
        })];

        let serialized = serialize_filtered_rollout_response_items(&input).expect("serialize");
        let parsed: Vec<ResponseItem> = serde_json::from_str(&serialized).expect("deserialize");

        pretty_assertions::assert_eq!(parsed.len(), 1);
        assert!(matches!(parsed[0], ResponseItem::Message { .. }));
    }
}
306
codex-rs/core/src/memories/startup/mod.rs
Normal file
@@ -0,0 +1,306 @@
mod dispatch;
mod extract;
mod phase2;

use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use futures::StreamExt;
use std::sync::Arc;
use tracing::info;
use tracing::warn;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
    SucceededWithOutput,
    SucceededNoOutput,
    Failed,
}

pub(super) const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000;

#[derive(Clone)]
struct StageOneRequestContext {
    model_info: ModelInfo,
    otel_manager: OtelManager,
    reasoning_effort: Option<ReasoningEffortConfig>,
    reasoning_summary: ReasoningSummaryConfig,
    turn_metadata_header: Option<String>,
}

impl StageOneRequestContext {
    fn from_turn_context(turn_context: &TurnContext, turn_metadata_header: Option<String>) -> Self {
        Self {
            model_info: turn_context.model_info.clone(),
            otel_manager: turn_context.otel_manager.clone(),
            reasoning_effort: turn_context.reasoning_effort,
            reasoning_summary: turn_context.reasoning_summary,
            turn_metadata_header,
        }
    }
}

/// Starts the asynchronous startup memory pipeline for an eligible root session.
///
/// The pipeline is skipped for ephemeral sessions, disabled feature flags, and
/// subagent sessions.
pub(crate) fn start_memories_startup_task(
    session: &Arc<Session>,
    config: Arc<Config>,
    source: &SessionSource,
) {
    if config.ephemeral
        || !config.features.enabled(Feature::MemoryTool)
        || matches!(source, SessionSource::SubAgent(_))
    {
        return;
    }

    let weak_session = Arc::downgrade(session);
    tokio::spawn(async move {
        let Some(session) = weak_session.upgrade() else {
            return;
        };
        if let Err(err) = run_memories_startup_pipeline(&session, config).await {
            warn!("memories startup pipeline failed: {err}");
        }
    });
}

/// Runs the startup memory pipeline.
///
/// Phase 1 selects rollout candidates, performs stage-1 extraction requests in
/// parallel, persists stage-1 outputs, and enqueues consolidation work.
///
/// Phase 2 claims a global consolidation lock and spawns one consolidation agent.
pub(super) async fn run_memories_startup_pipeline(
    session: &Arc<Session>,
    config: Arc<Config>,
) -> CodexResult<()> {
    let otel_manager = &session.services.otel_manager;
    let Some(state_db) = session.services.state_db.as_deref() else {
        warn!("state db unavailable for memories startup pipeline; skipping");
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            1,
            &[("status", "skipped_state_db_unavailable")],
        );
        return Ok(());
    };

    let allowed_sources = INTERACTIVE_SESSION_SOURCES
        .iter()
        .map(ToString::to_string)
        .collect::<Vec<_>>();

    let claimed_candidates = match state_db
        .claim_stage1_jobs_for_startup(
            session.conversation_id,
            codex_state::Stage1StartupClaimParams {
                scan_limit: PHASE_ONE_THREAD_SCAN_LIMIT,
                max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP,
                max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS,
                min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS,
                allowed_sources: allowed_sources.as_slice(),
                lease_seconds: phase_one::JOB_LEASE_SECONDS,
            },
        )
        .await
    {
        Ok(claims) => claims,
        Err(err) => {
            warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
            otel_manager.counter(
                metrics::MEMORY_PHASE_ONE_JOBS,
                1,
                &[("status", "failed_claim")],
            );
            Vec::new()
        }
    };

    let claimed_count = claimed_candidates.len();
    if claimed_count == 0 {
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            1,
            &[("status", "skipped_no_candidates")],
        );
    }
    let mut phase_one_outcomes = Vec::new();
    if claimed_count > 0 {
        let turn_context = session.new_default_turn().await;
        let stage_one_context = StageOneRequestContext::from_turn_context(
            turn_context.as_ref(),
            turn_context.resolve_turn_metadata_header().await,
        );

        phase_one_outcomes = futures::stream::iter(claimed_candidates.into_iter())
            .map(|claim| {
                let session = Arc::clone(session);
                let stage_one_context = stage_one_context.clone();
                async move {
                    let thread = claim.thread;
                    let stage_one_output = match extract::extract_stage_one_output(
                        session.as_ref(),
                        &thread.rollout_path,
                        &thread.cwd,
                        &stage_one_context,
                    )
                    .await
                    {
                        Ok(output) => output,
                        Err(reason) => {
                            if let Some(state_db) = session.services.state_db.as_deref() {
                                let _ = state_db
                                    .mark_stage1_job_failed(
                                        thread.id,
                                        &claim.ownership_token,
                                        reason,
                                        phase_one::JOB_RETRY_DELAY_SECONDS,
                                    )
                                    .await;
                            }
                            return PhaseOneJobOutcome::Failed;
                        }
                    };

                    let Some(state_db) = session.services.state_db.as_deref() else {
                        return PhaseOneJobOutcome::Failed;
                    };

                    if stage_one_output.raw_memory.is_empty()
                        && stage_one_output.rollout_summary.is_empty()
                    {
                        return if state_db
                            .mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token)
                            .await
                            .unwrap_or(false)
                        {
                            PhaseOneJobOutcome::SucceededNoOutput
                        } else {
                            PhaseOneJobOutcome::Failed
                        };
                    }

                    if state_db
                        .mark_stage1_job_succeeded(
                            thread.id,
                            &claim.ownership_token,
                            thread.updated_at.timestamp(),
                            &stage_one_output.raw_memory,
                            &stage_one_output.rollout_summary,
                        )
                        .await
                        .unwrap_or(false)
                    {
                        PhaseOneJobOutcome::SucceededWithOutput
                    } else {
                        PhaseOneJobOutcome::Failed
                    }
                }
            })
            .buffer_unordered(phase_one::CONCURRENCY_LIMIT)
            .collect::<Vec<PhaseOneJobOutcome>>()
            .await;
    }

    let succeeded_with_output_count = phase_one_outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
        .count();
    let succeeded_no_output_count = phase_one_outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
        .count();
    let failed_count = phase_one_outcomes
        .iter()
        .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
        .count();
    let succeeded_count = succeeded_with_output_count + succeeded_no_output_count;

    if claimed_count > 0 {
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            claimed_count as i64,
            &[("status", "claimed")],
        );
    }
    if succeeded_with_output_count > 0 {
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            succeeded_with_output_count as i64,
            &[("status", "succeeded")],
        );
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_OUTPUT,
            succeeded_with_output_count as i64,
            &[],
        );
    }
    if succeeded_no_output_count > 0 {
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            succeeded_no_output_count as i64,
            &[("status", "succeeded_no_output")],
        );
    }
    if failed_count > 0 {
        otel_manager.counter(
            metrics::MEMORY_PHASE_ONE_JOBS,
            failed_count as i64,
            &[("status", "failed")],
        );
    }

    info!(
        "memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
        claimed_count,
        succeeded_count,
        succeeded_with_output_count,
        succeeded_no_output_count,
        failed_count
    );

    let consolidation_job_count =
        usize::from(dispatch::run_global_memory_consolidation(session, config).await);
    info!(
        "memory consolidation dispatch complete: {} job(s) scheduled",
        consolidation_job_count
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::run_memories_startup_pipeline;
    use crate::codex::make_session_and_context;
    use crate::config::test_config;
    use std::sync::Arc;

    #[tokio::test]
    async fn startup_pipeline_is_noop_when_state_db_is_unavailable() {
        let (session, _turn_context) = make_session_and_context().await;
        let session = Arc::new(session);
        let config = Arc::new(test_config());
        run_memories_startup_pipeline(&session, config)
            .await
            .expect("startup pipeline should skip cleanly without state db");
    }
}
@@ -11,7 +11,7 @@ use tracing::debug;
use tracing::info;
use tracing::warn;

pub(in crate::memories) fn spawn_phase2_completion_task(
pub(super) fn spawn_phase2_completion_task(
    session: &Session,
    ownership_token: String,
    completion_watermark: i64,
@@ -9,8 +9,6 @@ use crate::memories::phase_two;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;

//TODO(jif) clean.

/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
    root: &Path,
@@ -84,8 +82,6 @@ async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> st
        memory.source_updated_at.to_rfc3339()
    )
    .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
    writeln!(body, "cwd: {}", memory.cwd.display())
        .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
    writeln!(body)
        .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
    body.push_str(memory.raw_memory.trim());
@@ -140,8 +136,6 @@ async fn write_rollout_summary_for_thread(
        memory.source_updated_at.to_rfc3339()
    )
    .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
    writeln!(body, "cwd: {}", memory.cwd.display())
        .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
    writeln!(body)
        .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
    body.push_str(&memory.rollout_summary);

@@ -1,3 +1,4 @@
use super::stage_one::parse_stage_one_output;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use crate::memories::ensure_layout;
@@ -10,7 +11,6 @@ use codex_protocol::ThreadId;
use codex_state::Stage1Output;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::PathBuf;
use tempfile::tempdir;

#[test]
@@ -20,9 +20,40 @@ fn memory_root_uses_shared_global_path() {
    assert_eq!(memory_root(&codex_home), codex_home.join("memories"));
}

#[test]
fn parse_stage_one_output_accepts_fenced_json() {
    let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\"}\n```";
    let parsed = parse_stage_one_output(raw).expect("parsed");
    assert!(parsed.raw_memory.contains("abc"));
    assert_eq!(parsed.rollout_summary, "short");
}

#[test]
fn parse_stage_one_output_rejects_legacy_keys() {
    let raw = r#"{"rawMemory":"abc","summary":"short"}"#;
    assert!(parse_stage_one_output(raw).is_err());
}

#[test]
fn parse_stage_one_output_accepts_empty_pair_for_skip() {
    let raw = r#"{"raw_memory":"","rollout_summary":""}"#;
    let parsed = parse_stage_one_output(raw).expect("parsed");
    assert_eq!(parsed.raw_memory, "");
    assert_eq!(parsed.rollout_summary, "");
}

#[test]
fn parse_stage_one_output_accepts_optional_rollout_slug() {
    let raw = r#"{"raw_memory":"abc","rollout_summary":"short","rollout_slug":"my-slug"}"#;
    let parsed = parse_stage_one_output(raw).expect("parsed");
    assert!(parsed.raw_memory.contains("abc"));
    assert_eq!(parsed.rollout_summary, "short");
    assert_eq!(parsed._rollout_slug, Some("my-slug".to_string()));
}

#[test]
fn stage_one_output_schema_requires_all_declared_properties() {
    let schema = crate::memories::phase1::output_schema();
    let schema = super::stage_one::stage_one_output_schema();
    let properties = schema
        .get("properties")
        .and_then(Value::as_object)
@@ -66,7 +97,6 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
        source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
        raw_memory: "raw memory".to_string(),
        rollout_summary: "short summary".to_string(),
        cwd: PathBuf::from("/tmp/workspace"),
        generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
    }];

@@ -85,5 +115,4 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
        .expect("read raw memories");
    assert!(raw_memories.contains("raw memory"));
    assert!(raw_memories.contains(&keep_id));
    assert!(raw_memories.contains("cwd: /tmp/workspace"));
}
@@ -137,16 +137,7 @@ impl ModelsManager {
    // todo(aibrahim): look if we can tighten it to pub(crate)
    /// Look up model metadata, applying remote overrides and config adjustments.
    pub async fn get_model_info(&self, model: &str, config: &Config) -> ModelInfo {
        let remote = self
            .get_remote_models(config)
            .await
            .into_iter()
            .find(|m| m.slug == model);
        let model = if let Some(remote) = remote {
            remote
        } else {
            model_info::model_info_from_slug(model)
        };
        let model = model_info::model_info_from_slug(model);
        model_info::with_config_overrides(model, config)
    }


@@ -37,8 +37,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool {
#[inline]
pub(crate) fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, .. } => role != "developer",
        ResponseItem::LocalShellCall { .. }
        ResponseItem::Message { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::CustomToolCall { .. }


@@ -30,7 +30,6 @@ pub(crate) struct SessionState {
    /// Startup regular task pre-created during session initialization.
    pub(crate) startup_regular_task: Option<RegularTask>,
    pub(crate) active_mcp_tool_selection: Option<Vec<String>>,
    pub(crate) active_connector_selection: HashSet<String>,
}

impl SessionState {
@@ -48,7 +47,6 @@ impl SessionState {
            previous_model: None,
            startup_regular_task: None,
            active_mcp_tool_selection: None,
            active_connector_selection: HashSet::new(),
        }
    }

@@ -177,25 +175,6 @@ impl SessionState {
    pub(crate) fn clear_mcp_tool_selection(&mut self) {
        self.active_mcp_tool_selection = None;
    }

    // Adds connector IDs to the active set and returns the merged selection.
    pub(crate) fn merge_connector_selection<I>(&mut self, connector_ids: I) -> HashSet<String>
    where
        I: IntoIterator<Item = String>,
    {
        self.active_connector_selection.extend(connector_ids);
        self.active_connector_selection.clone()
    }

    // Returns the current connector selection tracked on session state.
    pub(crate) fn get_connector_selection(&self) -> HashSet<String> {
        self.active_connector_selection.clone()
    }

    // Removes all currently tracked connector selections.
    pub(crate) fn clear_connector_selection(&mut self) {
        self.active_connector_selection.clear();
    }
}

// Sometimes new snapshots don't include credits or plan information.

@@ -293,35 +272,6 @@ mod tests {
        assert_eq!(state.get_mcp_tool_selection(), None);
    }

    #[tokio::test]
    // Verifies connector merging deduplicates repeated IDs.
    async fn merge_connector_selection_deduplicates_entries() {
        let session_configuration = make_session_configuration_for_tests().await;
        let mut state = SessionState::new(session_configuration);
        let merged = state.merge_connector_selection([
            "calendar".to_string(),
            "calendar".to_string(),
            "drive".to_string(),
        ]);

        assert_eq!(
            merged,
            HashSet::from(["calendar".to_string(), "drive".to_string()])
        );
    }

    #[tokio::test]
    // Verifies clearing connector selection removes all saved IDs.
    async fn clear_connector_selection_removes_entries() {
        let session_configuration = make_session_configuration_for_tests().await;
        let mut state = SessionState::new(session_configuration);
        state.merge_connector_selection(["calendar".to_string()]);

        state.clear_connector_selection();

        assert_eq!(state.get_connector_selection(), HashSet::new());
    }

    #[tokio::test]
    async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() {
        let session_configuration = make_session_configuration_for_tests().await;

@@ -121,7 +121,6 @@ impl Session {
    ) {
        self.abort_all_tasks(TurnAbortReason::Replaced).await;
        self.clear_mcp_tool_selection().await;
        self.clear_connector_selection().await;
        self.seed_initial_context_if_needed(turn_context.as_ref())
            .await;

@@ -97,8 +97,6 @@ Primary inputs (always read these, if exists):
Under `{{ memory_root }}/`:
- `raw_memories.md`
  - mechanical merge of `raw_memories` from Phase 1;
  - source of rollout-level metadata needed for MEMORY.md header annotations;
    you should be able to find `cwd` and `updated_at` there.
- `MEMORY.md`
  - merged memories; produce a lightly clustered version if applicable
- `rollout_summaries/*.md`

@@ -131,8 +129,8 @@ Rules:
Clustered schema:
---
rollout_summary_files:
- <file1.md> (<annotation that includes status/usefulness, cwd, and updated_at, e.g. "success, most useful architecture walkthrough, cwd=/repo/path, updated_at=2026-02-12T10:30:00Z">)
- <file2.md> (<annotation with cwd=/..., updated_at=...>)
- <file1.md> (<a few words annotation such as "success, most useful" or "uncertain, no user feedback">)
- <file2.md> (<annotation>)
description: brief description of the shared tasks/outcomes
keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo concepts, contracts)>
---

@@ -143,9 +141,6 @@ keywords: k1, k2, k3, ... <searchable handles (tool names, error names, repo concepts, contracts)>
Schema rules (strict):
- Keep entries compact and retrieval-friendly.
- A single note block may correspond to multiple related tasks; aggregate when tasks and lessons align.
- In `rollout_summary_files`, each parenthesized annotation must include
  `cwd=<path>` and `updated_at=<timestamp>` copied from that rollout summary metadata.
  If missing from an individual rollout summary, recover them from `raw_memories.md`.
- If you need to reference skills, do it in the BODY as bullets, not in the header
  (e.g., "- Related skill: skills/<skill-name>/SKILL.md").
- Use lowercase, hyphenated skill folder names.
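To make the clustered schema concrete, a hypothetical entry following the short-annotation form above — file names, annotation wording, and keywords are invented for illustration:

---
rollout_summary_files:
- 2026-02-10-auth-refresh.md (success, most useful)
- 2026-02-11-auth-retry.md (uncertain, no user feedback)
description: hardened token refresh and retry handling in the auth client
keywords: auth, token-refresh, retry, http-401
---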

@@ -343,4 +338,4 @@ Use `rg` for fast retrieval while consolidating:
- Search across memory tree:
  `rg -n -i "<pattern>" "{{ memory_root }}" | head -n 50`
- Locate rollout summary files:
  `rg --files "{{ memory_root }}/rollout_summaries" | head -n 200`
  `rg --files "{{ memory_root }}/rollout_summaries" | head -n 200`

@@ -2,8 +2,19 @@
#![allow(clippy::expect_used, clippy::unwrap_used)]

use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
use codex_core::models_manager::client_version_to_whole;
use codex_core::test_support::all_model_presets;
use codex_protocol::openai_models::ConfigShellToolType;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ModelVisibility;
use codex_protocol::openai_models::TruncationPolicyConfig;
use codex_protocol::openai_models::default_input_modalities;
use core_test_support::responses;
use core_test_support::responses::ResponseMock;
use core_test_support::test_codex_exec::test_codex_exec;
use pretty_assertions::assert_eq;
use std::path::Path;
use wiremock::matchers::header;

/// Verify that when the server reports an error, `codex-exec` exits with a

@@ -52,3 +63,103 @@ async fn supports_originator_override() -> anyhow::Result<()> {

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn uses_codex_exec_scoped_cache_and_sends_cached_slug() -> anyhow::Result<()> {
    let test = test_codex_exec();
    let cached_slug = "exec-cache-slug-e2e";
    write_models_cache_for_originator(test.home_path(), "codex_exec", cached_slug)?;

    let server = responses::start_mock_server().await;
    let body = responses::sse(vec![
        responses::ev_response_created("response_1"),
        responses::ev_assistant_message("response_1", "Hello, world!"),
        responses::ev_completed("response_1"),
    ]);
    let response_mock = responses::mount_sse_once(&server, body).await;

    test.cmd_with_server(&server)
        .env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR)
        .arg("--skip-git-repo-check")
        .arg("tell me something")
        .assert()
        .code(0);

    assert_response_model_slug(&response_mock, cached_slug);
    assert!(
        test.home_path()
            .join("models_cache")
            .join("codex_exec")
            .join("models_cache.json")
            .exists()
    );
    Ok(())
}

fn assert_response_model_slug(response_mock: &ResponseMock, expected_slug: &str) {
    let request = response_mock.single_request();
    let request_body = request.body_json();
    assert_eq!(request_body["model"].as_str(), Some(expected_slug));
}

fn write_models_cache_for_originator(
    codex_home: &Path,
    originator: &str,
    slug: &str,
) -> std::io::Result<()> {
    let Some(first_preset) = all_model_presets()
        .into_iter()
        .find(|preset| preset.show_in_picker)
    else {
        return Err(std::io::Error::other("no visible model presets"));
    };
    let mut model = preset_to_info(&first_preset, 0);
    model.slug = slug.to_string();
    let cache_path = codex_home
        .join("models_cache")
        .join(originator)
        .join("models_cache.json");
    if let Some(parent) = cache_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let cache = serde_json::json!({
        "fetched_at": chrono::Utc::now(),
        "etag": null,
        "client_version": client_version_to_whole(),
        "models": [model]
    });
    std::fs::write(cache_path, serde_json::to_string_pretty(&cache)?)
}
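For reference, the cache file this helper writes ends up with roughly the following layout — values here are illustrative, and the model entry is truncated; the real file contains every serialized `ModelInfo` field:

{
  "fetched_at": "2026-02-13T12:00:00Z",
  "etag": null,
  "client_version": "0.0.0",
  "models": [
    { "slug": "exec-cache-slug-e2e" }
  ]
}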

fn preset_to_info(preset: &ModelPreset, priority: i32) -> ModelInfo {
    ModelInfo {
        slug: preset.id.clone(),
        display_name: preset.display_name.clone(),
        description: Some(preset.description.clone()),
        default_reasoning_level: Some(preset.default_reasoning_effort),
        supported_reasoning_levels: preset.supported_reasoning_efforts.clone(),
        shell_type: ConfigShellToolType::ShellCommand,
        visibility: if preset.show_in_picker {
            ModelVisibility::List
        } else {
            ModelVisibility::Hide
        },
        supported_in_api: true,
        priority,
        upgrade: preset.upgrade.as_ref().map(|upgrade| upgrade.into()),
        base_instructions: "base instructions".to_string(),
        model_messages: None,
        supports_reasoning_summaries: false,
        support_verbosity: false,
        default_verbosity: None,
        apply_patch_tool_type: None,
        truncation_policy: TruncationPolicyConfig::bytes(10_000),
        supports_parallel_tool_calls: false,
        context_window: Some(272_000),
        auto_compact_token_limit: None,
        effective_context_window_percent: 95,
        experimental_supported_tools: Vec::new(),
        input_modalities: default_input_modalities(),
        prefer_websockets: false,
    }
}

@@ -139,6 +139,19 @@ impl Drop for FileSearchSession {
}

pub fn create_session(
    search_directory: &Path,
    options: FileSearchOptions,
    reporter: Arc<dyn SessionReporter>,
) -> anyhow::Result<FileSearchSession> {
    create_session_inner(
        vec![search_directory.to_path_buf()],
        options,
        reporter,
        None,
    )
}
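A minimal call site for the narrowed public API, mirroring how the tests further down use it — `RecordingReporter` stands in for any `SessionReporter` implementation the caller provides:

// Sketch: single-directory session via the new wrapper.
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
    .expect("session");
session.update_query("alpha");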

fn create_session_inner(
    search_directories: Vec<PathBuf>,
    options: FileSearchOptions,
    reporter: Arc<dyn SessionReporter>,

@@ -278,7 +291,7 @@ pub fn run(
    cancel_flag: Option<Arc<AtomicBool>>,
) -> anyhow::Result<FileSearchResults> {
    let reporter = Arc::new(RunReporter::default());
    let session = create_session(roots, options, reporter.clone(), cancel_flag)?;
    let session = create_session_inner(roots, options, reporter.clone(), cancel_flag)?;

    session.update_query(pattern_text);

@@ -758,13 +771,8 @@ mod tests {
    fn session_scanned_file_count_is_monotonic_across_queries() {
        let dir = create_temp_tree(200);
        let reporter = Arc::new(RecordingReporter::default());
        let session = create_session(
            vec![dir.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter.clone(),
            None,
        )
        .expect("session");
        let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
            .expect("session");

        session.update_query("file-00");
        thread::sleep(Duration::from_millis(20));

@@ -783,13 +791,8 @@ mod tests {
    fn session_streams_updates_before_walk_complete() {
        let dir = create_temp_tree(600);
        let reporter = Arc::new(RecordingReporter::default());
        let session = create_session(
            vec![dir.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter.clone(),
            None,
        )
        .expect("session");
        let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
            .expect("session");

        session.update_query("file-0");
        let completed = reporter.wait_for_complete(Duration::from_secs(5));

@@ -805,13 +808,8 @@ mod tests {
        fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
        fs::write(dir.path().join("beta.txt"), "beta").unwrap();
        let reporter = Arc::new(RecordingReporter::default());
        let session = create_session(
            vec![dir.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter.clone(),
            None,
        )
        .expect("session");
        let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
            .expect("session");

        session.update_query("alpha");
        assert!(reporter.wait_for_complete(Duration::from_secs(5)));

@@ -836,7 +834,7 @@ mod tests {
        fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
        fs::write(dir.path().join("beta.txt"), "beta").unwrap();
        let reporter = Arc::new(RecordingReporter::default());
        let session = create_session(
        let session = create_session_inner(
            vec![dir.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter.clone(),

@@ -865,7 +863,7 @@ mod tests {
        let cancel_flag = Arc::new(AtomicBool::new(false));

        let reporter_a = Arc::new(RecordingReporter::default());
        let session_a = create_session(
        let session_a = create_session_inner(
            vec![root_a.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter_a,

@@ -874,7 +872,7 @@ mod tests {
        .expect("session_a");

        let reporter_b = Arc::new(RecordingReporter::default());
        let session_b = create_session(
        let session_b = create_session_inner(
            vec![root_b.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter_b.clone(),

@@ -896,13 +894,8 @@ mod tests {
    fn session_emits_updates_when_query_changes() {
        let dir = create_temp_tree(200);
        let reporter = Arc::new(RecordingReporter::default());
        let session = create_session(
            vec![dir.path().to_path_buf()],
            FileSearchOptions::default(),
            reporter.clone(),
            None,
        )
        .expect("session");
        let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
            .expect("session");

        session.update_query("zzzzzzzz");
        let completed = reporter.wait_for_complete(Duration::from_secs(5));

@@ -4,7 +4,6 @@ use chrono::Utc;
use codex_protocol::ThreadId;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
use std::path::PathBuf;

use super::ThreadMetadata;

@@ -15,7 +14,6 @@ pub struct Stage1Output {
    pub source_updated_at: DateTime<Utc>,
    pub raw_memory: String,
    pub rollout_summary: String,
    pub cwd: PathBuf,
    pub generated_at: DateTime<Utc>,
}

@@ -25,7 +23,6 @@ pub(crate) struct Stage1OutputRow {
    source_updated_at: i64,
    raw_memory: String,
    rollout_summary: String,
    cwd: String,
    generated_at: i64,
}

@@ -36,7 +33,6 @@ impl Stage1OutputRow {
            source_updated_at: row.try_get("source_updated_at")?,
            raw_memory: row.try_get("raw_memory")?,
            rollout_summary: row.try_get("rollout_summary")?,
            cwd: row.try_get("cwd")?,
            generated_at: row.try_get("generated_at")?,
        })
    }

@@ -51,7 +47,6 @@ impl TryFrom<Stage1OutputRow> for Stage1Output {
            source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
            raw_memory: row.raw_memory,
            rollout_summary: row.rollout_summary,
            cwd: PathBuf::from(row.cwd),
            generated_at: epoch_seconds_to_datetime(row.generated_at)?,
        })
    }
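The conversion relies on an epoch-seconds helper that is not shown in this hunk; one plausible implementation with chrono — the `anyhow` error type here is an assumption — would be:

// Hypothetical helper: maps stored epoch seconds back to DateTime<Utc>,
// rejecting values chrono cannot represent.
fn epoch_seconds_to_datetime(seconds: i64) -> anyhow::Result<DateTime<Utc>> {
    Utc.timestamp_opt(seconds, 0)
        .single()
        .ok_or_else(|| anyhow::anyhow!("invalid epoch seconds: {seconds}"))
}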

@@ -2093,10 +2093,8 @@ WHERE kind = 'memory_stage1'
    assert_eq!(outputs.len(), 2);
    assert_eq!(outputs[0].thread_id, thread_id_b);
    assert_eq!(outputs[0].rollout_summary, "summary b");
    assert_eq!(outputs[0].cwd, codex_home.join("workspace-b"));
    assert_eq!(outputs[1].thread_id, thread_id_a);
    assert_eq!(outputs[1].rollout_summary, "summary a");
    assert_eq!(outputs[1].cwd, codex_home.join("workspace-a"));

    let _ = tokio::fs::remove_dir_all(codex_home).await;
}

@@ -2165,7 +2163,6 @@ VALUES (?, ?, ?, ?, ?)
    assert_eq!(outputs.len(), 1);
    assert_eq!(outputs[0].thread_id, thread_id_non_empty);
    assert_eq!(outputs[0].rollout_summary, "summary");
    assert_eq!(outputs[0].cwd, codex_home.join("workspace-non-empty"));

    let _ = tokio::fs::remove_dir_all(codex_home).await;
}

@@ -178,7 +178,6 @@ LEFT JOIN jobs
///
/// Query behavior:
/// - filters out rows where both `raw_memory` and `rollout_summary` are blank
/// - joins `threads` to include thread `cwd`
/// - orders by `source_updated_at DESC, thread_id DESC`
/// - applies `LIMIT n`
pub async fn list_stage1_outputs_for_global(
@@ -192,10 +191,7 @@ LEFT JOIN jobs
    let rows = sqlx::query(
        r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at
     , COALESCE(t.cwd, '') AS cwd
FROM stage1_outputs AS so
LEFT JOIN threads AS t
    ON t.id = so.thread_id
WHERE length(trim(so.raw_memory)) > 0 OR length(trim(so.rollout_summary)) > 0
ORDER BY so.source_updated_at DESC, so.thread_id DESC
LIMIT ?

@@ -81,13 +81,12 @@ impl FileSearchManager {
            session_token,
        });
        let session = file_search::create_session(
            vec![self.search_dir.clone()],
            &self.search_dir,
            file_search::FileSearchOptions {
                compute_indices: true,
                ..Default::default()
            },
            reporter,
            None,
        );
        match session {
            Ok(session) => st.session = Some(session),

@@ -1,6 +0,0 @@
load("//:defs.bzl", "codex_rust_crate")

codex_rust_crate(
    name = "sanitizer",
    crate_name = "codex_utils_sanitizer",
)