Compare commits

...

4 Commits

Author SHA1 Message Date
acrognale-oai
18c9ac5efc refactor(runtime-install): orchestrate through executor primitives 2026-05-26 15:47:45 -04:00
acrognale-oai
9aa19c5e5f feat(runtime-install): run installs through executor processes 2026-05-26 12:02:07 -04:00
acrognale-oai
a787408a06 feat(app-server): publish installed runtime resources safely 2026-05-22 11:43:19 -04:00
acrognale-oai
bdb1ebe1f7 feat(exec-server): install validated runtime artifacts 2026-05-22 11:33:10 -04:00
52 changed files with 3843 additions and 36 deletions

20
codex-rs/Cargo.lock generated
View File

@@ -1917,6 +1917,7 @@ dependencies = [
"codex-protocol",
"codex-rmcp-client",
"codex-rollout",
"codex-runtime-install",
"codex-sandboxing",
"codex-shell-command",
"codex-state",
@@ -3584,6 +3585,25 @@ dependencies = [
"uuid",
]
[[package]]
name = "codex-runtime-install"
version = "0.0.0"
dependencies = [
"codex-app-server-protocol",
"codex-exec-server",
"codex-protocol",
"codex-utils-absolute-path",
"pretty_assertions",
"serde",
"serde_json",
"sha2",
"tempfile",
"tokio",
"tokio-util",
"tracing",
"uuid",
]
[[package]]
name = "codex-sandboxing"
version = "0.0.0"

View File

@@ -42,6 +42,7 @@ members = [
"exec",
"file-system",
"exec-server",
"runtime-install",
"execpolicy",
"execpolicy-legacy",
"ext/extension-api",
@@ -161,6 +162,7 @@ codex-core-skills = { path = "core-skills" }
codex-exec = { path = "exec" }
codex-file-system = { path = "file-system" }
codex-exec-server = { path = "exec-server" }
codex-runtime-install = { path = "runtime-install" }
codex-execpolicy = { path = "execpolicy" }
codex-extension-api = { path = "ext/extension-api" }
codex-goal-extension = { path = "ext/goal" }

View File

@@ -2765,6 +2765,82 @@
}
]
},
"RuntimeInstallManifestParams": {
"properties": {
"archiveName": {
"type": [
"string",
"null"
]
},
"archiveSha256": {
"type": "string"
},
"archiveSizeBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"archiveUrl": {
"type": "string"
},
"bundleFormatVersion": {
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"bundleVersion": {
"type": [
"string",
"null"
]
},
"format": {
"type": [
"string",
"null"
]
},
"runtimeRootDirectoryName": {
"type": [
"string",
"null"
]
}
},
"required": [
"archiveSha256",
"archiveUrl"
],
"type": "object"
},
"RuntimeInstallParams": {
"properties": {
"environmentId": {
"type": [
"string",
"null"
]
},
"manifest": {
"$ref": "#/definitions/RuntimeInstallManifestParams"
},
"release": {
"type": "string"
}
},
"required": [
"manifest",
"release"
],
"type": "object"
},
"SandboxMode": {
"enum": [
"read-only",
@@ -5309,6 +5385,53 @@
"title": "Plugin/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"runtime/install"
],
"title": "Runtime/installRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/RuntimeInstallParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Runtime/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"runtime/install/cancel"
],
"title": "Runtime/install/cancelRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Runtime/install/cancelRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -2946,6 +2946,51 @@
},
"type": "object"
},
"RuntimeInstallProgressNotification": {
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"downloadedBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"phase": {
"$ref": "#/definitions/RuntimeInstallProgressPhase"
},
"totalBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
}
},
"required": [
"phase"
],
"type": "object"
},
"RuntimeInstallProgressPhase": {
"enum": [
"checking",
"downloading",
"verifying",
"extracting",
"validating",
"installed",
"configuring"
],
"type": "string"
},
"SandboxPolicy": {
"oneOf": [
{
@@ -5397,6 +5442,26 @@
"title": "Skills/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"runtime/install/progress"
],
"title": "Runtime/install/progressNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/RuntimeInstallProgressNotification"
}
},
"required": [
"method",
"params"
],
"title": "Runtime/install/progressNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -6546,4 +6611,4 @@
}
],
"title": "ServerNotification"
}
}

View File

@@ -1309,6 +1309,53 @@
"title": "Plugin/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"runtime/install"
],
"title": "Runtime/installRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/RuntimeInstallParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Runtime/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"runtime/install/cancel"
],
"title": "Runtime/install/cancelRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Runtime/install/cancelRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -4067,6 +4114,26 @@
"title": "Skills/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"runtime/install/progress"
],
"title": "Runtime/install/progressNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/RuntimeInstallProgressNotification"
}
},
"required": [
"method",
"params"
],
"title": "Runtime/install/progressNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -14613,6 +14680,221 @@
}
]
},
"RuntimeInstallCancelResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"status": {
"$ref": "#/definitions/v2/RuntimeInstallCancelStatus"
}
},
"required": [
"status"
],
"title": "RuntimeInstallCancelResponse",
"type": "object"
},
"RuntimeInstallCancelStatus": {
"enum": [
"canceled",
"not-found"
],
"type": "string"
},
"RuntimeInstallManifestParams": {
"properties": {
"archiveName": {
"type": [
"string",
"null"
]
},
"archiveSha256": {
"type": "string"
},
"archiveSizeBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"archiveUrl": {
"type": "string"
},
"bundleFormatVersion": {
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"bundleVersion": {
"type": [
"string",
"null"
]
},
"format": {
"type": [
"string",
"null"
]
},
"runtimeRootDirectoryName": {
"type": [
"string",
"null"
]
}
},
"required": [
"archiveSha256",
"archiveUrl"
],
"type": "object"
},
"RuntimeInstallParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"environmentId": {
"type": [
"string",
"null"
]
},
"manifest": {
"$ref": "#/definitions/v2/RuntimeInstallManifestParams"
},
"release": {
"type": "string"
}
},
"required": [
"manifest",
"release"
],
"title": "RuntimeInstallParams",
"type": "object"
},
"RuntimeInstallPaths": {
"properties": {
"bundledPluginMarketplacePaths": {
"items": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"type": "array"
},
"bundledSkillPaths": {
"items": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"type": "array"
},
"nodeModulesPath": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"nodePath": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"pythonPath": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"skillsToRemove": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"bundledPluginMarketplacePaths",
"bundledSkillPaths",
"nodeModulesPath",
"nodePath",
"pythonPath",
"skillsToRemove"
],
"type": "object"
},
"RuntimeInstallProgressNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"downloadedBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"phase": {
"$ref": "#/definitions/v2/RuntimeInstallProgressPhase"
},
"totalBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
}
},
"required": [
"phase"
],
"title": "RuntimeInstallProgressNotification",
"type": "object"
},
"RuntimeInstallProgressPhase": {
"enum": [
"checking",
"downloading",
"verifying",
"extracting",
"validating",
"installed",
"configuring"
],
"type": "string"
},
"RuntimeInstallResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"paths": {
"$ref": "#/definitions/v2/RuntimeInstallPaths"
},
"status": {
"$ref": "#/definitions/v2/RuntimeInstallStatus"
}
},
"required": [
"paths",
"status"
],
"title": "RuntimeInstallResponse",
"type": "object"
},
"RuntimeInstallStatus": {
"enum": [
"already-current",
"installed"
],
"type": "string"
},
"SandboxMode": {
"enum": [
"read-only",

View File

@@ -2035,6 +2035,53 @@
"title": "Plugin/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"runtime/install"
],
"title": "Runtime/installRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/RuntimeInstallParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Runtime/installRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"runtime/install/cancel"
],
"title": "Runtime/install/cancelRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Runtime/install/cancelRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -11142,6 +11189,221 @@
}
]
},
"RuntimeInstallCancelResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"status": {
"$ref": "#/definitions/RuntimeInstallCancelStatus"
}
},
"required": [
"status"
],
"title": "RuntimeInstallCancelResponse",
"type": "object"
},
"RuntimeInstallCancelStatus": {
"enum": [
"canceled",
"not-found"
],
"type": "string"
},
"RuntimeInstallManifestParams": {
"properties": {
"archiveName": {
"type": [
"string",
"null"
]
},
"archiveSha256": {
"type": "string"
},
"archiveSizeBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"archiveUrl": {
"type": "string"
},
"bundleFormatVersion": {
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"bundleVersion": {
"type": [
"string",
"null"
]
},
"format": {
"type": [
"string",
"null"
]
},
"runtimeRootDirectoryName": {
"type": [
"string",
"null"
]
}
},
"required": [
"archiveSha256",
"archiveUrl"
],
"type": "object"
},
"RuntimeInstallParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"environmentId": {
"type": [
"string",
"null"
]
},
"manifest": {
"$ref": "#/definitions/RuntimeInstallManifestParams"
},
"release": {
"type": "string"
}
},
"required": [
"manifest",
"release"
],
"title": "RuntimeInstallParams",
"type": "object"
},
"RuntimeInstallPaths": {
"properties": {
"bundledPluginMarketplacePaths": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
},
"bundledSkillPaths": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
},
"nodeModulesPath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"nodePath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"pythonPath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"skillsToRemove": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"bundledPluginMarketplacePaths",
"bundledSkillPaths",
"nodeModulesPath",
"nodePath",
"pythonPath",
"skillsToRemove"
],
"type": "object"
},
"RuntimeInstallProgressNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"downloadedBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"phase": {
"$ref": "#/definitions/RuntimeInstallProgressPhase"
},
"totalBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
}
},
"required": [
"phase"
],
"title": "RuntimeInstallProgressNotification",
"type": "object"
},
"RuntimeInstallProgressPhase": {
"enum": [
"checking",
"downloading",
"verifying",
"extracting",
"validating",
"installed",
"configuring"
],
"type": "string"
},
"RuntimeInstallResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"paths": {
"$ref": "#/definitions/RuntimeInstallPaths"
},
"status": {
"$ref": "#/definitions/RuntimeInstallStatus"
}
},
"required": [
"paths",
"status"
],
"title": "RuntimeInstallResponse",
"type": "object"
},
"RuntimeInstallStatus": {
"enum": [
"already-current",
"installed"
],
"type": "string"
},
"SandboxMode": {
"enum": [
"read-only",
@@ -11444,6 +11706,26 @@
"title": "Skills/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"runtime/install/progress"
],
"title": "Runtime/install/progressNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/RuntimeInstallProgressNotification"
}
},
"required": [
"method",
"params"
],
"title": "Runtime/install/progressNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"RuntimeInstallCancelStatus": {
"enum": [
"canceled",
"not-found"
],
"type": "string"
}
},
"properties": {
"status": {
"$ref": "#/definitions/RuntimeInstallCancelStatus"
}
},
"required": [
"status"
],
"title": "RuntimeInstallCancelResponse",
"type": "object"
}

View File

@@ -0,0 +1,80 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"RuntimeInstallManifestParams": {
"properties": {
"archiveName": {
"type": [
"string",
"null"
]
},
"archiveSha256": {
"type": "string"
},
"archiveSizeBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"archiveUrl": {
"type": "string"
},
"bundleFormatVersion": {
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"bundleVersion": {
"type": [
"string",
"null"
]
},
"format": {
"type": [
"string",
"null"
]
},
"runtimeRootDirectoryName": {
"type": [
"string",
"null"
]
}
},
"required": [
"archiveSha256",
"archiveUrl"
],
"type": "object"
}
},
"properties": {
"environmentId": {
"type": [
"string",
"null"
]
},
"manifest": {
"$ref": "#/definitions/RuntimeInstallManifestParams"
},
"release": {
"type": "string"
}
},
"required": [
"manifest",
"release"
],
"title": "RuntimeInstallParams",
"type": "object"
}

View File

@@ -0,0 +1,49 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"RuntimeInstallProgressPhase": {
"enum": [
"checking",
"downloading",
"verifying",
"extracting",
"validating",
"installed",
"configuring"
],
"type": "string"
}
},
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"downloadedBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"phase": {
"$ref": "#/definitions/RuntimeInstallProgressPhase"
},
"totalBytes": {
"format": "uint64",
"minimum": 0.0,
"type": [
"integer",
"null"
]
}
},
"required": [
"phase"
],
"title": "RuntimeInstallProgressNotification",
"type": "object"
}

View File

