Compare commits

...

3 Commits

Author SHA1 Message Date
Eric Traut
a9fd5c35e1 Avoid exposing full permission profile 2026-05-19 08:49:02 -07:00
Eric Traut
62df4d9f9e Wait for pending settings before turn start 2026-05-18 21:46:46 -07:00
Eric Traut
b6b22ad1b3 [6 of 7] Add app-server thread settings API 2026-05-18 21:11:15 -07:00
24 changed files with 2461 additions and 156 deletions

View File

@@ -176,6 +176,7 @@ pub(crate) fn server_notification_requires_delivery(notification: &ServerNotific
matches!(
notification,
ServerNotification::TurnCompleted(_)
| ServerNotification::ThreadSettingsUpdated(_)
| ServerNotification::ItemCompleted(_)
| ServerNotification::AgentMessageDelta(_)
| ServerNotification::PlanDelta(_)

View File

@@ -64,6 +64,26 @@
},
"type": "object"
},
"ActivePermissionProfile": {
"properties": {
"extends": {
"default": null,
"description": "Parent profile identifier once permissions profiles support inheritance. This is currently always `null`.",
"type": [
"string",
"null"
]
},
"id": {
"description": "Identifier from `default_permissions` or the implicit built-in default, such as `:workspace` or a user-defined `[permissions.<id>]` profile.",
"type": "string"
}
},
"required": [
"id"
],
"type": "object"
},
"AdditionalFileSystemPermissions": {
"properties": {
"entries": {
@@ -415,6 +435,65 @@
],
"type": "object"
},
"ApprovalsReviewer": {
"description": "Configures who approval requests are routed to for review. Examples include sandbox escapes, blocked network access, MCP approval prompts, and ARC escalations. Defaults to `user`. `auto_review` uses a carefully prompted subagent to gather relevant context and apply a risk-based decision framework before approving or denying the request. The legacy value `guardian_subagent` is accepted for compatibility.",
"enum": [
"user",
"auto_review",
"guardian_subagent"
],
"type": "string"
},
"AskForApproval": {
"oneOf": [
{
"enum": [
"untrusted",
"on-failure",
"on-request",
"never"
],
"type": "string"
},
{
"additionalProperties": false,
"properties": {
"granular": {
"properties": {
"mcp_elicitations": {
"type": "boolean"
},
"request_permissions": {
"default": false,
"type": "boolean"
},
"rules": {
"type": "boolean"
},
"sandbox_approval": {
"type": "boolean"
},
"skill_approval": {
"default": false,
"type": "boolean"
}
},
"required": [
"mcp_elicitations",
"rules",
"sandbox_approval"
],
"type": "object"
}
},
"required": [
"granular"
],
"title": "GranularAskForApproval",
"type": "object"
}
]
},
"AuthMode": {
"description": "Authentication mode for OpenAI-backed providers.",
"oneOf": [
@@ -658,6 +737,22 @@
],
"type": "string"
},
"CollaborationMode": {
"description": "Collaboration mode for a Codex session.",
"properties": {
"mode": {
"$ref": "#/definitions/ModeKind"
},
"settings": {
"$ref": "#/definitions/Settings"
}
},
"required": [
"mode",
"settings"
],
"type": "object"
},
"CommandAction": {
"oneOf": [
{
@@ -2257,6 +2352,14 @@
}
]
},
"ModeKind": {
"description": "Initial collaboration mode to use when the TUI starts.",
"enum": [
"plan",
"default"
],
"type": "string"
},
"ModelRerouteReason": {
"enum": [
"highRiskCyberActivity"
@@ -2318,6 +2421,13 @@
],
"type": "object"
},
"NetworkAccess": {
"enum": [
"restricted",
"enabled"
],
"type": "string"
},
"NetworkApprovalProtocol": {
"enum": [
"http",
@@ -2401,6 +2511,14 @@
}
]
},
"Personality": {
"enum": [
"none",
"friendly",
"pragmatic"
],
"type": "string"
},
"PlanDeltaNotification": {
"description": "EXPERIMENTAL - proposed plan streaming deltas for plan items. Clients should not assume concatenated deltas match the completed plan item content.",
"properties": {
@@ -2654,6 +2772,26 @@
],
"type": "string"
},
"ReasoningSummary": {
"description": "A summary of the reasoning performed by the model. This can be useful for debugging and understanding the model's reasoning process. See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#reasoning-summaries",
"oneOf": [
{
"enum": [
"auto",
"concise",
"detailed"
],
"type": "string"
},
{
"description": "Option to disable reasoning summaries.",
"enum": [
"none"
],
"type": "string"
}
]
},
"ReasoningSummaryPartAddedNotification": {
"properties": {
"itemId": {
@@ -2806,6 +2944,105 @@
},
"type": "object"
},
"SandboxPolicy": {
"oneOf": [
{
"properties": {
"type": {
"enum": [
"dangerFullAccess"
],
"title": "DangerFullAccessSandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "DangerFullAccessSandboxPolicy",
"type": "object"
},
{
"properties": {
"networkAccess": {
"default": false,
"type": "boolean"
},
"type": {
"enum": [
"readOnly"
],
"title": "ReadOnlySandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ReadOnlySandboxPolicy",
"type": "object"
},
{
"properties": {
"networkAccess": {
"allOf": [
{
"$ref": "#/definitions/NetworkAccess"
}
],
"default": "restricted"
},
"type": {
"enum": [
"externalSandbox"
],
"title": "ExternalSandboxSandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ExternalSandboxSandboxPolicy",
"type": "object"
},
{
"properties": {
"excludeSlashTmp": {
"default": false,
"type": "boolean"
},
"excludeTmpdirEnvVar": {
"default": false,
"type": "boolean"
},
"networkAccess": {
"default": false,
"type": "boolean"
},
"type": {
"enum": [
"workspaceWrite"
],
"title": "WorkspaceWriteSandboxPolicyType",
"type": "string"
},
"writableRoots": {
"default": [],
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"type"
],
"title": "WorkspaceWriteSandboxPolicy",
"type": "object"
}
]
},
"ServerRequestResolvedNotification": {
"properties": {
"requestId": {
@@ -2861,6 +3098,34 @@
}
]
},
"Settings": {
"description": "Settings for a collaboration mode.",
"properties": {
"developer_instructions": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
"reasoning_effort": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningEffort"
},
{
"type": "null"
}
]
}
},
"required": [
"model"
],
"type": "object"
},
"SkillsChangedNotification": {
"description": "Notification emitted when watched local skill files change.\n\nTreat this as an invalidation signal and re-run `skills/list` with the client's current parameters when refreshed skill metadata is needed.",
"type": "object"
@@ -4147,6 +4412,102 @@
],
"type": "object"
},
"ThreadSettings": {
"properties": {
"activePermissionProfile": {
"anyOf": [
{
"$ref": "#/definitions/ActivePermissionProfile"
},
{
"type": "null"
}
]
},
"approvalPolicy": {
"$ref": "#/definitions/AskForApproval"
},
"approvalsReviewer": {
"$ref": "#/definitions/ApprovalsReviewer"
},
"collaborationMode": {
"$ref": "#/definitions/CollaborationMode"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"effort": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningEffort"
},
{
"type": "null"
}
]
},
"model": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"personality": {
"anyOf": [
{
"$ref": "#/definitions/Personality"
},
{
"type": "null"
}
]
},
"sandboxPolicy": {
"$ref": "#/definitions/SandboxPolicy"
},
"serviceTier": {
"type": [
"string",
"null"
]
},
"summary": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningSummary"
},
{
"type": "null"
}
]
}
},
"required": [
"approvalPolicy",
"approvalsReviewer",
"collaborationMode",
"cwd",
"model",
"modelProvider",
"sandboxPolicy"
],
"type": "object"
},
"ThreadSettingsUpdatedNotification": {
"properties": {
"threadId": {
"type": "string"
},
"threadSettings": {
"$ref": "#/definitions/ThreadSettings"
}
},
"required": [
"threadId",
"threadSettings"
],
"type": "object"
},
"ThreadSource": {
"enum": [
"user",
@@ -4948,6 +5309,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/settings/updated"
],
"title": "Thread/settings/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadSettingsUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/settings/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -3891,6 +3891,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/settings/updated"
],
"title": "Thread/settings/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadSettingsUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/settings/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -17059,6 +17079,104 @@
"title": "ThreadSetNameResponse",
"type": "object"
},
"ThreadSettings": {
"properties": {
"activePermissionProfile": {
"anyOf": [
{
"$ref": "#/definitions/v2/ActivePermissionProfile"
},
{
"type": "null"
}
]
},
"approvalPolicy": {
"$ref": "#/definitions/v2/AskForApproval"
},
"approvalsReviewer": {
"$ref": "#/definitions/v2/ApprovalsReviewer"
},
"collaborationMode": {
"$ref": "#/definitions/v2/CollaborationMode"
},
"cwd": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"effort": {
"anyOf": [
{
"$ref": "#/definitions/v2/ReasoningEffort"
},
{
"type": "null"
}
]
},
"model": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"personality": {
"anyOf": [
{
"$ref": "#/definitions/v2/Personality"
},
{
"type": "null"
}
]
},
"sandboxPolicy": {
"$ref": "#/definitions/v2/SandboxPolicy"
},
"serviceTier": {
"type": [
"string",
"null"
]
},
"summary": {
"anyOf": [
{
"$ref": "#/definitions/v2/ReasoningSummary"
},
{
"type": "null"
}
]
}
},
"required": [
"approvalPolicy",
"approvalsReviewer",
"collaborationMode",
"cwd",
"model",
"modelProvider",
"sandboxPolicy"
],
"type": "object"
},
"ThreadSettingsUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
},
"threadSettings": {
"$ref": "#/definitions/v2/ThreadSettings"
}
},
"required": [
"threadId",
"threadSettings"
],
"title": "ThreadSettingsUpdatedNotification",
"type": "object"
},
"ThreadShellCommandParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -11092,6 +11092,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/settings/updated"
],
"title": "Thread/settings/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadSettingsUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/settings/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -14883,6 +14903,104 @@
"title": "ThreadSetNameResponse",
"type": "object"
},
"ThreadSettings": {
"properties": {
"activePermissionProfile": {
"anyOf": [
{
"$ref": "#/definitions/ActivePermissionProfile"
},
{
"type": "null"
}
]
},
"approvalPolicy": {
"$ref": "#/definitions/AskForApproval"
},
"approvalsReviewer": {
"$ref": "#/definitions/ApprovalsReviewer"
},
"collaborationMode": {
"$ref": "#/definitions/CollaborationMode"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"effort": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningEffort"
},
{
"type": "null"
}
]
},
"model": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"personality": {
"anyOf": [
{
"$ref": "#/definitions/Personality"
},
{
"type": "null"
}
]
},
"sandboxPolicy": {
"$ref": "#/definitions/SandboxPolicy"
},
"serviceTier": {
"type": [
"string",
"null"
]
},
"summary": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningSummary"
},
{
"type": "null"
}
]
}
},
"required": [
"approvalPolicy",
"approvalsReviewer",
"collaborationMode",
"cwd",
"model",
"modelProvider",
"sandboxPolicy"
],
"type": "object"
},
"ThreadSettingsUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
},
"threadSettings": {
"$ref": "#/definitions/ThreadSettings"
}
},
"required": [
"threadId",
"threadSettings"
],
"title": "ThreadSettingsUpdatedNotification",
"type": "object"
},
"ThreadShellCommandParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -0,0 +1,381 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsolutePathBuf": {
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
"type": "string"
},
"ActivePermissionProfile": {
"properties": {
"extends": {
"default": null,
"description": "Parent profile identifier once permissions profiles support inheritance. This is currently always `null`.",
"type": [
"string",
"null"
]
},
"id": {
"description": "Identifier from `default_permissions` or the implicit built-in default, such as `:workspace` or a user-defined `[permissions.<id>]` profile.",
"type": "string"
}
},
"required": [
"id"
],
"type": "object"
},
"ApprovalsReviewer": {
"description": "Configures who approval requests are routed to for review. Examples include sandbox escapes, blocked network access, MCP approval prompts, and ARC escalations. Defaults to `user`. `auto_review` uses a carefully prompted subagent to gather relevant context and apply a risk-based decision framework before approving or denying the request. The legacy value `guardian_subagent` is accepted for compatibility.",
"enum": [
"user",
"auto_review",
"guardian_subagent"
],
"type": "string"
},
"AskForApproval": {
"oneOf": [
{
"enum": [
"untrusted",
"on-failure",
"on-request",
"never"
],
"type": "string"
},
{
"additionalProperties": false,
"properties": {
"granular": {
"properties": {
"mcp_elicitations": {
"type": "boolean"
},
"request_permissions": {
"default": false,
"type": "boolean"
},
"rules": {
"type": "boolean"
},
"sandbox_approval": {
"type": "boolean"
},
"skill_approval": {
"default": false,
"type": "boolean"
}
},
"required": [
"mcp_elicitations",
"rules",
"sandbox_approval"
],
"type": "object"
}
},
"required": [
"granular"
],
"title": "GranularAskForApproval",
"type": "object"
}
]
},
"CollaborationMode": {
"description": "Collaboration mode for a Codex session.",
"properties": {
"mode": {
"$ref": "#/definitions/ModeKind"
},
"settings": {
"$ref": "#/definitions/Settings"
}
},
"required": [
"mode",
"settings"
],
"type": "object"
},
"ModeKind": {
"description": "Initial collaboration mode to use when the TUI starts.",
"enum": [
"plan",
"default"
],
"type": "string"
},
"NetworkAccess": {
"enum": [
"restricted",
"enabled"
],
"type": "string"
},
"Personality": {
"enum": [
"none",
"friendly",
"pragmatic"
],
"type": "string"
},
"ReasoningEffort": {
"description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning",
"enum": [
"none",
"minimal",
"low",
"medium",
"high",
"xhigh"
],
"type": "string"
},
"ReasoningSummary": {
"description": "A summary of the reasoning performed by the model. This can be useful for debugging and understanding the model's reasoning process. See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#reasoning-summaries",
"oneOf": [
{
"enum": [
"auto",
"concise",
"detailed"
],
"type": "string"
},
{
"description": "Option to disable reasoning summaries.",
"enum": [
"none"
],
"type": "string"
}
]
},
"SandboxPolicy": {
"oneOf": [
{
"properties": {
"type": {
"enum": [
"dangerFullAccess"
],
"title": "DangerFullAccessSandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "DangerFullAccessSandboxPolicy",
"type": "object"
},
{
"properties": {
"networkAccess": {
"default": false,
"type": "boolean"
},
"type": {
"enum": [
"readOnly"
],
"title": "ReadOnlySandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ReadOnlySandboxPolicy",
"type": "object"
},
{
"properties": {
"networkAccess": {
"allOf": [
{
"$ref": "#/definitions/NetworkAccess"
}
],
"default": "restricted"
},
"type": {
"enum": [
"externalSandbox"
],
"title": "ExternalSandboxSandboxPolicyType",
"type": "string"
}
},
"required": [
"type"
],
"title": "ExternalSandboxSandboxPolicy",
"type": "object"
},
{
"properties": {
"excludeSlashTmp": {
"default": false,
"type": "boolean"
},
"excludeTmpdirEnvVar": {
"default": false,
"type": "boolean"
},
"networkAccess": {
"default": false,
"type": "boolean"
},
"type": {
"enum": [
"workspaceWrite"
],
"title": "WorkspaceWriteSandboxPolicyType",
"type": "string"
},
"writableRoots": {
"default": [],
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
}
},
"required": [
"type"
],
"title": "WorkspaceWriteSandboxPolicy",
"type": "object"
}
]
},
"Settings": {
"description": "Settings for a collaboration mode.",
"properties": {
"developer_instructions": {
"type": [
"string",
"null"
]
},
"model": {
"type": "string"
},
"reasoning_effort": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningEffort"
},
{
"type": "null"
}
]
}
},
"required": [
"model"
],
"type": "object"
},
"ThreadSettings": {
"properties": {
"activePermissionProfile": {
"anyOf": [
{
"$ref": "#/definitions/ActivePermissionProfile"
},
{
"type": "null"
}
]
},
"approvalPolicy": {
"$ref": "#/definitions/AskForApproval"
},
"approvalsReviewer": {
"$ref": "#/definitions/ApprovalsReviewer"
},
"collaborationMode": {
"$ref": "#/definitions/CollaborationMode"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"effort": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningEffort"
},
{
"type": "null"
}
]
},
"model": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"personality": {
"anyOf": [
{
"$ref": "#/definitions/Personality"
},
{
"type": "null"
}
]
},
"sandboxPolicy": {
"$ref": "#/definitions/SandboxPolicy"
},
"serviceTier": {
"type": [
"string",
"null"
]
},
"summary": {
"anyOf": [
{
"$ref": "#/definitions/ReasoningSummary"
},
{
"type": "null"
}
]
}
},
"required": [
"approvalPolicy",
"approvalsReviewer",
"collaborationMode",
"cwd",
"model",
"modelProvider",
"sandboxPolicy"
],
"type": "object"
}
},
"properties": {
"threadId": {
"type": "string"
},
"threadSettings": {
"$ref": "#/definitions/ThreadSettings"
}
},
"required": [
"threadId",
"threadSettings"
],
"title": "ThreadSettingsUpdatedNotification",
"type": "object"
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,14 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AbsolutePathBuf } from "../AbsolutePathBuf";
import type { CollaborationMode } from "../CollaborationMode";
import type { Personality } from "../Personality";
import type { ReasoningEffort } from "../ReasoningEffort";
import type { ReasoningSummary } from "../ReasoningSummary";
import type { ActivePermissionProfile } from "./ActivePermissionProfile";
import type { ApprovalsReviewer } from "./ApprovalsReviewer";
import type { AskForApproval } from "./AskForApproval";
import type { SandboxPolicy } from "./SandboxPolicy";
export type ThreadSettings = { model: string, modelProvider: string, serviceTier: string | null, cwd: AbsolutePathBuf, approvalPolicy: AskForApproval, approvalsReviewer: ApprovalsReviewer, sandboxPolicy: SandboxPolicy, activePermissionProfile: ActivePermissionProfile | null, effort: ReasoningEffort | null, summary: ReasoningSummary | null, personality: Personality | null, collaborationMode: CollaborationMode, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadSettings } from "./ThreadSettings";
export type ThreadSettingsUpdatedNotification = { threadId: string, threadSettings: ThreadSettings, };

