mirror of
https://github.com/anomalyco/opencode.git
synced 2026-02-20 07:44:23 +00:00
Compare commits
3 Commits
commit-his
...
snapshot-n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c585b9b50d | ||
|
|
c5089c9851 | ||
|
|
9c501b1583 |
6
bun.lock
6
bun.lock
@@ -315,7 +315,6 @@
|
||||
"ai": "catalog:",
|
||||
"ai-gateway-provider": "2.3.1",
|
||||
"bonjour-service": "1.3.0",
|
||||
"bun-pty": "0.4.8",
|
||||
"chokidar": "4.0.3",
|
||||
"clipboardy": "4.0.0",
|
||||
"decimal.js": "10.5.0",
|
||||
@@ -331,6 +330,7 @@
|
||||
"jsonc-parser": "3.3.1",
|
||||
"mime-types": "3.0.2",
|
||||
"minimatch": "10.0.3",
|
||||
"node-pty": "1.1.0",
|
||||
"open": "10.1.2",
|
||||
"opentui-spinner": "0.0.6",
|
||||
"partial-json": "0.1.7",
|
||||
@@ -2222,8 +2222,6 @@
|
||||
|
||||
"bun-ffi-structs": ["bun-ffi-structs@0.1.2", "", { "peerDependencies": { "typescript": "^5" } }, "sha512-Lh1oQAYHDcnesJauieA4UNkWGXY9hYck7OA5IaRwE3Bp6K2F2pJSNYqq+hIy7P3uOvo3km3oxS8304g5gDMl/w=="],
|
||||
|
||||
"bun-pty": ["bun-pty@0.4.8", "", {}, "sha512-rO70Mrbr13+jxHHHu2YBkk2pNqrJE5cJn29WE++PUr+GFA0hq/VgtQPZANJ8dJo6d7XImvBk37Innt8GM7O28w=="],
|
||||
|
||||
"bun-types": ["bun-types@1.3.9", "", { "dependencies": { "@types/node": "*" } }, "sha512-+UBWWOakIP4Tswh0Bt0QD0alpTY8cb5hvgiYeWCMet9YukHbzuruIEeXC2D7nMJPB12kbh8C7XJykSexEqGKJg=="],
|
||||
|
||||
"bun-webgpu": ["bun-webgpu@0.1.4", "", { "dependencies": { "@webgpu/types": "^0.1.60" }, "optionalDependencies": { "bun-webgpu-darwin-arm64": "^0.1.4", "bun-webgpu-darwin-x64": "^0.1.4", "bun-webgpu-linux-x64": "^0.1.4", "bun-webgpu-win32-x64": "^0.1.4" } }, "sha512-Kw+HoXl1PMWJTh9wvh63SSRofTA8vYBFCw0XEP1V1fFdQEDhI8Sgf73sdndE/oDpN/7CMx0Yv/q8FCvO39ROMQ=="],
|
||||
@@ -3298,6 +3296,8 @@
|
||||
|
||||
"node-mock-http": ["node-mock-http@1.0.4", "", {}, "sha512-8DY+kFsDkNXy1sJglUfuODx1/opAGJGyrTuFqEoN90oRc2Vk0ZbD4K2qmKXBBEhZQzdKHIVfEJpDU8Ak2NJEvQ=="],
|
||||
|
||||
"node-pty": ["node-pty@1.1.0", "", { "dependencies": { "node-addon-api": "^7.1.0" } }, "sha512-20JqtutY6JPXTUnL0ij1uad7Qe1baT46lyolh2sSENDd4sTzKZ4nmAFkeAARDKwmlLjPx6XKRlwRUxwjOy+lUg=="],
|
||||
|
||||
"node-releases": ["node-releases@2.0.27", "", {}, "sha512-nmh3lCkYZ3grZvqcCH+fjmQ7X+H0OeZgP40OierEaAptX4XofMh5kwNbWh7lBduUzCcV/8kZ+NDLCwm2iorIlA=="],
|
||||
|
||||
"nopt": ["nopt@7.2.1", "", { "dependencies": { "abbrev": "^2.0.0" }, "bin": { "nopt": "bin/nopt.js" } }, "sha512-taM24ViiimT/XntxbPyJQzCG+p4EKOpgD3mxFwW38mGjVUrfERQOeY4EDHjdnptttfHuHQXFx+lTP08Q+mLa/w=="],
|
||||
|
||||
@@ -11,8 +11,7 @@
|
||||
"dev:web": "bun --cwd packages/app dev",
|
||||
"typecheck": "bun turbo typecheck",
|
||||
"prepare": "husky",
|
||||
"random": "echo 'Random script'",
|
||||
"hello": "echo 'Hello World!'",
|
||||
"postinstall": "bun packages/opencode/script/node-pty-helper.ts",
|
||||
"test": "echo 'do not run tests from root' && exit 1"
|
||||
},
|
||||
"workspaces": {
|
||||
|
||||
@@ -15,6 +15,40 @@ import { terminalWriter } from "@/utils/terminal-writer"
|
||||
|
||||
const TOGGLE_TERMINAL_ID = "terminal.toggle"
|
||||
const DEFAULT_TOGGLE_TERMINAL_KEYBIND = "ctrl+`"
|
||||
const FRAME_META = 0
|
||||
const FRAME_OUTPUT = 1
|
||||
const FRAME_INPUT = 2
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
const connection = () => {
|
||||
if (typeof crypto !== "undefined" && typeof crypto.randomUUID === "function") {
|
||||
return crypto.randomUUID()
|
||||
}
|
||||
return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`
|
||||
}
|
||||
|
||||
const frameInput = (id: string, data: string) => {
|
||||
const channel = encoder.encode(id)
|
||||
const body = encoder.encode(data)
|
||||
const out = new Uint8Array(2 + channel.length + body.length)
|
||||
out[0] = FRAME_INPUT
|
||||
out[1] = channel.length
|
||||
out.set(channel, 2)
|
||||
out.set(body, 2 + channel.length)
|
||||
return out
|
||||
}
|
||||
|
||||
const frameOutput = (bytes: Uint8Array, decoder: TextDecoder) => {
|
||||
if (bytes[0] !== FRAME_OUTPUT) return
|
||||
const size = bytes[1]
|
||||
if (!Number.isSafeInteger(size) || size < 0) return
|
||||
if (bytes.length < 2 + size) return
|
||||
return {
|
||||
connection: decoder.decode(bytes.subarray(2, 2 + size)),
|
||||
data: decoder.decode(bytes.subarray(2 + size)),
|
||||
}
|
||||
}
|
||||
|
||||
export interface TerminalProps extends ComponentProps<"div"> {
|
||||
pty: LocalPTY
|
||||
onSubmit?: () => void
|
||||
@@ -396,8 +430,10 @@ export const Terminal = (props: TerminalProps) => {
|
||||
scheduleSize(size.cols, size.rows)
|
||||
})
|
||||
cleanups.push(() => disposeIfDisposable(onResize))
|
||||
const connectionID = connection()
|
||||
const onData = t.onData((data) => {
|
||||
if (ws?.readyState === WebSocket.OPEN) ws.send(data)
|
||||
if (ws?.readyState !== WebSocket.OPEN) return
|
||||
ws.send(frameInput(connectionID, data))
|
||||
})
|
||||
cleanups.push(() => disposeIfDisposable(onData))
|
||||
const onKey = t.onKey((key) => {
|
||||
@@ -450,6 +486,7 @@ export const Terminal = (props: TerminalProps) => {
|
||||
const url = new URL(sdk.url + `/pty/${local.pty.id}/connect`)
|
||||
url.searchParams.set("directory", sdk.directory)
|
||||
url.searchParams.set("cursor", String(start !== undefined ? start : local.pty.buffer ? -1 : 0))
|
||||
url.searchParams.set("connection", connectionID)
|
||||
url.protocol = url.protocol === "https:" ? "wss:" : "ws:"
|
||||
url.username = server.current?.http.username ?? ""
|
||||
url.password = server.current?.http.password ?? ""
|
||||
@@ -471,24 +508,33 @@ export const Terminal = (props: TerminalProps) => {
|
||||
if (closing) return
|
||||
if (event.data instanceof ArrayBuffer) {
|
||||
const bytes = new Uint8Array(event.data)
|
||||
if (bytes[0] !== 0) return
|
||||
const json = decoder.decode(bytes.subarray(1))
|
||||
try {
|
||||
const meta = JSON.parse(json) as { cursor?: unknown }
|
||||
const next = meta?.cursor
|
||||
if (typeof next === "number" && Number.isSafeInteger(next) && next >= 0) {
|
||||
cursor = next
|
||||
if (bytes[0] === FRAME_META) {
|
||||
const json = decoder.decode(bytes.subarray(1))
|
||||
try {
|
||||
const meta = JSON.parse(json) as { cursor?: unknown; connection?: unknown }
|
||||
if (typeof meta?.connection === "string" && meta.connection !== connectionID) return
|
||||
const next = meta?.cursor
|
||||
if (typeof next === "number" && Number.isSafeInteger(next) && next >= 0) {
|
||||
cursor = next
|
||||
}
|
||||
} catch (err) {
|
||||
debugTerminal("invalid websocket control frame", err)
|
||||
}
|
||||
} catch (err) {
|
||||
debugTerminal("invalid websocket control frame", err)
|
||||
return
|
||||
}
|
||||
|
||||
const frame = frameOutput(bytes, decoder)
|
||||
if (!frame) return
|
||||
if (frame.connection !== connectionID) return
|
||||
if (!frame.data) return
|
||||
output?.push(frame.data)
|
||||
cursor += frame.data.length
|
||||
return
|
||||
}
|
||||
|
||||
const data = typeof event.data === "string" ? event.data : ""
|
||||
if (!data) return
|
||||
output?.push(data)
|
||||
cursor += data.length
|
||||
if (typeof event.data === "string") {
|
||||
debugTerminal("ignoring unframed websocket output")
|
||||
}
|
||||
}
|
||||
socket.addEventListener("message", handleMessage)
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@
|
||||
"ai": "catalog:",
|
||||
"ai-gateway-provider": "2.3.1",
|
||||
"bonjour-service": "1.3.0",
|
||||
"bun-pty": "0.4.8",
|
||||
"node-pty": "1.1.0",
|
||||
"chokidar": "4.0.3",
|
||||
"clipboardy": "4.0.0",
|
||||
"decimal.js": "10.5.0",
|
||||
|
||||
@@ -142,6 +142,8 @@ if (!skipInstall) {
|
||||
await $`bun install --os="*" --cpu="*" @opentui/core@${pkg.dependencies["@opentui/core"]}`
|
||||
await $`bun install --os="*" --cpu="*" @parcel/watcher@${pkg.dependencies["@parcel/watcher"]}`
|
||||
}
|
||||
await $`bun script/node-pty-helper.ts`
|
||||
|
||||
for (const item of targets) {
|
||||
const name = [
|
||||
pkg.name,
|
||||
|
||||
45
packages/opencode/script/node-pty-helper.ts
Normal file
45
packages/opencode/script/node-pty-helper.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env bun
|
||||
|
||||
import fs from "node:fs"
|
||||
import path from "node:path"
|
||||
import { createRequire } from "node:module"
|
||||
|
||||
const req = createRequire(import.meta.url)
|
||||
|
||||
const resolve = () => {
|
||||
try {
|
||||
return path.dirname(req.resolve("node-pty/package.json"))
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
export const fixNodePtyHelper = () => {
|
||||
const root = resolve()
|
||||
if (!root) return []
|
||||
|
||||
const files = [
|
||||
path.join(root, "prebuilds", "darwin-arm64", "spawn-helper"),
|
||||
path.join(root, "prebuilds", "darwin-x64", "spawn-helper"),
|
||||
path.join(root, "build", "Release", "spawn-helper"),
|
||||
path.join(root, "build", "Debug", "spawn-helper"),
|
||||
]
|
||||
|
||||
return files.flatMap((file) => {
|
||||
if (!fs.existsSync(file)) return []
|
||||
const mode = fs.statSync(file).mode
|
||||
const next = mode | 0o111
|
||||
if (mode === next) return []
|
||||
fs.chmodSync(file, next)
|
||||
return [file]
|
||||
})
|
||||
}
|
||||
|
||||
if (import.meta.main) {
|
||||
const changed = fixNodePtyHelper()
|
||||
if (!changed.length) process.exit(0)
|
||||
console.log(`updated node-pty spawn-helper permissions (${changed.length})`)
|
||||
for (const file of changed) {
|
||||
console.log(`- ${file}`)
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,10 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { type IPty } from "bun-pty"
|
||||
import { type IPty } from "node-pty"
|
||||
import z from "zod"
|
||||
import { Identifier } from "../id/id"
|
||||
import { Log } from "../util/log"
|
||||
import { Instance } from "../project/instance"
|
||||
import { lazy } from "@opencode-ai/util/lazy"
|
||||
import { Shell } from "@/shell/shell"
|
||||
import { Plugin } from "@/plugin"
|
||||
|
||||
@@ -15,22 +14,27 @@ export namespace Pty {
|
||||
const BUFFER_LIMIT = 1024 * 1024 * 2
|
||||
const BUFFER_CHUNK = 64 * 1024
|
||||
const encoder = new TextEncoder()
|
||||
const decoder = new TextDecoder()
|
||||
const FRAME_META = 0
|
||||
const FRAME_OUTPUT = 1
|
||||
const FRAME_INPUT = 2
|
||||
const MAX_CONNECTION = 200
|
||||
|
||||
type Socket = {
|
||||
readyState: number
|
||||
data?: unknown
|
||||
send: (data: string | Uint8Array | ArrayBuffer) => void
|
||||
close: (code?: number, reason?: string) => void
|
||||
}
|
||||
|
||||
type Subscriber = {
|
||||
id: number
|
||||
token: unknown
|
||||
connection: string
|
||||
}
|
||||
|
||||
const sockets = new WeakMap<object, number>()
|
||||
const owners = new WeakMap<object, string>()
|
||||
let socketCounter = 0
|
||||
let connectionCounter = 0
|
||||
|
||||
const tagSocket = (ws: Socket) => {
|
||||
if (!ws || typeof ws !== "object") return
|
||||
@@ -39,33 +43,74 @@ export namespace Pty {
|
||||
return next
|
||||
}
|
||||
|
||||
const token = (ws: Socket) => {
|
||||
const data = ws.data
|
||||
if (!data || typeof data !== "object") return
|
||||
|
||||
const events = (data as { events?: unknown }).events
|
||||
if (events && typeof events === "object") return events
|
||||
|
||||
const url = (data as { url?: unknown }).url
|
||||
if (url && typeof url === "object") return url
|
||||
|
||||
return data
|
||||
const connection = () => {
|
||||
connectionCounter = (connectionCounter + 1) % Number.MAX_SAFE_INTEGER
|
||||
return `${Date.now().toString(36)}-${connectionCounter.toString(36)}`
|
||||
}
|
||||
|
||||
// WebSocket control frame: 0x00 + UTF-8 JSON.
|
||||
const meta = (cursor: number) => {
|
||||
const json = JSON.stringify({ cursor })
|
||||
const normalizeConnection = (value?: string) => {
|
||||
const next = typeof value === "string" ? value.trim() : ""
|
||||
if (!next) return connection()
|
||||
if (next.length > MAX_CONNECTION) return connection()
|
||||
if (encoder.encode(next).length > 255) return connection()
|
||||
return next
|
||||
}
|
||||
|
||||
const output = (connection: string, data: string) => {
|
||||
const channel = encoder.encode(connection)
|
||||
const chunk = encoder.encode(data)
|
||||
const out = new Uint8Array(2 + channel.length + chunk.length)
|
||||
out[0] = FRAME_OUTPUT
|
||||
out[1] = channel.length
|
||||
out.set(channel, 2)
|
||||
out.set(chunk, 2 + channel.length)
|
||||
return out
|
||||
}
|
||||
|
||||
const input = (message: string | Uint8Array | ArrayBuffer) => {
|
||||
if (typeof message === "string") {
|
||||
return { data: message }
|
||||
}
|
||||
|
||||
const bytes = message instanceof Uint8Array ? message : new Uint8Array(message)
|
||||
if (bytes[0] !== FRAME_INPUT) return
|
||||
const size = bytes[1]
|
||||
if (!Number.isSafeInteger(size) || size < 0) return
|
||||
if (bytes.length < 2 + size) return
|
||||
return {
|
||||
connection: decoder.decode(bytes.subarray(2, 2 + size)),
|
||||
data: decoder.decode(bytes.subarray(2 + size)),
|
||||
}
|
||||
}
|
||||
|
||||
// WebSocket control frame: 0x00 + UTF-8 JSON ({ cursor, connection }).
|
||||
const meta = (cursor: number, connection: string) => {
|
||||
const json = JSON.stringify({ cursor, connection })
|
||||
const bytes = encoder.encode(json)
|
||||
const out = new Uint8Array(bytes.length + 1)
|
||||
out[0] = 0
|
||||
out[0] = FRAME_META
|
||||
out.set(bytes, 1)
|
||||
return out
|
||||
}
|
||||
|
||||
const pty = lazy(async () => {
|
||||
const { spawn } = await import("bun-pty")
|
||||
return spawn
|
||||
})
|
||||
type Spawn = (file: string, args: string | string[], options: unknown) => IPty
|
||||
let override: Spawn | undefined
|
||||
let spawn: Spawn | undefined
|
||||
|
||||
const pty = async (): Promise<Spawn> => {
|
||||
if (override) return override
|
||||
if (spawn) return spawn
|
||||
const mod = await import("node-pty")
|
||||
const next = mod.spawn as Spawn
|
||||
spawn = next
|
||||
return next
|
||||
}
|
||||
|
||||
export function setSpawn(input?: Spawn) {
|
||||
override = input
|
||||
if (input) return
|
||||
spawn = undefined
|
||||
}
|
||||
|
||||
export const Info = z
|
||||
.object({
|
||||
@@ -210,13 +255,8 @@ export namespace Pty {
|
||||
continue
|
||||
}
|
||||
|
||||
if (sub.token !== undefined && token(ws) !== sub.token) {
|
||||
session.subscribers.delete(ws)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(chunk)
|
||||
ws.send(output(sub.connection, chunk))
|
||||
} catch {
|
||||
session.subscribers.delete(ws)
|
||||
}
|
||||
@@ -292,7 +332,7 @@ export namespace Pty {
|
||||
}
|
||||
}
|
||||
|
||||
export function connect(id: string, ws: Socket, cursor?: number) {
|
||||
export function connect(id: string, ws: Socket, cursor?: number, connectionID?: string) {
|
||||
const session = state().get(id)
|
||||
if (!session) {
|
||||
ws.close()
|
||||
@@ -312,7 +352,11 @@ export namespace Pty {
|
||||
}
|
||||
|
||||
owners.set(ws, id)
|
||||
session.subscribers.set(ws, { id: socketId, token: token(ws) })
|
||||
const sub = {
|
||||
id: socketId,
|
||||
connection: normalizeConnection(connectionID),
|
||||
}
|
||||
session.subscribers.set(ws, sub)
|
||||
|
||||
const cleanup = () => {
|
||||
session.subscribers.delete(ws)
|
||||
@@ -336,7 +380,7 @@ export namespace Pty {
|
||||
if (data) {
|
||||
try {
|
||||
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
|
||||
ws.send(data.slice(i, i + BUFFER_CHUNK))
|
||||
ws.send(output(sub.connection, data.slice(i, i + BUFFER_CHUNK)))
|
||||
}
|
||||
} catch {
|
||||
cleanup()
|
||||
@@ -346,15 +390,18 @@ export namespace Pty {
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(meta(end))
|
||||
ws.send(meta(end, sub.connection))
|
||||
} catch {
|
||||
cleanup()
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
return {
|
||||
onMessage: (message: string | ArrayBuffer) => {
|
||||
session.process.write(String(message))
|
||||
onMessage: (message: string | Uint8Array | ArrayBuffer) => {
|
||||
const next = input(message)
|
||||
if (!next?.data) return
|
||||
if (next.connection && next.connection !== sub.connection) return
|
||||
session.process.write(next.data)
|
||||
},
|
||||
onClose: () => {
|
||||
log.info("client disconnected from session", { id })
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import { Hono } from "hono"
|
||||
import { describeRoute, validator, resolver } from "hono-openapi"
|
||||
import { upgradeWebSocket } from "hono/bun"
|
||||
import z from "zod"
|
||||
import { Pty } from "@/pty"
|
||||
import { NotFoundError } from "../../storage/db"
|
||||
import { errors } from "../error"
|
||||
import { lazy } from "../../util/lazy"
|
||||
import { upgradeWebSocket } from "../websocket"
|
||||
|
||||
export const PtyRoutes = lazy(() =>
|
||||
new Hono()
|
||||
@@ -158,6 +158,12 @@ export const PtyRoutes = lazy(() =>
|
||||
if (!Number.isSafeInteger(parsed) || parsed < -1) return
|
||||
return parsed
|
||||
})()
|
||||
const connection = (() => {
|
||||
const value = c.req.query("connection")
|
||||
if (!value) return
|
||||
if (value.length > 200) return
|
||||
return value
|
||||
})()
|
||||
let handler: ReturnType<typeof Pty.connect>
|
||||
if (!Pty.get(id)) throw new Error("Session not found")
|
||||
|
||||
@@ -177,15 +183,20 @@ export const PtyRoutes = lazy(() =>
|
||||
|
||||
return {
|
||||
onOpen(_event, ws) {
|
||||
const socket = ws.raw
|
||||
if (!isSocket(socket)) {
|
||||
if (!isSocket(ws)) {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
handler = Pty.connect(id, socket, cursor)
|
||||
handler = Pty.connect(id, ws, cursor, connection)
|
||||
},
|
||||
onMessage(event) {
|
||||
if (typeof event.data !== "string") return
|
||||
if (
|
||||
typeof event.data !== "string" &&
|
||||
!(event.data instanceof ArrayBuffer) &&
|
||||
!(event.data instanceof Uint8Array)
|
||||
) {
|
||||
return
|
||||
}
|
||||
handler?.onMessage(event.data)
|
||||
},
|
||||
onClose() {
|
||||
|
||||
@@ -33,7 +33,7 @@ import { lazy } from "../util/lazy"
|
||||
import { InstanceBootstrap } from "../project/bootstrap"
|
||||
import { NotFoundError } from "../storage/db"
|
||||
import type { ContentfulStatusCode } from "hono/utils/http-status"
|
||||
import { websocket } from "hono/bun"
|
||||
import { websocket } from "./websocket"
|
||||
import { HTTPException } from "hono/http-exception"
|
||||
import { errors } from "./error"
|
||||
import { QuestionRoutes } from "./routes/question"
|
||||
|
||||
1
packages/opencode/src/server/websocket.ts
Normal file
1
packages/opencode/src/server/websocket.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { createBunWebSocket, upgradeWebSocket, websocket } from "hono/bun"
|
||||
@@ -1,9 +1,73 @@
|
||||
import { describe, expect, test } from "bun:test"
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { Pty } from "../../src/pty"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const decoder = new TextDecoder()
|
||||
|
||||
const input = (connection: string, data: string) => {
|
||||
const channel = encoder.encode(connection)
|
||||
const body = encoder.encode(data)
|
||||
const out = new Uint8Array(2 + channel.length + body.length)
|
||||
out[0] = 2
|
||||
out[1] = channel.length
|
||||
out.set(channel, 2)
|
||||
out.set(body, 2 + channel.length)
|
||||
return out
|
||||
}
|
||||
|
||||
const output = (connection: string, data: unknown) => {
|
||||
if (typeof data === "string") return data
|
||||
if (!(data instanceof Uint8Array) && !(data instanceof ArrayBuffer)) return ""
|
||||
const bytes = data instanceof Uint8Array ? data : new Uint8Array(data)
|
||||
if (bytes[0] !== 1) return ""
|
||||
const size = bytes[1]
|
||||
if (!Number.isSafeInteger(size) || size < 0) return ""
|
||||
if (bytes.length < 2 + size) return ""
|
||||
const id = decoder.decode(bytes.subarray(2, 2 + size))
|
||||
if (id !== connection) return ""
|
||||
return decoder.decode(bytes.subarray(2 + size))
|
||||
}
|
||||
|
||||
const spawn = () => {
|
||||
let pid = 1000
|
||||
return () => {
|
||||
const data = new Set<(chunk: string) => void>()
|
||||
const exit = new Set<(event: { exitCode: number }) => void>()
|
||||
let closed = false
|
||||
|
||||
return {
|
||||
pid: ++pid,
|
||||
onData: (cb: (chunk: string) => void) => {
|
||||
data.add(cb)
|
||||
},
|
||||
onExit: (cb: (event: { exitCode: number }) => void) => {
|
||||
exit.add(cb)
|
||||
},
|
||||
resize: () => {},
|
||||
write: (chunk: string) => {
|
||||
if (closed) return
|
||||
for (const cb of data) cb(chunk)
|
||||
},
|
||||
kill: () => {
|
||||
if (closed) return
|
||||
closed = true
|
||||
for (const cb of exit) cb({ exitCode: 0 })
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe("pty", () => {
|
||||
beforeEach(() => {
|
||||
Pty.setSpawn(spawn() as unknown as Parameters<typeof Pty.setSpawn>[0])
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
Pty.setSpawn()
|
||||
})
|
||||
|
||||
test("does not leak output when websocket objects are reused", async () => {
|
||||
await using dir = await tmpdir({ git: true })
|
||||
|
||||
@@ -18,9 +82,9 @@ describe("pty", () => {
|
||||
|
||||
const ws = {
|
||||
readyState: 1,
|
||||
data: { events: { connection: "a" } },
|
||||
send: (data: unknown) => {
|
||||
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||
const text = output("conn-a", data)
|
||||
if (text) outA.push(text)
|
||||
},
|
||||
close: () => {
|
||||
// no-op (simulate abrupt drop)
|
||||
@@ -28,14 +92,14 @@ describe("pty", () => {
|
||||
}
|
||||
|
||||
// Connect "a" first with ws.
|
||||
Pty.connect(a.id, ws as any)
|
||||
Pty.connect(a.id, ws as any, undefined, "conn-a")
|
||||
|
||||
// Now "reuse" the same ws object for another connection.
|
||||
ws.data = { events: { connection: "b" } }
|
||||
ws.send = (data: unknown) => {
|
||||
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||
const text = output("conn-b", data)
|
||||
if (text) outB.push(text)
|
||||
}
|
||||
Pty.connect(b.id, ws as any)
|
||||
Pty.connect(b.id, ws as any, undefined, "conn-b")
|
||||
|
||||
// Clear connect metadata writes.
|
||||
outA.length = 0
|
||||
@@ -54,7 +118,7 @@ describe("pty", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("does not leak output when Bun recycles websocket objects before re-connect", async () => {
|
||||
test("does not leak output when websocket objects are recycled before re-connect", async () => {
|
||||
await using dir = await tmpdir({ git: true })
|
||||
|
||||
await Instance.provide({
|
||||
@@ -67,9 +131,9 @@ describe("pty", () => {
|
||||
|
||||
const ws = {
|
||||
readyState: 1,
|
||||
data: { events: { connection: "a" } },
|
||||
send: (data: unknown) => {
|
||||
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||
const text = output("conn-a", data)
|
||||
if (text) outA.push(text)
|
||||
},
|
||||
close: () => {
|
||||
// no-op (simulate abrupt drop)
|
||||
@@ -77,14 +141,14 @@ describe("pty", () => {
|
||||
}
|
||||
|
||||
// Connect "a" first.
|
||||
Pty.connect(a.id, ws as any)
|
||||
Pty.connect(a.id, ws as any, undefined, "conn-a")
|
||||
outA.length = 0
|
||||
|
||||
// Simulate Bun reusing the same websocket object for another
|
||||
// connection before the next onOpen calls Pty.connect.
|
||||
ws.data = { events: { connection: "b" } }
|
||||
// Simulate websocket object reuse for another connection before
|
||||
// the next onOpen calls Pty.connect.
|
||||
ws.send = (data: unknown) => {
|
||||
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
|
||||
const text = output("conn-b", data)
|
||||
if (text) outB.push(text)
|
||||
}
|
||||
|
||||
Pty.write(a.id, "AAA\n")
|
||||
@@ -97,4 +161,42 @@ describe("pty", () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("drops input frames that carry a different connection id", async () => {
|
||||
await using dir = await tmpdir({ git: true })
|
||||
|
||||
await Instance.provide({
|
||||
directory: dir.path,
|
||||
fn: async () => {
|
||||
const a = await Pty.create({ command: "cat", title: "a" })
|
||||
try {
|
||||
const out: string[] = []
|
||||
|
||||
const ws = {
|
||||
readyState: 1,
|
||||
send: (data: unknown) => {
|
||||
const text = output("conn-a", data)
|
||||
if (text) out.push(text)
|
||||
},
|
||||
close: () => {
|
||||
// no-op
|
||||
},
|
||||
}
|
||||
|
||||
const handler = Pty.connect(a.id, ws as any, undefined, "conn-a")
|
||||
out.length = 0
|
||||
|
||||
handler?.onMessage(input("conn-b", "BBB\n"))
|
||||
await Bun.sleep(100)
|
||||
expect(out.join("")).not.toContain("BBB")
|
||||
|
||||
handler?.onMessage(input("conn-a", "AAA\n"))
|
||||
await Bun.sleep(100)
|
||||
expect(out.join("")).toContain("AAA")
|
||||
} finally {
|
||||
await Pty.remove(a.id)
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user