@@ -0,0 +1,76 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsolutePathBuf": {
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
"type": "string"
},
"RuntimeInstallPaths": {
"properties": {
"bundledPluginMarketplacePaths": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
},
"bundledSkillPaths": {
"items": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"type": "array"
},
"nodeModulesPath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"nodePath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"pythonPath": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"skillsToRemove": {
"items": {
"type": "string"
},
"type": "array"
}
},
"required": [
"bundledPluginMarketplacePaths",
"bundledSkillPaths",
"nodeModulesPath",
"nodePath",
"pythonPath",
"skillsToRemove"
],
"type": "object"
},
"RuntimeInstallStatus": {
"enum": [
"already-current",
"installed"
],
"type": "string"
}
},
"properties": {
"bundleVersion": {
"type": [
"string",
"null"
]
},
"paths": {
"$ref": "#/definitions/RuntimeInstallPaths"
},
"status": {
"$ref": "#/definitions/RuntimeInstallStatus"
}
},
"required": [
"paths",
"status"
],
"title": "RuntimeInstallResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RuntimeInstallCancelStatus } from "./RuntimeInstallCancelStatus";
export type RuntimeInstallCancelResponse = { status: RuntimeInstallCancelStatus, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RuntimeInstallCancelStatus = "canceled" | "not-found";

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RuntimeInstallManifestParams = { archiveName?: string | null, archiveSha256: string, archiveSizeBytes?: bigint | null, archiveUrl: string, bundleFormatVersion?: number | null, bundleVersion?: string | null, format?: string | null, runtimeRootDirectoryName?: string | null, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RuntimeInstallManifestParams } from "./RuntimeInstallManifestParams";
export type RuntimeInstallParams = { environmentId?: string | null, manifest: RuntimeInstallManifestParams, release: string, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AbsolutePathBuf } from "../AbsolutePathBuf";
export type RuntimeInstallPaths = { bundledPluginMarketplacePaths: Array<AbsolutePathBuf>, bundledSkillPaths: Array<AbsolutePathBuf>, nodeModulesPath: AbsolutePathBuf, nodePath: AbsolutePathBuf, pythonPath: AbsolutePathBuf, skillsToRemove: Array<string>, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RuntimeInstallProgressPhase } from "./RuntimeInstallProgressPhase";
export type RuntimeInstallProgressNotification = { bundleVersion: string | null, downloadedBytes: bigint | null, phase: RuntimeInstallProgressPhase, totalBytes: bigint | null, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RuntimeInstallProgressPhase = "checking" | "downloading" | "verifying" | "extracting" | "validating" | "installed" | "configuring";

View File

@@ -0,0 +1,7 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { RuntimeInstallPaths } from "./RuntimeInstallPaths";
import type { RuntimeInstallStatus } from "./RuntimeInstallStatus";
export type RuntimeInstallResponse = { bundleVersion: string | null, paths: RuntimeInstallPaths, status: RuntimeInstallStatus, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type RuntimeInstallStatus = "already-current" | "installed";

View File

@@ -325,6 +325,15 @@ export type { ReviewDelivery } from "./ReviewDelivery";
export type { ReviewStartParams } from "./ReviewStartParams";
export type { ReviewStartResponse } from "./ReviewStartResponse";
export type { ReviewTarget } from "./ReviewTarget";
export type { RuntimeInstallCancelResponse } from "./RuntimeInstallCancelResponse";
export type { RuntimeInstallCancelStatus } from "./RuntimeInstallCancelStatus";
export type { RuntimeInstallManifestParams } from "./RuntimeInstallManifestParams";
export type { RuntimeInstallParams } from "./RuntimeInstallParams";
export type { RuntimeInstallPaths } from "./RuntimeInstallPaths";
export type { RuntimeInstallProgressNotification } from "./RuntimeInstallProgressNotification";
export type { RuntimeInstallProgressPhase } from "./RuntimeInstallProgressPhase";
export type { RuntimeInstallResponse } from "./RuntimeInstallResponse";
export type { RuntimeInstallStatus } from "./RuntimeInstallStatus";
export type { SandboxMode } from "./SandboxMode";
export type { SandboxPolicy } from "./SandboxPolicy";
export type { SandboxWorkspaceWrite } from "./SandboxWorkspaceWrite";

View File

@@ -737,6 +737,16 @@ client_request_definitions! {
serialization: global("config"),
response: v2::PluginInstallResponse,
},
RuntimeInstall => "runtime/install" {
params: v2::RuntimeInstallParams,
serialization: global("runtime-install"),
response: v2::RuntimeInstallResponse,
},
RuntimeInstallCancel => "runtime/install/cancel" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: None,
response: v2::RuntimeInstallCancelResponse,
},
PluginUninstall => "plugin/uninstall" {
params: v2::PluginUninstallParams,
serialization: global("config"),
@@ -1475,6 +1485,7 @@ server_notification_definitions! {
ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification),
ThreadClosed => "thread/closed" (v2::ThreadClosedNotification),
SkillsChanged => "skills/changed" (v2::SkillsChangedNotification),
RuntimeInstallProgress => "runtime/install/progress" (v2::RuntimeInstallProgressNotification),
ThreadNameUpdated => "thread/name/updated" (v2::ThreadNameUpdatedNotification),
ThreadGoalUpdated => "thread/goal/updated" (v2::ThreadGoalUpdatedNotification),
ThreadGoalCleared => "thread/goal/cleared" (v2::ThreadGoalClearedNotification),

View File

@@ -21,6 +21,7 @@ mod process;
mod realtime;
mod remote_control;
mod review;
mod runtime;
mod thread;
mod thread_data;
mod turn;
@@ -47,6 +48,7 @@ pub use process::*;
pub use realtime::*;
pub use remote_control::*;
pub use review::*;
pub use runtime::*;
pub use shared::*;
pub use thread::*;
pub use thread_data::*;

View File

@@ -0,0 +1,102 @@
use codex_utils_absolute_path::AbsolutePathBuf;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallManifestParams {
#[ts(optional = nullable)]
pub archive_name: Option<String>,
pub archive_sha256: String,
#[ts(optional = nullable)]
pub archive_size_bytes: Option<u64>,
pub archive_url: String,
#[ts(optional = nullable)]
pub bundle_format_version: Option<u32>,
#[ts(optional = nullable)]
pub bundle_version: Option<String>,
#[ts(optional = nullable)]
pub format: Option<String>,
#[ts(optional = nullable)]
pub runtime_root_directory_name: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallParams {
#[ts(optional = nullable)]
pub environment_id: Option<String>,
pub manifest: Box<RuntimeInstallManifestParams>,
pub release: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "kebab-case")]
#[ts(export_to = "v2/")]
pub enum RuntimeInstallCancelStatus {
Canceled,
NotFound,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallCancelResponse {
pub status: RuntimeInstallCancelStatus,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "kebab-case")]
#[ts(export_to = "v2/")]
pub enum RuntimeInstallStatus {
AlreadyCurrent,
Installed,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallPaths {
pub bundled_plugin_marketplace_paths: Vec<AbsolutePathBuf>,
pub bundled_skill_paths: Vec<AbsolutePathBuf>,
pub node_modules_path: AbsolutePathBuf,
pub node_path: AbsolutePathBuf,
pub python_path: AbsolutePathBuf,
pub skills_to_remove: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallResponse {
pub bundle_version: Option<String>,
pub paths: RuntimeInstallPaths,
pub status: RuntimeInstallStatus,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "kebab-case")]
#[ts(export_to = "v2/")]
pub enum RuntimeInstallProgressPhase {
Checking,
Downloading,
Verifying,
Extracting,
Validating,
Installed,
Configuring,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RuntimeInstallProgressNotification {
pub bundle_version: Option<String>,
pub downloaded_bytes: Option<u64>,
pub phase: RuntimeInstallProgressPhase,
pub total_bytes: Option<u64>,
}

View File

@@ -65,6 +65,7 @@ codex-app-server-transport = { workspace = true }
codex-feedback = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-rollout = { workspace = true }
codex-runtime-install = { workspace = true }
codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-thread-store = { workspace = true }

View File

@@ -212,6 +212,9 @@ Example with notification opt-out:
- `remoteControl/status/changed` — notification emitted when the remote-control status or client-visible environment id changes. `status` is one of `disabled`, `connecting`, `connected`, or `errored`; `serverName` is the local machine name used by this app-server process; `environmentId` is a string when the app-server has a current enrollment and `null` when that enrollment is cleared, invalidated, or remote control is disabled. Newly initialized app-server clients always receive the current status snapshot.
- `skills/config/write` — write user-level skill config by name or absolute path.
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, install MCPs if any, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
- `runtime/install` — install the selected primary runtime bundle and return its executable, dependency, bundled-skill, and bundled-marketplace paths after app-server config synchronization. App-server coordinates the install through the selected environment's generic process and filesystem operations; runtime installs are process-wide and serialized.
- `runtime/install/progress` — notification sent to the connection that requested `runtime/install` as the active install moves through checking, downloading, verifying, extracting, validating, installed, and configuring phases. Download notifications include byte counts when available.
- `runtime/install/cancel` — cancel the one active `runtime/install` request for this app-server process and report whether an install was found.
- `plugin/uninstall` — uninstall a local plugin by `pluginId` in `<plugin>@<marketplace>` form by removing its cached files and clearing its user-level config entry, or uninstall a remote ChatGPT plugin by backend `pluginId` by forwarding the uninstall to the ChatGPT plugin backend and removing any downloaded remote-plugin cache (**under development; do not call from production clients yet**).
- `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes.
- `tool/requestUserInput` — prompt the user with 13 short questions for a tool call and return their answers (experimental).

View File

@@ -95,6 +95,7 @@ mod models;
mod outgoing_message;
mod request_processors;
mod request_serialization;
mod runtime_install;
mod server_request_error;
mod skills_watcher;
mod thread_state;

View File

@@ -31,6 +31,7 @@ use crate::request_processors::McpRequestProcessor;
use crate::request_processors::PluginRequestProcessor;
use crate::request_processors::ProcessExecRequestProcessor;
use crate::request_processors::RemoteControlRequestProcessor;
use crate::request_processors::RuntimeInstallRequestProcessor;
use crate::request_processors::SearchRequestProcessor;
use crate::request_processors::ThreadGoalRequestProcessor;
use crate::request_processors::ThreadRequestProcessor;
@@ -177,6 +178,7 @@ pub(crate) struct MessageProcessor {
mcp_processor: McpRequestProcessor,
plugin_processor: PluginRequestProcessor,
remote_control_processor: RemoteControlRequestProcessor,
runtime_install_processor: RuntimeInstallRequestProcessor,
search_processor: SearchRequestProcessor,
thread_goal_processor: ThreadGoalRequestProcessor,
thread_processor: ThreadRequestProcessor,
@@ -402,6 +404,11 @@ impl MessageProcessor {
workspace_settings_cache,
);
let remote_control_processor = RemoteControlRequestProcessor::new(remote_control_handle);
let runtime_install_processor = RuntimeInstallRequestProcessor::new(
Arc::clone(&environment_manager_for_requests),
outgoing.clone(),
Arc::clone(&thread_manager),
);
let search_processor = SearchRequestProcessor::new(outgoing.clone());
let thread_goal_processor = ThreadGoalRequestProcessor::new(
Arc::clone(&thread_manager),
@@ -498,6 +505,7 @@ impl MessageProcessor {
mcp_processor,
plugin_processor,
remote_control_processor,
runtime_install_processor,
search_processor,
thread_goal_processor,
thread_processor,
@@ -968,6 +976,16 @@ impl MessageProcessor {
.model_provider_capabilities_read()
.await
.map(|response| Some(response.into())),
ClientRequest::RuntimeInstall { params, .. } => {
self.runtime_install_processor
.install_runtime(connection_id, params)
.await
}
ClientRequest::RuntimeInstallCancel { .. } => {
self.runtime_install_processor
.cancel_runtime_install()
.await
}
ClientRequest::ThreadStart { params, .. } => {
self.thread_processor
.thread_start(

View File

@@ -466,6 +466,7 @@ mod mcp_processor;
mod plugins;
mod process_exec_processor;
mod remote_control_processor;
mod runtime_install_processor;
mod search;
mod thread_processor;
mod token_usage_replay;
@@ -488,6 +489,7 @@ pub(crate) use mcp_processor::McpRequestProcessor;
pub(crate) use plugins::PluginRequestProcessor;
pub(crate) use process_exec_processor::ProcessExecRequestProcessor;
pub(crate) use remote_control_processor::RemoteControlRequestProcessor;
pub(crate) use runtime_install_processor::RuntimeInstallRequestProcessor;
pub(crate) use search::SearchRequestProcessor;
pub(crate) use thread_goal_processor::ThreadGoalRequestProcessor;
pub(crate) use thread_processor::ThreadRequestProcessor;

View File

@@ -0,0 +1,156 @@
use super::*;
use codex_app_server_protocol::RuntimeInstallCancelResponse;
use codex_app_server_protocol::RuntimeInstallCancelStatus;
use codex_app_server_protocol::RuntimeInstallParams;
use codex_app_server_protocol::RuntimeInstallProgressNotification;
use codex_app_server_protocol::RuntimeInstallProgressPhase;
use std::sync::Mutex as StdMutex;
#[derive(Clone)]
pub(crate) struct RuntimeInstallRequestProcessor {
environment_manager: Arc<EnvironmentManager>,
outgoing: Arc<OutgoingMessageSender>,
thread_manager: Arc<ThreadManager>,
active_install: Arc<StdMutex<Option<CancellationToken>>>,
}
impl RuntimeInstallRequestProcessor {
pub(crate) fn new(
environment_manager: Arc<EnvironmentManager>,
outgoing: Arc<OutgoingMessageSender>,
thread_manager: Arc<ThreadManager>,
) -> Self {
Self {
environment_manager,
outgoing,
thread_manager,
active_install: Arc::new(StdMutex::new(None)),
}
}
pub(crate) async fn install_runtime(
&self,
connection_id: ConnectionId,
mut params: RuntimeInstallParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let (cancellation, _active_install) = self.begin_install()?;
let environment = if let Some(environment_id) = params.environment_id.take() {
self.environment_manager
.get_environment(&environment_id)
.ok_or_else(|| {
invalid_request(format!(
"unknown runtime install environment id `{environment_id}`"
))
})?
} else {
self.environment_manager
.default_or_local_environment()
.ok_or_else(|| internal_error("runtime install environment is not configured"))?
};
let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
let outgoing = Arc::clone(&self.outgoing);
let progress_forwarder = tokio::spawn(async move {
while let Some(progress) = progress_rx.recv().await {
outgoing
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::RuntimeInstallProgress(progress),
)
.await;
}
});
let install_result = codex_runtime_install::install_runtime_with_progress(
&environment,
params,
progress_tx,
cancellation,
)
.await;
if let Err(error) = progress_forwarder.await {
warn!("runtime install progress forwarder failed: {error}");
}
let response = install_result?;
self.send_progress(
connection_id,
RuntimeInstallProgressNotification {
bundle_version: response.bundle_version.clone(),
downloaded_bytes: None,
phase: RuntimeInstallProgressPhase::Configuring,
total_bytes: None,
},
)
.await;
let response =
crate::runtime_install::finalize_runtime_install(&environment, response).await?;
self.thread_manager.plugins_manager().clear_cache();
self.thread_manager.skills_manager().clear_cache();
Ok(Some(response.into()))
}
pub(crate) async fn cancel_runtime_install(
&self,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
let status = {
let active_install = self.active_install();
match active_install.as_ref() {
Some(cancellation) => {
cancellation.cancel();
RuntimeInstallCancelStatus::Canceled
}
None => RuntimeInstallCancelStatus::NotFound,
}
};
Ok(Some(RuntimeInstallCancelResponse { status }.into()))
}
fn begin_install(&self) -> Result<(CancellationToken, ActiveInstallGuard), JSONRPCErrorError> {
let cancellation = CancellationToken::new();
let mut active_install = self.active_install();
if active_install.is_some() {
return Err(invalid_request("runtime install is already in progress"));
}
*active_install = Some(cancellation.clone());
drop(active_install);
Ok((
cancellation,
ActiveInstallGuard {
active_install: Arc::clone(&self.active_install),
},
))
}
fn active_install(&self) -> std::sync::MutexGuard<'_, Option<CancellationToken>> {
self.active_install
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
async fn send_progress(
&self,
connection_id: ConnectionId,
progress: RuntimeInstallProgressNotification,
) {
self.outgoing
.send_server_notification_to_connections(
&[connection_id],
ServerNotification::RuntimeInstallProgress(progress),
)
.await;
}
}
struct ActiveInstallGuard {
active_install: Arc<StdMutex<Option<CancellationToken>>>,
}
impl Drop for ActiveInstallGuard {
fn drop(&mut self) {
self.active_install
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.take();
}
}

View File

@@ -0,0 +1,655 @@
use std::ffi::OsStr;
use std::io;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RuntimeInstallPaths;
use codex_app_server_protocol::RuntimeInstallResponse;
use codex_exec_server::CopyOptions;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::Environment;
use codex_exec_server::ExecutorFileSystem;
use codex_exec_server::RemoveOptions;
use codex_utils_absolute_path::AbsolutePathBuf;
use uuid::Uuid;
use crate::error_code::internal_error;
use crate::error_code::invalid_params;
const PUBLISHED_ARTIFACT_NAME: &str = "codex-primary-runtime";
pub(crate) async fn finalize_runtime_install(
environment: &Environment,
mut response: RuntimeInstallResponse,
) -> Result<RuntimeInstallResponse, JSONRPCErrorError> {
if response.paths.bundled_plugin_marketplace_paths.is_empty()
&& response.paths.bundled_skill_paths.is_empty()
&& response.paths.skills_to_remove.is_empty()
{
return Ok(response);
}
let codex_home = environment.codex_home().await?;
response.paths =
finalize_runtime_paths(environment.get_filesystem(), &codex_home, response.paths).await?;
Ok(response)
}
async fn finalize_runtime_paths(
fs: Arc<dyn ExecutorFileSystem>,
codex_home: &AbsolutePathBuf,
mut paths: RuntimeInstallPaths,
) -> Result<RuntimeInstallPaths, JSONRPCErrorError> {
paths.bundled_plugin_marketplace_paths = materialize_bundled_plugin_marketplaces(
Arc::clone(&fs),
codex_home,
&paths.bundled_plugin_marketplace_paths,
)
.await?;
paths.bundled_skill_paths = sync_primary_runtime_skills(
fs,
codex_home,
&paths.bundled_skill_paths,
&paths.skills_to_remove,
)
.await?;
Ok(paths)
}
async fn materialize_bundled_plugin_marketplaces(
fs: Arc<dyn ExecutorFileSystem>,
codex_home: &AbsolutePathBuf,
marketplace_roots: &[AbsolutePathBuf],
) -> Result<Vec<AbsolutePathBuf>, JSONRPCErrorError> {
if marketplace_roots.is_empty() {
return Ok(Vec::new());
}
let destination_root = absolute_path(
codex_home
.as_path()
.join("plugins")
.join(PUBLISHED_ARTIFACT_NAME)
.join("marketplaces"),
)?;
let mut materialized = Vec::with_capacity(marketplace_roots.len());
for marketplace_root in marketplace_roots {
let marketplace_name = marketplace_root.as_path().file_name().ok_or_else(|| {
invalid_params("bundled plugin marketplace path has no directory name")
})?;
let destination = absolute_path(
destination_root
.as_path()
.join(safe_path_segment(marketplace_name)),
)?;
replace_directory(Arc::clone(&fs), marketplace_root, &destination).await?;
materialized.push(destination);
}
Ok(materialized)
}
async fn sync_primary_runtime_skills(
fs: Arc<dyn ExecutorFileSystem>,
codex_home: &AbsolutePathBuf,
bundled_skill_paths: &[AbsolutePathBuf],
skills_to_remove: &[String],
) -> Result<Vec<AbsolutePathBuf>, JSONRPCErrorError> {
if bundled_skill_paths.is_empty() && skills_to_remove.is_empty() {
return Ok(Vec::new());
}
if bundled_skill_paths.is_empty() {
move_legacy_primary_runtime_skills(fs, codex_home, skills_to_remove).await?;
return Ok(Vec::new());
}
let destination_root = absolute_path(
codex_home
.as_path()
.join("skills")
.join(PUBLISHED_ARTIFACT_NAME),
)?;
let staging_root = temporary_sibling_path(&destination_root, "staging")?;
let result = async {
create_directory(Arc::clone(&fs), &staging_root).await?;
let mut materialized = Vec::with_capacity(bundled_skill_paths.len());
for bundled_skill_path in bundled_skill_paths {
let skill_root = absolute_path(
bundled_skill_path
.as_path()
.parent()
.ok_or_else(|| {
invalid_params(format!(
"bundled skill path {} has no parent directory",
bundled_skill_path.display()
))
})?
.to_path_buf(),
)?;
let skill_name = skill_root.as_path().file_name().ok_or_else(|| {
invalid_params(format!(
"bundled skill path {} has no skill directory name",
bundled_skill_path.display()
))
})?;
let staged_skill_root = absolute_path(staging_root.as_path().join(skill_name))?;
copy_directory(Arc::clone(&fs), &skill_root, &staged_skill_root).await?;
materialized.push(absolute_path(
destination_root.as_path().join(skill_name).join("SKILL.md"),
)?);
}
publish_staged_directory(Arc::clone(&fs), &staging_root, &destination_root).await?;
move_legacy_primary_runtime_skills(Arc::clone(&fs), codex_home, skills_to_remove).await?;
Ok(materialized)
}
.await;
cleanup_directory(&fs, &staging_root, "staged primary runtime skills").await;
result
}
async fn move_legacy_primary_runtime_skills(
fs: Arc<dyn ExecutorFileSystem>,
codex_home: &AbsolutePathBuf,
skills_to_remove: &[String],
) -> Result<(), JSONRPCErrorError> {
if skills_to_remove.is_empty() {
return Ok(());
}
let skills_root = absolute_path(codex_home.as_path().join("skills"))?;
for skill_dir in skills_to_remove {
let skill_root = resolve_legacy_skill_directory(&skills_root, skill_dir)?;
let metadata = match fs.get_metadata(&skill_root, /*sandbox*/ None).await {
Ok(metadata) => metadata,
Err(err) if err.kind() == io::ErrorKind::NotFound => continue,
Err(err) => {
return Err(internal_error(format!(
"failed to inspect legacy skill directory {}: {err}",
skill_root.display()
)));
}
};
if !metadata.is_directory {
continue;
}
let backup_path = absolute_path(
codex_home
.as_path()
.join(".tmp")
.join("legacy-primary-runtime-skills")
.join(format!(
"{}-{}",
skill_root
.as_path()
.file_name()
.and_then(OsStr::to_str)
.unwrap_or("skill"),
Uuid::new_v4()
)),
)?;
if let Some(parent) = backup_path.as_path().parent() {
create_directory(Arc::clone(&fs), &absolute_path(parent.to_path_buf())?).await?;
}
copy_directory(Arc::clone(&fs), &skill_root, &backup_path).await?;
remove_if_exists(
Arc::clone(&fs),
&skill_root,
RemoveOptions {
recursive: true,
force: true,
},
)
.await?;
tracing::info!(
skill_dir = %skill_dir,
skill_root = %skill_root.display(),
backup_path = %backup_path.display(),
"moved legacy primary runtime skill"
);
}
Ok(())
}
fn resolve_legacy_skill_directory(
skills_root: &AbsolutePathBuf,
skill_dir: &str,
) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
let relative = Path::new(skill_dir);
if !relative
.components()
.all(|component| matches!(component, Component::Normal(_)))
{
return Err(invalid_params(format!(
"legacy primary runtime skill path must stay within the skills directory: {skill_dir}"
)));
}
absolute_path(skills_root.as_path().join(relative))
}
async fn replace_directory(
fs: Arc<dyn ExecutorFileSystem>,
source: &AbsolutePathBuf,
destination: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
if let Some(parent) = destination.as_path().parent() {
create_directory(Arc::clone(&fs), &absolute_path(parent.to_path_buf())?).await?;
}
let staging_path = temporary_sibling_path(destination, "staging")?;
let result = async {
copy_directory(Arc::clone(&fs), source, &staging_path).await?;
publish_staged_directory(Arc::clone(&fs), &staging_path, destination).await
}
.await;
cleanup_directory(&fs, &staging_path, "staged runtime directory").await;
result
}
async fn publish_staged_directory(
fs: Arc<dyn ExecutorFileSystem>,
staging_path: &AbsolutePathBuf,
destination: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
let backup_path = temporary_sibling_path(destination, "previous")?;
let result = async {
let destination_exists = path_exists(Arc::clone(&fs), destination).await?;
if destination_exists {
copy_directory(Arc::clone(&fs), destination, &backup_path).await?;
}
remove_if_exists(
Arc::clone(&fs),
destination,
RemoveOptions {
recursive: true,
force: true,
},
)
.await?;
if let Err(error) = copy_directory(Arc::clone(&fs), staging_path, destination).await {
remove_if_exists(
Arc::clone(&fs),
destination,
RemoveOptions {
recursive: true,
force: true,
},
)
.await?;
if destination_exists
&& let Err(restore_error) =
copy_directory(Arc::clone(&fs), &backup_path, destination).await
{
return Err(internal_error(format!(
"failed to restore published runtime directory {} after replacement failed: {}; restore failed: {}",
destination.display(),
error.message,
restore_error.message
)));
}
return Err(error);
}
Ok(())
}
.await;
if result.is_ok() {
cleanup_directory(&fs, &backup_path, "previous runtime directory").await;
}
result
}
async fn copy_directory(
fs: Arc<dyn ExecutorFileSystem>,
source: &AbsolutePathBuf,
destination: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
fs.copy(
source,
destination,
CopyOptions { recursive: true },
/*sandbox*/ None,
)
.await
.map_err(|err| {
internal_error(format!(
"failed to copy directory {} to {}: {err}",
source.display(),
destination.display()
))
})
}
async fn create_directory(
fs: Arc<dyn ExecutorFileSystem>,
path: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
fs.create_directory(
path,
CreateDirectoryOptions { recursive: true },
/*sandbox*/ None,
)
.await
.map_err(|err| {
internal_error(format!(
"failed to create directory {}: {err}",
path.display()
))
})
}
async fn remove_if_exists(
fs: Arc<dyn ExecutorFileSystem>,
path: &AbsolutePathBuf,
options: RemoveOptions,
) -> Result<(), JSONRPCErrorError> {
match fs.remove(path, options, /*sandbox*/ None).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(internal_error(format!(
"failed to remove directory {}: {err}",
path.display()
))),
}
}
async fn path_exists(
fs: Arc<dyn ExecutorFileSystem>,
path: &AbsolutePathBuf,
) -> Result<bool, JSONRPCErrorError> {
match fs.get_metadata(path, /*sandbox*/ None).await {
Ok(_) => Ok(true),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false),
Err(err) => Err(internal_error(format!(
"failed to inspect runtime path {}: {err}",
path.display()
))),
}
}
async fn cleanup_directory(fs: &Arc<dyn ExecutorFileSystem>, path: &AbsolutePathBuf, label: &str) {
if let Err(error) = remove_if_exists(
Arc::clone(fs),
path,
RemoveOptions {
recursive: true,
force: true,
},
)
.await
{
tracing::warn!(
path = %path.display(),
error = %error.message,
"failed to clean up {label}"
);
}
}
fn temporary_sibling_path(
destination: &AbsolutePathBuf,
label: &str,
) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
let parent = destination.as_path().parent().ok_or_else(|| {
internal_error(format!(
"runtime destination {} has no parent directory",
destination.display()
))
})?;
let destination_name = destination
.as_path()
.file_name()
.map(safe_path_segment)
.unwrap_or_else(|| "runtime-item".to_string());
absolute_path(parent.join(format!(".{destination_name}-{label}-{}", Uuid::new_v4())))
}
fn safe_path_segment(segment: &OsStr) -> String {
let safe = segment
.to_string_lossy()
.chars()
.map(|ch| {
if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
ch
} else {
'-'
}
})
.collect::<String>();
let safe = safe.trim_matches('.').to_string();
if safe.is_empty() || safe == ".." {
"runtime-item".to_string()
} else {
safe
}
}
fn absolute_path(path: PathBuf) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
AbsolutePathBuf::from_absolute_path_checked(path)
.map_err(|err| internal_error(format!("runtime path is not absolute: {err}")))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use codex_app_server_protocol::RuntimeInstallPaths;
use codex_exec_server::LocalFileSystem;
use pretty_assertions::assert_eq;
use tokio::fs;
use super::*;
#[tokio::test]
async fn finalize_runtime_paths_materializes_marketplaces_and_skills() {
let codex_home = tempfile::tempdir().expect("codex home");
let runtime = tempfile::tempdir().expect("runtime");
let marketplace_root = runtime.path().join("market");
fs::create_dir_all(marketplace_root.join(".agents/plugins"))
.await
.expect("create marketplace manifest dir");
fs::write(
marketplace_root.join(".agents/plugins/marketplace.json"),
r#"{"name":"debug","plugins":[]}"#,
)
.await
.expect("write marketplace");
let bundled_skill_root = runtime.path().join("skills").join("debug");
fs::create_dir_all(&bundled_skill_root)
.await
.expect("create bundled skill");
fs::write(bundled_skill_root.join("SKILL.md"), "debug")
.await
.expect("write bundled skill");
let legacy_skill_root = codex_home.path().join("skills").join("legacy");
fs::create_dir_all(&legacy_skill_root)
.await
.expect("create legacy skill");
fs::write(legacy_skill_root.join("SKILL.md"), "legacy")
.await
.expect("write legacy skill");
let paths = RuntimeInstallPaths {
bundled_plugin_marketplace_paths: vec![
absolute_path(marketplace_root).expect("absolute marketplace path"),
],
bundled_skill_paths: vec![
absolute_path(bundled_skill_root.join("SKILL.md")).expect("absolute skill path"),
],
node_modules_path: absolute_path(runtime.path().join("node_modules"))
.expect("absolute node modules path"),
node_path: absolute_path(runtime.path().join("node")).expect("absolute node path"),
python_path: absolute_path(runtime.path().join("python"))
.expect("absolute python path"),
skills_to_remove: vec!["legacy".to_string()],
};
let finalized = finalize_runtime_paths(
Arc::new(LocalFileSystem::unsandboxed()),
&absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"),
paths,
)
.await
.expect("finalize runtime paths");
let expected_marketplace_root = codex_home
.path()
.join("plugins")
.join(PUBLISHED_ARTIFACT_NAME)
.join("marketplaces")
.join("market");
let expected_skill_path = codex_home
.path()
.join("skills")
.join(PUBLISHED_ARTIFACT_NAME)
.join("debug")
.join("SKILL.md");
assert_eq!(
finalized.bundled_plugin_marketplace_paths,
vec![absolute_path(expected_marketplace_root.clone()).expect("absolute path")]
);
assert_eq!(
finalized.bundled_skill_paths,
vec![absolute_path(expected_skill_path.clone()).expect("absolute path")]
);
assert!(
expected_marketplace_root
.join(".agents/plugins/marketplace.json")
.is_file()
);
assert_eq!(
fs::read_to_string(expected_skill_path)
.await
.expect("read materialized skill"),
"debug"
);
assert!(!legacy_skill_root.exists());
assert_eq!(
std::fs::read_dir(
codex_home
.path()
.join(".tmp")
.join("legacy-primary-runtime-skills")
)
.expect("read legacy backups")
.count(),
1
);
}
#[tokio::test]
async fn move_legacy_primary_runtime_skills_rejects_parent_path_without_removing_skill() {
let codex_home = tempfile::tempdir().expect("codex home");
let existing_skill_path = codex_home
.path()
.join("skills")
.join("existing")
.join("SKILL.md");
fs::create_dir_all(existing_skill_path.parent().expect("skill parent"))
.await
.expect("create existing skill");
fs::write(&existing_skill_path, "existing")
.await
.expect("write existing skill");
let error = move_legacy_primary_runtime_skills(
Arc::new(LocalFileSystem::unsandboxed()),
&absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"),
&["../existing".to_string()],
)
.await
.expect_err("parent path should fail");
assert!(
error.message.contains(
"legacy primary runtime skill path must stay within the skills directory"
)
);
assert_eq!(
fs::read_to_string(existing_skill_path)
.await
.expect("read existing skill"),
"existing"
);
}
#[tokio::test]
async fn materialize_bundled_plugin_marketplaces_preserves_existing_copy_on_copy_failure() {
let codex_home = tempfile::tempdir().expect("codex home");
let runtime = tempfile::tempdir().expect("runtime");
let missing_marketplace_root = runtime.path().join("market");
let published_manifest = codex_home
.path()
.join("plugins")
.join(PUBLISHED_ARTIFACT_NAME)
.join("marketplaces")
.join("market")
.join(".agents/plugins/marketplace.json");
fs::create_dir_all(published_manifest.parent().expect("manifest parent"))
.await
.expect("create published marketplace");
fs::write(&published_manifest, "previous")
.await
.expect("write published marketplace");
let error = materialize_bundled_plugin_marketplaces(
Arc::new(LocalFileSystem::unsandboxed()),
&absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"),
&[absolute_path(missing_marketplace_root).expect("absolute marketplace path")],
)
.await
.expect_err("missing marketplace should fail");
assert!(error.message.contains("failed to copy directory"));
assert_eq!(
fs::read_to_string(published_manifest)
.await
.expect("read published marketplace"),
"previous"
);
}
#[tokio::test]
async fn sync_primary_runtime_skills_preserves_existing_copy_on_copy_failure() {
let codex_home = tempfile::tempdir().expect("codex home");
let runtime = tempfile::tempdir().expect("runtime");
let missing_skill_path = runtime.path().join("skills").join("debug").join("SKILL.md");
let published_skill_path = codex_home
.path()
.join("skills")
.join(PUBLISHED_ARTIFACT_NAME)
.join("existing")
.join("SKILL.md");
fs::create_dir_all(published_skill_path.parent().expect("skill parent"))
.await
.expect("create published skill");
fs::write(&published_skill_path, "previous")
.await
.expect("write published skill");
let error = sync_primary_runtime_skills(
Arc::new(LocalFileSystem::unsandboxed()),
&absolute_path(codex_home.path().to_path_buf()).expect("absolute codex home"),
&[absolute_path(missing_skill_path).expect("absolute skill path")],
&[],
)
.await
.expect_err("missing skill should fail");
assert!(error.message.contains("failed to copy directory"));
assert_eq!(
fs::read_to_string(published_skill_path)
.await
.expect("read published skill"),
"previous"
);
}
}

View File

@@ -30,6 +30,7 @@ reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] }
prost = "0.14.3"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true }
tokio = { workspace = true, features = [
@@ -55,6 +56,5 @@ ctor = { workspace = true }
http = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
wiremock = { workspace = true }

View File

@@ -8,6 +8,7 @@ use std::time::Duration;
use arc_swap::ArcSwap;
use codex_app_server_protocol::JSONRPCNotification;
use codex_utils_absolute_path::AbsolutePathBuf;
use futures::FutureExt;
use futures::future::BoxFuture;
use serde_json::Value;
@@ -78,6 +79,7 @@ use crate::protocol::WriteResponse;
use crate::rpc::RpcCallError;
use crate::rpc::RpcClient;
use crate::rpc::RpcClientEvent;
use crate::rpc::RpcPendingResponse;
pub(crate) mod http_client;
@@ -175,6 +177,7 @@ struct Inner {
http_body_streams_write_lock: Mutex<()>,
http_body_stream_next_id: AtomicU64,
session_id: std::sync::RwLock<Option<String>>,
codex_home: std::sync::RwLock<Option<AbsolutePathBuf>>,
reader_task: tokio::task::JoinHandle<()>,
}
@@ -341,6 +344,14 @@ impl ExecServerClient {
.unwrap_or_else(std::sync::PoisonError::into_inner);
*session_id = Some(response.session_id.clone());
}
{
let mut codex_home = self
.inner
.codex_home
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*codex_home = Some(response.codex_home.clone());
}
self.notify_initialized().await?;
Ok(response)
})
@@ -463,6 +474,14 @@ impl ExecServerClient {
self.inner.disconnected.get().is_some() || self.inner.client.is_disconnected()
}
pub fn codex_home(&self) -> Option<AbsolutePathBuf> {
self.inner
.codex_home
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
pub(crate) async fn connect(
connection: JsonRpcConnection,
options: ExecServerClientConnectOptions,
@@ -510,6 +529,7 @@ impl ExecServerClient {
http_body_streams_write_lock: Mutex::new(()),
http_body_stream_next_id: AtomicU64::new(1),
session_id: std::sync::RwLock::new(None),
codex_home: std::sync::RwLock::new(None),
reader_task,
}
});
@@ -531,6 +551,18 @@ impl ExecServerClient {
where
P: serde::Serialize,
T: serde::de::DeserializeOwned,
{
let response = self.start_call(method, params).await?;
self.finish_call(response).await
}
async fn start_call<P>(
&self,
method: &str,
params: &P,
) -> Result<RpcPendingResponse, ExecServerError>
where
P: serde::Serialize,
{
// Reject new work before allocating a JSON-RPC request id. MCP tool
// calls, process writes, and fs operations all pass through here, so
@@ -539,7 +571,17 @@ impl ExecServerClient {
return Err(error);
}
match self.inner.client.call(method, params).await {
match self.inner.client.start_call(method, params).await {
Ok(response) => Ok(response),
Err(error) => Err(ExecServerError::from(error)),
}
}
async fn finish_call<T>(&self, response: RpcPendingResponse) -> Result<T, ExecServerError>
where
T: serde::de::DeserializeOwned,
{
match response.response().await {
Ok(response) => Ok(response),
Err(error) => {
let error = ExecServerError::from(error);
@@ -912,6 +954,7 @@ mod tests {
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_utils_absolute_path::AbsolutePathBuf;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
@@ -1047,6 +1090,10 @@ mod tests {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: session_id.to_string(),
codex_home: AbsolutePathBuf::try_from(
std::env::current_dir().expect("current dir"),
)
.expect("absolute current dir"),
})
.expect("initialize response should serialize"),
}),
@@ -1082,7 +1129,7 @@ mod tests {
program: "sh".to_string(),
args: vec![
"-c".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}'; read _line; sleep 60".to_string(),
],
env: HashMap::new(),
cwd: None,
@@ -1106,7 +1153,7 @@ mod tests {
program: "sh".to_string(),
args: vec![
"-c".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(),
"read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}'; read _line; sleep 60".to_string(),
],
env: HashMap::new(),
cwd: None,
@@ -1129,7 +1176,7 @@ mod tests {
args: vec![
"-NoProfile".to_string(),
"-Command".to_string(),
"$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(),
"$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\",\"codexHome\":\"C:\\\\Users\\\\codex\\\\.codex\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(),
],
env: HashMap::new(),
cwd: None,
@@ -1154,7 +1201,7 @@ mod tests {
"read _line; \
echo \"$$\" > {}; \
sleep 60 >/dev/null 2>&1 & echo \"$!\" > {}; \
printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \
printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\",\"codexHome\":\"/tmp\"}}}}'; \
read _line; \
wait",
shell_quote(pid_file.as_path()),
@@ -1280,6 +1327,10 @@ mod tests {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: "session-1".to_string(),
codex_home: AbsolutePathBuf::try_from(
std::env::current_dir().expect("current dir"),
)
.expect("absolute current dir"),
})
.expect("initialize response should serialize"),
}),
@@ -1423,6 +1474,10 @@ mod tests {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: "session-1".to_string(),
codex_home: AbsolutePathBuf::try_from(
std::env::current_dir().expect("current dir"),
)
.expect("absolute current dir"),
})
.expect("initialize response should serialize"),
}),
@@ -1560,6 +1615,10 @@ mod tests {
id: request.id,
result: serde_json::to_value(InitializeResponse {
session_id: "session-1".to_string(),
codex_home: AbsolutePathBuf::try_from(
std::env::current_dir().expect("current dir"),
)
.expect("absolute current dir"),
})
.expect("initialize response should serialize"),
}),

View File

@@ -0,0 +1,26 @@
use std::path::PathBuf;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_utils_absolute_path::AbsolutePathBuf;
use crate::rpc::internal_error;
pub(crate) fn default_codex_home() -> Result<AbsolutePathBuf, JSONRPCErrorError> {
default_codex_home_path()
.and_then(|path| {
AbsolutePathBuf::from_absolute_path_checked(path)
.map_err(|err| format!("runtime codex home is not absolute: {err}"))
})
.map_err(internal_error)
}
pub(crate) fn default_codex_home_path() -> Result<PathBuf, String> {
if let Some(codex_home) = std::env::var_os("CODEX_HOME") {
return Ok(PathBuf::from(codex_home));
}
let home = std::env::var_os("HOME")
.or_else(|| std::env::var_os("USERPROFILE"))
.map(PathBuf::from)
.ok_or_else(|| "failed to locate home directory".to_string())?;
Ok(home.join(".codex"))
}

View File

@@ -2,6 +2,9 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_utils_absolute_path::AbsolutePathBuf;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::ExecutorFileSystem;
@@ -9,6 +12,7 @@ use crate::HttpClient;
use crate::client::LazyRemoteExecServerClient;
use crate::client::http_client::ReqwestHttpClient;
use crate::client_api::ExecServerTransportParams;
use crate::codex_home::default_codex_home_path;
use crate::environment_provider::DefaultEnvironmentProvider;
use crate::environment_provider::EnvironmentDefault;
use crate::environment_provider::EnvironmentProvider;
@@ -20,6 +24,7 @@ use crate::local_process::LocalProcess;
use crate::process::ExecBackend;
use crate::remote_file_system::RemoteFileSystem;
use crate::remote_process::RemoteProcess;
use crate::rpc::internal_error;
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
@@ -91,7 +96,13 @@ impl EnvironmentManager {
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Result<Self, ExecServerError> {
let provider = environment_provider_from_codex_home(codex_home.as_ref())?;
Self::from_snapshot(provider.snapshot().await?, local_runtime_paths)
let local_codex_home = AbsolutePathBuf::from_absolute_path_checked(codex_home.as_ref())
.map_err(|err| ExecServerError::Protocol(err.to_string()))?;
Self::from_snapshot_with_codex_home(
provider.snapshot().await?,
local_runtime_paths,
Some(local_codex_home),
)
}
/// Builds a manager from the legacy environment-variable provider without
@@ -131,6 +142,18 @@ impl EnvironmentManager {
fn from_snapshot(
snapshot: EnvironmentProviderSnapshot,
local_runtime_paths: Option<ExecServerRuntimePaths>,
) -> Result<Self, ExecServerError> {
Self::from_snapshot_with_codex_home(
snapshot,
local_runtime_paths,
default_local_codex_home(),
)
}
fn from_snapshot_with_codex_home(
snapshot: EnvironmentProviderSnapshot,
local_runtime_paths: Option<ExecServerRuntimePaths>,
local_codex_home: Option<AbsolutePathBuf>,
) -> Result<Self, ExecServerError> {
let EnvironmentProviderSnapshot {
environments,
@@ -145,7 +168,10 @@ impl EnvironmentManager {
"local environment requires configured runtime paths".to_string(),
)
})?;
let local_environment = Arc::new(Environment::local(local_runtime_paths));
let local_environment = Arc::new(Environment::local_with_codex_home(
local_runtime_paths,
local_codex_home,
));
environment_map.insert(
LOCAL_ENVIRONMENT_ID.to_string(),
Arc::clone(&local_environment),
@@ -289,7 +315,20 @@ pub struct Environment {
exec_backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
http_client: Arc<dyn HttpClient>,
remote_client: Option<LazyRemoteExecServerClient>,
local_runtime_paths: Option<ExecServerRuntimePaths>,
codex_home: Option<AbsolutePathBuf>,
}
fn exec_server_error_to_jsonrpc(err: ExecServerError) -> JSONRPCErrorError {
match err {
ExecServerError::Server { code, message } => JSONRPCErrorError {
code,
data: None,
message,
},
_ => internal_error(err.to_string()),
}
}
impl Environment {
@@ -301,7 +340,9 @@ impl Environment {
exec_backend: Arc::new(LocalProcess::default()),
filesystem: Arc::new(LocalFileSystem::unsandboxed()),
http_client: Arc::new(ReqwestHttpClient),
remote_client: None,
local_runtime_paths: None,
codex_home: default_local_codex_home(),
}
}
}
@@ -351,6 +392,13 @@ impl Environment {
}
pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self {
Self::local_with_codex_home(local_runtime_paths, default_local_codex_home())
}
fn local_with_codex_home(
local_runtime_paths: ExecServerRuntimePaths,
codex_home: Option<AbsolutePathBuf>,
) -> Self {
Self {
exec_server_url: None,
remote_transport: None,
@@ -359,7 +407,9 @@ impl Environment {
local_runtime_paths.clone(),
)),
http_client: Arc::new(ReqwestHttpClient),
remote_client: None,
local_runtime_paths: Some(local_runtime_paths),
codex_home,
}
}
@@ -388,14 +438,17 @@ impl Environment {
let exec_backend: Arc<dyn ExecBackend> = Arc::new(RemoteProcess::new(client.clone()));
let filesystem: Arc<dyn ExecutorFileSystem> =
Arc::new(RemoteFileSystem::new(client.clone()));
let http_client = client.clone();
Self {
exec_server_url,
remote_transport: Some(remote_transport),
exec_backend,
filesystem,
http_client: Arc::new(client),
http_client: Arc::new(http_client),
remote_client: Some(client),
local_runtime_paths,
codex_home: None,
}
}
@@ -423,6 +476,25 @@ impl Environment {
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
Arc::clone(&self.filesystem)
}
pub async fn codex_home(&self) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
if let Some(codex_home) = self.codex_home.clone() {
return Ok(codex_home);
}
let client = self.remote_client.as_ref().ok_or_else(|| {
internal_error("failed to locate local codex home for runtime install")
})?;
let client = client.get().await.map_err(exec_server_error_to_jsonrpc)?;
client
.codex_home()
.ok_or_else(|| internal_error("remote exec-server did not report a codex home"))
}
}
fn default_local_codex_home() -> Option<AbsolutePathBuf> {
default_codex_home_path()
.ok()
.and_then(|path| AbsolutePathBuf::from_absolute_path_checked(path).ok())
}
#[cfg(test)]

View File

@@ -1,6 +1,7 @@
mod client;
mod client_api;
mod client_transport;
mod codex_home;
mod connection;
mod environment;
mod environment_provider;

View File

@@ -59,6 +59,7 @@ pub struct InitializeParams {
#[serde(rename_all = "camelCase")]
pub struct InitializeResponse {
pub session_id: String,
pub codex_home: AbsolutePathBuf,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -49,6 +49,22 @@ pub(crate) enum RpcClientEvent {
Disconnected { reason: Option<String> },
}
pub(crate) struct RpcPendingResponse {
response_rx: oneshot::Receiver<Result<Value, RpcCallError>>,
}
impl RpcPendingResponse {
pub(crate) async fn response<T>(self) -> Result<T, RpcCallError>
where
T: DeserializeOwned,
{
let result: Result<Value, RpcCallError> =
self.response_rx.await.map_err(|_| RpcCallError::Closed)?;
let response = result?;
serde_json::from_value(response).map_err(RpcCallError::Json)
}
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RpcServerOutboundMessage {
Response {
@@ -80,7 +96,7 @@ impl RpcNotificationSender {
self.outgoing_tx
.send(RpcServerOutboundMessage::Response { request_id, result })
.await
.map_err(|_| internal_error("RPC connection closed while sending response".into()))
.map_err(|_| internal_error("RPC connection closed while sending response"))
}
pub(crate) async fn notify<P: Serialize>(
@@ -97,7 +113,7 @@ impl RpcNotificationSender {
},
))
.await
.map_err(|_| internal_error("RPC connection closed while sending notification".into()))
.map_err(|_| internal_error("RPC connection closed while sending notification"))
}
}
@@ -320,6 +336,17 @@ impl RpcClient {
where
P: Serialize,
T: DeserializeOwned,
{
self.start_call(method, params).await?.response().await
}
pub(crate) async fn start_call<P>(
&self,
method: &str,
params: &P,
) -> Result<RpcPendingResponse, RpcCallError>
where
P: Serialize,
{
let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::SeqCst));
let (response_tx, response_rx) = oneshot::channel();
@@ -356,19 +383,7 @@ impl RpcClient {
return Err(RpcCallError::Closed);
}
// Do not race in-flight requests directly against the transport-close
// watch value. The connection reader receives JSON-RPC messages and
// the terminal disconnect event on one ordered queue, then drains any
// still-pending requests. Awaiting this receiver preserves that order:
// responses already read before EOF still win, and truly pending calls
// are failed once the reader observes the disconnect.
let result: Result<Value, RpcCallError> =
response_rx.await.map_err(|_| RpcCallError::Closed)?;
let response = match result {
Ok(response) => response,
Err(error) => return Err(error),
};
serde_json::from_value(response).map_err(RpcCallError::Json)
Ok(RpcPendingResponse { response_rx })
}
#[cfg(test)]
@@ -425,11 +440,11 @@ pub(crate) fn method_not_found(message: String) -> JSONRPCErrorError {
}
}
pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError {
pub(crate) fn invalid_params(message: impl Into<String>) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32602,
data: None,
message,
message: message.into(),
}
}
@@ -441,11 +456,11 @@ pub(crate) fn not_found(message: String) -> JSONRPCErrorError {
}
}
pub(crate) fn internal_error(message: String) -> JSONRPCErrorError {
pub(crate) fn internal_error(message: impl Into<String>) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32603,
data: None,
message,
message: message.into(),
}
}

View File

@@ -1,5 +1,4 @@
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
@@ -50,7 +49,7 @@ use crate::server::session_registry::SessionRegistry;
pub(crate) struct ExecServerHandler {
session_registry: Arc<SessionRegistry>,
notifications: RpcNotificationSender,
session: StdMutex<Option<SessionHandle>>,
session: std::sync::Mutex<Option<SessionHandle>>,
active_body_stream_ids: Mutex<HashSet<String>>,
background_task_shutdown: CancellationToken,
background_tasks: TaskTracker,
@@ -68,7 +67,7 @@ impl ExecServerHandler {
Self {
session_registry,
notifications,
session: StdMutex::new(None),
session: std::sync::Mutex::new(None),
active_body_stream_ids: Mutex::new(HashSet::new()),
background_task_shutdown: CancellationToken::new(),
background_tasks: TaskTracker::new(),
@@ -123,7 +122,10 @@ impl ExecServerHandler {
.session
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(session);
Ok(InitializeResponse { session_id })
Ok(InitializeResponse {
session_id,
codex_home: crate::codex_home::default_codex_home()?,
})
}
pub(crate) fn initialized(&self) -> Result<(), String> {

View File

@@ -1,5 +1,4 @@
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::warn;

View File

@@ -17,6 +17,7 @@ use codex_exec_server::HttpRequestResponse;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::RemoteExecServerConnectArgs;
use codex_utils_absolute_path::AbsolutePathBuf;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
@@ -1013,6 +1014,7 @@ impl JsonRpcPeer {
request.id,
InitializeResponse {
session_id: "session-1".to_string(),
codex_home: AbsolutePathBuf::try_from(std::env::current_dir()?)?,
},
)
.await?;

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "runtime-install",
crate_name = "codex_runtime_install",
)

View File

@@ -0,0 +1,36 @@
[package]
name = "codex-runtime-install"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
name = "codex_runtime_install"
path = "src/lib.rs"
doctest = false
[lints]
workspace = true
[dependencies]
codex-app-server-protocol = { workspace = true }
codex-exec-server = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = [
"fs",
"macros",
"process",
"rt",
"sync",
] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
uuid = { workspace = true, features = ["v7"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
sha2 = { workspace = true }
tempfile = { workspace = true }

View File

@@ -0,0 +1,20 @@
use codex_app_server_protocol::JSONRPCErrorError;
const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
const INTERNAL_ERROR_CODE: i64 = -32603;
pub(crate) fn invalid_params(message: impl Into<String>) -> JSONRPCErrorError {
error(INVALID_PARAMS_ERROR_CODE, message)
}
pub(crate) fn internal_error(message: impl Into<String>) -> JSONRPCErrorError {
error(INTERNAL_ERROR_CODE, message)
}
fn error(code: i64, message: impl Into<String>) -> JSONRPCErrorError {
JSONRPCErrorError {
code,
message: message.into(),
data: None,
}
}

View File

@@ -0,0 +1,377 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_exec_server::Environment;
use codex_exec_server::ExecBackend;
use codex_exec_server::ExecEnvPolicy;
use codex_exec_server::ExecOutputStream;
use codex_exec_server::ExecParams;
use codex_exec_server::ExecProcessEvent;
use codex_exec_server::ExecutorFileSystem;
use codex_exec_server::ProcessId;
use codex_protocol::config_types::ShellEnvironmentPolicyInherit;
use codex_utils_absolute_path::AbsolutePathBuf;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::errors::internal_error;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RuntimeArchiveFormat {
TarXz,
Zip,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum TargetPlatform {
Unix,
Windows,
}
impl TargetPlatform {
pub(crate) fn runtime_name(self) -> &'static str {
match self {
Self::Unix => "unix",
Self::Windows => "win32",
}
}
}
pub(crate) struct InstallTarget {
pub(crate) install_root: AbsolutePathBuf,
pub(crate) platform: TargetPlatform,
}
/// Executes runtime installation operations in the selected environment through
/// the generic executor process and filesystem interfaces.
pub(crate) struct RuntimeExecutor {
backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
cwd: AbsolutePathBuf,
}
impl RuntimeExecutor {
pub(crate) async fn new(environment: &Environment) -> Result<Self, JSONRPCErrorError> {
Ok(Self {
backend: environment.get_exec_backend(),
filesystem: environment.get_filesystem(),
cwd: environment.codex_home().await?,
})
}
pub(crate) fn filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
Arc::clone(&self.filesystem)
}
pub(crate) async fn discover_target(
&self,
cancellation: &CancellationToken,
) -> Result<InstallTarget, JSONRPCErrorError> {
let unix_output = self
.run_command(
vec![
"sh".to_string(),
"-c".to_string(),
"case \"$(uname -s)\" in CYGWIN*|MINGW*|MSYS*) exit 1;; esac; test -n \"$HOME\" || exit 1; printf 'unix\\n%s/.cache/codex-runtimes\\n' \"$HOME\"".to_string(),
],
cancellation,
"inspect Unix runtime install environment",
)
.await;
if let Ok(output) = unix_output {
return parse_install_target(&output);
}
ensure_not_cancelled(cancellation)?;
let windows_output = self
.run_command(
vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"if (-not $env:USERPROFILE) { exit 1 }; [Console]::Out.WriteLine('win32'); [Console]::Out.WriteLine([IO.Path]::Combine($env:USERPROFILE, '.cache', 'codex-runtimes'))".to_string(),
],
cancellation,
"inspect Windows runtime install environment",
)
.await?;
parse_install_target(&windows_output)
}
pub(crate) async fn download_archive(
&self,
platform: TargetPlatform,
url: &str,
destination: &AbsolutePathBuf,
cancellation: &CancellationToken,
) -> Result<(), JSONRPCErrorError> {
let argv = match platform {
TargetPlatform::Unix => vec![
"curl".to_string(),
"--fail".to_string(),
"--location".to_string(),
"--silent".to_string(),
"--show-error".to_string(),
"--output".to_string(),
destination.display().to_string(),
url.to_string(),
],
TargetPlatform::Windows => vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"$ProgressPreference = 'SilentlyContinue'; Invoke-WebRequest -UseBasicParsing -Uri $args[0] -OutFile $args[1]".to_string(),
url.to_string(),
destination.display().to_string(),
],
};
self.run_command(argv, cancellation, "download runtime archive")
.await
.map(|_| ())
}
pub(crate) async fn archive_checksum(
&self,
platform: TargetPlatform,
archive_path: &AbsolutePathBuf,
cancellation: &CancellationToken,
) -> Result<String, JSONRPCErrorError> {
let argv = match platform {
TargetPlatform::Unix => vec![
"sh".to_string(),
"-c".to_string(),
"if command -v sha256sum >/dev/null 2>&1; then sha256sum \"$1\"; else shasum -a 256 \"$1\"; fi".to_string(),
"runtime-checksum".to_string(),
archive_path.display().to_string(),
],
TargetPlatform::Windows => vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"[Console]::Out.WriteLine((Get-FileHash -LiteralPath $args[0] -Algorithm SHA256).Hash)".to_string(),
archive_path.display().to_string(),
],
};
let output = self
.run_command(argv, cancellation, "checksum runtime archive")
.await?;
output
.split_whitespace()
.next()
.map(str::to_string)
.ok_or_else(|| internal_error("checksum runtime archive returned no digest"))
}
pub(crate) async fn list_archive_entries(
&self,
format: RuntimeArchiveFormat,
platform: TargetPlatform,
archive_path: &AbsolutePathBuf,
cancellation: &CancellationToken,
) -> Result<Vec<String>, JSONRPCErrorError> {
let argv = match (format, platform) {
(RuntimeArchiveFormat::TarXz, _) => vec![
"tar".to_string(),
"-tf".to_string(),
archive_path.display().to_string(),
],
(RuntimeArchiveFormat::Zip, TargetPlatform::Unix) => vec![
"unzip".to_string(),
"-Z1".to_string(),
archive_path.display().to_string(),
],
(RuntimeArchiveFormat::Zip, TargetPlatform::Windows) => vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"Add-Type -AssemblyName System.IO.Compression.FileSystem; $archive = [IO.Compression.ZipFile]::OpenRead($args[0]); try { $archive.Entries | ForEach-Object { [Console]::Out.WriteLine($_.FullName) } } finally { $archive.Dispose() }".to_string(),
archive_path.display().to_string(),
],
};
let output = self
.run_command(argv, cancellation, "list runtime archive")
.await?;
Ok(output
.lines()
.map(str::trim)
.filter(|entry| !entry.is_empty())
.map(str::to_string)
.collect())
}
pub(crate) async fn extract_archive(
&self,
format: RuntimeArchiveFormat,
platform: TargetPlatform,
archive_path: &AbsolutePathBuf,
extract_dir: &AbsolutePathBuf,
cancellation: &CancellationToken,
) -> Result<(), JSONRPCErrorError> {
let argv = match (format, platform) {
(RuntimeArchiveFormat::TarXz, _) => vec![
"tar".to_string(),
"-xJf".to_string(),
archive_path.display().to_string(),
"-C".to_string(),
extract_dir.display().to_string(),
],
(RuntimeArchiveFormat::Zip, TargetPlatform::Unix) => vec![
"unzip".to_string(),
"-q".to_string(),
archive_path.display().to_string(),
"-d".to_string(),
extract_dir.display().to_string(),
],
(RuntimeArchiveFormat::Zip, TargetPlatform::Windows) => vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"Expand-Archive -LiteralPath $args[0] -DestinationPath $args[1] -Force".to_string(),
archive_path.display().to_string(),
extract_dir.display().to_string(),
],
};
self.run_command(argv, cancellation, "extract runtime archive")
.await
.map(|_| ())
}
pub(crate) async fn move_directory(
&self,
platform: TargetPlatform,
source: &AbsolutePathBuf,
destination: &AbsolutePathBuf,
cancellation: &CancellationToken,
) -> Result<(), JSONRPCErrorError> {
let argv = match platform {
TargetPlatform::Unix => vec![
"mv".to_string(),
source.display().to_string(),
destination.display().to_string(),
],
TargetPlatform::Windows => vec![
"powershell".to_string(),
"-NoProfile".to_string(),
"-NonInteractive".to_string(),
"-Command".to_string(),
"Move-Item -LiteralPath $args[0] -Destination $args[1]".to_string(),
source.display().to_string(),
destination.display().to_string(),
],
};
self.run_command(argv, cancellation, "move runtime directory")
.await
.map(|_| ())
}
async fn run_command(
&self,
argv: Vec<String>,
cancellation: &CancellationToken,
operation: &str,
) -> Result<String, JSONRPCErrorError> {
ensure_not_cancelled(cancellation)?;
let started = self
.backend
.start(ExecParams {
process_id: ProcessId::from(format!("runtime-install-{}", Uuid::now_v7())),
argv,
cwd: self.cwd.as_path().to_path_buf(),
env_policy: Some(ExecEnvPolicy {
inherit: ShellEnvironmentPolicyInherit::All,
ignore_default_excludes: true,
exclude: Vec::new(),
r#set: HashMap::new(),
include_only: Vec::new(),
}),
env: HashMap::new(),
tty: false,
pipe_stdin: false,
arg0: None,
})
.await
.map_err(|err| internal_error(format!("failed to {operation}: {err}")))?;
let mut events = started.process.subscribe_events();
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut exit_code = None;
loop {
tokio::select! {
_ = cancellation.cancelled() => {
let _ = started.process.terminate().await;
return Err(runtime_install_canceled());
}
event = events.recv() => {
let event = event.map_err(|err| internal_error(format!("{operation} output stream failed: {err}")))?;
match event {
ExecProcessEvent::Output(chunk) => match chunk.stream {
ExecOutputStream::Stdout => stdout.extend_from_slice(&chunk.chunk.0),
ExecOutputStream::Stderr | ExecOutputStream::Pty => {
stderr.extend_from_slice(&chunk.chunk.0);
}
},
ExecProcessEvent::Exited { exit_code: code, .. } => exit_code = Some(code),
ExecProcessEvent::Closed { .. } => {
if exit_code == Some(0) {
return String::from_utf8(stdout).map_err(|err| {
internal_error(format!("{operation} returned invalid UTF-8: {err}"))
});
}
return Err(internal_error(format!(
"{operation} failed (exit code {}): {}",
exit_code.unwrap_or(-1),
String::from_utf8_lossy(&stderr).trim()
)));
}
ExecProcessEvent::Failed(message) => {
return Err(internal_error(format!("{operation} process failed: {message}")));
}
}
}
}
}
}
}
fn parse_install_target(output: &str) -> Result<InstallTarget, JSONRPCErrorError> {
let mut lines = output.lines();
let platform = match lines.next().map(str::trim) {
Some("unix") => TargetPlatform::Unix,
Some("win32") => TargetPlatform::Windows,
_ => {
return Err(internal_error(
"runtime install environment returned invalid platform",
));
}
};
let install_root = lines
.next()
.map(str::trim)
.filter(|line| !line.is_empty())
.ok_or_else(|| internal_error("runtime install environment returned no install root"))?;
let install_root = AbsolutePathBuf::from_absolute_path_checked(PathBuf::from(install_root))
.map_err(|err| internal_error(format!("runtime install root is not absolute: {err}")))?;
Ok(InstallTarget {
install_root,
platform,
})
}
fn ensure_not_cancelled(cancellation: &CancellationToken) -> Result<(), JSONRPCErrorError> {
if cancellation.is_cancelled() {
Err(runtime_install_canceled())
} else {
Ok(())
}
}
pub(crate) fn runtime_install_canceled() -> JSONRPCErrorError {
internal_error("runtime install canceled")
}

View File

@@ -0,0 +1,457 @@
use std::io;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RuntimeInstallManifestParams;
use codex_app_server_protocol::RuntimeInstallParams;
use codex_app_server_protocol::RuntimeInstallProgressNotification;
use codex_app_server_protocol::RuntimeInstallProgressPhase;
use codex_app_server_protocol::RuntimeInstallResponse;
use codex_app_server_protocol::RuntimeInstallStatus;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::Environment;
use codex_exec_server::RemoveOptions;
use codex_utils_absolute_path::AbsolutePathBuf;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use crate::errors::internal_error;
use crate::errors::invalid_params;
use crate::executor::RuntimeExecutor;
use crate::executor::TargetPlatform;
use crate::executor::runtime_install_canceled;
use crate::validation::absolute_path;
use crate::validation::assert_archive_entries_stay_within_directory;
use crate::validation::default_archive_name;
use crate::validation::read_installed_runtime_metadata;
use crate::validation::runtime_archive_format;
use crate::validation::runtime_root_directory_name;
use crate::validation::validate_manifest;
use crate::validation::validate_runtime_root;
pub type RuntimeInstallProgressSender = mpsc::UnboundedSender<RuntimeInstallProgressNotification>;
#[derive(Clone)]
struct RuntimeInstallProgressReporter {
bundle_version: Option<String>,
sender: Option<RuntimeInstallProgressSender>,
}
impl RuntimeInstallProgressReporter {
fn new(bundle_version: Option<String>, sender: Option<RuntimeInstallProgressSender>) -> Self {
Self {
bundle_version,
sender,
}
}
fn phase(&self, phase: RuntimeInstallProgressPhase) {
self.send(
phase, /*downloaded_bytes*/ None, /*total_bytes*/ None,
);
}
fn download_progress(&self, downloaded_bytes: u64, total_bytes: Option<u64>) {
self.send(
RuntimeInstallProgressPhase::Downloading,
Some(downloaded_bytes),
total_bytes,
);
}
fn send(
&self,
phase: RuntimeInstallProgressPhase,
downloaded_bytes: Option<u64>,
total_bytes: Option<u64>,
) {
let Some(sender) = self.sender.as_ref() else {
return;
};
let _ = sender.send(RuntimeInstallProgressNotification {
bundle_version: self.bundle_version.clone(),
downloaded_bytes,
phase,
total_bytes,
});
}
}
pub async fn install_runtime_with_progress(
environment: &Environment,
params: RuntimeInstallParams,
progress: RuntimeInstallProgressSender,
cancellation: CancellationToken,
) -> Result<RuntimeInstallResponse, JSONRPCErrorError> {
let executor = RuntimeExecutor::new(environment).await?;
let target = executor.discover_target(&cancellation).await?;
install_runtime_with_root_and_control(
&executor,
params,
target.install_root,
target.platform,
Some(progress),
cancellation,
)
.await
}
async fn install_runtime_with_root_and_control(
executor: &RuntimeExecutor,
params: RuntimeInstallParams,
install_root: AbsolutePathBuf,
platform: TargetPlatform,
progress: Option<RuntimeInstallProgressSender>,
cancellation: CancellationToken,
) -> Result<RuntimeInstallResponse, JSONRPCErrorError> {
validate_manifest(&params.manifest)?;
let archive_format = runtime_archive_format(&params.manifest)?;
let archive_name = params
.manifest
.archive_name
.clone()
.unwrap_or_else(|| default_archive_name(archive_format).to_string());
let progress =
RuntimeInstallProgressReporter::new(params.manifest.bundle_version.clone(), progress);
progress.phase(RuntimeInstallProgressPhase::Checking);
ensure_not_cancelled(&cancellation)?;
if let Some(response) = reuse_current_runtime(
executor,
&params.manifest,
&install_root,
platform,
&progress,
&cancellation,
)
.await?
{
return Ok(response);
}
let staging_dir = make_staging_dir(executor, &install_root).await?;
let archive_path = absolute_path(staging_dir.as_path().join(archive_name))?;
let result = async {
progress.download_progress(
/*downloaded_bytes*/ 0,
params.manifest.archive_size_bytes,
);
executor
.download_archive(
platform,
&params.manifest.archive_url,
&archive_path,
&cancellation,
)
.await?;
if let Some(total_bytes) = params.manifest.archive_size_bytes {
progress.download_progress(total_bytes, Some(total_bytes));
}
install_runtime_from_archive_with_control(
executor,
&params.manifest,
&archive_path,
&install_root,
platform,
&progress,
&cancellation,
)
.await
}
.await;
cleanup_directory(executor, &staging_dir, "runtime install staging directory").await;
result
}
async fn install_runtime_from_archive_with_control(
executor: &RuntimeExecutor,
manifest: &RuntimeInstallManifestParams,
archive_path: &AbsolutePathBuf,
install_root: &AbsolutePathBuf,
platform: TargetPlatform,
progress: &RuntimeInstallProgressReporter,
cancellation: &CancellationToken,
) -> Result<RuntimeInstallResponse, JSONRPCErrorError> {
let runtime_root_directory_name = runtime_root_directory_name(manifest)?;
let installed_runtime_root =
absolute_path(install_root.as_path().join(&runtime_root_directory_name))?;
if let Some(response) = reuse_current_runtime(
executor,
manifest,
install_root,
platform,
progress,
cancellation,
)
.await?
{
return Ok(response);
}
create_directory(executor, install_root).await?;
progress.phase(RuntimeInstallProgressPhase::Verifying);
verify_archive_checksum(executor, platform, archive_path, manifest, cancellation).await?;
let archive_format = runtime_archive_format(manifest)?;
ensure_not_cancelled(cancellation)?;
let staging_dir = make_staging_dir(executor, install_root).await?;
let result = async {
let extract_dir = absolute_path(staging_dir.as_path().join("payload"))?;
create_directory(executor, &extract_dir).await?;
progress.phase(RuntimeInstallProgressPhase::Extracting);
ensure_not_cancelled(cancellation)?;
let entries = executor
.list_archive_entries(archive_format, platform, archive_path, cancellation)
.await?;
assert_archive_entries_stay_within_directory(&entries, extract_dir.as_path())?;
ensure_not_cancelled(cancellation)?;
executor
.extract_archive(
archive_format,
platform,
archive_path,
&extract_dir,
cancellation,
)
.await?;
let extracted_runtime_root =
absolute_path(extract_dir.as_path().join(&runtime_root_directory_name))?;
progress.phase(RuntimeInstallProgressPhase::Validating);
ensure_not_cancelled(cancellation)?;
validate_runtime_root(
executor,
&extracted_runtime_root,
manifest.bundle_format_version,
platform,
)
.await?;
ensure_not_cancelled(cancellation)?;
let previous_runtime_root = absolute_path(
install_root
.as_path()
.join(format!("{runtime_root_directory_name}.previous")),
)?;
remove_dir_if_exists(executor, &previous_runtime_root).await?;
if path_exists(executor, &installed_runtime_root).await? {
executor
.move_directory(
platform,
&installed_runtime_root,
&previous_runtime_root,
cancellation,
)
.await?;
}
let install_result = async {
executor
.move_directory(
platform,
&extracted_runtime_root,
&installed_runtime_root,
cancellation,
)
.await?;
validate_runtime_root(
executor,
&installed_runtime_root,
manifest.bundle_format_version,
platform,
)
.await
}
.await;
let paths = match install_result {
Ok(paths) => paths,
Err(error) => {
remove_dir_if_exists(executor, &installed_runtime_root).await?;
if path_exists(executor, &previous_runtime_root).await? {
executor
.move_directory(
platform,
&previous_runtime_root,
&installed_runtime_root,
cancellation,
)
.await?;
}
return Err(error);
}
};
remove_dir_if_exists(executor, &previous_runtime_root).await?;
Ok(RuntimeInstallResponse {
bundle_version: manifest.bundle_version.clone(),
paths,
status: RuntimeInstallStatus::Installed,
})
}
.await;
cleanup_directory(
executor,
&staging_dir,
"runtime install extraction directory",
)
.await;
if result.is_ok() {
progress.phase(RuntimeInstallProgressPhase::Installed);
}
result
}
async fn reuse_current_runtime(
executor: &RuntimeExecutor,
manifest: &RuntimeInstallManifestParams,
install_root: &AbsolutePathBuf,
platform: TargetPlatform,
progress: &RuntimeInstallProgressReporter,
cancellation: &CancellationToken,
) -> Result<Option<RuntimeInstallResponse>, JSONRPCErrorError> {
let installed_runtime_root = absolute_path(
install_root
.as_path()
.join(runtime_root_directory_name(manifest)?),
)?;
ensure_not_cancelled(cancellation)?;
if let Some(bundle_version) = manifest.bundle_version.as_ref()
&& let Ok(Some(metadata)) =
read_installed_runtime_metadata(executor, &installed_runtime_root).await
&& metadata.bundle_version.as_ref() == Some(bundle_version)
&& let Ok(paths) = validate_runtime_root(
executor,
&installed_runtime_root,
manifest.bundle_format_version,
platform,
)
.await
{
progress.phase(RuntimeInstallProgressPhase::Installed);
return Ok(Some(RuntimeInstallResponse {
bundle_version: Some(bundle_version.clone()),
paths,
status: RuntimeInstallStatus::AlreadyCurrent,
}));
}
Ok(None)
}
async fn make_staging_dir(
executor: &RuntimeExecutor,
install_root: &AbsolutePathBuf,
) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
create_directory(executor, install_root).await?;
let staging_dir = absolute_path(
install_root
.as_path()
.join(format!(".codex-runtime-install-{}", Uuid::now_v7())),
)?;
create_directory(executor, &staging_dir).await?;
Ok(staging_dir)
}
async fn verify_archive_checksum(
executor: &RuntimeExecutor,
platform: TargetPlatform,
archive_path: &AbsolutePathBuf,
manifest: &RuntimeInstallManifestParams,
cancellation: &CancellationToken,
) -> Result<(), JSONRPCErrorError> {
let actual_sha256 = executor
.archive_checksum(platform, archive_path, cancellation)
.await?;
if !actual_sha256.eq_ignore_ascii_case(&manifest.archive_sha256) {
return Err(invalid_params(format!(
"checksum mismatch for '{}': expected {}, got {actual_sha256}",
manifest.archive_url, manifest.archive_sha256
)));
}
Ok(())
}
async fn path_exists(
executor: &RuntimeExecutor,
path: &AbsolutePathBuf,
) -> Result<bool, JSONRPCErrorError> {
match executor
.filesystem()
.get_metadata(path, /*sandbox*/ None)
.await
{
Ok(_) => Ok(true),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false),
Err(err) => Err(internal_error(format!(
"failed to inspect runtime path {}: {err}",
path.display()
))),
}
}
async fn create_directory(
executor: &RuntimeExecutor,
path: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
executor
.filesystem()
.create_directory(
path,
CreateDirectoryOptions { recursive: true },
/*sandbox*/ None,
)
.await
.map_err(|err| {
internal_error(format!(
"failed to create runtime directory {}: {err}",
path.display()
))
})
}
async fn remove_dir_if_exists(
executor: &RuntimeExecutor,
path: &AbsolutePathBuf,
) -> Result<(), JSONRPCErrorError> {
match executor
.filesystem()
.remove(
path,
RemoveOptions {
recursive: true,
force: true,
},
/*sandbox*/ None,
)
.await
{
Ok(()) => Ok(()),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(internal_error(format!(
"failed to remove runtime directory {}: {err}",
path.display()
))),
}
}
async fn cleanup_directory(executor: &RuntimeExecutor, path: &AbsolutePathBuf, label: &str) {
if let Err(error) = remove_dir_if_exists(executor, path).await {
tracing::warn!(
path = %path.display(),
error = %error.message,
"failed to clean up {label}"
);
}
}
fn ensure_not_cancelled(cancellation: &CancellationToken) -> Result<(), JSONRPCErrorError> {
if cancellation.is_cancelled() {
Err(runtime_install_canceled())
} else {
Ok(())
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,332 @@
use std::path::Path;
use codex_app_server_protocol::RuntimeInstallManifestParams;
use codex_app_server_protocol::RuntimeInstallParams;
use codex_app_server_protocol::RuntimeInstallProgressPhase;
use codex_app_server_protocol::RuntimeInstallStatus;
use codex_exec_server::Environment;
use pretty_assertions::assert_eq;
use sha2::Digest;
use sha2::Sha256;
use tokio::fs;
use tokio::process::Command;
use tokio_util::sync::CancellationToken;
use super::*;
use crate::validation::PUBLISHED_ARTIFACT_NAME;
use crate::validation::node_executable_name;
use crate::validation::python_executable_name;
#[test]
fn archive_traversal_entries_are_rejected() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let entries = vec![
"codex-primary-runtime/runtime.json".to_string(),
"../x".to_string(),
];
let error = assert_archive_entries_stay_within_directory(&entries, temp_dir.path())
.expect_err("entry should be rejected");
assert!(error.message.contains("would extract outside target"));
}
#[tokio::test]
async fn install_runtime_reuses_current_runtime_without_downloading_archive() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let install_root = absolute_path(temp_dir.path().join("install")).expect("install root");
let runtime_root =
absolute_path(install_root.as_path().join(PUBLISHED_ARTIFACT_NAME)).expect("runtime root");
create_runtime_root(runtime_root.as_path(), "v1").await;
let archive_path = temp_dir.path().join("unused.tar.xz");
fs::write(&archive_path, b"not used")
.await
.expect("write archive");
let mut manifest = manifest_for_archive(&archive_path, "v1").await;
manifest.archive_url = "not a valid archive URL".to_string();
let response = install_runtime_with_root_and_control(
&executor,
RuntimeInstallParams {
environment_id: None,
manifest: Box::new(manifest),
release: "primary".to_string(),
},
install_root,
local_platform(),
/*progress*/ None,
CancellationToken::new(),
)
.await
.expect("installed runtime should be reused without downloading");
assert_eq!(response.status, RuntimeInstallStatus::AlreadyCurrent);
assert_eq!(response.bundle_version.as_deref(), Some("v1"));
}
#[tokio::test]
async fn validate_runtime_root_rejects_missing_node_executable() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let runtime_root =
absolute_path(temp_dir.path().join(PUBLISHED_ARTIFACT_NAME)).expect("runtime root");
create_runtime_root(runtime_root.as_path(), "v1").await;
fs::remove_file(
runtime_root
.as_path()
.join("dependencies")
.join("node")
.join("bin")
.join(node_executable_name(local_platform().runtime_name())),
)
.await
.expect("remove node");
let error = validate_runtime_root(&executor, &runtime_root, Some(2), local_platform())
.await
.expect_err("node executable should be required");
assert!(error.message.contains("node executable is missing"));
}
#[tokio::test]
async fn validate_runtime_root_rejects_missing_node_modules_directory() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let runtime_root =
absolute_path(temp_dir.path().join(PUBLISHED_ARTIFACT_NAME)).expect("runtime root");
create_runtime_root(runtime_root.as_path(), "v1").await;
fs::remove_dir(
runtime_root
.as_path()
.join("dependencies")
.join("node")
.join("node_modules"),
)
.await
.expect("remove node_modules");
let error = validate_runtime_root(&executor, &runtime_root, Some(2), local_platform())
.await
.expect_err("node_modules directory should be required");
assert!(error.message.contains("node modules directory is missing"));
}
#[tokio::test]
async fn validate_runtime_root_rejects_missing_python_executable() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let runtime_root =
absolute_path(temp_dir.path().join(PUBLISHED_ARTIFACT_NAME)).expect("runtime root");
create_runtime_root(runtime_root.as_path(), "v1").await;
fs::remove_file(
runtime_root
.as_path()
.join("dependencies")
.join("python")
.join("bin")
.join(python_executable_name(local_platform().runtime_name())),
)
.await
.expect("remove python");
let error = validate_runtime_root(&executor, &runtime_root, Some(2), local_platform())
.await
.expect_err("python executable should be required");
assert!(error.message.contains("python executable is missing"));
}
#[tokio::test]
async fn install_from_archive_rejects_checksum_mismatch() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let archive_path = absolute_path(temp_dir.path().join("archive.tar.xz")).expect("archive path");
fs::write(archive_path.as_path(), b"archive")
.await
.expect("write archive");
let manifest = RuntimeInstallManifestParams {
archive_name: None,
archive_sha256: "0".repeat(64),
archive_size_bytes: None,
archive_url: "https://example.com/archive.tar.xz".to_string(),
bundle_format_version: Some(2),
bundle_version: Some("v1".to_string()),
format: Some("tar.xz".to_string()),
runtime_root_directory_name: None,
};
let install_root = absolute_path(temp_dir.path().join("install")).expect("install root");
let error = install_runtime_from_archive_with_control(
&executor,
&manifest,
&archive_path,
&install_root,
local_platform(),
&RuntimeInstallProgressReporter::new(manifest.bundle_version.clone(), None),
&CancellationToken::new(),
)
.await
.expect_err("checksum mismatch should fail");
assert!(error.message.contains("checksum mismatch"));
}
#[tokio::test]
async fn install_from_archive_reports_install_progress() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let payload_root = temp_dir
.path()
.join("payload")
.join(PUBLISHED_ARTIFACT_NAME);
create_runtime_root(&payload_root, "v1").await;
let archive_path = absolute_path(temp_dir.path().join("archive.tar.xz")).expect("archive path");
create_tar_xz(&temp_dir.path().join("payload"), archive_path.as_path()).await;
let manifest = manifest_for_archive(archive_path.as_path(), "v1").await;
let install_root = absolute_path(temp_dir.path().join("install")).expect("install root");
let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
let progress =
RuntimeInstallProgressReporter::new(manifest.bundle_version.clone(), Some(progress_tx));
install_runtime_from_archive_with_control(
&executor,
&manifest,
&archive_path,
&install_root,
local_platform(),
&progress,
&CancellationToken::new(),
)
.await
.expect("install should succeed");
let mut phases = Vec::new();
while let Ok(notification) = progress_rx.try_recv() {
phases.push(notification.phase);
}
assert_eq!(
phases,
vec![
RuntimeInstallProgressPhase::Verifying,
RuntimeInstallProgressPhase::Extracting,
RuntimeInstallProgressPhase::Validating,
RuntimeInstallProgressPhase::Installed,
]
);
}
#[tokio::test]
async fn install_from_archive_stops_when_canceled() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let executor = test_executor().await;
let archive_path = absolute_path(temp_dir.path().join("unused.tar.xz")).expect("archive path");
fs::write(archive_path.as_path(), b"unused")
.await
.expect("write archive");
let manifest = manifest_for_archive(archive_path.as_path(), "v1").await;
let install_root = absolute_path(temp_dir.path().join("install")).expect("install root");
let cancellation = CancellationToken::new();
cancellation.cancel();
let error = install_runtime_from_archive_with_control(
&executor,
&manifest,
&archive_path,
&install_root,
local_platform(),
&RuntimeInstallProgressReporter::new(manifest.bundle_version.clone(), None),
&cancellation,
)
.await
.expect_err("canceled install should fail");
assert_eq!(error.message, "runtime install canceled");
}
async fn test_executor() -> RuntimeExecutor {
RuntimeExecutor::new(&Environment::default_for_tests())
.await
.expect("test executor")
}
fn local_platform() -> TargetPlatform {
if cfg!(target_os = "windows") {
TargetPlatform::Windows
} else {
TargetPlatform::Unix
}
}
async fn create_runtime_root(runtime_root: &Path, bundle_version: &str) {
let node_bin = runtime_root.join("dependencies").join("node").join("bin");
let python_bin = runtime_root.join("dependencies").join("python").join("bin");
fs::create_dir_all(&node_bin).await.expect("node bin");
fs::create_dir_all(
runtime_root
.join("dependencies")
.join("node")
.join("node_modules"),
)
.await
.expect("node_modules");
fs::create_dir_all(&python_bin).await.expect("python bin");
fs::write(
node_bin.join(node_executable_name(local_platform().runtime_name())),
b"node",
)
.await
.expect("node");
fs::write(
python_bin.join(python_executable_name(local_platform().runtime_name())),
b"python",
)
.await
.expect("python");
fs::write(
runtime_root.join("runtime.json"),
format!(r#"{{"bundleFormatVersion":2,"bundleVersion":"{bundle_version}"}}"#),
)
.await
.expect("runtime metadata");
}
async fn manifest_for_archive(
archive_path: &Path,
bundle_version: &str,
) -> RuntimeInstallManifestParams {
RuntimeInstallManifestParams {
archive_name: None,
archive_sha256: compute_sha256(archive_path).await,
archive_size_bytes: None,
archive_url: "https://example.com/archive.tar.xz".to_string(),
bundle_format_version: Some(2),
bundle_version: Some(bundle_version.to_string()),
format: Some("tar.xz".to_string()),
runtime_root_directory_name: None,
}
}
async fn compute_sha256(path: &Path) -> String {
let bytes = fs::read(path).await.expect("read archive");
format!("{:x}", Sha256::digest(bytes))
}
async fn create_tar_xz(payload_dir: &Path, archive_path: &Path) {
let output = Command::new("tar")
.arg("-cJf")
.arg(archive_path)
.arg("-C")
.arg(payload_dir)
.arg(".")
.output()
.await
.expect("tar should run");
assert!(
output.status.success(),
"tar failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}

View File

@@ -0,0 +1,6 @@
mod errors;
mod executor;
mod installer;
mod validation;
pub use installer::install_runtime_with_progress;

View File

@@ -0,0 +1,363 @@
use std::io;
use std::path::Component;
use std::path::Path;
use std::path::PathBuf;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RuntimeInstallManifestParams;
use codex_app_server_protocol::RuntimeInstallPaths;
use codex_utils_absolute_path::AbsolutePathBuf;
use serde::Deserialize;
use crate::errors::internal_error;
use crate::errors::invalid_params;
use crate::executor::RuntimeArchiveFormat;
use crate::executor::RuntimeExecutor;
use crate::executor::TargetPlatform;
pub(crate) const PUBLISHED_ARTIFACT_NAME: &str = "codex-primary-runtime";
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct InstalledRuntimeMetadata {
pub(crate) bundle_format_version: Option<u32>,
pub(crate) bundle_version: Option<String>,
bundled_plugins: Option<Vec<String>>,
bundled_skills: Option<Vec<String>>,
skills_to_remove: Option<Vec<String>>,
}
pub(crate) fn validate_manifest(
manifest: &RuntimeInstallManifestParams,
) -> Result<(), JSONRPCErrorError> {
if manifest.archive_url.trim().is_empty() {
return Err(invalid_params(
"runtime manifest archiveUrl must not be empty",
));
}
if !is_sha256(&manifest.archive_sha256) {
return Err(invalid_params(
"runtime manifest archiveSha256 must be a 64-character hex digest",
));
}
if let Some(archive_name) = manifest.archive_name.as_ref() {
validate_path_segment(archive_name, "archiveName")?;
}
if let Some(runtime_root_directory_name) = manifest.runtime_root_directory_name.as_ref() {
validate_path_segment(runtime_root_directory_name, "runtimeRootDirectoryName")?;
}
Ok(())
}
fn is_sha256(value: &str) -> bool {
value.len() == 64 && value.bytes().all(|byte| byte.is_ascii_hexdigit())
}
fn validate_path_segment(value: &str, field_name: &str) -> Result<(), JSONRPCErrorError> {
let value = value.trim();
if value.is_empty()
|| value == "."
|| value == ".."
|| value.contains('/')
|| value.contains('\\')
{
return Err(invalid_params(format!(
"runtime manifest {field_name} must be a single path segment"
)));
}
Ok(())
}
pub(crate) fn runtime_root_directory_name(
manifest: &RuntimeInstallManifestParams,
) -> Result<String, JSONRPCErrorError> {
let runtime_root_directory_name = manifest
.runtime_root_directory_name
.clone()
.unwrap_or_else(|| PUBLISHED_ARTIFACT_NAME.to_string());
validate_path_segment(&runtime_root_directory_name, "runtimeRootDirectoryName")?;
Ok(runtime_root_directory_name)
}
pub(crate) fn runtime_archive_format(
manifest: &RuntimeInstallManifestParams,
) -> Result<RuntimeArchiveFormat, JSONRPCErrorError> {
if let Some(format) = manifest.format.as_deref() {
match format.to_ascii_lowercase().as_str() {
"tar.xz" => return Ok(RuntimeArchiveFormat::TarXz),
"zip" => return Ok(RuntimeArchiveFormat::Zip),
_ => {
return Err(invalid_params(format!(
"unsupported runtime archive format: {format}"
)));
}
}
}
if manifest
.archive_name
.as_deref()
.is_some_and(|name| name.to_ascii_lowercase().ends_with(".zip"))
|| manifest.archive_url.to_ascii_lowercase().ends_with(".zip")
{
return Ok(RuntimeArchiveFormat::Zip);
}
Ok(RuntimeArchiveFormat::TarXz)
}
pub(crate) fn default_archive_name(format: RuntimeArchiveFormat) -> &'static str {
match format {
RuntimeArchiveFormat::TarXz => "node-runtime.tar.xz",
RuntimeArchiveFormat::Zip => "node-runtime.zip",
}
}
pub(crate) fn assert_archive_entries_stay_within_directory(
entries: &[String],
extract_dir: &Path,
) -> Result<(), JSONRPCErrorError> {
let resolved_extract_dir = normalize_path(extract_dir);
for entry in entries {
let resolved_entry_path = normalize_path(extract_dir.join(entry));
if resolved_entry_path != resolved_extract_dir
&& !resolved_entry_path.starts_with(&resolved_extract_dir)
{
return Err(invalid_params(format!(
"archive entry '{entry}' would extract outside target"
)));
}
}
Ok(())
}
fn normalize_path(path: impl AsRef<Path>) -> PathBuf {
let mut normalized = PathBuf::new();
for component in path.as_ref().components() {
match component {
Component::CurDir => {}
Component::ParentDir => {
normalized.pop();
}
_ => normalized.push(component.as_os_str()),
}
}
normalized
}
pub(crate) async fn read_installed_runtime_metadata(
executor: &RuntimeExecutor,
runtime_root: &AbsolutePathBuf,
) -> Result<Option<InstalledRuntimeMetadata>, JSONRPCErrorError> {
let metadata_path = absolute_path(runtime_root.as_path().join("runtime.json"))?;
let raw = match executor
.filesystem()
.read_file_text(&metadata_path, /*sandbox*/ None)
.await
{
Ok(raw) => raw,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(err) => {
return Err(internal_error(format!(
"failed to read installed runtime metadata: {err}"
)));
}
};
serde_json::from_str(&raw)
.map(Some)
.map_err(|err| invalid_params(format!("failed to parse installed runtime metadata: {err}")))
}
pub(crate) async fn validate_runtime_root(
executor: &RuntimeExecutor,
runtime_root: &AbsolutePathBuf,
manifest_bundle_format_version: Option<u32>,
platform: TargetPlatform,
) -> Result<RuntimeInstallPaths, JSONRPCErrorError> {
let metadata = read_installed_runtime_metadata(executor, runtime_root)
.await?
.ok_or_else(|| invalid_params("runtime metadata is missing"))?;
let bundle_format_version = manifest_bundle_format_version
.or(metadata.bundle_format_version)
.unwrap_or(1);
let node_root = if bundle_format_version >= 2 {
runtime_root.as_path().join("dependencies").join("node")
} else {
runtime_root.as_path().to_path_buf()
};
let node_path = absolute_path(
node_root
.join("bin")
.join(node_executable_name(platform.runtime_name())),
)?;
let node_modules_path = absolute_path(node_root.join("node_modules"))?;
require_runtime_file(executor, &node_path, "node executable").await?;
require_runtime_directory(executor, &node_modules_path, "node modules directory").await?;
let python_path =
find_python_path(executor, runtime_root, bundle_format_version, platform).await?;
let bundled_plugin_marketplace_paths = runtime_contained_paths(
runtime_root,
metadata.bundled_plugins.unwrap_or_default(),
&[],
)?;
let bundled_skill_paths = runtime_contained_paths(
runtime_root,
metadata.bundled_skills.unwrap_or_default(),
&["SKILL.md"],
)?;
Ok(RuntimeInstallPaths {
bundled_plugin_marketplace_paths,
bundled_skill_paths,
node_modules_path,
node_path,
python_path,
skills_to_remove: metadata.skills_to_remove.unwrap_or_default(),
})
}
async fn find_python_path(
executor: &RuntimeExecutor,
runtime_root: &AbsolutePathBuf,
bundle_format_version: u32,
platform: TargetPlatform,
) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
let python_root = if bundle_format_version >= 2 {
runtime_root.as_path().join("dependencies").join("python")
} else {
runtime_root.as_path().join("python")
};
let executable_name = python_executable_name(platform.runtime_name());
let candidates = if platform == TargetPlatform::Windows {
vec![
python_root.join(executable_name),
python_root.join("python").join(executable_name),
python_root.join("bin").join(executable_name),
]
} else {
vec![
python_root.join("bin").join(executable_name),
python_root.join("bin").join("python"),
]
};
for candidate in candidates {
let candidate = absolute_path(candidate)?;
match executor
.filesystem()
.get_metadata(&candidate, /*sandbox*/ None)
.await
{
Ok(metadata) if metadata.is_file => return Ok(candidate),
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::NotFound => {}
Err(err) => {
return Err(internal_error(format!(
"failed to inspect runtime python executable {}: {err}",
candidate.display()
)));
}
}
}
Err(invalid_params(format!(
"runtime python executable is missing under {}",
python_root.display()
)))
}
fn runtime_contained_paths(
runtime_root: &AbsolutePathBuf,
directories: Vec<String>,
suffix: &[&str],
) -> Result<Vec<AbsolutePathBuf>, JSONRPCErrorError> {
directories
.into_iter()
.map(|directory| {
let mut path = runtime_root.as_path().join(directory);
for segment in suffix {
path.push(segment);
}
let normalized_runtime_root = normalize_path(runtime_root.as_path());
let normalized_path = normalize_path(&path);
if normalized_path != normalized_runtime_root
&& normalized_path.starts_with(&normalized_runtime_root)
{
absolute_path(path)
} else {
Err(invalid_params(
"runtime-contained path must stay within the runtime root",
))
}
})
.collect()
}
pub(crate) fn absolute_path(path: PathBuf) -> Result<AbsolutePathBuf, JSONRPCErrorError> {
AbsolutePathBuf::from_absolute_path_checked(path)
.map_err(|err| internal_error(format!("runtime path is not absolute: {err}")))
}
pub(crate) fn node_executable_name(target_platform: &str) -> &'static str {
if target_platform == "win32" {
"node.exe"
} else {
"node"
}
}
pub(crate) fn python_executable_name(target_platform: &str) -> &'static str {
if target_platform == "win32" {
"python.exe"
} else {
"python3"
}
}
async fn require_runtime_file(
executor: &RuntimeExecutor,
path: &AbsolutePathBuf,
label: &str,
) -> Result<(), JSONRPCErrorError> {
match executor
.filesystem()
.get_metadata(path, /*sandbox*/ None)
.await
{
Ok(metadata) if metadata.is_file => Ok(()),
Ok(_) => Err(invalid_params(format!(
"runtime {label} is not a file: {}",
path.display()
))),
Err(err) if err.kind() == io::ErrorKind::NotFound => Err(invalid_params(format!(
"runtime {label} is missing: {}",
path.display()
))),
Err(err) => Err(internal_error(format!(
"failed to inspect runtime {label} {}: {err}",
path.display()
))),
}
}
async fn require_runtime_directory(
executor: &RuntimeExecutor,
path: &AbsolutePathBuf,
label: &str,
) -> Result<(), JSONRPCErrorError> {
match executor
.filesystem()
.get_metadata(path, /*sandbox*/ None)
.await
{
Ok(metadata) if metadata.is_directory => Ok(()),
Ok(_) => Err(invalid_params(format!(
"runtime {label} is not a directory: {}",
path.display()
))),
Err(err) if err.kind() == io::ErrorKind::NotFound => Err(invalid_params(format!(
"runtime {label} is missing: {}",
path.display()
))),
Err(err) => Err(internal_error(format!(
"failed to inspect runtime {label} {}: {err}",
path.display()
))),
}
}

View File

@@ -162,6 +162,7 @@ pub(super) fn server_notification_thread_target(
| ServerNotification::FsChanged(_)
| ServerNotification::WindowsWorldWritableWarning(_)
| ServerNotification::WindowsSandboxSetupCompleted(_)
| ServerNotification::RuntimeInstallProgress(_)
| ServerNotification::AccountLoginCompleted(_) => None,
};
@@ -181,6 +182,8 @@ mod tests {
use crate::test_support::PathBufExt;
use crate::test_support::test_path_buf;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::RuntimeInstallProgressNotification;
use codex_app_server_protocol::RuntimeInstallProgressPhase;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadSettings;
use codex_app_server_protocol::ThreadSettingsUpdatedNotification;
@@ -230,6 +233,21 @@ mod tests {
assert_eq!(target, ServerNotificationThreadTarget::Global);
}
#[test]
fn runtime_install_progress_notifications_are_global() {
let notification =
ServerNotification::RuntimeInstallProgress(RuntimeInstallProgressNotification {
bundle_version: Some("v1".to_string()),
downloaded_bytes: None,
phase: RuntimeInstallProgressPhase::Checking,
total_bytes: None,
});
let target = server_notification_thread_target(&notification);
assert_eq!(target, ServerNotificationThreadTarget::Global);
}
#[test]
fn warning_notifications_route_to_threads_when_thread_id_is_present() {
let thread_id = ThreadId::new();

View File

@@ -239,6 +239,7 @@ impl ChatWidget {
| ServerNotification::ThreadRealtimeTranscriptDone(_)
| ServerNotification::WindowsWorldWritableWarning(_)
| ServerNotification::WindowsSandboxSetupCompleted(_)
| ServerNotification::RuntimeInstallProgress(_)
| ServerNotification::AccountLoginCompleted(_) => {}
ServerNotification::ContextCompacted(_) => {}
}