View File

@@ -393,6 +393,8 @@ export type { ThreadRollbackParams } from "./ThreadRollbackParams";
export type { ThreadRollbackResponse } from "./ThreadRollbackResponse";
export type { ThreadSetNameParams } from "./ThreadSetNameParams";
export type { ThreadSetNameResponse } from "./ThreadSetNameResponse";
export type { ThreadSettings } from "./ThreadSettings";
export type { ThreadSettingsUpdatedNotification } from "./ThreadSettingsUpdatedNotification";
export type { ThreadShellCommandParams } from "./ThreadShellCommandParams";
export type { ThreadShellCommandResponse } from "./ThreadShellCommandResponse";
export type { ThreadSortKey } from "./ThreadSortKey";

View File

@@ -738,6 +738,13 @@ client_request_definitions! {
serialization: thread_id(params.thread_id),
response: v2::TurnStartResponse,
},
#[experimental("thread/settings/update")]
ThreadSettingsUpdate => "thread/settings/update" {
params: v2::ThreadSettingsUpdateParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::ThreadSettingsUpdateResponse,
},
TurnSteer => "turn/steer" {
params: v2::TurnSteerParams,
inspect_params: true,
@@ -1456,6 +1463,8 @@ server_notification_definitions! {
Error => "error" (v2::ErrorNotification),
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadStatusChanged => "thread/status/changed" (v2::ThreadStatusChangedNotification),
#[experimental("thread/settings/updated")]
ThreadSettingsUpdated => "thread/settings/updated" (v2::ThreadSettingsUpdatedNotification),
ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification),
ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification),
ThreadClosed => "thread/closed" (v2::ThreadClosedNotification),
@@ -1555,6 +1564,9 @@ mod tests {
use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::account::PlanType;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_READ_ONLY;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::RealtimeConversationVersion;
@@ -1577,6 +1589,30 @@ mod tests {
test_path_buf(&path).abs()
}
fn sample_thread_settings(cwd: AbsolutePathBuf) -> v2::ThreadSettings {
v2::ThreadSettings {
model: "gpt-5".to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd,
approval_policy: v2::AskForApproval::OnFailure,
approvals_reviewer: v2::ApprovalsReviewer::User,
sandbox_policy: v2::SandboxPolicy::DangerFullAccess,
active_permission_profile: None,
effort: None,
summary: None,
personality: None,
collaboration_mode: CollaborationMode {
mode: ModeKind::Default,
settings: Settings {
model: "gpt-5".to_string(),
reasoning_effort: None,
developer_instructions: None,
},
},
}
}
fn request_id() -> RequestId {
const REQUEST_ID: i64 = 1;
RequestId::Integer(REQUEST_ID)
@@ -1614,6 +1650,20 @@ mod tests {
})
);
let thread_settings_update = ClientRequest::ThreadSettingsUpdate {
request_id: request_id(),
params: v2::ThreadSettingsUpdateParams {
thread_id: thread_id.clone(),
..Default::default()
},
};
assert_eq!(
thread_settings_update.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: thread_id.clone()
})
);
let thread_fork = ClientRequest::ThreadFork {
request_id: request_id(),
params: v2::ThreadForkParams {
@@ -2287,6 +2337,40 @@ mod tests {
Ok(())
}
#[test]
fn serialize_thread_settings_update_request() -> Result<()> {
let request = ClientRequest::ThreadSettingsUpdate {
request_id: RequestId::Integer(5),
params: v2::ThreadSettingsUpdateParams {
thread_id: "thread-1".to_string(),
model: Some("gpt-5.2".to_string()),
service_tier: Some(None),
..Default::default()
},
};
assert_eq!(
json!({
"method": "thread/settings/update",
"id": 5,
"params": {
"threadId": "thread-1",
"cwd": null,
"approvalPolicy": null,
"approvalsReviewer": null,
"sandboxPolicy": null,
"permissions": null,
"model": "gpt-5.2",
"serviceTier": null,
"summary": null,
"personality": null,
"collaborationMode": null
}
}),
serde_json::to_value(&request)?,
);
Ok(())
}
#[test]
fn serialize_client_response() -> Result<()> {
let cwd = absolute_path("/tmp");
@@ -2378,6 +2462,46 @@ mod tests {
Ok(())
}
#[test]
fn serialize_thread_settings_response_and_notification() -> Result<()> {
let cwd = absolute_path("/tmp");
let thread_settings = sample_thread_settings(cwd);
let thread_settings_json = serde_json::to_value(&thread_settings)?;
let response = ClientResponse::ThreadSettingsUpdate {
request_id: RequestId::Integer(11),
response: v2::ThreadSettingsUpdateResponse {
thread_settings: thread_settings.clone(),
},
};
let notification =
ServerNotification::ThreadSettingsUpdated(v2::ThreadSettingsUpdatedNotification {
thread_id: "thread-1".to_string(),
thread_settings,
});
assert_eq!(
json!({
"method": "thread/settings/update",
"id": 11,
"response": {
"threadSettings": thread_settings_json
}
}),
serde_json::to_value(&response)?,
);
assert_eq!(
json!({
"method": "thread/settings/updated",
"params": {
"threadId": "thread-1",
"threadSettings": thread_settings_json
}
}),
serde_json::to_value(&notification)?,
);
Ok(())
}
#[test]
fn serialize_config_requirements_read() -> Result<()> {
let request = ClientRequest::ConfigRequirementsRead {

View File

@@ -3546,6 +3546,39 @@ fn turn_start_params_preserve_explicit_null_service_tier() {
assert_eq!(serialized_without_override.get("serviceTier"), None);
}
#[test]
fn thread_settings_update_params_support_partial_updates_and_explicit_nulls() {
let params: ThreadSettingsUpdateParams = serde_json::from_value(json!({
"threadId": "thread_123",
"model": "gpt-5.2",
"serviceTier": null,
"effort": null
}))
.expect("params should deserialize");
assert_eq!(params.thread_id, "thread_123");
assert_eq!(params.model.as_deref(), Some("gpt-5.2"));
assert_eq!(params.service_tier, Some(None));
assert_eq!(params.effort, Some(None));
assert_eq!(params.cwd, None);
let serialized = serde_json::to_value(&params).expect("params should serialize");
assert_eq!(
serialized.get("serviceTier"),
Some(&serde_json::Value::Null)
);
assert_eq!(serialized.get("effort"), Some(&serde_json::Value::Null));
assert_eq!(serialized.get("cwd"), Some(&serde_json::Value::Null));
let without_overrides = ThreadSettingsUpdateParams {
thread_id: "thread_123".to_string(),
..Default::default()
};
let serialized_without_overrides =
serde_json::to_value(&without_overrides).expect("params should serialize");
assert_eq!(serialized_without_overrides.get("serviceTier"), None);
assert_eq!(serialized_without_overrides.get("effort"), None);
}
#[test]
fn turn_start_params_round_trip_environments() {
let cwd = test_absolute_path();

View File

@@ -11,7 +11,9 @@ use super::TurnEnvironmentParams;
use super::TurnItemsView;
use super::shared::v2_enum_from_core;
use codex_experimental_api_macros::ExperimentalApi;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::ThreadGoalStatus as CoreThreadGoalStatus;
@@ -170,6 +172,91 @@ pub struct ThreadStartParams {
pub persist_extended_history: bool,
}
#[derive(
Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS, ExperimentalApi,
)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSettingsUpdateParams {
pub thread_id: String,
/// Override the working directory for subsequent turns.
#[ts(optional = nullable)]
pub cwd: Option<PathBuf>,
/// Override the approval policy for subsequent turns.
#[experimental(nested)]
#[ts(optional = nullable)]
pub approval_policy: Option<AskForApproval>,
/// Override where approval requests are routed for review on subsequent turns.
#[ts(optional = nullable)]
pub approvals_reviewer: Option<ApprovalsReviewer>,
/// Override the sandbox policy for subsequent turns.
#[ts(optional = nullable)]
pub sandbox_policy: Option<SandboxPolicy>,
/// Select a named permissions profile for subsequent turns. Cannot be
/// combined with `sandboxPolicy`.
#[experimental("thread/settings/update.permissions")]
#[schemars(with = "Option<String>")]
#[ts(type = "string | null")]
#[ts(optional = nullable)]
pub permissions: Option<String>,
/// Override the model for subsequent turns.
#[ts(optional = nullable)]
pub model: Option<String>,
/// Override the service tier for subsequent turns.
#[serde(
default,
deserialize_with = "crate::protocol::serde_helpers::deserialize_double_option",
serialize_with = "crate::protocol::serde_helpers::serialize_double_option",
skip_serializing_if = "Option::is_none"
)]
#[ts(optional = nullable)]
pub service_tier: Option<Option<String>>,
/// Override the reasoning effort for subsequent turns.
#[serde(
default,
deserialize_with = "crate::protocol::serde_helpers::deserialize_double_option",
serialize_with = "crate::protocol::serde_helpers::serialize_double_option",
skip_serializing_if = "Option::is_none"
)]
#[ts(optional = nullable)]
pub effort: Option<Option<ReasoningEffort>>,
/// Override the reasoning summary for subsequent turns.
#[ts(optional = nullable)]
pub summary: Option<ReasoningSummary>,
/// Override the personality for subsequent turns.
#[ts(optional = nullable)]
pub personality: Option<Personality>,
/// Set a pre-set collaboration mode for subsequent turns.
#[experimental("thread/settings/update.collaborationMode")]
#[ts(optional = nullable)]
pub collaboration_mode: Option<CollaborationMode>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSettings {
pub model: String,
pub model_provider: String,
pub service_tier: Option<String>,
pub cwd: AbsolutePathBuf,
pub approval_policy: AskForApproval,
pub approvals_reviewer: ApprovalsReviewer,
pub sandbox_policy: SandboxPolicy,
pub active_permission_profile: Option<ActivePermissionProfile>,
pub effort: Option<ReasoningEffort>,
pub summary: Option<ReasoningSummary>,
pub personality: Option<Personality>,
pub collaboration_mode: CollaborationMode,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSettingsUpdateResponse {
pub thread_settings: ThreadSettings,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1139,6 +1226,14 @@ pub struct ThreadStatusChangedNotification {
pub status: ThreadStatus,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSettingsUpdatedNotification {
pub thread_id: String,
pub thread_settings: ThreadSettings,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -157,6 +157,7 @@ Example with notification opt-out:
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. Experimental `runtimeWorkspaceRoots` replaces the thread-scoped runtime workspace roots used to materialize `:workspace_roots`; relative paths resolve against the effective turn cwd. Prefer experimental `permissions` profile selection by id for permission overrides; the legacy `sandboxPolicy` field is still accepted but cannot be combined with `permissions`. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `thread/settings/update` — update the stored defaults used by subsequent turns without starting a turn (experimental; requires `capabilities.experimentalApi`). Omitted fields leave the current value unchanged; fields with explicit clearing support, such as `serviceTier`, accept `null` to clear the value. The response is `{ "threadSettings": ... }` with the full effective state, and `thread/settings/updated` is emitted only when that state changes. `turn/start` emits the same notification when its thread-settings overrides change the stored defaults.
- `thread/inject_items` — append raw Responses API items to a loaded threads model-visible history without starting a user turn; returns `{}` on success.
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
@@ -1211,6 +1212,7 @@ The app-server streams JSON-RPC notifications while a turn is running. Each turn
- `turn/started``{ turn }` with the turn id, empty `items`, and `status: "inProgress"`.
- `turn/completed``{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo?, additionalDetails? } }`.
- `thread/settings/updated``{ threadId, threadSettings }` whenever the effective thread settings change. `threadSettings` includes the full effective state: model/provider, service tier, cwd, approval policy, approvals reviewer, sandbox compatibility projection, permission profile, active permission profile, reasoning effort/summary, personality, and collaboration mode.
- `turn/diff/updated``{ threadId, turnId, diff }` represents the up-to-date snapshot of the turn-level unified diff, emitted after every FileChange item. `diff` is the latest aggregated unified diff across every file change in the turn. UIs can render this to show the full "what changed" view without stitching individual `fileChange` items.
- `turn/plan/updated``{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`.
- `model/rerouted``{ threadId, turnId, fromModel, toModel, reason }` when the backend reroutes a request to a different model (for example, due to high-risk cyber safety checks).

View File

@@ -102,7 +102,10 @@ pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
matches!(notification, ServerNotification::TurnCompleted(_))
matches!(
notification,
ServerNotification::TurnCompleted(_) | ServerNotification::ThreadSettingsUpdated(_)
)
}
/// Input needed to start an in-process app-server runtime.

View File

@@ -1166,6 +1166,11 @@ impl MessageProcessor {
)
.await
}
ClientRequest::ThreadSettingsUpdate { params, .. } => {
self.turn_processor
.thread_settings_update(&request_id, params)
.await
}
ClientRequest::ThreadInjectItems { params, .. } => {
self.turn_processor.thread_inject_items(params).await
}

View File

@@ -213,6 +213,10 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadSettings;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadSettingsUpdateResponse;
use codex_app_server_protocol::ThreadSettingsUpdatedNotification;
use codex_app_server_protocol::ThreadShellCommandParams;
use codex_app_server_protocol::ThreadShellCommandResponse;
use codex_app_server_protocol::ThreadSortKey;
@@ -380,6 +384,8 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionConfiguredEvent;
#[cfg(test)]
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::ThreadSettingsOverrides;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::protocol::W3cTraceContext;

View File

@@ -1,4 +1,5 @@
use super::*;
use codex_protocol::openai_models::ReasoningEffort;
#[derive(Clone)]
pub(crate) struct TurnRequestProcessor {
@@ -16,6 +17,62 @@ pub(crate) struct TurnRequestProcessor {
skills_watcher: Arc<SkillsWatcher>,
}
struct ThreadSettingsOverrideRequest {
cwd: Option<PathBuf>,
runtime_workspace_roots: Option<Vec<PathBuf>>,
approval_policy: Option<AskForApproval>,
approvals_reviewer: Option<codex_app_server_protocol::ApprovalsReviewer>,
sandbox_policy: Option<codex_app_server_protocol::SandboxPolicy>,
permissions: Option<String>,
model: Option<String>,
service_tier: Option<Option<String>>,
effort: Option<Option<ReasoningEffort>>,
summary: Option<codex_protocol::config_types::ReasoningSummary>,
collaboration_mode: Option<CollaborationMode>,
personality: Option<Personality>,
}
impl ThreadSettingsOverrideRequest {
fn has_any_overrides(&self) -> bool {
self.cwd.is_some()
|| self.runtime_workspace_roots.is_some()
|| self.approval_policy.is_some()
|| self.approvals_reviewer.is_some()
|| self.sandbox_policy.is_some()
|| self.permissions.is_some()
|| self.model.is_some()
|| self.service_tier.is_some()
|| self.effort.is_some()
|| self.summary.is_some()
|| self.collaboration_mode.is_some()
|| self.personality.is_some()
}
}
fn op_thread_settings_overrides(
overrides: CodexThreadSettingsOverrides,
) -> ThreadSettingsOverrides {
ThreadSettingsOverrides {
cwd: overrides.cwd,
workspace_roots: overrides.workspace_roots,
profile_workspace_roots: overrides.profile_workspace_roots,
approval_policy: overrides.approval_policy,
approvals_reviewer: overrides.approvals_reviewer,
sandbox_policy: overrides.sandbox_policy,
permission_profile: overrides.permission_profile,
active_permission_profile: overrides.active_permission_profile,
windows_sandbox_level: overrides.windows_sandbox_level,
model: overrides.model,
effort: overrides.effort,
summary: overrides.summary,
service_tier: overrides.service_tier,
collaboration_mode: overrides.collaboration_mode,
personality: overrides.personality,
}
}
const THREAD_SETTINGS_ACK_TIMEOUT: Duration = Duration::from_secs(5);
fn resolve_runtime_workspace_roots(
workspace_roots: Vec<PathBuf>,
base_cwd: &AbsolutePathBuf,
@@ -30,6 +87,35 @@ fn resolve_runtime_workspace_roots(
resolved_roots
}
fn effective_workspace_roots(
base_snapshot: &ThreadConfigSnapshot,
effective_cwd: &AbsolutePathBuf,
runtime_workspace_roots: Option<&[AbsolutePathBuf]>,
) -> Vec<AbsolutePathBuf> {
if let Some(workspace_roots) = runtime_workspace_roots {
return workspace_roots.to_vec();
}
if effective_cwd != &base_snapshot.cwd
&& base_snapshot.workspace_roots.contains(&base_snapshot.cwd)
{
let mut retargeted_roots = Vec::with_capacity(base_snapshot.workspace_roots.len());
for root in &base_snapshot.workspace_roots {
let root = if root == &base_snapshot.cwd {
effective_cwd.clone()
} else {
root.clone()
};
if !retargeted_roots.contains(&root) {
retargeted_roots.push(root);
}
}
retargeted_roots
} else {
base_snapshot.workspace_roots.clone()
}
}
impl TurnRequestProcessor {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
@@ -88,6 +174,16 @@ impl TurnRequestProcessor {
.map(|response| Some(response.into()))
}
pub(crate) async fn thread_settings_update(
&self,
request_id: &ConnectionRequestId,
params: ThreadSettingsUpdateParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.thread_settings_update_inner(request_id, params)
.await
.map(|response| Some(response.into()))
}
pub(crate) async fn turn_steer(
&self,
request_id: &ConnectionRequestId,
@@ -326,6 +422,175 @@ impl TurnRequestProcessor {
Ok(())
}
async fn resolve_thread_settings_overrides(
&self,
base_snapshot: &ThreadConfigSnapshot,
request: ThreadSettingsOverrideRequest,
) -> Result<Option<CodexThreadSettingsOverrides>, JSONRPCErrorError> {
// Both turn/start and thread/settings/update accept the same
// persistent thread-settings fields. Resolve them once into the core
// override shape so validation and permission-profile expansion stay
// consistent between the two entry points.
if request.sandbox_policy.is_some() && request.permissions.is_some() {
return Err(invalid_request(
"`permissions` cannot be combined with `sandboxPolicy`",
));
}
if !request.has_any_overrides() {
return Ok(None);
}
let requested_cwd = request.cwd;
let effective_cwd = requested_cwd
.as_ref()
.map(|cwd| AbsolutePathBuf::resolve_path_against_base(cwd, base_snapshot.cwd.as_path()))
.unwrap_or_else(|| base_snapshot.cwd.clone());
let cwd = requested_cwd.map(|_| effective_cwd.to_path_buf());
let runtime_workspace_roots = request.runtime_workspace_roots.map(|workspace_roots| {
resolve_runtime_workspace_roots(workspace_roots, &effective_cwd)
});
let effective_workspace_roots = effective_workspace_roots(
base_snapshot,
&effective_cwd,
runtime_workspace_roots.as_deref(),
);
let approval_policy = request.approval_policy.map(AskForApproval::to_core);
let approvals_reviewer = request
.approvals_reviewer
.map(codex_app_server_protocol::ApprovalsReviewer::to_core);
let sandbox_policy = request.sandbox_policy.map(|p| p.to_core());
let (permission_profile, active_permission_profile, profile_workspace_roots) =
if let Some(permissions) = request.permissions {
let overrides = ConfigOverrides {
cwd: cwd.clone(),
workspace_roots: Some(
effective_workspace_roots
.iter()
.map(AbsolutePathBuf::to_path_buf)
.collect(),
),
default_permissions: Some(permissions),
codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(),
main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(),
..Default::default()
};
let config = self
.config_manager
.load_for_cwd(
/*request_overrides*/ None,
overrides,
Some(base_snapshot.cwd.to_path_buf()),
)
.await
.map_err(|err| config_load_error(&err))?;
// Startup config is allowed to fall back when requirements
// disallow a configured profile. An explicit thread settings
// update is different: reject it before accepting the request.
if let Some(warning) = config.startup_warnings.iter().find(|warning| {
warning.contains("Configured value for `permission_profile` is disallowed")
}) {
return Err(invalid_request(format!(
"invalid thread settings override: {warning}"
)));
}
(
Some(config.permissions.permission_profile().clone()),
config.permissions.active_permission_profile(),
Some(config.permissions.profile_workspace_roots().to_vec()),
)
} else {
(None, None, None)
};
// None means the caller sent no settings fields at all. Some means at
// least one explicit override was present, even if the effective value
// matches the current thread settings.
Ok(Some(CodexThreadSettingsOverrides {
cwd,
workspace_roots: runtime_workspace_roots,
profile_workspace_roots,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
active_permission_profile,
windows_sandbox_level: None,
model: request.model,
effort: request.effort,
summary: request.summary,
service_tier: request.service_tier,
collaboration_mode: request
.collaboration_mode
.map(|mode| self.normalize_turn_start_collaboration_mode(mode)),
personality: request.personality,
}))
}
async fn maybe_emit_thread_settings_updated(
&self,
thread_id: ThreadId,
api_thread_id: &str,
before: &ThreadSettings,
after: ThreadSettings,
) {
if before == &after {
return;
}
let connection_ids = self
.thread_state_manager
.subscribed_connection_ids(thread_id)
.await;
if connection_ids.is_empty() {
return;
}
self.outgoing
.send_server_notification_to_connections(
&connection_ids,
ServerNotification::ThreadSettingsUpdated(ThreadSettingsUpdatedNotification {
thread_id: api_thread_id.to_string(),
thread_settings: after,
}),
)
.await;
}
async fn wait_for_pending_thread_settings(
&self,
thread_id: ThreadId,
) -> Result<(), JSONRPCErrorError> {
let pending = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_current_pending_thread_settings()
};
for thread_settings_applied in pending {
match tokio::time::timeout(THREAD_SETTINGS_ACK_TIMEOUT, thread_settings_applied).await {
Ok(Ok(Ok(_))) => {}
Ok(Ok(Err(err))) => {
return Err(internal_error(format!(
"failed to apply pending thread settings override: {err}"
)));
}
Ok(Err(_)) => {
return Err(internal_error(
"pending thread settings override waiter was cancelled".to_string(),
));
}
Err(_) => {
return Err(internal_error(
"timed out waiting for pending thread settings overrides to apply"
.to_string(),
));
}
}
}
Ok(())
}
async fn turn_start_inner(
&self,
request_id: ConnectionRequestId,
@@ -357,9 +622,24 @@ impl TurnRequestProcessor {
self.track_error_response(&request_id, error, /*error_type*/ None);
})?;
let collaboration_mode = params
.collaboration_mode
.map(|mode| self.normalize_turn_start_collaboration_mode(mode));
let thread_settings_request = ThreadSettingsOverrideRequest {
cwd: params.cwd,
runtime_workspace_roots: params.runtime_workspace_roots,
approval_policy: params.approval_policy,
approvals_reviewer: params.approvals_reviewer,
sandbox_policy: params.sandbox_policy,
permissions: params.permissions,
model: params.model,
service_tier: params.service_tier,
effort: params.effort.map(Some),
summary: params.summary,
collaboration_mode: params.collaboration_mode,
personality: params.personality,
};
self.wait_for_pending_thread_settings(thread_id).await?;
let before_snapshot = thread.config_snapshot().await;
let before_thread_settings = thread_settings_from_snapshot(&before_snapshot);
let environment_selections = self.parse_environment_selections(params.environments)?;
// Map v2 input items to core input items.
@@ -369,156 +649,24 @@ impl TurnRequestProcessor {
.map(V2UserInput::into_core)
.collect();
let turn_has_input = !mapped_items.is_empty();
let runtime_workspace_roots_request = params.runtime_workspace_roots.clone();
let snapshot = if params.permissions.is_some() || runtime_workspace_roots_request.is_some()
{
Some(thread.config_snapshot().await)
} else {
None
};
let has_any_overrides = params.cwd.is_some()
|| runtime_workspace_roots_request.is_some()
|| params.approval_policy.is_some()
|| params.approvals_reviewer.is_some()
|| params.sandbox_policy.is_some()
|| params.permissions.is_some()
|| params.model.is_some()
|| params.service_tier.is_some()
|| params.effort.is_some()
|| params.summary.is_some()
|| collaboration_mode.is_some()
|| params.personality.is_some();
if params.sandbox_policy.is_some() && params.permissions.is_some() {
return Err(invalid_request(
"`permissions` cannot be combined with `sandboxPolicy`",
));
}
let cwd = params.cwd;
let runtime_workspace_roots = if let Some(workspace_roots) =
runtime_workspace_roots_request.clone()
{
let Some(snapshot) = snapshot.as_ref() else {
return Err(internal_error(
"turn/start runtime workspace roots missing thread snapshot",
));
};
let base_cwd = cwd
.as_ref()
.map(|cwd| AbsolutePathBuf::resolve_path_against_base(cwd, snapshot.cwd.as_path()))
.unwrap_or_else(|| snapshot.cwd.clone());
Some(resolve_runtime_workspace_roots(workspace_roots, &base_cwd))
} else {
None
};
let approval_policy = params.approval_policy.map(AskForApproval::to_core);
let approvals_reviewer = params
.approvals_reviewer
.map(codex_app_server_protocol::ApprovalsReviewer::to_core);
let sandbox_policy = params.sandbox_policy.map(|p| p.to_core());
let (permission_profile, active_permission_profile, profile_workspace_roots) =
if let Some(permissions) = params.permissions {
let Some(snapshot) = snapshot.as_ref() else {
return Err(internal_error(
"turn/start permission selection missing thread snapshot",
));
};
let overrides = ConfigOverrides {
cwd: cwd.clone(),
workspace_roots: Some(runtime_workspace_roots_request.clone().unwrap_or_else(
|| {
snapshot
.workspace_roots
.iter()
.map(AbsolutePathBuf::to_path_buf)
.collect()
},
)),
default_permissions: Some(permissions),
codex_linux_sandbox_exe: self.arg0_paths.codex_linux_sandbox_exe.clone(),
main_execve_wrapper_exe: self.arg0_paths.main_execve_wrapper_exe.clone(),
..Default::default()
};
let config = self
.config_manager
.load_for_cwd(
/*request_overrides*/ None,
overrides,
Some(snapshot.cwd.to_path_buf()),
)
.await
.map_err(|err| config_load_error(&err))?;
// Startup config is allowed to fall back when requirements
// disallow a configured profile. An explicit turn request
// is different: reject it before accepting user input.
if let Some(warning) = config.startup_warnings.iter().find(|warning| {
warning.contains("Configured value for `permission_profile` is disallowed")
}) {
return Err(invalid_request(format!(
"invalid thread settings override: {warning}"
)));
}
(
Some(config.permissions.permission_profile().clone()),
config.permissions.active_permission_profile(),
Some(config.permissions.profile_workspace_roots().to_vec()),
)
} else {
(None, None, None)
};
let model = params.model;
let effort = params.effort.map(Some);
let summary = params.summary;
let service_tier = params.service_tier;
let personality = params.personality;
// If any overrides are provided, validate them synchronously so the
// request can fail before accepting user input. The actual update is
// still queued together with the input below to preserve submission order.
if has_any_overrides {
let resolved_overrides = self
.resolve_thread_settings_overrides(&before_snapshot, thread_settings_request)
.await?;
let has_thread_settings_overrides = resolved_overrides.is_some();
if let Some(overrides) = resolved_overrides.as_ref() {
// Validate before accepting input so the request can fail without
// queuing a user turn.
thread
.preview_thread_settings_overrides(CodexThreadSettingsOverrides {
cwd: cwd.clone(),
workspace_roots: runtime_workspace_roots.clone(),
approval_policy,
approvals_reviewer,
sandbox_policy: sandbox_policy.clone(),
permission_profile: permission_profile.clone(),
active_permission_profile: active_permission_profile.clone(),
profile_workspace_roots: profile_workspace_roots.clone(),
windows_sandbox_level: None,
model: model.clone(),
effort,
summary,
service_tier: service_tier.clone(),
collaboration_mode: collaboration_mode.clone(),
personality,
})
.preview_thread_settings_overrides(overrides.clone())
.await
.map_err(|err| {
invalid_request(format!("invalid thread settings override: {err}"))
})?;
}
let thread_settings = codex_protocol::protocol::ThreadSettingsOverrides {
cwd,
workspace_roots: runtime_workspace_roots,
profile_workspace_roots,
approval_policy,
approvals_reviewer,
sandbox_policy,
permission_profile,
active_permission_profile,
windows_sandbox_level: None,
model,
effort,
summary,
service_tier,
collaboration_mode,
personality,
};
let thread_settings = resolved_overrides
.map(op_thread_settings_overrides)
.unwrap_or_default();
// Start the turn by submitting the user input. Return its submission id as turn_id.
let turn_op = Op::UserInput {
@@ -528,14 +676,71 @@ impl TurnRequestProcessor {
responsesapi_client_metadata: params.responsesapi_client_metadata,
thread_settings,
};
let turn_id = self
.submit_core_op(&request_id, thread.as_ref(), turn_op)
let turn_id = Uuid::now_v7().to_string();
let pending_thread_settings = if has_thread_settings_overrides {
let (thread_settings_applied, notification_lock) = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(turn_id.clone())
};
Some((
thread_settings_applied,
notification_lock.lock_owned().await,
))
} else {
None
};
if let Err(err) = thread
.submit_with_id(Submission {
id: turn_id.clone(),
op: turn_op,
trace: self.request_trace_context(&request_id).await,
})
.await
.map_err(|err| {
let error = internal_error(format!("failed to start turn: {err}"));
self.track_error_response(&request_id, &error, /*error_type*/ None);
error
})?;
{
if has_thread_settings_overrides {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.cancel_pending_thread_settings(&turn_id);
}
let error = internal_error(format!("failed to start turn: {err}"));
self.track_error_response(&request_id, &error, /*error_type*/ None);
return Err(error);
}
if let Some((thread_settings_applied, thread_settings_notification_guard)) =
pending_thread_settings
{
let processor = self.clone();
let api_thread_id = params.thread_id.clone();
let tracked_turn_id = turn_id.clone();
tokio::spawn(async move {
let _thread_settings_notification_guard = thread_settings_notification_guard;
match thread_settings_applied.await {
Ok(Ok(payload)) => {
let after_thread_settings = thread_settings_from_applied_event(&payload);
processor
.maybe_emit_thread_settings_updated(
thread_id,
&api_thread_id,
&before_thread_settings,
after_thread_settings,
)
.await;
}
Ok(Err(err)) => {
tracing::warn!(
"failed to apply thread settings overrides for turn {tracked_turn_id}: {err}"
);
}
Err(_) => {
tracing::warn!(
"thread settings override acknowledgement was cancelled for turn {tracked_turn_id}"
);
}
}
});
}
if turn_has_input {
let config_snapshot = thread.config_snapshot().await;
@@ -566,6 +771,104 @@ impl TurnRequestProcessor {
Ok(TurnStartResponse { turn })
}
async fn thread_settings_update_inner(
&self,
request_id: &ConnectionRequestId,
params: ThreadSettingsUpdateParams,
) -> Result<ThreadSettingsUpdateResponse, JSONRPCErrorError> {
let (thread_id, thread) =
self.load_thread(&params.thread_id)
.await
.inspect_err(|error| {
self.track_error_response(request_id, error, /*error_type*/ None);
})?;
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
super::thread_lifecycle::ensure_listener_task_running(
self.listener_task_context(),
thread_id,
thread.clone(),
thread_state.clone(),
)
.await?;
self.wait_for_pending_thread_settings(thread_id).await?;
let before_snapshot = thread.config_snapshot().await;
let before_thread_settings = thread_settings_from_snapshot(&before_snapshot);
let resolved_overrides = self
.resolve_thread_settings_overrides(
&before_snapshot,
ThreadSettingsOverrideRequest {
cwd: params.cwd,
runtime_workspace_roots: None,
approval_policy: params.approval_policy,
approvals_reviewer: params.approvals_reviewer,
sandbox_policy: params.sandbox_policy,
permissions: params.permissions,
model: params.model,
service_tier: params.service_tier,
effort: params.effort,
summary: params.summary,
collaboration_mode: params.collaboration_mode,
personality: params.personality,
},
)
.await?;
let after_thread_settings = if let Some(overrides) = resolved_overrides {
thread
.preview_thread_settings_overrides(overrides.clone())
.await
.map_err(|err| {
invalid_request(format!("invalid thread settings override: {err}"))
})?;
let update_id = Uuid::now_v7().to_string();
let (thread_settings_applied, notification_lock) = {
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(update_id.clone())
};
let thread_settings_notification_guard = notification_lock.lock_owned().await;
if let Err(err) = thread
.submit_with_id(Submission {
id: update_id.clone(),
op: Op::ThreadSettings {
thread_settings: op_thread_settings_overrides(overrides),
},
trace: self.request_trace_context(request_id).await,
})
.await
{
let mut thread_state = thread_state.lock().await;
thread_state.cancel_pending_thread_settings(&update_id);
return Err(internal_error(format!(
"failed to update thread settings: {err}"
)));
}
let after_thread_settings = match thread_settings_applied.await {
Ok(Ok(payload)) => thread_settings_from_applied_event(&payload),
Ok(Err(err)) => return Err(invalid_request(err)),
Err(_) => {
return Err(internal_error(
"thread settings override waiter was cancelled".to_string(),
));
}
};
self.maybe_emit_thread_settings_updated(
thread_id,
&params.thread_id,
&before_thread_settings,
after_thread_settings.clone(),
)
.await;
drop(thread_settings_notification_guard);
after_thread_settings
} else {
before_thread_settings.clone()
};
Ok(ThreadSettingsUpdateResponse {
thread_settings: after_thread_settings,
})
}
async fn thread_inject_items_response_inner(
&self,
params: ThreadInjectItemsParams,
@@ -1160,6 +1463,53 @@ impl TurnRequestProcessor {
}
}
fn thread_settings_from_snapshot(config_snapshot: &ThreadConfigSnapshot) -> ThreadSettings {
ThreadSettings {
model: config_snapshot.model.clone(),
model_provider: config_snapshot.model_provider_id.clone(),
service_tier: config_snapshot.service_tier.clone(),
cwd: config_snapshot.cwd.clone(),
approval_policy: config_snapshot.approval_policy.into(),
approvals_reviewer: config_snapshot.approvals_reviewer.into(),
sandbox_policy: thread_response_sandbox_policy(
&config_snapshot.permission_profile,
config_snapshot.cwd.as_path(),
),
active_permission_profile: thread_response_active_permission_profile(
config_snapshot.active_permission_profile.clone(),
),
effort: config_snapshot.reasoning_effort,
summary: config_snapshot.reasoning_summary,
personality: config_snapshot.personality,
collaboration_mode: config_snapshot.collaboration_mode.clone(),
}
}
fn thread_settings_from_applied_event(
event: &codex_protocol::protocol::ThreadSettingsAppliedEvent,
) -> ThreadSettings {
let thread_settings = &event.thread_settings;
ThreadSettings {
model: thread_settings.model.clone(),
model_provider: thread_settings.model_provider_id.clone(),
service_tier: thread_settings.service_tier.clone(),
cwd: thread_settings.cwd.clone(),
approval_policy: thread_settings.approval_policy.into(),
approvals_reviewer: thread_settings.approvals_reviewer.into(),
sandbox_policy: thread_response_sandbox_policy(
&thread_settings.permission_profile,
thread_settings.cwd.as_path(),
),
active_permission_profile: thread_response_active_permission_profile(
thread_settings.active_permission_profile.clone(),
),
effort: thread_settings.reasoning_effort,
summary: thread_settings.reasoning_summary,
personality: thread_settings.personality,
collaboration_mode: thread_settings.collaboration_mode.clone(),
}
}
fn xcode_26_4_mcp_elicitations_auto_deny(
client_name: Option<&str>,
client_version: Option<&str>,

View File

@@ -11,6 +11,7 @@ use codex_file_watcher::WatchRegistration;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadSettingsAppliedEvent;
use codex_rollout::state_db::StateDbHandle;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::HashMap;
@@ -24,6 +25,7 @@ use tokio::sync::watch;
use tracing::error;
type PendingInterruptQueue = Vec<ConnectionRequestId>;
type ThreadSettingsAck = Result<ThreadSettingsAppliedEvent, String>;
pub(crate) struct PendingThreadResumeRequest {
pub(crate) request_id: ConnectionRequestId,
@@ -78,6 +80,8 @@ pub(crate) struct ThreadState {
pub(crate) listener_generation: u64,
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
current_turn_history: ThreadHistoryBuilder,
pending_thread_settings_waiters: HashMap<String, Vec<oneshot::Sender<ThreadSettingsAck>>>,
thread_settings_notification_lock: Arc<Mutex<()>>,
listener_thread: Option<Weak<CodexThread>>,
watch_registration: WatchRegistration,
}
@@ -112,6 +116,7 @@ impl ThreadState {
let _ = cancel_tx.send(());
}
self.listener_command_tx = None;
self.pending_thread_settings_waiters.clear();
self.current_turn_history.reset();
self.listener_thread = None;
self.watch_registration = WatchRegistration::default();
@@ -131,11 +136,45 @@ impl ThreadState {
self.current_turn_history.active_turn_snapshot()
}
pub(crate) fn track_pending_thread_settings(
&mut self,
submission_id: String,
) -> (oneshot::Receiver<ThreadSettingsAck>, Arc<Mutex<()>>) {
let (tx, rx) = oneshot::channel();
self.pending_thread_settings_waiters
.entry(submission_id)
.or_default()
.push(tx);
(rx, Arc::clone(&self.thread_settings_notification_lock))
}
pub(crate) fn track_current_pending_thread_settings(
&mut self,
) -> Vec<oneshot::Receiver<ThreadSettingsAck>> {
let mut receivers = Vec::with_capacity(self.pending_thread_settings_waiters.len());
for waiters in self.pending_thread_settings_waiters.values_mut() {
let (tx, rx) = oneshot::channel();
waiters.push(tx);
receivers.push(rx);
}
receivers
}
pub(crate) fn cancel_pending_thread_settings(&mut self, submission_id: &str) {
self.pending_thread_settings_waiters.remove(submission_id);
}
pub(crate) fn track_current_turn_event(&mut self, event_turn_id: &str, event: &EventMsg) {
if let EventMsg::TurnStarted(payload) = event {
self.turn_summary.started_at = payload.started_at;
}
self.current_turn_history.handle_event(event);
if let EventMsg::ThreadSettingsApplied(payload) = event {
self.notify_thread_settings_applied(event_turn_id, Ok(payload.clone()));
}
if let EventMsg::Error(error) = event {
self.notify_thread_settings_applied(event_turn_id, Err(error.message.clone()));
}
if matches!(event, EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_))
&& !self.current_turn_history.has_active_turn()
{
@@ -143,6 +182,14 @@ impl ThreadState {
self.current_turn_history.reset();
}
}
fn notify_thread_settings_applied(&mut self, submission_id: &str, result: ThreadSettingsAck) {
if let Some(waiters) = self.pending_thread_settings_waiters.remove(submission_id) {
for waiter in waiters {
let _ = waiter.send(result.clone());
}
}
}
}
pub(crate) async fn resolve_server_request_on_thread_listener(

View File

@@ -88,6 +88,7 @@ use codex_app_server_protocol::ThreadRealtimeStopParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadShellCommandParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadTurnsItemsListParams;
@@ -488,6 +489,15 @@ impl McpProcess {
self.send_request("thread/rollback", params).await
}
/// Send a `thread/settings/update` JSON-RPC request.
pub async fn send_thread_settings_update_request(
&mut self,
params: ThreadSettingsUpdateParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/settings/update", params).await
}
/// Send a `thread/list` JSON-RPC request.
pub async fn send_thread_list_request(
&mut self,

View File

@@ -7,6 +7,7 @@ use app_test_support::to_response;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
@@ -14,8 +15,10 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use futures::SinkExt;
@@ -104,6 +107,62 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
Ok(())
}
#[tokio::test]
async fn websocket_thread_settings_updates_stay_on_subscribed_connections() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;
send_initialize_experimental_request(&mut ws1, /*id*/ 1, "ws_context_owner").await?;
read_response_for_id(&mut ws1, /*id*/ 1).await?;
send_initialize_experimental_request(&mut ws2, /*id*/ 2, "ws_context_peer").await?;
read_response_for_id(&mut ws2, /*id*/ 2).await?;
let thread_id = start_thread(&mut ws1, /*id*/ 3).await?;
send_request(
&mut ws1,
"thread/settings/update",
/*id*/ 4,
Some(serde_json::to_value(ThreadSettingsUpdateParams {
thread_id: thread_id.clone(),
model: Some("mock-model-updated".to_string()),
..Default::default()
})?),
)
.await?;
let (_response, caller_notification) = read_response_and_notification_for_method(
&mut ws1,
/*id*/ 4,
"thread/settings/updated",
)
.await?;
let ServerNotification::ThreadSettingsUpdated(caller) =
ServerNotification::try_from(caller_notification)?
else {
bail!("expected caller thread/settings/updated notification");
};
assert_eq!(caller.thread_id, thread_id);
assert_no_notification_for_method(
&mut ws2,
"thread/settings/updated",
Duration::from_millis(250),
)
.await?;
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
Ok(())
}
#[tokio::test]
async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
@@ -591,6 +650,32 @@ pub(super) async fn send_initialize_request(
stream: &mut WsClient,
id: i64,
client_name: &str,
) -> Result<()> {
send_initialize_request_with_capabilities(stream, id, client_name, /*capabilities*/ None).await
}
async fn send_initialize_experimental_request(
stream: &mut WsClient,
id: i64,
client_name: &str,
) -> Result<()> {
send_initialize_request_with_capabilities(
stream,
id,
client_name,
Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
)
.await
}
async fn send_initialize_request_with_capabilities(
stream: &mut WsClient,
id: i64,
client_name: &str,
capabilities: Option<InitializeCapabilities>,
) -> Result<()> {
let params = InitializeParams {
client_info: ClientInfo {
@@ -598,7 +683,7 @@ pub(super) async fn send_initialize_request(
title: Some("WebSocket Test Client".to_string()),
version: "0.1.0".to_string(),
},
capabilities: None,
capabilities,
};
send_request(
stream,
@@ -823,6 +908,44 @@ pub(super) async fn assert_no_message(stream: &mut WsClient, wait_for: Duration)
}
}
async fn assert_no_notification_for_method(
stream: &mut WsClient,
method: &str,
wait_for: Duration,
) -> Result<()> {
let deadline = Instant::now() + wait_for;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return Ok(());
}
let frame = match timeout(remaining, stream.next()).await {
Ok(Some(Ok(frame))) => frame,
Ok(Some(Err(err))) => bail!("unexpected websocket read error: {err}"),
Ok(None) => bail!("websocket closed unexpectedly while waiting for notification"),
Err(_) => return Ok(()),
};
match frame {
WebSocketMessage::Text(text) => {
let message: JSONRPCMessage = serde_json::from_str(text.as_ref())?;
if let JSONRPCMessage::Notification(notification) = message
&& notification.method == method
{
bail!("unexpected notification for method `{method}`");
}
}
WebSocketMessage::Ping(payload) => {
stream.send(WebSocketMessage::Pong(payload)).await?;
}
WebSocketMessage::Pong(_) | WebSocketMessage::Frame(_) => {}
WebSocketMessage::Close(frame) => bail!("websocket closed unexpectedly: {frame:?}"),
WebSocketMessage::Binary(_) => bail!("unexpected binary websocket frame"),
}
}
}
pub(super) fn create_config_toml(
codex_home: &Path,
server_uri: &str,

View File

@@ -57,6 +57,7 @@ mod thread_name_websocket;
mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_settings;
mod thread_shell_command;
mod thread_start;
mod thread_status;

View File

@@ -0,0 +1,480 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadSettingsUpdateParams;
use codex_app_server_protocol::ThreadSettingsUpdateResponse;
use codex_app_server_protocol::ThreadSettingsUpdatedNotification;
use codex_app_server_protocol::ThreadSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_features::Feature;
use codex_protocol::openai_models::ReasoningEffort;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::collections::BTreeMap;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
fn write_config(codex_home: &TempDir, server_uri: &str) -> Result<()> {
write_mock_responses_config_toml(
codex_home.path(),
server_uri,
&BTreeMap::<Feature, bool>::new(),
/*auto_compact_limit*/ 1_000_000,
/*requires_openai_auth*/ None,
"mock_provider",
"compact",
)?;
Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<ThreadStartResponse> {
let request_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
service_tier: Some(Some("flex".to_string())),
thread_source: Some(ThreadSource::User),
..Default::default()
})
.await?;
read_response(mcp, request_id).await
}
async fn send_thread_settings_update(
mcp: &mut McpProcess,
params: ThreadSettingsUpdateParams,
) -> Result<ThreadSettingsUpdateResponse> {
let request_id = mcp.send_thread_settings_update_request(params).await?;
read_response(mcp, request_id).await
}
async fn read_response<T: serde::de::DeserializeOwned>(
mcp: &mut McpProcess,
request_id: i64,
) -> Result<T> {
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
to_response(response)
}
fn text_input(text: &str) -> V2UserInput {
V2UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}
}
async fn wait_for_turn_completed(mcp: &mut McpProcess) -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
async fn read_thread_settings_updated(
mcp: &mut McpProcess,
) -> Result<ThreadSettingsUpdatedNotification> {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/settings/updated"),
)
.await??;
let notification: ServerNotification = notification.try_into()?;
let ServerNotification::ThreadSettingsUpdated(notification) = notification else {
anyhow::bail!("expected thread/settings/updated notification");
};
Ok(notification)
}
#[tokio::test]
async fn thread_settings_update_applies_partial_patch_and_emits_full_state() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let response = send_thread_settings_update(
&mut mcp,
ThreadSettingsUpdateParams {
thread_id: thread.id.clone(),
model: Some("gpt-5.2".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
},
)
.await?;
assert_eq!(response.thread_settings.model, "gpt-5.2");
assert_eq!(
response.thread_settings.service_tier.as_deref(),
Some("flex")
);
assert_eq!(response.thread_settings.effort, Some(ReasoningEffort::High));
assert_eq!(response.thread_settings.cwd, thread.cwd);
let notification = read_thread_settings_updated(&mut mcp).await?;
assert_eq!(notification.thread_id, thread.id);
assert_eq!(notification.thread_settings, response.thread_settings);
mcp.clear_message_buffer();
let no_op_response = send_thread_settings_update(
&mut mcp,
ThreadSettingsUpdateParams {
thread_id: thread.id,
model: Some("gpt-5.2".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
},
)
.await?;
assert_eq!(no_op_response.thread_settings, response.thread_settings);
assert!(
!mcp.pending_notification_methods()
.iter()
.any(|method| method == "thread/settings/updated")
);
Ok(())
}
#[tokio::test]
async fn thread_settings_update_absolutizes_relative_cwd_before_permissions() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let next_cwd = std::path::PathBuf::from("next-cwd");
let next_cwd_abs = thread.cwd.join(&next_cwd);
std::fs::create_dir_all(next_cwd_abs.as_path())?;
let response = send_thread_settings_update(
&mut mcp,
ThreadSettingsUpdateParams {
thread_id: thread.id.clone(),
cwd: Some(next_cwd),
permissions: Some(":workspace".to_string()),
..Default::default()
},
)
.await?;
assert_eq!(response.thread_settings.cwd, next_cwd_abs);
assert_eq!(
response
.thread_settings
.active_permission_profile
.map(|profile| profile.id),
Some(":workspace".to_string())
);
Ok(())
}
#[tokio::test]
async fn thread_settings_update_clears_service_tier_with_explicit_null() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let response = send_thread_settings_update(
&mut mcp,
ThreadSettingsUpdateParams {
thread_id: thread.id,
service_tier: Some(None),
..Default::default()
},
)
.await?;
assert_eq!(response.thread_settings.service_tier, None);
let notification = read_thread_settings_updated(&mut mcp).await?;
assert_eq!(notification.thread_settings.service_tier, None);
Ok(())
}
#[tokio::test]
async fn thread_settings_update_rejects_sandbox_policy_with_permissions() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let request_id = mcp
.send_thread_settings_update_request(ThreadSettingsUpdateParams {
thread_id: thread.id,
sandbox_policy: Some(SandboxPolicy::DangerFullAccess),
permissions: Some(":read-only".to_string()),
..Default::default()
})
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(
err.error
.message
.contains("`permissions` cannot be combined with `sandboxPolicy`"),
"unexpected error message: {}",
err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_settings_update_waits_for_pending_cwd_before_permissions() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("Done")?,
])
.await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let next_cwd = TempDir::new()?;
let next_cwd_abs = AbsolutePathBuf::try_from(next_cwd.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![text_input("Hello")],
cwd: Some(next_cwd.path().to_path_buf()),
..Default::default()
})
.await?;
let update_request_id = mcp
.send_thread_settings_update_request(ThreadSettingsUpdateParams {
thread_id: thread.id.clone(),
permissions: Some(":workspace".to_string()),
..Default::default()
})
.await?;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let update_response =
read_response::<ThreadSettingsUpdateResponse>(&mut mcp, update_request_id).await?;
assert_eq!(update_response.thread_settings.cwd, next_cwd_abs);
assert_eq!(
update_response
.thread_settings
.active_permission_profile
.map(|profile| profile.id),
Some(":workspace".to_string())
);
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
#[tokio::test]
async fn turn_start_emits_thread_settings_updated_when_overrides_change_defaults() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("Done")?,
])
.await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![text_input("Hello")],
model: Some("gpt-5.2".to_string()),
effort: Some(ReasoningEffort::Low),
..Default::default()
})
.await?;
let _: TurnStartResponse = read_response(&mut mcp, request_id).await?;
let notification = read_thread_settings_updated(&mut mcp).await?;
assert_eq!(notification.thread_id, thread.id);
assert_eq!(notification.thread_settings.model, "gpt-5.2");
assert_eq!(
notification.thread_settings.effort,
Some(ReasoningEffort::Low)
);
assert_eq!(
notification.thread_settings.service_tier.as_deref(),
Some("flex")
);
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
async fn assert_newer_update_survives_turn_start(
turn_start_overrides: TurnStartParams,
) -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("Done")?,
])
.await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![text_input("Hello")],
..turn_start_overrides
})
.await?;
let update_request_id = mcp
.send_thread_settings_update_request(ThreadSettingsUpdateParams {
thread_id: thread.id.clone(),
model: Some("gpt-5.4".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
})
.await?;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let update_response =
read_response::<ThreadSettingsUpdateResponse>(&mut mcp, update_request_id).await?;
assert_eq!(update_response.thread_settings.model, "gpt-5.4");
assert_eq!(
update_response.thread_settings.effort,
Some(ReasoningEffort::High)
);
wait_for_turn_completed(&mut mcp).await?;
mcp.clear_message_buffer();
let read_current_response = send_thread_settings_update(
&mut mcp,
ThreadSettingsUpdateParams {
thread_id: thread.id,
..Default::default()
},
)
.await?;
assert_eq!(
read_current_response.thread_settings,
update_response.thread_settings
);
Ok(())
}
#[tokio::test]
async fn thread_settings_update_after_turn_start_preserves_newer_update() -> Result<()> {
assert_newer_update_survives_turn_start(TurnStartParams {
model: Some("gpt-5.2".to_string()),
effort: Some(ReasoningEffort::Low),
..Default::default()
})
.await
}
#[tokio::test]
async fn queued_updates_keep_each_thread_settings_notification_snapshot() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("Done")?,
])
.await;
let codex_home = TempDir::new()?;
write_config(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![text_input("Hello")],
model: Some("gpt-5.2".to_string()),
effort: Some(ReasoningEffort::Low),
..Default::default()
})
.await?;
let update_request_id = mcp
.send_thread_settings_update_request(ThreadSettingsUpdateParams {
thread_id: thread.id,
model: Some("gpt-5.4".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
})
.await?;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let _: ThreadSettingsUpdateResponse = read_response(&mut mcp, update_request_id).await?;
let notifications = [
read_thread_settings_updated(&mut mcp).await?,
read_thread_settings_updated(&mut mcp).await?,
];
assert!(notifications.iter().any(|notification| {
notification.thread_settings.model == "gpt-5.2"
&& notification.thread_settings.effort == Some(ReasoningEffort::Low)
}));
assert!(notifications.iter().any(|notification| {
notification.thread_settings.model == "gpt-5.4"
&& notification.thread_settings.effort == Some(ReasoningEffort::High)
}));
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
#[tokio::test]
async fn thread_settings_update_after_no_op_turn_start_override_preserves_newer_update()
-> Result<()> {
assert_newer_update_survives_turn_start(TurnStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await
}

View File

@@ -47,6 +47,9 @@ pub(super) fn server_notification_thread_target(
ServerNotification::ThreadStatusChanged(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadSettingsUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),

View File

@@ -217,6 +217,7 @@ impl ChatWidget {
| ServerNotification::AccountRateLimitsUpdated(_)
| ServerNotification::ThreadStarted(_)
| ServerNotification::ThreadStatusChanged(_)
| ServerNotification::ThreadSettingsUpdated(_)
| ServerNotification::ThreadArchived(_)
| ServerNotification::ThreadUnarchived(_)
| ServerNotification::RawResponseItemCompleted(_)